前言
Hudi除了支持Spark、Fink寫Hudi外,還支持Java客戶端。本文總結(jié)Hudi Java Client如何使用,主要為代碼示例,可以實(shí)現(xiàn)讀取Hive表寫Hudi表。當(dāng)然也支持讀取其他數(shù)據(jù)源,比如mysql,實(shí)現(xiàn)讀取mysql的歷史數(shù)據(jù)和增量數(shù)據(jù)寫Hudi。
版本
Hudi 0.12.0
功能支持
支持insert/upsert/delete,暫不支持bulkInsert目前僅支持COW表支持完整的寫Hudi操作,包括rollback、clean、archive等。
代碼
完整代碼已上傳GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client。
其中HoodieJavaWriteClientExample是從Hudi源碼里拷貝的,包含了insert/upsert/delte/的代碼示例,JavaClientHive2Hudi是我自己的寫的代碼示例總結(jié),實(shí)現(xiàn)了kerberos認(rèn)證、讀取Hive表Schema作為寫hudi的Schema、讀取Hive表數(shù)據(jù)寫hudi表,并同步hudi元數(shù)據(jù)至hive元數(shù)據(jù),實(shí)現(xiàn)自動(dòng)創(chuàng)建Hive元數(shù)據(jù),當(dāng)然也支持讀取其他數(shù)據(jù)源,比如mysql,實(shí)現(xiàn)歷史和增量寫。
相比于HoodieJavaWriteClientExample,JavaClientHive2Hudi加了很多配置參數(shù),更貼近實(shí)際使用,比如HoodieJavaWriteClientExample的payload為HoodieAvroPayload這只能作為示例使用,JavaClientHive2Hudi使用的為DefaultHoodieRecordPayload它支持預(yù)合并和歷史值比較,關(guān)于這一點(diǎn)可以參考我之前寫的文章:Hudi preCombinedField 總結(jié)(二)-源碼分析,如果只需要預(yù)合并功能,可以使用OverwriteWithLatestAvroPayload,這倆分別是Spark SQL 和 Spark DF的默認(rèn)值,當(dāng)然都不需要的話,也支持HoodieAvroPayload,代碼里是根據(jù)條件判斷需要用哪個(gè)payloadClassName。
String payloadClassName = shouldOrdering ? DefaultHoodieRecordPayload.class.getName() :
shouldCombine ? OverwriteWithLatestAvroPayload.class.getName() : HoodieAvroPayload.class.getName();
然后利用反射構(gòu)造payload,其實(shí)這里反射的邏輯就是Hudi Spark源碼里的邏輯。
另一個(gè)它更貼近實(shí)際使用的原因就是我們項(xiàng)目上就是將Hudi Java Client封裝成了一個(gè)NIFI processor,然后用NIFI調(diào)度,其性能和穩(wěn)定性都能夠滿足項(xiàng)目需求,這里的核心邏輯和實(shí)際項(xiàng)目中的邏輯是差不多的。關(guān)于我們使用Java客戶端的原因是由于歷史原因造成的,因?yàn)槲覀冎斑€沒有調(diào)度Spark、Flink的開發(fā)工具(之前用的NIFI),而開發(fā)一個(gè)新的開發(fā)工具的話是需要時(shí)間成本的,所以選擇了Java客戶端,我們現(xiàn)在已經(jīng)將Apache DolphinScheduler作為自己的開發(fā)調(diào)度工具了,后面會(huì)主要使用Spark/Flink,所以現(xiàn)在總結(jié)一下Hudi Java Client的使用以及源碼,避免遺忘,也希望對(duì)大家有所幫助。
初始化Hudi表
Java Client的代碼更貼近源碼。
initTable主要是根據(jù)一些配置信息,生成.hoodie元數(shù)據(jù)路徑,并生成hoodie.properties元數(shù)據(jù)文件,該文件里持久化保存了Hudi的一些配置信息。
if (!(fs.exists(path) && fs.exists(hoodiePath))) { //根據(jù)Hudi路徑存不存在,判斷Hudi表需不需要初始化
if (Arrays.asList(INSERT_OPERATION, UPSERT_OPERATION).contains(writeOperationType)) {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(TABLE_TYPE)
.setTableName(targetTable)
.setPayloadClassName(payloadClassName)
.setRecordKeyFields(recordKeyFields)
.setPreCombineField(preCombineField)
.setPartitionFields(partitionFields)
.setBootstrapIndexClass(NoOpBootstrapIndex.class.getName())
.initTable(hadoopConf, tablePath);
} else if (writeOperationType.equals(DELETE_OPERATION)) { //Delete操作,Hudi表必須已經(jīng)存在
throw new TableNotFoundException(tablePath);
}
}
hoodie.properties
#Properties saved on 2022-10-24T07:40:36.530Z
#Mon Oct 24 15:40:36 CST 2022
hoodie.table.name=test_hudi_target
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=5
hoodie.timeline.layout.version=1
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.checksum=1749434190
創(chuàng)建HoodieJavaWriteClient
首先要?jiǎng)?chuàng)建HoodieWriteConfig,主要是hudi的一些配置,比如Schema、表名、payload、索引、clean等一些參數(shù),具體可以自己去了解。
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(writeSchema.toString())
.withParallelism(2, 2).withDeleteParallelism(2)
.forTable(targetTable)
.withWritePayLoad(payloadClassName)
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.BLOOM)
// .bloomIndexPruneByRanges(false) // 1000萬總體時(shí)間提升1分鐘
.bloomFilterFPP(0.000001) // 1000萬總體時(shí)間提升3分鐘
.fromProperties(indexProperties)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(smallFileLimit)
.approxRecordSize(recordSizeEstimate).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(150, 200).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(100).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(maxFileSize).build())
.build();
writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)
startCommit
返回commitTime,首先會(huì)執(zhí)行rollback,然后創(chuàng)建一個(gè).commit.request,再將commitTime返回。
String newCommitTime = writeClient.startCommit();
generateRecord
這里主要是構(gòu)造寫hudi需要的數(shù)據(jù)結(jié)構(gòu),包含HoodieKey和payLoad,其中delete操作只需要HoodieKey。
public static List<HoodieRecord<HoodieRecordPayload>> generateRecord(ResultSet rs,
Schema writeSchema,
String payloadClassName,
boolean shouldCombine) throws IOException, SQLException {
List<HoodieRecord<HoodieRecordPayload>> list = new ArrayList<>();
while (rs.next()) {
GenericRecord rec = new GenericData.Record(writeSchema);
writeSchema.getFields().forEach(field -> {
try {
rec.put(field.name(), convertValueType(rs, field.name(), field.schema().getType()));
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
String partitionPath = partitionFields == null ? "" : getRecordPartitionPath(rs, writeSchema);
System.out.println(partitionPath);
String rowKey = recordKeyFields == null && writeOperationType.equals(INSERT_OPERATION) ? UUID.randomUUID().toString() : getRecordKey(rs, writeSchema);
HoodieKey key = new HoodieKey(rowKey, partitionPath);
if (shouldCombine) {
Object orderingVal = HoodieAvroUtils.getNestedFieldVal(rec, preCombineField, false, false);
list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec, (Comparable) orderingVal)));
} else {
list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec)));
}
}
return list;
}
寫Hudi
最后執(zhí)行寫Hudi的操作,常用upsert/insert/delete,Java Client也是默認(rèn)開啟clean等操作的,具體的實(shí)現(xiàn)是在HoodieJavaCopyOnWriteTable中。目前還不支持bulkInsert等操作,后面如果我有能力的話,會(huì)嘗試提交PR支持。
writeClient.upsert(records, newCommitTime);
writeClient.insert(records, newCommitTime);
writeClient.delete(records, newCommitTime);
同步Hive
最后是同步元數(shù)據(jù)至Hive,實(shí)現(xiàn)在hive中建表,這一步是可選的。這樣可以利用Hive SQL和Spark SQL查詢Hudi表。
/**
* 利用HiveSyncTool同步Hive元數(shù)據(jù)
* Spark寫Hudi同步hive元數(shù)據(jù)的源碼就是這樣同步的
*
* @param properties
* @param hiveConf
*/
public static void syncHive(TypedProperties properties, HiveConf hiveConf) {
HiveSyncTool hiveSyncTool = new HiveSyncTool(properties, hiveConf);
hiveSyncTool.syncHoodieTable();
}
public static HiveConf getHiveConf(String hiveSitePath, String coreSitePath, String hdfsSitePath) {
HiveConf configuration = new HiveConf();
configuration.addResource(new Path(hiveSitePath));
configuration.addResource(new Path(coreSitePath));
configuration.addResource(new Path(hdfsSitePath));
return configuration;
}
/**
* 同步Hive元數(shù)據(jù)的一些屬性配置
* @param basePath
* @return
*/
public static TypedProperties getHiveSyncProperties(String basePath) {
TypedProperties properties = new TypedProperties();
properties.put(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
properties.put(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key(), true);
properties.put(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), dbName);
properties.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), targetTable);
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName());
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);
if (partitionFields != null && !partitionFields.isEmpty()) {
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);
}
return properties;
}
與0.9.0版本差異
之前是基于0.9.0版本開發(fā)的,本文代碼示例基于0.12.0,核心代碼是一樣的,差異的地方有兩處。
1、0.9.0 clean、archive的參數(shù)都是在withCompactionConfig中,現(xiàn)在單獨(dú)拎出來2、0.9.0 HiveSyncTool的參數(shù)為HiveSyncConfig,現(xiàn)在為TypedProperties。
總結(jié)
Hudi Java Client和Spark、Flink一樣都可以實(shí)現(xiàn)完整的寫Hudi的邏輯,但是目前功能支持還不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,畢竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起來比較方便,集成到NIFI的好處是,可以通過拖來拽配置參數(shù)的形式完成歷史數(shù)據(jù)和增量數(shù)據(jù)寫入Hudi。也可以自己實(shí)現(xiàn)多線程,提升性能,我們目前測(cè)試的性能是Insert可以達(dá)到10000條/s,而upsert因?yàn)樾枰x取索引,還有歷史數(shù)據(jù)的更新,可能需要重寫整個(gè)表,所以當(dāng)歷史數(shù)據(jù)比較大且更新占比比較高時(shí),單線程的性能會(huì)非常差,但是我們基于源碼改造,將布隆索引和寫數(shù)據(jù)的部分改為多線程后,性能就會(huì)提升很多,當(dāng)然這也取決于機(jī)器的性能,和CPU、內(nèi)存有關(guān)。對(duì)于數(shù)據(jù)量不是很大的ZF數(shù)據(jù),一般大表幾十億,性能還是可以滿足要求的。