Kafka(十二)Streams

目录

  • Streams
    • 1 什么式是流式处理
    • 2 流式处理的相关概念
      • 2.1 拓扑
      • 2.2 时间
        • 2.2.1 输入时间
        • 2.2.2 输出时间
      • 2.3 状态
      • 2.4 流和表
      • 2.5 时间窗口
        • 2.5.1 测试时间窗口
      • 2.6 处理保证
    • 3 流式处理设计模式
      • 3.1 单事件处理
      • 3.2 使用本地状态
      • 3.3 多阶段处理和重分区
      • 3.4 使用外部查找:流和表的连接
      • 3.5 表与表的连接
      • 3.6 流与流的连接
      • 3.7 乱序事件
      • 3.8 重新处理
      • 3.9 交互式查询
    • 4 Streams架构
      • 4.1 构建拓扑
      • 4.2 优化拓扑
      • 4.3 测试拓扑
        • 4.3.1 单元测试
        • 4.3.2 集成测试
      • 4.4 扩展拓扑
        • 4.4.1 单服务器
        • 4.4.2 多服务器
        • 4.4.3 重分区
      • 4.5 在故障中存活下来
    • 5 流式处理应用场景
      • 5.1 客户服务
      • 5.2 物联网
      • 5.3 欺诈检测
    • 6 如何选择流式处理框架
      • 6.1 不同应用类型选择不同
        • 6.1.1 数据摄取
        • 6.1.2低延迟处理
        • 6.1.3异步微服务
        • 6.1.4几近实时的数据分析
      • 6.2 非功能角度
        • 6.2.1系统的可操作性
        • 6.2.2化繁为简
        • 6.2.3API的可用性和可调试性
        • 6.2.4社区
    • 7 Streams的配置
      • processing.guarantee=at_least_once
    • 8 实例
      • 8.1 需求
      • 8.2 关于设计
      • 8.3 关于方案
      • 8.4 逻辑视图

Streams

Kafka因其可靠的消息传递能力,被很多流式处理系统作为唯一可靠的数据来源。常见的流式处理系统有:

  • Apache Storm
  • Apache Spark Streaming
  • Apache Flink
  • Apache Samza

从0.10.0版本开始,Kafka提供了一个强大的流式处理开发库:Kafka Streams。有了它,开发人员可以直接在程序中进行流式处理,无需再引入上述的外部处理框架。下面一起来进入Streams的世界。

1 什么式是流式处理

我们知道Kafka Streams是用来做流式处理的,那么首先要理解什么是流式处理。
首先,流式处理是一种编程范式,是指实时地处理一个或多个事件流/数据流

首先来比较以下三种常用的编程范式:

  1. 请求与响应
    这是延迟最小的一种范式,响应时间在亚毫秒和毫秒之间,通常也比较稳定。这种处理模式一般是阻塞的,即应用程序会向处理系统发出请求,然后等待响应。
    • 传统的Web网站都属于这种范式。例如常用的编程框架有基于Java的SpringBootWeb,基于Python的Django,Flask,基于NodeJS的Express.js等等。
    • 在数据库领域,这种范式就是联机事务处理(OLTP)。
    • 销售点(POS)系统、信用卡处理系统和基于时间的追踪系统通常都使用这种范式。
  2. 批处理
    这种范式具有高延迟和高吞吐量的特点。处理系统按照设定的时间启动处理进程,比如每天凌晨两点开始启动、每小时启动一次,等等。它会读取所有的输入数据(自上一次处理之后的所有可用数据,或者从月初开始的所有可用数据,等等),输出结果,然后等待下一次启动。处理时间从几分钟到几小时不等,并且用户看到的结果都是旧数据。
    在数据库领域,数据仓库或商业智能系统就使用了这种范式。它们每天一次性加载大量数据,然后生成报告,在下一次加载新数据之前,用户看到的都是相同的报告。这种范式通常既高效又具备规模经济效益。但近几年,为了能够更及时、高效地做出决策,企业要求在更短的时间内提供可用数据,这就给那些为探索规模经济而开发却无法提供低延迟报告的系统带来了巨大压力。
  3. 流式处理
    这种范式是连续的、非阻塞的。流式处理填补了请求与响应范式和批处理范式之间的空白。在请求与响应范式世界里,处理一个事件可能只需要2毫秒,而在批处理范式世界里,可能每天只处理一次数据,并且需要8小时才能完成。大多数业务不要求亚毫秒级的响应,但也不能等到第二天。大多数业务流程是持续进行的,只要业务报告保持更新,业务产品线应用程序能够持续响应,处理流程就可以进行下去,不一定需要毫秒级的响应。具有持续性和非阻塞特点的业务流程,比如针对可疑信用卡交易或网络发送告警、根据供应关系实时调整价格、跟踪快递包裹,都可以选择这种范式。
    常用的编程框架有Apache Kafka,Apache Storm,Apache Spark Streaming,Apache Flink,Apache Samza等等。

那么事件流/数据流又是指什么呢?
事件流是无边界事件集合的抽象表示,并且事件是有序的,不可变的,事件流是可重放的。这个简单的模型(事件流)几乎可以用来表示任何一种业务活动,比如信用卡交易、股票交易、包裹递送、流经交换机的网络事件、制造商设备传感器发出的事件、发送出去的邮件、游戏场景中的物体移动,等等。

  • 事件流是有序的
    事件的发生都有先后顺序。以金融活动事件为例,先把钱存进账户,然后再把钱花掉与先把钱花掉,然后再把钱存入账户的顺序是完全不一样的。后者会出现透支,前者则不会。这是事件流与数据库表的一个区别。

  • 不可变的数据记录
    事件一旦发生,就不能被改变。一次金融交易被取消,并不是说它消失了,相反,表示尚一个交易操作被取消的事件将被添加到事件流中。顾客向商店退货,之前的销售事实北不会消失,退货行为将被视为一个额外的事件。这是数据流与数据表之间的另一个区:可以删除和修改数据表中的记录,但这些操作都是发生在数据库中的事务,也可以将这些事务当成事件记录到事件流中。如果你熟悉数据库的二进制日志(bin log)、预写式日志(WAL)或重做日志(redo log),就应该知道,如果向数据库表插入一条记录,再将其删除,那么表中就不会包含这条记录,但重做日志中会有两个事务:插入事务和删除事务。

  • 事件流是可重放的
    这是事件流非常有价值的一个属性。我们都知道不可重放的流(流经套接字的TCP数促包通常是不可重放的),但对大多数业务应用程序来说,能够重放发生在几个月前其至几年前)的原始事件流是非常关键的。可能是为了能够使用新的分析方法纠正过土的错误,或者是为了达到审计的目的。这也是为什么我们相信Kafka能够让现代业务领域的流式处理大获成功 – Kafka可以被用来捕获和重放事件流。

事件可以很小(有时候只有几字节),也可以很大(包含很多消息头的XML消息),它们可以是完全非结构化的键-值对,可以是半结如化的JSON,也可以是结构化的Avro消息或Protobuf消息。虽然数据流经常被视为“大数据”,并且包名「每秒数百万个事件,但这里所讨论的技术同样适用(通常是更加适用)于小事件流,可能每秒甚至每分钟只有几个事件。

2 流式处理的相关概念

2.1 拓扑

一个流式处理程序包含一个或多个处理拓扑。处理拓扑其实就是处理流程的一个抽象,是一个有向无环图,从数据源开始,经过多个处理器processor(例如过滤器,计数器,分组,连接,reduce等),最终写入目标数据流。

2.2 时间

