流计算也能像数据库那样保证数据不丢失吗?
为何要保证流计算中的数据不丢失?
在领域,有些应用的对系统的可用性要求不那么苛刻,允许在系统异常时丢失一些数据。比如,实时推荐系统或业务质量实时监测系统因系统原因或应用程序故障而不能提供实时服务,在故障过程中流数据未进行处理就丢失了。这类场景对少量数据丢失是可容忍的,一是因为它们处理不是关键交易(不直接影响账户、交易、订单等核心数据);二是因为系统恢复后再处理“过时”数据意义已经不大,如客户已经离开特定的商圈,系统还拿“过时”的位置信息向他推荐商品会让客户体验很差。
然而,很多实时处理系统对数据的丢失是不容忍的,要求对所有数据至少处理一次或准确地处理一次。如CDR话单实时处理,实时计费等应用要求所有数据至少且至多处理一次,因为一旦出错就会造成损失。
如何做到“至少处理一次”?
传统的数据库依赖日志、事务控制、Checkpoint等技术实现数据的ACID。而Streams流计算采用了“一致区域(Consistent Region)”机制实现流数据“至少处理一次”。
什么是一致区域
在应用程序中定义的sub-graph(或叫Region),一旦有tuple流入该区域就要保证该tuple经过了所有Operator的至少一次处理,那么该区域就是“一致区域”。
什么是一致状态
在一致区域里的所有流的所有元组tuples全部经过一致区域里的operators完全处理的当前状态。
下图中的左边是由3个Operator组成的应用,tuple经过op1处理后分布输出到op2和op3,右边表示tuple提交的时间轴。根据“一致状态”的定义,第一条虚线所捕获的状态是“一致的”,即当前时刻的tuples都在3个Operators处理并提交。第二条和第三条虚线所捕获的状态是不一致的,因为在捕获每个Operator状态那个时间点,同一个tuple并未在一致区域内的完全处理完成。
一致区域的特点
- 一致区域的第一个operator必须能重发数据。
- 一致区域保证每一条数据至少被处理一次(at-least-once)。
- 一致区域里的所有operators要满足如下条件之一,则保证每个元组仅被处理一次(Exactly Once):
- 能够将自身的状态和交互的外部系统的状态重置为最后一次检查点一致状态
- 能够检查重复的元组tuples,并且不予处理
- 不管处理几次,都和第一次处理结果一样
- 支持多线程的Operator。
- 允许用户自定义一致区域的开始和结束(默认没有输出端口的Operator是一致区域的结束)。
实现原理
实现一致区域关键是定时建立该区域的一致状态。从一致区域内的operator角度来看,建立一致状态包含2个阶段:(1)排干和(2)检查点。当所有Operators完成这两个阶段,则一致区域的一致状态被成功建立。
排干/Drain - 在这个阶段,Operator排干它的内部状态和输出流。这意味着Operator已经完成处理之前收到的任何元组,排出其内部状态并提交任何Pending的元组到其内部缓冲区输出流。用通俗的话讲,这个Operator完成排干后,对之前收到的任何元组来说它已经完全解放了。下游的Operator在排干完成之前,会完全处理前面Operator发送过来的元组,直到确定一致状态。如果一个具有输入流的Operator在一致区域,它的排干阶段一定是发生在上游Operators排干完成之后和处理完所有上游排干时发过来的所有元组之后。
检查点/checkpointing - 在这个阶段,Operater序列化它的状态到检查点后端(后端可以是Redis或文件系统),这一阶段总是发生排干阶段之后。Streams runtime自动管理Operator的检查点,这意味着runtime维护多个检查点版本和自动删除无需用于失败恢复的检查点。
恢复一致状态
一旦发生异常,Streams runtime自动将一致区域恢复到最新的一致状态。任何一个Operator的状态重置一定是发生在其上游Operators已完成状态重置之后。当一致区域内的全部Operator完成重置,这时就恢复到了一致状态。之后就可以重放元组(最后确定一致状态之后的的元组),这点类似数据库的事务在rollback之后重新从第一条语句开始执行事务。
恢复一致状态的操作在这几个场景下发生:1)PE crash。2)主机crash。3)PE之间的连接中断。4)Operator发现并报告错误。5)通过人工管理介入。
关键技术 - Chandy-Lamport算法
Chandy-Lamport算法是一个经典的分布式快照记录算法。
为达到一致性状态,并且减少建立一致状态而对性能带来的影响,Streams使用Chandy-Lamport算法的变体来建立一个分布式的快照。该算法会考虑流连接的渠道状态channel state和运算符的进程状态process state永久性存储。Streams对一致性有着严格的定义:每一个提交的tuple必须被处理。因此,SPL系统并不需要保存渠道状态。这是因为它有效地强制任何渠道状态在进程状态中得到映射。
如何使用?
Streams通过操作符Operator和注释annotations得到增强,这些操作程序和注释允许定义在流处理期间不会丢失元组的区域,确保元组在一致区域至少被处理一次。
可在允许的操作符上使用 @consistent 注释定义一致区域的开头。Streams自动确定一致区域的作用域,但是您可使用 @autonomous 注释更改该区域的结束操作符。所定义的一致区域将定期建立一致状态。您的应用程序还必须具有新的Job Control Plane操作符,该操作符将协调一致区域的排干 (draining) 和重置。具体语法如下:
@consistent( //一致区域定义
trigger={periodic|operatorDriven}, //如何触发开始建立一个一致状态
period=3.0, //周期
drainTimeout=30.0, //排干超时
resetTimeout=30.0, //重置超时
maxConsecutiveResetAttempts=5) //重置重试次数
详情请咨询!
客服热线:023-66090381