DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

End Data process

原文链接

EOMP

Exactly once msg processing,端到端一致性

把本不幂等的操作转化为幂等操作是end to end consistency的关键之一.

几个事实

  • 不存在完美的failure detector

主要是zombie process问题

liveness和safety的取舍问题,出现failure的时候,1. 认为对方还没有crash, 持续等待; 2. 认为其crash掉了, 进行failover处理;

  • zombie fencing

zombie fencing设计的关键点在于如何阻止已经“成为zombie的自己”搞乱正常的“下一代的自己”的状态;

先读后写,分发fencing token来实现阻止zombie部分

要实现上下游exact once processing,需要实现4个条件

(a. 结果高可用, b.下游去重, c. 上游可以replay, d. 记录上游进度)

利用存储结果来避免重算,

Google MillWheel做出了一个很有意思的选择,发送端完全不维护sequenceId,而是为每一个发出的message生成一个全局唯一的id,下游则需要记住”所有”见过的id来去重,但是这样会造成大量查询io和存储cost,所以需要另外的方案来解决性能和下游没有无限的存储所以”不可能记住所有id”的问题

    • 记录event处理的进度,和把计算结果存在高可用存储里的操作是一个原子操作,要么一起成功,要么一起失败;
    • 或者存储计算结果是一个幂等操作,那么可以先存计算结果,再记录event处理进度,一旦计算计算结果成功但是记录event处理进度失败,重新储存计算结果也不会引起问题。

难点在于带状态的计算

Google MillWheel(DataFlow)

    1. 每个节点维护一个用来去重的”已处理msgId”集, 从上游收到新msg之后, 检查去重 (b.下游去重)
    2. 开始计算, 所有的状态更新写在本地, 由于一个状态只有一个更新者(本计算), 所以可以在本地维护一个状态的view, 所有的更新只更新本地的view而暂时不commit到”remote的高可用DataStore”, MillWheel用的BigTable;
    3. 计算完毕后,(1).所有的要发送的计算结果,(有一些可能是在计算过程中产生并要求发送的,都会cache起来), (2)所有的state的所有更新, (3) 引发计算的msgId,会用一个atomic write写在BigTable里。 (a.要求结果高可用, d+. 要求记录event处理的进度, 并保证储存计算结果和state的更新不出现重复)
    4. 当commit成功之后, ACK上游, 而由于上游也采用commit计算结果到BigTable里的策略,且只有当自己发出的消息ACK之后, 才会允许 garbage collection回收计算结果占用的存储, 所以在收到ACK之前, 上游的计算结果, 也就是当前计算所需要的msg,都可以重发,直至本节点计算成功且commit结果 (c. 要求触发本次计算的event可以replay)
    5. 一旦计算过程中failure发生(比如机器挂了), 会在另外的host上重启本process节点,从BigTable恢复本地state和”用来去重的已处理msgId集”, 由于上次计算的结果还没有commit, 所以满足(e. state需要在replay event的时候rollback到处理event之前时的状态)
    6. 新启动的运算节点在load本地状态之前先用自己的sequencer废掉现存的sequencer, 这样BigTable就可以block zombie计算节点的写;

kafka transactional messaging

Transactional Messaging是指用户可以通过类似以下code来定义哪些对kafka topic的写属于一个transaction, 并进一步保证tx的atomic和Isolation

1
2
3
4
5
6
7
8
9
10
11
12
producer.initTransactions();
try {
// called right before sending any records
producer.beginTransaction();
//sending some messages...
// when done sending, commit the transaction
producer.commitTransaction();
} catch (Exception e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}

kafka state的所有更新

写一个内部隐藏的state的change log的topic,和一个本地key value表(也就是本计算节点的state); failover的时候, 之前的”本地”表丢失没关系, 可以冲change log里恢复出失败前确定commit的所有state;

