背景
笔者有一个需求是把将近一亿条数据上传到FTP服务器中,这些数据目前是存储在mysql中,是通过关联几张表查询出来的,查询出来的数据结果集一共是6个字段。要求传输的时候拆分成一个个小文件,每个文件大小不能超过500M。我的测试思路是对测试数据进行分页查询,比如每次分页查询10万条数据,写入到一个txt格式的文件中,攒到50万条数据时,把这个txt文件上传到Ftp中(粗略估算了一下,每个字段长度假设不超过255,),这就是一个小文件的上传。
一、windows下FTP的安装
笔者的开发环境是windows11,所以必须要搭建一个FTP环境以供测试使用
配置IIS web服务器
打开运行窗口【win+R】快捷键,输入 optionalfeatures
后点击确定:
在出来的弹框中找到Internet信息服务,并打开勾选以下配置 ,点击确定,等待windows系统自行添加相关应用配置
配置IIS web站点
现在本地磁盘创建一个FtpServer
空文件夹
然后查看本机IP地址
打开运行【win+R】窗口输入cmd回车
然后输入ipconfig 查看IP
笔者本机连接的是无线网络,如果是连接的有线网络,则需要找对应的以太网适配器连接配置
接着 在开始栏中搜索 IIS 并点击进入IIS管理器
打开后在左侧 “网站” 右键菜单 打开 “添加FTP站点”
主要是填写FTP站点名称和服务的物理路径
点击下一页,填写本机当前网络的ip地址
再点下一页完成身份验证和授权信息
点击完成后,ftp服务器的windows搭建就结束了
打开防火墙,把以下服务勾选上
建立 FTP 服务之后,默认登陆 FTP 服务器的账号和密码就是本机 Administrator 的账户和密码,但是笔者不记得密码了,所以创建一个用户来管理FTP登录
此电脑->右击->显示更多选项->单击管理->本地用户和用户组->用户->右击创建新用户
ftp用户名和密码记好了
再在开始菜单找到IIS服务,点击FTP授权规则
右击编辑权限
点击添加
输入刚才创建的ftp用户名称,点击检查名称
把下面的权限都勾选上,点击确定
回到 Internet Information Services (IIS) 管理器,双击刚才选中的 “FTP授权规则”,点击右侧的"添加允许规则"
然后别忘了启动ftp,右击管理ftp站点,启动
登录ftp
地址是ftp://192.168.1.105,进入此电脑,输入地址回车
输入用户名和密码可以登录
至于浏览器访问,这在很早之前是可以的,但是后来各大浏览器厂商都禁止使用浏览器访问ftp资源,这里也就作罢了
更换ftp的ip
当本机网络环境发生改变时,比如无线网环境变了,导致ip地址变了,那么之前设置好的ip地址就失效了,ftp无法连接。
点开IIS管理器,点击绑定
点击编辑,修改IP地址即可
二、java连接ftp服务器
笔者使用java语言,所以给出springboot框架下访问ftp的方法
首先引入pom依赖 Apache Commons net
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.10.0</version> <!-- 或者使用最新的版本 -->
</dependency>
我这里使用的是最新版,jdk21,可以根据自己的jdk版本适当降低版本,不报错就可以
FTP连接工具类
package com.execute.batch.executebatch.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import java.io.*;
import java.time.Duration;
/**
* FTP工具类
* @author hulei
*/
@Slf4j
public class FtpUtil {
/**
* 上传文件到FTP服务器的根目录
*
* @param host FTP服务器地址
* @param port FTP服务器端口号,默认为21
* @param username 用户名
* @param password 密码
* @param localFile 本地要上传的文件
* @return 成功返回true,否则返回false
*/
public static boolean uploadFileToRoot(String host, int port, String username, String password, File localFile) {
FTPClient ftpClient = null;
FileInputStream fis = null;
try {
ftpClient = connectAndLogin(host, port, username, password);
setBinaryFileType(ftpClient);
ftpClient.setConnectTimeout(1000000000);
Duration timeout = Duration.ofSeconds(1000000000);
ftpClient.setDataTimeout(timeout);
String remoteFileName = localFile.getName();
fis = new FileInputStream(localFile);
return ftpClient.storeFile(remoteFileName, fis);
} catch (IOException e) {
log.error("上传文件失败", e);
return false;
} finally {
assert ftpClient != null;
disconnect(ftpClient);
if(fis != null){
try {
fis.close();
} catch (IOException e) {
log.error("关闭文件流失败", e);
}
}
}
}
/**
* 上传文件到FTP服务器的指定路径
*
* @param host FTP服务器地址
* @param port FTP服务器端口号,默认为21
* @param username 用户名
* @param password 密码
* @param remotePath FTP服务器上的目标路径
* @param localFile 本地要上传的文件
* @return 成功返回true,否则返回false
*/
public static boolean uploadFileToPath(String host, int port, String username, String password, String remotePath, File localFile) {
FTPClient ftpClient = null;
FileInputStream fis = null;
try {
ftpClient = connectAndLogin(host, port, username, password);
setBinaryFileType(ftpClient);
ftpClient.setConnectTimeout(1000000000);
Duration timeout = Duration.ofSeconds(1000000000);
ftpClient.setDataTimeout(timeout);
createRemoteDirectories(ftpClient, remotePath);
String remoteFileName = localFile.getName();
String fullRemotePath = remotePath + "/" + remoteFileName;
fis = new FileInputStream(localFile);
return ftpClient.storeFile(fullRemotePath, fis);
} catch (IOException e) {
log.error("上传文件失败", e);
return false;
} finally {
assert ftpClient != null;
disconnect(ftpClient);
if(fis != null){
try {
fis.close();
} catch (IOException e) {
log.error("关闭文件流失败", e);
}
}
}
}
/**
* 在FTP服务器上创建指定路径所需的所有目录
*
* @param ftpClient FTP客户端
* @param remotePath 需要创建的远程路径
* @throws IOException 如果在创建目录时发生错误
*/
private static void createRemoteDirectories(FTPClient ftpClient, String remotePath) throws IOException {
String[] directories = remotePath.split("/");
String currentPath = "";
for (String dir : directories) {
if (!dir.isEmpty()) {
currentPath += "/" + dir;
if (!ftpClient.changeWorkingDirectory(currentPath)) {
if (!ftpClient.makeDirectory(dir)) {
throw new IOException("无法创建远程目录: " + currentPath);
}
ftpClient.changeWorkingDirectory(dir);
}
}
}
}
/**
* 连接到FTP服务器并登录。
*
* @param host FTP服务器的主机名或IP地址。
* @param port FTP服务器的端口号。
* @param username 登录FTP服务器的用户名。
* @param password 登录FTP服务器的密码。
* @return 成功连接并登录后返回一个FTPClient实例,可用于后续操作。
* @throws IOException 如果连接或登录过程中遇到任何网络问题,则抛出IOException。
*/
private static FTPClient connectAndLogin(String host, int port, String username, String password) throws IOException {
FTPClient ftpClient = new FTPClient();
ftpClient.connect(host, port);
//如果不去确定ftp端口,或者想使用ftp随机端口上传,请使用如下代码,参数中的port可以传null了
//ftpClient.enterLocalActiveMode();
//ftpClient.connect(host);
ftpClient.login(username, password);
int replyCode = ftpClient.getReplyCode();
if (!FTPReply.isPositiveCompletion(replyCode)) {
throw new IOException("连接FTP服务器失败");
}
return ftpClient;
}
/**
* 断开与FTP服务器的连接。
* 该方法首先检查FTP客户端是否已连接到服务器。如果已连接,则尝试登出,
* 如果登出失败,记录错误信息。接着尝试断开与服务器的连接,如果断开失败,同样记录错误信息。
*
* @param ftpClient 与FTP服务器交互的客户端对象。
*/
private static void disconnect(FTPClient ftpClient) {
if (ftpClient.isConnected()) {
try {
ftpClient.logout();
} catch (IOException ioe) {
log.error("登出FTP服务器失败", ioe);
}
try {
ftpClient.disconnect();
} catch (IOException ioe) {
log.error("断开FTP服务器连接失败", ioe);
}
}
}
/**
* 设置FTP客户端的文件传输类型为二进制。
* 这个方法尝试将FTP文件传输类型设置为BINARY,这是进行二进制文件传输的标准方式。
* 如果设置失败,会抛出一个运行时异常。
*
* @param ftpClient 用于文件传输的FTP客户端实例。
* @throws RuntimeException 如果设置文件传输类型为二进制时发生IOException异常。
*/
private static void setBinaryFileType(FTPClient ftpClient) {
try {
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
} catch (IOException e) {
throw new RuntimeException("设置传输二进制文件失败", e);
}
}
}
主要提供了两个方法uploadFileToRoot
和uploadFileToPath
,前者是上传到ftp服务器根目录下,后者上传到指定目录下,其中的连接时间设置的有点夸张,主要是传输时间长、数据量大,害怕断开。
注意:所有涉及到操作文件的流,包括输入流和输出流,使用完了,要及时关闭,否则占用资源不说,还会导致临时生成的文件无法删除。
笔者在ftp服务器下新建了一个文件,测试上传一个txt格式的文本文件,一个上传到根目录下,一个上传到newFile
文件夹里
测试用例代码
package com.execute.batch.executebatch.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
/**
* FTP工具类
* @author hulei
*/
@Slf4j
public class FtpUtil {
private static FTPClient connectAndLogin(String host, int port, String username, String password) throws IOException {
FTPClient ftpClient = new FTPClient();
ftpClient.connect(host, port);
ftpClient.login(username, password);
int replyCode = ftpClient.getReplyCode();
if (!FTPReply.isPositiveCompletion(replyCode)) {
throw new IOException("连接FTP服务器失败");
}
return ftpClient;
}
private static void disconnect(FTPClient ftpClient) {
if (ftpClient.isConnected()) {
try {
ftpClient.logout();
} catch (IOException ioe) {
log.error("登出FTP服务器失败", ioe);
}
try {
ftpClient.disconnect();
} catch (IOException ioe) {
log.error("断开FTP服务器连接失败", ioe);
}
}
}
private static void setBinaryFileType(FTPClient ftpClient) {
try {
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
} catch (IOException e) {
throw new RuntimeException("设置传输二进制文件失败", e);
}
}
/**
* 上传文件到FTP服务器的根目录
* @param host FTP服务器地址
* @param port FTP服务器端口号,默认为21
* @param username 用户名
* @param password 密码
* @param localFile 本地要上传的文件
* @return 成功返回true,否则返回false
*/
public static boolean uploadFileToRoot(String host, int port, String username, String password, File localFile) {
FTPClient ftpClient = null;
try {
ftpClient = connectAndLogin(host, port, username, password);
setBinaryFileType(ftpClient);
String remoteFileName = localFile.getName();
return ftpClient.storeFile(remoteFileName, new FileInputStream(localFile));
} catch (IOException e) {
log.error("上传文件失败", e);
return false;
} finally {
assert ftpClient != null;
disconnect(ftpClient);
}
}
/**
* 上传文件到FTP服务器的指定路径
* @param host FTP服务器地址
* @param port FTP服务器端口号,默认为21
* @param username 用户名
* @param password 密码
* @param remotePath FTP服务器上的目标路径
* @param localFile 本地要上传的文件
* @return 成功返回true,否则返回false
*/
public static boolean uploadFileToPath(String host, int port, String username, String password, String remotePath, File localFile) {
FTPClient ftpClient = null;
try {
ftpClient = connectAndLogin(host, port, username, password);
setBinaryFileType(ftpClient);
createRemoteDirectories(ftpClient, remotePath);
String remoteFileName = localFile.getName();
String fullRemotePath = remotePath + "/" + remoteFileName;
return ftpClient.storeFile(fullRemotePath, new FileInputStream(localFile));
} catch (IOException e) {
log.error("上传文件失败", e);
return false;
} finally {
assert ftpClient != null;
disconnect(ftpClient);
}
}
/**
* 在FTP服务器上创建指定路径所需的所有目录
* @param ftpClient FTP客户端
* @param remotePath 需要创建的远程路径
* @throws IOException 如果在创建目录时发生错误
*/
private static void createRemoteDirectories(FTPClient ftpClient, String remotePath) throws IOException {
String[] directories = remotePath.split("/");
String currentPath = "";
for (String dir : directories) {
if (!dir.isEmpty()) {
currentPath += "/" + dir;
if (!ftpClient.changeWorkingDirectory(currentPath)) {
if (!ftpClient.makeDirectory(dir)) {
throw new IOException("无法创建远程目录: " + currentPath);
}
ftpClient.changeWorkingDirectory(dir);
}
}
}
}
}
执行后查看ftp服务器
发现根目录下和文件夹下都有上传的文件了
批量数据生成
笔者这里只模拟生成500万条数据,供测试使用
批处理工具类
package com.execute.batch.executebatch.utils;
import jakarta.annotation.Resource;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.function.BiFunction;
@Component
public class BatchInsertUtil {
@Resource
private final SqlSessionFactory sqlSessionFactory;
public BatchInsertUtil(SqlSessionFactory sqlSessionFactory) {
this.sqlSessionFactory = sqlSessionFactory;
}
/**
* 批量插入数据
* @param entityList 待插入的数据列表
* @param mapperClass 映射器接口的Class对象
*/
@SuppressWarnings("all")
public <T,U,R> int batchInsert(List<T> entityList, Class<U> mapperClass, BiFunction<T,U,R> function) {
int i = 1;
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
try {
U mapper = sqlSession.getMapper(mapperClass);
for (T entity : entityList) {
function.apply(entity,mapper);
i++;
}
sqlSession.flushStatements();
sqlSession.commit();
} catch (Exception e) {
throw new RuntimeException("批量插入数据失败", e);
}finally {
sqlSession.close();
}
return i-1;
}
}
跑批数据
package com.execute.batch.executebatch;
import com.execute.batch.executebatch.entity.User;
import com.execute.batch.executebatch.mapper.UserMapper;
import com.execute.batch.executebatch.utils.BatchInsertUtil;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* DataSeeder 批量生成数据
* @author hulei
*/
@Component
public class DataSeeder implements CommandLineRunner {
@Resource
private ApplicationContext applicationContext;
private ExecutorService executorService;
private static final int TOTAL_RECORDS = 5000000;
private static final int BATCH_SIZE = 10000;
private static final int THREAD_POOL_SIZE = 10;
@PostConstruct
public void init() {
executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
}
@Override
public void run(String... args) {
long startTime = System.currentTimeMillis();
List<Runnable> tasks = new ArrayList<>();
for (int i = 0; i < TOTAL_RECORDS; i += BATCH_SIZE) {
int finalI = i;
tasks.add(() -> insertBatch(finalI, BATCH_SIZE));
}
tasks.forEach(executorService::execute);
executorService.shutdown();
long endTime = System.currentTimeMillis();
System.out.println("Total time taken: " + (endTime - startTime) / 1000 + " seconds.");
}
public void insertBatch(int startId, int batchSize) {
List<User> batch = new ArrayList<>(batchSize);
Random random = new Random();
for (int i = 0; i < batchSize; i++) {
User user = createUser(startId + i, random);
batch.add(user);
System.out.println(user);
}
BatchInsertUtil util = new BatchInsertUtil(applicationContext.getBean(SqlSessionFactory.class));
util.batchInsert(batch, UserMapper.class, (item,mapper)-> mapper.insertBatch(item));
}
private User createUser(int id, Random random) {
User user = new User();
user.setId(id);
user.setName("User" + id);
user.setEmail("user" + id + "@example.com");
user.setAge(random.nextInt(80) + 20); // 年龄在20到99之间
user.setAddress("Address" + id);
user.setPhoneNumber("1234567890"); // 简化处理,实际应生成随机电话号码
return user;
}
}
整个生成过程是十分漫长的,40分钟左右,数据查询结果生成了500万条数据
测试上传ftp
RowBounds手动分页多个传参示例
下面展示的两个是mybatis手动分页的写法,如果有其他查询参数,则可以建一个实体类,把rowbounds参数囊括进去作为一个属性即可,以下是示例
mapper接口层
mybatis的xml
测试用例
package com.execute.batch.executebatch.controller;
import com.execute.batch.executebatch.entity.User;
import com.execute.batch.executebatch.mapper.UserMapper;
import com.execute.batch.executebatch.utils.FtpUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.RowBounds;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
* @author hulei
* @date 2024/5/22 10:42
*/
@RestController
@RequestMapping("/FTP")
@Slf4j
public class FTPController {
@Resource
private UserMapper userMapper;
private final Object lock = new Object();
@GetMapping(value = "/upload")
public void upload() throws InterruptedException {
String host = "192.168.1.103";
int port = 21; // 默认FTP端口
String username = "hulei";
String password = "hulei";
int pageSize = 450000;
int offset = 0;
int uploadCycle = 0;
int totalUploaded = 0;
boolean noData = false;
while (true) {
uploadCycle++;
// 将查询结果处理并写入本地文件
File tempFile = new File("D:/FTPFile", "user_data_" + uploadCycle + ".txt");
while (true) {
RowBounds rowBounds = new RowBounds(offset, pageSize);
List<User> list = userMapper.queryBatch(rowBounds);
if (!list.isEmpty()) {
MultiThreadWriteToFile(list, tempFile, getConsumer());
offset += pageSize;
totalUploaded += list.size();
}
if (list.isEmpty()) {
noData = true;
break;
}
// 检查总数据量是否达到500000,如果达到则上传文件
if (totalUploaded >= 600000) {
break;
}
}
// 上传本地文件到FTP服务器
if(!tempFile.exists()){
break;
}
boolean uploadSuccess = FtpUtil.uploadFileToRoot(host, port, username, password, tempFile);
if (uploadSuccess) {
System.out.println("文件上传成功");
} else {
System.out.println("文件上传失败");
}
System.out.println("上传完成,已上传" + uploadCycle + "个批次");
totalUploaded = 0;
if (noData) {
break;
}
}
}
private <T> void MultiThreadWriteToFile(List<T> list, File tempFile, BiConsumer<BufferedWriter, T> writeItemConsumer) throws InterruptedException {
Path filePath = tempFile.toPath(); // 将文件对象转换为路径对象,用于后续的文件写入操作。
try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8,
StandardOpenOption.CREATE, // 如果文件不存在则创建
StandardOpenOption.WRITE, // 打开文件进行写入
StandardOpenOption.APPEND)) { // 追加模式写入,而不是覆盖 // 使用 UTF-8 编码打开文件缓冲写入器。
ExecutorService executor = Executors.newFixedThreadPool(10); // 创建一个固定大小的线程池,包含10个线程。
BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(list.size()); // 创建一个阻塞队列,用于存储要处理的任务索引。
for (int i = 0; i < list.size(); i++) { // 预填充任务队列,为每个列表元素创建一个任务。
taskQueue.add(i);
}
for (int i = 0; i < list.size(); i++) { // 提取队列中的索引,并提交相应的任务给线程池执行。
int index = taskQueue.take();
executor.submit(() -> writeItemConsumer.accept(writer, list.get(index)));
}
executor.shutdown(); // 关闭线程池,等待所有任务完成。
boolean terminated = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // 等待线程池中的所有任务完成。
if (!terminated) { // 如果线程池在指定时间内未能关闭,则记录警告信息。
log.warn("线程池关闭超时");
}
} catch (IOException e) { // 捕获并记录文件操作相关的异常。
log.error("创建或写入文件发生错误: {},异常为: {}", tempFile.getAbsolutePath(), e.getMessage());
}
}
private BiConsumer<BufferedWriter, User> getConsumer() {
return (writer, item) -> {
String str = String.join("|",
String.valueOf(item.getId()),
item.getName(),
item.getEmail(),
String.valueOf(item.getAge()),
item.getAddress(),
item.getPhoneNumber()
);
log.info("告警入湖数据拼接字符串:{}", str);
try {
synchronized (lock) {
writer.write(str);
writer.newLine();
}
} catch (IOException e) {
log.error("写入告警入湖数据发生异常: {}", e.getMessage());
}
};
}
}
简单分析下:分页查询数据,每次查询pageSize条数据,写入一个txt文件,当写入的总条数超过totalUpload时,就跳出内部while循环,上传当前txt文件。然后进入第二次外层while循环,创建第二个txt文件,内部循环分页查询数据写入第二个txt文件。。。以此类推,直至最后查不出数据为止。
注意:pageSize和totalUpload最好是倍数关系,比如pageSize = 50000,那么totalUpload最好是pageSize 的整数倍,如100000,150000,200000,这样可以保证当文件数较多时,大部分的文件中数据条数一样。
以下是我分批上传到ftp服务器的文件,一共500万条数据,字段做了处理,使用 | 拼接
写入的数据是乱序的,要求顺序写入的话,就不要使用多线程了。