大数据平台架构方法论、模型与实践深度观察

近日笔者对大数据平台架构做了广泛的调研,目的在于对当前的主流大数据架构的方法论、模型与实践有一个全面的观察,以便为电商、出行、制造业、金融、媒体、智慧城市、物流、车联网等各行业朋友在理论与实践上提供一点参考,尤其以实时流式计算为核心。

1 方法论

目前大数据架构主流的方法论有三种:

  • 2014 年 1 月份 Tweeter 的 Nathan Martz 提出的 Lambda
  • 2014 年 7 月份 Linkedin 的 Jay Kreps 提出的 Kappa
  • 2015 年 Google 的 Tyler Akidau 等提出的 Dataflow Model

这里不转述它们的内容,或做什么比较,网上很容易找到相关内容,个人建议最好看链接指向的原文。这里想特别指出的是,截止本文的写作,Dataflow 是最流行的模型,它不仅激励了 Apache Flink 的发展,而且 Dataflow 模型的作者 Tyler Akidau 等也是 Apache Beam 的 Founder 与 Commiter。Flink 和 Beam 都实现了 Dataflow 模型的很多概念,它可以广泛地应用于数据处理领域,包括 ETL、数据处理的服务等,不限于批流统一。

总的来说,大数据架构的整个发展历程可以简单地这样总结:首先,Lambda 批流并举、既独立又互补;然后,Kappa 倡导统一为流;现在,Dataflow 统一流、批、微批,并建立概念模型。不过 Kappa 只是提出了想法,而 Dataflow 将模型做了细化,包含了一些重要的实践探索与概念(Operator/Window/Trigger/Accumulate/Retract/Watermark/Session),并做了具体展开。

大数据架构发展到今天,批流矛盾早在前几年就显现了,各大互联网公司也早就遇到了这一矛盾。Google 提出 Dataflow 并不是空穴来风,它也有这种矛盾,分立的流计算 MillWheel 和批计算 FlumeJava,其实在发布 Dataflow 模型之前,Google 内部已然碰到了二者的矛盾问题及部分的解决。

Beam 最早在 2014 年作为 Google Cloud DataFlow SDK 发布,在 2016 年捐给了 ASF。Flink 和 Beam 都得到了三个有关论文的启发( MapReduce,MillWheel,DataFlow )。但是 Beam 并不是独立的 Runtime,它是通过 Runner 来与各种执行引擎集成的,包括:Apache Flink、Apache Spark、Apache Samza、Hazelcast Jet、Google Cloud Dataflow 等。Beam 支持各种语言,如:Java、Python、Go、SQL,而且还支持它们的 ML 库,如:Numpy、Pandas、Tensorflow。另外,Beam 以统一的 API 来处理流与批,而 Flink 是不同的 API(DataSet 和 DataStream)。有些大公司为平台支持语言少或者与机器学习框架集成问题而烦恼,也许 Beam 是不错的探索选项,Flink 官方鼓励 Beam+Flink 模式。

目前,虽然 ASF 的 Flink、Samza、Beam 都拥抱批流统一的理念,但其中,Flink 实现得最为充分、社区也最为繁荣。

2 Ingestion

2.1 模型及原则

总要有数据才涉及到对数据的处理,所以从数据的源发地开始看看数据的摄入情况,其基本的概念模型如下:

对 Figure 1 中模型的要求:

Ingestion 的原则:

  • 高可用、高吞吐量;
  • 限流或 Backlog 存储,也许 Ingestion 层本身资源不足、也许下游处理不过来;
  • Connector,可自行扩展,支持多种协议与技术;
  • 有丰富的 Built-in Connector,在 Ingestion 环节尤其看中流存储;
  • 容量可自动伸缩,Scalability;
  • 支持主流集群管理,如:Mesos/Yarn/Kubernetes;
  • 轻量的从 Source 到 Sink 的计算,完全可配置,计算不需要 Shuffle;
  • 不丢数据,Loss Tolerant;
  • 保证送达,Guaranteed Delivery;
  • 有全面的监控、告警;
  • Auto failover;

Ingestion 管理的原则:

  • 强大的 UI;
  • 动态配置;
  • 策略配置,包括资源调度;
  • 安全管控的配置;
  • 发布的版本控制与审计;
  • 集成元数据管理;
  • 身份认证、授权、多租户;

2.2 技术推荐