时间是流式处理中最重要的概念。因为事件流是一个无限的数据集合,并且流式处理是实时的,所以基本不可能也不需要处理所有的数据,因此大部分流式应用程序的操作都是基于时间窗口的。
流式处理一般包含以下几种时间:

  1. 事件时间
    事件时间是指事件的发生时间和消息的创建时间,比如指标的生成时间、商店里商品的出售时间、网站用户访问网页的时间,等等。在Kafka 0.10.0和更高版本中,生产者会自动在消息里添加消息的创建时间。如果这个时间戳与应用程序对事件时间的定义不一样(例如,Kafka消息是在事件发生以后基于数据库记录而创建的),那么建议将事件时间作为一个单独的字段添加到消息里,这样在后续处理事件时两个时间戳将都可用。在处理数据流时,事件时间是非常重要的。
    例如下面的例子在消息体中加入了时间戳的字段,这是在业务系统中数据产生的时间:
    	public class AgentCase implements CreatingTimestampExtractor.EventWithTimestamp {
    	    private String id;
    	    private String agentId;
    	    private String status;
    	    private String createTime; /*yyyy-MM-dd'T'HH:mm:ssZ*/
    	}
    

然后在Streams时间提取类中,提取这个时间戳,注意时区的统一。

	public class CreatingTimestampExtractor implements TimestampExtractor {
	 
	    @Override
	    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
	        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
	        try {
	            String eventTime = ((EventWithTimestamp)record.value()).getTimestamp();
	            return sdf.parse(eventTime).getTime();
	        } catch (ParseException e) {
	            return record.timestamp();
	        }
	    }
	
	    public static interface EventWithTimestamp {
	        String getTimestamp();
	    }
	}
  1. 日志追加时间
    日志追加时间是指事件到达并保存到broker的时间,也叫摄取时间。在Kafka 0.10.0和更高版本中,如果启用了自动添加时间戳的功能,或者记录是用旧版本生产者客户端生成的,并且不包含时间戳,那么broker就会在收到记录时自动添加时间戳。这个时间戳通常与流式处理没有太大关系,因为我们一般只对事件的发生时间感兴趣。如果要计算每天生产了多少台设备,就需要计算在那一天实际生产的设备数量,尽管这些事件有可能因为网络问题第二天才进入Kafka。不过,如果事件时间没有被记录下来,则也可以考虑使用日志追加时间,因为它在记录创建之后就不会发生变化,而且如果事件在数据管道中没有延迟,那么就可以将其作为事件发生的近似时间。
  2. 处理时间
    处理时间是指应用程序在收到事件之后要对其进行处理的时间。这个时间可以是在事件发生之后的几毫秒、几小时或几天。同一个事件可能会被分配不同的处理时间戳,具体取决于应用程序何时读取这个事件。即使是同一个应用程序中的两个处理线程,它们为事件分配的时间戳也可能不一样。所以,这个时间戳非常不可靠,最好避免使用它。
    例如在Processor中可以获取到处理时间:
    public class MyProcessor implements Processor<String, String> {
        private ProcessorContext context;
    
        @Override
        public void process(String key, String value) {
            // 处理输入记录
            System.out.println("Received record with timestamp: " + context.timestamp());
        }
    }
    
2.2.1 输入时间

Streams基于 TimestampExtractor 接口为事件分配时间。Streams应用程序开发人员可以使用这个接口的不同实现,既可以使用前面介绍的3种时间语义,也可以使用完全不同的时间戳,包括从事件中提取的时间戳。

public class CreatingTimestampExtractor implements TimestampExtractor {
 
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
        try {
            String eventTime = ((EventWithTimestamp)record.value()).getTimestamp();
            return sdf.parse(eventTime).getTime();
        } catch (ParseException e) {
            return record.timestamp();
        }
    }
}
2.2.2 输出时间

当Streams将输出的消息写入Kafka主题时,它会根据以下规则为每个事件分配时间戳。

  1. 如果输出记录直接映射到输入记录,那么将使用输入记录的时间戳作为输出记录的时间戳。
  2. 如果输出记录是一个聚合结果,那么将使用聚合中最大的时间戳作为输出记录的间戳。
  3. 如果输出记录是两个流的连接结果,那么将使用被连接的两个记录中较大的时间戳作为输出记录的时间戳。如果连接的是一个流和一张表,那么将使用流记录的时间戳作为输出记录的时间戳。
  4. 如果输出记录是由Streams函数(如punctuate())生成的,该函数会在一个特定的时间调度内生成数据,而不管输入是什么,那么输出记录的时间戳将取决于流式处理用程序的当前内部时间。

注意时区问题
在处理与时间相关的问题时,需要注意时区问题。整个数据管道应该使用同
一个时区,否则得到的结果会令人感到困惑。如果不可避免地要处理不同时
区的数据,那么可以在处理事件之前将它们转换成同一个时区,所以需要将
时区信息保存在记录里。

2.3 状态

如果只是单独处理每一个事件,并且没有聚合操作,那么流式处理会非常简单。
但如果处理流程中包含了多个事件,那么流式处理就会变得复杂。例如,按照类型计算事件的数量、移动平均数、合并两个流以便生成更丰富的信息流,等等。在这些情况下,只看单个事件是不够的,需要跟踪更多的信息,比如这个小时内每种类型的事件有多少个,需要连接、求和、求平均值的所有事件,等等。我们把这些信息叫作状态。

这些状态通常会被保存在应用程序的本地变量里,比如使用哈希表保存移动计数器。但是,在流式处理应用程序中,这种保存状态的方法是不可靠的,因为如果应用程序停止或发生崩溃,那么状态就会丢失,导致结果发生变化。这通常不是我们所期望的,所以要小心地持久化状态,如果应用程序重启,则要将其恢复。

流式处理通常涉及以下几种状态:

  1. 本地状态或内部状态
    这种状态只能被单个应用程序实例访问,通常通过内嵌在应用程序中的数据库来维护和管理。本地状态的优点是速度快,缺点是受可用内存的限制。所以,流式处理的很多设计模式会将数据拆分成多个子流,以便使用有限的本地状态来处理它们。
  2. 外部状态
    这种状态通过使用外部数据存储来维护和管理,通常使用NoSQL系统,比如Cassandra。外部状态的优点是几乎没有大小限制,而且可以被应用程序的多个实例甚至是不同的应用程序访问。缺点是使用额外的系统会增加延迟和复杂性,还可能对可用性造成影响,而且外部系统也存在变得不可用的可能性。大部分流式处理应用程序会尽量避免使用外部存储,或者将信息缓存在本地,减少与外部存储发生交互,以此来降低延迟,而如何维护内部状态与外部状态的一致性就成了一个问题。

2.4 流和表

表是记录的集合,每条记录都有一个主键标识,并包含了一组由模式定义的属性。表的记录是可变的(可以执行更新和删除操作)。可以通过查询表获知数据在某一时刻的状态。

与表不同,流包含了历史变更数据。流是一系列事件,每个事件就是一个变更。表表示的是世界的当前状态,是发生多个变更后的结果。可见,表和流是同一枚硬币的两面 – 世国总是在发生变化,我们有时候对导致发生变化的事件感兴趣,有时候对世界的当前状态范兴趣。如果一个系统允许通过这两种方式来看待数据,那么它就比只支持一种方式的系统更强大。

更将表转化成流,需要捕获所有对表做出的变更。要将insert事件、update事件和
dolete事件保仔到流里。大多数数据库提供了CDC(Chage Data Cpature)解决方案,有很多Kafka连接器可以将这些变更发送到Kafka,用于后续的流式处理。

要将流转化成表,需要应用流里所有的变更。这也叫作流的物化。我们需要在内存、内部业态存储或外部数据库中创建一张表,然后从头到尾遍历流里所有的事件,逐个修改状态。在完成这个过程之后,就得到了一张表,它代表了某个时间点的状态。

2.5 时间窗口

