
Flink kafka source & sink intro

Original link

Source Analysis

  • performDefaultAction() in SourceStreamTask

  • sourceThread.start();
    

    * the run method of LegacySourceFunctionThread

    * headOperator.run

    * the run method of StreamSource

    * userFunction.run(ctx)

    * FlinkKafkaConsumerBase.run

    * kafkaFetcher.runFetchLoop()

    * consumerThread.start();

    * KafkaConsumer.poll (sketched below)

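The last two steps are the heart of the source: the fetcher spins up a dedicated consumer thread that repeatedly calls KafkaConsumer.poll and hands each batch over to the fetch loop for emission. Below is a minimal sketch of that hand-over pattern, using a plain BlockingQueue in place of Flink's internal Handover class; the class, topic, and group names are illustrative, not Flink's actual code:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchLoopSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Stands in for Flink's internal Handover: a one-slot exchange
        // point between the consumer thread and the fetch loop.
        BlockingQueue<ConsumerRecords<byte[], byte[]>> handover =
                new ArrayBlockingQueue<>(1);

        // The dedicated consumer thread: its only job is to poll and hand over.
        Thread consumerThread = new Thread(() -> {
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic"));
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<byte[], byte[]> records =
                            consumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        handover.put(records); // blocks until the loop takes the batch
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumerThread.start();

        // The fetch loop (runFetchLoop's role): take each batch and emit records.
        while (true) {
            ConsumerRecords<byte[], byte[]> batch = handover.take();
            for (ConsumerRecord<byte[], byte[]> record : batch) {
                System.out.printf("partition=%d offset=%d%n",
                        record.partition(), record.offset());
            }
        }
    }
}
```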

The most important part of consuming from Kafka is offset management.

```java
public enum OffsetCommitMode {

    /** Completely disable offset committing. */
    DISABLED,

    /** Commit offsets back to Kafka only when checkpoints are completed. */
    ON_CHECKPOINTS,

    /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
    KAFKA_PERIODIC;
}
```
```java
/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpointing is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
        boolean enableAutoCommit,
        boolean enableCommitOnCheckpoint,
        boolean enableCheckpointing) {

    if (enableCheckpointing) {
        // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
        return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
    } else {
        // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
        return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}
```

The official Kafka documentation notes that when enable.auto.commit=false, offsets must be committed manually, i.e., by calling consumer.commitSync().

In Flink, however, non-checkpoint mode never calls consumer.commitSync(); once auto commit is turned off, Kafka has no idea how far the current consumer group has consumed.
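For contrast, this is what manual committing looks like with a plain Kafka client when enable.auto.commit=false (a sketch with illustrative topic and group names); it is exactly the call Flink skips in DISABLED mode:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "demo-group");
props.setProperty("enable.auto.commit", "false"); // manual offset management
props.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("demo-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value()); // stand-in for real processing
        }
        // Without this call (and with auto commit off), the group's
        // committed offsets never advance in Kafka.
        consumer.commitSync();
    }
}
```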

With checkpointing enabled:

```java
/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }
}
```

Auto commit is forcibly disabled here, overriding whatever the user configured.
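A quick sketch of the overwrite (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG is the Kafka client constant for "enable.auto.commit"):

```java
Properties props = new Properties();
props.setProperty("enable.auto.commit", "true"); // the user asked for auto commit

adjustAutoCommitConfig(props, OffsetCommitMode.ON_CHECKPOINTS);

// The user's setting is overwritten: offsets are committed only on checkpoints.
System.out.println(props.getProperty("enable.auto.commit")); // "false"
```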

Startup modes

```java
public enum StartupMode {

    /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
    GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

    /** Start from the earliest offset possible. */
    EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

    /** Start from the latest offset. */
    LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

    /**
     * Start from user-supplied timestamp for each partition.
     * Since this mode will have specific offsets to start with, we do not need a sentinel value;
     * using Long.MIN_VALUE as a placeholder.
     */
    TIMESTAMP(Long.MIN_VALUE),

    /**
     * Start from user-supplied specific offsets for each partition.
     * Since this mode will have specific offsets to start with, we do not need a sentinel value;
     * using Long.MIN_VALUE as a placeholder.
     */
    SPECIFIC_OFFSETS(Long.MIN_VALUE);
}
```

The default is GROUP_OFFSETS, meaning consumption starts from the offsets last committed by this group id.
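Each mode corresponds to a setter on the consumer. A sketch against the legacy FlinkKafkaConsumer API (topic, schema, and properties are illustrative; in practice you would pick exactly one of these setters):

```java
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

consumer.setStartFromGroupOffsets();            // GROUP_OFFSETS (default)
consumer.setStartFromEarliest();                // EARLIEST
consumer.setStartFromLatest();                  // LATEST
consumer.setStartFromTimestamp(1609459200000L); // TIMESTAMP (epoch millis)

Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("demo-topic", 0), 23L);
consumer.setStartFromSpecificOffsets(offsets);  // SPECIFIC_OFFSETS
```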

What must be stored in state is the offset of the last successfully consumed record, but the position method returns the starting offset of the next fetch, so 1 has to be subtracted.
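The adjustment in one line (consumer and partition variables are illustrative):

```java
// KafkaConsumer.position() returns the offset of the NEXT record to fetch,
// so the offset of the last successfully consumed record is one less.
long lastConsumedOffset = consumer.position(topicPartition) - 1;
```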

Sink Analysis

  • StreamSink.processElement
  • FlinkKafkaProducer.invoke
  • at_least_once / exactly_once: FlinkKafkaProducer.snapshotState
```java
@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
            FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
            producer.beginTransaction();
            return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
        case AT_LEAST_ONCE:
        case NONE:
            // Do not create new producer on each beginTransaction() if it is not necessary
            final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null && currentTransaction.producer != null) {
                return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
            }
            return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
}
```

Opening the transaction: only EXACTLY_ONCE creates a fresh transactional producer and calls beginTransaction(); AT_LEAST_ONCE and NONE reuse a non-transactional producer.
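The EXACTLY_ONCE branch is taken when the producer is built with that semantic. A sketch against the universal FlinkKafkaProducer API (topic, schema, and properties are illustrative); note that transaction.timeout.ms typically has to be lowered to fit within the broker's transaction.max.timeout.ms (15 minutes by default):

```java
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Keep the producer's transaction timeout within the broker's limit.
producerProps.setProperty("transaction.timeout.ms", "600000");

KafkaSerializationSchema<String> schema = (element, timestamp) ->
        new ProducerRecord<>("demo-topic", element.getBytes(StandardCharsets.UTF_8));

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "demo-topic",                              // default topic
        schema,
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // routes into the EXACTLY_ONCE case above

stream.addSink(producer); // stream is a DataStream<String>
```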