基于对象存储的离线大数据处理架构和应用实践

文章作者:熵简大数据团队

内容来源:熵简学院

导读: 熵简科技大数据处理系统目前已经累计完成 3.7 PB 规模的大数据处理和分析,覆盖了超 2000+ 数据源,涉及丰富的数据类型,如宏观经济数据、电商招聘等另类数据、研报新闻等文本类数据。这背后,是一个处理PB级数据的离线大数据处理架构。

本文从离线大数据处理的数据存储选型要点入手,详细介绍如何构建一套基于对象存储的离线大数据处理框架。同时,作为彩蛋,我们在文章最后一部分,介绍了基于该大数据处理框架的电商大数据实践案例,该数据集已被广泛应用于金融投资分析中。

作者信息

本文出自熵简科技大数据团队,团队致力于构建高效率、低成本和低运维的大数据处理系统,为熵简科技应用层及中台层各项服务提供海量的算力支持,涉及数据清洗、数据融合、数据核验、数据建模等多种处理能力。

01

背景

随着大数据时代的到来,当前互联网上的各类数据急剧膨胀。庞杂的信息需要经过采集、清洗、分析才能形成真正有价值的信息,从而为各类机构和企业提供决策依据。作为一家提供全域数据智能服务的科技公司,熵简科技的数据服务是所有业务的核心基础。

目前,熵简科技大数据处理系统已经累计完成 3.7 PB 规模的大数据处理和分析,覆盖了超 2000+ 数据源,涉及丰富的数据类型,如宏观经济数据、电商招聘等另类数据、研报新闻等文本类数据。

这背后,是一个处理PB级数据的离线大数据处理架构。在这个PB级离线数据处理的架构中,最重要的两个部分便是数据存储和计算引擎,各自的核心需求如下:

  1.1   数据存储

原始数据是来自于互联网的海量数据,其特点是数据量大,并且随着时间的推移不断的膨胀。

很多原始数据是只有在特定的时间内才能采集到的,如果丢失将不能进行恢复,是一个不可挽回的损失。

因此,数据存储的要求要保证:

1、大量数据的存储(月增幅TB级)

2、强大的IO能力

3、支持增量数据的追加写入和读取

4、稳定可靠

  1.2   计算引擎

数据处理需要应对的事情是面对不断膨胀的数据不仅仅需要进行新增数据的处理,还要面临可能因为算法调整需要重新计算所有历史数据的情况。

因此,数据处理要求保证:

1、大量数据处理的能力(PB级别)

2、高效的数据处理能力(月度或者日度的数据更新需要在小时级别处理完毕)

3、稳定、鲁棒性高,在出错的情况下可以迅速进行调整和步骤级别恢复

4、开发和学习成本低

02

数据存储技术选型

  2.1      对象存储介绍

对象存储是一种可以水平扩展的分布式数据存储,其中每个数据单元存储称为一个对象,实际上可以是任何类型和任何大小的数据。对象存储中的所有对象都存储在单个平面地址空间中,而没有文件夹层次结构。

当前市面上最有名的对象存储非 AWS S3 莫属,阿里云也提供了类似的存储服务 Aliyun OSS。其特点就是高扩展性(几乎无限的存储空间),可用性和持久性(11个9的持久性保证),高性能,低成本,丰富的支持(Hadoop s3a等)。基于云服务商提供的对象存储服务,使用 RESTful API 可以在互联网任何位置存储和访问数据,同时也能选择多种存储类型以全面优化存储成本。

除了云服务商提供的对象存储服务之外,开源社区也提供了对象存储服务,MinIO 是其中的佼佼者。MinIO 是一个基于 Apache License v2.0 开源协议的对象存储服务。它兼容 AWS S3 云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等,一个对象文件可以是任意大小,从数 KB 到最大 5TB 不等。

丰富的云厂商支持和开源社区支持,保证了使用对象存储作为存储介质具有了私有化部署的可能,也为 ToB 的大数据服务提供了更多的技术方案。

  2.2       存储成本对比