大部分针对流的操作都是基于时间窗口的,例如平均数,最大值等。两个流的连接操作也是基于时间窗口的。
确定一个时间窗口,我们需要知道如下信息:

  1. 窗口大小
    你想要计算5分钟内,还是15分钟,抑或一天的平均数?窗口越大就越平滑,但滞后
    也越多。如果价格涨了,则需要更长的时间才能看出来。
  2. 窗口移动频率(移动间隔)
    5分钟平均数可以每分钟或每秒变化一次,或者在有新事件到达时发生变化。
  3. 窗口可更新时间(宽限期)
    假设我们已经计算出了00:00和00:05之间的5分钟移动平均数,一小时后,又收到了一些事件时间为00:02的事件,那么需要更新00:00~00:05这个窗口的结果吗?或者就这么算了?理想情况下,可以定义一个时间段,在这个时间段内,事件可以被添加到与它们对应的时间片段里。可以规定如果事件延迟不超过4小时,就重新计算并更新结果,否则就忽略它们。

窗口可以与时间对齐,也就是说,如果5分钟的窗口每分钟移动一次,那么第一个分片可以是00:00-00:05,第二个分片可以是00:01-00:06。窗口也可以不与时间对齐,可以随应用程序在任意时刻启动,所以第一个分片可以是03:17~03:22。

Kafka Streams提供了以下几种窗口类型:

  1. 会话窗口(Session window):
    Streams提供了一种会话窗口,其大小是通过不活跃的时间段来定义的。开发人员会定义一个会话间隙,所有连续到达且间隙小于这个会话间隙的事件都属于同一个会话。一个大的间隙将开始一个新会话,在这个间隙之后但在下一个间隙之前到达的所有事件都属于这个会话。

    stream
      .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
      .windowedBy(SessionWindows.with(Duration.ofSeconds(10)))
    
  2. 跳跃窗口(hopping window):
    时间隔固定的窗口叫作跳跃窗口或滑动窗口。

    stream
    .groupByKey(Grouped.with(Serdes.String(),Serdes.Long()))
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(3))
    
  3. 滚动窗口(tumbling window):
    移动间隔与窗口大小相等的窗口叫作滚动窗口。

    stream
      .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    
2.5.1 测试时间窗口

其实到目前为止,我对Kafka如何实现时间窗口的细节还有几个疑问,需要解决:

  • 每一个时间窗口的开始时间?
  • 每一个时间窗口的结束时间?
  • 时间窗口的数量是多少?

下面就让我们写一个程序验证上面提到的所有关于时间和时间窗口的内容。

  1. 首先指定时间窗口类型
    这里使用跳跃时间窗口,窗口大小为4分钟,移动间隔为1分钟
    @Bean
    public KStream<Windowed<String>, Long> timeWindowTestingKStream(StreamsBuilder myKStreamBuilder) throws Exception {
        KStream<String, AgentCase> agentCases = myKStreamBuilder.stream("time-window-testing-input", Consumed.with(Serdes.String(), new JsonSerde(AgentCase.class)));
        KStream<Windowed<String>, Long> caseStatistics = agentCases
                .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(CaseState.class)))
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(4)).advanceBy(Duration.ofSeconds(60)))
                .count()
                .toStream();
        caseStatistics.print(Printed.toSysOut());
        caseStatistics.process(WindowedLongProcessor::new);
        caseStatistics.to("time-window-testing-output", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class, 4*60L*1000)));
        return caseStatistics;
    }
    
  2. 指定事件时间
    我向数据源主题写入10条消息,并且在消息体中指定时间戳:
    public class AgentCase implements CreatingTimestampExtractor.EventWithTimestamp {
        private String id;
        private String agentId;
        private String status;
        private String createTime; /*yyyy-MM-dd'T'HH:mm:ssZ*/
    

第一个事件的时间是2024-04-27 18:00:00,之后每隔一分钟生成一个事件,总共10个事件。
3. 自定义TimestampExtractor,指定使用上面的时间为事件时间。

public class CreatingTimestampExtractor implements TimestampExtractor {
 
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        try {
            String eventTime = ((EventWithTimestamp)record.value()).getTimestamp();
            System.out.println("Event in value: " + eventTime + "     " + TimeUtils.parse(eventTime));
            System.out.println("Logging time by producer: " + TimeUtils.format(record.timestamp()) + "     " + record.timestamp());
            return TimeUtils.parse(eventTime);
        } catch (ParseException e) {
            return record.timestamp();
        }
    }

    public static interface EventWithTimestamp {
        String getTimestamp();
    }
}

打印出事件时间如下:

	2024-04-27T18:00:00+0800
	2024-04-27T18:01:00+0800
	2024-04-27T18:02:00+0800
	2024-04-27T18:03:00+0800
	2024-04-27T18:04:00+0800
	2024-04-27T18:05:00+0800
	2024-04-27T18:06:00+0800
	2024-04-27T18:07:00+0800
	2024-04-27T18:08:00+0800
	2024-04-27T18:09:00+0800
  1. 打印出时间的日志追加时间
	Logging time by producer: 2024-04-28T10:36:38+0800
  1. 打印出事件处理时间
    添加相应的处理器:
public class WindowedLongProcessor implements Processor<Windowed<String>, Long, Object, Object> {
    @Override
    public void process(Record<Windowed<String>, Long> record) {
        System.out.println("Current stream time is: " + TimeUtils.format(this.context.currentStreamTimeMs()));
        System.out.println("Current system time is: " + TimeUtils.format(this.context.currentSystemTimeMs()));
        System.out.println("Time window is from : "
                + TimeUtils.format(record.key().window().start())
                + " to "
                + TimeUtils.format(record.key().window().end()));
    }
}

不理解stream时间为什么比系统时间快一个小时?

	Current stream time is: 2024-04-28T11:30:00+0800
	Current system time is: 2024-04-28T10:36:38+0800
  1. 打印出时间窗口的开始时间和结束时间
    可以看到10个事件,共有13个时间窗口,滑动13次。第一个时间窗口的开始时间比第一个事件时间提前了3分钟,所以窗口总数比事件总数多3个,在覆盖了最后一个事件之后,时间窗口的滑动就停止了。
	2024-04-27T17:57:00+0800 to 2024-04-27T18:01:00+0800
	2024-04-27T17:58:00+0800 to 2024-04-27T18:02:00+0800
	2024-04-27T17:59:00+0800 to 2024-04-27T18:03:00+0800
	2024-04-27T18:00:00+0800 to 2024-04-27T18:04:00+0800
	2024-04-27T18:01:00+0800 to 2024-04-27T18:05:00+0800
	2024-04-27T18:02:00+0800 to 2024-04-27T18:06:00+0800
	2024-04-27T18:03:00+0800 to 2024-04-27T18:07:00+0800
	2024-04-27T18:04:00+0800 to 2024-04-27T18:08:00+0800
	2024-04-27T18:05:00+0800 to 2024-04-27T18:09:00+0800
	2024-04-27T18:06:00+0800 to 2024-04-27T18:10:00+0800
	2024-04-27T18:07:00+0800 to 2024-04-27T18:11:00+0800
	2024-04-27T18:08:00+0800 to 2024-04-27T18:12:00+0800
	2024-04-27T18:09:00+0800 to 2024-04-27T18:13:00+0800

总后汇总一下测试结果:

时间窗口
类型跳跃窗口
大小4分钟
时间间隔1分钟
总数13
开始时间比第一个事件时间早3分钟
结束时间直到覆盖完最后一个事件,不会有没有数据的事件窗口
事件时间日志追加时间处理时间时间窗口事件数
2024-04-27T17:57:00+0800 to 2024-04-27T18:01:00+08001
2024-04-27T17:58:00+0800 to 2024-04-27T18:02:00+08002
2024-04-27T17:59:00+0800 to 2024-04-27T18:03:00+08003
2024-04-27T18:00:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:00:00+0800 to 2024-04-27T18:04:00+08004
2024-04-27T18:01:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:01:00+0800 to 2024-04-27T18:05:00+08004
2024-04-27T18:02:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:02:00+0800 to 2024-04-27T18:06:00+08004
2024-04-27T18:03:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:03:00+0800 to 2024-04-27T18:07:00+08004
2024-04-27T18:04:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:04:00+0800 to 2024-04-27T18:08:00+08004
2024-04-27T18:05:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:05:00+0800 to 2024-04-27T18:09:00+08004
2024-04-27T18:06:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:06:00+0800 to 2024-04-27T18:10:00+08004
2024-04-27T18:07:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:07:00+0800 to 2024-04-27T18:11:00+08003
2024-04-27T18:08:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:08:00+0800 to 2024-04-27T18:12:00+08002
2024-04-27T18:09:00+08002024-04-28T10:36:38+08002024-04-28T11:30:00+08002024-04-27T18:09:00+0800 to 2024-04-27T18:13:00+08001

