设计模式:责任链实现数据流风格的数据处理

数据流风格

数据流风格是软件架构中的一种风格,主要是面向数据,用于进行流式的数据处理;数据流风格的代表有管道-过滤器风格和批处理序列风格,这里主要是指管道-过滤器风格。

管道-过滤器风格就像其名字一样,是以一个个的组件连接,数据像水一样,顺序的流向到管道中,然后逐一被组件处理,最终达到目标形式。此种风格是比较适合数据治理或者进行简单的数据接入的。

场景引入

假设需要从一个topic中实时接入数据,其中的每条数据都有五个属性,分别是data_type,source_from,source_to,detail,op_time;

下面是数据处理流程的规则:

  • 如果data_type等于“unknow”,则该条数据丢弃,流程结束,否则,继续处理;
  • 判断source_from和source_to 是否均为空,如果是,则数据丢弃,结束流程,否则继续处理;
  • 如果source_from和source_to 均不为空,则判断op_time是否大于‘2023-01-01 08:09:00’,若大于,则存储到表B,流程结束;
  • 如果source_from不为空但source_to为空,则数据存储到表A,流程结束;

该场景是典型的基于规则,对数据进行处理与处置,换算为逻辑流程应该是:

Kafka 作为一个分布式流处理平台,广泛用于构建实时数据管道和流应用。它能够处理大量的数据流,具有高吞吐量、可持久化存储、容错性和扩展性等特性。这里我们以Kafka作为流数据的开始,也就是系统的输入。SpringBoot的应用也就是消费者的角色,去接入、处理kafka中的数据。

具体关于SpringBoot集成Kafka的基础,可以参考我之前的文章👇

关于SpringBoot集成Kafka-CSDN博客icon-default.png?t=O83Ahttps://blog.csdn.net/qq_40690073/article/details/143960276

我们直接基于SpringBoot和Kafka进行基本实现:

基于@KafkaListener注解进行消息监听,定义消息处理接口及其实现类,然后进行数据处理

定义数据类:

@Data
public class MyData {
    @JsonProperty("data_type")
    private String dataType;

    @JsonProperty("source_from")
    private String sourceFrom;

    @JsonProperty("source_to")
    private String sourceTo;

    @JsonProperty("detail")
    private String detail;

    @JsonProperty("op_time")
    private String opTime;
}

定义数据处理Service及其实现类:

public interface MessageDealService {
    void process(MyData data);
}

@Service
public class MessageDealServiceImpl implements MessageDealService {

     @Autowired
    private TableARepository tableARepository;
    
     @Autowired
    private TableBRepository tableBRepository;


    @Override
    public void process(MyData data) {
        if ("unknow".equals(data.getDataType())) {
            return; 
        }

        if (data.getSourceFrom() == null && data.getSourceTo() == null) {
            return; 
        }

        if (data.getSourceFrom() != null && data.getSourceTo() != null) {
            if (isAfter(data.getOpTime(), "2023-01-01 08:09:00")) {
                TableB tableB = new TableB();
                tableB.setDataType(data.getDataType());
                tableB.setSourceFrom(data.getSourceFrom());
                tableB.setSourceTo(data.getSourceTo());
                tableB.setDetail(data.getDetail());
                tableB.setOpTime(data.getOpTime());
                tableBRepository.save(tableB);
            }
        } else if (data.getSourceFrom() != null && data.getSourceTo() == null) {
            TableA tableA = new TableA();
            tableA.setDataType(data.getDataType());
            tableA.setSourceFrom(data.getSourceFrom());
            tableA.setDetail(data.getDetail());
            tableA.setOpTime(data.getOpTime());
            tableARepository.save(tableA);
        }
    }

    private boolean isAfter(String opTime, String threshold) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.parse(opTime).after(sdf.parse(threshold));
    }
}

kafka中进行监听并调用

@Component
public class KafkaConsumer {
    
    @Autowired
    private MessageDeal messageDeal;


