
背景:
前端文件上传是非常普遍的功能,当需要上传大文件时会有以下问题。
1.前后端上传时间限制,一次性传输大小限制。
2.网络抖动等,失败后需要重新上传。
3.http1.1版本, TCP只有传送一个请求
4.无进度条,用户体验极差
主要步骤:

前端:
加载文件 ➡️ 分片 ➡️ 上传
node.js:
解析文件 ➡️ 存放文件碎片 ➡️ 合并文件
比如重庆市向上海市订购了一批高铁列车,如果一次性运过来不太现场,没有那么大的船,还有就是一次性运过来,如果路上出事故,需要重新发送一批了,损失严重。
所以我们准备分批发送。
1.浏览器加载文件
1 2
| <input id="file" type="file" onchange="uploadFile()"id="upload" /> <input type="button" id="upload" value="文件上传" class="btn btn-warning" onclick="handleUpload()" />
|
这一步主要是把文件读取到内存里。
document.getElementById(‘file’).files 是 FileList类型。
document.getElementById(‘file’).files[0] 是File类型的包装器。
File FileList FileReader关系:
FileReader只能读取 File或者 blob对象,File对象是FileList
的子集,constructor == Blob, 有slice方法。
2.上传文件方式选择
文件上传采用 formData形式,而不是json。原因json传参需要JSON. stringify序列化
比如一下代码:
1 2 3 4 5 6 7 8 9 10 11
| var xhr = new XMLHttpRequest(); xhr.open('post','http://localhost:3000/ajaxpost'); xhr.setReuqestHeader('Content-Type','application/json'); var params = JSON.stringify({ city: '重庆', spcial: '山城' }) xhr.send(params); xhr.onload = function () { console.log(xhr.responseText); }
|
在序列化过程中,会抹掉一些比如 function File blob的对象,所以采用formData形式进行文件上传。
3.上传
3.1直接上传
1 2 3 4 5 6 7 8 9 10 11 12 13
| const postAjax = (url,fd) => { const xhr = new XMLHttpRequest(); return new Promise((resolve, reject) => { xhr.open('POST', url, true); xhr.onreadystatechange = function() { if (xhr.readyState == 4 && xhr.status == 200) { console.log(xhr.responseText, "responseText" ) resolve(xhr.responseText) } }; xhr.send(fd); }) }
|
1 2 3 4 5 6 7 8 9 10 11
| const url = "http://127.0.0.1:1000/file/uploading" function uploadFile() { const file = document.getElementById('file').files[0]; blockUpload(file) } const blockUpload = (file) => { const fd = new FormData(); fd.append("file", file); fd.append("fileName", file.name) postAjax(url, fd); }
|
3.2 分片上传
File对象可以使用,slice + File.size,对文件进行切割,切割后的chunk实际上是浏览器对象Blob。
1 2 3 4 5 6 7 8 9 10 11
| const url = "http://127.0.0.1:1000/file/uploading" const mergrUrl = "http://127.0.0.1:1000/file/mergrChunk" const handleUpload = () => { $("#file").click(); }
function uploadFile() { const file = document.getElementById('file').files[0]; chunkedUpload(file) }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| const chunkedUpload = async (file) => { const chunkSize = 1024; for (let start = 0; start <= file.size; start += chunkSize) { const chunk = file.slice(start, start + chunkSize); // 分片 blob对象 const fd = new FormData(); fd.append("chunk", chunk); fd.append("hash", start); fd.append("fileName", file.name) // 上传 利用async实现,同步请求 let per = Math.floor(100 * start / file.size );
if ((file.size - start) < chunkSize) { per = 100; } await postAjax(url, fd).then(res => { $('#bar').css({'width': per + "%",}); $('#bar').html(per + '%'); }) }
|
此时我们会等待一条船到达重庆,再让下一条船出发,河里同时只有一条船通行,就是说分片请求会等待上一个完成。

3.3 并发上传
为了利用浏览器的并发能力,把请求分批发送,每次并发11个,node.js同一个IP最多可以异步处理11个请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| const chunkedUpload = async (file) => { ++ const chunkSize = 1024; ++ let postQueue = []; ++ const parallelNum = 11; for (let start = 0; start <= file.size; start += chunkSize) { const chunk = file.slice(start, start + chunkSize); const fd = new FormData(); fd.append("chunk", chunk); + fd.append("hash", start); fd.append("fileName", file.name)
+ let per = Math.floor(100 * start / file.size ); + if ((file.size - start) < chunkSize) { + per = 100; + } + - await postAjax(url, fd).then(res => { - }) + if (postQueue.length < parallelNum) { + postQueue.push(postAjax(url, fd)) + }
+ if (postQueue.length >= parallelNum || per === 100) { + + await Promise.all(postQueue).then(res => { $('#bar').css({'width': per + "%",}); $('#bar').html(per + '%'); + postQueue = []; + }).catch(err => { + console.error(err) + }) + } + } };
|
此时,我们可以同时发出11条船,等这11条到达重庆,开始下一轮,重新发送11条船,这样就能缩短运输时间啦。

3.4 any版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| ... if (postQueue.length < parallelNum) { - postQueue.push(postAjax(url, fd)) + postQueue.push({post: (postAjax(url, fd)), hash: start} ) } let per = Math.floor(100 * start / file.size );
if ((file.size - start) < chunkSize) { per = 100; } if (postQueue.length >= parallelNum || per === 100) { + const postApiQueues = postQueue.map(item => item.post) await Promise.any(postApiQueues).then(res => { + let hash = res.hash + const index = postQueue.find(item => item.hash = hash) + postQueue.splice(index, 1) - postQueue = []; $('#bar').css({'width': per + "%",}); $('#bar').html(per + '%'); if (per >= 100) { postAjax(mergrUrl, fd).then(res => { }) } }).catch(err => { console.error(err) }) } ...
|
把以上代码Promise.all 改成 Promise.any
这样等任何一条船到达重启,我们就可以开始马上让一艘船发货。

4.文件接收
4.1 node.js 接收文件流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| const express = require("express"); const app = express(); app.use(express.static("public")); const multiparty = require("multiparty"); const fs = require("fs-extra");
const path = require("path"); const UPLOAD_DIR = path.resolve(__dirname);
app.get("/", (req, res) => { res.sendFile(`${__dirname}/index.html`); }); let FILE_NAME = ""; let chunkDir = "";
app.post("/file/uploading", (req, res, next) => { var form = new multiparty.Form(); form.parse(req, async (err, fields, files) => { if(err) return; const [chunk] = files.chunk; const [hash] = fields.hash; const [fileName] = fields.fileName; FILE_NAME = fileName; chunkDir = path.resolve(UPLOAD_DIR, "fileSteam/fchunkDir" + fileName);
if (!fs.existsSync(chunkDir)) { await fs.mkdirs(chunkDir); }
await fs.move(chunk.path, `${chunkDir}/${hash}`);
res.writeHead(200, { "content-type": "text/plain;charset=utf-8" }); res.write("200"); res.end(); }); });
app.use(express.static("public")).listen(1000);
|
上面的app.js 解析文件,然后临时存放在 chunkDir+文件名的文件夹下

相当于把高铁车的所有零件放入一个独立的仓库,仓库的名字就是高铁的名字,比如复兴号。

4.2 node.js 合并文件流 生成文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| app.post("/file/uploading", (req, res, next) => { ...... });
+ const stream = require("./writeStream");
+ app.post("/file/mergrChunk", async (req, res, next) => { + FILE_NAME = path.resolve(UPLOAD_DIR, "fileSteam/" + FILE_NAME); + console.log(FILE_NAME, "========================"); + let dests = fs.readdirSync(chunkDir); + dests = dests.sort((a, b) => a - b); + await stream.WriteStreamsAsync(dests, FILE_NAME, chunkDir); + await fs.removeSync(chunkDir); + res.write("200"); + res.end(); });
app.use(express.static("public")).listen(1000);
|
前端文件传送完成,向后端发送一个合并请求,合并前把文件排序一下,文件合并操作在writeStream.js中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| const fs = require("fs"); const path = require("path");
const WriteStreamsAsync = async (dests, FILE_NAME, chunkDir) => { let writeable = fs.createWriteStream(FILE_NAME); for (let i = 0; i < dests.length; i++) { await write(dests[i], writeable, chunkDir); } };
const write = (item, writeable, chunkDir) => { return new Promise((resolve, reject) => { let destPath = path.resolve(__dirname, chunkDir + '/' + item); let readable = fs.createReadStream(destPath); readable.pipe(writeable, { end: false }); readable.on("end", () => { resolve(); }); }); };
module.exports = { WriteStreamsAsync };
|
利用 fs. createReadStream fs. createWriteStream 文件流api合并文件切片,生成文件,大文件上传完成。
这一步相当于把高铁组装起来,复原了。

断点续传
断点续传可以,在文件中断后继续上次的传输节点,继续上传。
在网页刷新后,把上传的节点存储到localStorage中,下次上传从localStorage查找是否有这个文件的节点存在,如果有从这个节点上传,如果没有,重新上传。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| const postAjax = (url,fd) => { const xhr = new XMLHttpRequest(); return new Promise((resolve, reject) => { xhr.open('POST', url, true); xhr.onreadystatechange = function() { if (xhr.readyState == 4 && xhr.status == 200) { const res = JSON.parse(xhr.responseText) + if (res.hash) { + window.localStorage.setItem(fileName, res.hash); + } resolve(res) } }; xhr.send(fd); }) }
|
1 2 3 4 5 6
| function uploadFile() { const file = document.getElementById('file').files[0]; + let fileName = window.localStorage.getItem('fileName'); + const pointHash = window.localStorage.getItem(fileName) || 0; chunkedUpload(file, +pointHash) }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| const chunkedUpload = async (file, pointHash) => { const chunkSize = 1024 * 10; let postQueue = []; const parallelNum = 25; for (let start = pointHash; start <= file.size; start += chunkSize) { const chunk = file.slice(start, start + chunkSize); const fd = new FormData(); fd.append("chunk", chunk); fd.append("hash", start); fd.append("fileName", file.name) + window.localStorage.setItem('fileName', file.name); if (postQueue.length < parallelNum) { postQueue.push({post: (postAjax(url, fd)), hash: start} ) } let per = Math.floor(100 * start / file.size );
if ((file.size - start) < chunkSize) { per = 100; } if (postQueue.length >= parallelNum || per === 100) {
const postApiQueues = postQueue.map(item => item.post) await Promise.any(postApiQueues).then(res => { let hash = res.hash const index = postQueue.find(item => item.hash = hash) postQueue.splice(index, 1) $('#bar').css({'width': per + "%",}); $('#bar').html(per + '%'); if (per >= 100) { postAjax(mergrUrl, fd).then(res => { + let fileName = window.localStorage.getItem('fileName'); + window.localStorage.removeItem(fileName); + window.localStorage.removeItem('fileName'); }) } }).catch(err => { console.error(err) }) } } };
|
速度对比:
为了便于观测我们先把网络设置成fast3G
这样能保证带宽不会影响传输速度
1.promise.all并行版

并行上传时间: 32.95S
2.any版上传

any上传时间: 25.96S
3.await排队版

排队上传时间: 1.5min
4.直传版

上传时间: 18.88s
可以看出,在传送速度上。
直传版(18.88s) > any版(25.96s) > 并行版(32.95s) > 排队版(1.5min)
结论
1.TCP建立请求,关闭请求是非常费时间的。
2.并行请求速度是排队上传快很多,这个方式是可行的。
git代码地址