An Analysis of the Upsert Implementation in Hudi
Introduction
Hudi supports Upsert semantics, i.e., inserting or updating data into a Hudi dataset. After the index mechanism has looked up the incoming records (determining which file, if any, each record lives in), the location information is tagged back onto the records themselves; records that already exist in a file are then handled as UPDATEs, while records not yet present in any file are handled as INSERTs. This post continues the analysis by looking at how records are actually inserted and updated.
Analysis
As before, the analysis starts from HoodieBloomIndex#tagLocation, whose core code is as follows.
public JavaRDD<HoodieRecord> tagLocation(JavaRDD<HoodieRecord> recordRDD, JavaSparkContext jsc,
    HoodieTable hoodieTable) {
  ...
  // Lookup indexes for all the partition/recordkey pair
  JavaPairRDD keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
  ...
  JavaRDD<HoodieRecord> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD);
  ...
  return taggedRecordRDD;
}
The lookupIndex method only figures out which records exist in which files; at this point the original records still carry no location information. tagLocationBacktoRecords is then needed to tag the location information back onto the records. Its core code is as follows.
protected JavaRDD<HoodieRecord> tagLocationBacktoRecords(
    JavaPairRDD keyFilenamePairRDD, JavaRDD<HoodieRecord> recordRDD) {
  JavaPairRDD<HoodieKey, HoodieRecord> keyRecordPairRDD =
      recordRDD.mapToPair(record -> new Tuple2(record.getKey(), record));
  // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
  // so we do left outer join.
  return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values()
      .map(v1 -> getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
}
The core logic of this method is very simple: it first transforms the original records into key/record pairs (to make the subsequent join easier), then performs a left outer join between the transformed records and the previously looked-up key/file pairs, which completes tagging the locations back onto the records (one has to admire how powerful RDDs are). A rough sketch of the getTaggedRecord helper follows.
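For completeness, the getTaggedRecord helper used in the last line above roughly does the following; this is a sketch of the idea, and the exact code in your Hudi version may differ.

private static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option<HoodieRecordLocation> location) {
  HoodieRecord record = inputRecord;
  if (location.isPresent()) {
    // When the index found the record in an existing file, copy the record and set its
    // current location, so the write path can later tell UPDATE from INSERT.
    record = new HoodieRecord<>(inputRecord);
    record.unseal();
    record.setCurrentLocation(location.get());
    record.seal();
  }
  return record;
}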
Once the location information has been tagged back, the actual insert/update can proceed via upsertRecordsInternal, whose core code is as follows.
private JavaRDD upsertRecordsInternal(JavaRDD<HoodieRecord> preppedRecords, String commitTime,
    HoodieTable hoodieTable, final boolean isUpsert) {
  ...
  WorkloadProfile profile = null;
  if (hoodieTable.isWorkloadProfileNeeded()) {
    profile = new WorkloadProfile(preppedRecords);
    saveWorkloadProfileMetadataToInflight(profile, hoodieTable, commitTime);
  }

  // partition using the insert partitioner
  final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
  JavaRDD<HoodieRecord> partitionedRecords = partition(preppedRecords, partitioner);
  JavaRDD writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
    if (isUpsert) {
      return hoodieTable.handleUpsertPartition(commitTime, partition, recordItr, partitioner);
    } else {
      return hoodieTable.handleInsertPartition(commitTime, partition, recordItr, partitioner);
    }
  }, true).flatMap(List::iterator);

  return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime);
}
The records are first profiled, e.g., how many records are inserted and updated per partition in this batch. Then, depending on the table type (Merge On Read / Copy On Write), the corresponding Partitioner is obtained to repartition the records. Here we take HoodieCopyOnWriteTable$UpsertPartitioner as the example. When this object is constructed, the profile information is used for the necessary initialization.
UpsertPartitioner(WorkloadProfile profile) {
  ...
  assignUpdates(profile);
  assignInserts(profile);
  ...
}
The core code of assignUpdates is as follows.
private void assignUpdates(WorkloadProfile profile) {
  // each update location gets a partition
  WorkloadStat gStat = profile.getGlobalStat();
  for (Map.Entry<String, Pair> updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) {
    addUpdateBucket(updateLocEntry.getKey());
  }
}

private int addUpdateBucket(String fileIdHint) {
  int bucket = totalBuckets;
  updateLocationToBucket.put(fileIdHint, bucket);
  BucketInfo bucketInfo = new BucketInfo();
  bucketInfo.bucketType = BucketType.UPDATE;
  bucketInfo.fileIdPrefix = fileIdHint;
  bucketInfoMap.put(totalBuckets, bucketInfo);
  totalBuckets++;
  return bucket;
}
Using the workload statistics, this method iterates over the file locations that receive updates and generates UPDATE-type bucket information; both the file ID -> bucket number mapping and the bucket number -> bucket info mapping are stored (illustrated below).
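As a concrete, hypothetical illustration of this bookkeeping: suppose the global stats report updates against two existing files. The following standalone sketch (simplified stand-ins, not Hudi code) shows what the two maps end up containing.

import java.util.HashMap;
import java.util.Map;

public class UpdateBucketExample {
  public static void main(String[] args) {
    // Simplified stand-ins for UpsertPartitioner's fields.
    Map<String, Integer> updateLocationToBucket = new HashMap<>();
    Map<Integer, String> bucketInfoMap = new HashMap<>();   // bucket number -> "type:fileId"
    int totalBuckets = 0;

    // Hypothetical file IDs that the workload profile says receive updates.
    for (String fileId : new String[] {"file-A", "file-B"}) {
      updateLocationToBucket.put(fileId, totalBuckets);
      bucketInfoMap.put(totalBuckets, "UPDATE:" + fileId);
      totalBuckets++;
    }

    // updateLocationToBucket = {file-A=0, file-B=1}
    // bucketInfoMap          = {0=UPDATE:file-A, 1=UPDATE:file-B}
    System.out.println(updateLocationToBucket);
    System.out.println(bucketInfoMap);
  }
}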
The core code of assignInserts is as follows.
private void assignInserts(WorkloadProfile profile) {
  // for new inserts, compute buckets depending on how many records we have for each partition
  Set partitionPaths = profile.getPartitionPaths();
  ...
  for (String partitionPath : partitionPaths) {
    WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
    if (pStat.getNumInserts() > 0) {
      List smallFiles = getSmallFiles(partitionPath);
      long totalUnassignedInserts = pStat.getNumInserts();
      List bucketNumbers = new ArrayList();
      List recordsPerBucket = new ArrayList();

      // first try packing this into one of the smallFiles
      for (SmallFile smallFile : smallFiles) {
        long recordsToAppend = Math.min(
            (config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
            totalUnassignedInserts);
        if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
          // create a new bucket or re-use an existing bucket
          int bucket;
          if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
            bucket = updateLocationToBucket.get(smallFile.location.getFileId());
          } else {
            bucket = addUpdateBucket(smallFile.location.getFileId());
          }
          bucketNumbers.add(bucket);
          recordsPerBucket.add(recordsToAppend);
          totalUnassignedInserts -= recordsToAppend;
        }
      }

      // if we have anything more, create new insert buckets, like normal
      if (totalUnassignedInserts > 0) {
        long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
        if (config.shouldAutoTuneInsertSplits()) {
          insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
        }
        int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
        for (int b = 0; b < insertBuckets; b++) {
          bucketNumbers.add(totalBuckets);
          recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
          BucketInfo bucketInfo = new BucketInfo();
          bucketInfo.bucketType = BucketType.INSERT;
          bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
          bucketInfoMap.put(totalBuckets, bucketInfo);
          totalBuckets++;
        }
      }

      // Go over all such buckets, and assign weights as per amount of incoming inserts.
      List insertBuckets = new ArrayList();
      for (int i = 0; i < bucketNumbers.size(); i++) {
        InsertBucket bkt = new InsertBucket();
        bkt.bucketNumber = bucketNumbers.get(i);
        bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
        insertBuckets.add(bkt);
      }
      partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
    }
  }
}
This method is more involved than assignUpdates. It first collects the partitions touched by the incoming records. For every partition that has inserts, it fetches all small data files in that partition (files smaller than the configured small-file threshold), computes how many more records can still be packed into each small file, then computes how many additional buckets are needed for the remaining inserts and generates INSERT-type bucket information with randomly generated file IDs. Finally it iterates over all these buckets and builds an InsertBucket with a corresponding weight for each (see the worked example below). These steps complete the handling of inserts: small files are filled first, and whatever remains is written to new files.
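To make the bucketing concrete, here is a small standalone sketch of the arithmetic with hypothetical numbers (a 120 MB target file size, one 100 MB small file, 1 KB average record size, and 50,000 inserts for the partition); it mirrors the computation above but is not Hudi code.

public class InsertBucketMath {
  public static void main(String[] args) {
    // Hypothetical configuration and workload numbers, just to make the arithmetic concrete.
    long parquetMaxFileSize = 120L * 1024 * 1024; // 120 MB target file size
    long smallFileSizeBytes = 100L * 1024 * 1024; // one existing small file of 100 MB
    long averageRecordSize  = 1024;               // 1 KB per record on average
    long numInserts         = 50_000;             // inserts destined for this partition

    // Step 1: pack as much as possible into the existing small file.
    long recordsToAppend =
        Math.min((parquetMaxFileSize - smallFileSizeBytes) / averageRecordSize, numInserts); // 20480
    long remaining = numInserts - recordsToAppend;                                           // 29520

    // Step 2: remaining inserts go to new files; with auto-tuned split size,
    // each new bucket can hold roughly parquetMaxFileSize / averageRecordSize records.
    long insertRecordsPerBucket = parquetMaxFileSize / averageRecordSize;                    // 122880
    int newBuckets = (int) Math.ceil((1.0 * remaining) / insertRecordsPerBucket);            // 1

    // Step 3: weights are proportional to the records assigned to each bucket.
    double smallFileWeight = (1.0 * recordsToAppend) / numInserts; // ~0.41
    double newFileWeight   = (1.0 * remaining) / numInserts;       // ~0.59

    System.out.printf("append to small file: %d records (weight %.2f)%n", recordsToAppend, smallFileWeight);
    System.out.printf("new insert buckets: %d, records: %d (weight %.2f)%n", newBuckets, remaining, newFileWeight);
  }
}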
For a custom Partitioner, the two essential methods are numPartitions and getPartition. numPartitions returns the number of buckets (totalBuckets), and the core code of getPartition is as follows.
public int getPartition(Object key) {
  Tuple2<HoodieKey, Option> keyLocation = (Tuple2<HoodieKey, Option>) key;
  if (keyLocation._2().isPresent()) {
    HoodieRecordLocation location = keyLocation._2().get();
    return updateLocationToBucket.get(location.getFileId());
  } else {
    List targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
    // pick the target bucket to use based on the weights.
    double totalWeight = 0.0;
    final long totalInserts = Math.max(1, globalStat.getNumInserts());
    final long hashOfKey =
        Hashing.md5().hashString(keyLocation._1().getRecordKey(), StandardCharsets.UTF_8).asLong();
    final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
    for (InsertBucket insertBucket : targetBuckets) {
      totalWeight += insertBucket.weight;
      if (r <= totalWeight) {
        return insertBucket.bucketNumber;
      }
    }
    // return first one, by default
    return targetBuckets.get(0).bucketNumber;
  }
}
For a record to be updated, the bucket number is simply looked up from the file ID -> bucket number mapping; for a record to be inserted, the bucket number is chosen based on the bucket weights, as illustrated below.
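The weight-based choice for inserts can be shown with a small standalone sketch (hypothetical record key and the two buckets from the earlier example; it imitates, but is not, the Hudi implementation): a deterministic value r in [0, 1) is derived from the record key, and the cumulative bucket weights are walked until they cover r.

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.LinkedHashMap;
import java.util.Map;

public class WeightedBucketPick {
  // Pick a bucket by walking the cumulative weights until they cover r, mirroring getPartition.
  static int pickBucket(Map<Integer, Double> bucketToWeight, double r) {
    double totalWeight = 0.0;
    for (Map.Entry<Integer, Double> e : bucketToWeight.entrySet()) {
      totalWeight += e.getValue();
      if (r <= totalWeight) {
        return e.getKey();
      }
    }
    return bucketToWeight.keySet().iterator().next(); // fall back to the first bucket
  }

  public static void main(String[] args) throws Exception {
    // Hypothetical buckets from the earlier example:
    // bucket 0 (small file, weight ~0.41), bucket 1 (new file, weight ~0.59).
    Map<Integer, Double> buckets = new LinkedHashMap<>();
    buckets.put(0, 0.41);
    buckets.put(1, 0.59);

    // Derive a deterministic r in [0, 1) from the record key, similar in spirit to the
    // MD5-hash-modulo-totalInserts trick in getPartition (not the exact same formula).
    long totalInserts = 50_000;
    byte[] md5 = MessageDigest.getInstance("MD5")
        .digest("record-key-0001".getBytes(StandardCharsets.UTF_8));
    long hash = new BigInteger(1, md5).longValue();
    double r = 1.0 * Math.floorMod(hash, totalInserts) / totalInserts;

    System.out.println("r = " + r + " -> bucket " + pickBucket(buckets, r));
  }
}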
Back in upsertRecordsInternal: once the Partitioner is defined, the records are repartitioned (one bucket maps to one Spark partition), and then HoodieCopyOnWriteTable#handleUpsertPartition is called to perform the upsert. Its core code is as follows.
public Iterator<List> handleUpsertPartition(String commitTime, Integer partition, Iterator recordItr,
    Partitioner partitioner) {
  UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
  BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
  BucketType btype = binfo.bucketType;
  ...
  if (btype.equals(BucketType.INSERT)) {
    return handleInsert(commitTime, binfo.fileIdPrefix, recordItr);
  } else if (btype.equals(BucketType.UPDATE)) {
    return handleUpdate(commitTime, binfo.fileIdPrefix, recordItr);
  } else {
    throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
  }
  ...
}
It first retrieves the bucket information for the given partition number, then handles the INSERT and UPDATE cases separately. The core code of handleInsert is as follows.
public Iterator<List> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord> recordItr)
    throws Exception {
  // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
  if (!recordItr.hasNext()) {
    return Collections.singletonList((List) Collections.EMPTY_LIST).iterator();
  }
  return new CopyOnWriteLazyInsertIterable(recordItr, config, commitTime, this, idPfx);
}
If the partition has no records, an empty iterator is returned directly; otherwise a lazy insert iterator is created to process them.
The core code of handleUpdate is as follows.
public Iterator<List> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord> recordItr)
    throws IOException {
  // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
  if (!recordItr.hasNext()) {
    return Collections.singletonList((List) Collections.EMPTY_LIST).iterator();
  }
  // these are updates
  HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, recordItr);
  return handleUpdateInternal(upsertHandle, commitTime, fileId);
}

protected Iterator<List> handleUpdateInternal(HoodieMergeHandle upsertHandle, String commitTime, String fileId)
    throws IOException {
  ...
  AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
  BoundedInMemoryExecutor wrapper = null;
  try (ParquetReader reader =
      AvroParquetReader.builder(upsertHandle.getOldFilePath()).withConf(getHadoopConf()).build()) {
    wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
        new UpdateHandler(upsertHandle), x -> x);
    wrapper.execute();
  }
  ...
  return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
}
Again, if the partition has no records, an empty iterator is returned directly; otherwise the update is processed further.
For both INSERT and UPDATE, the actual record processing is dispatched through a BoundedInMemoryExecutor (a producer-consumer queue model that will be analyzed separately in a later post); a generic sketch of the pattern follows.
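The sketch below illustrates the bounded producer-consumer idea in isolation; it is not Hudi's BoundedInMemoryExecutor, just the general pattern: a producer fills a bounded queue (which applies back-pressure) while a consumer drains it and hands each record to a handler.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueSketch {
  private static final String POISON_PILL = "__done__";

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(16); // bounded, applies back-pressure

    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 100; i++) {
          queue.put("record-" + i);   // blocks when the queue is full
        }
        queue.put(POISON_PILL);       // signal end of input
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          String record = queue.take();    // blocks when the queue is empty
          if (POISON_PILL.equals(record)) {
            break;
          }
          // In Hudi, this is where the handler (e.g., the merge/insert handle) would process the record.
          System.out.println("consumed " + record);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
  }
}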
For UPDATE, the actual merge is ultimately handled by HoodieMergeHandle; see the HoodieMergeHandle#write method, whose core code is as follows.
public void write(GenericRecord oldRecord) {
  String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
  boolean copyOldRecord = true;
  if (keyToNewRecords.containsKey(key)) {
    // If we have duplicate records that we are updating, then the hoodie record will be deflated after
    // writing the first record. So make a copy of the record to be merged
    HoodieRecord hoodieRecord = new HoodieRecord(keyToNewRecords.get(key));
    Option combinedAvroRecord =
        hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchema : originalSchema);
    if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
      /*
       * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully write the combined new
       * value
       *
       * We no longer need to copy the old record over.
       */
      copyOldRecord = false;
    }
    writtenRecordKeys.add(key);
  }
  if (copyOldRecord) {
    ...
    storageWriter.writeAvro(key, oldRecord);
    ...
    recordsWritten++;
  }
}
If the old record (the record already in the file) has a counterpart among the new records (the incoming batch), the two are merged (the merge strategy is pluggable; by default the new record overwrites the old one, as sketched below) and the merged result is written to a new file (same FileId as before but a different commitTime; the larger the commitTime, the newer the file). If there is no matching new record, the old record is copied over and written to the new file as-is. This updates the records that already exist in the file and carries over the untouched ones, guaranteeing that no record is lost.
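The default "new record wins" merge semantics (in Hudi this is the behavior of the OverwriteWithLatestAvroPayload payload) can be sketched as follows; the class and method shapes here are illustrative, not the actual Hudi payload API.

import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

class OverwriteLatestSketch {
  private final IndexedRecord newValue;   // the incoming record; null would mean a delete

  OverwriteLatestSketch(IndexedRecord newValue) {
    this.newValue = newValue;
  }

  // Combine the incoming value with the value currently stored in the file:
  // ignore the old value entirely and keep the new one (empty means delete).
  Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
    return Optional.ofNullable(newValue);
  }
}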
For INSERT, the actual write is ultimately handled by HoodieCreateHandle; see the HoodieCreateHandle#write method, whose core code is as follows.
public void write(HoodieRecord record, Option avroRecord) {
  Option recordMetadata = record.getData().getMetadata();
  if (avroRecord.isPresent()) {
    // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
    IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
    storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
    // update the new location of record, so we know where to find it next
    record.unseal();
    record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
    record.seal();
    recordsWritten++;
    insertRecordsWritten++;
  } else {
    recordsDeleted++;
  }

  writeStatus.markSuccess(record, recordMetadata);
  // deflate record payload after recording success. This will help users access payload as a
  // part of marking record successful.
  record.deflate();
}
A record whose value is present represents an insert: it is written to the data file and its payload is then deflated (released). Before this write method is called, however, the caller first checks whether the current file can still be written to (whether its size has exceeded the configured maximum data file size, and whether the partition path matches); if not, a new file is created for writing by appending an incrementing sequence number, starting from 0, to the original FileId. A rough sketch of this roll-over decision follows.
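The sketch below only illustrates the decision described above; the names and structure are assumptions for illustration, not the actual HoodieCreateHandle / lazy-insert code.

class FileRollOverSketch {
  private final long maxFileSizeBytes;
  private final String partitionPath;
  private final String baseFileId;
  private long currentFileSizeBytes;
  private int rollNumber = 0;          // incrementing suffix, starting from 0

  FileRollOverSketch(long maxFileSizeBytes, String partitionPath, String baseFileId) {
    this.maxFileSizeBytes = maxFileSizeBytes;
    this.partitionPath = partitionPath;
    this.baseFileId = baseFileId;
  }

  // Can the current file still take a record from this partition?
  boolean canWrite(String recordPartitionPath) {
    return currentFileSizeBytes < maxFileSizeBytes && partitionPath.equals(recordPartitionPath);
  }

  // When the current file is full, derive a new file name by appending a sequence number to the FileId.
  String nextFileId() {
    return baseFileId + "-" + (rollNumber++);
  }
}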
With these steps the whole write path is complete. Some housekeeping follows, such as updating the index, handling write failures, and cleaning up temporary files, which is not analyzed in detail here.
Summary
For Upsert, Hudi's overall flow is to first tag records using the index and then repartition them. Newly inserted records are preferentially packed into small files to avoid producing too many of them, and data file sizes are controlled according to the file-size configuration; updated records are merged with the old ones, and old records are copied into the new file when necessary (same FileId as the old file, but a different commitTime).