2.6 处理保证

无论是否出现故厚,都能够一次且仅一次处理每一条记录,这是流式处理应用程序的一个关键需求。如果没有精确一次性保证,那么流式处理就不能被用在要求精确结果的场是中。Kafka支持精确一次性语义。Kafka生产者支持事务和幂续性。Streams借助Kafka的事务特性为流式处理应用程序提供精确一次性保证。使用ctreams 库的应用程序可以将processing.guarantee设置为exactly_once,以此来启用精确一次性保证。Streams 2.6或更高版本提供了更高效的精确一次性实现,它需要broker 2.5成更高版本。可以将processing.guarantee设置为exactly_once_beta,以此来启用这种更高效的实现。

3 流式处理设计模式

每一种流式处理系统都不一样,不过还是有一些基本的设计模式和解决方案,它们是解决流式处理架构常见需求的解决方案。

3.1 单事件处理

处理单个事件是流式处理最基本的模式。这种模式也叫映射(map)模式过滤器(filter)模式,因为它经常被用于过滤无用的事件或对事件进行转换。(map这个术语是从map-reduce模式中来的,在map阶段转换事件,在reduce阶段聚合事件。)

在这种模式中,应用程序会读取流中的事件,修改它们,再把它们生成到另一个流中。
单事件处理模式

  • 一个应用程序从流中读取日志消息,然后把ERROR级别的消息写到高优先级流中,把其他消息写到低优先级流中。
     KStream<String, String>[] branches = inputStream
         .branch(
             (key, value) -> value.contains("INFO"), 
             (key, value) -> value.contains("ERROR")
         );
    branches[0].to("info-level-logs", Produced.with(Serdes.String(), Serdes.String()));
    branches[1].to("error-level-logs", Produced.with(Serdes.String(), Serdes.String()));
    
  • 一个应用程序从流中读取事件,然后把事件从JSON格式转换成Avro格式。这类应用程序不需要在程序内部维护状态,因为每一个事件都是独立处理的。这也意味着,从故障或负载均衡中恢复都是非常容易的,因为不需要恢复状态,只需将事件交给另一个实例去处理即可。

这种模式可以使用一个生产者和一个消费者来实现。

3.2 使用本地状态

大多数流式处理应用程序需要用到聚合信息,特别是基于时间窗口的聚合。例如,找出每天最低和最高的股票交易价格,并计算移动平均数。

要实现这些聚合操作,需要维护流的状态。在我们的例子中,为了计算股票每天的最小价格和平均价格,需要将到当前时间为止的最小值、总和以及记录数量保存下来。
这些操作可以通过本地状态(而不是共享状态)来实现,因为示例中的每一个操作都是分组聚合操作。也就是说,我们是对各只股票进行聚合,而不是对整个股票市场进行聚合。

我们使用Kafka分区器来确保具有相同股票代码的事件总是被写入相同的分区。然后,应用程序的每个实例会从分配给自己的分区读取事件(这是Kafka的消费者保证),并维护一个股票代码子集的状态。
使用本地状态
如果流式处理应用程序包含了本地状态,那么情况就会变得非常复杂。流式处理应用程序必须解决下面这些问题。

  • 内存使用
    应用程序实例必须有可用的内存来保存本地状态。一些本地存储允许溢出到磁盘,这
    对性能有很大影响。
  • 持久化
    要确保在应用程序关闭时状态不会丢失,并且在应用程序重启后或切换到另一个应用实例时可以恢复状态。
    Streams可以很好地处理这些问题:
    1. 它使用内嵌的RocksDB将本地状态保存在内存里,同时持久化到磁盘上,以便在重启后恢复。
    2. 本地状态的变更也会被发送到Kafka主题上。如果一个Streams节点发生崩溃,那么本地状态并不会丢失,因为可以通过重新读取Kafka主题上的事件来重建本地状态。
    3. 这些Kafka主题使用了压缩日志,以确保它们不会无限量地增长,方便我们重建状态。
  • 再均衡
    有时候,分区会被重新分配给不同的消费者。在这种情况下,失去分区的实例必须把最后的状态保存起来,而获得分区的实例必须知道如何恢复到正确的状态。

3.3 多阶段处理和重分区

本地状态对按组聚合的操作起到了非常大的作用。但如果需要基于所有可用信息来获得-个结果呢?假设我们每天要公布排名“前10”的股票,也就是每天从开盘到收盘收益最高的10只股票。很显然,只是在每个应用程序实例中执行操作是不够的,因为排名前10的股票可能分布在分配给其他实例的分区中。

我们需要一个两阶段解决方案:

  1. 首先计算出每只股票当天的涨跌,这个可以在每个实例中进行,并保存本地状态。
  2. 然后将结果写到一个包含单个分区的新主题中。另一个独立的应用程序实例会读取这个分区,找出当天排名前10的股票。新主题只包含了每只股票当日的概要信息,比其他包含交易信息的主题要小很多,所以流量很小,使用单个应用程序实例就足以应付。不过,有时候需要更多的步骤才能生成结果。
    在这里插入图片描述

3.4 使用外部查找:流和表的连接

有时候,流式处理需要将外部数据和流集成在一起,比如根据保存在外部数据库中的规则来验证事务,或者将用户信息填充到点击事件流中。

要使用外部查找来实现数据填充,可以这样做:对于事件流中的每一个点击事件,从用户信息表中查找相关的用户信息,生成一个新事件,其中包含原始事件以及用户的年龄和性别信息,然后将新事件发布到另一个主题上。

这种方式最大的问题在于,外部查找会严重增加处理每条记录的延迟,通常为5-15毫秒。这在很多情况下是不可行的。另外,给外部数据存储造成的额外负担也是不可接受的-流式处理系统每秒可以处理10-50万个事件,而数据库正常情况下每秒只能处理1万个事件。这也增加了可用性方面的复杂性,因为当外部存储不可用时,应用程序需要知道该作何处理。

为了获得更好的性能和伸缩性,需要在流式处理应用程序中缓存从数据库读取的信息。不过,管理缓存也是一个大问题。例如,该如何保证缓存中的数据是最新的?如果刷新太过频繁,则仍然会对数据库造成压力,缓存也就失去了应有的作用。如果刷新不及时,那么流式处理用的就是过时的数据。

如果能够捕获数据库变更事件,并形成事件流,那么就可以让流式处理作业监听事件流,然后根据变更事件及时更新缓存。捕获数据库变更事件并形成事件流的过程叫作CDC,Connect提供了一些连接器用于执行CDC任务,并把数据库表转成变更事件流。这样我们就拥有了数据库表的私有副本,一旦数据库发生变更,我们就会收到通知,并根据变更事件更新私有副本里的数据。

这样一来,每当收到点击事件,我们就从本地缓存里查找user_id,并将其填充到点击事
件中。因为使用的是本地缓存,所以具有更强的伸缩性,而且不会影响数据库和其他使用数据库的应用程序。

之所以将这种方式叫作流和表的连接,是因为其中一个流代表了本地缓存表的变更。
表与流的连接

3.5 表与表的连接

