大文件分片上传、分片进度以及整体进度、断点续传【前端原生、后端 Koa、Node 原生】(一)

分片进度

效果展示,一个分片是 500MB 的

请添加图片描述
这个分片大小是 10MB 的
请添加图片描述


大文件分片上传

效果展示
请添加图片描述


前端

思路

前端的思路:将大文件切分成多个小文件,然后并发给后端。

页面构建

先在页面上写几个组件用来获取文件。

<body>
  <input type="file" id="file" />
  <button id="uploadButton">点击上传</button>
</body>

功能函数:生成切片

切分文件的核心函数是 slice,没错,就是这么的神奇啊

我们把切好的 chunk 放到数组里,等待下一步的包装处理

/**
 * 默认切片大小 10 MB
 */
const SIZE = 10 * 1024 * 1024;

/**
 * 功能:生成切片
 */
function handleCreateChunk(file, size = SIZE) {
  const fileChunkList = [];
  let cur = 0;
  while (cur < file.size) {
    fileChunkList.push({
      file: file.slice(cur, cur + size),
    });
    cur += size;
  }
  return fileChunkList;
}

功能函数:请求逻辑

在这里简单封装一下 XMLHttpRequest

/**
 * 功能:封装请求
 * @param {*} param0
 * @returns
 */
function request({ url, method = 'post', data, header = {}, requestList }) {
  return new Promise((resolve, reject) => {
    let xhr = new XMLHttpRequest();
    xhr.open(method, url);
    Object.keys(header).forEach((item) => {
      xhr.setRequestHeader(item, header[item]);
    });
    xhr.onloadend = function (e) {
      resolve({
        data: e.target.response,
      });
    };
    xhr.send(data);
  });
}

功能函数:上传切片

/**
 * 功能: 上传切片
 * 包装好 FormData 之后通过 Promise.all() 并发所有切片
 */
async function uploadChunks(hanldleData, fileName) {
  const requestList = hanldleData
    .map(({ chunk, hash }) => {
      const formData = new FormData();
      formData.append('chunk', chunk);
      formData.append('hash', hash);
      formData.append('filename', fileName);
      return formData;
    })
    .map((formData) => {
      request({
        // url: 'http://localhost:3001/upload',
        url: 'upload',
        data: formData,
      });
    });

  await Promise.all(requestList);
}

/**
 * 功能:触发上传
*/
document.getElementById('uploadButton').onclick = async function () {
  // 切片
  const file = document.getElementById('file').files[0];
  console.log(file);
  const fileName = file.name;
  const fileChunkList = handleCreateChunk(file);
  // 包装
  const hanldleData = fileChunkList.map(({ file }, index) => {
    return {
      chunk: file,
      hash: `${fileName}_${index}`,
    };
  });
  await uploadChunks(hanldleData, fileName);
};

可以在请求中看到有很多个请求并发的上传

在这里插入图片描述

优化:进度条的生成

自己简单撸了几个 cube 进度条

<style>
  #uploadCube {
    margin-top: 10px;
    /* width: 520px; */
    overflow: hidden;
  }

  .cube {
    width: 50px;
    height: 50px;
    background-color: #fff;
    float: left;
    border: 1px solid #000;
    .progress {
      height: 100%;
      line-height: 50px;
      text-align: center;
    }
    .uploading {
      background-color: #409eff;
    }
    .success {
      background-color: #51f400;
    }
    .error {
      background-color: #ff9090;
    }
  }
</style>
<body>
  <input type="file" id="file" />
  <button id="uploadButton">点击上传</button>
  <div id="uploadCube"></div>
</body>
/**
 * 功能:生成页面进度的 HTML
 */
function handleUpdateHTML(progressData) {
  let uploadCube = document.querySelector('#uploadCube');
  let html = '';
  progressData.forEach((item) => {
    const { presentage } = item;
    let className = '';
    if (presentage < 100) {
      className = 'progress uploading';
    } else if (presentage == 100) {
      className = 'progress success';
    }
    html += ` <div class="cube">
    <div class="${className}" style="width: ${presentage}%">${presentage}%</div>
  </div>`;
  });
  uploadCube.innerHTML = html;
}

/**
 * 功能:处理每个 chunk 的 xhr.upload.onprogress,拿到各个 chunk 的上传进度
 * - 1. 同时通过 handleUpdateHTML 更新进度页面
 * - 2. progressData 用来记录各个 chunk 的进度
 */
let progressData = [];
function handleCreateOnProgress(data) {
  return (e) => {
    data.presentage = ((e.loaded / e.total) * 100).toFixed(2);
    console.log(JSON.stringify(progressData));
    handleUpdateHTML(progressData);
  };
}

后端 (Koa)

后端的思路是:

  1. 把 Node 暂存的 chunk 文件转移到我想处理的地方(也可以直接处理,看你的)
  2. 创建写入流,把各个 chunk 合并,前端会给你每个 chunk 的大小,还有 hash 值来定位每个 chunk 的位置

获取 chunk 切片文件

先把上传的接口写好

const Koa = require('koa');
const Views = require('koa-views');
const Router = require('koa-router');
const Static = require('koa-static');
const { koaBody } = require('koa-body');
const fs = require('fs');
const fse = require('fs-extra');

const app = new Koa();
const router = new Router();
app.use(Views(__dirname));
app.use(Static(__dirname));
app.use(
  koaBody({
    multipart: true,
    formidable: {
      maxFields: 1000 * 1024 * 1024,
    },
  })
);

