pro-node 之 async 使用,实现,修改

说明

node 的流程控制是个大问题,callback hell 很容易让你对这门语言,node 这个平台嗤之以鼻,学习 async 等流程控制库可以更容易地控制流程,更重要的是看那些源码(例如 hexo),一堆的 async 调用…

本文内容 :

部分 内容
流程控制 series,parrllel,waterfall,auto
集合 each/eachSeries ,其他都是这两个实现的
比较实用 apply

series

series 字面意思 : 序列,连续的,电视连续剧 TV Series 就是这个
表示按部就班的做

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
* 测试js代码
*/
var results = [];
setTimeout(function() {
console.log("Task 1");
results[0] = 1;
}, 300);
setTimeout(function() {
console.log("Task 2");
results[1] = 2;
}, 200);
setTimeout(function() {
console.log("Task 3");
results[2] = 3;
}, 100);

运行结果

1
2
3
4
5
6
/*
* 测试结果
*/
Task 3
Task 2
Task 1

看到没有按照我们书写的顺序来运行…

使用 async.series 来实现顺序运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*
* async.series使用
*/
var async = require("async");
var series = require("./my_series");

var things = [
function(next) {
setTimeout(function() {
console.log("100 ms passed");
next();
}, 100);
},
function(next) {
setTimeout(function() {
console.log("200 ms passed");
next();
}, 200);
},
function(next) {
setTimeout(function() {
console.log("300 ms passed");
next();
}, 300);
}
];
async.series(things);

使用 async.series 时要注意,function(next)这个 next 是一个函数,会接着调用后面的 things,如果在第一个完成时不调用这个 next(err,result1…resultn)会导致只执行第一项函数,必须调用传的 next

parallel

并行执行,当然 node 还是单线程的,所谓并行是指第二个任务不再等待第一个执行完…此谓并行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
* parallel测试代码
*/
var async = require("async");
async.parallel(
{
one: function(callback) {
setTimeout(function() {
console.log("Task 1");
callback(null, 1);
}, 300);
},
two: function(callback) {
setTimeout(function() {
console.log("Task 2");
callback(null, 2);
}, 200);
},
three: function(callback) {
setTimeout(function() {
console.log("Task 3");
callback(null, 3);
}, 100);
}
},
function(error, results) {
console.log(results);
}
);
1
2
3
4
5
6
7
/*
* parallel结果
*/
Task 3
Task 2
Task 1
{ three: 3, two: 2, one: 1 }

waterfall

水流型…前一个 task 产生的结果作为下一个 task 的参数
如以下例子, c = 根号下( a^2 + b^2 ) 直角三角形的勾股定理

  1. 第一步产生随机数 a b
  2. 计算 a^2 + b^2
  3. 计算开根号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var async = require("async");
async.waterfall(
[
function(callback) {
callback(null, Math.random(), Math.random());
},
function(a, b, callback) {
callback(null, a * a + b * b);
},
function(cc, callback) {
callback(null, Math.sqrt(cc));
}
],
function(error, c) {
console.log(c);
}
);
  • 我在自己实现的时候,发现在 apply 调用的时候,task 的 function 的 this 被我设置为 null,this 变量被浪费了,运用起来岂不爽哉…于是就有了下文…

修改 async 库,使用 this 传递结果

series ### 测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/*
* series this 测试代码
*/
var series = require("./my_series.js");
var as = require("assert");

var things = [
function(next) {
var self = this;
setTimeout(function() {
console.log("100 ms passed");
self.task1 = 1;
next(null, 1);
}, 100);
},
function(next) {
var self = this;
setTimeout(function() {
console.log("200 ms passed");
self.task2 = 2;
next();
}, 200);
},
function(next) {
var self = this;
setTimeout(function() {
console.log("300 ms passed");
self.task3 = 3;
next(null, 3);
}, 300);
}
];

series(things, function(err, results) {
as(err == null);
if (err) throw err;

console.log("results : " + results);
console.log("this : ");
console.log(this);
});

测试输出

1
2
3
4
5
6
7
8
9
/*
* series this 测试输出
*/
100 ms passed
200 ms passed
300 ms passed
results : 1,3
this :
{ task1: 1, task2: 2, task3: 3 }

看到么,也就是

  • 可以按照 async 的形式来用,传 next 函数,结束时调用 next(null,res1 … resn),全部结束时在回调 results 拿结果
  • 也可以将结果放到 this 中没这个 this 是最外层的 this,使用 self 防止覆盖,放心,不会污染全局变量的

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
* series this 实现代码
*/
module.exports = series;

function series(tasks, callback) {
if (!tasks) final();

var results = [];
var results_obj = {};
var current = -1;

var final = function(err) {
if (callback) {
callback.call(results_obj, err, results);
}
};

//一个task完成时
var done = function(err) {
//出现错误,立即返回
if (err) final(err);

//取出本次结果
results = results.concat([].slice.call(arguments, 1));

//执行第一个,此时i=0,若只有一个length = 1,则不继续next
if (current < tasks.length - 1) next();
else final();
};

var next = function() {
current++;
var func = tasks[current];
func.call(results_obj, done);
};

next();
}