连接两张表不是基于时间窗口,在执行连接操作时,连接的是两张表的当前状态。Streams可以实现等价连接(equi-join),也就是说,两张表具有相同的键,并且分区方式也一样,这样我们就可以在大量的应用程序实例和机器之间执行高效的连接操作。
Streams还支持两张表的外键连接(foreign-key join),即一个流或表的键与另一个流或表的任意字段连接。

3.6 流与流的连接

有时候,我们需要连接两个真实的事件流,而不是一个流和一张表。什么是“真实”的
流?本章开头说过,流是无边界的。如果用一个流来表示一张表,那么就可以忽略流的大部分历史事件,因为我们只关心表的当前状态。但是,如果要连接两个流,则要连接所有的历史事件,也就是将两个流里具有相同键和发生在相同时间窗口内的事件匹配起来。这就是为什么流和流的连接也叫作基于时间窗口的连接。
假设我们有一个用户搜索事件流和一个用户点击搜索结果事件流。我们想要匹配用户的搜索和用户对搜索结果的点击,以便知道哪个搜索的热度更高。很显然,我们需要基于搜索关键词进行匹配,而且只能匹配一个时间窗口内的事件。假设用户会在输入搜索关键词几秒之后点击搜索结果。因此,我们为每一个流保留了几秒的时间窗口,并对每个时间窗口内的事件进行匹配。
Streams 支持等价连接(equi-joins),流、查询、点击事件都是通过相同的键来进行分区的,而且这些键就是连接用的键。这样一来,user_id:42所有的点击事件都会被保存到点击主题的分区5中,而user_id:42所有的搜索事件都会被保存到搜索主题的分区5中。然后,Streams会确保这两个主题的分区5被分配给同一个任务,这样这个任务就可以看到所有与user_id:42相关的事件。Streams在内嵌的RocksDB中维护了两个主题的连接时间窗口,所以能执行连接操作。

3.7 乱序事件

无论是流式处理系统还是传统的ETL系统,处理乱序事件对它们来说都是一个挑战。物联网领域经常出现乱序事件,这也是意料之中的(参见图14-9)。例如,一个移动设备断开WiFi连接几小时,在重新连上后将几小时以来累积的事件一起发送出去。这种情况在监控网络设备时(发生故障的交换机在修复之前不会发送任何诊断数据)或在生产车间(车间的网络连接非常不可靠)里也时有发生。

要让流处理应用程序处理好这些场景,需要做到以下几点。

  • 识别乱序事件。应用程序需要检查事件的时间,并将其与当前时间对比。
  • 规定一个时间段用于重排乱序事件。例如,3小时以内的事件可以重排,但3周以外的
    事件可以直接丢弃。
  • 能够带内(in-band)重排乱序事件。这是流式处理与批处理作业的一个主要不同点。如果我们有一个每天运行的作业,一些事件在作业结束之后才到达,那么可以重新运行昨天的作业并更新事件。而在流式处理中,“重新运行昨天的作业”这种事情是不存在的,乱序事件和新到达的事件必须一起处理。
  • 能够更新结果。如果处理结果是保存到数据库中,那么可以通过put或update更新结果。如果处理结果是通过邮件发送的,则需要用到一些巧妙的方法。

有些流式处理框架(比如Google Dataflow和Kafka Streams)支持处理独立于处理时间的事件,能够处理比当前时间更晚或更早的事件。它们在本地状态里维护了多个可更新的聚合时间窗口,为开发人员提供了配置时间窗口可更新时长的能力。当然,时间窗口可更新时间越长,维护本地状态需要的内存就越大。

Streams API 通常会将聚合结果写到主题中。这些主题一般是压缩日志主题(compactedtopics),也就是说,它们只保留每个键的最新值。如果一个聚合时间窗口的结果因为晚到事件需要被更新,那么Streams会直接为这个聚合时间窗口写入一个新结果,将前一个覆盖掉。

3.8 重新处理

最后一个重要的模式是重新处理事件,它有两个变种。

  • 我们对流式处理应用程序做了改进,用它处理同一个事件流,生成新的结果流,并比较两种版本的结果,然后在某个时间点将客户端切换到新的结果流中。
  • 现有的流式处理应用程序有很多bug,修复完bug之后,我们用它重新处理事件流并计算结果。

第一种情况很简单,Kafka会将事件流保存在一个可伸缩的数据存储里很长一段时间。要使用两个版本的流式处理应用程序来生成结果,只需满足如下条件。

  • 将新版本应用程序作为一个新的消费者群组。
  • 让新版本应用程序从输入主题的第一个偏移量开始读取数据(这样它就有了属于自己的输入流事件副本)。
  • 让新版本应用程序继续处理事件,等它赶上进度时,将客户端应用程序切换到新的结果流中。

第二种情况具有一定的挑战性。它要求“重置”现有的应用程序,让它回到输入流的起始位置开始处理,同时重置本地状态(这样就不会将两个版本应用程序的处理结果混淆起来了),还可能需要清理之前的输出流。尽管Streams提供了一个用于重置应用程序状态的工具(KafkaStreams.cleanUp()),但我们的建议是,如果有条件运行两个应用程序并生成两个结果流,那么还是使用第一种方案。第一种方案更加安全,因为它可以在多个版本之间来回切换,比较不同版本的结果,而且不会造成数据丢失,也不会在清理过程中引入错误。

3.9 交互式查询

如前所述,流式处理应用程序是有状态的,并且状态可以分布在多个应用程序实例中。大多数时候,流式处理应用程序的用户会从输出主题获取处理结果,但在某些情况下也可以用更简便的办法直接从状态存储里读取结果。如果处理结果是一张表(例如,10本最畅销的图书),而结果流又是这张表的更新流,那么直接从流式处理应用程序的状态存储中读取表数据要快得多,也容易得多。

Streams为此提供了灵活的API,用于查询流式处理应用程序的状态。

4 Streams架构

4.1 构建拓扑

每个流式处理应用程序都会实现和执行一个拓扑。拓扑(在其他流式处理框架中叫作DAG,即有向无环图)是一组操作和转换的集合,事件从输入到输出都会流经这个集合。

即使是一个简单的应用程序也会有不简单的拓扑。拓扑由处理器组成,处理器是拓扑图中的节点(在图中用椭圆表示)。大部分处理器实现了一个数据操作 – 过滤、映射、聚合等,数据源处理器从主题读取数据,并将数据传给其他组件,数据池处理器从处理器接收對据,并将数据生成到主题上。拓扑总是从一个或多个数据源处理器开始,并以一个或多个数据池处理器结束。

一个典型的拓扑(Topology)如下图所示。这个数据流用来统计文本行中每一个单词出现的次数,需要过滤掉介词the。
topology-of-word-count
此拓扑的代码实现如下:

    public KStream<String, String> counts(StreamsBuilder myKStreamBuilder) {
        final Pattern pattern = Pattern.compile("\\W+");
        KStream<String, String> source = myKStreamBuilder.stream("words-count-input");
        KStream<String, String> counts = source
                .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                .map((key, value) -> new KeyValue<String, String>(value, value))
                .filter((key, value) -> !value.equals("the"))
                .groupByKey()
                .count(Named.as("CountStore"))
                .mapValues(value -> Long.toString(value)).toStream();
        counts.print(Printed.toSysOut());
        counts.to("words-count-output");
        return counts;
    }

4.2 优化拓扑

默认情况下,在执行使用DSL API构建的应用程序时,Streams会将每个DSL方法独立映射到一个底层的等价对象。因为每个DSL方法都是独立计算的,所以错失了优化整体拓扑的机会。

Streams 应用程序的执行分为3个步骤。

  1. 通过创建KStream对象和KTable对象并对它们执行DSL操作(比如过滤和连接)来定义逻辑拓扑。
  2. 调用StreamsBuilder.build(),从逻辑拓扑生成物理拓扑。
  3. 调用KafkaStreams,start()执行拓扑,这是读取、处理和生成数据的步骤。
    在第2个步骤中,也就是从逻辑拓扑生成物理拓扑这一步,可以对执行计划进行整体优化。
    目前,Kafka只提供了一部分优化,主要与重用主题有关。可以通过将StreamsConfig.
    TOPOLOGY_OPTIMIZATION 设置成 StreamsConfig.OPTIMIZE并调用build(props)来启用这些优化。如果只调用build()但没有传入配置,则仍然无法启用优化。建议对启用了优化和没有启用优化的应用程序进行测试,比较执行时间和写入Kafka的数据量,当然,也要验证各种已知场景中的结果是否相同。