router.get('/', async (ctx) => {
  await ctx.render('index.html');
});

/**
 * 功能:上传接口
 * - 从 ctx.request.body 中获取 hash 以及 filename
 * - 从 ctx.request.files 中拿到分片数据
 * - 然后再把 node 帮我们临时存放的 chunk 文件的 filepath 拿到,之后移动到我们想要存放的路径下
 * - filepath 和 hash 是一一对应的关系
 */
router.post('/upload', async (ctx) => {
  const { hash, filename } = ctx.request.body;
  const { filepath } = ctx.request.files?.chunk;
  const chunkPath = `${__dirname}/chunkPath/${filename}`;
  if (!fse.existsSync(chunkPath)) {
    await fse.mkdirs(chunkPath);
  }
  await fse.move(filepath, `${chunkPath}/${hash}`);
  ctx.body = {
    code: 1,
  };
});

app.use(router.routes());
app.listen(3000, () => {
  console.log(`server start: http://localhost:3000`);
});

写完这些就可以拿到 chunk
在这里插入图片描述

合并接口

先写一个接口,用来拿到 hash文件名

/**
 * 功能: merge 接口
 * - hasMergeChunk 变量是上面用来记录的
 * - mergePath 定义一下合并后的文件的路径
 */
router.post('/merge', async (ctx) => {
  // console.log(ctx.request.body);
  const { fileName, size } = ctx.request.body;
  hasMergeChunk = {};
  const mergePath = `${__dirname}/merge/${fileName}`;
  if (!fse.existsSync(`${__dirname}/merge`)) {
    fse.mkdirSync(`${__dirname}/merge`);
  }
  await mergeChunk(mergePath, fileName, size);
  ctx.body = {
    data: '成功',
  };
});

合并分片的功能函数

然后开始合并

/**
 * 功能:合并 Chunk
 * - 1. chunkDir: 是 chunks 文件们所在的文件夹的路径
 * - 2. chunkPaths: 是个 Array,数组中包含所有的 chunk 的 path
 * - 3. 因为 每个 chunk 的 path 命名是通过 hash 组成的,所以我们先排序一下,
 * - 算是为 createWriteStream 中的 start 做准备
 * - 4. 为每个 chunk 的 path 创建写入流,写到 mergePath 这个路径下。因为已经
 * - 排序了,所以 start 就是每个文件的 index * eachChunkSize
 * @param {*} mergePath
 * @param {*} name
 * @param {*} eachChunkSize
 */
async function mergeChunk(mergePath, name, eachChunkSize) {
  const chunkDir = `${__dirname}/chunkPath/${name}`;
  const chunkPaths = await fse.readdir(chunkDir);
  chunkPaths.sort((a, b) => a.split('_')[1] - b.split('_')[1]);

  await Promise.all(
    chunkPaths.map((chunk, index) => {
      const eachChunkPath = `${chunkDir}/${chunk}`;
      const writeStream = fse.createWriteStream(mergePath, {
        start: index * eachChunkSize,
      });
      return pipeStream(eachChunkPath, writeStream);
    })
  );
  console.log('合并完成');
  fse.rmdirSync(chunkDir);
  console.log(`删除 ${chunkDir} 文件夹`);
}

接着就是写入流

/**
 * 功能:创建 pipe 写文件流
 * - 1. [首先了解一下什么是输入输出流](https://www.jmjc.tech/less/111)
 * - 2. hasMergeChunk 变量用于记录一下那些已经合并完成了,也可以写成数组,都行。
 * - 3. 可以检测输出流的 end 事件,表示我这个 chunk 已经流完了,然后写一下善后逻辑。
 * @param {*} path
 * @param {*} writeStream
 * @returns
 */
let hasMergeChunk = {};
function pipeStream(path, writeStream) {
  return new Promise((resolve) => {
    const readStream = fse.createReadStream(path); // 输出流
    readStream.pipe(writeStream); // 输出通过管道流向输入
    readStream.on('end', () => {
      hasMergeChunk[path] = 'finish';
      fse.unlinkSync(path); // 删除此文件
      resolve();
      console.log(`合并 No.${path.split('_')[1]}, 已经合并${Object.keys(hasMergeChunk).length}`);
    });
  });
}

至此一个基本的逻辑上传就做好了!


后端 (Node 原生)

想了想还是有必要用原生写一下 ,复习一下。

基础:搭建简单的服务

先写一个基本的服务框架

const http = require('http');
const server = http.createServer();
server.on('request', async (req, res) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Headers', '*');
  if (req.method === 'OPTION') {
    res.status = 200;
    res.end();
    return;
  }
  res.end('hello node');
});
const POST = 3000;
server.listen(POST, null, null, () => {
  console.log(`server start: http://localhost:${POST}`);
});

基础:资源返回

添加页面的返回,以及资源的返回

const http = require('http');
const server = http.createServer();
const url = require('url');
const fs = require('fs');
const path = require('path');
const MIME = require('./mime.json');
server.on('request', async (req, res) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Headers', '*');
  res.setHeader('content-type', 'text/html;charset=utf-8');
  if (req.method === 'OPTION') {
    res.status = 200;
    res.end();
    return;
  }

  let { pathname } = url.parse(req.url); // 解析一下 url,因为 req.url 可能会带一些参数
  //   console.log('req.url:>>', req.url);
  //   console.log('url.parse(req.url):>>', url.parse(req.url));
  console.log(`进入${pathname}`);
  switch (pathname) {
    case '/':
    case '/index': {
      let rs = fs.createReadStream('./index.html');
      rs.pipe(res);
      break;
    }
    case '/favicon.ico': {
      res.end('我没有哦');
      break;
    }
    default: {
      let ext = path.extname(pathname);
      res.setHeader('Content-Type', MIME[ext]); // 通过请求的资源后缀名,来返回对应的 Content-type 的类型
      let rs = fs.createReadStream(`.${pathname}`);
      rs.pipe(res);
    }
  }
});
const POST = 3000;
server.listen(POST, null, null, () => {
  console.log(`server start: http://localhost:${POST}`);
});

