Skip to content
gqlxj1987's Blog
Go back

Flink kafka source & sink intro

Edit page

原文链接

Source解析

消费kafka最重要的是offset的管理

public enum OffsetCommitMode {

   /** Completely disable offset committing. */
   DISABLED,

   /** Commit offsets back to Kafka only when checkpoints are completed. */
   ON_CHECKPOINTS,

   /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
   KAFKA_PERIODIC;
}
/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

   if (enableCheckpointing) {
      // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
      return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
   } else {
      // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
      return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
   }
}

kafka 官方文档中,提到当 enable.auto.commit=false 时候需要手动提交 offset,也就是需要调用 consumer.commitSync(); 方法提交。

但是在 flink 中,非 checkpoint 模式下,不会调用 consumer.commitSync();, 一旦关闭自动提交,意味着 kafka 不知道当前的 consumer group 每次消费到了哪。

有checkpoint的情况

/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
   if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
      properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
   }
}

关闭自动提交

消费模式

public enum StartupMode {

   /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
   GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

   /** Start from the earliest offset possible. */
   EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

   /** Start from the latest offset. */
   LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

   /**
    * Start from user-supplied timestamp for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   TIMESTAMP(Long.MIN_VALUE),

   /**
    * Start from user-supplied specific offsets for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   SPECIFIC_OFFSETS(Long.MIN_VALUE);
}

默认为 GROUP_OFFSETS,表示根据上一次 group id 提交的 offset 位置开始消费

在 state 里面需要存储的是成功消费的最后一条数据的 offset,但是通过 position 这个方法返回的是下一次应该消费的起始 offset,所以需要减1

Sink 解析

@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
   switch (semantic) {
      case EXACTLY_ONCE:
         FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
         producer.beginTransaction();
         return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
      case AT_LEAST_ONCE:
      case NONE:
         // Do not create new producer on each beginTransaction() if it is not necessary
         final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
         if (currentTransaction != null && currentTransaction.producer != null) {
            return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
         }
         return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
      default:
         throw new UnsupportedOperationException("Not implemented semantic");
   }
}

开启事务


Edit page
Share this post on:

Previous Post
Jetcache intro
Next Post
Scalable and Cloud-native hyperparameter tuning system