parallel ### 测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
var things = {
one: function(next) {
var self = this;
setTimeout(function() {
console.log("one ...");
self.one = 1;
next(null, 1);
}, 100);
},
two: function(next) {
var self = this;
setTimeout(function() {
console.log("two ...");
self.two = 2;
next();
}, 200);
},
three: function(next) {
var self = this;
setTimeout(function() {
console.log("three ...");
self.three = 3;
next(null, 3);
}, 300);
}
};
var callback = function(err, results) {
if (err) throw err;
console.log("results :");
console.log(results);
console.log();
console.log("this : ");
console.log(this);
};

var parallel = require("./my_parallel.js");
parallel(things, callback);

测试输出

1
2
3
4
5
6
7
8
one ...
two ...
three ...
results :
{ one: 1, three: 3 }

this :
{ one: 1, two: 2, three: 3 }

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
module.exports = parallel;

function parallel(tasks, callback) {
/*
tasks : {
key : func(cb){
cb(null,1);
}
}
*/
var results = {};
var results_obj = {};
if (!tasks) final();

var finished = 0;
var count = Object.keys(tasks).length;

for (var key in tasks) {
(function(key) {
var func = tasks[key];
func.call(results_obj, function(err) {
if (err) final(err);

if (arguments[1]) {
results[key] = arguments[1];
}
if (++finished == count) {
final();
}
});
})(key);
}

function final(err) {
if (callback) {
callback.call(results_obj, err, results);
}
}
}

waterfall ### 测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//根号下(a^2 + b^2) = c
var things = [
function(next) {
this.a = Math.random();
console.log("a : %d", this.a);
next();
},
function(next) {
this.b = Math.random();
console.log("b : %d", this.b);
next();
},
function(next) {
var a = this.a;
var b = this.b;
next(null, a * a + b * b);
},
function(cc, next) {
var c = Math.sqrt(cc);
next(null, c);
}
];
var cb = function(error, c) {
console.log("c: %d", c);
};

var waterfall = require("./my_waterfall.js");
waterfall(things, cb);

测试输出

1
2
3
a : 0.3195089085493237
b : 0.7993917833082378
c: 0.8608792980802272

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
module.exports = function waterfall(tasks, callback) {
/*
tasks : [
function(next){
next(null,a,b);
},
function(a,b,next){
xxx
}
]
*/
var i = -1;
var prev_results = [];
var results_obj = {};

//内部 move_next
var next = function() {
i++; // i=0 第一个
var func = tasks[i];

//tasks[i](a,b,next)
func.apply(results_obj, prev_results.concat(done));
};

function final(err) {
if (callback) {
if (err) callback.call(results_obj, err);
//添加error = null
else callback.apply(results_obj, [null].concat(prev_results));
}
}

function done(err) {
if (err) final(err);

prev_results = [].slice.call(arguments, 1); //[1] [] .slice(1)不会报错

if (i < tasks.length - 1)
//i = 0,0处执行完
next();
else final(err);
}

next();
};

修改说明

较之前的实现,修改的地方是,增加 results_obj 作为 this 传递到各 function 顶层,最后传递至最后的 callback 中也是this,将各 func 调用方式改为 call

async.auto

自动决定 tasks 的执行顺序,听着比较高级,我等码白还是不敢用的,不过 hexo 里面就用了 async.auto,实现就算了,来看看发生了什么事

async 的 readme 里面的样例代码,稍有修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
var async = require("async");

var tasks = {
get_data: function(callback) {
console.log("in get_data");
// async code to get some data
callback(null, "data", "converted to array");
},
make_folder: function(callback) {
console.log("in make_folder");
// async code to create a directory to store a file in
// this is run at the same time as getting the data
callback(null, "folder");
},
write_file: [
"get_data",
"make_folder",
function(callback, results) {
console.log("in write_file", JSON.stringify(results));
// once there is some data and the directory exists,
// write the data to a file in the directory

callback(null, "filename");
}
],
email_link: [
"write_file",
function(callback, results) {
console.log("in email_link", JSON.stringify(results));
// once the file is written let's email a link to it...
// results.write_file contains the filename returned by write_file.
callback(null, {
file: results.write_file,
email: "user@example.com"
});
}
]
};
var cb = function(err, results) {
console.log("---------");
console.log("最后的callback");
console.log("err = ", err);
console.log("results = ", results);
};

async.auto(tasks, cb);

输出