4.3 测试拓扑

一般来说,在正式运行应用程序之前,需要对它进行测试。自动化测试被认为是黄金标准,每次修改应用程序或开发库都会自动进行测试。这种可重复的评估方式可以实现快速迭代,并让问题诊断变得更容易。

我们也想对我们的Streams应用程序进行同样的测试。除了自动化端到端测试(使用生成的数据在staging环境中运行流式处理应用程序),我们还想进行更快、更轻量级且更容易调试的单元测试和集成测试。

4.3.1 单元测试

Streams应用程序的主要测试工具是TopologyTestDriver。这些测试看起来就像是普通的单元测试。我们定义输入数据,将其生成到模拟输入主题,然后用这个工具运行拓扑,从模拟输出主题读取结果,并将结果与期望值进行对比。

建议使用TopologyTestDriver来测试流式处理应用程序,但因为没有模拟Streams的缓存行为(未在本书中提及的一种优化),所以有一类错误它无法检测到。

4.3.2 集成测试

对Streams来说,有两个流行的集成测试框架:
EmbeddedKafkaCluster 和Testcontainers。前者的broker和测试代码运行在同一个JVM中,后者的broker运行Docker容器中(还有其他需要用到的组件)。建议使用Testcontainer,因为它使用了Docker,可以将Kafka、依赖项和要用到的资源与要测试的应用程序完全隔离开。

4.4 扩展拓扑

Streams的伸缩方式有两种,一种是在一个应用程序实例中运行多个线程,一种是在分
布式实例之间进行负载均衡。我们可以在一台机器上使用多线程或在多台机器上运行Streams应用程序,不管是哪一种,应用程序中的所有活动线程都将均衡地分摊数据处理工作。

4.4.1 单服务器

Streams引擎会将拓扑分为多个并行执行的任务。任务数由Streams引擎决定,也取决于应用程序处理的主题有多少个分区。每个任务负责处理一部分分区:它们会订阅这些分区并从分区中读取事件。对于所读取的每一个事件,任务都会在将最终结果写入目标主题之前按顺序执行所有的处理步骤。这些任务是Streams最基本的并行执行单元,每个任务都可以独立执行。

应用程序开发人员可以选择每个应用程序使用的线程数。如果有多个线程可用,则每个线程将执行一部分任务。

4.4.2 多服务器

如果有多个应用程序实例运行在多台服务器上,那么每台服务器上的每一个线程都将执行一部分不同的任务。这就是流式处理应用程序的伸缩方式:主题有多少分区,就有多少个任务。如果想处理得更快,则可以添加更多的线程。如果一台服务器的资源被耗尽,就在另一台服务器上启动另一个应用程序实例。Kafka会自动协调任务,也就是说,它会为每个任务分配属于它们的分区,让每个任务独自处理属于自己的分区,并在必要时维护与聚合相关的本地状态。

4.4.3 重分区

有时候,一个处理步骤可能需要来自多个分区的输入,这会导致任务之间产生依赖关系。

如果要连接两个流,就像14.4.3节在“填充点击事件流”示例中所做的那样,就需要从每
个流中读取一个分区的数据才能生成结果。Streams会将连接所需的所有分区分配给同一个任务,这样任务就可以读取所有相关分区并独立执行连接操作。这就是为什么Streams要求所有参与连接操作的主题都要有相同的分区数,并基于连接所使用的键进行分区。

应用程序重新分区也会导致任务间产生依赖关系。例如,在“填充点击事件流”示例中,
所有事件都使用用户ID作为键。如果想基于页面或邮政编码生成统计信息该怎么办?

Streams需要用邮政编码对数据进行重新分区,并对新分区执行聚合操作。假设任务1在处理分区1的数据,这时遇到一个对数据进行重新分区(groupBy操作)的处理器,它需要shuffle数据,或者把数据发送给其他任务。与其他流式处理框架不同,Streams会将事件写到新主题上,并使用新的键和分区,以此来实现重新分区。然后,另一组新任务会从新主题上读取和处理事件。重分区会将拓扑拆分成两个子拓扑,每个子拓扑都有自己的任务集。第二个任务集依赖于第一个任务集,因为它们处理的是第一个子拓扑的结果。不过,这两组任务仍然可以独立并行执行,因为第一个任务集会按照自己的速率将数据写到一个主题上,第二个任务集也会按照自己的速率从这个主题读取和处理数据。两个任务集之间不需要通信,不共享资源,也不需要运行在相同的线程或服务器上。这是Kafka提供的最有用的特性之一-减少管道各个部分之间的依赖。

4.5 在故障中存活下来

Streams的扩展模型不仅允许扩展应用,还能让我们优雅地处理故障。首先,Kafka是高可用的,所以保存在Kafka中的数据也是高可用的。如果应用程序发生故障需要重启,那么可以从Kafka中找到上一次处理的数据在流中的位置,并从这个位置继续处理。如果本地状态丢失(比如可能需要替换服务器),则应用程序可以从保存在Kafka中的变更日志重新创建本地状态。

Streams还利用了消费者的协调机制来实现任务高可用性。如果一个任务失败,那么只要还有其他活跃的线程或应用程序实例,就可以用另一个线程来重启这个任务。这类似于消费者群组的故障处理:如果一个消费者失效,就把分区分配给其他活跃的消费者。Kafka消费者群组协调协议的改进也让Streams 锦上添花,比如固定群组成员关系和协作再平衡(参见第4章),以及对Kafka精确一次性语义的改进(参见第8章)。

虽然这里所说的高可用性方法在理论上是可行的,但在现实中存在一定的复杂性,其中一个比较重要的问题是恢复速度。当一个线程开始继续处理另一个故障线程留给它的任务时,它需要先恢复状态(比如当前的聚合窗口)。这通常是通过重新读取内部Kafka主题来实现的。在恢复状态期间,流式处理作业将暂停处理数据,从而导致可用性降低和数据过时。

因此,缩短恢复时间往往就变成了缩短恢复状态所需的时间。这里的关键在于要确保所有的Streams主题都是主动压实的 – 减小min.compaction.lag.ms的值,并将日志片段大小设置为100 MB,而不是默认的1GB(需要注意的是,每个分区的最后一片段,也就是活跃片段,不会被压实)。

为了让恢复进行得更快一些,建议使用任务备用副本(standby replica)。这些任务是活跃的影子任务,会在其他服务器上保留当前状态。当发生故障转移时,它们已经拥有最新的状态,并准备好在几乎不停机的情况下继续处理数据。

5 流式处理应用场景

如果想快速处理事件,而不是为处理一个批次数据等上几小时,但又不要求毫秒级的响应,那么流式小理(或持续处理)是最好的选择。

下面列出了一些流式处理的真实应用场景。

5.1 客户服务

假设我们刚刚向一家大型连锁酒店预订了一个房间,并希望收到电子邮件确认和收据。但是,在预订了几分钟之后我们还没有收到确认邮件,于是打电话向客服确认。客服的回复是:“我在我们的系统中看不到订单,将数据从预订系统加载到客服系统的批处理作业每天只运行一次,所以请明天再打电话过来。你应该可以在2~3个工作日之后收到确认邮件。”这样的服务质量有点儿糟糕,而我们已经不止一次在一家大型连锁酒店遭遇过类似的问题。我们希望连锁酒店的每一个系统在预订之后的几秒或几分钟之内都能发出通知,包括客服中心、酒店、发送确认邮件的系统、网站等。我们还希望客服中心能够立即拉取到我们在这家连锁酒店的历史入住数据,知道我们是忠实顾客,从而为我们升级服务。如果用流式处理应用程序来构建这些系统,它们就可以几近实时地接收和处理事件,带来更好的用户体验。有了这样的系统,顾客就可以在几分钟之内收到确认邮件,并及时从信用卡中扣除费用,然后发送票据,服务台就可以马上回答有关房间预订情况的问题了。

