你好我是程序员贵哥。在 Storm的论文里我们看到Storm巧妙地利用了异或操作能够追踪消息是否在整个Topology中被处理完了做到了“至少一次At Least Once”的消息处理机制。然后在 Kafka的论文里我们又看到了Kafka通过将消息处理进度的偏移量记录在ZooKeeper中的方法使得整个消息队列非常容易重放。Kafka的消息重放机制和Storm组合就使得At Least Once的消息处理机制不再是纸上谈兵。然而我们并不会满足于“至少一次”的消息处理机制而是希望能够做到“正好一次Exactly Once”的消息处理机制。因为只有“正好一次”的消息处理机制才能使得我们计算出来的数据结果是真正正确的。而一旦需要真的实现“正好一次”的消息处理机制系统的“容错能力”就会变得非常重要。Storm的容错能力虽然比起S4已经有了一定的进步但是实际上仍然非常薄弱。所有的这些问题伴随着Kappa架构设想的出现为我们带来了新一代的流式数据处理系统。那么接下来的几节课里让我们步入现代流式数据处理系统一起看看从Google的MillWheel、Dataflow到开源的Apache Flink的系统是怎么回事儿。在这节课里我们会先看看在没有这些系统的时候在实践上使用Storm时会遇到哪些实际的问题。其实也正是由于这些问题催生了现代流式处理系统的诞生。在学完这节课之后我希望你能够理解以下这三点为什么“Exactly Once”的消息处理是困难的但又是必须的。为什么Storm的容错机制比起实际需求远远不够和MapReduce这样同样粗糙而原始的框架比起来Storm的容错机制缺失了什么。“时间窗口Time Window”是一个什么样的概念为什么这个概念对于流式数据处理系统如此重要。在理解了这些问题之后我们其实就已经开始逼近现代的流式数据处理系统了。只要能够解答好这些问题我们就会有一个全新的系统了而这样一个全新的系统究竟应该如何搭建其实就是后续我们会详细讲解的MillWheel、Dataflow以及Flink的核心知识点了。一个简单的流式数据处理系统我们先来看一看在有了Kafka和Storm之后一个实际的流式数据处理系统是怎么样的。最简单的我们就采用一个进行广告点击率计算和计费的数据处理需求。我们的日志会是这样的格式每一条日志都表示一次广告相关的日志。其中广告位ID广告客户ID广告ID标明了是哪一个广告展示在了哪一个广告位置上。比如可口可乐的新年促销广告展示在了某视频网站的开屏页就可以通过这三个字段分辨出来。事件类型这个字段用来标注这条日志是表示一次广告的展示还是一次广告的点击。用户UID用来标识是哪一个用户。这个在实践层面可以方便我们对于同一个用户在短时间内反复点击相同的广告进行去重。事件ID用来标识一个唯一的事件。在实践层面如果出现系统故障我们常常会重试保障“至少一次”的数据处理。而有了这个字段之后我们就可以在处理的时候进行去重这样我们才有可能做到“正好一次”的数据处理。最后的时间戳字段用来记录事件发生的时间。花费字段则是记录这一次点击广告需要花费广告客户多少预算。这里的日志只是一个最简化的模型。在实际的广告系统中会有上百个字段比如我们还会记录IP地址以分辨用户所在的地理位置等等。不过有了这个最简单的日志格式我们已经可以做两个最常见的广告数据的流式处理了。第一个自然是接近于实时的广告计费了。广告客户会设置当天可以花掉的广告预算我们不能让它花费超过设置的预算所以我们必须实时地统计客户的花费。第二个则是统计各个广告的点击率对于点击率太低的广告我们应该反馈给广告投放系统自动停止广告的投放。那么根据这两个需求我们就可以很容易地基于Kafka和Storm搭建起一个我们需要的流式数据处理系统。首先前端的应用服务器会把产生的广告日志发送给一个负载均衡。然后通过负载均衡均匀而随机地发送给Kafka不同的Broker服务器。下游有一个Storm集群里面有一个Topology同时完成了广告计费以及广告的点击率统计的工作。这个Topology就只有简单的两层。第一层是一个KafkaSpout它会从Kafka拉取日志然后解析并获取需要的字段并向下游的Bolt进行数据分发。KafkaSpout的每一条日志都会发送两条消息给下游两种不同的Bolt。一条发给AdsCtrBolt用来统计不同广告的点击率另一条发给ClientSpentBolt用来计算每个广告客户的花费。在向下游发送数据的时候都是采用字段分组的方式。发给AdsCtrBolt的是按照广告ID进行分组发给ClientSpentBolt的则是按照广告客户ID进行分组。这样所有相同广告的日志都会发送到同一个AdsCtrBolt里而所有相同广告客户的日志也都会发送给同一个ClientSpentBolt。AdsCtrBolt的处理逻辑很简单就是它会在内存里维护一个广告ID(展示次数点击次数广告花费)的Map。然后定时把这个表输出到外部的数据库里比如HBase或者Bigtable这样的数据库。也就是它会每分钟输出一次对应广告ID的点击率信息。ClientSpentBolt的逻辑也很简单就是它可以以更高的频率比如每秒甚至每次接收到一次广告点击就对应更新一次HBase里的广告花费数据。最后整个Storm的Topology是开启了AckerBolt的也就是我们会确保所有的消息能够至少被处理一次。“正好一次”的正确性这样一切看起来都很完美我们简单地通过KafkaStorm就有了一条可以实时计算广告花费和广告点击率的数据流水线。当然如果我们的系统非常稳定没有任何软硬件故障的时候事情也许是这样的。不过在大数据领域我们始终面临“出错”这个问题。而一旦出错我们的麻烦就来了。首先就是这个“至少一次”数据处理的特性其实已经满足不了我们实际的业务需要了。随着时间的推移我们已经把“广告计费”这样对于准确性要求很高的应用也放到流式处理系统里来。在我们这个应用场景里可能某一个ClientSpentBlot写入外部数据库的时候出现了比较高的延时。这个时候Storm的“至少一次”的处理机制会重发对应的消息。如果没有考虑这样重发的消息那么我们就会在ClientSpentBolt里面重复计算同一条日志的广告花费这就意味着我们多扣了广告客户的预算这显然是难以接受的。而如果说单条日志重发计费可能对于最终计费的影响还很小。那么如果Storm的某一个KafkaSpout出现了硬件故障挂掉了我们就可能有一大批消息会重复计费了。因为为了性能考虑我们从Kafka拉取数据不会是拉一条、处理一条然后更新一次ZooKeeper上的偏移量。特别是ZooKeeper会受不了这么大的负载它和Chubby一样是用于实现一个粗粒度的分布式锁而不是一个高性能的KV存储。所以KafkaSpout会从Kafka拉一小批数据然后发送出去等到这一小批数据发送完了并且下游都处理完了才会变更一次ZooKeeper上的偏移量。但是只要其中有一条消息在下游还没有处理完的时候KafkaSpout所在的服务器挂掉了对应的偏移量没有更新。那么在容错机制下重新启动在另一台服务器上的KafkaSpout会重新再发送一遍这一批数据。而这个时候我们就不只是重新对一条日志重复计费而是需要对一大批日志重复计费。要解决这个问题一个很直观的思路自然是对重复发送的日志或者消息进行去重。最简单的方式就是在每一个Bolt里我们维护一个这个Bolt已经处理完成的所有的message-id的集合。那么任何一条新的消息发送过来的时候我们都去这个集合里看一看这条消息是否已经处理过了就能解决这个问题了。不过让每个Bolt都保留所有处理过的message-id的集合显然会占用太多的内存了。因为在流式系统里随着时间的推移系统处理过的日志量在不断地增加message-id的集合只会越来越大。所以在工程实践上我们可以做两个优化第一个是使用BloomFilter进行去重来代替原始的数据集合。我们把所有已经处理过的message-id放到一个BloomFilter里去这样可以大大压缩我们需要的内存空间。不过使用BloomFilter会带来的副作用是我们可能会有很小的概率误算使得不是重复的消息也会被认为是重复的。第二个是把数据按照时间窗口切分成多个BloomFilter。比如我们可以设定有30个BloomFilter每个BloomFilter都只存放某一分钟的message-id。而每过一分钟我们都把30分钟前的那个BloomFilter清空。这样我们可以通过一个固定大小的内存空间确保只要是30分钟内的重复数据就不会被多次处理。因为一般来说简单的重发不太可能超过30分钟。我们可以根据系统的实际情况来设定这个对应的时间窗口。真正做到“正好一次”的数据处理是现代流式数据处理的第一个目标。计算节点迁移的容错问题BloomFilter的引入使得我们用于计算的Bolt节点其实有了“状态”。也就是说它自身已经不是一个纯粹的函数了。事实上不仅是为了做到“正好一次”的消息处理需要状态我们本身的数据处理需求就需要状态。比如我们的AdsCtrBolt里维护的那张广告ID(展示次数点击次数广告花费)的Map也就是我们在Bolt里维护的状态。不过需要维护状态又给我们带来了一个新的挑战那就是系统的容错问题。对于系统的“计算节点”的容错很容易我们只要在另外一台服务器上重新启动一个Bolt就好了。但是这个时候我们之前维护在Bolt内存里的广告ID(展示次数点击次数广告花费)的状态就已经丢失了。如果我们是每一分钟输出一次数据给HBase/Bigtable里的话这意味着我们经常会丢掉一分钟的数据。事实上不仅仅是针对容错问题我们需要考虑恢复Bolt里的状态对于系统的可扩展性我们同样需要考虑恢复Bolt里的状态。Storm的论文里我们的并行度是部署Topology的时候预先设定好的。但是这样的系统很难进行动态的扩容。如果我们的广告业务越来越红火意味着上游的日志越来越多。这个时候我们其实希望调整每一层并行度通过增加并行度使得我们系统仍然能够在线水平扩展。但是要调整并行度意味着两点第一点意味着我们会在线上增加服务器的数量有些正在运行的Bolt会被迁移Migrate到其他的服务器上去。更进一步地我们想要增加Bolt的数量。这意味着Bolt里的这个广告ID(展示次数点击次数广告花费)的状态也需要能够拆分。这个时候S4的设计反而显得更合理那就是每一个PE都对应着一个Key了。这样我们需要迁移的状态就和对应的计算函数是绑定在一起的了。Bolt会被拆分和迁移并且在迁移的过程中我们需要能够保留状态信息这意味着我们的状态需要能够持久化下来。我们需要能够把这些状态也更新到一个稳定的外部存储中去。当我们的节点挂掉在其他服务器上恢复计算能力的时候需要把这些状态信息重新读取回来。并且这个能力也使得我们去调度计算变得更容易了我们可以动态地在线上增加系统的并行度。而不是采用部署一个新系统再把老系统下线这样运维成本更高的模式。通过把各个计算节点的中间状态持久化使得系统在容错情况下仍然能够做到“正好一次”的数据处理并且能够在线上动态扩容、调度计算是现代流式数据处理的第二个目标。处理消息的时间窗口除了重复发送的消息去重Bolt的中间状态需要持久化之外其实我们前面的Topology还有一个问题没有解决好这个问题就是“时间问题”。我们在前面Storm的Topology里很简单地用一句“每分钟”输出一次广告点击率概括了AdsCtrBolt的逻辑。这个“每分钟”的时间依靠的是Storm内建的一个叫做TickTuple的机制。Storm可以在系统层面设置一个时间间隔参数根据这个参数Storm会按照固定的时间间隔向每一个Bolt和Spout发送一个特殊的TickTuple。我们的Bolt只需要每当接收到这个TickTuple的时候把当前计算出来的状态信息输出出去就好了。但是这个处理逻辑有一个问题就是我们用消息传输到AdsCtrBolt的时间替代了对应的广告曝光和点击发生的时间。也就是我们用处理时间Processing Time替代了事件时间Event Time。这样我们计算出来的点击率乃至计费信息会和实际情况有差异。而且这个差异情况在很多场景下我们是无法容忍的。一种情况是和业务需求相关比如我们的广告客户设置了广告预算都在11月份花完。那么在11月30日晚上11点59分59秒发生的广告点击实际被处理的时候很有可能已经是12月1日了。这样我们的广告客户会看到他并没有在12月份分配任何广告预算但是我们的系统却让他在12月1日有了花费这显然会引起客户的不满。另一种情况则是和我们对于日志的重放相关。无论是系统故障还是我们修改了数据分析逻辑当我们要通过重放Kafka里的日志重新计算统计数据的时候现在的逻辑会造成更大的麻烦。因为所有的日志都是在短时间内重放所以我们会把过去几小时甚至是几天的数据都统计在最近几分钟内我们的统计数据不是有一些小小的误差而是完全错误的。当然批量重放日志不是一个常见的情况。但是在硬件故障的情况下部分前端应用服务器的日志没有及时进入Kafka或者某些Kafka的Broker的部分日志没有及时进入我们的Topology则是一个常见的情况。在这样的场景下我们仍然会有大量的日志出现少则数十秒多则一两个小时的延误。这样计算出来的错误的统计数据我们仍然接受不了。一个合理的解决方案就是我们需要使用实际的事件发生的时间即Event Time来进行相应的数据统计。但是这样一来我们就面临两个新的问题。第一个问题是我们不能简单地维护广告ID(展示次数点击次数广告花费)这样一个映射关系了而是需要一个时间窗口[广告ID1(展示次数点击次数广告花费)广告ID2(展示次数点击次数广告花费) , ……]这样一个三维多层的映射关系了。第二个问题是我们很难决策什么时候应该将我们的统计结果写入到外部的数据库里。因为在上节课里我们就看到过上游发送过来的日志并不是严格按照时间排序的。一个可行的方案就要考虑很多因素比如我们要加上这样几个判断条件和因素我们需要在Bolt内部维护一个“时钟”来判断最近接收到的日志大概是在什么时间戳附近。比如我们可以用最近1000条日志的最早时间戳作为这个时间戳。当然这里的1000是一个我们自己可以设计的参数。当我们的时间窗口已经比最近的时间戳晚上一个特定的时间长度比如5分钟了。我们可以认为接下来不会再接收到这样的日志了那么我们就可以把对应的数据写入到外部的数据库里去。而如果又有一条我们确认不再应该收到的日志又传输过来了那么我们有两个选择。一个是直接忽略丢弃另一种则是从我们的数据库里读取出之前的统计信息更新之后再写回到数据库。显然要实现这些逻辑我们使用Storm现有的内置机制是做不到的。虽然我们还是可以通过像TickTuple这样的机制定时提醒我们去检查是否应该把数据从Bolt内存里维护的Map输出到外部的数据库里。但是像是维护时间窗口的映射关系、统计最近日志的时间戳这些逻辑代码我们仍然都需要自己来撰写。而我们希望的仍然是大数据应用的开发人员只需要撰写统计相关的业务逻辑代码而不需要为了容错或者考虑Kafka发送数据可能存在的延时去写大量实现容错功能的代码。我们希望能够把和时间窗口相关的以及触发数据更新到外部数据库相关的处理机制在流式处理框架中内建。而撰写流式数据处理逻辑的开发人员不需要关心这些机制和容错问题这个也就是现代流式数据处理的第三个目标。小结好了相信到这里你对于流式数据处理面临的挑战应该已经清楚了。可以看到面对这些挑战我们原本以为已经非常优秀的Storm是远远不够的。我们需要一个系统能够达成三点目标第一点是**“正好一次”的数据处理机制**要做到这一点我们需要在流式数据系统里内置一个数据去重的机制。第二点是把计算节点需要使用的“状态”信息持久化下来。这样我们才能够做到真正的容错而不是在系统出错的时候丢失一部分信息。而且这个机制也有助于我们在线扩容。第三点是我们需要把流式数据处理的时间窗口以及触发机制内置到流式处理系统里面去。这样我们就可以让我们的业务代码专注于实现业务逻辑而不是需要自己在应用代码里搞一套时间窗口的维护和触发机制。在这节课里我们已经给出了一些实践上的解决方案。但是我们并不希望自己在写Storm的Spout代码的时候写上一大堆代码来解决正好一次的数据处理、Spout中间状态的持久化以及针对时间窗口的处理逻辑。因为这些问题是流式数据处理的共性问题。我们希望能有一个流式处理系统帮助我们解决这些问题。作为应用开发人员我们仍然只需要撰写业务代码。这个也就是我们接下来会讲解的MillWheel、Dataflow以及Flink的系统会做到的事情。
- Dataflow(一):正确性、容错和时间窗口
你好我是程序员贵哥。在 Storm的论文里我们看到Storm巧妙地利用了异或操作能够追踪消息是否在整个Topology中被处理完了做到了“至少一次At Least Once”的消息处理机制。然后在 Kafka的论文里我们又看到了Kafka通过将消息处理进度的偏移量记录在ZooKeeper中的方法使得整个消息队列非常容易重放。Kafka的消息重放机制和Storm组合就使得At Least Once的消息处理机制不再是纸上谈兵。然而我们并不会满足于“至少一次”的消息处理机制而是希望能够做到“正好一次Exactly Once”的消息处理机制。因为只有“正好一次”的消息处理机制才能使得我们计算出来的数据结果是真正正确的。而一旦需要真的实现“正好一次”的消息处理机制系统的“容错能力”就会变得非常重要。Storm的容错能力虽然比起S4已经有了一定的进步但是实际上仍然非常薄弱。所有的这些问题伴随着Kappa架构设想的出现为我们带来了新一代的流式数据处理系统。那么接下来的几节课里让我们步入现代流式数据处理系统一起看看从Google的MillWheel、Dataflow到开源的Apache Flink的系统是怎么回事儿。在这节课里我们会先看看在没有这些系统的时候在实践上使用Storm时会遇到哪些实际的问题。其实也正是由于这些问题催生了现代流式处理系统的诞生。在学完这节课之后我希望你能够理解以下这三点为什么“Exactly Once”的消息处理是困难的但又是必须的。为什么Storm的容错机制比起实际需求远远不够和MapReduce这样同样粗糙而原始的框架比起来Storm的容错机制缺失了什么。“时间窗口Time Window”是一个什么样的概念为什么这个概念对于流式数据处理系统如此重要。在理解了这些问题之后我们其实就已经开始逼近现代的流式数据处理系统了。只要能够解答好这些问题我们就会有一个全新的系统了而这样一个全新的系统究竟应该如何搭建其实就是后续我们会详细讲解的MillWheel、Dataflow以及Flink的核心知识点了。一个简单的流式数据处理系统我们先来看一看在有了Kafka和Storm之后一个实际的流式数据处理系统是怎么样的。最简单的我们就采用一个进行广告点击率计算和计费的数据处理需求。我们的日志会是这样的格式每一条日志都表示一次广告相关的日志。其中广告位ID广告客户ID广告ID标明了是哪一个广告展示在了哪一个广告位置上。比如可口可乐的新年促销广告展示在了某视频网站的开屏页就可以通过这三个字段分辨出来。事件类型这个字段用来标注这条日志是表示一次广告的展示还是一次广告的点击。用户UID用来标识是哪一个用户。这个在实践层面可以方便我们对于同一个用户在短时间内反复点击相同的广告进行去重。事件ID用来标识一个唯一的事件。在实践层面如果出现系统故障我们常常会重试保障“至少一次”的数据处理。而有了这个字段之后我们就可以在处理的时候进行去重这样我们才有可能做到“正好一次”的数据处理。最后的时间戳字段用来记录事件发生的时间。花费字段则是记录这一次点击广告需要花费广告客户多少预算。这里的日志只是一个最简化的模型。在实际的广告系统中会有上百个字段比如我们还会记录IP地址以分辨用户所在的地理位置等等。不过有了这个最简单的日志格式我们已经可以做两个最常见的广告数据的流式处理了。第一个自然是接近于实时的广告计费了。广告客户会设置当天可以花掉的广告预算我们不能让它花费超过设置的预算所以我们必须实时地统计客户的花费。第二个则是统计各个广告的点击率对于点击率太低的广告我们应该反馈给广告投放系统自动停止广告的投放。那么根据这两个需求我们就可以很容易地基于Kafka和Storm搭建起一个我们需要的流式数据处理系统。首先前端的应用服务器会把产生的广告日志发送给一个负载均衡。然后通过负载均衡均匀而随机地发送给Kafka不同的Broker服务器。下游有一个Storm集群里面有一个Topology同时完成了广告计费以及广告的点击率统计的工作。这个Topology就只有简单的两层。第一层是一个KafkaSpout它会从Kafka拉取日志然后解析并获取需要的字段并向下游的Bolt进行数据分发。KafkaSpout的每一条日志都会发送两条消息给下游两种不同的Bolt。一条发给AdsCtrBolt用来统计不同广告的点击率另一条发给ClientSpentBolt用来计算每个广告客户的花费。在向下游发送数据的时候都是采用字段分组的方式。发给AdsCtrBolt的是按照广告ID进行分组发给ClientSpentBolt的则是按照广告客户ID进行分组。这样所有相同广告的日志都会发送到同一个AdsCtrBolt里而所有相同广告客户的日志也都会发送给同一个ClientSpentBolt。AdsCtrBolt的处理逻辑很简单就是它会在内存里维护一个广告ID(展示次数点击次数广告花费)的Map。然后定时把这个表输出到外部的数据库里比如HBase或者Bigtable这样的数据库。也就是它会每分钟输出一次对应广告ID的点击率信息。ClientSpentBolt的逻辑也很简单就是它可以以更高的频率比如每秒甚至每次接收到一次广告点击就对应更新一次HBase里的广告花费数据。最后整个Storm的Topology是开启了AckerBolt的也就是我们会确保所有的消息能够至少被处理一次。“正好一次”的正确性这样一切看起来都很完美我们简单地通过KafkaStorm就有了一条可以实时计算广告花费和广告点击率的数据流水线。当然如果我们的系统非常稳定没有任何软硬件故障的时候事情也许是这样的。不过在大数据领域我们始终面临“出错”这个问题。而一旦出错我们的麻烦就来了。首先就是这个“至少一次”数据处理的特性其实已经满足不了我们实际的业务需要了。随着时间的推移我们已经把“广告计费”这样对于准确性要求很高的应用也放到流式处理系统里来。在我们这个应用场景里可能某一个ClientSpentBlot写入外部数据库的时候出现了比较高的延时。这个时候Storm的“至少一次”的处理机制会重发对应的消息。如果没有考虑这样重发的消息那么我们就会在ClientSpentBolt里面重复计算同一条日志的广告花费这就意味着我们多扣了广告客户的预算这显然是难以接受的。而如果说单条日志重发计费可能对于最终计费的影响还很小。那么如果Storm的某一个KafkaSpout出现了硬件故障挂掉了我们就可能有一大批消息会重复计费了。因为为了性能考虑我们从Kafka拉取数据不会是拉一条、处理一条然后更新一次ZooKeeper上的偏移量。特别是ZooKeeper会受不了这么大的负载它和Chubby一样是用于实现一个粗粒度的分布式锁而不是一个高性能的KV存储。所以KafkaSpout会从Kafka拉一小批数据然后发送出去等到这一小批数据发送完了并且下游都处理完了才会变更一次ZooKeeper上的偏移量。但是只要其中有一条消息在下游还没有处理完的时候KafkaSpout所在的服务器挂掉了对应的偏移量没有更新。那么在容错机制下重新启动在另一台服务器上的KafkaSpout会重新再发送一遍这一批数据。而这个时候我们就不只是重新对一条日志重复计费而是需要对一大批日志重复计费。要解决这个问题一个很直观的思路自然是对重复发送的日志或者消息进行去重。最简单的方式就是在每一个Bolt里我们维护一个这个Bolt已经处理完成的所有的message-id的集合。那么任何一条新的消息发送过来的时候我们都去这个集合里看一看这条消息是否已经处理过了就能解决这个问题了。不过让每个Bolt都保留所有处理过的message-id的集合显然会占用太多的内存了。因为在流式系统里随着时间的推移系统处理过的日志量在不断地增加message-id的集合只会越来越大。所以在工程实践上我们可以做两个优化第一个是使用BloomFilter进行去重来代替原始的数据集合。我们把所有已经处理过的message-id放到一个BloomFilter里去这样可以大大压缩我们需要的内存空间。不过使用BloomFilter会带来的副作用是我们可能会有很小的概率误算使得不是重复的消息也会被认为是重复的。第二个是把数据按照时间窗口切分成多个BloomFilter。比如我们可以设定有30个BloomFilter每个BloomFilter都只存放某一分钟的message-id。而每过一分钟我们都把30分钟前的那个BloomFilter清空。这样我们可以通过一个固定大小的内存空间确保只要是30分钟内的重复数据就不会被多次处理。因为一般来说简单的重发不太可能超过30分钟。我们可以根据系统的实际情况来设定这个对应的时间窗口。真正做到“正好一次”的数据处理是现代流式数据处理的第一个目标。计算节点迁移的容错问题BloomFilter的引入使得我们用于计算的Bolt节点其实有了“状态”。也就是说它自身已经不是一个纯粹的函数了。事实上不仅是为了做到“正好一次”的消息处理需要状态我们本身的数据处理需求就需要状态。比如我们的AdsCtrBolt里维护的那张广告ID(展示次数点击次数广告花费)的Map也就是我们在Bolt里维护的状态。不过需要维护状态又给我们带来了一个新的挑战那就是系统的容错问题。对于系统的“计算节点”的容错很容易我们只要在另外一台服务器上重新启动一个Bolt就好了。但是这个时候我们之前维护在Bolt内存里的广告ID(展示次数点击次数广告花费)的状态就已经丢失了。如果我们是每一分钟输出一次数据给HBase/Bigtable里的话这意味着我们经常会丢掉一分钟的数据。事实上不仅仅是针对容错问题我们需要考虑恢复Bolt里的状态对于系统的可扩展性我们同样需要考虑恢复Bolt里的状态。Storm的论文里我们的并行度是部署Topology的时候预先设定好的。但是这样的系统很难进行动态的扩容。如果我们的广告业务越来越红火意味着上游的日志越来越多。这个时候我们其实希望调整每一层并行度通过增加并行度使得我们系统仍然能够在线水平扩展。但是要调整并行度意味着两点第一点意味着我们会在线上增加服务器的数量有些正在运行的Bolt会被迁移Migrate到其他的服务器上去。更进一步地我们想要增加Bolt的数量。这意味着Bolt里的这个广告ID(展示次数点击次数广告花费)的状态也需要能够拆分。这个时候S4的设计反而显得更合理那就是每一个PE都对应着一个Key了。这样我们需要迁移的状态就和对应的计算函数是绑定在一起的了。Bolt会被拆分和迁移并且在迁移的过程中我们需要能够保留状态信息这意味着我们的状态需要能够持久化下来。我们需要能够把这些状态也更新到一个稳定的外部存储中去。当我们的节点挂掉在其他服务器上恢复计算能力的时候需要把这些状态信息重新读取回来。并且这个能力也使得我们去调度计算变得更容易了我们可以动态地在线上增加系统的并行度。而不是采用部署一个新系统再把老系统下线这样运维成本更高的模式。通过把各个计算节点的中间状态持久化使得系统在容错情况下仍然能够做到“正好一次”的数据处理并且能够在线上动态扩容、调度计算是现代流式数据处理的第二个目标。处理消息的时间窗口除了重复发送的消息去重Bolt的中间状态需要持久化之外其实我们前面的Topology还有一个问题没有解决好这个问题就是“时间问题”。我们在前面Storm的Topology里很简单地用一句“每分钟”输出一次广告点击率概括了AdsCtrBolt的逻辑。这个“每分钟”的时间依靠的是Storm内建的一个叫做TickTuple的机制。Storm可以在系统层面设置一个时间间隔参数根据这个参数Storm会按照固定的时间间隔向每一个Bolt和Spout发送一个特殊的TickTuple。我们的Bolt只需要每当接收到这个TickTuple的时候把当前计算出来的状态信息输出出去就好了。但是这个处理逻辑有一个问题就是我们用消息传输到AdsCtrBolt的时间替代了对应的广告曝光和点击发生的时间。也就是我们用处理时间Processing Time替代了事件时间Event Time。这样我们计算出来的点击率乃至计费信息会和实际情况有差异。而且这个差异情况在很多场景下我们是无法容忍的。一种情况是和业务需求相关比如我们的广告客户设置了广告预算都在11月份花完。那么在11月30日晚上11点59分59秒发生的广告点击实际被处理的时候很有可能已经是12月1日了。这样我们的广告客户会看到他并没有在12月份分配任何广告预算但是我们的系统却让他在12月1日有了花费这显然会引起客户的不满。另一种情况则是和我们对于日志的重放相关。无论是系统故障还是我们修改了数据分析逻辑当我们要通过重放Kafka里的日志重新计算统计数据的时候现在的逻辑会造成更大的麻烦。因为所有的日志都是在短时间内重放所以我们会把过去几小时甚至是几天的数据都统计在最近几分钟内我们的统计数据不是有一些小小的误差而是完全错误的。当然批量重放日志不是一个常见的情况。但是在硬件故障的情况下部分前端应用服务器的日志没有及时进入Kafka或者某些Kafka的Broker的部分日志没有及时进入我们的Topology则是一个常见的情况。在这样的场景下我们仍然会有大量的日志出现少则数十秒多则一两个小时的延误。这样计算出来的错误的统计数据我们仍然接受不了。一个合理的解决方案就是我们需要使用实际的事件发生的时间即Event Time来进行相应的数据统计。但是这样一来我们就面临两个新的问题。第一个问题是我们不能简单地维护广告ID(展示次数点击次数广告花费)这样一个映射关系了而是需要一个时间窗口[广告ID1(展示次数点击次数广告花费)广告ID2(展示次数点击次数广告花费) , ……]这样一个三维多层的映射关系了。第二个问题是我们很难决策什么时候应该将我们的统计结果写入到外部的数据库里。因为在上节课里我们就看到过上游发送过来的日志并不是严格按照时间排序的。一个可行的方案就要考虑很多因素比如我们要加上这样几个判断条件和因素我们需要在Bolt内部维护一个“时钟”来判断最近接收到的日志大概是在什么时间戳附近。比如我们可以用最近1000条日志的最早时间戳作为这个时间戳。当然这里的1000是一个我们自己可以设计的参数。当我们的时间窗口已经比最近的时间戳晚上一个特定的时间长度比如5分钟了。我们可以认为接下来不会再接收到这样的日志了那么我们就可以把对应的数据写入到外部的数据库里去。而如果又有一条我们确认不再应该收到的日志又传输过来了那么我们有两个选择。一个是直接忽略丢弃另一种则是从我们的数据库里读取出之前的统计信息更新之后再写回到数据库。显然要实现这些逻辑我们使用Storm现有的内置机制是做不到的。虽然我们还是可以通过像TickTuple这样的机制定时提醒我们去检查是否应该把数据从Bolt内存里维护的Map输出到外部的数据库里。但是像是维护时间窗口的映射关系、统计最近日志的时间戳这些逻辑代码我们仍然都需要自己来撰写。而我们希望的仍然是大数据应用的开发人员只需要撰写统计相关的业务逻辑代码而不需要为了容错或者考虑Kafka发送数据可能存在的延时去写大量实现容错功能的代码。我们希望能够把和时间窗口相关的以及触发数据更新到外部数据库相关的处理机制在流式处理框架中内建。而撰写流式数据处理逻辑的开发人员不需要关心这些机制和容错问题这个也就是现代流式数据处理的第三个目标。小结好了相信到这里你对于流式数据处理面临的挑战应该已经清楚了。可以看到面对这些挑战我们原本以为已经非常优秀的Storm是远远不够的。我们需要一个系统能够达成三点目标第一点是**“正好一次”的数据处理机制**要做到这一点我们需要在流式数据系统里内置一个数据去重的机制。第二点是把计算节点需要使用的“状态”信息持久化下来。这样我们才能够做到真正的容错而不是在系统出错的时候丢失一部分信息。而且这个机制也有助于我们在线扩容。第三点是我们需要把流式数据处理的时间窗口以及触发机制内置到流式处理系统里面去。这样我们就可以让我们的业务代码专注于实现业务逻辑而不是需要自己在应用代码里搞一套时间窗口的维护和触发机制。在这节课里我们已经给出了一些实践上的解决方案。但是我们并不希望自己在写Storm的Spout代码的时候写上一大堆代码来解决正好一次的数据处理、Spout中间状态的持久化以及针对时间窗口的处理逻辑。因为这些问题是流式数据处理的共性问题。我们希望能有一个流式处理系统帮助我们解决这些问题。作为应用开发人员我们仍然只需要撰写业务代码。这个也就是我们接下来会讲解的MillWheel、Dataflow以及Flink的系统会做到的事情。