与文件类型和块类型存储不同,对象存储没有归档层次结构,并且元数据完全可自定义,因此与文件或块存储相比,硬件限制要少得多;加上对象存储的横向扩展性,其存储成本远远低于其他类型的存储。

我们以阿里云上的 OSS、Table Storage、文件存储NAS、普通云盘四类存储服务为例,对比一下各自的价格:

注:以上数据均来自阿里云的各项存储服务的公开标价。

从上表中可以看出来,对象存储无疑是性价比之王,其标准存储的价格仅为普通云盘的一半。而云服务商提供的低频存储和归档存储类型,价格更低(一般为三分之一的价格)。存储类型可以方便的切换和灵活的配置,大大降低了成本。

  2.3      数据 管道

在对大数据清洗的过程中,数据管道通常指数据来源到计算引擎之前的数据传递渠道,主要负责给计算引擎提供数据以及保存计算结果。由于采集系统或上游系统每时每刻都在产生大量的数据,计算引擎读取数据的速度一定要高于上游系统的写入速度。 以 Apache Spark 为例,Spark应用的耗时有很大一部分花在读取数据上,因此一个高效的数据处理流程,对数据管道的要求也是极高,数据管道需要极佳的 IO 性能。在 做大数据离线清洗中,常见的数据源包括关系型数据库、Kafka 和对象存储。

关系型数据库

关系型数据库无法应付大量的非结构化数据,大规模的存储和高并发访问会导致数据库负载过高最终产生性能瓶颈。

实践中我们尝试过 2TB 的 RDS 数据库作为数据管道,一旦超过千万级的数据量,读取整张表将要花费近小时级的时间,而同样的读取在对象存储中只需要数分钟。这样的差距放在 10 亿级别的数据量上更是无法接受的。

Kafka 等消息队列

Kafka 在大数据量方面上则占据了优势,高吞吐量,高并发读写,无疑是流式数据处理中绝佳的选择,但是对于离线大数据处理,其最大的问题是存储成本高、运维成本高、且不适合保存历史冷数据。

对象存储

使用对象存储作为中间数据管道,可以带来很多好处。

从存储成本分析,对象存储在存储方式上可按照对象访问量、创建时间来区分存储类型,可以很方便地将历史数据做降级存储,有效降低存储成本。

从并发量分析,对象存储的并发性能极佳,特别是使用云服务商的对象存储服务时,例如阿里云的 OSS、AWS 的 S3 等,不需要考虑并发量的上限。

从运维上考虑,无论是开源的 MinIO,或者是云服务商提供的对象存储服务,运维都极其简单,人力成本几乎为零。

此外,使用对象存储可以将结构化数据和非结构化数据采用统一的方式存储,避免了在使用数据库类解决方案时,文本、音视频等文件需要单独存储的痛点。

但对象存储同时也带来了一些其他问题。例如对增量数据的读取方式,没有 Kafka 的指针、RDS 的自增 ID 作为原生支持。但是 我们可以通过对象存储的key值的特殊设计来解决这一问题,下文的具体方案中会有详细阐述。

  2.4 中间存储

在数据清洗过程中为了保证调试的鲁棒性和效率的提升,我们一般会在重要节点存储中间数据,例如非结构化数据第一次变成结构化的时候,一般会存储一份中间数据,保证后续的读取足够高效。

通常中间存储使用的是 HDFS。HDFS 具有的特性是高可用性和高扩展性,而这些特性对象存储全部都占有,甚至还有更强大的 IO 性能。此外由于 HDFS 用 NameNode 来存储文件元数据和文件寻址方式的原因,HDFS 对于大量小文件的读写存在性能瓶颈;而对象存储单独存储文件元数据,对于不同类型、不同大小的中间存储都提供了强大的支持。