5.2 物联网

物联网包含了很多东西,从可调节温度和订购洗衣剂的家居设备到制药行业的实时质量监控设备。流式处理在这方面最为常见的应用是预测何时该进行设备维护。这与应用程序监控有点儿相似,只是监控的对象是硬件,这在很多行业中很常见,包括制造业、通信(识别故障基站)、有线电视(在用户投诉之前识别出故障机顶盒)等。每一种场景都有自己的模式,但目标是一样的,即处理大量来自设备的事件,并识别出故障设备的模式,比如交换机丢包、制造过程中需要更大的力气来拧紧螺丝,或者用户频繁重启有线电视机顶盒。

5.3 欺诈检测

欺诈检测也叫异常检测,是一个非常广泛的领域,专注于捕获系统中的“作弊者”或不良分子。欺诈检测的应用包括信用卡欺诈检测、股票交易欺诈检测、视频游戏欺诈检测和网络安全风险。在这些欺诈行为造成大规模破坏之前,越早识别出它们越好。一个几近实时的可以快速对事件做出响应(比如停止一个还没有通过审核的交易)的系统比在3天之后才能检测出欺诈行为的批处理系统要好得多。这也是一个在大规模事件流中识别模式的问题。

在网络安全领域,有一个被称为发信标(beacon)的欺诈手法。黑客在组织内部植入恶意软件,恶意软件会时不时地连接到外部网络接收命令。由于恶意软件可以在任意时间以任意频率接收命令,因此很难被检测到。通常,网络可以抵挡来自外部的攻击,但难以阻止从内部到外部的突围。通过处理大量的网络连接事件流,识别出不正常的通信模式(例如,检测出主机访问了平常不经常访问的某些IP地址),我们可以在蒙受更大的损失之前向安全组织发出告警。

6 如何选择流式处理框架

在选择流式处理框架时,需要着重考虑应用程序的类型。不同类型的应用程序需要不同的流式处理解决方案。

6.1 不同应用类型选择不同

6.1.1 数据摄取

数据摄取的目的是将数据从一个系统移动到另一个系统,并在传输过程中对数据做一些修改,使其更适用于目标系统。

如果你要解决数据摄取问题,那么就要考虑是需要流式处理系统还是更简单的专注于数据摄取的系统,比如Connect。如果你确定需要流式处理系统,那么就要确保它和目标系统都有可用的连接器。

6.1.2低延迟处理

任何要求立即得到响应的应用程序。有些欺诈检测系统就属于这一类。

如果你要进行低延迟处理,那么就要考虑是否一定要使用流。一般来说,请求与响应模式更适合用来处理这种任务。如果你确定需要流式处理系统,那么就选择一种支持逐事件低延迟模型而不是微批次模型的流式处理系统。

6.1.3异步微服务

这些微服务负责执行大型业务流程中的一些简单的操作,比如更新库存信息。这些应用程序需要通过维护本地状态缓存来提升性能。

如果你要构建异步微服务,那么就需要可以与消息总线(希望是Kafka)集成的流式处理系统,它应该具备变更捕获能力,可以将上游的变更更新到微服务的本地缓存里,并且支持本地存储,可以作为微服务数据的缓存和物化视图。

6.1.4几近实时的数据分析

这些流式应用程序通过执行复杂的聚合和连接操作来对数据进行切分,并生成有趣的业务见解。

如果你要构建复杂的数据分析引擎,那么就需要支持本地存储的流式处理系统,不过这次不是为了本地缓存和物化视图,而是为了支持高级聚合、时间窗口和连接,因为如果没有本地存储,就很难实现这些特性。流式处理系统的API需要支持自定义聚合、时间窗口操作和多种连接类型。

6.2 非功能角度

除了具体的应用场景,还需要从非功能性角度考虑如下事项。

6.2.1系统的可操作性
  • 是否容易部署
  • 是否容易监控和调试
  • 是否容易扩展
  • 是否能够与已有的基础设施集成
  • 如果出现错误需要重新处理数据该怎么办
6.2.2化繁为简

大部分系统声标它们支持基于时间窗口的高级聚合和本地缓存,但问题是,它们够简单吗?它们是带你处理了伸缩和故障恢复方面的问题,还是只提供了脆弱的抽象并让你自己处理剩下的脏活?系统提供的API越简洁,封装的细节越多,开发人员的效率就越高。

6.2.3API的可用性和可调试性

用同一种框架的不同版本开发高质量的应用程序所耗费的时间可能千差万别。因为开发时间和发布时机太重要了,所以需要选择一个高效的系统。

6.2.4社区

大部分流式处理框架是开源的。对开源软件来说,一个充满生机的社区是不可替代的。好的社区意味着用户可以定期获得新的功能特性,而且质量相对较高(没有人会使用糟糕的软件),bug可以很快地得到修复,用户的问题可以及时得到解答。如果你遇到一个奇怪的问题并在谷歌上搜索,那么可以搜索到相关的信息,因为其他人也在使用这个系统,并遇到过同样的问题。

7 Streams的配置

processing.guarantee=at_least_once

at_least_once, exactly_once, exactly_once_beta, exactly_once_v2

To be completed…

8 实例

8.1 需求

笔者的公司最近在做一个项目,需求大概是这样的:
客户呼叫中心有一个大屏幕,上面显示了一些实时的统计数据,从不同的维度统计团队的绩效,统计的时长是过去的12小时。

  • 客服员工的绩效指标,例如坐席时间(在线多久,离开多久),处理了多少个客户案例,正在处理的案例数,平均处理时间等等。
  • 不同客服团队的绩效指标。
  • 不同技能团队的绩效指标。

老的大屏幕非常简陋,分别显示了几个旧系统,数据都是孤立的,并且无法自动刷新。所以需要做一个新的大屏系统(Dashboard App),支持统计数据的实时刷新。

8.2 关于设计

团队一开始的设计时使用Oracle来存储数据,并使用SQL查询来进行数据统计。后来考虑到数量相对较大,数据插入和查询都很频繁,并且数据没有归档要求,遂考虑采用Kafka Streams来进行流式处理。

8.3 关于方案

下面以其中一条数据流为例,对解决方案进行说明:

  1. 数据从CRM系统中,通过Kafka Connect同步到Kafka中
  2. 需要创建5个主题
    • agent-profiles,3分区3副本
    • agnet-cases,3分区3副本
    • agent-presences,3分区3副本
    • agent-statistics,1分区3副本
    • agent-dashboard,1分区3副本
  3. agent-profiles和agnet-cases,agent-presences分别做表和流的左连接
  4. agnet-cases,agent-presences再做流和流的左连接
  5. 之后再针对每一个Agent数据做聚合,放入主题agent-statistics。
  6. 时间窗口为滚动窗口,大小为12小时。
  7. 将所有的Agent最新聚合数据在聚合到一条消息中,放入主题agent-dashboard
  8. 最用用Connect将数据同步到Mongo
  9. Dashboard App定时从Mongo中查询数据,显示在大屏幕上。

8.4 逻辑视图

整个逻辑视图如下:
agent-dashboard-solution-logic-view

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/581259.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

阿里云操作日记

昨天买了一个超级便宜的阿里云服务器&#xff0c;2核2G&#xff0c;3M固定带宽&#xff0c;40G ESSD Entry云盘&#xff0c;搭载一个简单的系统&#xff0c;就想到了docker轻量级&#xff0c;易于管理 其实docker很好用&#xff0c;第一步就是安装docker 一、docker安装与端口…

