云计算·大数据 频道

Spark+ClickHouse企业级数据仓库实战

  近年来,基于云原生架构的新一代消息队列和流处理引擎 Apache Pulsar 在大数据领域发挥着愈发重要的作用,其应用场景和客户案例也在不断地丰富与扩充。

  火山引擎是字节跳动的企业服务品牌,主要面向 To B 业务场景。火山引擎中 Stateless 云原生开源大数据平台 E-MapReduce(简称 EMR)为用户提供了云上的端到端的大数据解决方案。与此同时,Apache Pulsar 的一个十分重要的特性也是云原生。先进的存算分离的架构使其非常适合在云化的环境中部署、运维,而 Topic 数据的存储方式也使其扩容操作大为简化,不需要数据的 rebalance 过程。于是,将 Pulsar 集成到火山引擎 EMR 的生态系统中便是一件水到渠成且极具价值的事情。

  本文介绍火山引擎 EMR 中 Apache Pulsar 的集成情况和应用场景,按照如下结构来编排:

  业务背景

  详解Apache Pulsar 在EMR的集成方案;

  Apache Pulsar 典型应用场景、问题与解法;

  火山引擎 EMR 集成 Pulsar 的未来规划。

  一、业务背景

  火山引擎是字节跳动旗下的云服务平台,将字节跳动快速发展过程中积累的增长方法、技术能力和工具开放给外部企业,提供云基础、视频与内容分发、数智平台 VeDI、人工智能、开发与运维等服务,帮助企业在数字化升级中实现持续增长。

  火山引擎 EMR 是火山引擎数据中台产品体系的基座。数据中台是火山引擎中的一类重要产品,服务于用户的大数据体系,支撑用户构建端到端的数据链路。火山引擎数据中台产品体系如下图所示。

  数据中台的大数据生产、服务体系,数据来源于交易系统、日志、IoT、消息、文件等,通过数据集成进入到数据湖中,然后经过数据开发、治理过程,进入到专题集市,最后通过数据分析平台提供给数据的最终用户,包括 BI 报表、离线分析、实时分析、即席查询、数据挖掘等。以上是用户搭建大数据体系的一条完整的数据链路。在这条数据链路上的各个环节都有火山引擎数据中台的产品来对接。火山引擎 EMR 产品在数据中台整个的产品体系全景图中,处于基座的位置(如上图中黄色框所示),对于用户构建端到端的数据链路起着重要的支撑作用。火山引擎 EMR 基于火山引擎的 IaaS 能力,提供底层基础的大数据体系的计算引擎和存储引擎,并向上对接数据开发治理工具 DataLeap。

  如果用一句话来定义火山引擎 EMR 这个云产品,那就是“Stateless 云原生开源大数据平台”。用户可以在 EMR 产品中创建自己的集群,并使用 EMR 集群中配置好的服务,进行大数据的计算与存储。

  这里重点分析一下火山引擎 EMR 产品定义中的几个关键词。云原生、开源、大数据平台这些概念相信都是读者们耳熟能详的。

  云原生是指云上资源的池化、用户的弹性按需使用、资源的成本摊薄和利用率提升等。开源大数据平台则是 EMR 这类云产品的共有定义。接下来重点讲一下 Stateless 这个概念。

  Stateless 指的是“无状态”。在 EMR 中创建的用户集群的“状态”指的是什么呢?以有状态场景下的 Hadoop 集群类型为例,集群的状态包括用户的 HDFS 中的数据(属于用户的核心数据资产)、Hive Metastore 中的元数据、Ranger 中的权限配置、各个服务的日志、历史作业执行统计信息、集群的配置信息等等。这些状态信息都是存储在用户集群内部的,是用户集群的一部分。在这样的情形下,用户的集群是一个有状态的(Stateful)集群。在 EMR 的场景下,状态信息无处不在,集群内部包含大量状态信息并不稀奇,且这些状态信息的量级较重。

  然而,用户集群富含状态信息,会给用户带来额外的一些成本和困扰。例如,如果用户想升级自己的集群版本,或者对自己的集群做一些其他的运维操作(例如服务的启停、执行定制化的运维脚本等),就会有一些顾虑:用户的数据、元数据、配置等信息都在集群内部,在执行集群升级或运维操作的时候,会不会对集群内部的状态信息造成影响。事实上,如果状态信息内置在用户集群内部,用户在对集群进行运维操作的时候,是需要做仔细的评估的,确保运维操作不会对集群内部的状态信息产生预期外的影响。这会给用户对集群的运维操作带来额外的顾虑和成本。

  从上面的讨论不难看出有状态的集群会给客户带来一系列痛点问题,而火山引擎的 Stateless 的 EMR 集群则针对以上问题,为用户提供了解决方案。如果我们把集群的数据、元数据、配置、历史作业信息等状态通过一些方案放置在用户集群的外部,而在用户集群的内部不再持有状态信息,这样用户的集群就是一个无状态的集群,此时用户如果需要对集群执行升级或者其他运维操作,就不会有“集群状态数据受影响”相关的顾虑了,减少了运维的风险与成本。

  在 Stateless 集群的场景下,用户甚至可以选择按需去持有集群,即:需要使用计算资源的时候,创建一个集群;不需要使用计算资源的时候,将集群释放。例如如果用户的数据生产 ETL 作业集中在凌晨执行,那么可以在当日的数据生产任务执行前将集群创建出来,然后用这个集群执行一系列的 ETL 作业,而在所有作业都成功执行完成后,再把这个集群释放掉。而到第二天凌晨,新一轮的数据生产作业执行之前,再创建出一个集群,待数据生产完成后再释放集群。如此循环往复。这样用户可以只为集群真正被使用的那段时间付费,而在不需要使用集群的时段,用户不需要持有集群,不存在用户持有的资源闲置的问题,用户也就不需要为闲置资源付费。这样可以给用户带来极大的成本优化,并提升云上资源的利用率。Stateless 的EMR 集群为这样的使用方式提供了可能。

  上面介绍了火山引擎 EMR 的核心定义。针对火山引擎 EMR 的核心功能,进一步展开讲一下,就是提供了企业级的大数据生态组件,例如:Hadoop、Spark、Flink、Hive、Presto、Kafka、ClickHouse、Hudi、Iceberg 等,100% 开源兼容,快速构建企业级大数据平台,降低运维⻔槛。

  火山引擎 EMR 的核心特性包括以下几点:

  开源兼容 & 开放环境:大数据组件来自开源社区,与开源版本兼容。EMR 提供半托管的环境。EMR 托管在火山引擎的基础设施之上,通过管控面将用户在控制台上的操作传递到用户集群内部。但是这个意义上的托管并不是“全托管”,而是“半托管”——用户有足够的自主性、灵活性,可以登录到自己集群的节点的命令行环境中,执行灵活的运维操作,如脚本执行、软件安装与部署等,以满足用户的个性化需求。也就是说,“半托管”一方面可以通过云托管、白屏化来解决用户实际运维中的痛点问题,降低用户的运维成本,另一方面又不失灵活性,用户可以自主控制自己集群内的节点,有极大的自由度。

  Stateless 云原生湖仓:Stateless 的概念在上文已有详述。火山引擎 EMR 通过存算分离把集群内部的数据外置到云存储中,如火山引擎对象存储 TOS,不再依赖用户集群内部的 HDFS。此外,通过外置 Hive Metastore、Public History Server、作业管理、配置中心等产品和技术方案,进一步把集群内部的状态信息外置。另外,通过弹性伸缩,支持用户在云上合理地调配资源,实现资源利用的最大化和成本的节约。Stateless 的架构也使得弹性伸缩的扩缩容过程更加轻量化,运维成本和风险得以降低。另外,火山引擎 EMR 也支持 Lakehouse(湖仓)这一近年来兴起的数据开发理念。

  引擎企业级优化:可以分两方面来看。一方面是火山引擎 EMR 针对开源的大数据组件在功能和性能上做了一些增强,后续也会将一些增强回馈社区。另一方面是给引擎增加了一些企业级的特性,例如权限相关的功能。

  云上便捷运维:复用了云上 EMR 的通用的管控底座能力,各个类型的集群的创建等操作复用 EMR 的公共管控底座。支持按量付费和包年包月的计费模式。支持集群的按需创建和释放。支持集群内服务的操作、参数配置、监控、报警、日志等运维能力。用户在购买 EMR 后可以直接在控制台对接使用这些功能,开箱即用,十分方便。用户可以把大量的运维操作交给云,或者借助云上提供的能力大大降低用户的运维成本。很多原本需要通过命令行和运维流程操作的运维动作,在火山引擎 EMR 中可以通过控制台界面白屏操作。这样用户可以专注于自身的业务逻辑、增长逻辑,而把大数据平台的构建和运维交给云平台。这也是云上的 EMR 产品能够给用户提供的核心价值之一。

  下图为火山引擎 EMR 的功能架构图。

  火山引擎 EMR 建构在火山引擎的基础设施底座上,由火山引擎提供云服务器、公网 IP、云存储、VPC 等基础设施。在基础设施底座上,建构出数据存储引擎(如 HDFS、CloudFS、表格式等)、数据调度引擎(如 YARN 等)、各种面向不同场景的大数据计算、存储组件以及贯穿整个 EMR 服务端到端的管控面。EMR 向上可以对接火山引擎的大数据研发治理套件 DataLeap,支持用户构建数据仓库,赋能百行百业,助力企业决策,帮助业务成长,体现数据价值。

  从 EMR-1.3.0 版本开始,火山引擎 EMR 支持 Pulsar 集群类型的创建。下面我们来具体看一下火山引擎 EMR 集成 Apache Pulsar 的情况。

  二、Apache Pulsar 在EMR的集成方案

  本节内容重点讨论 Apache Pulsar 集成火山引擎 EMR 的原因和方案。

  火山引擎 EMR 是一个云上的大数据平台,覆盖大数据开发领域各个场景,包括离线计算、实时计算以及存储、数据调度、工具链等。

  除此之外,还有一类组件不可或缺的,即消息队列,至少有两类不同的场景依赖消息队列:

  第一个场景是数据摄入(Data Ingestion),即从业务系统(也就是整个 大数据 体系的外部)把源头数据接入到大数据体系中,涉及到一个数据从业务系统向大数据体系传输的过程。

  以客户端埋点日志为例,埋点日志被上报到消息队列,该消息队列为大数据链路的第一站。从该消息队列开始,数据会继续向下游的离线 Hive 表或者实时数仓的下游消息队列流动。在此场景下,作为整个大数据体系的源头,消息队列连通业务系统和数据仓库,将大数据体系外面的数据上报到消息队列后,消息队列作为一个沟通的纽带,消息会流向下游的数据仓库的各层存储中,进入大数据体系内部。

  不光是埋点日志信息,用户的业务数据库的信息,也可以通过把数据库 binlog 上报到消息队列,由计算任务消费消息队列中的 binlog 并把数据写入下游表,实现业务数据库的数据向数仓的同步,在数仓中重建出业务库的副本。

  此外,像监控、日志类型的数据也可以上报到消息队列,再通过消息队列将对应的数据传导到大数据体系的内部。

  第二个典型应用场景是 实时数仓 。

  数据接入到数据仓库后,可以继续通过 ETL 过程构建离线表,也可以构建实时数据链路,使用实时处理逻辑将数据写到下游的消息队列中,而这个消息队列可以再进入下一级的实时处理逻辑,或做 mapping,或做聚合,进入到下一级的消息队列中。

  以上消息队列相当于实时数仓的实时表,存放 ODS、DWD、DWS、ADS 等层级的实时数仓数据。在这里,是使用消息队列作为实时数仓各层数据的存储。

  在最终数据应用的时候,根据应用场景的实际需要和查询特点,可以将实时数仓消息队列中的数据导出到像 Redis 这样的 K-V 存储中,或者像 StarRocks、Doris、ClickHouse 这样的 OLAP 引擎中。

  实时数仓的数据链路的中间层依赖消息队列的,因为实时数据的处理主要是流处理,而消息队列的存储与计算模式与流处理的模式是天然契合的。

  从上面的讨论可以看出,消息队列至少在数据接入和实时数仓中间层两个大数据体系的场景中扮演着不可或缺的作用,因此是大数据体系离不开的一类组件。所以火山引擎 EMR 将消息队列集成进来也就成为了一件水到渠成的很自然的事情了。

  而在消息队列领域中,近年来发展迅速、表现优异、备受关注的一个佼佼者便是 Apache Pulsar。以上是我们选择将 Apache Pulsar 集成到火山引擎 EMR 的原动力之一。

  当然除了这一点之外,还有以下的一些其他的原因。让我们来看一下 Apache Pulsar 的基本情况,以及一些核心的特性和优势。正是这些特性和优势,促成了我们将 Apache Pulsar 集成到火山引擎 EMR 中,并相信这样做会给用户带来很大的价值。

  Apache Pulsar 是一个开源的基于发布 / 订阅模式的分布式、云原生、多租户的高性能消息与流平台,提供消息队列和计算服务,解决服务器间的消息传输与队列问题。

  Pulsar 具有很多令人瞩目的特性和优势,下面选取了其中的一部分,主要是与把 Pulsar 集成到 EMR 最相关的一些关键要素。正是这些关键要素,使得我们相信把 Pulsar 集成到火山引擎 EMR 中确定会给用户带来很大的价值。这些关键要素列举如下:

  弹性:支持用户无感知的动态扩缩容,提供更好的弹性,为用户节省硬件成本,更好地契合了云上产品的特征。这是云上产品的基础特性,也是一个产品想要上云所需要具备的特性,能够给客户带来上云的实际价值。

  云原生:采用先进的云原生架构,将有状态的存储与无状态的计算分离在不同的架构层级中,非常适合在云化的基础设施中部署、使用和运维。这个也是被大家常常提到的 Pulsar 的核心特性,无论是基于 Kubernetes 部署,还是通过 Bare metal / ECS 部署,都可以利用到存算分离的架构特点,更好地利用云上资源池化、弹性的特点,实现更好的云原生。

  易扩容:存算分离以及数据的分散存储的架构特点极大减少了用户对计算或存储能力进行扩容时的成本与风险,用户可以对计算或存储节点分别扩容,特别是在扩容的时候不需要做繁重的数据迁移、rebalance,对系统的可用性、稳定性、可运维性和运维成本优化大有裨益。这也是大家津津乐道的 Pulsar 的一个非常令人瞩目的优秀特征。

  与用户既有系统(如 Kafka)兼容:通过 KoP (Kafka on Pulsar),提供与 Kafka 的在使用层面上的兼容性,便于用户直接复用已有的基于 Kafka 的代码体验 Pulsar 的特性。这一点也是非常重要的,能够带来很大的用户价值。Kafka 也是非常流行且在业内被广泛使用的一个消息队列组件,用户可能也会有很多基于 Kafka 开发的业务代码。如果用户希望把这些业务代码在 Pulsar 上面进行试用与体验,那么如果 Pulsar 与用户既有的一些系统(如 Kafka)兼容,就可以零成本或者低成本地把既有的业务代码放到 Pulsar 上来体验,更易于用户去体验 Pulsar 的各种令人瞩目的特性和功能。这一点对用户的价值很大。假设 Pulsar 没有提供与 Kafka 协议的兼容性,那么如果用户想体验 Pulsar,把既有的一些代码放到 Pulsar 上面试用、体验,可能需要对既有业务代码做一些修改、适配和迁移,这些工作也是有成本的,且迁移工作能够给用户在业务层面带来的价值有限,只是相当于在技术实现层面把代码进行了系统之间的迁移和适配,但是会给用户带来一些痛点和运维成本。所以如果能够做到和用户既有系统的兼容,可以帮用户省去一些很繁重的迁移工作,会带来很大的用户价值。

  基于以上这几点, Pulsar 可以很好地为客户提供价值、增值,这也促成Pulsar 集成到火山引擎 EMR 中。

  下面针对上文中提到的 Pulsar 的云原生架构和易扩容的特性,再展开讲一下技术细节。

  Pulsar 的云原生架构,如下图所示:

  具体来讲,有以下几点要素:

  计算和存储分离,消息数据存储在 BookKeeper 的 Bookie 中,由 Broker 提供服务。

  Broker 节点和 Bookie 节点可分别运维、扩缩容。

  支持数据 offload 到云上的对象存储。

  此外,Pulsar Client 与 Pulsar Broker 进行对接。ZooKeeper 节点与 Broker、Bookie 交互,处理元数据以及分布式系统中的协调。

  Pulsar 的另一个重要特性是易扩容。Pulsar Topic 数据的存储模式使得节点扩容时不需要 rebalance。这个的原因是 Pulsar 采用了 Topic - Ledger - Fragment - Entry 的多级结构来存储 Topic 的消息数据。如下图所示:

  一个 Topic 下会有多个 Ledger,一个 Ledger 下面会有一个或多个 Fragment,每一个 Fragment 下面会有多条消息(多个 Entry)。每个 Fragment 的实际数据的存储位置是在一组 Bookie 上面,不同的 Fragment 对应的 Bookie 的集合都是不一样的。这样的一个结构使得每一个 Topic 的消息天然分布在不同的 Bookie 节点中,而不同的 Fragment 的数据存储在不同的 Bookie 集合中。

  如果用户扩容一个新的 Bookie 节点,只需要把 Topic 的新的 Ledger / Fragment 的数据写入新 Bookie。旧 Bookie 的数据不用 rebalance。Pulsar 中的 Topic 和具体的存储节点并没有耦合、绑定。假设一个 Topic 的数据绑定在某一个固定的存储节点上,那么如果单纯地扩容存储节点,且如果 Topic 的数量不变,那么新的存储节点是不会有 Topic 的数据写进去的。为了让新扩容出来的存储节点能够被利用到,能够被写入 Topic 的数据,就需要更改一部分 Topic 与存储节点的绑定关系,这样就涉及到了数据的搬迁,即 rebalance。

  而 Pulsar 不存在这个问题,因为 Pulsar 天然就是一个 Topic 的数据分散在不同的 Bookie 节点中存储,所以在新扩容出一个 Bookie 节点后,一个 Topic 中的新的数据是可以写入到新的 Bookie 节点中的,新的 Bookie 节点也不用担心没有数据写进去。而 Topic 中的一些历史存量数据仍然存放在原来的地方,不用做存量数据的搬迁、rebalance。

  这样的话,对于用户来说,在扩容时的运维成本、风险和复杂性都大大降低了。这是 Pulsar 给客户提供的核心价值之一。

  相比于其他消息队列组件,Pulsar 也提供了一些差异化价值。下面这张表对比了 Pulsar 与 Kafka 的部分特性。

0
相关文章