MIME.json 在这个文章的最底下, 或者可以自己找个更全的。

功能:写上传接口

接着写上传的接口,这里参考大圣老师的代码,写一个类来收集方法。

类定义如下,新建一个文件 controller.js

/**
 * module.exports 方法用于在服务器端导出模块,并以 CommonJS 格式提供。
 * - 参考:https://www.delftstack.com/zh/howto/node.js/create-and-export-classes/
 */
const multiparty = require('multiparty');
const fse = require('fs-extra');
class Controller {
  constructor(dirPath) {
    this.chunkPath = dirPath;
  }
  /**
   * multiparty 使用方法:https://www.npmjs.com/package/multiparty
   * - chunkFileDirPath 为关于文件 chunks 的文件夹路径,每个大文件根据文件名生成相关的文件夹
   * - 注意回调函数里的 this
   * @param {*} url
   * @param {*} path
   */
  async handleUpload(req, res) {
    const _this = this;
    const form = new multiparty.Form();
    form.parse(req, async function (err, fields, files) {
      if (err) {
        console.log(err);
        return;
      }
      const [chunk] = files.chunk;
      const [hash] = fields.hash;
      const [filename] = fields.filename;

      const chunkFileDirPath = `${_this.chunkPath}/${filename}`;
      if (!fse.existsSync(chunkFileDirPath)) {
        await fse.mkdirs(chunkFileDirPath);
      }
      await fse.move(chunk?.path, `${chunkFileDirPath}/${hash}`);
      res.end('收到文件 chunks');
    });
  }
}
module.exports = Controller;

然后在 server 里引入,再接口这里声明好。

// ... 
const Controller = require('./controller');
const UPLOAD_DIR = `${__dirname}/chunkPath`; // chunks 上传的文件夹
const controller = new Controller(UPLOAD_DIR);
// ... 
case '/upload': {
    await controller.handleUpload(req, res);
    break;
}

功能:写合并接口

合并的逻辑跟 Koa 几乎没什么差别,只不过我都把方法封装到类里了。

首先写路由

case '/merge': {
  await controller.handleMerge(req, res);
  break;
}

然后在类中定义合并的方法

/**
 * 功能:合并
 * - 1. handlePostData 用来处理 POST 传递的数据,具体怎么处理的请查看方法
 * - 2. 把各个文件的 path 先想清楚要存到哪儿,建议自己写一写。
 * - 我是把所有的 chunks 都放到大目录 chunkPath 中,
 * - 然后在用文件名新建文件夹,再把chunks放到子文件夹中。
 * @param {*} req
 * @param {*} res
 */
async handleMerge(req, res) {
  const postData = await handlePostData(req);
  const { fileName, size: eachChunkSize } = postData;
  const mergePath = `${__dirname}/merge`;
  const mergeFilePath = `${__dirname}/merge/${fileName}`;
  if (!fse.existsSync(mergePath)) {
    fse.mkdirSync(mergePath);
  }
  const mergeOptions = { chunksPath: this.chunksPath, mergeFilePath, fileName, eachChunkSize };
  await handleMergeChunks(mergeOptions);
  console.log('Success Merge');
  res.end(
    JSON.stringify({
      code: 1,
      message: 'success merge',
    })
  );
}

这里的 POST 请求需要处理一下

/**
 * 功能:处理 POST 请求
 * - 与 GET 数据相比,POST 数据量大,需要分段。
 * - 通过 req.on('data', function(data) {}) 监听
 * - 当有一段数据到达的时候执行回调,回调函数参数 data 为每段达到的数据。
 * - 当数据全部到达时会触发 req.on('end', function() {}) 里面的回调函数。
 * - 可以通过 JSON.parse(str) 解析成我们想要的 POST 请求数据格式。
 * - 参考资料:https://juejin.cn/post/7142700338414518286#heading-2
 * @param {*} req
 * @returns
 */
function handlePostData(req) {
  return new Promise((resolve, reject) => {
    let allData = '';
    let i = 0;
    req.on('data', function (chunkData) {
      //   console.log(`第 ${++i} 次收到数据`);
      allData += chunkData;
    });
    req.on('end', function () {
      const POST_MESSAGE = JSON.parse(allData);
      resolve(POST_MESSAGE);
    });
  });
}

然后就是合并 chunk

/**
 * 功能:合并 chunks
 * - 1. 首先根据 fileChunksDir 拿到所有 chunks 的文件名
 * - 2. 然后拼接成 fileAllChunksPaths <Array> 数组,然后一一创建可写流
 * - 3. fileAllChunksPaths 注意这里需要排序一下,不然就是乱的,这也是我们创建可写流 srart 位置的基础
 * - 4. 然后这里通过 pipeStream 函数用 Promise 包装了一下可读流,代码需要慢慢读去理解。
 * - 5. 我们这里的 可写流们,是根据 chunks 的不同,定义好写入的文件 path,
 * - 以及每个块儿写的开始位置和写入大小,每个可写流都是不一样的!
 * -
 * @param {*} param0
 */