基于上述模型与要求,以下按数据的源头、实际上也是对应的 Ingestion 方式,分别进行分析与技术推荐。

Web page/Mobile App/MES/IoT/System Built-in Producer,都是从公网或内网发过来的情况,是被动接收。对于这类,可能很多公司都自研了,个人倒是建议采用 Apache NiFi ,虽然国内有 DataX,但是 NiFi 的能力要丰富得多。

简单来讲,可以把它想象为一个桥梁,在企业级的全景图中,如果想让数据从 A 流到 B,并在中间做些转换可以考虑,它是系统间的自动化的数据加工管道,不需要编程,可以在可视化的环境中拖拽完成。它不仅可以连接各种主流大数据框架,而且支持基于各种协议的侦听、读、写,甚至 IoT 的 MQTT;它也支持对各种文件格式的处理,如:Parquet、CSV、JSON、Avro、ORC 等,还支持各种脚本语言。

这里之所以推荐 NiFi,是因为它已经相当成熟,有了每天处理几十亿数据的案例,虽然它也有算子构成的 DAG,但是更侧重两端。多年以来它在流管理、柔性伸缩、安全、易用性、监控等方面都在不断发展,它最突出的一点是声称 Loss Tolerant、Guaranteed Delivery,通过 WAL 和可插拔的内容库。NiFi 实在是多面手,如果要在企业系统全景图中的构件之间架设一个轻量流式处理的桥梁,值得考虑。它可以承担在公网上大吞吐量地接收消息的角色。虽然单纯从功能上看,自研并不难,但是要综合考虑到管理、安全、运维、监控、数据不丢失、自动故障转移、多租户等各个方面,需要做的工作还是很多的,所以作为企业级大图景在云中的最外层构件,还是推荐这种已经有相当积累的框架。有些公司不仅有侵入式的埋点,还有非侵入式的 Javaagent、APM、Pinpoint 等,如果要直接通过网络发送数据,也不妨尝试用 NiFi 来接收,如果落为文件也可以用,下面会提到。

Database,首先可以是定期增量或全量批拉的方式,这里主要讲实时的 CDC 方式,数据是基于 CUD 事件被推过来的。如果是 MySQL,可以采用 Canal 或者用 MySQL-Binlog-Connector-Java 自己写。前者有个不好的地方在于,它是分 Server/Client 的,用后者自己写可以把这个逻辑写成一个定制的 NiFi Processor,然后也许需要对代码逻辑做一些中间处理或者用 NiFi 界面构造 DAG,最后流入 Kafka,也可以同时流入其它存储。这种方式的实时性非常高,可立即对 MySQL 中的 CUD 事件做出反应,两种方式都需要独特的数据库权限。

如果是 Oracle,也要求实时性,那么 Oracle GoldenGate for Big Data 是一个选项,可以把 Oracle 中的交易变化实时地反映到各种大数据存储中,包括 Kafka。如果是 PostgreSQL,也有 CDC,不过 Oracle 和 PostgreSQL 都没有内置支持。

如果 HBase 在一线业务应用中直接作为存储,可以考虑写一个基于 HBaseEndpoint 的 CDC(Change Data Capture)。如果是 Cassandra 作为一线业务存储,官方有 CDC 支持。业务数据库的 CDC 环节一定要加上监控、告警的机制,实际上这两种机制要在整个企业 IT 大图景中所有的地方都加上,才能随时感知应该知道的系统的健康状况。对于 RDBMS,如果可以接受批计算,当然传统的 Sqoop 是一个选项。

System log,可以用 Apache NiFi 中的 TailFile(支持 Rolling)这个 Processor,中间加上自己写或拖拽处需要的处理逻辑,最后吐到 Kafka 等存储中。或者,使用经典的 Flume 也是没毛病的,但是简单就是美,都用 NiFi 岂不是更好。

这么看来,其实用 NiFi 一个框架就可以解决所有数据的问题了,而且,NiFi 具备综合的管理能力,如:跟踪、监控、拖拽开发、多租户、HA 方案等。可以把 NiFi 看成是万能的 Integrator,但不要把复杂的计算逻辑让它来担当,毕竟那不是它的强项。有趣的一点是,它还有 Backpressure 的能力,这意味着在整个企业大数据架构的最外延、数据的入口处可以有一个控流阀来控制流速,也许在某种场景是个不错的强项。

3 元数据管理

