学习 Druid(三):Kafka 数据摄入
2014 年 5 月 24 日
Druid 支持两种 Kafka 数据摄入方式:
- Push 通过 Tranquility;
- Pull 通过 Kafka Indexing Service。
Tranquility
TODO
Kafka Indexing Service
1. 编辑配置文件
编辑 Overload 和 MiddleManager 的 conf/druid/cluster/_common/common.runtime.properties 配置文件。
加载 Kafka Indexing Service 扩展:
druid.extensions.loadList=["mysql-metadata-storage"]
2. 编写 Supervisor 说明文件
官网:chestnut::
{ "type": "kafka", "dataSchema": { "dataSource": "metrics-kafka", "parser": { "type": "string", "parseSpec": { "format": "json", "timestampSpec": { "column": "timestamp", "format": "auto" }, "dimensionsSpec": { "dimensions": [], "dimensionExclusions": [ "timestamp", "value" ] } } }, "metricsSpec": [ { "name": "count", "type": "count" }, { "name": "value_sum", "fieldName": "value", "type": "doubleSum" }, { "name": "value_min", "fieldName": "value", "type": "doubleMin" }, { "name": "value_max", "fieldName": "value", "type": "doubleMax" } ], "granularitySpec": { "type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "NONE" } }, "tuningConfig": { "type": "kafka", "maxRowsPerSegment": 5000000 }, "ioConfig": { "topic": "metrics", "consumerProperties": { "bootstrap.servers": "localhost:9092" }, "taskCount": 1, "replicas": 1, "taskDuration": "PT1H" } }
Supervisor 配置:
- type
kafka
,必填; - dataSchema 指定摄入数据的 schema,必填;
- ioConfig Supervisor 和 Indexer 任务配置,必填;
- tuningConfig Supervisor 和 Indexer 任务调优配置,选填。
2.1 DataSchema
Druid 的 Datasource 可以作为关系型数据库中的 Table。
Druid 的 Schema 符合时序库的数据结构,包括:
- 时间戳(Timestamp) 描述事件发生的时间;
- 维度(Dimension) 描述事件的属性;
- 指标(Metric) 描述事件的值;
数据在 Datasource 中按时间戳进行分区,每一个分区被称为 Chunk,Chunk 中每一个文件被称为 Segment。
2.2 IOConfig
配置项 topic,Kafka Topic,必填;
配置项 consumerProperties,Kafka Consumer 配置项,必填;
配置项 replicas,副本集数;
配置项 taskCount,每个副本集任务数。
2.3 TuningConfig
略。
3. 提交 Supervisor
Druid 提供两种方法提交 Supervisor:
- UI ,在 Tasks 菜单,点击 Submit Supervisor 按钮;
- REST API ,
curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://:/druid/indexer/v1/supervisor
Trouble Shooting
索引 Task 报错 Unrecognized VM option 'ExitOnOutOfMemoryError'
错误日志:
Unrecognized VM option 'ExitOnOutOfMemoryError' Did you mean 'OnOutOfMemoryError='? Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
解决办法:
导致该问题的原因是主机 JVM 不支持 ExitOnOutOfMemoryError
参数。
第一种方法,升级至 JDK 1.8 8u92 以上版本(推荐!!!)。
第二种方法,编辑 conf/druid/cluster/data/middleManager/runtime.properties 文件,找到 druid.indexer.runner.javaOpts
配置项:
# Task launch parameters druid.indexer.runner.javaOpts=-server -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
移除其中的 -XX:+ExitOnOutOfMemoryError
JVM 配置项。