async function handleMergeChunks({ chunksPath, mergeFilePath, fileName, eachChunkSize }) {
  const fileChunksDir = `${chunksPath}/${fileName}`;
  const fileAllChunksPaths = await fse.readdir(fileChunksDir);
  console.log(fileAllChunksPaths);
  fileAllChunksPaths.sort((a, b) => a.split('_')[1] - b.split('_')[1]);
  const promiseArray = fileAllChunksPaths.map((chunk, index, array) => {
    const eachChunkPath = `${fileChunksDir}/${chunk}`;
    const writeStream = fse.createWriteStream(mergeFilePath, {
      start: index * eachChunkSize,
    });
    return pipeStream(eachChunkPath, writeStream, array.length);
  });
  await Promise.all(promiseArray);
}

把另一个功能也拆分出来。

/**
 * 功能:创建 pipe 写文件流
 * - 1. [首先了解一下什么是输入可读流](https://www.jmjc.tech/less/111)
 * - 方便记忆:可读流 通过 管道 流入 可写流。    可读流  =======> 可写流
 * - 2. hasMergeChunk 变量用于记录一下那些已经合并完成了,也可以写成数组,都行。
 * - 3. 可以检测可读流的 end 事件,表示我这个 chunk 已经流完了,然后写一下善后逻辑。
 * @param {*} path
 * @param {*} writeStream
 * @returns
 */
let hasMergeChunk = {};
function pipeStream(path, writeStream, length) {
  return new Promise((resolve) => {
    const readStream = fse.createReadStream(path);
    readStream.pipe(writeStream);
    readStream.on('end', function () {
      hasMergeChunk[path] = 'finished';
      fse.unlinkSync(path);
      resolve();
      console.log(
        `doing: No.${path.split('_')[1]} progress: [ ${Object.keys(hasMergeChunk).length} / ${length} ]`
      );
    });
  });
}

基本上也就完成啦!


参考文章

  1. 字节跳动面试官:请你实现一个大文件上传和断点续传
  2. 字节跳动面试官,我也实现了大文件上传和断点续传

Q & A

Q: 发送片段之后的合并可能出现错误

这个情况分析了一下是前端的锅啊,前端的 await Promise.all() 并不能保证后端的文件流都写完了。

在这里插入图片描述

Q: 进度条直接从 0 到了 100

我发现我的请求写错了

在这里插入图片描述

完整代码

前端

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
    <script src="request.js"></script>
    <style>
      #uploadCube {
        margin-top: 10px;
        /* width: 520px; */
        overflow: hidden;
      }

      .cube {
        width: 50px;
        height: 50px;
        background-color: #fff;
        float: left;
        border: 1px solid #000;
        .progress {
          height: 100%;
          line-height: 50px;
          text-align: center;
        }
        .uploading {
          background-color: #409eff;
        }
        .success {
          background-color: #51f400;
        }
        .error {
          background-color: #ff9090;
        }
      }
    </style>
  </head>
  <body>
    <input type="file" id="file" />
    <button id="uploadButton">点击上传</button>
    <!-- <button id="mergeButton">点击合并</button> -->
    <div id="uploadCube"></div>
  </body>

  <script>
    /**
     * 默认切片大小
     */
    const SIZE = 10 * 1024 * 1024;

    /**
     * 功能:生成切片
     */
    function handleCreateChunk(file, size = SIZE) {
      const fileChunkList = [];
      let cur = 0;
      while (cur < file.size) {
        fileChunkList.push({
          file: file.slice(cur, cur + size),
        });
        progressData.push({ presentage: 0 });
        cur += size;
      }
      return fileChunkList;
    }

    /**
     * 功能:生成页面进度的 HTML
     */
    function handleUpdateHTML(progressData) {
      let uploadCube = document.querySelector('#uploadCube');
      let html = '';
      progressData.forEach((item) => {
        const { presentage } = item;
        let className = '';
        if (presentage < 100) {
          className = 'progress uploading';
        } else if (presentage == 100) {
          className = 'progress success';
        }
        html += ` <div class="cube">
        <div class="${className}" style="width: ${presentage}%">${presentage}%</div>
      </div>`;
      });
      uploadCube.innerHTML = html;
    }

    /**
     * 功能:处理每个 chunk 的 xhr.upload.onprogress,拿到各个 chunk 的上传进度
     * - 1. 同时通过 handleUpdateHTML 更新进度页面
     * - 2. progressData 用来记录各个 chunk 的进度
     */
    let progressData = [];
    function handleCreateOnProgress(data) {
      return (e) => {
        data.presentage = ((e.loaded / e.total) * 100).toFixed(2);
        console.log(JSON.stringify(progressData));
        handleUpdateHTML(progressData);
      };
    }

    /**
     * 功能: 上传切片
     * - 注意 map 里别忘了写 return
     */
    async function uploadChunks(hanldleData, fileName) {
      const requestList = hanldleData
        .map(({ chunk, hash, index }) => {
          const formData = new FormData();
          formData.append('chunk', chunk);
          formData.append('hash', hash);
          formData.append('filename', fileName);
          return { formData, index };
        })
        .map(({ formData, index }) => {
          return request({
            url: 'upload',
            data: formData,
            onprogress: handleCreateOnProgress(progressData[index]),
          });
        });
      await Promise.all(requestList).then((res) => {
        console.log('所有上传结束', res);
      });
      console.log('发送合并请求');
      await request({
        url: 'merge',
        headers: {
          'content-type': 'application/json',
        },
        data: JSON.stringify({
          size: SIZE,
          fileName,
        }),
      });
    }

    document.getElementById('uploadButton').onclick = async function () {
      // 切片
      const file = document.getElementById('file').files[0];
      const fileName = file.name;
      const fileChunkList = handleCreateChunk(file);
      // 包装
      const hanldleData = fileChunkList.map(({ file }, index) => {
        return {
          chunk: file,
          hash: `${fileName}_${index}`,
          index,
        };
      });
      await uploadChunks(hanldleData, fileName);
    };
  </script>