元数据概念起源于 20 世纪下半叶,广为传知是在 RDBMS 时代。而到了当今这个数据时代,应该用更广袤的视野来看待它,包括:

  • 传统的 RDBMS 的元信息,库、表、字段、关系、UDF、触发器、存储过程等;
  • 关于计算体的元信息,DC、集群、产品、应用、系统、作业、服务、UDF、算子;
  • 关于人的元信息,地域、组织架构、人员、流程;
  • 关于网络的元信息,如:跨交换机是有距离的,延迟是不一样的,这部分也应当为计算所知;
  • 关于业务的元信息,各种业务术语、过程、领域、BU、渠道等;

从价值的维度看,目前数据已经被提升到资产的高度。既然数据是资产,对它的管理就应当向财务方向看起。现代的元数据管理是构建完善高效的企业 IT 管理、治理体系的基石。在企业范围内,可以这样比喻,如果把以上所有大数据体系构成要素统一抽象为大数据王国的公民,如果没有元数据管理,就相当于这些公民之间没有共同的语言,自然就会有歧义、有纷争、有困惑、有乱象、有翻译、有成本。其中人、组织的要素也非常重要,例如:资源的交接、权限、计费、计量、资源回收、预算、流程等都涉及。企业 Digital Twin 的运行也离不开元数据管理。车同轨、书同文、行同伦,天下大治期矣。

本文之所以这么早进入元数据章节,是认为它发挥作用应该越早越好,所以就放在了 Ingestion 之后,其实在 Ingestion 阶段,就应当做些必要的元数据方面的工作了。在数据进入整个企业大数据体系边界,就应当把该做的事情做了,不论是否为结构化数据,至少要先登记造册,哪怕是扔到数据湖里,如:

  • 什么时候到的;
  • 属于哪个业务领域、哪个部门的数据;
  • 来自哪个系统、哪个应用;
  • 基本度量;
  • Schema;
  • Format;

元数据管理涉及到数据治理、Lineage、监控、作业、业务信息、访问控制、审计、合规、可视化、数据标准、数据质量、版本控制、计算作业、发布管理、计量计费、告警等方方面面。它关系到企业如何有序地、高效地、稳定地、弹性地、智能地运行。虽然 Haddop 生态基本上用 HCatalog 来存储元数据,但仅仅存储是不够的,对于整个企业而言是非常局限的。一个字段名及其语义标签在一个企业中存在多种表达,而内容本质上却是一个,这不是什么稀奇的事。

在企业数据的大图景中,远不止纯粹数据层面的元数据信息需要整合管理,还包括各种其它对象,如 UDF(Kylin、Hive、Flink、Spark、Storm、Phoenix 等)、作业(Flink、Storm、Spark 等)、部门、人员、数据中心、集群、业务要素等,这样才能更充分、高效、系统、完整地保障企业的运行效率。对于作业,更理想的开发和使用方式应当是配置的、声明式的,而剩下的事情交由平台完成,这样才能使用户以更加一致的、统一的、低成本的、可追溯的方式来执行所需的计算任务,而这离不开针对作业的元数据管理。因此,有一个独立的、统一的元数据管理平台是上策。

3.1 技术推荐

Apache Altas 也许是一个不错的起步选项,它是针对 Hadoop 生态的元数据管理工具,以基于 HBase 的 GranusGraph 作为存储,Solr 用作搜索,用 Hook 感知 Hive/Kafka/HBase/Storm/Sqoop 的元数据的变化,遗憾的是没有 Flink 和 Pulsar,不过可以定制。Altas 支持 HA。

Atlas 用 Type/Entity 模型来组织所有的元数据对象,它们的关系相当于 OOP 中对应的 Class/Instance。Type 可以分为多个 Metatype:Metatype/Enum/Collection(Array,Map)/Composite(Entity, Struct, Classification, Relationship),Composite 可以有多 Attribute,而 Attribute 可以指向 Metatype 从而建立丰富的关系,有趣的是 Entity 和 Classification 是可以继承关系的,真正存放元数据信息的叫 Entity,例如:一张 Hive 表。Atlas 可以从 Kafka 收到关于 Entity 和 Classification 上的 CUD 事件。

Atlas 还提供了 REST API 来从 Atalas 查询和搜索 Entity、Lineage、关系、Type 信息,也可以对元数据做 CUD 操作,某种场合下这在各种流计算的算子中是非常有用的。Atlas 用 JanusGraph 管理丰富的元数据对象之间的关系。

