业务数据批量插入数据库实践

业务数据如何存储一直以来都是项目开发中的一个比较重要的话题。我们要从资源的利用率,业务场景和技术实现多个方面考虑存储的问题。“抛开业务谈技术就是耍流氓”,所有技术架构都要站在实际的业务场景中分析。比如个人端的产品,这种就属于读多写少的业务,对于这种系统的架构就要更多的考虑增加缓存来减少数据库读压力;B端的产品,基本上属于写多读少的业务,客户数量不多但是单个客户的qps会非常高,要减少这种业务对数据库的压力,数据写入一般要采用批处理。物联网类型的产品,基本上也是属于写多读少的场景,一个设备会有几百上千的测点数据上报,并且上报的频率非常均匀,对于这种类型的系统我们要保证数据能够及时写入数据库并且不能对数据库产生太大的压力,一般我们就采用批量写库方案。下面就针对这种写多读少的物联网系统设计方案分享一下个人的经验。
我最近在做的一个系统是接收储能系统上报的数据,设备每10s上报一轮数据,云端接收到数据后做一些基本的处理判断逻辑,就将数据存入数据库。业务端展示数据分为历史数据和实时数据,历史数据用于其他系统或业务部门分析,实时数据需要做一些预警和最新数据展示。所以我设计的系统机构图大致如下:
系统架构图
系统使用kafka消息队列将接收数据和处理数据的两个服务进行解耦,在这个系统中,最为核心的服务就是数据服务,它要负责数据的处理、入库、查询等多个功能。在批量写数据库时我们要考虑两个方面:一个是批量插入数据多少条比较合适,另外一个就是如果数据积累比较缓慢最长等待多久就要执行一次批量写。
综合以上的分析,我设计的批量写入数据库采用 缓存队列+定时任务 方式实现,这样既能满足批量写入数据库的要求,又能满足及时更新数据库。
既然数据有缓存,就存在缓存数据丢失的风险,规避数据丢失的问题,行业内采用比较多的方案就是WAL(Write-Ahead Logging)。对于我的系统就是数据服务从kafka消费到数据后先写本地日志,再将数据放入缓存队列,然后通知kafka消费数据成功;数据在缓存队列中积累到批量写入数据库的条数或到达一定时间后就插入数据库。在这个过程中如果服务宕机导致缓存队列的数据没有入库,我们也可以通过本地日志找回数据,再次启动服务后重新加载插入数据库。
对于日志中的数据,会有一个标识标记当前已经入库的位置,当缓存队列的数据写入数据库后,更新日志标记点,标记点之前的数据都是写入数据库成功的,标记点之后的数据是目前还在缓存中并未写入数据库。如果程序正常退出,会有一个钩子函数清理缓存队列,将队列中的所有数据都写入数据库,清理标记点;如果程序异常退出,那么标记点之后的数据都是不安全的,在程序下一次正常启动时检查这个标记点,将标记点之后的数据重新加载进程序并写入数据库。这里的标记点通过在项目部署目录下创建一个文件来实现:程序每次写入数据库后更新这个标记点并将它写入文件,程序正常退出就通过钩子函数删除这个文件。下次程序启动后检查文件是否存在:不存在就是正常退出不用做任何处理;存在就是异常退出,这时就要读取标记点的值,把它之后的所有数据重新写入数据库。
通过上面的分析,整个写入数据的流程图如下:
处理数据业务流程
对于标记点的设计结构如下:
标记点结构
由于设备上报的数据有多种类型,那么缓存队列就存在多个,每个队列对应的标记点就有多个,这里选择数据存入缓存队列的时间戳作为标记,占用4个字节,每次写入数据库成功后就将这个时间戳写入到文件,多个缓存队列就依次向后写入文件,为了提高写文件性能,使用mmap将文件映射到内存中,通过修改内存中的值达到写文件的目的。

以上分析了整个项目的架构和实现逻辑,接下来就通过代码实现上面的这些逻辑。
talk is cheap, show me the code

一、项目结构:

使用SpringBoot3.2 + kafka3.6 + jdk17。
maven依赖如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
    <version>1.18.30</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- redis相关依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
<!-- 数据库相关依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.5.5</version>
    <exclusions>
        <exclusion>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
</dependency>

二、日志记录实现

写日志部分,就不单独实现了,采用logback框架写日志,单个日志文件大小设置为1GB,日志滚动策略采用大小+日期方式。配置文件在resource目录下的 logback-spring.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml" />
    <property name="LOG_PATH" value="logs"/>
    <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%t|%-5p|%c|%m%n"/>

    <!-- 业务日志:记录系统接收到的数据,用于数据丢失处理 -->
    <appender name="redo" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/redo.log</file>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <FileNamePattern>${LOG_PATH}/redo-%i.log.%d{yyyy-MM-dd}</FileNamePattern>
            <maxHistory>7</maxHistory>
            <maxFileSize>1GB</maxFileSize>
            <totalSizeCap>40GB</totalSizeCap>
        </rollingPolicy>
    </appender>

    <logger name="redo" level="INFO" additivity="false">
        <appender-ref ref="redo"/>
    </logger>

    <!-- 控制台 -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|[%-5level]|%logger{36}.%method|%msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- 错误日志:输出所有错误日志信息 -->
    <appender name="FILE-ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/error/error.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/error/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>500MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- WARN日志:输出所有WARN日志信息 -->
    <appender name="FILE-WARN" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/warn/warn.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/warn/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>500MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 消息日志:输出所有消息日志信息 -->
    <appender name="FILE-INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/info/info.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/info/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>500MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 调试日志:输出所有消息日志信息 -->
    <appender name="FILE-DEBUG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/debug/debug.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/debug/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>500MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE-ERROR"/>
        <appender-ref ref="FILE-WARN"/>
        <appender-ref ref="FILE-INFO"/>
        <appender-ref ref="FILE-DEBUG"/>
    </root>