</html>

后端 Koa

const Koa = require('koa');
const Views = require('koa-views');
const Router = require('koa-router');
const Static = require('koa-static');
const { koaBody } = require('koa-body');
const fse = require('fs-extra');

const app = new Koa();
const router = new Router();
app.use(Views(__dirname));
app.use(Static(__dirname));
app.use(
  koaBody({
    multipart: true,
    formidable: {
      maxFileSize: 1000 * 1024 * 1024,
    },
  })
);

router.get('/', async (ctx) => {
  await ctx.render('index.html');
});

/**
 * 功能:上传接口
 * - 从 ctx.request.body 中获取 hash 以及 filename
 * - 从 ctx.request.files 中拿到分片数据
 * - 然后再把 node 帮我们临时存放的 chunk 文件的 filepath 拿到,之后移动到我们想要存放的路径下
 * - filepath 和 hash 是一一对应的关系
 */
router.post('/upload', async (ctx) => {
  const { hash, filename } = ctx.request.body;
  const { filepath } = ctx.request.files?.chunk;
  const chunkPath = `${__dirname}/chunkPath/${filename}`;
  if (!fse.existsSync(chunkPath)) {
    await fse.mkdirs(chunkPath);
  }
  await fse.move(filepath, `${chunkPath}/${hash}`);
  ctx.body = {
    code: 1,
  };
});

/**
 * 功能:创建 pipe 写文件流
 * - 1. [首先了解一下什么是输入可读流](https://www.jmjc.tech/less/111)
 * - 2. hasMergeChunk 变量用于记录一下那些已经合并完成了,也可以写成数组,都行。
 * - 3. 可以检测可读流的 end 事件,表示我这个 chunk 已经流完了,然后写一下善后逻辑。
 * @param {*} path
 * @param {*} writeStream
 * @returns
 */
let hasMergeChunk = {};
function pipeStream(path, writeStream) {
  return new Promise((resolve) => {
    const readStream = fse.createReadStream(path); // 可读流
    readStream.pipe(writeStream); // 可读流通过管道流向可写流
    readStream.on('end', () => {
      hasMergeChunk[path] = 'finish';
      fse.unlinkSync(path); // 删除此文件
      resolve();
      console.log(`合并 No.${path.split('_')[1]}, 已经合并${Object.keys(hasMergeChunk).length}`);
    });
  });
}

/**
 * 功能:合并 Chunk
 * - 1. chunkDir: 是 chunks 文件们所在的文件夹的路径
 * - 2. chunkPaths: 是个 Array,数组中包含所有的 chunk 的 path
 * - 3. 因为 每个 chunk 的 path 命名是通过 hash 组成的,所以我们先排序一下,
 * - 算是为 createWriteStream 中的 start 做准备
 * - 4. 为每个 chunk 的 path 创建写入流,写到 mergePath 这个路径下。因为已经
 * - 排序了,所以 start 就是每个文件的 index * eachChunkSize
 * - 5. 每个写入流都用 Promise 包装了一下,然后用 await Promise.all() 等待处理完
 * @param {*} mergePath
 * @param {*} name
 * @param {*} eachChunkSize
 */
async function mergeChunk(mergePath, name, eachChunkSize) {
  const chunkDir = `${__dirname}/chunkPath/${name}`;
  const chunkPaths = await fse.readdir(chunkDir);
  chunkPaths.sort((a, b) => a.split('_')[1] - b.split('_')[1]);

  await Promise.all(
    chunkPaths.map((chunk, index) => {
      const eachChunkPath = `${chunkDir}/${chunk}`;
      // 创建输入流,并为每个 chunk 定好位置
      const writeStream = fse.createWriteStream(mergePath, {
        start: index * eachChunkSize,
      });
      return pipeStream(eachChunkPath, writeStream);
    })
  );
  console.log('合并完成');
  fse.rmdirSync(chunkDir);
  console.log(`删除 ${chunkDir} 文件夹`);
}

/**
 * 功能: merge 接口
 * - hasMergeChunk 变量是上面用来记录的
 * - mergePath 定义一下合并后的文件的路径
 */
router.post('/merge', async (ctx) => {
  // console.log(ctx.request.body);
  const { fileName, size } = ctx.request.body;
  hasMergeChunk = {};
  const mergePath = `${__dirname}/merge/${fileName}`;
  if (!fse.existsSync(`${__dirname}/merge`)) {
    fse.mkdirSync(`${__dirname}/merge`);
  }
  await mergeChunk(mergePath, fileName, size);
  ctx.body = {
    data: '成功',
  };
});

app.use(router.routes());
app.listen(3000, () => {
  console.log(`server start: http://localhost:3000`);
});

request.js 的封装

/**
 * 功能:封装请求
 * - 1. xhr.upload.onprogress 注意不要拉下 upload
 * @param {*} param0
 * @returns
 */
