【Spring底层原理高级进阶】Spring Batch清洗和转换数据,一键处理繁杂数据!Spring Batch是如何实现IO流优化的?本文详解!

🎉🎉欢迎光临,终于等到你啦🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟持续更新的专栏《Spring 狂野之旅:从入门到入魔》 🚀

本专栏带你从Spring入门到入魔 

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽icon-default.png?t=N7T8http://suzee.blog.csdn.net/


Spring Batch的应用场景和作用

批处理是企业级业务系统不可或缺的一部分,spring batch是一个轻量级的综合性批处理框架,可用于开发企业信息系统中那些至关重要的数据批量处理业务.SpringBatch基于POJO和Spring框架,相当容易上手使用,让开发者很容易地访问和利用企业级服务.spring batch具有高可扩展性的框架,简单的批处理,复杂的大数据批处理作业都可以通过SpringBatch框架来实现。

先来个例子

假设一家电商公司,每天从不同渠道收集大量的销售数据。这些数据包含了各种商品的销售记录,但是格式和质量可能不一致。您希望将这些销售数据进行清洗和转换,以便进行后续的分析和报告生成。

使用Spring Batch,可以创建一个批处理作业来处理销售数据。作业的步骤可以包括从不同渠道读取销售数据,对数据进行清洗和转换,例如去除无效数据、修复格式错误、计算额外的指标等。然后,将清洗和转换后的数据写入数据库,以备后续的分析和报告生成使用。

先来介绍其架构

  • Application应用层:包含了所有任务batch jobs和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
  • Batch Core核心层:包含启动和管理任务的运行环境类,如JobLauncher等。
  • Batch Infrastructure基础层:上面两层是建立在基础层之上的,包含基础的读入reader写出writer、重试框架等。

为什么它能够如此优秀?

Chunk 的中文意思是:大块、厚块;大部分,大量。Chunk 在Spring Batch 中就是“批量操作”的概念的抽象。它本身是一个类,这个类就是用来将原本的单条操作改成批量进行。
在Spring Batch 中就提出了chunk 的概念。首先我们设定一个chunk 的size,随后Spring Batch 一条条地区处理数据,但是到ItemWriter 阶段,Spirng Batch 不会选择立刻将数据提交到数据库,只有在处理的数据累积数量达到了之前设置的chunk 的size 之后,才会进行提交操作。

 

实战详细操作

引入 依赖

首先,引Spring Batch的依赖项。在Maven项目中,在pom.xml文件中添加以下依赖项:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
</dependencies>

创建一个Spring配置文件(例如batch-config.xml),并配置Spring Batch的相关组件和属性。

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <!-- 数据源配置 -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="com.mysql.jdbc.Driver" />
        <property name="url" value="jdbc:mysql://localhost:3306/mydb" />
        <property name="username" value="root" />
        <property name="password" value="password" />
    </bean>

    <!-- JobRepository配置 -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="databaseType" value="mysql" />
    </bean>

    <!-- 并发任务执行器配置 -->
    <task:executor id="taskExecutor" pool-size="10" />

    <!-- 事务管理器配置 -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <!-- JobLauncher配置 -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="taskExecutor" />
    </bean>

</beans>

定义数据模型:


根据需求,定义需要清洗和转换的数据模型。例如,假设数据模型是一个简单的用户对象,包含id、姓名和年龄字段。创建一个名为User的Java类,如下所示:

public class User {
    private Long id;
    private String name;
    private Integer age;

    // 省略构造函数、getter和setter方法
}

创建ItemReader:


创建一个实现ItemReader接口的自定义类,用于从数据源中读取数据。以下是一个读取数据库中用户数据的示例:

public class UserItemReader implements ItemReader<User> {

    private JdbcTemplate jdbcTemplate;
    private int rowCount = 0;
    private int currentRow = 0;

    public UserItemReader(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public User read() throws Exception {
        if (currentRow < rowCount) {
            String sql = "SELECT id, name, age FROM users LIMIT ?, 1";
            User user = jdbcTemplate.queryForObject(sql, new Object[]{currentRow}, (rs, rowNum) -> {
                User u = new User();
                u.setId(rs.getLong("id"));
                u.setName(rs.getString("name"));
                u.setAge(rs.getInt("age"));
                return u;
            });
            currentRow++;
            return user;
        } else {
            return null;
        }
    }

    public void setRowCount(int rowCount) {
        this.rowCount = rowCount;
    }
}

在此示例中,我们使用JdbcTemplate来执行数据库查询,并在read方法中逐行读取用户数据。

这里就可以根据你的业务需求设置各种各样的任务

创建ItemProcessor:
创建一个实现ItemProcessor接口的自定义类,用于对读取的数据进行清洗和转换。

temProcessor的作用是在Spring Batch的批处理作业中对读取的数据进行处理、清洗和转换。它是Spring Batch框架中的一个关键接口,用于执行中间处理逻辑,并将处理后的数据传递给ItemWriter进行写入操作。

以下是一个对用户数据进行简单处理的示例:

public class UserProcessor implements ItemProcessor<UserData, ProcessedUserData> {

