Spring Boot 知识集锦之Spring-Batch批处理组件详解

文章目录

  • 0.前言
  • 1.参考文档
  • 2.基础介绍
    • 2.1. 核心组件
  • 3.步骤
    • 3.1. 引入依赖
    • 3.2. 配置文件
    • 3.3. 核心源码
  • 4.示例项目
  • 5.总结

在这里插入图片描述

0.前言

背景: 一直零散的使用着Spring Boot 的各种组件和特性,从未系统性的学习和总结,本次借着这个机会搞一波。共同学习,一起进步。哈哈

在这里插入图片描述

Spring Boot Starter Batch 是基于 Spring Batch 构建的批处理应用程序的开发套件,它为开发者提供了一种简化和便捷的方式来处理大规模数据处理任务。批处理应用程序通常涉及大量数据的读取、处理和写入,而 Spring Batch 提供了强大的功能和组件来管理和执行这些任务。

本文将带你深入探索 Spring Boot Starter Batch 的核心源码和机制, 将介绍作业配置类的创建和使用,以定义作业结构和配置步骤、读取器、处理器、写入器等组件。

你还将了解到作业监听器和步骤监听器的用法,以及如何处理错误和异常情况。通过深入研究 Spring Batch 的核心组件,你将对作业执行器、作业存储库和事务管理器有更深入的了解。

无论你是菜鸟还是大牛,都可以看看指点一番。让我们一起开始这个令人兴奋的探索之旅吧!

1.参考文档

  1. 《SpringBoot集成Spring Batch批处理框架入门案例实战》https://blog.51cto.com/u_15891990/5908727
    以下是一些参考文档,可以帮助你更深入地了解 Spring Boot Starter Batch 的使用和内部机制:

  2. Spring Boot 官方文档提供了关于 Spring Boot Starter Batch 的详细介绍和使用方法。 可以了解如何使用 Spring Boot Starter Batch 来简化批处理应用程序的开发和部署,并深入了解自动配置和属性的使用。

    官方文档链接:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#howto-batch-applications

2.基础介绍

Spring Boot Starter for Batch(spring-boot-starter-batch)是一个用于在 Spring Boot 项目中实现批处理任务的启动器。它是基于 Spring Batch 框架构建的,通过自动配置功能简化了批处理作业的配置和依赖项管理。

Spring Batch 是一个功能强大的开源批处理框架,旨在处理大规模数据处理和批量任务。它提供了读取、处理和写入大量数据的机制,并支持事务管理、错误处理、跳过策略等功能。

使用 Spring Boot Starter for Batch 可以带来以下好处:

  1. 简化配置:Spring Boot Starter for Batch 利用 Spring Boot 的自动配置机制,减少了繁琐的配置步骤,使批处理任务的配置更加简单和直观。
  2. 依赖管理:启动器自动管理 Spring Batch 和其他相关依赖项的版本兼容性,避免了手动解决依赖冲突的问题。
  3. 快速启动:通过使用启动器,你可以快速搭建和启动一个批处理任务,无需手动配置各种依赖项和组件。
  4. 集成监控:Spring Boot Starter for Batch 与 Spring Boot Actuator 集成,提供了监控和管理批处理作业的端点,方便对作业进行跟踪和管理。

要使用 Spring Boot Starter for Batch,你只需在项目的构建文件中添加相应的依赖项,然后配置作业和步骤的读取器、处理器和写入器组件。启动器将自动加载所需的配置和组件,并提供简洁的编程模型来实现批处理任务。

2.1. 核心组件

当构建批处理作业时,Spring Batch 提供了以下核心组件,每个组件都有特定的功能和责任,通过配置和定义它们的行为,可以实现具体的批处理逻辑:
在这里插入图片描述

  1. Job(作业):Job 是批处理的最高级别组件,表示一个完整的批处理作业。它由一个或多个步骤(Step)组成。Job 可以包含作业参数、监听器、错误处理策略等。

  2. Step(步骤):Step 是批处理作业的基本处理单元,表示一个独立的处理步骤。每个步骤可以包含一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。步骤可以定义事务管理、错误处理、跳过策略等。

  3. ItemReader(读取器):ItemReader 用于读取输入数据。它提供了不同的实现方式,如从文件、数据库、消息队列等读取数据。ItemReader 从源数据中逐条读取数据,并将其传递给 ItemProcessor 进行处理。

  4. ItemProcessor(处理器):ItemProcessor 用于处理输入数据。它接收从 ItemReader 读取的数据,并进行自定义的业务逻辑处理,如数据转换、过滤、验证等。ItemProcessor 可以对输入数据进行任意的处理操作,并返回处理后的数据。

  5. ItemWriter(写入器):ItemWriter 用于写入处理后的数据。它将 ItemProcessor 处理后的数据写入目标位置,如文件、数据库等。ItemWriter 提供了不同的实现方式,以适应不同的输出场景。