基于对象存储的 Hadoop-AWS、Hadoop-OSS 等支持包解决了各计算引擎读取数据的方式,可以直接通过类 HDFS 的接口进行文件操作,与 HDFS 可以无缝切换,加上其几乎无需运维的特性,对象存储是离线数据处理中间存储的最佳选择之一。

03

整体架构

我们以 Apache Spark + 对象存储的流程为例,介绍基于对象存储的离线大数据处理架构。

整个处理过程分为以下几个主要流程:

1、Spider 通过网络采集,获取到数据后存储到对象存储中

2、通过 Airflow 的调度将数据清洗任务提交到 Spark cluster

3、清洗任务读取对象存储中的原始采集数据,并将中间数据输出到对象存储

4、再通过中间若干个数据清洗任务,最终将数据输出到 ElasticSearch

5、前后端通过对 ElasticSearch 的数据查询结果向客户展示分析后的有效数据

  3.1   对象存储详细方案

  3.1.1   原始数据存储

存储格式

采集端使用 JSON 格式存储、Snappy方式压缩数据,目的在于压缩存储所需的资源,Snappy 对 JSON 数据的压缩效果不错,可以大大节省存储空间,并且 Spark 读取 Snappy 数据不需要任何改造。

以下为压缩测试的一些结果:

存储路径管理

尽管对象存储没有天然的层次结构,但是我们可以将相同类型的数据对象赋予相同前缀的Key值,得到不同的存储路径(对象 Key 值前缀)。

存储路径按照不同的数据种类分目录进行存储,例如某网站的评论数据可以放在  web/comment/ 路径下,使用不同的路径区分不同的数据表,目的很简单,根据路径可以确定单表内容,保证 JSON 数据不进行混合

分布式爬虫可以生成 UUID 作为文件名放入当前时间的文件夹,并发写入性能可以大大提升。

时间游标管理

在同一类型的数据存储路径下,采用数据入库时间来确定具体对象 Key 值, 例如:web/comment/2019/01/01/00/02/00/  ,利用路径的方式给数据增加了时间游标的功能,可以通过路径进行增量遍历。

文件大小

单文件大小控制在 64 MB 左右,避免过小的分片,数据进入对象存储之前进行合并处理是一个优秀的处理方式。

  3.1.2   中间数据存储

存储格式

中间存储一律使用 Parquet 格式,Parquet 格式压缩率很高,节省空间,Spark 支持很好,读取效率极高,丰富的数据格式可以支持绝大部分的数据结构。

存储路径管理

按照路径区分中间数据表例如:web/comment_analyzed,web 代表数据库名,而 comment_analyzed 代表被分析过的中间数据。

时间游标管理

离线数据的清洗有比较强的批次性,在采集数据的时候就会确定好相应的批次。

例如,如果需要 2019年12月 的数据,那么就会在原始数据中增加 “batch_date”: “2019-12-01” 的批次标志。

中间数据清洗后,依照 web/comment_analyzed/batch_date=2019-12-01/ 的方式进行分区,Hadoop-AWS、Hadoop-OSS 等多数基于对象存储的依赖包都支持基于对象前缀的自动分区发现。

文件大小

控制中间输出文件的大小为 64 MB 左右,不宜过小也不宜过大。

其原因在于使用 Spark 进行数据处理的时候会按照文件的大小作为初始单个 partition 处理数据的大小。

过小的文件会造成读取的 overhead 过大,从而读取缓慢,而过大的文件大小会导致需要的 executor 内存大小暴涨。

  3.2   Spark任务开发详细方案

时间游标管理

由于使用对象前缀标记存储类型和入库时间,数据清洗过程中需要开发读取对象存储时间格式路径的基础功能,我们接入了Mysql 作为元数据记录,记录每个任务当前处理到的时间节点信息。

例如:在2020年1月1日我们处理到时间游标为 web/comment/2020/01/01/00/02/00/的数据,则在数据库中记录 2020-01-01 00:01:00 这个时间。

2020年1月2日再次进行任务清洗的时候,则将先从数据读取这个时间,并从 web/comment/2020/01/01/00/02/01/ 开始数据的读取。