业务元数据这样组织的,核心叫 Term,一个 Term 可以属于多个 Category,而 Category 是 Hierarchical 的,一个 Term 还可以关联多个 Entity,一个 Term 还可以有多个 Classification,Term 之间还可以建立丰富的关系,如:反义词、同义词等。Glossary 是颗粒度最大的,它下面有 Term 和 Category。

安全方面,支持双向 SSL、SPNEGO-based HTTP(Simple/Kerberos)、JAAS(Kafka 需要);Authentication 支持 LDAP、Kerberos 等几种方法;Authorization,可以针对在 Metatype 上的 CUD 操作、Entity 上的 CRUD 操作,可以插件方式定制自己的授权组件,除了默认的 Simple Authorizer,还可以用 Apache Ranger 来作为 Authorizer,Ranger 是为 Hadoop 生态提供的综合的数据安全管理框架,突出裨益在于集中管理、审计用户的访问行为和授权管理行为,其模型为 Role/Attribute Based Access Control,支持各种策略。

4 流存储

4.1 模型及关注点

Dataflow 模型需要一个相应的存储系统来支撑,流存储的基本概念模型如下:

流存储的核心能力是分布式的、流式的高速读写,这是流计算在存储上的必然要求。其它特征要求就不费周章了,也基本上名释其义,粗略理解还是可以的,可以结合自身的生产实践来从架构上作参考。

从实践的角度看,不管具体采用何种技术甚至自研,应当注重以下方面的投入:

容量的伸缩(Scalability)

好多大公司都在这方面遇到过挑战。数据如何做到整体分布尽可能均衡,一个 topic 还罢了,很多个怎么办?最好有一个全局范围的、不影响读写的、自动的 Rebalance。不论方式如何,有一个原则不能变,一个存储文件中不应当存在 1 个以上的 Topic,否则顺序读写就不成立了,而这正是流存储必须坚持的,这和 HBase 不一样。那么在此原则下,随着时间的流式,存在 Data Skew 是必然的,因此,Rebalance 也是必然的。流存储本身最初也不可能预知每个 Topic 将如何增长数据,当然加上预测能力自动 Rebalance 是个好主意。

不仅数据会 Skew,内存的使用也可能有 Skew 的情况,这也是节点宕机、或出现其它问题的原因之一,同样需要在不影响读写的情况下自动 Rebalance,而且不影响副本数量。

监控与告警

这是所有系统必须做到的,流存储作为 Dataflow 模型计算体系的必要组成部分,是数据的大 Buffer、大 Hub,它有了问题,影响可想而知。包括 CPU、Disk、Memory、Network、Producer、Consumer 等都要监控,一旦发生情况,就应当直接通过告警机制连通 On-call Shift 及时处置或者 Auto Heal。不论采用什么样的流存储技术,都严重依赖内存大小、磁盘吞吐率、网络延迟、文件系统调优,因为流存储的重点就在于分布式环境下流式的高速读写,所以必须把这些方面的监控做好。

磁盘用量的管控

要综合根据所有的 Retention(Time/Size Based)、复制的情况对总体磁盘用量有个预判,现有的磁盘容量在将来是否够用,要在使用的过程中动态判断,所以辅助一些 ML 手段去预测还是非常好的。如果用 Kafka,在计算中,要记得它是以 Segment 为单位进行删除,在 Retention 到限以后。

监控 **** 消费滞后

如果消费速度小于生产者的写入速度,意味着终端用户看到的是不新鲜的数据、不及时的数据,即:新鲜度不好、不及时。其实监控读、写滞后,都有意义,读 Lag 指读的 Offset 所对应的时间点与当时写的 Offset 对应的时间点之间的时间差,写滞后指与上一次写动作之间的时间差,它们反应的问题是不同的。也许技术上没问题,但是业务上一定会对数据的新鲜度有要求。

4.2 技术推荐