function request({ url, method = 'post', data, headers = {}, onprogress = (e) => e, requestList }) {
  return new Promise((resolve, reject) => {
    let xhr = new XMLHttpRequest();
    xhr.open(method, url);
    Object.keys(headers).forEach((item) => {
      xhr.setRequestHeader(item, headers[item]);
    });
    xhr.upload.onprogress = onprogress;
    xhr.onloadend = function (e) {
      resolve({
        data: e.target.response,
      });
    };
    xhr.send(data);
  });
}

后端原生

主服务

const http = require('http');
const server = http.createServer();
const url = require('url');
const fs = require('fs');
const path = require('path');
const MIME = require('./mime.json');

const Controller = require('./controller');
const UPLOAD_DIR = `${__dirname}/chunkPath`; // chunks 上传的文件夹
const controller = new Controller(UPLOAD_DIR);

server.on('request', async (req, res) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Headers', '*');
  res.setHeader('content-type', 'text/html;charset=utf-8');
  if (req.method === 'OPTION') {
    res.status = 200;
    res.end();
    return;
  }
  let { pathname } = url.parse(req.url); // 解析一下 url,因为 req.url 可能会带一些参数
  // console.log('req.url:>>', req.url); console.log('url.parse(req.url):>>', url.parse(req.url));
  console.log(`进入${pathname}`);
  switch (pathname) {
    case '/':
    case '/index': {
      let rs = fs.createReadStream('./index.html');
      rs.pipe(res);
      break;
    }
    case '/upload': {
      await controller.handleUpload(req, res);
      break;
    }
    case '/merge': {
      await controller.handleMerge(req, res);
      break;
    }
    case '/favicon.ico': {
      res.end('我没有哦');
      break;
    }
    default: {
      let ext = path.extname(pathname);
      res.setHeader('Content-Type', MIME[ext]); // 通过请求的资源后缀名,来返回对应的 Content-type 的类型
      let rs = fs.createReadStream(`.${pathname}`);
      rs.pipe(res);
    }
  }
});
const POST = 3000;
server.listen(POST, null, null, () => {
  console.log(`server start: http://localhost:${POST}`);
});

/**
 * module.expothis.chunksPathrts 方法用于在服务器端导出模块,并以 CommonJS 格式提供。
 * - 参考:https://www.delftstack.com/zh/howto/node.js/create-and-export-classes/
 */
const multiparty = require('multiparty');
const fse = require('fs-extra');
const { handlePostData, handleMergeChunks } = require('./tools');
class Controller {
  constructor(dirPath) {
    this.chunksPath = dirPath;
  }

  /**
   * 功能:合并
   * - 1. handlePostData 用来处理 POST 传递的数据,具体怎么处理的请查看方法
   * - 2. 把各个文件的 path 先想清楚要存到哪儿,建议自己写一写。
   * - 我是把所有的 chunks 都放到大目录 chunkPath 中,
   * - 然后在用文件名新建文件夹,再把chunks放到子文件夹中。
   * @param {*} req
   * @param {*} res
   */
  async handleMerge(req, res) {
    const postData = await handlePostData(req);
    const { fileName, size: eachChunkSize } = postData;
    const mergePath = `${__dirname}/merge`;
    const mergeFilePath = `${__dirname}/merge/${fileName}`;
    if (!fse.existsSync(mergePath)) {
      fse.mkdirSync(mergePath);
    }
    const mergeOptions = { chunksPath: this.chunksPath, mergeFilePath, fileName, eachChunkSize };
    await handleMergeChunks(mergeOptions);
    console.log('Success Merge');
    res.end(
      JSON.stringify({
        code: 1,
        message: 'success merge',
      })
    );
  }

  /**
   * multiparty 使用方法:https://www.npmjs.com/package/multiparty
   * - chunkFileDirPath 为关于文件 chunks 的文件夹路径,每个大文件根据文件名生成相关的文件夹
   * - 注意回调函数里的 this
   * @param {*} url
   * @param {*} path
   */
  async handleUpload(req, res) {
    const _this = this;
    const form = new multiparty.Form();
    form.parse(req, async function (err, fields, files) {
      if (err) {
        console.log(err);
        return;
      }
      const [chunk] = files.chunk;
      const [hash] = fields.hash;
      const [filename] = fields.filename;

      const chunkFileDirPath = `${_this.chunksPath}/${filename}`;
      if (!fse.existsSync(chunkFileDirPath)) {
        await fse.mkdirs(chunkFileDirPath);
      }
      await fse.move(chunk?.path, `${chunkFileDirPath}/${hash}`);
      res.end('收到文件 chunks');
    });
  }
}
module.exports = Controller;

工具函数

/**
 * 学习:__dirname 就是跟文件一起的,不会因为引用关系而恒定
 */
// console.log(__dirname);
const fse = require('fs-extra');
/**
 * 功能:处理 POST 请求
 * - 与 GET 数据相比,POST 数据量大,需要分段。
 * - 通过 req.on('data', function(data) {}) 监听
 * - 当有一段数据到达的时候执行回调,回调函数参数 data 为每段达到的数据。
 * - 当数据全部到达时会触发 req.on('end', function() {}) 里面的回调函数。
 * - 可以通过 JSON.parse(str) 解析成我们想要的 POST 请求数据格式。
 * - 参考资料:https://juejin.cn/post/7142700338414518286#heading-2
 * @param {*} req
 * @returns
 */
function handlePostData(req) {
  return new Promise((resolve, reject) => {
    let allData = '';
    let i = 0;
    req.on('data', function (chunkData) {
      //   console.log(`第 ${++i} 次收到数据`);
      allData += chunkData;
    });
    req.on('end', function () {
      const POST_MESSAGE = JSON.parse(allData);
      resolve(POST_MESSAGE);
    });
  });
}