    @Override
    public ProcessedUserData process(UserData userData) throws Exception {
        // 获取用户数据
        String input = userData.getData();

        // 去除首尾空格
        String trimmedInput = input.trim();

        // 过滤敏感信息
        String filteredInput = filterSensitiveData(trimmedInput);

        // 转换为大写
        String upperCaseInput = filteredInput.toUpperCase();

        // 创建处理后的用户数据对象
        ProcessedUserData processedUserData = new ProcessedUserData();
        processedUserData.setProcessedData(upperCaseInput);

        return processedUserData;
    }

    private String filterSensitiveData(String input) {
        // 在这里可以根据实际需求实现敏感信息过滤逻辑
        // 使用正则表达式、敏感词库或其他方法进行过滤
        // 这里是过滤手机号码和邮箱地址

        String filteredInput = input
                .replaceAll("\\b\\d{11}\\b", "[PHONE_NUMBER]")
                .replaceAll("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b", "[EMAIL]");

        return filteredInput;
    }
}

我们做了以下处理和转换:

  1. 使用trim方法去除用户数据字符串首尾的空格。
  2. 使用filterSensitiveData方法过滤敏感信息,例如手机号码和邮箱地址。在示例中,我们使用了简单的正则表达式来过滤手机号码和邮箱地址,并将其替换为占位符。
  3. 使用toUpperCase方法将字符串转换为大写形式。
  4. 创建一个ProcessedUserData对象,将处理后的数据设置到输出对象中。

创建ItemWriter:


创建一个实现ItemWriter接口的自定义类,用于将处理后的数据写入目标位置。以下是一个将用户数据写入数据库的示例:

public class UserItemWriter implements ItemWriter<User> {

    private JdbcTemplate jdbcTemplate;

    public UserItemWriter(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public void write(List<? extends User> users) throws Exception {
        for (User user : users) {
            String sql = "INSERT INTO processed_users (id, name, age) VALUES (?, ?, ?)";
            jdbcTemplate.update(sql, user.getId(), user.getName(), user.getAge());
        }
    }
}

在此示例中,我们使用JdbcTemplate将处理后的用户数据插入到名为processed_users的数据库表中。

创建作业配置:


创建一个包含作业配置的类,用于将ItemReader、ItemProcessor和ItemWriter组合在一起,定义一个批处理作业。以下是一个示例的作业配置类:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public ItemReader<User> userItemReader() {
        return new UserItemReader(dataSource);
    }

    @Bean
    public ItemProcessor<User, User> userItemProcessor() {
        return new UserItemProcessor();
    }

    @Bean
    public ItemWriter<User> userItemWriter() {
        return new UserItemWriter(dataSource);
    }

