- paimon sink 源码 之 DynamicTableSink
- 上篇得知 sink 的拓扑是在 DynamicTableSink#getSinkRuntimeProvider 里面定义的
- Paimon 对于 DynamicTableSink 的实现就是 org.apache.paimon.flink.sink.FlinkTableSink
- getSinkRuntimeProvider 在父类 org.apache.paimon.flink.sink.FlinkTableSinkBase#getSinkRuntimeProvider 中定义
- 这里只探讨主键表在流模式下写入场景
拓扑梳理
-
判断是否有 logStoreTableFactory 在创建 DynamicTableSink 是根据建表语句是否有配置 log.system 来决定是否有 logStoreTableFactory
1.1 log.system 默认值是 none,可以配置成 kafka,配置成 kafka 数据不仅仅会写入 fileSystem 也会写入 kafka 相当于是双写,流读消费 paimon 表数据的时候 streaming-read-mode 可以配置成 log 从 kafka 进行消费1.2 log.system 设置为 kafka 举例
CREATE TABLE T (i INT, j INT) WITH ( 'log.system'='kafka', 'log.system.partitions'='2', 'kafka.bootstrap.servers'='%s', 'kafka.topic'='Tt', 'connector='paimon' ... )
1.3 LogSinkFunction 的创建,对于 log.system 是 kafka 实际上就是利用 kafka connector 的 FlinkKafkaProducer 进行数据发送
-
sink.parallelism 设置 sink 的并行度
-
local-merge-buffer-size 如作业存在主键数据偏斜可以设置“local-merge-buffer-size”,在数据进行 shuffle 之前进行 buffer 和 merge, 当同一主键在快照之间频繁更新时,这特别有用。建议从“64 mb”开始调整缓冲区大小。不适用于 CDC ingestion
3.1 如果设置了 local-merge-buffer-size 就会加入一个 LocalMergeOperator 算子
3.2 LocalMergeOperator 算子并行度和上游保持一致,shuffle 方式是 forward -
对 DataStream<RowData> 进行一次 map 将 org.apache.flink.table.data.RowData 转化成 org.apache.paimon.data.InternalRow
4.1 算子并行度和上游一致
4.2 paimon InternalRow 只是对 flink RowData 的包装实际还是操作的 flink RowData -
判断 BucketMode ,在这篇里有讲述 Paimon Table BuketMode 的逻辑
- 对于主键表会有 FIXED、DYNAMIC、GLOBAL_DYNAMIC 三种 mode
- 对于 append only 表有 FIXED、UNAWARE 两种 mode
- 不同的 BucketMode 会有不同的拓扑,他们的不同主要体现在 partition 的逻辑不一样和数据写入的逻辑不一样,之后的一些算子就是一样的,这次我们先梳理主线不去看算子细节,主线梳理完成之后在写每个算子的细节
- 这次面向主键表,从 FIXED mode 开始
FIXED
- 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
- 并行度为 sink.parallelism
- 分区逻辑为
a. 先抽取 bucket key 的值,然后将值的 hash 和 bucket 桶数取模Math.abs(key_hashcode % numBuckets
得到一个 bucket 序号
b. 如果有 logFuntion 则直接按照 bucket 序号和算子并行度取模recordChannel = bucket % numChannels
c. 如果没有 logFuntion 则先算出 分区属于那一个 channel, 然后再用 分区的 channel + bucket 序号再和并行度取模 这个算法和我在 hudi 提的分区算法是一样的int startChannel = Math.abs(partition.hashCode()) % numChannels; return (startChannel + bucket) % numChannels;
- 分区后添加数据写入 RowDataStoreWriteOperator 算子
- 并行度为上游并行度
- 这里有点花原本我以为 logFuntion 会直接加一个 KafkaSinkFunction 算子,没想到他是直接集成到了 RowDataStoreWriteOperator 里面。可能是我 out 了。上代码
public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> { private final LogSinkFunction logSinkFunction; @Override public void setup() { super.setup(containingTask, config, output); if (logSinkFunction != null) { // 调用 flink core 强加 function FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext()); } } public void open() throws Exception { super.open(); this.sinkContext = new SimpleContext(getProcessingTimeService()); if (logSinkFunction != null) { // to stay compatible with Flink 1.18- if (logSinkFunction instanceof RichFunction) { RichFunction richFunction = (RichFunction) logSinkFunction; //不仅仅当前算子要 open logFuntion 也手动 open richFunction.open(new Configuration()); } } } public void processElement(StreamRecord<InternalRow> element) { SinkRecord record; try { record = write.write(element.getValue()); //写 fileSystem } catch (Exception e) { throw new IOException(e); } if (record != null && logSinkFunction != null) { // write to log store, need to preserve original pk (which includes partition fields) SinkRecord logRecord = write.toLogRecord(record); logSinkFunction.invoke(logRecord, sinkContext); // 来吧接着干 log } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); if (logSinkFunction != null) { // log 也 snapshotState StreamingFunctionUtils.snapshotFunctionState( context, getOperatorStateBackend(), logSinkFunction); } } }
DYNAMIC
- Dynamic bucket 不适用于 log.system 场景校验会直接报错不支持, logSinkFunction 必须为 null
- 获取 dynamic-bucket.assigner-parallelism: 用来定义 assigner 算子的并行度如果没有定义就是 sink.parallelism。这个配置的设置和桶的初始化个数相关,太小会导致 assigner 算子处理速度不够
- 获取 dynamic-bucket.initial-buckets: 控制初始化bucket的数量。
- 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
- 并行度为 assigner-parallelism
- 分区逻辑为
a. 他的逻辑和 FIXED 在没有 logFunction 的逻辑一样,不同点是 FIXED 的 buketSize 是固定的,而 DYNAMIC 模式下 buketSize 是取的 算子并行度和 initial-buckets 的最小值bucket=MathUtils.min(numAssigners, numChannels)
b. 然后就是 FIXED 逻辑一致了int start = Math.abs(partitionHash % numChannels); int id = Math.abs(keyHash % buketSize); return (start + id) % numChannels;
- 添加 HashBucketAssignerOperator 算子
– 并行度为上游的并行度 - 再添加一个 FlinkStreamPartitioner 对数据再进行一次 shuffle
– 并行度为 sink.parallelism
– 分区逻辑为
a. 上游 HashBucketAssignerOperator 会给每条记录打上一个 编号(编号产生的逻辑暂时不看)
b. 然后就是和 FIX 一样int startChannel = Math.abs(partition.hashCode()) % numChannels; // 这个 bucket 是上游 HashBucketAssignerOperator 打上的一个编号 return (startChannel + bucket) % numChannels;
- 分区后添加数据写入 DynamicBucketRowWriteOperator 算子
- 并行度为上游并行度
GLOBAL_DYNAMIC
- Dynamic bucket 不适用于 log.system 场景校验会直接报错不支持, logSinkFunction 必须为 null
- 离线(批模式)compactSink 不能再 GLOBAL_DYNAMIC 下使用处于 TODO 转态(此条可以忽略)
- 添加一个 IndexBootstrapOperator 算子构建索引
- 并行度为上游并行度
- 同上获取 dynamic-bucket.assigner-parallelism
- 同上获取 dynamic-bucket.initial-buckets
- 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
- 并行度为 Max(assigner-parallelism,initial-buckets) 如果为空则是 sink.parallelism
- 分区逻辑为 主键 函数和并行度取模 Math.abs(主键.hashCode() % numChannels)
- 添加 GlobalIndexAssignerOperator 算子
- 并行度为上游并行度
- 再添加一个 FlinkStreamPartitioner 对数据再进行一次 shuffle 和 DYNAMIC 第 6 步一样的
- 分区后添加数据写入 DynamicBucketRowWriteOperator 算子 和 DYNAMIC 第 7步一样的
- 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
到此在不同 bucketMode 下对应的算子梳理完毕了, 在这些处理完之后还有一些通用的 doCommit 逻辑
- 判断 sink.savepoint.auto-tag 是否为 true 默认为 false , 参数表示是否自动创建 tag 如果开了则添加 AutoTagForSavepointCommitterOperator 算子 并且这个算子里面是包含 CommitterOperator 的
- 算子并行度 为 1
- 如果没有开启则直接是 CommitterOperator 算子 并行度也是 1
- 最后添加一个 空的 sink 节点 DiscardingSink 并行度为 1
FINAL
-
梳理了一个 DataStream 在 Paimon sink 时的整个 DataStream 转化拓扑。并且对其中的分区器进行了分析整理入下图
- 接下来看具体算子的逻辑