/**
 * 功能:创建 pipe 写文件流
 * - 1. [首先了解一下什么是输入可读流](https://www.jmjc.tech/less/111)
 * - 方便记忆:可读流 通过 管道 流入 可写流。    可读流  =======> 可写流
 * - 2. hasMergeChunk 变量用于记录一下那些已经合并完成了,也可以写成数组,都行。
 * - 3. 可以检测可读流的 end 事件,表示我这个 chunk 已经流完了,然后写一下善后逻辑。
 * @param {*} path
 * @param {*} writeStream
 * @returns
 */
let hasMergeChunk = {};
function pipeStream(path, writeStream, length) {
  return new Promise((resolve) => {
    const readStream = fse.createReadStream(path);
    readStream.pipe(writeStream);
    readStream.on('end', function () {
      hasMergeChunk[path] = 'finished';
      fse.unlinkSync(path);
      resolve();
      console.log(
        `doing: No.${path.split('_')[1]} progress: [ ${Object.keys(hasMergeChunk).length} / ${length} ]`
      );
    });
  });
}

/**
 * 功能:合并 chunks
 * - 1. 首先根据 fileChunksDir 拿到所有 chunks 的文件名
 * - 2. 然后拼接成 fileAllChunksPaths <Array> 数组,然后一一创建可写流
 * - 3. fileAllChunksPaths 注意这里需要排序一下,不然就是乱的,这也是我们创建可写流 srart 位置的基础
 * - 4. 然后这里通过 pipeStream 函数用 Promise 包装了一下可读流,代码需要慢慢读去理解。
 * - 5. 我们这里的 可写流们,是根据 chunks 的不同,定义好写入的文件 path,
 * - 以及每个块儿写的开始位置和写入大小,每个可写流都是不一样的!
 * -
 * @param {*} param0
 */
async function handleMergeChunks({ chunksPath, mergeFilePath, fileName, eachChunkSize }) {
  const fileChunksDir = `${chunksPath}/${fileName}`;
  const fileAllChunksPaths = await fse.readdir(fileChunksDir);
  console.log(fileAllChunksPaths);
  fileAllChunksPaths.sort((a, b) => a.split('_')[1] - b.split('_')[1]);
  const promiseArray = fileAllChunksPaths.map((chunk, index, array) => {
    const eachChunkPath = `${fileChunksDir}/${chunk}`;
    const writeStream = fse.createWriteStream(mergeFilePath, {
      start: index * eachChunkSize,
    });
    return pipeStream(eachChunkPath, writeStream, array.length);
  });
  await Promise.all(promiseArray);
}