处理步骤的拆分

原始数据和中间存储采用不同的格式存储数据,读写性能也有较大的差距,为此需要区分采集数据的首次清洗和后续的数据清洗。

采集数据的首次清洗具有比较高的复杂度,由于是从 JSON 数据读取,并且可能是庞大的非结构化数据,处理效率较低,所以首次处理一般会使用追加的方式将原始数据转存到以批次时间进行分区的对象存储路径中,例如:

web/comment_analyzed/batch_date=2019-01-01。

后续的数据清洗是灵活的,可以按照任务的复杂性进行步骤拆分,每个步骤最终都会产生一个中间数据,可以大大地提高数据处理效率。

04

实践案例

接下来,以某电商平台的全量数据分析为案例,详细介绍如何运用本文所提出的架构,高效率地实现 TB 级别电商大数据的处理和分析。

在本案例中,我们需要对电商平台上的全部商品进行销售情况分析,需要计算商品的月度销售量、销售额等指标,并按行业、品牌、属性等做不同维度的聚合。

采集端提供的原始数据有如下几类:

(1) 商品属性数据

包含商品ID、行业名称、品牌名称,以及商品细分参数,例如规格、类型、产地、包装等。

(2) 商品价格数据

包含商品原始价格、折扣价格、促销信息。

(3) 商品销售量数据

包含商品总销量、月度销量。

(4) 商品评论数据

包含商品总评论数、好评数、中评数、差评数等信息。

在金融投资领域,信息不对称是这一行业的本质特征和竞争焦点,因而数据的及时性非常重要。在大数据时代, 快速利用和挖掘信息的能力越来越成为各个金融资管机构竞争的焦点。 因此,对于每月的新增批次数据,我们需要保证在月初的一天内完成对于上月数据的处理和分析。

  4.1      难点分析

1、数据处理时限短:如上所述,该电商平台同期在线商品SKU量在10多亿。为了满足数据及时性要求,每批次的数据需要在月初的一天内完成数据采集、数据处理。

2、数据量大:计算月度销量等指标数据所需的基础数据有:上月总销量、本月总销量、商品当月价格、商品属性、商品所属行业品牌等。每一种基础数据的量级都在 10 亿级别。存储格式为多行 JSON 文件+ Snappy 压缩,每批次数据的空间占用量接近 1TB。

4.2   实现方案

针对 4.1 中的难点,我们以分品牌、分行业、分属性的月度销售量数据为分析目标,详细介绍其中的处理细节。

数据预处理

预处理是指将采集中的一部分增量数据进行实时处理,而非等到完全采集完毕才处理,这样可以有效减少最终计算时处理的数据量和计算量。

可进行预处理的计算任务如下图所示:

预处理实施分为如下几个步骤:

1、每日将所有爬取到的增量数据由 JSON 转为 Parquet。

2、对某些可进行预计算的数据进行计算并保存处理后的中间数据。

例如,对于价格部分我们采用了拉链表的计算方式。可以每日进行一次拉链表计算,每次处理增量数据,这样就不用到月末时处理一整个月的数据。

对于销量部分,我们需要按照采集时间将销量数据分片,这个操作也可以放到每日进行。

这部分的任务,如果放在最终计算步骤,会额外增加约 6h 的处理时间。

销量数据计算

有了数据预处理的基础,我们开始进行月度销量数据的计算。通过 Airflow 可以将复杂的计算逻辑以图的形式展示出来,使得依赖关系变得清晰可控,整体的计算逻辑如下图所示:

商品销售数据的计算主要分为如图几个部分:

1、价格处理

此部分主要对价格进行拉链表转换、月度平均价格计算等处理。

考虑到价格是会变化的,因此价格数据的采集粒度是日度,每月的价格数据加在一起有300亿条以上。但价格并不是每天都在变,而且大部分商品的价格变化并不频繁,因此采用了拉链表的存储方式,来减少最终处理的数据量。