</configuration>

日志格式为: 日志时间|消息主题|消息内容 ,消息主题就是kafka的主题,时间戳是接收到消息的时间戳,这个时间戳用于异常退出时下次启动服务恢复消息时判断数据是在标记点前后的依据,消息内容就是具体消费的信息。示例数据如下:

2024-09-23 15:03:19.378|batch-message|{"data":"{\"id\":\"1727074999121\",\"code\":\"code1--1\",\"name\":\"name1--1\",\"random\":\"0eaf04363d5a49419967d500dc149e5a\",\"createTime\":\"2024-09-23 15:03:19\"}","dataType":"Work1Data"}
2024-09-23 15:03:19.734|batch-message|{"data":"{\"id\":\"1727074999359\",\"code\":\"code2--1\",\"name\":\"name2--1\",\"random\":\"89b8b315f4e84fedb5af799ce4860c56\",\"createTime\":\"2024-09-23 15:03:19\"}","dataType":"Work2Data"}

三、消费KAFKA数据

数据是通过消费kafka队列接收消息,如果消息被成功接收并且写入本地日志和缓存队列,那么就提交ack给kafka,实现代码如下:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.xingo.common.JacksonUtils;
import org.xingo.consumer.batch.BatchWorkFactory;
import org.xingo.consumer.batch.impl.BatchWork1;
import org.xingo.consumer.batch.impl.BatchWork2;
import org.xingo.domain.Work1Data;
import org.xingo.domain.Work2Data;

import java.util.List;

/**
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Component
public class KafkaConsumer {

    Logger redoLogger = LoggerFactory.getLogger("redo");

    @Autowired
    private BatchWorkFactory batchWorkFactory;

    /**
     * 数据消费者
     */
    @KafkaListener(topics = "batch-message")
    public void linsten01(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        if(records != null) {
            for (ConsumerRecord<String, String> record : records) {
                // 写本地日志
                long ts = System.currentTimeMillis();
                System.out.println(record.value());
                redoLogger.info("{}|{}", record.topic(), record.value());
            
                // 解析消息内容,保存到本地缓存队列
                try {
                    JsonNode json = JacksonUtils.getObjectMapper().readTree(record.value());
                    String dataType = json.get("dataType").asText();
                    if("Work1Data".equals(dataType)) {
                        Work1Data work1Data = JacksonUtils.parseObject(json.get("data").asText(), Work1Data.class);
                        work1Data.setTs(ts);
                        batchWorkFactory.get(BatchWork1.class).add(work1Data);
                    } else if("Work2Data".equals(dataType)) {
                        Work2Data work2Data = JacksonUtils.parseObject(json.get("data").asText(), Work2Data.class);
                        work2Data.setTs(ts);
                        batchWorkFactory.get(BatchWork2.class).add(work2Data);
                    }
                } catch (JsonProcessingException e) {
                    log.error("解析数据异常", e);
                }
            }

            //提交offset消费成功
            ack.acknowledge();
        }
    }
}

这里面json反序列化使用到了Jackson工具类:

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;

import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;

/**
 * json工具
 *
 * @Author xingo
 * @Date 2023/12/15
 */
public class JacksonUtils {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static {
        // Long类型处理,避免前端处理长整型时精度丢失
        SimpleModule module1 = new SimpleModule();
        module1.addSerializer(Long.class, ToStringSerializer.instance);
        module1.addSerializer(Long.TYPE, ToStringSerializer.instance);

