开源实时数据处理系统Pulsar:一套搞定Kafka+Flink+DB

实时数据处理在各个行业和领域中已经变得越来越关键。但是在实时数据栈中,消息、计算和存储三个部分的分离,给方案的实现带来了高复杂性,低可维护性,低效率等问题。

本文整理自 Streamlio 核心创始人翟佳在 QCon2018 北京站的演讲,在本次演讲中,翟佳介绍了Apache Pulsar的架构、特性和其生态系统的组成,并展示了 Apache Pulsar 在消息、计算和存储三个方面进行的协调、抽象和统一。

Messaging:Pulsar对 pub/sub 和queue两种模式提供统一的支持,同时保证了一致性,高性能和易扩展性。

实时数据处理在刚刚兴起的时候,一般企业会采用λ架构,维护两套系统:一套用来处理实时的数据;另一套用 batch 的方式处理历史数据。两套系统带来了资源的冗余占用和维护的不便。

为了消除冗余,逐渐演化出κ架构,使用一套系统来满足实时数据处理和历史数据处理的需求。

不管是λ架构还是κ架构,在实时处理的系统中,系统的核心由消息、计算和存储三个子系统组成,比如消息系统有Kafka、RabbitMQ、Flume 等;计算系统有 Spark Streaming、Flink、Heron 等;存储系统有各种分布式的文件系统,DB、K/V store 等。 由于三个部分中,每个部分都有相应的不同产品,三个部分之间也相互分隔和独立很少关联,这带来了一些问题,比如需要更多人力维护,部署复杂,调优难度大,监管难,数据丢失风险大等等。

优秀的数据持久性和顺序性。每一条消息都提供了全局唯一的 ID,多副本,并都是在实时刷盘后再返回给用户。

灵活的扩展性: 节点扩展的线性和瞬时完成,在扩展中不会有数据的拷贝和迁移。

高吞吐低延迟,在实时刷盘的前提下,依然提供了高带宽(180 万 messages/ 秒)和低延迟(5ms at 99%)。

除了这些特性,Apache Pulsar 也具备了优秀的企业级特性,比如多机房互联互备(Geo-replication),多租户等。

Apache Pulsar 在架构上最明显的优势是采用了消息服务和消息存储分层的策略。它包括了无状态的消息服务层(broker 节点)和消息存储层(BookKeeper 中 Bookie 是基本的存储节点)。这为系统带来了极好的扩展性和健壮性。

分层的架构带为服务和存储两层都带来了线性、瞬时的扩展性。如果需要增加和支持更多的 Producer 和 Consumer,只用对 broker 进行 Scale。如果存储空间紧张,或者想要消息的时间保持的时间更长,可以单独增加存储节点 Bookie。

在服务层中,broker 不会有相关的数据被持久化保存,是无状态的。对 Topic 的服务可以很容易地迁移。如果 broker 失效,可以很容易地将 topic 迁移到健康的 broker。

在存储层(Bookie)也是一样。每个 topic 的数据被打散并均匀partition到多个 segment,每个 segment 的数据又被分散存储在 Bookie 集群中。当想增加容量的时候,只需要添加新的 Bookie,数据会优先选择刚加入的 Bookie。

介绍完 Apache Pulsar 的总体架构和特性,下面会从消息、存储和计算三个方面分别介绍 Apache Pulsar 的设计理念,各层内部以及各层之间的协调、抽象和统一。

Tenant 代表系统里的租户。假设有一个 Pulsar 集群被多个组织共享,集群里的每个 Tenant 可以代表一个组织的团队、一个核心的功能或一个产品线。一个 Tenant 可以包含多个 namespace,一个 namespace 可以包含多个主题。

Tenant 是资源的隔离的单位。namespace 是资源使用和权限设置的单位,我们可以设置权限、调整复制选项、管理跨集群的数据复制、控制消息的过期时间等。namespace 下的 Topic 会继承 namespace 的配置。如果用户获取了 namespace 的写入权限就可以往 namespace 写入数据,如果要写入的 topic 不存在,就会创建该 topic。