module.exports = {
  handlePostData,
  handleMergeChunks,
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/215726.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网工学习7-配置 GVRP 协议

7.1GARP概述 GARP(Generic Attribute Registration Protocol)是通用属性注册协议的应用&#xff0c;提供 802.1Q 兼容的 VLAN 裁剪 VLAN pruning 功能和在 802.1Q 干线端口 trunk port 上建立动态 VLAN 的功能。 GARP 作为一个属性注册协议的载体&#xff0c;可以用来传播属性…

(1w字一篇理解透Unsafe类)Java魔法类:Unsafe详解

Java魔法类 Unsafe 文章导读&#xff1a;(约12015字&#xff0c;阅读时间大约1小时)1. Unsafe介绍2. Unsafe创建3. Unsafe功能3.1内存操作3.2 内存屏障3.3 对象操作3.4 数组操作3.5 CAS操作3.6 线程调度3.7 Class操作3.8 系统信息 4. 总结 JUC源码中的并发工具类出现过很多次 …

外包干了4年,技术退步太明显了。。。。。

先说一下自己的情况&#xff0c;本科生生&#xff0c;18年通过校招进入武汉某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年国庆&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测…

netcore swagger 错误 Failed to load API definition

后端接口报错如下&#xff1a; 前端nswag报错如下&#xff1a; 根据网上查询到的资料说明&#xff0c;说一般swagger这种错误都是控制器里有接口代码异常造成的&#xff0c;通常是接口没有加属性Attribute&#xff0c; 比如[HttpPost("Delete")]、[HttpGet("Del…

基于ssm的疫苗预约系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于ssm的疫苗预约系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring Spri…

从这7点重构品牌企业的业务中台系统|徐礼昭

文&#xff5c;徐礼昭 &#xff08;商派市场负责人&#xff0c;RRL重构零售实验室负责人&#xff09; 重构或者升级企业的数字化系统究竟有多重要&#xff1f; 笔者列举两个简单的数字化项目案例&#xff0c;数据仅供大家参考—— &#xff08;1&#xff09;某连锁企业线上云店…

【OpenCV】计算机视觉图像处理基础知识

目录 前言 推荐 1、OpenCV礼帽操作和黑帽操作 2、Sobel算子理论基础及实际操作 3、Scharr算子简介及相关操作 4、Sobel算子和Scharr算子的比较 5、laplacian算子简介及相关操作 6、Canny边缘检测的原理 6.1 去噪 6.2 梯度运算 6.3 非极大值抑制 6.4 滞后阈值 7、Ca…

mockito加junit实现单元测试笔记

目录 一、简介1.1 单元测试的特点1.2 mock类框架使用场景1.3 常用mock类框架1.3.1 mockito1.3.2 easymock1.3.3 powermock1.3.4 JMockit 二、mockito的单独使用2.1 mock对象与spy对象2.2 初始化mock/spy对象的方式初始化mock/spy对象第1种方式初始化mock/spy对象第2种方式初始化…

数据“表”的增删改查

创建数据表 删除数据表 修改数据表 查看数据表 喜欢点赞收藏&#xff0c;如有疑问&#xff0c;点击链接加入群聊【信创技术交流群】&#xff1a;http://qm.qq.com/cgi-bin/qm/qr?_wv1027&kEjDhISXNgJlMMemn85viUFgIqzkDY3OC&authKey2SKLwlmvTpbqlaQtJ%2FtFXJgHVgl…

全球与中国HDPE管道市场:增长趋势、竞争格局与前景展望

快速成长的人口、快速的经济成长和工业发展增加了对可靠供水系统的需求。工业需要为制造流程、冷却系统和卫生目的提供可靠的水供应。随着国家的发展&#xff0c;它们更加重视基础设施&#xff0c;包括供水系统&#xff0c;以支持工业成长。HDPE管道广泛应用于饮用水和灌溉的配…

LeetCode 1038. 从二叉搜索树到更大和树:(反)中序遍历

【LetMeFly】1038.从二叉搜索树到更大和树&#xff1a;&#xff08;反&#xff09;中序遍历 力扣题目链接&#xff1a;https://leetcode.cn/problems/binary-search-tree-to-greater-sum-tree/ 给定一个二叉搜索树 root (BST)&#xff0c;请将它的每个节点的值替换成树中大于…

虹科技术 | BabyLIN产品如何轻松搞定K线协议实现?

概述&#xff1a;为了实现K线通信&#xff0c;SDF-V3在协议部分中定义了新的协议类型KLine Raw。所有能够运行SDF-V3文件&#xff08;LinWorks版本在V.2.29.4以上&#xff09;并使用最新的固件&#xff08;固件版本在V.6.18以上&#xff09;的BabyLIN设备都可以执行KLine Raw协…

【23-24 秋学期】NNDL 作业12 优化算法2D可视化

简要介绍图中的优化算法&#xff0c;编程实现并2D可视化 1. 被优化函数 2. 被优化函数 3. 分析各个算法的优缺点 REF&#xff1a;图灵社区-图书 (ituring.com.cn) 深度学习入门&#xff1a;基于Python的理论与实现 NNDL 作业11&#xff1a;优化算法比较_"ptimizers[…

MYSQL报错 [ERROR] InnoDB: Unable to create temporary file; errno: 0

起因 服务器的mysql不支持远程访问&#xff0c;在修改完相关配置后重启服务出错。 2023-12-03T10:12:23.895459Z 0 [Note] C:\Program Files\MySQL\MySQL Server 5.7\bin\mysqld.exe (mysqld 5.7.22-log) starting as process 15684 ... 2023-12-03T10:12:23.908886Z 0 [Note…

YOLOv8独家原创改进:创新自研CPMS注意力,多尺度通道注意力具+多尺度深度可分离卷积空间注意力,全面升级CBAM

💡💡💡本文自研创新改进:自研CPMS, 多尺度通道注意力具+多尺度深度可分离卷积空间注意力,全面升级CBAM 1)作为注意力CPMS使用; 推荐指数:五星 CPMS | 亲测在多个数据集能够实现涨点,对标CBAM。 收录 YOLOv8原创自研 https://blog.csdn.net/m0_63774211/ca…

内存是如何工作的

一、什么是内存 从外观上辨识&#xff0c;它就是内存条&#xff1b;从硬件上讲&#xff0c;它叫RAM&#xff0c;翻译过来叫随机存储器。英文全称&#xff1a;Random Access Memory。它也叫主存&#xff0c;是与CPU直接交换数据的内部存储器。其特点是读写速度快&#xff0c;不…

一文搞懂系列——动态库的加载方式及应用场景

引文 我们在工作中经常会遇到动态库链接的问题&#xff0c;因为正常的方式并不能满足我们的场景。常见的问题可以总结如下&#xff1a; 系统路径默认路径、usr/lib、/lib 目录&#xff0c;不会集成第三方动态库。 同名动态库可能在多个路径中存在。 针对不同的场景&#xff0…

替代AMS1117-ADJ可调输出线性稳压器(LDO)

1、概 述 PC1117-ADJ/1.2/1.5/1.8/2.5/2.85/3.3/5是最大输出电流为1A的低压降正向稳压器&#xff0c;其中 PC1117-ADJ是可调输出电压版&#xff0c;只需要两个外接电阻即可实现输出电压在1.25V~13.8V范围内的调节&#xff0c;而PC1117-1.2/1.5/1.8/2.5/2.85/3.3/5是固定输出1.…

【陈老板赠书活动 - 19期】-2023年以就业为目的学习Java还有必要吗?

陈老老老板&#x1f9b8; &#x1f468;‍&#x1f4bb;本文专栏&#xff1a;赠书活动专栏&#xff08;为大家争取的福利&#xff0c;免费送书&#xff09; &#x1f468;‍&#x1f4bb;本文简述&#xff1a;生活就像海洋,只有意志坚强的人,才能到达彼岸。 &#x1f468;‍&am…

vector向量详解,小白快速入门

1.vector是什么 vector名为向量&#xff0c;其实就是一个长度可变的数组 是连续的顺序的储存结构&#xff08;和数组一样的类别&#xff09;&#xff0c;但是有长度可变的特性。 2.vector的初始化 vector<int> v; 一维可变数组&#xff0c;类型为int&#xff0c;名称…