paimon sink 源码之 dataStream 的拓扑梳理

随笔6个月前发布 一马平川
87 0 0
  • paimon sink 源码 之 DynamicTableSink
  • 上篇得知 sink 的拓扑是在 DynamicTableSink#getSinkRuntimeProvider 里面定义的
  • Paimon 对于 DynamicTableSink 的实现就是 org.apache.paimon.flink.sink.FlinkTableSink
  • getSinkRuntimeProvider 在父类 org.apache.paimon.flink.sink.FlinkTableSinkBase#getSinkRuntimeProvider 中定义
  • 这里只探讨主键表在流模式下写入场景

拓扑梳理

  1. 判断是否有 logStoreTableFactory 在创建 DynamicTableSink 是根据建表语句是否有配置 log.system 来决定是否有 logStoreTableFactory
    1.1 log.system 默认值是 none,可以配置成 kafka,配置成 kafka 数据不仅仅会写入 fileSystem 也会写入 kafka 相当于是双写,流读消费 paimon 表数据的时候 streaming-read-mode 可以配置成 log 从 kafka 进行消费

    1.2 log.system 设置为 kafka 举例

    CREATE TABLE T (i INT, j INT) WITH (
       'log.system'='kafka', 
       'log.system.partitions'='2', 
       'kafka.bootstrap.servers'='%s', 
       'kafka.topic'='Tt',
      'connector='paimon'
      ...
    )
    

    1.3 LogSinkFunction 的创建,对于 log.system 是 kafka 实际上就是利用 kafka connector 的 FlinkKafkaProducer 进行数据发送

    paimon sink 源码之 dataStream 的拓扑梳理

  2. sink.parallelism 设置 sink 的并行度

  3. local-merge-buffer-size 如作业存在主键数据偏斜可以设置“local-merge-buffer-size”,在数据进行 shuffle 之前进行 buffer 和 merge, 当同一主键在快照之间频繁更新时,这特别有用。建议从“64 mb”开始调整缓冲区大小。不适用于 CDC ingestion
    3.1 如果设置了 local-merge-buffer-size 就会加入一个 LocalMergeOperator 算子
    3.2 LocalMergeOperator 算子并行度和上游保持一致,shuffle 方式是 forward

  4. 对 DataStream<RowData> 进行一次 map 将 org.apache.flink.table.data.RowData 转化成 org.apache.paimon.data.InternalRow
    4.1 算子并行度和上游一致
    4.2 paimon InternalRow 只是对 flink RowData 的包装实际还是操作的 flink RowData

  5. 判断 BucketMode ,在这篇里有讲述 Paimon Table BuketMode 的逻辑

  • 对于主键表会有 FIXED、DYNAMIC、GLOBAL_DYNAMIC 三种 mode
  • 对于 append only 表有 FIXED、UNAWARE 两种 mode
  • 不同的 BucketMode 会有不同的拓扑,他们的不同主要体现在 partition 的逻辑不一样和数据写入的逻辑不一样,之后的一些算子就是一样的,这次我们先梳理主线不去看算子细节,主线梳理完成之后在写每个算子的细节
  • 这次面向主键表,从 FIXED mode 开始

    FIXED

    1. 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
      • 并行度为 sink.parallelism
      • 分区逻辑为
        a. 先抽取 bucket key 的值,然后将值的 hash 和 bucket 桶数取模 Math.abs(key_hashcode % numBuckets 得到一个 bucket 序号
        b. 如果有 logFuntion 则直接按照 bucket 序号和算子并行度取模 recordChannel = bucket % numChannels
        c. 如果没有 logFuntion 则先算出 分区属于那一个 channel, 然后再用 分区的 channel + bucket 序号再和并行度取模 这个算法和我在 hudi 提的分区算法是一样的

        int startChannel = Math.abs(partition.hashCode()) % numChannels;
        return (startChannel + bucket) % numChannels;
        

    2. 分区后添加数据写入 RowDataStoreWriteOperator 算子
      • 并行度为上游并行度
      • 这里有点花原本我以为 logFuntion 会直接加一个 KafkaSinkFunction 算子,没想到他是直接集成到了 RowDataStoreWriteOperator 里面。可能是我 out 了。上代码

           public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
              private final LogSinkFunction logSinkFunction;
              @Override
              public void setup() {
                   super.setup(containingTask, config, output);
                   if (logSinkFunction != null) {
                      // 调用 flink core 强加 function
                       FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext());
                   }
                }
               public void open() throws Exception {
                   super.open();
                   this.sinkContext = new SimpleContext(getProcessingTimeService());
                   if (logSinkFunction != null) {
                       // to stay compatible with Flink 1.18-
                       if (logSinkFunction instanceof RichFunction) {
                           RichFunction richFunction = (RichFunction) logSinkFunction;
                           //不仅仅当前算子要 open  logFuntion 也手动 open 
                           richFunction.open(new Configuration());
                       }
                   }
               }
        
               public void processElement(StreamRecord<InternalRow> element) {
                   SinkRecord record;
                   try {
                       record = write.write(element.getValue()); //写 fileSystem
                   } catch (Exception e) {
                       throw new IOException(e);
                   }
        
                  if (record != null && logSinkFunction != null) {
                       // write to log store, need to preserve original pk (which includes partition fields)
                       SinkRecord logRecord = write.toLogRecord(record);
                       logSinkFunction.invoke(logRecord, sinkContext); // 来吧接着干 log
                   }
               }
        
               @Override
               public void snapshotState(StateSnapshotContext context) throws Exception {
                   super.snapshotState(context);
                    if (logSinkFunction != null) {
                         // log 也 snapshotState
                       StreamingFunctionUtils.snapshotFunctionState(
                               context, getOperatorStateBackend(), logSinkFunction);
                   }
               }
           }
        

    DYNAMIC

    1. Dynamic bucket 不适用于 log.system 场景校验会直接报错不支持, logSinkFunction 必须为 null
    2. 获取 dynamic-bucket.assigner-parallelism: 用来定义 assigner 算子的并行度如果没有定义就是 sink.parallelism。这个配置的设置和桶的初始化个数相关,太小会导致 assigner 算子处理速度不够
    3. 获取 dynamic-bucket.initial-buckets: 控制初始化bucket的数量。
    4. 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
      • 并行度为 assigner-parallelism
      • 分区逻辑为
        a. 他的逻辑和 FIXED 在没有 logFunction 的逻辑一样,不同点是 FIXED 的 buketSize 是固定的,而 DYNAMIC 模式下 buketSize 是取的 算子并行度和 initial-buckets 的最小值 bucket=MathUtils.min(numAssigners, numChannels)
        b. 然后就是 FIXED 逻辑一致了

           int start = Math.abs(partitionHash % numChannels);
           int id = Math.abs(keyHash % buketSize);
           return (start + id)  % numChannels;
        

    5. 添加 HashBucketAssignerOperator 算子
      – 并行度为上游的并行度
    6. 再添加一个 FlinkStreamPartitioner 对数据再进行一次 shuffle
      – 并行度为 sink.parallelism
      – 分区逻辑为
      a. 上游 HashBucketAssignerOperator 会给每条记录打上一个 编号(编号产生的逻辑暂时不看)
      b. 然后就是和 FIX 一样

       int startChannel = Math.abs(partition.hashCode()) % numChannels;
       // 这个 bucket 是上游 HashBucketAssignerOperator 打上的一个编号
       return (startChannel + bucket) % numChannels;
      

    7. 分区后添加数据写入 DynamicBucketRowWriteOperator 算子
      • 并行度为上游并行度

    GLOBAL_DYNAMIC

    1. Dynamic bucket 不适用于 log.system 场景校验会直接报错不支持, logSinkFunction 必须为 null
    2. 离线(批模式)compactSink 不能再 GLOBAL_DYNAMIC 下使用处于 TODO 转态(此条可以忽略)
    3. 添加一个 IndexBootstrapOperator 算子构建索引
      • 并行度为上游并行度
    4. 同上获取 dynamic-bucket.assigner-parallelism
    5. 同上获取 dynamic-bucket.initial-buckets
    6. 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
      • 并行度为 Max(assigner-parallelism,initial-buckets) 如果为空则是 sink.parallelism
      • 分区逻辑为 主键 函数和并行度取模 Math.abs(主键.hashCode() % numChannels)
    7. 添加 GlobalIndexAssignerOperator 算子

    - 并行度为上游并行度
    

    1. 再添加一个 FlinkStreamPartitioner 对数据再进行一次 shuffle 和 DYNAMIC 第 6 步一样的
    2. 分区后添加数据写入 DynamicBucketRowWriteOperator 算子 和 DYNAMIC 第 7步一样的

到此在不同 bucketMode 下对应的算子梳理完毕了, 在这些处理完之后还有一些通用的 doCommit 逻辑

  1. 判断 sink.savepoint.auto-tag 是否为 true 默认为 false , 参数表示是否自动创建 tag 如果开了则添加 AutoTagForSavepointCommitterOperator 算子 并且这个算子里面是包含 CommitterOperator 的
    • 算子并行度 为 1
  2. 如果没有开启则直接是 CommitterOperator 算子 并行度也是 1
  3. 最后添加一个 空的 sink 节点 DiscardingSink 并行度为 1

FINAL

  • 梳理了一个 DataStream 在 Paimon sink 时的整个 DataStream 转化拓扑。并且对其中的分区器进行了分析整理入下图

    paimon sink 源码之 dataStream 的拓扑梳理

  • 接下来看具体算子的逻辑
© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
暂无评论...