    @KafkaListener(topics = "your-topic", groupId = "group-id")
    public void consume(String message) {
        try {
            // 解析消息
            ObjectMapper objectMapper = new ObjectMapper();
            MyData data = objectMapper.readValue(message, MyData.class);
            // 调用消息处理器
            messageDeal.process(data);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

模式改造

责任链设计模式

责任链的设计源于数据结构中的链表,从模式的定义中就能看出,它需要一串走下去,而每一个处理请求的对象,都需要记录下一个处理请求的对象,即标准的数据链表方式。

职责链模式的实现主要包含以下角色。

  • 抽象处理者(Handler)角色:定义一个处理请求的接口,包含抽象处理方法和一个后继连接。
  • 具体处理者(Concrete Handler)角色:实现抽象处理者的处理方法,判断能否处理本次请求,如果可以处理请求则处理,否则将该请求转给它的后继者。
  • 客户类(Client)角色:创建处理链,并向链头的具体处理者对象提交请求,它不关心处理细节和请求的传递过程。

责任链模式的本质是解耦请求与处理,让请求在处理链中能进行传递与被处理;理解责任链模式应当理解其模式,而不是其具体实现。责任链模式的独到之处是将其节点处理者组合成了链式结构,并允许节点自身决定是否进行请求处理或跳跃,相当于让请求流动起来。

 UML类图如下:

管道过滤器风格与责任链的结合的思路

基于责任链的模式与数据流的概念图对比:

and

  

 我们可以得出,责任链中的具体处理者(Concrete Handler)角色恰好可以充当数据流风格中的过滤器,然后基于此,我们将繁琐的if else逻辑抽象到一个个的过滤器中,然后让这些过滤器链成数据处理链,让接入的数据走入到对应的数据处理链中即可。

SpringBoot中重新实现

定义数据处理组件框架

首先,基于责任链模式进行数据处理器框架的定义:

/**
* 定义数据处理器接口
**/
interface DataStreamProcessor {
    void setNext(DataStreamProcessor nextProcessor);
    void handle(Object data);
}

/**
 * 定义数据处理器抽象类,完成基本的责任链注册机制
 * 以及预留业务扩展口
 **/
public abstract class AbstractDataStreamProcessor implements DataStreamProcessor {
    private DataStreamProcessor nextProcessor;

    @Override
    public void setNext(DataStreamProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }

    @Override
    public void handle(Object data) {
        AtomicBoolean flag = disposeData(data);
        if(flag.get() && null != nextProcessor){
              nextProcessor.handle(data);
        }
    }
    
    /**
     * 处理数据
     * @param data 数据
     * @return AtomicBoolean 如果返回为true,则代表继续向下处理,否则,则终止     
     */
    abstract AtomicBoolean disposeData(Object data);
    
}

使用时,则根据要处理的逻辑,继承 AbstractDataStreamProcessor 类即可,我们以data_type判断为例:

public class DataTypeFilterProcessor extends AbstractDataStreamProcessor {
    
    private boolean flag;

    @Override
    void disposeData(Object data) {
        Map<String, Object> record = (Map<String, Object>) data;
        if ("unknow".equals(record.get("data_type"))) {
            //结束处理,后续不做处理
            return new AtomicBoolean(false);
        }
        return new AtomicBoolean(false);
    }

}

除DataTypeFilterProcessor外,还需要根据其他的逻辑,新建其他的处理器类A、B、C,才能完成一个完整的链式。

public class DataStreamProcessorTest {
    public static void main(String[] args) {
        // 创建处理器实例
        UnknownTypeProcessor unknownTypeProcessor = new UnknownTypeProcessor();
        XXXXProcessorA  emptySourceProcessor = new XXXXProcessorA();
        XXXXProcessorB  sourceToProcessor = new XXXXProcessorB  ();
        XXXXProcessorC  sourceFromProcessor = new XXXXProcessorC  ();

        // 构建责任链
        unknownTypeProcessor.setNext(emptySourceProcessor);
        emptySourceProcessor.setNext(sourceToProcessor);
        sourceToProcessor.setNext(sourceFromProcessor);

        // 测试数据
        MyData data1 = new MyData();
        data1.setDataType("unknow");
        data1.setSourceFrom("source1");
        data1.setSourceTo("source2");
        data1.setDetail("detail1");
        data1.setOpTime("2023-01-02 09:10:00");
        //处理流程 
        unknownTypeProcessor.handle(data1);
}

基于SpringBoot进行处理器自动化注册

如果单纯的使用main函数调用,则是根据逻辑流程图进行一个个的链式注入,这显然无法在SpringBoot中使用,如果想在SpringBoot中使用,我们需要解决两个问题:

  • 第一,要保证我们的处理器是Spring的Bean,受Spring的上下文管理,这样才可以自由的使用@Autowired等注解完美的进行其他Service的使用;
  • 第二,最好是摒弃手动逐一注入的情况,对于所处的数据流,最好在处理器类编写的时候就可以指定。

针对以上两点需求,解决方案如下:

  • 对新建的处理器类上使用@Compoent注解即可使其成为Spring上下文管理的Bean,且可以随意依赖Spring环境中其他的Bean
  • 进行自动注入需要两个参数,一个是这个处理器需要到哪个数据处理流中,另一个是在所处的数据流中的位置,基于这两个参数就可以实现自动注册,所有需要一个注解来额外标明这两个参数
定义注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DataStream {
   
    String dataStreamName();

    int order() default 0;
}
自动化注册
   private final Map<String, List<AbstractDataStreamProcessor>> dataStreamChains = new ConcurrentHashMap<>();

    @Autowired
    public void setDataStreamProcessors(Map<String, AbstractDataStreamProcessor> processors) {
        processors.forEach((beanName, processor) -> {
            DataStream annotation = processor.getClass().getAnnotation(DataStream.class);
            if (annotation != null) {
                String dataStreamName = annotation.dataStreamName();
                int order = annotation.order();
                dataStreamChains.computeIfAbsent(dataStreamName, k -> new ArrayList<>()).add(processor);
            }
        });

        dataStreamChains.forEach((dataStreamName, processorsList) -> {
            Collections.sort(processorsList, (p1, p2) -> {
                DataStream a1 = p1.getClass().getAnnotation(DataStream.class);
                DataStream a2 = p2.getClass().getAnnotation(DataStream.class);
                return Integer.compare(a1.order(), a2.order());
            });

            // 构建责任链
            AbstractDataStreamProcessor current = null;
            for (AbstractDataStreamProcessor processor : processorsList) {
                if (current == null) {
                    current = processor;
                } else {
                    current.setNext(processor);
                    current = processor;
                }
            }
        });
    }

    @Bean
    public BeanPostProcessor beanPostProcessor() {
        return new BeanPostProcessor() {
            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                if (bean instanceof AbstractDataStreamProcessor) {
                    Field field = ReflectionUtils.findField(bean.getClass(), "nextProcessor");
                    if (field != null) {
                        ReflectionUtils.makeAccessible(field);
                        ReflectionUtils.setField(field, bean, getNextHandler((AbstractDataStreamProcessor) bean));
                    }
                }
                return bean;
            }

            private AbstractDataStreamProcessor getNextHandler(AbstractDataStreamProcessor processor) {
                DataStream annotation = processor.getClass().getAnnotation(DataStream.class);
                if (annotation != null) {
                    String dataStreamName = annotation.dataStreamName();
                    List<AbstractDataStreamProcessor> processorsList = dataStreamChains.get(dataStreamName);
                    if (processorsList != null) {
                        int currentIndex = processorsList.indexOf(processor);
                        if (currentIndex < processorsList.size() - 1) {
                            return processorsList.get(currentIndex + 1);
                        }
                    }
                }
                return null;
            }
        };
    }

    @Bean
    public Map<String, AbstractDataStreamProcessor> dataStreamProcessorMap() {
        return dataStreamChains.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0)));
    }