这些组件通过配置和组合,形成了一个完整的批处理作业。可以使用 Spring Batch 提供的注解和构建器来定义这些组件,并将它们组装成一个作业流程。例如,可以创建一个 Job,它包含一个或多个 Step,每个 Step 包含一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。通过定义这些组件的行为和属性,可以实现从输入数据读取、处理、写入输出数据的批处理逻辑。

此外,Spring Batch 还提供了其他辅助组件,如监听器(Listeners)、错误处理(Error Handling)、事务管理(Transaction Management)等,用于监控和管理批处理作业的执行过程和异常情况。这些组件可以通过配置和扩展来实现特定的需求和处理逻辑。

3.步骤

3.1. 引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

3.2. 配置文件

可以通过配置文件来自定义和配置批处理作业的行为。常用的配置属性

# 指定要运行的作业的名称
spring.batch.job.names=job1,job2

# 是否启用 Spring Batch 的作业
spring.batch.job.enabled=true

# 是否在启动时初始化 Spring Batch 的数据库模式
spring.batch.initialize-schema=false

# 指定 Spring Batch 的数据库模式(Schema)名称
spring.batch.schema=classpath:org/springframework/batch/core/schema-*.sql

# 指定 Spring Batch 数据库表的前缀
spring.batch.table-prefix=BATCH_

# 配置数据源
spring.datasource.url=jdbc:mysql://localhost:3306/batchdb
spring.datasource.username=root
spring.datasource.password=secret

# 配置数据源连接池
spring.datasource.hikari.maximum-pool-size=10

# 配置事务管理器
spring.batch.transaction.manager-bean-name=myTransactionManager

# 配置作业监听器
spring.batch.job.job-listeners=myJobListener

# 配置步骤监听器
spring.batch.step.step-listeners=myStepListener

# 配置错误处理策略
spring.batch.job.error-handlers=skipExceptionHandler

# 配置作业参数
spring.batch.job.parameters.param1=value1
spring.batch.job.parameters.param2=value2

3.3. 核心源码

Spring Boot Starter Batch 是基于 Spring Batch 构建的批处理应用程序的开发套件,它简化了批处理作业的配置和部署。通过自动配置和注解扫描,Spring Boot Starter Batch 提供了简化和便捷的方式来开发和配置批处理应用程序。它集成了 Spring Batch 的核心功能,并提供了默认的配置,使得开发者可以更专注于业务逻辑的实现。

  1. 自动配置:Spring Boot Starter Batch 提供了自动配置类 BatchAutoConfiguration,它在应用程序启动时自动配置 Spring Batch 相关的组件和属性。它会根据配置文件中的属性来初始化和配置批处理作业的运行环境。

  2. 注解扫描:自动配置类使用 @EnableBatchProcessing 注解来启用批处理功能,并自动扫描作业和步骤的配置类。

  3. JobRepository:自动配置类会创建一个 JobRepository 实例,用于管理作业的元数据和状态。它使用 Spring Batch 提供的默认实现 JobRepositoryFactoryBean,并根据配置文件中的属性进行配置。

  4. 事务管理:自动配置类会创建一个 PlatformTransactionManager 实例,用于管理批处理作业的事务。它使用 Spring Batch 提供的默认实现 DataSourceTransactionManager,并使用配置文件中的数据源进行配置。

  5. 作业执行器:自动配置类会创建一个 JobLauncher 实例,用于启动和执行批处理作业。它使用 Spring Batch 提供的默认实现 SimpleJobLauncher,并使用配置文件中的作业存储库和事务管理器进行配置。
    Spring Boot 3.x版本中SimpleJobLauncher 已经废弃在这里插入图片描述
    需要使用 TaskExecutorJobLauncher.
    在这里插入图片描述

  6. 作业监听器:自动配置类会自动注册配置类中定义的作业监听器(JobListener),以便在作业执行的不同阶段触发相应的事件。

  7. 步骤监听器:自动配置类会自动注册配置类中定义的步骤监听器(StepListener),以便在步骤执行的不同阶段触发相应的事件。

  8. 作业配置类:开发者可以创建作业配置类来定义批处理作业的结构和行为。作业配置类使用 @Configuration 注解标记,并使用 @EnableBatchProcessing 注解启用批处理功能。在作业配置类中,可以定义一个或多个批处理作业,并配置它们的步骤、读取器、处理器、写入器等组件。

  9. 作业执行:通过调用 JobLauncherrun() 方法,并传入作业名称和作业参数,可以启动和执行批处理作业。JobLauncher 会根据作业配置类中的定义,按照步骤的顺序执行作业的各个阶段,并将数据从读取器传递给处理器和写入器。