当前,大家首选 Kafka 的可能性比较大,它凭借对 Pagecache 的顺序读写和 Zero-Copy 完成高速读写,这非常棒,从模型角度看高速读写也应当是以 Dataflow 模型为核心的批流统一架构体系中的流存储所必须的能力。这里就不赘述它优秀特征,只对大家在实践中可能遇到的问题做个提示:

  • Topic 达到几千个可能就会有问题,主要是由于 file handler 的限制;
  • 租户隔离,当一个节点上的一个分区在一个消费者或一个生产者出现滞后的现象,那这个节点上所有其它分区上的动作都会滞后;
  • 缺乏针对多租户的计费计量能力,在生产者和消费者上;
  • 队列能力支持不够,如:Delay Queue;
  • 监控方面做得不大好,没有完整的监控工具;
  • 有时写生产者和消费者时,可能希望同时使用多个 Topic,如果对 Topic 有通配符能力可能更灵活;
  • API 是由不同的个人和公司维护的,与整体发展不同步;
  • Likedin 使用 Kafka 非常重,达到 7 Trillion Per Day,4000 Brokers。为此,Linkedin 内部做了补丁分支(不是 Fork),为了满足增减 Broker 的运维工作和特有的特征,已开放到 Github。Scalability 方面的主要问题是,内存造成的 Controller Failure 和 Slow,解决的方法是重用 UpdateMetadataRequest。还有一个问题是启动或关闭一个 Broker(一次只能操作一个)比较慢,改进方法是减少锁竞争。由于运维的需要,增减 Broker 会很频繁,Linkedin 给 Broker 增加了维护模式,以免有消息不断流入,这样才能安全地移除,最终保障副本的数量。
  • 沃尔玛也碰到了恢复失败节点而造成不好的影响,缺乏再分区的能力,如何跟踪 Rebalance 的进度,并确保期间生产与消费都正常还是满重要的;
  • 缺乏 Cloud/Rack/Azure Awareness;

Kafka 是大部分人首先想到的,毕竟早 5 年起家,在社区的丰富性和规模上都有优势。而作为后起之秀的 Pulsar 的社区在 Slack 上很活跃、在壮大、响应快,这里把 Apache Pulsar 的特点介绍一下,在生产实践中可能不失为一个可以考虑的选项(当然,任何技术在生产实践中都可能碰到问题):

  • 经历过每天处理上千亿的数据、上百万的 Topic 的生产考验,Battle-Tested;
  • 流与队列的合体,单一的 API,而 Kafka 侧重流;
  • 可以通过 Producer 或者 Broker 直接去重;
  • 分区不是必须的,不必像 Kafka 一样要考虑分区与 Consumer 的数量。它在 Topic 与 Consumer 之间加入了 Subscription(Exclusive/Failover/Shared/Key_shared)抽象,甚至于一个 Subscription 可以对应多个 Topic。分区时吞吐量可以非常大。
  • 资料表明,Kafka 加一个 Broker,再平衡分区会比较慢,而 Pulsar 把 Segments 分布式地用 Bookeeper 存储缓解了这个问题。这道理是讲得通的,因为在 kafka 中数据的存储逻辑细分下来依次是这样的一对多的关系:Topic/Broker/Partition/Segement/ 三个文件,因此,在 Kafka 中一个 Partition 的所有 Segement 都在一个 Broker 上,而在 Pulsar 中是分散到 Bookeeper;Pulsar 的负载均衡是自动的,它检测完全无状态的 Broker 的 Cpu/Mem/Network IO,自动执行 Rebalance;
  • 支持 Geo-replication,即:跨机房复制;
  • 阿里、滴滴等大公司支持的行业分析公司的基准测试表明,Pulsar 比 Kafka 在吞吐量和延迟方面都有显著的优势;
  • 支持多租户,每个租户可以有多个 Namespace,在加上 Quota、访问控制、Rate-limit,可以实现丰富的权限管理。通过 Namespace 可以实现集群之间的复制;
  • 以 Quorum 的方式副本复制,性能表现更为稳定;
  • 分层存储,Tiered Storage,允许把老的积压数据卸载到长期的、经济的存储中,如:HDFS。例如:用户行为数据需要长期保存,以便机器学习训练用于训练模型;
  • 生产者到消费者的消息加密,保障绝对的安全,Pulsar 自己不保管 Key;
  • 一个集群可以服务上百万的 Topic,但是 Kafka 达到几千个就出状况了;
  • 现成的 CDC Connector for MySQL/PostgreSQL,还有好多内置的 Source/Sink Connector;
  • 支持消息的 TTL;

5 流计算及结果的应用

5.1 主流技术参考与架构

5.2 Flink 及其生产实践