改造后场景复现

基于以上设计,规避掉繁琐的if else嵌套,以Java类作为基础单元,进行数据组件化的思路去再次实现场景。

基于之前的封装,此处直接进行数据处理器的实现即可,我们先创建四个具体的处理器来处理这些规则:

  • DataTypeFilterProcessor:过滤掉 data_type 为 "unknow" 的数据。
  • SourceCheckProcessor:检查 source_from 和 source_to 是否均为空,如果是,则丢弃数据。
  • OpTimeFilterAndStoreBProcessor:如果 op_time 大于 '2023-01-01 08:09:00',则存储到表 B。
  • StoreAProcessor:如果 source_from 不为空但 source_to 为空,则存储到表 A。

以下为具体代码:

@Component
//在SpringBoot中只需要DataStream注解就可以自动地注册成为某条数据流地处理
@DataStream(dataStreamName = "default", order = 1)
public class DataTypeFilterProcessor extends AbstractDataStreamProcessor {
    

    @Override
    AtomicBoolean disposeData(Object data) {
        Map<String, Object> record = (Map<String, Object>) data;
        if ("unknow".equals(record.get("data_type"))) {
            //结束处理,后续不做处理
            return new AtomicBoolean(false);
        }
        return new AtomicBoolean(false);
    }
}

@Component
@DataStream(dataStreamName = "default", order = 2)
public class SourceCheckProcessor extends AbstractDataStreamProcessor {
    
    
    @Override
    AtomicBoolean disposeData(Object data) {
        MyData record = (Mydata) data;
        if (record.get("source_from") == null && record.get("source_to") == null) {
           //相关逻辑处理
           return new AtomicBoolean(false);
        }
    }

   
}