1
2
3
4
5
6
7
8
9
10
11
12
in get_data
in make_folder
in write_file {"get_data":["data","converted to array"],"make_folder":"folder"}
in email_link {"get_data":["data","converted to array"],"make_folder":"folder","
write_file":"filename"}
---------
最后的callback
err = null
results = { get_data: [ 'data', 'converted to array' ],
make_folder: 'folder',
write_file: 'filename',
email_link: { file: 'filename', email: 'user@example.com' } }

也就是

  • 依赖其他 task 的 task,接受(callback,results)参数,results 是 object,key 是 task 名称,值是各 task 用 callback 传回的,多个聚合成数组

源码是这样实现的,为每个任务创建 ready 函数,判断是否可以开始执行,刚开始 tasks for in 遍历,能执行就执行了,不能执行为 complete 添加一个 listener,就是一个子 task 执行完会触发 complete,没有使用内置的 EventEmmiter,简陋的一个函数加一个数组搞定,这个 listener 再次判断 ready,直至全部完成,调用 callback…

是不是有点枯燥妮,福利


上面福利是昨天的,今天还会有的,接着写

each ## 测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
var arr = [1, 2, 3, 4, 5];
var handler = function(x, cb) {
console.log(x);
setTimeout(function() {
cb(); //此项已完成
}, 100 * x);
};
console.time("each");
async.each(arr, handler, function(err) {
if (err) throw err;
console.log("已完成");
console.timeEnd("each");
});

测试输出

1
2
3
4
5
6
7
1
2
3
4
5
已完成
each: 526ms

可以看到 each 是加上就是 parallel,1 等待 100ms … 5 等待 500ms,总执行时间在最长的 500ms 以上一点

我的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
function each(arr, fn, callback) {
if (!arr) final();
var finished = 0;

function final(err) {
if (callback) callback(err);
}

//完成时会调用
function done(err) {
if (err) final(err);

if (++finished == arr.length) final();
}

arr.forEach(function(x) {
fn(x, done);
});
}

参考 async 库的源码之后,真心佩服一个 done 方法清楚地表示了这是传递给各个任务的,很好…抄下来,我也用 done…async 库还兼容了浏览器(array.forEach),完事后置 callback 为空函数,done 只准调用一次等等安全措施,我就不搞了…

eachSeries

each - parallel
eachSeries - series
对应关系

each 和 parallel 后面的任务不用等待前面结束,而 series 和 eachSeries 则是后面等待前面,依次运行 ## 测试代码
测试代码同 each,也就是 3 个 setTimeout,将 each 改为 eachSeries 方法 ## 测试结果

1
2
3
4
5
6
7
1
2
3
4
5
已完成
eachSeries: 1532ms

可以看到总时间 1500+ms,算算 (1+2+3+4+5)*100 等于多少,即可见 eachSeries 与 each 之间区别

我的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
function eachSeries(arr, fn, callback) {
if (!arr) final();
var finished = 0;

function final(err) {
if (callback) callback(err);
}

function done(err) {
if (err) final(err);

if (++finished < arr.length) next();
else final();
}

function next() {
var item = arr[finished]; //finised = 0,一个都没完成,取第一个 = 0
fn(item, done);
}

next(); //开始走第一个
}

其他的可以用这两个(each/eachSeries)实现,如 filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
var each = require("./my_each.js").each;
var async = require("async");

/*async.*/ filter(
[0, 1, 2, 3],
function(x, cb) {
cb(x % 2 == 0);
},
function(results) {
console.log(results);
}
);

function filter(arr, tester, callback) {
var results = [];
each(
arr,
function(x, each_cb) {
tester(x, function(bool_val) {
if (bool_val) {
results.push(x);
}
each_cb(); //each的callback,表示本次完成
});
},
function(err) {
callback(results);
}
);
}

先去掉 async 注释看看,async 的效果,是输出[0,2]就是输出偶数,以回调传值,然后自己用 each 实现的 filter…脑子转快点,要不然不够用…我就是,callback 已经不是回调了,而是来回调,这个 filter 有点问题,不能保证顺序,在 each 没有提供 index 参数,要顺序只能自己重头写,或者改用 eachSeries,那个是顺序调用…

apply

这个是绑定方法,源码就几行,先来看看啦作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// using apply
async.parallel([
async.apply(fs.writeFile, "testfile1", "test1"),
async.apply(fs.writeFile, "testfile2", "test2")
]);

// the same process without using apply
async.parallel([
function(callback) {
fs.writeFile("testfile1", "test1", callback);
},
function(callback) {
fs.writeFile("testfile2", "test2", callback);
}
]);

此例是 async 的 readme 中的例子,很直观,async.apply(fn)返回一个包装 fn 的函数

源码

1
2
3
4
5
6
async.apply = function(fn) {
var args = Array.prototype.slice.call(arguments, 1);
return function() {
return fn.apply(null, args.concat(Array.prototype.slice.call(arguments)));
};
};

首先,apply(fs.writeFile,”testFile1”,”test1”)
取出
fn = fs.writeFile
arg = [“testFile1”,”test1”]
再封装一层 function(xxx){ },再这个 function 内部,arguments 又是它自己的 arguments,所以当
var func = tasks[i];
func(function(err){
if(error) throw err;
// < length next
// > final
})
就变成了,fs.apply(“testFile1”,”test1”,callback);