4.示例项目

从一个包含学生成绩的 CSV 文件中读取数据,并根据一定的条件进行处理和分析。我们简化一下,实际业务肯定比较复杂,我们为了演示,简化一个版本方便大家理解。实现一个批处理任务,从 students.csv 文件中读取学生数据,将分数乘以 10 进行处理,并将处理后的学生数据写入到日志中。

  1. 创建一个名为 Student 的类
@Data
public class Student {
    private String name;
    private int score;
}
  1. StudentItemReader 的类,实现 Spring Batch 的 ItemReader 接口,用于从 CSV 文件中读取学生数据。
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

public class StudentItemReader implements ItemReader<Student> {
    private FlatFileItemReader<Student> reader;

    public StudentItemReader() {
        reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("students.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("name", "score");

        BeanWrapperFieldSetMapper<Student> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Student.class);

        DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        reader.setLineMapper(lineMapper);
    }

    @Override
    public Student read() throws Exception {
        return reader.read();
    }
}
  1. 创建一个名为 StudentItemProcessor 的类,实现 Spring Batch 的 ItemProcessor 接口,用于处理学生数据。
import org.springframework.batch.item.ItemProcessor;

public class StudentItemProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student student) throws Exception {
        // 在这里进行学生数据的处理和分析
        // 这里只是简单地将分数乘以 10
        student.setScore(student.getScore() * 10);

        return student;
    }
}
  1. 创建一个名为 StudentItemWriter 的类,实现 Spring Batch 的 ItemWriter 接口,用于将处理后的学生数据写入到日志中。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;

public class StudentItemWriter implements ItemWriter<Student> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StudentItemWriter.class);

    @Override
    public void write(List<? extends Student> items) throws Exception {
        for (Student student : items) {
            LOGGER.info("Processed student: {}", student);
        }
    }
}
  1. 创建一个名为 BatchConfig 的配置类,用于配置批处理作业的相关组件。
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建 ItemReader:读取输入数据
    @Bean
    public ItemReader<Student> itemReader() {
        return new StudentItemReader();
    }

    // 创建 ItemProcessor:处理输入数据
    @Bean
    public ItemProcessor<Student, Student> itemProcessor() {
        return new StudentItemProcessor();
    }

    // 创建 ItemWriter:写入输出数据
    @Bean
    public ItemWriter<Student> itemWriter() {
        return new StudentItemWriter();
    }

    // 创建 Step:定义处理步骤
    @Bean
    public Step step(ItemReader<Student> reader, ItemProcessor<Student, Student> processor,
                     ItemWriter<Student> writer) {
        return stepBuilderFactory.get("step")
                .<Student, Student>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // 创建 Job:定义作业
    @Bean
    public Job job(Step step) {
        return jobBuilderFactory.get("job")
                .start(step)
                .build();
    }
}
  1. 创建一个名为 Application 的启动类,用于启动 Spring Boot 应用和运行批处理作业。
