/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpointing is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
        boolean enableAutoCommit,
        boolean enableCommitOnCheckpoint,
        boolean enableCheckpointing) {

    if (enableCheckpointing) {
        // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
        return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
    } else {
        // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
        return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}
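To make the decision table concrete, here is a minimal sketch; the flag values below are illustrative, not taken from the source:

// Checkpointing enabled: only the commit-on-checkpoint flag matters.
OffsetCommitMode a = OffsetCommitMode.fromConfiguration(true,  true,  true);  // ON_CHECKPOINTS
OffsetCommitMode b = OffsetCommitMode.fromConfiguration(true,  false, true);  // DISABLED
// Checkpointing disabled: only Kafka's enable.auto.commit matters.
OffsetCommitMode c = OffsetCommitMode.fromConfiguration(true,  false, false); // KAFKA_PERIODIC
OffsetCommitMode d = OffsetCommitMode.fromConfiguration(false, false, false); // DISABLED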
/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 *
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }
}
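Putting the two helpers together, a short sketch (the property value is hypothetical) of how a user-set enable.auto.commit gets overridden once offsets are committed on checkpoints:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // the user's setting

// Checkpointing on, commit-on-checkpoint on -> ON_CHECKPOINTS
OffsetCommitMode mode = OffsetCommitMode.fromConfiguration(true, true, true);
adjustAutoCommitConfig(props, mode);

// The user's value has been overwritten:
// props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) is now "false"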
/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

/** Start from the earliest offset possible. */
EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

/** Start from the latest offset. */
LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

/**
 * Start from user-supplied timestamp for each partition.
 * Since this mode will have specific offsets to start with, we do not need a sentinel value;
 * using Long.MIN_VALUE as a placeholder.
 */
TIMESTAMP(Long.MIN_VALUE),

/**
 * Start from user-supplied specific offsets for each partition.
 * Since this mode will have specific offsets to start with, we do not need a sentinel value;
 * using Long.MIN_VALUE as a placeholder.
 */
SPECIFIC_OFFSETS(Long.MIN_VALUE);
}
The default is GROUP_OFFSETS, meaning consumption starts from the offsets last committed by the given group id.
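For reference, each startup mode is selected through the consumer's setStartFrom* methods; a minimal sketch (broker address, group id, and topic name are hypothetical):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "demo-group");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

consumer.setStartFromGroupOffsets();   // GROUP_OFFSETS (the default)
// consumer.setStartFromEarliest();    // EARLIEST
// consumer.setStartFromLatest();      // LATEST
// consumer.setStartFromTimestamp(ts); // TIMESTAMP (ts in epoch milliseconds)
// consumer.setStartFromSpecificOffsets(offsets); // SPECIFIC_OFFSETS (Map<KafkaTopicPartition, Long>)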
What needs to be stored in state is the offset of the last successfully consumed record, but the position() method returns the starting offset of the next record to be consumed, so we need to subtract 1.
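A small illustrative sketch of that subtraction (the helper below is hypothetical, not Flink source):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Snapshot the last successfully consumed offset per assigned partition.
static Map<TopicPartition, Long> snapshotOffsets(KafkaConsumer<byte[], byte[]> consumer) {
    Map<TopicPartition, Long> state = new HashMap<>();
    for (TopicPartition tp : consumer.assignment()) {
        // position() is the offset of the NEXT record to fetch,
        // so the last consumed record sits at position() - 1.
        state.put(tp, consumer.position(tp) - 1);
    }
    return state;
}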
@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
            FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
            producer.beginTransaction();
            return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
        case AT_LEAST_ONCE:
        case NONE:
            // Do not create new producer on each beginTransaction() if it is not necessary
            final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null && currentTransaction.producer != null) {
                return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
            }
            return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
}
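For context, the EXACTLY_ONCE branch above is taken when the producer is constructed with Semantic.EXACTLY_ONCE; a minimal sketch assuming the universal FlinkKafkaProducer connector (topic name and timeout value are hypothetical):

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// Must not exceed the broker's transaction.max.timeout.ms
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "demo-topic",
        (KafkaSerializationSchema<String>) (element, timestamp) ->
                new ProducerRecord<>("demo-topic", element.getBytes(StandardCharsets.UTF_8)),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // beginTransaction() takes the EXACTLY_ONCE branch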