        JavaTimeModule module2 = new JavaTimeModule();
        // java8日期处理
        module2.addSerializer(LocalDateTime.class,
                new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        module2.addSerializer(LocalDate.class,
                new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        module2.addSerializer(LocalTime.class,
                new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
        module2.addDeserializer(LocalDateTime.class,
                new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        module2.addDeserializer(LocalDate.class,
                new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        module2.addDeserializer(LocalTime.class,
                new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));

        OBJECT_MAPPER
                // 添加modules
                .registerModules(module1, module2, new Jdk8Module())
                // 日期类型不转换为时间戳
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false)
                // 反序列化的时候如果多了其他属性,不抛出异常
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                // 如果是空对象的时候,不抛异常
                .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
                // 空对象不序列化
                .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                // 日期格式化
                .setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"))
                // 设置时区
                .setTimeZone(TimeZone.getTimeZone("GMT+8"))
                // 驼峰转下划线
                // .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
                // 语言
                .setLocale(Locale.SIMPLIFIED_CHINESE);
    }

    /**
     * 反序列化
     * @param json      json字符串
     * @param clazz     发序列化类型
     * @return
     * @param <T>
     */
    public static <T> T parseObject(String json, Class<T> clazz) {
        if(json == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static <T> T parseObject(String json, TypeReference<T> type) {
        if(json == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(json, type);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static <T> T parseObject(byte[] bytes, TypeReference<T> type) {
        if(bytes == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(bytes, type);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 反序列化列表
     * @param json
     * @return
     * @param <T>
     */
    public static <T> List<T> parseArray(String json) {
        if(json == null) {
            return null;
        }
        try {
            TypeReference<List<T>> type = new TypeReference<List<T>>(){};
            return OBJECT_MAPPER.readValue(json, type);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 写为json串
     * @param obj   对象
     * @return
     */
    public static String toJSONString(Object obj) {
        if(obj == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static byte[] toJSONBytes(Object obj) {
        if(obj == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.writeValueAsBytes(obj);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 获取jackson对象
     * @return
     */
    public static ObjectMapper getObjectMapper() {
        return OBJECT_MAPPER;
    }
}

四、数据缓存队列

缓存数据的队列有两种数据结构可选:数组和链表,链表适用于数据量不定的场景,随机增减节点非常方便;数组长度一定,随机增减性能比较低,对于顺序读写性能比链表要好。这里的缓存主要是尾部添加,一次读取入库的场景,综合来看,数组性能会好一些,由于缓存数据条数不一定,只有最大长度限制,选择ArrayList这个结构非常方便。至于并发问题,在读写时加锁处理。
对于数据缓存这部分逻辑,我们定义一个抽象类叫BatchWork,它主要用于公共处理逻辑的抽取,对于不同数据结构的对象,只需要继承这个父类,并且实现该对象批量处理数据的逻辑即可。
批处理抽象父类定义如下:

import lombok.extern.slf4j.Slf4j;
import org.xingo.domain.WorkData;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 批处理任务父类
 * 
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
public abstract class BatchWork<T extends WorkData> {

    /**
     * 最大时间间隔,超过这个时间间隔还没有执行数据批量插入就执行定时任务
     */
    public static final int DIFF_TIMESTAMP = 30_000;
    /**
     * 执行时间间隔:30s
     */
    public static final int period = 30;    // 执行的间隔时间
    /**
     * 批量插入数据最大条数
     */
    protected short batchSize = 1000;

    /**
     * 最近一次执行数据插入时间戳
     */
    protected long lastTimestamp = 0L;

    /**
     * 任务名称
     */
    protected BatchWorkEnum batchWork;

    /**
     * 系统操作线程池:构建一个核心池大小是8、线程池最大线程数是16的线程池,可以根据主机CPU核数进行调整
     */
    public static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8, 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(8), new ThreadFactory() {

        private final AtomicInteger number = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("batch-savedb-" + number.getAndIncrement());

            return thread;
        }
    });

    /**
     * 定时任务线程池:定时检查缓存队列,通过判断缓存队列最后一次执行时间来判断是否要将缓存队列数据插入数据库表中
     */
    private static final ScheduledExecutorService taskExecutor = Executors.newScheduledThreadPool(16, new ThreadFactory() {

        private final AtomicInteger number = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("batch-work-" + number.getAndIncrement());

            return thread;
        }
    });

    /**
     * 缓存数据列表
     */
    private List<T> datas = new ArrayList<>(batchSize);

    /**
     * 锁对象
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * 启动定时执行任务方法,在项目启动时启动
     */
    public void run() {
        if(batchWork == null) {
            throw new RuntimeException("属性 [batchWork] 不能为空");
        }

        // 最后更新时间与当前时间差大于时间间隔,将执行一次批处理任务
        Runnable task = () -> {
            long diff = System.currentTimeMillis() - lastTimestamp;
            if(diff >= DIFF_TIMESTAMP) {
                batch();
            }
        };

        Random random = new Random();
        long initialDelay = random.nextInt(10, 30);     // 初始延迟时间
        // 为了让任务尽可能均匀分布,所有要批量处理的任务初始时间随机生成
        taskExecutor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

        BatchWorkFactory.WORK_NUMS.incrementAndGet();
        log.info("{}批量任务开始执行", batchWork);
    }

    /**
     * 服务关闭时执行
     */
    public void clear() {
        log.info("项目关闭开始清理{}批量任务", batchWork);
        this.batch();
        BatchWorkFactory.WORK_NUMS.decrementAndGet();
        log.info("{}批量任务已结束", batchWork);
    }

    /**
     * 添加数据方法:将要插入数据添加到缓存队列
     * @param data
     */
    public void add(T data) {
        lock.lock();
        try {
            datas.add(data);
        } finally {
            lock.unlock();
        }
        this.cache(data);
        // 添加数据后判断缓存数据条数,如果缓存条数超过了批处理阈值,执行一次批处理,结合定时任务就实现了“最大缓存条数或时间间隔”内插入数据的逻辑
        if(datas.size() >= batchSize) {
            batch();
        }
    }

    /**
     * 批量新增数据:将一批数据添加到缓存队列
     * @param batch
     */
    public void batchAdd(List<T> batch) {
        lock.lock();
        try {
            datas.addAll(batch);
        } finally {
            lock.unlock();
        }
        if(datas.size() >= batchSize) {
            batch();
        }
    }

    /**
     * 批量处理方法:具体子类要实现这个方法
     */
    public abstract void batchInsertDb(List<T> datas);

    /**
     * 数据缓存:缓存最新一条数据
     * @param data
     */
    public abstract void cache(T data);

    /**
     * 批处理方法:
     * 将当前缓存的批处理数据赋值给其他对象,当前缓存列表重新申请一个空间接收数据
     */
    private void batch() {
        log.info("调用{}批量处理方法", batchWork);
        if(!datas.isEmpty()) {
            List<T> copyList = null;
            lock.lock();
            try {
                if(!datas.isEmpty()) {
                    // 定义一个局部变量指向原有的数组,原有的数组重新申请空间用于接收数据
                    copyList = datas;
                    datas = new ArrayList<>(batchSize);
                }
            } finally {
                lock.unlock();
            }
            if(copyList != null && !copyList.isEmpty()) {
                final List<T> dbList = copyList;
                // 插入数据是比较耗时的过程,这里放入异步线程池慢慢执行,主线程继续接收数据
                threadPool.execute(() -> {
                    log.info("{}写入数据库开始|{}", batchWork, dbList.size());
                    long s = System.currentTimeMillis();
                    this.batchInsertDb(dbList);
                    lastTimestamp = System.currentTimeMillis();
                    long _ts = dbList.get(dbList.size() - 1).getTs();
                    log.info("{}写入数据库完成|{}|{}|{}ms", batchWork, dbList.size(), new Timestamp(_ts), (lastTimestamp - s));
                    // 插入数据完成后,更新标记点
                    MarkPointLog.markPoint(batchWork, _ts);
                });
            }
        }
    }

}

demo项目模拟了两类数据批处理:
批处理任务1:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.xingo.consumer.batch.BatchWork;
import org.xingo.consumer.batch.BatchWorkEnum;
import org.xingo.domain.Work1Data;
import org.xingo.common.JacksonUtils;
import org.xingo.mapper.Work1Mapper;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 批处理子类1
 * 
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Service
public class BatchWork1 extends BatchWork<Work1Data> {

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private Work1Mapper work1Mapper;

    public BatchWork1() {
        this.batchWork = BatchWorkEnum.Work1;
    }

    @Override
    public void batchInsertDb(List<Work1Data> datas) {
        try {
            work1Mapper.batchInsert(datas);
        } catch (Exception e) {
            log.error("批量插入数据异常", e);
            // 这里是批量插入数据如果异常,转为单条插入数据,一般的异常都是数据库主键冲突导致的
            datas.forEach(data -> {
                try {
                    work1Mapper.insert(data);
                } catch (Exception ex) {
                    log.error("单条插入数据异常", ex);
                }
            });
        }
        log.info("批量处理数据|{}|{}", this.batchWork, datas.size());
    }

    @Override
    public void cache(Work1Data data) {
        // 缓存最新一条数据供其他业务使用
        redisTemplate.opsForValue().set("data:work1", JacksonUtils.toJSONString(data), 30, TimeUnit.SECONDS);
    }
}

批处理任务2:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.xingo.consumer.batch.BatchWork;
import org.xingo.consumer.batch.BatchWorkEnum;
import org.xingo.common.JacksonUtils;
import org.xingo.domain.Work2Data;
import org.xingo.mapper.Work2Mapper;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 批处理子类2
 * 
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Service
public class BatchWork2 extends BatchWork<Work2Data> {

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private Work2Mapper work2Mapper;

    public BatchWork2() {
        this.batchWork = BatchWorkEnum.Work2;
    }

    @Override
    public void batchInsertDb(List<Work2Data> datas) {
        try {
            work2Mapper.batchInsert(datas);
        } catch (Exception e) {
            log.error("批量插入数据异常", e);
            datas.forEach(data -> {
                try {
                    work2Mapper.insert(data);
                } catch (Exception ex) {
                    log.error("单条插入数据异常", ex);
                }
            });
        }
        log.info("批量处理数据|{}|{}", this.batchWork, datas.size());
    }

    @Override
    public void cache(Work2Data data) {
        redisTemplate.opsForValue().set("data:work2", JacksonUtils.toJSONString(data), 30, TimeUnit.SECONDS);
    }
}

这里只有两个批处理任务,如果批处理任务有更多个,那么就需要一个统一的地方管理这些任务:
定义一个工厂类管理批量任务,创建、销毁、获取这些批量任务都通过这个工厂类来实现:

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AssignableTypeFilter;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 批量任务工厂
 *
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Component
public class BatchWorkFactory {

    /**
     * 批量任务集合
     */
    private final Map<Class<? extends BatchWork>, BatchWork> map = new HashMap<>();

    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private MarkPointLog markPointLog;

    /**
     * 工厂初始化是否完成
     */
    private boolean ok = false;

    /**
     * 任务启动完成
     */
    public static final AtomicInteger WORK_NUMS = new AtomicInteger(0);

    @PostConstruct
    public void run() {
        // 构建bean缓存:通过spring提供的类扫描目录下的所有子类加载到集合中做统一管理
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
        provider.addIncludeFilter(new AssignableTypeFilter(BatchWork.class));
        Set<BeanDefinition> components = provider.findCandidateComponents("org/xingo/consumer/batch/impl");
        for (BeanDefinition component : components) {
            try {
                Class<? extends BatchWork> clazz = (Class<? extends BatchWork>) Class.forName(component.getBeanClassName());
                BatchWork bean = applicationContext.getBean(clazz);
                map.put(clazz, bean);
                log.info("初始化加载类实例信息|{}|{}", component.getBeanClassName(), bean);
            } catch (Exception e) {
                log.error("加载类异常", e);
            }
        }

        // 初始标记服务
        markPointLog.initMarkPointLog();

        // 批处理任务启动方法
        map.values().forEach(BatchWork::run);
        try {
            // 标记任务启动成功
            while (WORK_NUMS.get() != map.size()) {
                TimeUnit.MICROSECONDS.sleep(5);
            }
            ok = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("批处理任务全部启动");
    }

    /**
     * 正常关闭处理
     */
    @PreDestroy
    public void clear() {
        try {
            // 批处理任务关闭方法
            map.values().forEach(BatchWork::clear);

            // 等待批量任务全部处理完成
            while (WORK_NUMS.get() != 0) {
                TimeUnit.SECONDS.sleep(1);
                log.info("等待批处理任务结束,剩余{}个任务关闭", WORK_NUMS.get());
            }

            // 清理标记文件
            markPointLog.destroyMarkPointLog();

            log.info("批处理任务全部关闭");
        } catch (Exception e) {
            log.error("清理本地批处理任务异常", e);
        }
    }

    /**
     * 获取批量执行任务类
     * @param workName
     * @return
     */
    public BatchWork get(Class<? extends BatchWork> workName) {
        if(ok) {
            return map.get(workName);
        } else {
            int cnt = 0;
            while (!ok) {
                try {
                    TimeUnit.MILLISECONDS.sleep(3);
                    if(cnt++ >= 10000) {
                        break;
                    }
                } catch (Exception e) {
                    log.error("获取批处理任务异常", e);
                }
            }
            return map.get(workName);
        }
    }

}

在BathWork抽象类中,数据批量插入数据库后要记录下当前插入数据的时间点工具类:MarkPointLog:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.math.BigDecimal;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 数据标记点,每次插入数据后就要更新标记点
 * 
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Component
public class MarkPointLog {

    @Value("${user.dir}")
    private String dirPath;
    @Autowired
    private LoadDataService loadDataService;

    /**
     * 标记服务正常关闭文件路径:
     * 服务正常关闭时会将文件删除,
     * 下次启动时如果文件存在,表示上次是异常关闭的,那么就会处理历史消息重新发布一次处理
     */
    private String filePath = null;
    /**
     * 写文件
     */
    private static MappedByteBuffer buffer = null;

    /**
     * 锁对象
     */
    private static final ReentrantLock lock = new ReentrantLock();

    /**
     * 初始化
     */
    public void initMarkPointLog() {
        filePath = dirPath.endsWith(File.separator) ? (dirPath + "work") : (dirPath + File.separator + "work");
        File file = new File(filePath);
        if(file.exists()) {
            RandomAccessFile rw = null;
            try {
                long minTs = System.currentTimeMillis();
                rw = new RandomAccessFile(file, "r");
                SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                for (BatchWorkEnum mark : BatchWorkEnum.values()) {
                    long ts = rw.readLong();
                    if(ts < minTs) {
                        minTs = ts;
                    }
                    log.error("系统异常关闭最后提交数据时间戳|{}|{}|{}", mark, ts, datetimeFormat.format(ts));
                }

                final long min = minTs;
                new Thread(() -> loadDataService.loadData(min), "loadLogWork").start();
            } catch (Exception e) {
                log.error("读取状态文件异常", e);
            } finally {
                if(rw != null) {
                    try {
                        rw.close();
                    } catch (IOException e) {
                        log.error("读取状态文件异常", e);
                    }
                }
            }
        }

        // 映射运行状态文件
        FileChannel channel = null;
        try {
            channel = new RandomAccessFile(file, "rw").getChannel();
            int size = BatchWorkEnum.values().length * 8;
            buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
            buffer.putLong(0);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 清理标记点文件
     */
    public void destroyMarkPointLog() {
        if(filePath != null) {
            File file = new File(filePath);
            if(file.exists()) {
                boolean rs = file.delete();
                log.info("删除标识文件完成|{}|{}", filePath, rs);
            }
        }
    }

    /**
     * 最大标记点
     * @param workEnum
     * @param ts
     */
    public static void markPoint(BatchWorkEnum workEnum, long ts) {
        lock.lock();
        try {
            int position = workEnum.getIdx() * 8;
            buffer.position(position);
            buffer.putLong(ts);
            buffer.force();
        } finally {
            lock.unlock();
        }
    }

}

当数据在缓存队列中还未插入数据库之前系统宕机了,这就存在数据丢失风险,那么在系统启动后就要重新消费这部分数据:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/**
 * @Author xingo
 * @Date 2024/9/23
 */
@Slf4j
@Component
public class LoadDataService {

    @Value("${user.dir}")
    private String dirPath;
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 加载数据到系统等待再次入库处理
     *
     * @param timestamp
     */
    public void loadData(long timestamp) {
        // 最小时间再减去一个时间间隔,认为这段时间内的数据都是不安全的
        long startTimestamp = timestamp - BatchWork.DIFF_TIMESTAMP;
        long endTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(5);

        String logPath = dirPath + File.separator + "logs";
        File kafkaLogs = new File(logPath);
        File file1 = null, file2 = null;
        String yesterday = LocalDate.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));

        // 查找当前的日志文件
        if(kafkaLogs.exists() && kafkaLogs.isDirectory()) {
            File[] files = kafkaLogs.listFiles();
            if(files != null) {
                for(File file : files) {
                    String fname = file.getName();
                    if(fname.equals("redo.log")) {
                        log.info("匹配日志文件|{}", fname);
                        file1 = file;
                    } else if(fname.endsWith(yesterday)) {
                        log.info("匹配历史日志文件|{}", fname);
                        file2 = file;
                    }
                }
            }
        }
        if(file1 != null) {
            sendDataToQueue(file1, startTimestamp, endTimestamp);
        }
        if(file2 != null) {
            sendDataToQueue(file2, startTimestamp, endTimestamp);
        }
    }

    /**
     * 一行数据最大长度,用于数据查找时回退
     */
    private int LINE_MAX_SIZE = 1000;

    /**
     * 发送数据到消息
     * @param file
     * @param startTimestamp
     * @param endTimestamp
     */
    private void sendDataToQueue(File file, long startTimestamp, long endTimestamp) {
        // 查找数据:二分法查找要加载的数据
        RandomAccessFile raf = null;
        SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            raf = new RandomAccessFile(file, "rw");
            long start = 0L;
            long end = file.length();
            long point = 0L;
            String line = null;
            // 2024-09-23 10:20:25.387|batch-message|{"data":{"id":"1727058025075","code":"code1--1727058025075","name":"name1--1727058025075","random":"bdfa8ca9d5084c25b9267f34e7bff6cf","createTime":"2024-09-23 10:20:25"},"dataType":"Work1Data"}
            boolean find = false;
            while (true) {
                if(end - start < 60) {
                    break;
                }
                point = (start + end) / 2;
                raf.seek(Math.max(0, point - LINE_MAX_SIZE));
                line = raf.readLine();
                if (StringUtils.isBlank(line)) {
                    break;
                }
                String[] arr = line.split("\\|");
                if (arr.length != 3) {
                    line = raf.readLine();
                    if (line == null || "".equals(line)) {
                        break;
                    }
                    arr = line.split("\\|");
                } else {
                    try {
                        datetimeFormat.parse(arr[0]);
                    } catch (ParseException e) {
                        line = raf.readLine();
                        if (line == null || "".equals(line)) {
                            break;
                        }
                        arr = line.split("\\|");
                    }
                }

                long time = datetimeFormat.parse(arr[0]).getTime();

                if (time < startTimestamp) {
                    start = point;
                } else {
                    find = true;
                    break;
                }
            }

            if (find) {
                raf.seek(start);
                line = raf.readLine();
                String[] arr = line.split("\\|");
                if (arr.length != 3) {
                    line = raf.readLine();
                } else {
                    try {
                        datetimeFormat.parse(arr[0]);
                    } catch (ParseException e) {
                        line = raf.readLine();
                    }
                }

                while (StringUtils.isNotBlank(line)) {
                    arr = line.split("\\|");
                    long time = datetimeFormat.parse(arr[0]).getTime();
                    if(time > endTimestamp) {
                        break;
                    }
                    // 这里将可能丢失的数据重新发送到消息队列再次消费持久化
                    if(time >= startTimestamp) {
                        kafkaTemplate.send(arr[1], arr[2]);
                        log.info("加载可能丢失的数据|{}", arr[2]);
                    }

                    line = raf.readLine();
                }
            }
        } catch (Exception e) {
            log.error("读取文件异常", e);
        } finally {
            try {
                if(raf != null) {
                    raf.close();
                }
            } catch (IOException e) {
                log.error("关闭文件异常", e);
            }
        }
    }
}

是否加载数据主要是根据项目目录下释放存在一个标记文件:work。这个文件在项目启动时创建,在项目正常关闭时删除这个文件。在数据批量插入成功后会把标记点也写入到这个文件中,由于存在多个缓存队列,每个缓存队列需要写入到这个文件的不同位置,这里需要定义一个枚举类记录每个缓存队列写入到这个文件的位置:

import org.xingo.consumer.batch.impl.BatchWork1;
import org.xingo.consumer.batch.impl.BatchWork2;

/**
 * 批量任务名
 *
 * @Author xingo
 * @Date 2024/9/13
 */
public enum BatchWorkEnum {

    /**
     * 批量任务1
     */
    Work1((byte) 0, BatchWork1.class),

    /**
     * 批量任务2
     */
    Work2((byte) 1, BatchWork2.class),


    ;

    /**
     * 批量任务队列的序列号,是一个从0开始递增的值;
     * 它主要用于批量数据入库后最新数据时间点记录文件查找和写入时使用
     * 如果新增缓存队列这个序号递增,不能更改已经存在的序号
     * 如果要调整或删除已有的序号,要保证系统正常退出后再操作,否则会有数据丢失风险
     */
    private byte idx;

    /**
     * 批量任务对应的类
     */
    private Class<? extends BatchWork> work;

    BatchWorkEnum(byte idx, Class<? extends BatchWork> work) {
        this.idx = idx;
        this.work = work;
    }

    public byte getIdx() {
        return idx;
    }

    public Class<? extends BatchWork> getWork() {
        return work;
    }
}

五、数据对象处理

上面这些逻辑都完成了,最后一步就是数据持久化,我这里使用mysql数据库做持久化测试,数据对象和持久化代码如下:
统一的父类:

import java.io.Serializable;

/**
 * @Author xingo
 * @Date 2024/9/13
 */
public class WorkData implements Serializable {

    /**
     * 数据放入缓存队列时间
     */
    protected long ts;

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }
}

两个实体类:

实体类1:

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * @Author xingo
 * @Date 2024/9/13
 */
@Data
@TableName("t_work1")
public class Work1Data extends WorkData {

    private Long id;

    private String code;

    private String name;

    private String random;

    private LocalDateTime createTime;
}

实体类2:

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * @Author xingo
 * @Date 2024/9/13
 */
@Data
@TableName("t_work2")
public class Work2Data extends WorkData {

    private Long id;

    private String code;

    private String name;

    private String random;

    private LocalDateTime createTime;
}

xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.mapper.Work1Mapper">

    <insert id="batchInsert" parameterType="hashmap">
        INSERT INTO t_work1 (id, code, name, random, ts, create_time) VALUES
        <foreach collection="datas" item="item" index="index" separator=",">
            (#{item.id}, #{item.code}, #{item.name}, #{item.random}, #{item.ts}, #{item.createTime})
        </foreach>
    </insert>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.mapper.Work2Mapper">

    <insert id="batchInsert" parameterType="hashmap">
        INSERT INTO t_work2 (id, code, name, random, ts, create_time) VALUES
        <foreach collection="datas" item="item" index="index" separator=",">
            (#{item.id}, #{item.code}, #{item.name}, #{item.random}, #{item.ts}, #{item.createTime})
        </foreach>
    </insert>
</mapper>

Mapper接口:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.xingo.domain.Work1Data;

import java.util.List;

/**
 * @Author xingo
 * @Date 2024/9/23
 */
@Mapper
public interface Work1Mapper extends BaseMapper<Work1Data> {

    void batchInsert(List<Work1Data> datas);
}
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.xingo.domain.Work2Data;

import java.util.List;

/**
 * @Author xingo
 * @Date 2024/9/23
 */
@Mapper
public interface Work2Mapper extends BaseMapper<Work2Data> {

    void batchInsert(List<Work2Data> datas);
}

数据库建表SQL语句:

CREATE TABLE `t_work1`  (
  `id` bigint NOT NULL,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `random` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `create_time` datetime NULL DEFAULT NULL,
  `ts` bigint NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

CREATE TABLE `t_work2`  (
  `id` bigint NOT NULL,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `random` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `create_time` datetime NULL DEFAULT NULL,
  `ts` bigint NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

六、测试验证

  1. 正常情况:验证正常插入情况非常简单,编写一个测试方法不断的向kafka的主题中写入数据,观察数据消费和入库情况,通过对比发送数据和接收数据是否存在差异来确定服务是否有问题。
  2. 正常退出:在正常运行情况下,对批量处理服务执行 kill -15 关闭程序,正常退出情况下项目根目录下的 work 文件会被清除,那么下次程序启动时就不需要重新加载日志数据。
  3. 异常退出:在正常运行情况下对批量处理服务执行kill -9 命令杀死进程,观察项目根目录下的 work 文件还存在,并没有被清除,这就表明程序在上次退出时是异常退出的,那么在程序再次启动时会重新加载日志数据,避免程序异常导致数据丢失的风险。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/882977.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于JAVA+SpringBoot+Vue的医院后台管理系统

基于JAVASpringBootVue的医院后台管理系统 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接&#x1f345; 哈…

企业源代码也需要加密?2024十款源代码加密软件排行榜

在当今数字化时代&#xff0c;源代码作为企业的核心资产&#xff0c;其重要性不言而喻。源代码不仅是企业技术创新的体现&#xff0c;更是商业机密和竞争优势的基石。然而&#xff0c;随着网络攻击和数据泄露事件的频发&#xff0c;源代码的安全性面临着前所未有的挑战。企业源…

【Unity3d Shader】毛玻璃效果

毛玻璃也叫​磨砂玻璃​:是用物理或化学方法处理过的一种表面粗糙不平整的半透明玻璃。 毛玻璃成像原理:毛玻璃表面不平整,光线通过毛玻璃被反射后向四面八方射出去(因为毛玻璃表面不是光滑的平面,使光产生了漫反射),折射到视网膜上已经是不完整的像,于是就看不清楚(…

菱形继承的类对父类的初始化、组合、多态、多态的原理等的介绍

文章目录 前言一、菱形继承的类对父类的初始化二、组合三、 多态1. 构成多态2. 虚函数3. 虚函数的重写4. 虚函数重写的两个例外1. 协变2. 析构函数的重写 5. C11 final 和 override1. final2. override 6. 设计不想被继承的类7. 重载、覆盖&#xff08;重写&#xff09;、 隐藏…

MoFA: 迈向AIOS

再一次向朋友们致以中秋的祝福&#xff01; MoFA (Modular Framework for Agents)是一个独特的模块化AI智能体框架。MoFA以组合&#xff08;Composition)的逻辑和编程&#xff08;Programmable&#xff09;的方法构建AI智能体。开发者通过模版的继承、编程、定制智能体&#xf…

黑马智数Day4-2

渲染基础Table列表 封装获取企业列表接口 export function getEnterpriseListAPI(params) {return request({url: /park/enterprise,params}) } 组件中获取数据 <script> import { getEnterpriseListAPI } from /apis/enterprise export default {name: Building,dat…

QT C++ 自学积累 『非技术文』

QT C 自学积累 『非技术文』 最近一段时间参与了一个 QT 项目的开发&#xff0c;使用的是 C 语法&#xff0c;很遗憾的是我之前从来没有接触过 C &#xff0c;大学没有开过这堂课&#xff0c;也没用自己学习过&#xff0c;所有说上手贼慢&#xff0c;到现在为止其实也不是很清楚…

大数据毕业设计选题推荐-超市进货推荐系统-Hive-Hadoop-Spark

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

如何瞬间建造一个百亿商品的网上商城?借助API,无需逐个上传商品数据,自动对接电商平台百亿商品数据

在快速发展的电商时代&#xff0c;构建一个拥有百亿级商品数据的网上商城曾是许多企业遥不可及的梦想。然而&#xff0c;随着技术的不断进步&#xff0c;特别是电商平台API接口的广泛应用&#xff0c;这一梦想正逐渐变为现实。本文将详细介绍如何通过调用电商平台API接口&#…

# 高可用的并发解决方案nginx+keepalived(四)

高可用的并发解决方案nginxkeepalived&#xff08;四&#xff09; 一、Keepalived安装 1、keepalived 介绍 Keepalived 是一种高性能的服务器高可用或热备解决方案&#xff0c;Keepalived 可以用来防止服务器单点故障的发生&#xff0c;通过配合 Nginx 可以实现 web 前端服务…

Git从了解到操作

Git常用命令 基本的linux命令 ls / ll 查看当前目录( ls 是查看目录有哪些文件夹&#xff0c;ll 是查看隐藏文件)cat 查看文件内容touch 创建文件vi vi编辑器 (使用 vi 编辑器是为了方便展示效果&#xff0c;也可以记事本、editPlus、notPad等其它编辑器) 备注 Git GUl: Gi…

Coze:如何使用插件商店?

你好&#xff0c;我是三桥君 本节我们来介绍插件商店。 点击左侧菜单的插件商店。可以看到&#xff0c;打开了插件商店的页面。 插件商店的顶部是“搜索框”&#xff0c;这里可以搜索整个插件商店的插件。 比如&#xff0c;我们输入“图片”进行搜索&#xff0c;会发现下方列…

华为昇腾系列-jupyter安装torch_npu

使用背景 国产算力的兴起&#xff0c;异构算力成为各大厂商的选择&#xff0c;以摆脱对英伟达算力过大的依赖&#xff0c;保障算力安全。本文将会讲解如何使用昇腾算力卡来制作一个镜像&#xff0c;然后交给k8s进行算力调度&#xff0c;显示国产算力的真正应用落地。 安装步骤…

【Python篇】深入机器学习核心:XGBoost 从入门到实战

文章目录 XGBoost 完整学习指南&#xff1a;从零开始掌握梯度提升1. 前言2. 什么是XGBoost&#xff1f;2.1 梯度提升简介 3. 安装 XGBoost4. 数据准备4.1 加载数据4.2 数据集划分 5. XGBoost 基础操作5.1 转换为 DMatrix 格式5.2 设置参数5.3 模型训练5.4 预测 6. 模型评估7. 超…

网络安全前景大好,转行这些职位成了“香饽饽”

网络安全就业前景 大数据、人工智能、云计算、物联网、5G等新兴技术的高速发展&#xff0c;蒸蒸日上。但是随之也出现了许多问题&#xff0c;比如&#xff1a;政府单位、企业、个人信息泄露&#xff0c;网络安全问题日益严峻&#xff0c;网络空间安全建设刻不容缓。 网络安全…

真正厉害的项目经理都做到了这一点

在项目管理中&#xff0c;沟通是贯穿始终的关键要素&#xff0c;它能够打开团队协作的大门&#xff0c;推动项目稳步前行。 而那些真正厉害的项目经理&#xff0c;无一不是沟通的高手&#xff0c;他们通过清晰、有效且富有策略性的沟通&#xff0c;确保了项目的每一个环节都紧密…

小程序地图展示poi帖子点击可跳转

小程序地图展示poi帖子点击可跳转 是类似于小红书地图功能的需求 缺点 一个帖子只能有一个点击事件&#xff0c;不适合太复杂的功能&#xff0c;因为一个markers只有一个回调回调中只有markerId可以使用。 需求介绍 页面有地图入口&#xff0c;点开可打开地图界面地图上展…

x-cmd pkg | gotop - 简洁美观的终端监控工具,内存 CPU 网络 清晰可见

目录 简介用户首次快速实验指南友好的监控界面可定制化界面类似工具进一步探索 简介 gotop 是一个基于终端(TUI)的系统监视器。由 Andrew Gallant 于 2018 年 1 月发布。Andrew Gallant 在2020 年 8 月停止了对 gotop 的维护转由 Gregory Anders 维护和更新。 gotop 在终端使…

【ARM】armv8的虚拟化深度解读

Type-1 hypervisor Type-1虚拟化也叫做Bare metal, standalone, Type1 Type2 hypervisor Type-2虚拟化也叫做hosted, Type-2 VM和vCPU(虚拟机和虚拟cpu) 在一个VM&#xff08;虚拟机&#xff09;中有多个vCPU&#xff0c;多个vCPU可能属于同一个Vritual Processor。 EL2…

mysql如何快速编写单表查询语句

目录 背景生成sql语句 背景 我们在编写查询语句的时候&#xff0c;都提倡需要用到哪些字段就查哪些字段&#xff0c;这样有两个好处&#xff1a;1、可以增加程序运行效率&#xff0c;2、可以避免无关字段被暴露。那我们一个字段一个字段写就比较烦&#xff0c;有没有方法快速生…