背景

最近在写图片上传功能,使用antd提供的upload组件,并使用beforeUpload函数拦截,再批量读进浏览器之后手动上传。

第一次实现

传输使用的协议是http2,如果不做处理,并发传输的数量没有限制。第一次的实现使用Promise.all,并发传输,测试时,使用单张16.7M的图片,同时上传20张,会给后端带来不小的压力,多次测试中都会有部分图片上传失败,返回502或504。

使用Promise.all的实现方式带来的问题是:

  1. 没有并发的数量限制,会占用大量带宽,同时给后端带来压力;
  2. Promise.all的执行方式是全部成功才算成功,有一个失败即算失败,就会将这个失败的结果返回,不管其他的项是否完成,这带来的问题是每一个失败的err没有办法单独隔离处理,导致没有办法进行后续操作,比如单独重新上传,也就是说,使用Promise.all没有办法单独对每一项进行操作,限制了灵活度。

优化的上传策略

改进的实现使用下面这种传输策略:

创建比如3个transmitter,点击发送按钮时,三个transmitter并发发出一张图片,哪个transmitter完成自己的发送任务,就去任务队列中拿下一张图片发送,直至全部图片发送完成。

下面是传输策略的实现。

优化代码

  1. 第一版

下面这版代码有个缺点,在所有文件处理完成(可能的情况有全部完成,部分完成,全部失败),没有回调函数,由于业务需求,修改代码到第二版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Transmition {
constructor(num, data, func, handleSuccess, handleErr) {
this.num = num; // 这个属性用来控制transmitter的数量
this.data = data; // 这个属性是数组类型,保存任务队列
this.func = func; // 这个函数用来实际处理异步任务
this.index = -1; // 这个属性用来存储当前的任务指针
this.handleSuccess = handleSuccess; // 所有任务处理完成的回调函数
this.len = data.length; // 任务的数量
this.handleErr = handleErr; // 任务失败的处理函数
}

// 这个函数是用来实际执行任务的异步处理
// 最开始写进了transmit函数中
// 但要写闭包保存index
// 为了更清晰,单独拿出来写成一个函数
executeTask = async (index, data) => {
const { func } = this;
try {
await func(data, index);
this.handleSuccess();
} catch (e) {
this.handleErr(e, data[index], index);
}
}

// 这个函数表示的是单个transmitter的执行情况
// await在这里很重要
transmit = async () => {
const { data } = this;
let currentData = data[++this.index];
while (currentData) {
await this.executeTask(this.index, currentData);
currentData = data[++this.index];
}
}

// 根据num创建相应数量的transmitter
// 并发启动
createTransmitter = () => {
const { num } = this;
for (let i = 0; i < num; i++) {
this.transmit();
}
}
}

export default Transmition;

注:transmit中的await关键字很重要,如果没写,会导致下图中的情景。

多核

  1. 第二版

为了增加上述的回调函数,一个难点:这里有三个执行队列A,B,C,当执行到最后三个文件的时候,假设A执行完毕且成功了,B和C还在等待回调中,这个时候如果仅用文件队列中没有文件了来做判断条件执行最后的handleEnd(),就可能导致handleEnd()函数执行完,B和C的回调函数才开始执行,带来意想不到的结果

有两种方法避免上面的问题:

第一种:在func函数外定义一个数组endPoint,在func函数中的onError和onSuccess函数中,将异步执行的过程封装成Promise推入endPoint中,将endPoint传入Transmition函数中,在try…catch…finally的finally中执行下面的检查:

1
2
3
4
5
6
7
this.timeId = setInterval(async () => {
if (this.endPoint.length === this.len) {
await Promise.all(this.endPoint);
this.handleEnd();
clearInterval(this.timeId);
}
}, 100);

即可避免上面的问题,但这样会有一个持续的轮寻,可能会带来代码执行上的堵塞和性能问题。

第二种:记录每个队列执行的当前任务,当满足队列中没有剩余文件的时候,await Promise.all(currentTask())来保证所有的异步过程都已经执行完毕。其他细节详见代码注释。