为了支持异地多备,namespace 又分为两种,一种是本地的,只在集群内可见;一种是全局的,对多个集群可见。可以在不同的数据中心之间进行数据的交互和互备。

Apache Pulsar 支持 exclusive、failover 和 shared 三种订阅类型,它们可以共存在同一个 topic 上。数据虽然只写了一次,但是可以通过三种的消费方式被多次消费。

第三种是 shared 的消费模式,它属于 Queue 的模式,常见的RabbitMQ、ActiveMQ 均属于这种模式。如果三个 consumer 共同订阅同一个 subscription,每个 consumer 大概会消费这个 topic 中的三分之一的数据,如果想ß增加消费的带宽,只用单独增加 consumer 的数量而不需要改变 topic 和 partition,非常实用于一些 consumer 处理复杂度比较高的场景,比如视频,图片处理等。

除了这三种消费模式,Apache Pulsar 还提供了 reader 的 API 来读取消息,让用户可以更加灵活的控制和消费消息。

Ack 机制在在消息系统中是非常重要的。消息系统中的 broker 和 consumer 可能会出错或宕机,当有错误发生的时候,如果能够获取上次消费者消费的位置,然后从这个消费的位置再接着消费,这是非常有用的,这样可以避免丢失数据,避免把所有的处理过的数据再处理一遍。

Pulsar 通过维护一个专门的数据结构ManagedCursor 来管理 ack 的信息,每次 ack 的改变都会被持久化到硬盘中。

对于 cumulative 的 ack,在标记的消息之前,所有的数据都被消费过了;遇到出错的情况会从标记的位置再开始消费。

对于 individual 的消费模式,会单独标记已经被消费过的消息;遇到出错的情况,所有的未被标记 ack 的消息都会被重新发送。Individual 的 ack 模式主要支持 share 的消费模式。它是很有必要的,因为对一般的 share 的消费模式,都是单个的消息消费处理比较慢,所以才增加 consumer。单独的标记,能在出错的时候减少不必要的昂贵的处理。

消息的 retention 策略,管理着消息什么时候被删除。 其他的系统大多是通过时间来控制。有可能时间到了,但消息没有被消费,也被删除了。

Apache Pulsar 中,提供了比较全面的 retention 策略。一般情况下,借助 ack 的信息,当所有subscription都消费了消息之后,消息才会删除。数据还可以额外的设置 retention period,即使都消费了也能再将消息保存一段时间。另外也支持 TTL 的模式。

BookKeeper 为 append-only 的写入模式提供了优化,通过独特的设计提供了高带宽和低延迟。

BookKeeper 提供了强一致性和顺序性。通过实时刷盘和多备份保证数据的持久性。顺序性通过记录本身携带的全局唯一顺序 ID 来保证的。这样对很多对顺序要求比较高的应用场景。

高可用是说数据会同时写入多个 bookie 上,如果 bookie 发生错误,即使只有一台包含数据的 bookie 可用,仍能为应用提供服务,在其他 bookie 恢复或有新的 bookie 加入后,会自动检查并补全所需要的数据备份。

IO 隔离,对于 Bookie 的读和写是分别发生在不同的磁盘上的。这样不依赖于文件系统和 pagecache 的设计,能保证即使有大量的读的同时,也能保证写的高带宽和低延迟;在大量的写入的同时,读请求的服务质量也能得到保证。这也是能保证多租户的一个关键。

当 partition 的容量上限达到后,需要扩容的时候,如果现有的单台机器不能满足,Kafka 可能需要添加新的存储节点,将partition的数据搬移到更大的节点上。但是 Pulsar 只用添加新的 Bookie 存储节点,新加入的节点由于剩余的空间大,会被优先使用,更多的接收新的数据;而且其中不会涉及到任何的老的数据的拷贝和搬移。

Pulsar 在单个节点失败时也会体现同样的优势。如果 Pulsar 的服务节点 broker 失效,由于 broker 是无状态的,其他的 broker 可以很快的接管 topic,不会涉及 topic 数据的拷贝;如果存储节点 Bookie 失效,集群中其他的 Bookie 会从多个 Bookie 节点中并发读取数据,并对失效节点的数据自动进行数据的恢复,不会对前端的服务有影响。