import org.springframework.batch.core.Job;
import org.springframework.batch.core```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);

        // 获取 job bean
        Job job = context.getBean(Job.class);

        try {
            // 启动批处理作业
            context.getBean(JobLauncher.class).run(job, new JobParameters());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            context.close();
        }
    }
}
  1. 创建一个名为 students.csv 的 CSV 文件,包含学生的姓名和分数数据,例如:
姓名分数
张三80
小明75
小花90
大卫85
艾玛92
弗兰克78
格蕾丝88
海伦91
伊万83
爱迪生87

5.总结

使用 Spring Boot Starter Batch,开发者可以更专注于业务逻辑的实现,而不必过多关注底层的配置和管理细节。它提供了一种高效、可靠的方式来处理大规模数据处理任务,为批处理应用程序的开发带来了便利和灵活性。
Spring Boot Starter Batch 是基于 Spring Batch 的开发套件,为批处理应用程序提供了便捷的开发和配置方式。通过自动配置和注解扫描,它简化了批处理作业的搭建和部署过程。

在 Spring Boot Starter Batch 的核心源码中,关键的组件包括自动配置类、作业配置类、JobRepository、事务管理器、作业执行器等。自动配置类负责初始化和配置这些组件,并根据配置文件中的属性进行设置。

开发者可以创建作业配置类,通过定义作业结构和配置步骤、读取器、处理器、写入器等组件来定制批处理作业的行为。作业监听器和步骤监听器可以用来处理作业执行过程中的事件和异常情况。

在这里插入图片描述

大家好,我是冰点,今天的Spring Boot Starter Batch 全部内容就是这些。如果你有疑问或见解可以在评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/81441.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ElasticSearch DSL语句(bool查询、算分控制、地理查询、排序、分页、高亮等)

文章目录 DSL 查询种类DSL query 基本语法1、全文检索2、精确查询3、地理查询4、function score &#xff08;算分控制&#xff09;5、bool 查询 搜索结果处理1、排序2、分页3、高亮 RestClient操作 DSL 查询种类 查询所有&#xff1a;查询所有数据&#xff0c;一般在测试时使…

【unity】Pico VR 开发笔记(基础篇)

Pico VR 开发笔记(基础篇) XR Interaction Tooikit 版本 2.3.2 一、环境搭建 其实官方文档已经写的很详细了&#xff0c;这里只是不废话快速搭建&#xff0c;另外有一项官方说明有误的&#xff0c;补充说明一下&#xff0c;在开发工具部分说明 插件安装——安装pico的sdk和XR…

12. Docker可视化工具

目录 1、前言 2、Docker UI 2.1、部署Docker UI 2.2、管理容器 3、Portainer 3.1、部署Portainer 3.2、管理容器 3.3、添加远程Docker 4、Shipyard 1、前言 Docker 提供了命令行工具来管理 Docker 的镜像和运行 Docker 的容器。我们也可以使用图形工具来管理 Docker。…

【k8s、云原生】基于metrics-server弹性伸缩

第四阶段 时 间&#xff1a;2023年8月18日 参加人&#xff1a;全班人员 内 容&#xff1a; 基于metrics-server弹性伸缩 目录 一、Kubernetes部署方式 &#xff08;一&#xff09;minikube &#xff08;二&#xff09;二进制包 &#xff08;三&#xff09;Kubeadm 二…

STM32——RTC实时时钟

文章目录 Unix时间戳UTC/GMT 时间戳转换BKP简介BKP基本结构读写BKP备份寄存器电路设计关键代码 RTC简介RTC框图RTC基本结构硬件电路RTC操作注意事项读写实时时钟电路设计关键代码 Unix时间戳 Unix 时间戳&#xff08;Unix Timestamp&#xff09;定义为从UTC/GMT的1970年1月1日…

周易卦爻解读笔记——节卦

第六十卦节 水泽节 坎上兑下 节卦由泰卦所变&#xff0c;泰卦的九三与六五换位。象征节制有度。 地天泰 节卦是涣卦的覆卦&#xff0c;序卦传【物不可以终离&#xff0c;故受之以节】 节&#xff1a;亨。苦节不可贞。 节卦通达&#xff0c;但不可一直过度辛苦的节制。 彖曰…

机器学习|Softmax 回归的数学理解及代码解析

机器学习&#xff5c;Softmax 回归的数学理解及代码解析 Softmax 回归是一种常用的多类别分类算法&#xff0c;适用于将输入向量映射到多个类别的概率分布。在本文中&#xff0c;我们将深入探讨 Softmax 回归的数学原理&#xff0c;并提供 Python 示例代码帮助读者更好地理解和…

JVM面试题-1

1、什么是JVM内存结构&#xff1f; jvm将虚拟机分为5大区域&#xff0c;程序计数器、虚拟机栈、本地方法栈、java堆、方法区&#xff1b; 程序计数器&#xff1a;线程私有的&#xff0c;是一块很小的内存空间&#xff0c;作为当前线程的行号指示器&#xff0c;用于记录当前虚拟…

【以太网通信】RS232 串口转以太网

最近和 RK 研发同事在调试通信接口&#xff0c;排查与定位 RK3399 接收数据出错的问题。FPGA 与 RK3399 之间使用一路 RS232 串口进行通信&#xff0c;由于串口数据没有分包&#xff0c;不方便排查问题&#xff0c;想到可以开发一个 RS232 串口转以太网的工具&#xff0c;将串口…

nginx部署时http接口正常,ws接口404

可以这么配置 map $http_upgrade $connection_upgrade {default upgrade; close; }upstream wsbackend{server ip1:port1;server ip2:port2;keepalive 1000; }server {listen 20038;location /{ proxy_http_version 1.1;proxy_pass http://wsbackend;proxy_redirect off;proxy…

谷歌浏览器插件篇之console Importer

前言 作为一名前端开发者&#xff0c;相信在开发实践中&#xff0c;使用过诸多第三方库。譬如&#xff1a;lodash、moment、dayjs、antd等数不胜数。 然每每使用&#xff0c;经起繁琐&#xff0c;便令人有反抗之意。其步骤如下&#xff1a;首先要在搭建好的项目里&#xff0c…

学习笔记:Opencv实现限制对比度得自适应直方图均衡CLAHE

2023.8.19 为了完成深度学习的进阶&#xff0c;得学习学习传统算法拓展知识面&#xff0c;记录自己的学习心得 CLAHE百科&#xff1a; 一种限制对比度自适应直方图均衡化方法&#xff0c;采用了限制直方图分布的方法和加速的插值方法 clahe&#xff08;限制对比度自适应直方图…

C++ string类详解

⭐️ string string 是表示字符串的字符串类&#xff0c;该类的接口与常规容器的接口基本一致&#xff0c;还有一些额外的操作 string 的常规操作&#xff0c;在使用 string 类时&#xff0c;需要使用 #include <string> 以及 using namespace std;。 ✨ 帮助文档&…

4 STM32标准库函数 之 FLASH存储器(FLASH)所有函数的介绍及使用

3 STM32标准库函数 之 FLASH存储器所有函数的介绍及使用 1. 图片有格式2 文字无格式二、FLASH 库函数固件库函数预览2.1 函数FLASH_SetLatency2.2 函数FLASH_HalfCycleAccessCmd2.3 函数FLASH_PrefetchBufferCmd2.4 函数FLASH_Unlock2.5 函数FLASH_Lock2.6 函数FLASH_ErasePage…

音视频 FFmpeg音视频处理流程

ffmpeg -i test_1920x1080.mp4 -acodec copy -vcodec libx264 -s 1280x720 test_1280x720.flv推荐一个零声学院项目课&#xff0c;个人觉得老师讲得不错&#xff0c;分享给大家&#xff1a; 零声白金学习卡&#xff08;含基础架构/高性能存储/golang云原生/音视频/Linux内核&am…

GAN!生成对抗网络GAN全维度介绍与实战

目录 一、引言1.1 生成对抗网络简介1.2 应用领域概览1.3 GAN的重要性 二、理论基础2.1 生成对抗网络的工作原理2.1.1 生成器生成过程 2.1.2 判别器判别过程 2.1.3 训练过程训练代码示例 2.1.4 平衡与收敛 2.2 数学背景2.2.1 损失函数生成器损失判别器损失 2.2.2 优化方法优化代…

Linux设置临时目录路径的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

第 7 章 排序算法(1)(介绍,分类,时间复杂度,空间复杂度)

7.1排序算法的介绍 排序也称排序算法(Sort Algorithm)&#xff0c;排序是将一组数据&#xff0c;依指定的顺序进行排列的过程。 7.2排序的分类&#xff1a; 内部排序: 指将需要处理的所有数据都加载到**内部存储器(内存)**中进行排序。外部排序法&#xff1a; 数据量过大&am…

安全学习DAY18_信息打点-APP资产搜集

信息打点-APP资产&静态提取&动态抓包&动态调试 文章目录 信息打点-APP资产&静态提取&动态抓包&动态调试本节知识&思维导图本节使用到的链接&工具 如何获取目标APP从名称中获取APP从URL获取APP APP搜集资产信息APP提取信息分类信息提取方式信息…