目前的流计算实现,Flink 几乎是不二之选,好多公司在做从 Sorm 和 Spark 到 Flink 的迁移,而且它符合 Dataflow 模型,很多大企业都有成功的运用,社区发展得如火如荼,改进也很迅速。虽然说 Beam 出自 Google 本家,但它的定位不一样,可以认为它是 Spark/Flink/Samza 等流计算引擎之上的一层东西,它为定义和执行数据处理流程提供了不同语言的 SDK,为不同的引擎提供了不同的 Runner。这里不介绍 Flink 特征,官网最好了,仅对生产实践可能有帮助的内容做一些分享:

  • Latency 与 Guaranteed Delivery 之间的权衡,对于 Mission-Critical 的作业,保存状态很重要,以备某种状况下的接续恢复执行、甚至倒带。然而,对于一个算子,如果采用 Exactly Once,只要它的算子存在多进或者多出的情况,就要做 Barrier Alignment,即:要等待第 n 个 Checkpoint 的数据都到达,否则不会处理已经到达的第 n+1 个 Checkpoint 的数据,这当然就增加 Latency;如果采用 At Least Once,则不发生 Barrier Alignment。所以要根据具体业务情况决策,另一个途径是降低 Parallelism,如果可以的话。
  • 存储状态造成的 Latency,三种 State Backend 默认都是以异步方式保存状态而不阻塞流的处理,但是 Heap+RocksDB 的组合方式是不支持 Timer State 的;
  • 资源隔离,Yarn+Cgroup 隔离 CPU 和 Memory;用 Yarn Node Label 实现机器、容器颗粒度的资源保障优先级和专用保障;
  • 使用 Asynchronous I/O 调用 dubbo 接口,Zuul、DB、HBase 等外部接口;
  • Blink Planner 有更好的性能表现;
  • App1(由多个作业构成)和 App2 可以共享中间结果,结果共享可以很大程度上节约资源;
  • 如果对一个流要做事件处理,但是可能有好多规则、不仅规则本身会变化、而且规则也会增减,配置驱动是最好的方式;
  • 宜早不宜迟
    • Java 对象的空间代价差不多是翻倍的,相对于 Primitive Type 而言。Flink 在网络计算、Checkpioint 的时候是要涉及 Serde(序列化、反序列化)的,所以在 Serde 上发力肯定会在内存优化、速度上大有收益的,例如:自己造一个时空高效的、完全基于 Primitive 的数据结构,也是完全可行的;在什么阶段实施这个要看情况了,因为个性的 Serde 可能意味着与周边的第三方系统不兼容,但时空收益那是肯定的;
    • Ingestion 阶段,可以把一个消息按不同的分区方式(即:用不同的 Key 值打散)产生多个流输入流存储中,这样可以以空间换时间,因为它可以让流计算阶段省却网络 Shuffle。
    • Ingestion 阶段,第一时间维护维表,Redis/HBase/es/Aerospike,如果采用外在存储方式保存维表的话;
    • Ingestion 阶段,第一时间把 ID 变成数值型,可以用 bitmap,在有些问题上很有用;
    • Pushdown,不论是使用 API 还是 SQL,都要尽可能把聚合、过滤等计算先在小数据集上完成、然后再 Join,而不是在 Join 后的更大的数据集上进行。能在一个 Task 中先做到的就在一个 Task 里先做尽、能在一个 Taskmanager 中先做到的就在一个 Taskmanager 里先做尽、能在一个 Table 中先做到的就在一个 Table 里先做尽、最后才是 Join 等需要网络 Shuffle 的动作,这是高效计算的次第;
    • Ingestion 阶段,分而治之,在使用上完全不相干的数据就不应当发到同一个 topic;
  • 数据仓库
    • 实时数仓中,需要用到维表,方法估计至少 5 种,其中以 Temporal Table Function 方式最佳,数据量大、实时性强;
    • 离线数仓,数据表明基于 Flink 的效率至少比 HiveOnTez 要高;
  • 流存储的分区数是 Flink Parallelism 的整数倍为好,以防数据 Skew;
  • 常见问题
    • 数据 Skew、频繁 GC、对周边慢读慢些、大窗口、Serde(Shuffle and State Backend);
    • 注意设置 State TTL,以免长时间跨度的计算需要数据的时候数据已经被 clean 了;
    • 监控不足,作业级、算子级、Backpressure、Executionstate、Jobstatus、ZK、上下游;
  • 作业发布
    • 检查与在运行的老版本的 state 的兼容性问题,回滚能力,选择 Sheckpoint、批量升级、审批流程;动态修改作业的执行逻辑是有风险的,如:SQL、代码,可能需要增强 State 的维护逻辑(如:状态比对)、甚至使用其它存储;State Processor API 挺有用的,可以读写分析 State,所以对 State 维护很有帮助;
    • 作业可以是基于 API 写的,但是更推荐可配置 SQL 的方式、并结合版本控制。一种是把 SQL 翻译成 DataSet API 或者 DataStream API;一种是把自定义的某种声明式的配置翻译成 API 的执行;还可以通过把 SQL 提交给 AthenaX 来提交作业;要么做自己的 SQL 执行机制,Query Parser/Optimizer/Planner,然后改造运行时,跳过 2 个 API,不过这就要改 Flink 了;
    • 还有一种情况是,在不重新启动作业的情况下,动态更新 SQL 及其它配置也是可行的,结合 ZK 的 Watch Push 机制;
    • 总之,SQL 具有可维护性好、接受度高、维护成本低、可动态修改、规范、可配置等优点,一直是不错的 User API,尽可能与具体实现技术解耦、保持 Portability,是个不错的方向,也更容易为没有专业技术背景的 Citizen Developer 服务,而不是让终端用户学习各种技术;
  • 如果想在流上应用 ML 模型,先把计算结果流入流存储(如:Kafka),然后加一层消费者作为模型应用层更为合适,由其是在应用多个模型在同一个流上的情况,这样可以把特征准备与模型运用解耦,互相不干扰、独立 Scale,而不和流计算混合在同一个进程中;
  • 复杂事件的处理
    • CEP,SQL 的 match_recognize(ISO 标准 SQL 的一部分)或者 DataStream 上的 API,它的强大在于通过 Pattern 来识别是否一个事件发生了,例如:统计没有点击的广告展现(涉及展现和点击两个事件,但是在规定时间内点击事件没有到达),就可以用这个。Flink-siddhi 是个值得参考的实现;
    • Stateful Function framework,融合了 Flink 与 FaaS(例如:AWS Lambda 和微软的 Virtual Stateful Actor Model),这是个复杂的框架,如果描述概念恐怕不好理解,这里举个例子:假设任务是这样的,从 Kafka 输入的消息为 Tuple(user_id=111,“张三”),作为结果向 Kafka 输出的消息为 Tuple(user_id=111,“欢迎张三的第 N 次访问”),这里显然要维护每个人的访问次数,该框架的做法是,当一个 Taskmanager 收到一个用户的消息的时候,可以把这个消息按用户路由到维护着该用户状态(访问次数)的函数去处理,而这个函数可能位于当前的 Taskmanager、也可能位于集群的另一个 Taskmanager,就是这样的。再扩展一下,在网约车的场景里,司机与乘客发生两个消息流进入系统,通过司机、乘客、Geo Index、Ride 等有状态函数之间的交互最后完成乘客的 Book 和司机的 Bid。这个很像电信领域的霸主 Erlang 的 Actor Model,其实 Flink 本身的协调也用到了 Actor Model(即:Akka)。总结,这个框架一方面可以用于复杂事件的处理(状态机),另一方面也可以用于极低延迟要求的动态 AI 模型的运用(每一个待预测个体都有自己的状态),不过像金融领域的情况可能还是比较挑战,因为特征计算本身可能就需要高强度计算,涉及 4、5 千个变量不稀奇。
  • 基本方针
    • 作为用户的开发者的自由度越小越好,从总体的规范度、代码质量、系统稳定性、可维护性、可控性、综合成本等方面考虑,通过提供 Scaffold、Framework、SDK、Platform、Configuration、Code Checker、UDF、SQL 等方式渗透到软件研发的全生命周期是非常可取的,当然这与开发者的探索欲望是矛盾的,但从企业的角度是非常值得的。
    • 要尽可减少网络 Shuffle 次数和 IO 的量、高效利用内存、索引、增量计算、少做无用功、复用累积成果、高效的 Serde;