通过在 BookKeeper 内部提供 Stream 和 Table 两种服务,可以很方便的满足在实时数据处理中的绝大部分的存储需求。

首先我们看一个计算引擎最本质的是要解决什么问题。 首先用户定了了一个计算的需求,也就是处理过程: f(x),一组输入数据通过 f(x)的计算,得到一组输出的结果。

基于本质问题,计算引擎经过了长期的发展。第一代的计算引擎,以 Storm 为代表的通过一个有向无环图(DAG)来完成一组计算,通常需要大量的代码编写工作。现在大部分的计算引擎都提供第二代的 API,即通过 DSL 的方式。第二代的 API 相比第一代更加的紧凑和方便,但是还是有些复杂,比如包含着大量的 map、flatmap 等。

另外,云的兴起,带动了 serverless 的出现和兴盛,Serverless 为我们提供了一个很好的思路。serverless 提供的是 function 的 API,每一个事件触发一次 function,多个 function 可以通过组合的方式,完成比较复杂的逻辑。

我们可以举例说明一下 Pulsar Functions 适用的典型应用场景。

在边缘计算(Edge Computing)中,传感器会产生大量数据,而且数据会在边缘的本地节点上进行很多简单的处理,比如 Simple filtering, threshold detection, regex matching 等,另外边缘节点的计算资源有限。 Pulsar Functions 对这样的场景十分匹配。另外是在机器学习中。最开始的基础模型通过离线进行计算和训练。当训练完,上线后,每一个输入,都会匹配和应用模型,并对模型进行调整。这十分匹配 Pulsar Functions 的消息驱动的模式。另外模型本身也可以使用 BookKeeper 做存储,简化系统的部署。

通过介绍 Pulsar 的消息,存储和计算三个部分,希望能让大家对 Pulsar 有更进一步的了解。在 Pulsar 的消息系统中,提供了基于 Stream 和Queue的统一的消费模式,提供了无状态的Broker来提升系统的扩展性和容错性。在存储系统 BookKeeper 中,提供了对 Stream 的存储和对 K/V Table 的存储的统一,满足了实时处理系统中对 topic 和状态的存储需求。 在计算部分,Pulsar Functions 中基于消息驱动(stream-native),可以计算和消息一种统一。

这里的 Benchmark(是我们和阿里一起起草的 openMessaging 项目的一部分。如果有时间和机器,欢迎大家自己验证一下。

除了带宽数值的区别,另一方面是对 ExactlyOnce 的处理,Pulsar 通过自身的机制,几乎相对于一般的 模式在性能上没有区别。但是 Kafka 的两种模式会有较大的差别。

这个结果是 Pulsar 和 Kafka 在固定的 Public 带宽(50K/ 秒)下,各个百分位消息的发布时延。可以看出 Kafka 在不到 99% 的百分位,时延就开始大幅上升,但是 Pulsar 在 99.9% 的百分位以后,时延才开始上升。

这个结果是从时间轴的角度来看 Pulsar 和 Kafka 的时延。先不关注时延的绝对数值,直观的感觉是 Pulsar 的时延更加稳定;Kafka的时延会有很大的波动。 这和Pulsar中的内存和对 GC 的优化有直接的关系。Apache Pulsar 是一个新兴的下一代的消息系统,由于Pulsar Functions的加入,和底层 Apache BookKeeper 提供的 Table 服务的完善,现在可以认为 Apache Pulsar 是一个在消息、存储和计算三方面的统一的实时数据处理平台。

Apache Pulsar 有很多先进的理念、设计和抽象在里面。由于时间关系有很多的部分没能展开细讲。

翟佳 ,Streamlio核心创始成员之一,毕业于中科院计算所,目前就职于一家下一代实时处理初创公司 Streamlio,是 Streamlio的核心创始成员之一。在此之前任职于 EMC,是北京 EMC实时处理平台的技术负责人。主要从事实时计算和分布式存储系统的相关开发,是开源项目Apache BookKeeperPMC Member和 Committer,也在Apache Pulsar,Distributedlog等项目中持续贡献代码。

此条目发表在boylesports分类目录,贴了标签。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。 必填项已用*标注