计算过程中出现failure(比如机器挂了), 那么当计算重启,会重新运行initTransactions来注册tx, 此时tx coordinator会分配一个新的epoch id给此producer, 并且从此以后拒绝老的epoch id的任何commit信息来防止zombie的计算节点; tx coordinator同时roll back(如果上一个tx已经在prepare_commit状态, 继续完成transaction, 具体看下边Transactional Messaging这个章节); 如果rollback,那么input的处理进度, 状态的更改和往下游发送的信息都会rollback, 那么计算可以重新开始,就好像没有上次fail的失败一样; 如果上一个tx已经prepare_commit, 那么完成所有信息的commit; 此时当initTransactions返回,当前计算会接着上一个tx完成的进度继续计算;(e. state需要在replay event的时候rollback到处理event之前时的状态)

producer

同一消息的多次接受,对于broker部分;zombie producer带来的乱序消息问题;

  • 注册transaction.id来开始session, 而在session里此tx发来的消息都可以通过维护一个sequenceid来dedup.

  • 非正常结束tx的话, 比如机器挂了, producer重启, 那么就会再次注册自己的transaction.id, 则标志前一个session失效, 而所有属于上一个session的信息全部作废(具体看下一节Atomic and Isolation), 这样就可以做到producer的zombie fencing

重算和记录计算结果的结合

当下游crash重启需要O(n+1)时, 我们则有2种选择:

  • 1.记录O(n+1),
  • 2.不记录O(n+1)但是记住, O(n+1)是根据什么数据生成的

1.是记录计算结果, 2是重算; 两者并用的好处在于, 1可以异步batch进行而不需要节点必须等待O(n+1)记录成功才往下游发送O(n+1); 而2保证了当1还没有完成时, 系统也有足够的信息可以重建O(n+1);

spark streaming

通过异步的checkpoint来截断lineage也就是各个节点状态和计算结果复杂的关系(比如一个数据如果已经checkpoint了, 那么它所依赖的所有状态和计算结果都可以在关系图里删去, 因为replay如果依赖于这个数据, 那么使用它的checkpoint即可, 而不需要知道这个数据是怎么算出来的, 如果还没checkpoint成功, 则需要根据数据依赖图来重算这个数据), 像这样利用checkpoint, 就可以防止lineage无限增长;

micro-batch带来的好处:

减少关系图维护的负担;

随之带来的坏处:

端到端很高的latency,叠加性的latency;

全局的一致性和状态集的维护

Flink的解法就是由一个高可用的coordinator连续发出不同的stage barrier(比如先给所有src发1,然后1分钟后发2,2分钟后发3….. 如此增长), 夹杂在发给数据源发出的数据流里, 所有的节点都必须忠实的转发这个stage barrier, 这样所有的节点的

    1. input都分为了接收到某barrier(设为barrier-a)之前的信息和收到barrier-a之后的信息,
    2. 所有的发给下游的计算结果也分为自己发出barrier-a之前的信息和发出barrier-a之后的信息;
    3. 所有的状态变迁也分为,用所有接收到barrier-a之前的信息, 所建立的状态, 和收到barrier-a之后被新的信息影响了的状态;

Flink使用RocksDB 作为本地状态储存, RocksDB本质上就是一个LSM tree, 对状态的写会写在内存的memtable, 一般是一个linked hashmap, 写到一定大小就存到硬盘里变成sstable(sorted-string-table), 不再更改; 此后会开一个新的memtable来接受新的写; 这样会按历史时间来生成很多小文件, 读的时候先读memtable,如果里边有想要的key对应的value,必定是最新的,否则按历史时间顺来查sstable(sstable有自己的cache,所以未必需要读硬盘); 对于flink来说, 当需要checkpoint的时候, 只需要把当时的memtable写在硬盘里即可, 这是唯一一个需要block住当前计算的操作, 此后也只需要把从上个checkpoint开始, 新生成的sstable异步发送到高可用的远程文件系统即可(比如S3, HDFS); 这样就做到了异步(发送到高可用datastore是异步执行的),和增量(只发送新增文件);