5.3 支持 SQL 的技术

大家非常希望数据具有较高的新鲜度,即:在企业范围内从数据产生的那一刻直到被最终真正用起来的那一刻之间的时间越短越好。基于这样的期望,自然想到直接对接 Kafka 的 DB,然后用 SQL 直接得到结果,毕竟 SQL 有很多优势,以下介绍几种选项:

  • Rockset ,它是由一群原来在 Facebook 做搜索的、在 Google 做 Gmail 的、在 Oracle 做数据库的人在一起做出来的,支持各种数据源,尤其是 Amazon 云上的,包括 Kafka,它支持 Full ANSI SQL,通过 JDBC 或 REST 在其上可以直接用 Graphana/Tableau/RStudio/Superset/Jupyter/Redash/DataGrip,TB 级的数据量、毫秒级的 Latency,可见其强大,不过它不开源。
  • Presto-Pinot-Connector ,Presto 擅长 ANSI SQL Query,而 Pinot 擅长 OLAP,其实 Pinot 和 Rockset 在本质的精华上是一样的,就是对所有的字段做倒排索引,Pinot 也可以直接接 Kafka,这个东西也不开源,是 Uber 做的。Pinot 本身是开源的,有很强的的 OLAP 能力,2019 年社区强烈建议增加对 SQL 的支持,如果有资源也许可以尝试利用 Calcite 做自己的 Parser/Validation/Optimization/Plan 然后对接 Pinot。
  • SQL-on-Kafka-with-Presto ,一个 Presto 接多个 Kafka 集群,一个 Kafka 支持多个 Topic(Table),这样就可以通过 Presto 对 Kafka 使用 SQL 了,可以试试,没找到性能有关资料。不过应该都没有 Rockset 和 presto-pinot 快。
  • Couchbase ,虽然不是完全支持 ANSI SQL,但它有自己的查询语言 N1QL(SQL for JSON),N1QL 兼顾了大家对 SQL 的熟悉与 JSON 的数据结构,其结构、语法都非常像 SQL,虽然它没有 Join,但是支持子查询,支持 SDK 和 REST 两种方式来提交 SQL,还支持函数、窗口等好多特征,这些特点远胜于搜索引擎,但是索引要主动建立,这个不如搜索引擎。就 SQL 的支持程度而言,还可以考虑 Druid ,之所以和 Couchbase 放在一起,是因为它也支持子查询,它叫 Semi-join,虽然不支持 Join,但是 Druid 的优势在于它像搜索引擎一样不用有意识地建索引。
  • ksqlDB ,它的作者是 Kafka 的 Co-creator,ksqlDB 通过 REST 支持 SQL,支持负载均衡和自动故障转移,支持 Join。
  • Clickhouse 对 SQL 也有相当不错的支持。
  • 还有一个办法是把数据持续流入 Elasticsearch 或者 Solr ,不过这两个的 Join 能力都较弱,如果不需要 Join 那还是不错的。