代码修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class Transmition {
constructor(num, data, func, handleSuccess, handleErr, handleEnd) {
this.num = num;
this.data = data;
this.func = func;
this.index = -1;
this.handleSuccess = handleSuccess;
this.len = data.length;
this.handleErr = handleErr;
this.handleEnd = handleEnd;
this.currentTask = [];
}

// 这个函数拆出来是十分有必要的
// 写成这种形式,就把图片的上传过程单独封装了
// 上传成功也好,失败也好都封装在自己的过程里
// 对于整个对列的上传过程而言都是经历了上传并且已经结束
handleTask = async (data, index)=> {
const { func } = this;
try {
const response = await func(data, index);
this.handleSuccess(response);
} catch(e) {
this.handleErr(e, data[index], index);
}
}

// 这里保存当前队列执行的上传任务
// 并启动任务
// 在上传队列没有文件的时候
// 还会判断所有上传过程是否完成,并执行最终的回调函数
executeTask = async (index, data, i) => {
this.currentTask[i] = this.handleTask(data, index);
await this.currentTask[i];
if (index + 1 === this.len) {
await Promise.all(this.currentTask);
this.handleEnd();
}
}

// 定义的每个上传队列执行的规则
// 有任务时,执行任务并等待任务结束
// 结束后如果文件队列中还有文件就取文件开始上传
transmit = async (i) => {
const { data } = this;
let currentData = data[++this.index];
while (currentData) {
await this.executeTask(this.index, currentData, i);
currentData = data[++this.index];
}
}

// 这里同步创建上传队列
// 并启动上传过程
createTransmitor = () => {
const { num } = this;
for (let i = 0; i < num; i++) {
this.transmit(i);
}
}
}

export default Transmition;
  1. 第三版

增加了队列停止的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class Transmition {
constructor(num, data, func, handleSuccess, handleErr, handleEnd) {
this.num = num; // 这个属性用来控制transmitter的数量
this.data = data; // 这个属性是数组类型,保存任务队列
this.func = func; // 这个函数用来实际处理异步任务
this.index = -1; // 这个属性用来存储当前的任务指针
this.handleSuccess = handleSuccess || function() {}; // 所有任务处理完成的回调函数
this.len = data.length; // 任务的数量
this.handleErr = handleErr || function() {}; // 任务失败的处理函数
this.handleEnd = handleEnd || function() {}; // 所有任务执行完一次后的处理函数
this.currentTask = []; // 这个属性用来保存当前正在执行的三个任务,随index的变化动态替换
this.currentIndex = -1; // 这个属性用来用来当停止任务时,记录执行到的文件指针,用于后面恢复,此处没有具体用到,仅留出这个字段
}

// 这个函数拆出来是十分有必要的
// 写成这种形式,就把图片的上传过程单独封装了
// 上传成功也好,失败也好都封装在自己的过程里
// 对于整个对列的上传过程而言都是经历了上传并且已经结束
handleTask = async (data, index) => {
const { func } = this;
try {
const response = await func(data, index);
this.handleSuccess(response);
} catch (e) {
this.handleErr(e, data[index], index);
}
};

// 这里保存当前队列执行的上传任务
// 并启动任务
// 在上传队列没有文件的时候
// 还会判断所有上传过程是否完成,并执行最终的回调函数
executeTask = async (index, data, i) => {
this.currentTask[i] = this.handleTask(data, index);
await this.currentTask[i];
if (index + 1 === this.len) {
await Promise.all(this.currentTask);
this.handleEnd();
}
};

// 定义的每个上传队列执行的规则
// 有任务时,执行任务并等待任务结束
// 结束后如果文件队列中还有文件就取文件开始上传
transmit = async i => {
const { data } = this;
let currentData = data[++this.index];
while (currentData) {
await this.executeTask(this.index, currentData, i);
currentData = data[++this.index];
}
};

// 这里同步创建上传队列
// 并启动上传过程
createTransmitor = () => {
const { num } = this;
for (let i = 0; i < num; i++) {
this.transmit(i);
}
};

// 这里用来等待已经触发的异步过程
// 当已经触发的异步过程结束
// 停止队列操作
stopTransmite = async () => {
this.currentIndex = this.index;
this.index = this.len;
await Promise.all(this.currentTask);
};

// 这个方法实现了暂停后继续操作
// 目前还没有使用
continueTransmite = () => {
this.index = this.currentIndex;
this.createTransmitor();
};
}

export default Transmition;

最后

在写这篇的时候,突然想到,图片读进浏览器的过程是不是也可以使用这个策略实现,这样可以降低读入文件的压力,读完的文件也可以先显示,在交互上有比较好的反馈。

上面这个想法已经实现,在浏览器读图的时候,能够拿到图片列表,在调用上面的上传类处理,实现和往服务器传图片相同的节奏控制。另外,在读图的过程中,如果关闭组件,就需要调用第三版中的停止方法。停止方法的出现为上传暂停继续提供了基础。