Python快速入门1数据类型(需要具有编程基础)

数据类型&#xff1a; Python 3.0版本中常见的数据类型有六种&#xff1a; 不可变数据类型可变数据类型Number&#xff08;数字&#xff09;List&#xff08;列表&#xff09;String&#xff08;字符串&#xff09;Dictionary&#xff08;字典&#xff09;Tuple&#xff08;元…

python学习笔记----循环语句(四)

一、while循环 为什么学习循环语句 循环在程序中同判断一样&#xff0c;也是广泛存在的&#xff0c;是非常多功能实现的基础&#xff1a; 1.1 while循环语法 while 条件表达式:# 循环体# 执行代码这里&#xff0c;“条件表达式”是每次循环开始前都会评估的表达式。如果条件…

张大哥笔记:我付钱了,我就是大爷?

很抱歉用这个当做标题&#xff0c;来给大家分享一些电商的故事&#xff01;大家好&#xff0c;我是张大哥&#xff0c;今天聊聊在电商路上遇到过的奇葩买家&#xff1f; 比如最近我在做PDD的时候&#xff0c;就会遇到很多莫名其妙的sha子&#xff0c;咱是知识份子&#xff0c;肯…

漏洞扫扫工具合集

综合类扫描工具 AWVS Acunetix一款商业的Web漏洞扫描程序&#xff0c;它可以检查Web应用程序中的漏洞&#xff0c;如SQL注入、跨站脚本攻击、身份验证页上的弱口令长度等。它拥有一个操作方便的图形用户界面&#xff0c;并且能够创建专业级的Web站点安全审核报告。新版本集成了…

LeetCode1017题:负二进制转换(原创)

【题目描述】 给你一个整数 n &#xff0c;以二进制字符串的形式返回该整数的 负二进制&#xff08;base -2&#xff09;表示。注意&#xff0c;除非字符串就是 "0"&#xff0c;否则返回的字符串中不能含有前导零。 示例 1&#xff1a; 输入&#xff1a;n 2 输出&…

高频面试题:解决Spring框架中的循环依赖问题

引言&#xff1a;什么是Spring框架与循环依赖&#xff1f; 在Spring框架中&#xff0c;循环依赖是指两个或多个bean相互依赖对方以完成自己的初始化。这种依赖关系形成了一个闭环&#xff0c;导致无法顺利完成依赖注入。比如&#xff0c;如果Bean A在其构造函数中需要Bean B&a…

图像处理:乘法滤波器(Multiplying Filter)和逆FFT位移

一、乘法滤波器&#xff08;Multiplying Filter&#xff09; 乘法滤波器是一种以像素值为权重的滤波器&#xff0c;它通过将滤波器的权重与图像的像素值相乘&#xff0c;来获得滤波后的像素值。具体地&#xff0c;假设乘法滤波器的权重为h(i,j)&#xff0c;图像的像素值为f(m,…

基于51单片机的电子秤LCD1602液晶显示( proteus仿真+程序+设计报告+讲解视频)

基于51单片机电子秤LCD显示 1. 主要功能&#xff1a;2. 讲解视频&#xff1a;3. 仿真设计4. 程序代码5. 设计报告6. 设计资料内容清单&&下载链接 基于51单片机电子秤LCD显示( proteus仿真程序设计报告讲解视频&#xff09; 仿真图proteus8.9及以上 程序编译器&#xf…

NiceGUI:一个超赞的Python UI库

1. 引言 NiceGUI是一个基于Python的简单用户界面框架&#xff0c;可与浏览器或桌面应用程序流畅运行。无论你是制作小型网络应用程序、还是玩机器人项目&#xff0c;NiceGUI 都能以其简单的界面和众多的功能满足你的需求。这篇文章的目的是通过向大家展示如何构建和部署NiceGU…

如何选择适合自己需求的DC电源模块?

BOSHIDA 如何选择适合自己需求的DC电源模块&#xff1f; 在选择适合自己需求的DC电源模块时&#xff0c;需要考虑一些关键因素&#xff0c;以确保选择的模块能够满足电源要求并具有良好的性能。下面是一些值得考虑的因素&#xff1a; 1. 电压输出范围&#xff1a;首先&#xf…

短视频素材从哪里获取?推荐8个短视频素材高清网站

在这个视觉内容至关重要的数字化时代&#xff0c;高质量的视频素材是任何成功视频项目的核心。无论是加强品牌宣传、提升社交媒体互动还是制作引人注目的广告&#xff0c;这些精选的全球视频素材网站都将为你的创意注入活力&#xff0c;帮助你在激烈的市场竞争中脱颖而出。 1.…

2024LarkXR新增功能系列之六 | ⽀持8K分辨率

Paraverse平行云企业级实时云渲染解决方案LarkXR是平行云自主研发的CloudXR解决方案&#xff0c;在业界实现了创新突破。通过分钟级部署大规模云端资源、高度适配XR所有主流引擎、以及灵活支持不同交互和沉浸方式的内容形式&#xff0c;LarkXR解决了Cloud XR商业化过程中所面临…

Linux之进程间通信(二)

system V system V共享内存是内核中专门设计的通信的方式, 粗粒度划分操作系统分为进程管理, 内存管理, 文件系统, 驱动管理.., 粒度更细地分还有 进程间通信模块. 对于操作系统, 通信的场景有很多, 有以传送数据, 快速传送数据, 传送特定数据块, 进程间协同与控制以目的, 它…

SystemUI GlobalActions plugin解析

com.android.systemui.action.PLUGIN_GLOBAL_ACTIONS 系统的默认实现为GlobalActionsImpl: 是谁发送了showShutdownUi指令&#xff1f; GlobalActionsImpl 是通过inject的方式创建的 GlobalActionsComponent是一个system UI services&#xff0c;配置在config.xml中&#xff…

Docker容器:网络模式与资源控制

目录 一、Docker 网络模式 1、Docker 网络实现原理 2、Docker 网络模式概述 2.1 Host 模式 2.2 Container 模式 2.3 None 模式 2.4 Bridge 模式 2.5 自定义网络&#xff08;user-defined network&#xff09; 3、配置 docker 网络模式 3.1 查看网络基础命令 3.1.1 查…

“怡宝”冲刺港股,饮用水基本盘稳如磐石

最近&#xff0c;饮用水市场异常热闹。 先是“怡宝”所属的华润饮料正式向港交所提交上市申请。随即&#xff0c;多名农夫山泉员工在朋友圈发文“推出绿瓶纯净水”&#xff0c;撞脸怡宝经典包装。“怡宝”遭遇奇袭的背后&#xff0c;是双方持续“交锋”的多年&#xff0c;随着…

Vue从入门到精通-01-Vue的介绍和vue-cli

MVVM模式 Model&#xff1a;负责数据存储 View&#xff1a;负责页面展示 View Model&#xff1a;负责业务逻辑处理&#xff08;比如Ajax请求等&#xff09;&#xff0c;对数据进行加工后交给视图展示 关于框架 为什么要学习流行框架 1、企业为了提高开发效率&#xff1a;…

【Harmony3.1/4.0】笔记三-计算器

概念 网格布局是由“行”和“列”分割的单元格所组成&#xff0c;通过指定“项目”所在的单元格做出各种各样的布局。网格布局具有较强的页面均分能力&#xff0c;子组件占比控制能力&#xff0c;是一种重要自适应布局&#xff0c;其使用场景有九宫格图片展示、日历、计算器等…

python-pytorch 如何使用python库Netron查看模型结构(以pytorch官网模型为例)0.9.2

Netron查看模型结构 参照模型安装Netron写netron代码运行查看结果需要关注的地方 2024年4月27日14:32:30----0.9.2 参照模型 以pytorch官网的tutorial为观察对象&#xff0c;链接是https://pytorch.org/tutorials/intermediate/char_rnn_classification_tutorial.html 模型代…