    @Bean
    public Step step1(ItemReader<User> reader, ItemProcessor<User, User> processor, ItemWriter<User> writer) {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job dataCleanupJob(Step step1) {
        return jobBuilderFactory.get("dataCleanupJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1)
                .end()
                .build();
    }
}

在此示例中,我们通过Spring Batch的注解@EnableBatchProcessing启用批处理功能,并定义了一个名为dataCleanupJob的作业,其中包含一个名为step1的步骤。

运行作业:

创建Job和Step配置:使用Spring Batch的配置文件,配置Job和Step。使用JobParametersBuilder创建一个包含当前时间戳的Job参数,然后通过jobLauncher.run()方法启动作业。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class BatchApplication {
    public static void main(String[] args) {
        // 创建Spring应用上下文
        ApplicationContext context = new AnnotationConfigApplicationContext(BatchConfiguration.class);

        // 获取JobLauncher和Job实例
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean("dataCleanupJob", Job.class);

        try {
            // 创建Job参数
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();

            // 启动作业
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

监听Listener

可以通过Listener接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。

import org.springframework.batch.core.*;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

public class MyJobListener extends JobExecutionListenerSupport {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // 在作业执行之前执行的逻辑
        System.out.println("作业开始执行");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // 在作业执行之后执行的逻辑
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("作业执行成功");
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            System.out.println("作业执行失败");
        }
    }

    @Override
    public void onSkipInRead(Throwable t) {
        // 在读取过程中发生跳过记录的逻辑
        System.out.println("跳过读取记录");
    }

    @Override
    public void onSkipInProcess(Object item, Throwable t) {
        // 在处理过程中发生跳过记录的逻辑
        System.out.println("跳过处理记录");
    }

    @Override
    public void onSkipInWrite(Object item, Throwable t) {
        // 在写入过程中发生跳过记录的逻辑
        System.out.println("跳过写入记录");
    }
}

将这个自定义的监听器添加到作业配置中:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 省略其他配置

    @Bean
    public Job dataCleanupJob(Step step1, JobExecutionListener jobListener) {
        return jobBuilderFactory.get("dataCleanupJob")
                .incrementer(new RunIdIncrementer())
                .listener(jobListener) // 添加自定义的监听器
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public JobExecutionListener jobListener() {
        return new MyJobListener();
    }

    // 省略其他配置
}

这样  我们就能很清晰的看到 任务运行的情况啦

Spring Batch 使用内存缓冲机制,将读取的数据记录暂存于内存中,然后批量处理这些数据。通过减少对磁盘或数据库的频繁访问,内存缓冲可以提高读取和处理的效率,而且Spring Batch 提供了批量读取的机制,允许一次性读取和处理多个数据记录,这两点都减轻 I/O 压力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/438309.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

蚂蚁感冒 刷题笔记

/* 解题思路 首先根据题意可知 1.蚂蚁速度均为1 即同向蚂蚁永远不可能追上 我们需要求最后感冒蚂蚁的数量 因为蚂蚁碰头将会掉头 效果和俩蚂蚁互相穿过继续走是一样的 所以我们将俩蚂蚁碰头视作穿过 2. 如果俩蚂蚁相向而行 则俩蚂蚁必定碰头 首先 我们获得第一个感冒蚂蚁的…

Vue+OpenLayers7入门到实战:OpenLayers7如何使用gifler库来实现gif动态图图片叠加到地图上

返回《Vue+OpenLayers7》专栏目录:Vue+OpenLayers7 前言 OpenLayers7本身不支持gif图片作为图标要素显示到地图上,所以需要通过其他办法来实现支持gif图片。 本章介绍如何使用OpenLayers7在地图上使用gifler库先生成canvas画板,然后通过canvas画板的重绘事件来重新渲染地图…

URL输入到页面渲染过程详解

当我们在浏览器中输入一个URL并按下回车键时&#xff0c;浏览器会执行一系列步骤来解析URL、发送请求、接收响应&#xff0c;并最终渲染页面。这个过程涉及到多个阶段&#xff0c;包括DNS解析、TCP握手、发送HTTP请求、服务器处理请求、返回HTTP响应、浏览器解析和渲染等。下面…

19-Java中介者模式 ( Mediator Pattern )

Java中介者模式 摘要实现范例 中介者模式&#xff08;Mediator Pattern&#xff09;提供了一个中介类&#xff0c;该类通常处理不同类之间的通信&#xff0c;并支持松耦合&#xff0c;使代码易于维护中介者模式是用来降低多个对象和类之间的通信复杂性中介者模式属于行为型模式…

算法刷题Day2 | 977.有序数组的平方、209.长度最小的子数组、59.螺旋矩阵II

目录 0 引言1 有序数组列表1.1 我的题解&#xff08;双指针&#xff09;1.2 根据官方解题修改后 2 长度最小的子数组2.1 我的题解2.2 官方滑动窗口&#xff08;双指针&#xff09;题解 3 螺旋矩阵3.1 我的题解 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&…

CXYGZL实现钉钉、飞书和微信全面覆盖!!!

非常欣慰能在这里与大家分享&#xff0c;CXYGZL已圆满实现多端互通的目标&#xff01;&#xff01;&#xff01; 无论您是在手机、电脑还是平板上使用钉钉、企微还是飞书&#xff0c;只需将CXYGZL轻松集成到您的办公软件中&#xff0c;即可实现无缝审批处理各项任务&#xff0c…

FreeRTOS_day2

作业&#xff1a;1.使用ADC采样光敏电阻数值&#xff0c;如何根据这个数值调节LED灯亮度。 2.总结DMA空闲中断接收数据的使用方法 打开DAM,允许接收外部设备数据&#xff0c;调用中断接收回调函数

王道机试C++第 3 章 排序与查找:排序问题 Day28(含二分查找)

查找 查找是另一类必须掌握的基础算法&#xff0c;它不仅会在机试中直接考查&#xff0c;而且是其他某些算法的基础。之所以将查找和排序放在一起讲&#xff0c;是因为二者有较强的联系。排序的重要意义之一便是帮助人们更加方便地进行查找。如果不对数据进行排序&#xff0c;…

热插拔更换ESXI宿主机系统硬盘导致紫屏故障案例一则

关键词 vmware、esxi5.5raid、热插拔、紫屏 华为 CH121V3刀片、SSD硬盘 There are many things that can not be broken&#xff01; 如果觉得本文对你有帮助&#xff0c;欢迎点赞、收藏、评论&#xff01; 一、问题现象 现网vmware云平台一台华为E9000刀箱CH121V3刀片服务…

面试经典150题——环形链表

Suffering, for the weak is the tomb of death, and for the strong is the soil of germinal ambition.​ 1. 题目描述 2. 题目分析与解析 2.1 思路一 这个题目就是判断一个链表有没有环&#xff0c;其实我们之讲过一个题目&#xff0c;就实现了判断链表有没有环的步骤&a…

1 数据分析概述与职业操守 (3%)

1、 EDIT数字化模型 E——exploration探索 &#xff08;是什么&#xff09; 业务运行探索&#xff1a;探索关注企业各项业务的运行状态、各项指标是否合规以及各项业务的具体数据情况等。 D——diagnosis 诊断 (为什么) 问题根源诊断&#xff1a;当业务指标偏离正常值时&…

C#,哈夫曼编码(Huffman Code)压缩(Compress )与解压缩(Decompress)算法与源代码

David A. Huffman 1 哈夫曼编码简史&#xff08;Huffman code&#xff09; 1951年&#xff0c;哈夫曼和他在MIT信息论的同学需要选择是完成学期报告还是期末考试。导师Robert M. Fano给他们的学期报告的题目是&#xff0c;寻找最有效的二进制编码。由于无法证明哪个已有编码是…

GCN 翻译 - 2

2 FAST APROXIMATE CONVOLUTIONS ON GRAPHS 在这一章节&#xff0c;我们为这种特殊的的图基础的神经网络模型f(X, A)提供理论上的支持。我们考虑一个多层的图卷积网络&#xff08;GCN&#xff09;&#xff0c;它通过以下方式进行层间的传播&#xff1a; 这里&#xff0c;是无…

Spring事务注解@Transactional的流程和源码分析

Spring事务简介 Spring事务有两种方式&#xff1a; 编程式事务&#xff1a;编程式事务通常使用编程式事务管理API实现&#xff0c;比如Spring提供的PlatformTransactionManager接口&#xff0c;使用它操控事务。声明式事务&#xff1a;注解式事务使用AOP&#xff08;面向切面…

【24春招/简历】如果技术和学历不行,如何包装自己在春招中占得先机?突出你的亮点!

面试讲什么 学历&#xff1a; 行情 要美化&#xff08;吹牛&#xff09; 面试很好 技术能力 让面试官知道你会哪些技术&#xff0c;尽量细节 “熟悉spring” > ioc流程&#xff0c;Bean的生命周期&#xff0c;循环依赖&#xff0c;常见注解 熟悉redis > 缓存穿透&…

【Java设计模式】八、装饰者模式

文章目录 0、背景1、装饰者模式2、案例3、使用场景4、源码中的实际应用 0、背景 有个快餐店&#xff0c;里面的快餐有炒饭FriedRice 和 炒面FriedNoodles&#xff0c;且加配菜后总价不一样&#xff0c;计算麻烦。如果单独使用继承&#xff0c;那就是&#xff1a; 类爆炸不说&a…

Django项目的部署——之环境的重建

Django项目的部署 版本对应 Django 2.0.6 可以匹配mysql5.6.48版本&#xff0c;和mysql5.7.7版本一块用会报错。 其它版本未测试。 创建新的虚拟环境 根据项目版本安装对应的Python包。比如项目开发时用的是python-3.6.4&#xff0c;则安装该版本&#xff0c;并配置环境变量…

【智能家居】东胜物联ODM定制ZigBee网关,助力能源管理解决方案商,提升市场占有率

背景 本文案例服务的客户是专业从事智能家居能源管理的解决方案商&#xff0c;其产品与服务旨在帮助用户监测、管理和优化能源消耗&#xff0c;以提高能源使用效率。 随着公司的扩张&#xff0c;为了增加市场占有率&#xff0c;他们希望找到更好的硬件服务支持&#xff0c;以…

leetcode 3.6

Leetcode hot 100 一.矩阵1.旋转图像 二.链表1. 相交链表2.反转链表3.回文链表4.环形链表5.环形链表 II 一.矩阵 1.旋转图像 旋转图像 观察规律可得&#xff1a; matrix[i][j] 最终会被交换到 matrix [j][n−i−1]位置&#xff0c;最初思路是直接上三角交换&#xff0c;但是会…

学习clickhouse 集群搭建和分布式存储

为什么要用集群 使用集群的主要原因是为了提高系统的可扩展性、可用性和容错性。 可扩展性&#xff1a;当单个节点无法处理增加的负载时&#xff0c;可以通过添加更多的节点到集群来增加处理能力。这使得系统可以处理更大的数据量和更高的查询负载。可用性&#xff1a;在集群…