@Component
@DataStream(dataStreamName = "default", order = 3)
class OpTimeFilterAndStoreBProcessor extends AbstractDataStreamProcessor {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    TableBRepository repository;

    @Override
    AtomicBoolean disposeData(Object data) {
       //相关逻辑处理
    }

    private void storeToTableB(Map<String, Object> record) {
        // 实现存储到表B的逻辑
    }

  
}

@Component
@DataStream(dataStreamName = "default", order = 4)
class StoreAProcessor extends AbstractDataStreamProcessor {
    @Override
    AtomicBoolean disposeData(Object data) {
       //相关逻辑处理
    }

  
}

该数据流使用:

@Component
public class KafkaMessageConsumer {
           
        @Autowired
        private Map<String, AbstractDataStreamProcessor> dataStreamProcessorMap;

        @KafkaListener(topics = "default", groupId = "my-group")
        public void listen(@Payload String message) {
           AbstractDataStreamProcessor processor = dataStreamProcessorMap.get("default");
           processor.handle(data);
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/923722.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BERT简单理解;双向编码器优势

目录 BERT简单理解 一、BERT模型简单理解 二、BERT模型使用举例 三、BERT模型的优势 双向编码器优势 BERT简单理解 (Bidirectional Encoder Representations from Transformers)模型是一种预训练的自然语言处理(NLP)模型,由Google于2018年推出。以下是对BERT模型的简…

摄像头原始数据读取——V4L2(mmap模式,V4L2_MEMORY_MMAP)

摄像头原始数据读取——V4L2(mmap模式,V4L2_MEMORY_MMAP) 内存映射模式&#xff0c;是将设备在内核态申请的用于存储视频数据的物理内存映射到用户空间&#xff0c;使得用户应用程序可以直接访问和操作设备数据物理内存&#xff0c;避免了数据的拷贝。因此采集速度较快&#x…

SpringCloud框架学习(第五部分:SpringCloud Alibaba入门和 nacos)

目录 十二、SpringCloud Alibaba入门简介 1. 基本介绍 2.作用 3.版本选型 十三、 SpringCloud Alibaba Nacos服务注册和配置中心 1.简介 2.各种注册中心比较 3.下载安装 4.Nacos Discovery服务注册中心 &#xff08;1&#xff09; 基于 Nacos 的服务提供者 &#xf…

Ubuntu下用Docker部署群晖系统---Virtual DSM --zerotier实现连接

Ubuntu下用Docker部署群晖系统—Virtual DSM --zerotier实现连接 1. Docker 安装 安装最新docker curl -fsSL get.docker.com -o get-docker.sh sudo sh get-docker.sh sudo docker run hello-world2.docker-compose 安装 sudo pip install docker-compose测试安装是否成功…

【排版教程】Word、WPS 分节符(奇数页等) 自动变成 分节符(下一页) 解决办法

毕业设计排版时&#xff0c;一般要求每章节的起始页为奇数页&#xff0c;空白页不显示页眉和页脚。具体做法如下&#xff1a; 1 Word 在一个章节的内容完成后&#xff0c;在【布局】中&#xff0c;点击【分隔符】&#xff0c;然后选择【奇数页】 这样在下一章节开始的时&…

241125学习日志——[CSDIY] [InternStudio] 大模型训练营 [17]

CSDIY&#xff1a;这是一个非科班学生的努力之路&#xff0c;从今天开始这个系列会长期更新&#xff0c;&#xff08;最好做到日更&#xff09;&#xff0c;我会慢慢把自己目前对CS的努力逐一上传&#xff0c;帮助那些和我一样有着梦想的玩家取得胜利&#xff01;&#xff01;&…

JVM类加载和垃圾回收算法详解

文章目录 JVM一、JVM运行流程1. JVM执行流程 二、JVM运行时数据区1. 程序计数器&#xff08;线程私有&#xff09;2. 虚拟机栈 &#xff08;线程私有&#xff09;3. 本地方法栈&#xff08;线程私有&#xff09;4. 堆&#xff08;线程共享&#xff09;5. 元空间&#xff08;线程…

1、正则表达式

grep匹配 grep用来过滤文本内容&#xff0c;以匹配要查询的结果。 grep root /etc/passwd&#xff1a;匹配包含root的行 -m 数字&#xff1a;匹配几次后停止 -v&#xff1a;取反-i&#xff1a;忽略字符的大小写&#xff0c;默认的&#xff0c;可以不加-n&#xff1a…

Java学习笔记--继承的介绍,基本使用,成员变量和成员方法访问特点

目录 一&#xff0c;继承 1.什么是继承 2.怎么去继承: 3.注意: 4.继承怎么学 二&#xff0c;继承基本使用 三&#xff0c;成员变量和成员方法访问特点 1.成员变量访问特点 1&#xff0c;子类和父类中的成员变量不重名: 总结: 2&#xff0c;子类和父类中的成员变量重…

初级数据结构——二叉树题库(c++)

这里写目录标题 前言[1.——965. 单值二叉树](https://leetcode.cn/problems/univalued-binary-tree/)[2.——222. 完全二叉树的节点个数](https://leetcode.cn/problems/count-complete-tree-nodes/)[3.——144. 二叉树的前序遍历](https://leetcode.cn/problems/binary-tree-…

redmi 12c 刷机

刷机历程 一个多月前网购了redmi 12c这款手机, 价格只有550,用来搞机再适合不过了, 拆快递后就开始倒腾,网上有人说需要等7天才能解锁,我绑定了账号过了几天又忍不住倒腾,最后发现这块手机不用等7天解锁成功了,开始我为了获取root权限, 刷入了很火的magisk,但是某一天仍然发现/…

Python 爬虫入门教程:从零构建你的第一个网络爬虫

网络爬虫是一种自动化程序&#xff0c;用于从网站抓取数据。Python 凭借其丰富的库和简单的语法&#xff0c;是构建网络爬虫的理想语言。本文将带你从零开始学习 Python 爬虫的基本知识&#xff0c;并实现一个简单的爬虫项目。 1. 什么是网络爬虫&#xff1f; 网络爬虫&#x…

计算机毕业设计Hadoop+Spark音乐推荐系统 音乐预测系统 音乐可视化大屏 音乐爬虫 HDFS hive数据仓库 机器学习 深度学习 大数据毕业设计

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

JAVA题目笔记(二十)Stream流综合练习+方法引用

一、数据过滤 import java.util.*; import java.util.stream.Collectors;public class Co {public static void main(String[] args) {List<Integer> listnew ArrayList<>();Collections.addAll(list,1,2,3,4,5,6,7,8,9,10);List<Integer> newlist list.str…

Python学习34天

import random class Game: peo0 rob0 # # def __init__(self,peo,rob): # self.peopeo # self.robrob def Play(self): """ 石头剪刀布游戏&#xff0c;0代表石头&#xff0c;1代见到&#xff0c;2代表石头 …

MATLAB支持的距离度量

距离度量是用于量化两个点或样本之间差异的一种方法。在不同的领域和应用场景中&#xff0c;距离度量的选择可能会有所不同。 欧几里得距离&#xff08;Euclidean Distance&#xff09;&#xff1a;这是最直观的距离定义&#xff0c;适用于n维空间中的两点。对于二维空间中的点…

Jmeter中的测试片段和非测试原件

1&#xff09;测试片段 1--测试片段 功能特点 重用性&#xff1a;将常用的测试元素组合成一个测试片段&#xff0c;便于在多个线程组中重用。模块化&#xff1a;提高测试计划的模块化程度&#xff0c;使测试计划更易于管理和维护。灵活性&#xff1a;可以通过模块控制器灵活地…

【1.2 Getting Started--->Installation Guide】

NVIDIA TensorRT DOCS 此 NVIDIA TensorRT 10.6.0 安装指南提供安装要求、TensorRT 包中包含的内容列表以及安装 TensorRT 的分步说明。 安装指南 摘要&#xff1a; 本 NVIDIA TensorRT 10.3.0 安装指南提供了安装要求、TensorRT 软件包中包含的内容列表以及安装 TensorRT 的…

ubuntu设置程序开机自启动

文章目录 1、概述2、图形界面设置3、设置为Systemd服务 1、概述 测试环境&#xff1a;ubuntu22.04 带图形界面 实现方式1&#xff1a;通过图形界面的【启动应用程序】设置开机自启动&#xff1b; 实现方式2&#xff1a;通过配置为服务实现开机自启动。 2、图形界面设置 优点&am…

IDEA2024创建一个spingboot项目

以下是创建一个基本的 Spring Boot 项目的步骤和示例&#xff1a; 初始化一个springboot工程其实有许多方法&#xff0c;笔者这里挑了一个最快捷的方式搭建一个项目。我们直接通过官方平台&#xff08;start.spring.io&#xff09;进行配置&#xff0c;然后下载压缩包就可以获取…