5.4 一个优秀的 K/V 存储

Aerospike虽然不支持 Full SQL,但是真的是非常值得考虑的 HBase 的替换选项,尤其在要求更高速度的时候。Flink 中可以通过 TableFunction 实现对 Aerospike 的读写,Aerospike 的优势包括:

  • 采用 C 语言开发;
  • 读写明显比 HBase 快,K/V 查找速度 比 Redis 快 20-30%、比 Mongo 快 50%、比 Cassandra 快 45%,非常适合实时性高的应用;
  • Flash-optimized In-memory NoSQL(Key/Value),针对 SSD 做了专门的优化;
  • 支持二级索引;
  • 支持 UDF;
  • 本地集群的 Immediate Consistency 和跨数据中心的 Eventual Consistency;
  • 支持原子操作;
  • 支持基于用户与角色的访问控制;
  • 支持 ACID 操作;
  • Auto-sharding & Auto-healing,扩容无需手工干预;
  • 可以部署到 Kubernetes;
  • 开源,在 Reliability、Latency、Scalability 上都非常优异,广泛用在各个行业的关键业务的实时场景,如:金融、电讯、广告、游戏;
  • HA 简单;
  • 用了非常复杂的哈希算法来避免 Data Skew;
  • 支持多种语言;
  • 支持 Geo Index;
  • 和 RDBMS 相对应,其数据模型(括号中是关系型数据库的概念)是 Database/Namespace(Tablespace)/Set(Table)/Record(Row)/Bin(Column),但它是 Schemaless。Record 上可以有 TTL,Generation 相当于 HBase 的 Version。一个 Namespace 最多可以有 32k 不同的 Bin。