考虑到无法采集到每一个商品的成交价格,因此需要计算月度平均价格来做销售额计算。

2、销量处理

此部分主要对销量数据按照采集时间来做一个分片存储,方便后续计算月度销量。

3、商品行业、品牌、属性处理

此部分主要从采集数据中抽取到商品的行业、品牌等分类。以及对商品的各种参数进行归整。

由于单条数据体积较大,在格式解析,字段提取上会耗费不少时间。

4、商品月度销售额计算

此部分计算商品的月度销售数据。月度销售量即为本次采集到的的销售量减去上月采集到的销售量。但由于采集时间并非严格的每月1号,因此需要将销售量进行一些细微的调整拆分,以获取月度准确销售量。然后根据价格拉链表获取对应月度的平均价格,计算出销售额。

5、行业、品牌等月度销售聚合

需要对全部商品数据进行行业、品牌、店铺、属性等各个维度的聚合。

6、指标清洗

此部分需要按行业、按品牌将历史全部的月度销量转换为时序数据,方便业务人员使用。

如上所述,出于数据量以及性能优化的考虑,将整个计算任务进行了细致的拆分,并加入中间数据缓存以及任务并行度提升的优化措施:

1、计算步骤拆分

一般计算过程会产生很多中间结果,中间结果的持久化会带来很多好处。当程序崩溃时,通过中间结果可以以最小的代价复现崩溃,从而更快速的发现 BUG。

当修复 BUG 后,也可以通过读取中间数据进行之后的计算,从而节省恢复时间。当计算结果核查有问题时,可通过查询中间数据,快速定位到出问题的数据点。

2、任务并行化拆分

很多计算的结果,既需要保存到数据库、也需要作为下一步计算的输入。由于数据量巨大,在保存到业务端数据库比如 MySQL、ElasticSearch 时是非常耗时的。

此时通过将计算结果持久化到对象存储。然后在下游拆分多个任务同时进行计算和存储,也可节省将近 4 小时的时间。

在整个实现方案中,中间数据的存储起到了非常重要的作用。对象存储的低成本、高 IO 效率使得大规模使用中间数据成为可能。

最终,整套计算方案采用100台机器( 4核32G )可以在 16h 内完成 1TB 量级电商数据的处理和分析,完全满足业务应用上对于数据实时性的要求。

05

总结

针对 PB 级异构数据源的离线处理和分析的需求,本文介绍了一种基于低成本对象存储的大数据处理方案,该技术方案是一个低成本、低运维、低学习成本的架构方案,同时具备很高的可用性和扩展性。基于该方案,我们以某电商平台的月度全量数据为应用案例,详细介绍了运用该框架进行销量数据统计的处理细节和重要优化点。目前熵简科技的数据量已经达到了 3700TB 左右,该方案可以持续稳定的提供数据清洗、数据分析和数据融合的能力。

今天的分享就到这里,谢谢大家。

如果您喜欢本文,欢迎点击右上角,把文章分享到朋友圈~~

社群推荐:

欢迎加入  DataFunTalk  深度学习 交流群 ,跟同行零距离交流。识别下方二维码, 自动入群。

团队招聘

熵简科技大数据团队长期招聘大数据工程师、后端工程师,期 待优秀的 同学加入我们,Base北京、上海。欢迎感兴趣的同学发送简历至:

recruit@entropyreduce.com。

文章推荐:

NLP技术在金融资管领域的落地实践

NLP技术在海外金融机构的应用

关于我们:

DataFunTalk  专注于 大数据、人工智能 技术应用的 分享与交流 。发起于2017年,在 北京、上海、深圳、杭州 等城市举办超过 100 场线下沙龙、论坛及峰会,已邀请近 500 位专家和学者参与分享。其公众号 DataFunTalk 累计生产原创文章 300+百万+ 阅读, 6万+ 精准粉丝。

你今天写  BUG  了么? :point_down: