说明 node 的流程控制是个大问题,callback hell 很容易让你对这门语言,node 这个平台嗤之以鼻,学习 async 等流程控制库可以更容易地控制流程,更重要的是看那些源码(例如 hexo),一堆的 async 调用…
本文内容 :
部分
内容
流程控制
series,parrllel,waterfall,auto
集合
each/eachSeries ,其他都是这两个实现的
比较实用
apply
series series 字面意思 : 序列,连续的,电视连续剧 TV Series 就是这个 表示按部就班的做
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 var results = [];setTimeout (function ( ) { console .log ("Task 1" ); results[0 ] = 1 ; }, 300 ); setTimeout (function ( ) { console .log ("Task 2" ); results[1 ] = 2 ; }, 200 ); setTimeout (function ( ) { console .log ("Task 3" ); results[2 ] = 3 ; }, 100 );
运行结果
看到没有按照我们书写的顺序来运行…
使用 async.series 来实现顺序运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 var async = require ("async" );var series = require ("./my_series" );var things = [ function (next ) { setTimeout (function ( ) { console .log ("100 ms passed" ); next (); }, 100 ); }, function (next ) { setTimeout (function ( ) { console .log ("200 ms passed" ); next (); }, 200 ); }, function (next ) { setTimeout (function ( ) { console .log ("300 ms passed" ); next (); }, 300 ); } ]; async .series (things);
使用 async.series 时要注意,function(next)这个 next 是一个函数,会接着调用后面的 things,如果在第一个完成时不调用这个 next(err,result1…resultn)会导致只执行第一项函数,必须调用传的 next
parallel 并行执行,当然 node 还是单线程的,所谓并行是指第二个任务不再等待第一个执行完…此谓并行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 var async = require ("async" );async .parallel ( { one : function (callback ) { setTimeout (function ( ) { console .log ("Task 1" ); callback (null , 1 ); }, 300 ); }, two : function (callback ) { setTimeout (function ( ) { console .log ("Task 2" ); callback (null , 2 ); }, 200 ); }, three : function (callback ) { setTimeout (function ( ) { console .log ("Task 3" ); callback (null , 3 ); }, 100 ); } }, function (error, results ) { console .log (results); } );
1 2 3 4 5 6 7 Task 3 Task 2 Task 1 { three : 3 , two : 2 , one : 1 }
waterfall 水流型…前一个 task 产生的结果作为下一个 task 的参数 如以下例子, c = 根号下( a^2 + b^2 ) 直角三角形的勾股定理
第一步产生随机数 a b
计算 a^2 + b^2
计算开根号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 var async = require ("async" );async .waterfall ( [ function (callback ) { callback (null , Math .random (), Math .random ()); }, function (a, b, callback ) { callback (null , a * a + b * b); }, function (cc, callback ) { callback (null , Math .sqrt (cc)); } ], function (error, c ) { console .log (c); } );
我在自己实现的时候,发现在 apply 调用的时候,task 的 function 的 this 被我设置为 null,this 变量被浪费了,运用起来岂不爽哉…于是就有了下文…
修改 async 库,使用 this 传递结果 series ### 测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 var series = require ("./my_series.js" );var as = require ("assert" );var things = [ function (next ) { var self = this ; setTimeout (function ( ) { console .log ("100 ms passed" ); self.task1 = 1 ; next (null , 1 ); }, 100 ); }, function (next ) { var self = this ; setTimeout (function ( ) { console .log ("200 ms passed" ); self.task2 = 2 ; next (); }, 200 ); }, function (next ) { var self = this ; setTimeout (function ( ) { console .log ("300 ms passed" ); self.task3 = 3 ; next (null , 3 ); }, 300 ); } ]; series (things, function (err, results ) { as (err == null ); if (err) throw err; console .log ("results : " + results); console .log ("this : " ); console .log (this ); });
测试输出 1 2 3 4 5 6 7 8 9 /* * series this 测试输出 */ 100 ms passed 200 ms passed 300 ms passed results : 1 ,3 this : { task1: 1 , task2: 2 , task3: 3 }
看到么,也就是
可以按照 async 的形式来用,传 next 函数,结束时调用 next(null,res1 … resn),全部结束时在回调 results 拿结果
也可以将结果放到 this 中没这个 this 是最外层的 this,使用 self 防止覆盖,放心,不会污染全局变量的
实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 module .exports = series;function series (tasks, callback ) { if (!tasks) final (); var results = []; var results_obj = {}; var current = -1 ; var final = function (err ) { if (callback) { callback.call (results_obj, err, results); } }; var done = function (err ) { if (err) final (err); results = results.concat ([].slice .call (arguments , 1 )); if (current < tasks.length - 1 ) next (); else final (); }; var next = function ( ) { current++; var func = tasks[current]; func.call (results_obj, done); }; next (); }
parallel ### 测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 var things = { one : function (next ) { var self = this ; setTimeout (function ( ) { console .log ("one ..." ); self.one = 1 ; next (null , 1 ); }, 100 ); }, two : function (next ) { var self = this ; setTimeout (function ( ) { console .log ("two ..." ); self.two = 2 ; next (); }, 200 ); }, three : function (next ) { var self = this ; setTimeout (function ( ) { console .log ("three ..." ); self.three = 3 ; next (null , 3 ); }, 300 ); } }; var callback = function (err, results ) { if (err) throw err; console .log ("results :" ); console .log (results); console .log (); console .log ("this : " ); console .log (this ); }; var parallel = require ("./my_parallel.js" );parallel (things, callback);
测试输出 1 2 3 4 5 6 7 8 one ... two ... three ... results : { one: 1 , three: 3 } this : { one: 1 , two: 2 , three: 3 }
实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 module .exports = parallel;function parallel (tasks, callback ) { var results = {}; var results_obj = {}; if (!tasks) final (); var finished = 0 ; var count = Object .keys (tasks).length ; for (var key in tasks) { (function (key ) { var func = tasks[key]; func.call (results_obj, function (err ) { if (err) final (err); if (arguments [1 ]) { results[key] = arguments [1 ]; } if (++finished == count) { final (); } }); })(key); } function final (err ) { if (callback) { callback.call (results_obj, err, results); } } }
waterfall ### 测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 var things = [ function (next ) { this .a = Math .random (); console .log ("a : %d" , this .a ); next (); }, function (next ) { this .b = Math .random (); console .log ("b : %d" , this .b ); next (); }, function (next ) { var a = this .a ; var b = this .b ; next (null , a * a + b * b); }, function (cc, next ) { var c = Math .sqrt (cc); next (null , c); } ]; var cb = function (error, c ) { console .log ("c: %d" , c); }; var waterfall = require ("./my_waterfall.js" );waterfall (things, cb);
测试输出 1 2 3 a : 0 .3195089085493237 b : 0 .7993917833082378 c : 0 .8608792980802272
实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 module .exports = function waterfall (tasks, callback ) { var i = -1 ; var prev_results = []; var results_obj = {}; var next = function ( ) { i++; var func = tasks[i]; func.apply (results_obj, prev_results.concat (done)); }; function final (err ) { if (callback) { if (err) callback.call (results_obj, err); else callback.apply (results_obj, [null ].concat (prev_results)); } } function done (err ) { if (err) final (err); prev_results = [].slice .call (arguments , 1 ); if (i < tasks.length - 1 ) next (); else final (err); } next (); };
修改说明 较之前的实现,修改的地方是,增加 results_obj 作为 this 传递到各 function 顶层,最后传递至最后的 callback 中也是this
,将各 func 调用方式改为 call
async.auto 自动决定 tasks 的执行顺序,听着比较高级,我等码白还是不敢用的,不过 hexo 里面就用了 async.auto,实现就算了,来看看发生了什么事
async 的 readme 里面的样例代码,稍有修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 var async = require ("async" );var tasks = { get_data : function (callback ) { console .log ("in get_data" ); callback (null , "data" , "converted to array" ); }, make_folder : function (callback ) { console .log ("in make_folder" ); callback (null , "folder" ); }, write_file : [ "get_data" , "make_folder" , function (callback, results ) { console .log ("in write_file" , JSON .stringify (results)); callback (null , "filename" ); } ], email_link : [ "write_file" , function (callback, results ) { console .log ("in email_link" , JSON .stringify (results)); callback (null , { file : results.write_file , email : "user@example.com" }); } ] }; var cb = function (err, results ) { console .log ("---------" ); console .log ("最后的callback" ); console .log ("err = " , err); console .log ("results = " , results); }; async .auto (tasks, cb);
输出
1 2 3 4 5 6 7 8 9 10 11 12 in get_data in make_folder in write_file {"get_data" :["data" ,"converted to array" ],"make_folder" :"folder" } in email_link {"get_data" :["data" ,"converted to array" ],"make_folder" :"folder" ," write_file":" filename"} --------- 最后的callback err = null results = { get_data: [ 'data', 'converted to array' ], make_folder: 'folder', write_file: 'filename', email_link: { file: 'filename', email: 'user@example.com' } }
也就是
依赖其他 task 的 task,接受(callback,results)参数,results 是 object,key 是 task 名称,值是各 task 用 callback 传回的,多个聚合成数组
源码是这样实现的,为每个任务创建 ready 函数,判断是否可以开始执行,刚开始 tasks for in 遍历,能执行就执行了,不能执行为 complete 添加一个 listener,就是一个子 task 执行完会触发 complete,没有使用内置的 EventEmmiter,简陋的一个函数加一个数组搞定,这个 listener 再次判断 ready,直至全部完成,调用 callback…
是不是有点枯燥妮,福利
上面福利是昨天的,今天还会有的,接着写
each ## 测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 var arr = [1 , 2 , 3 , 4 , 5 ];var handler = function (x, cb ) { console .log (x); setTimeout (function ( ) { cb (); }, 100 * x); }; console .time ("each" );async .each (arr, handler, function (err ) { if (err) throw err; console .log ("已完成" ); console .timeEnd ("each" ); });
测试输出 1 2 3 4 5 6 7 1 2 3 4 5 已完成 each : 526 ms
可以看到 each 是加上就是 parallel,1 等待 100ms … 5 等待 500ms,总执行时间在最长的 500ms 以上一点
我的实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 function each (arr, fn, callback ) { if (!arr) final (); var finished = 0 ; function final (err ) { if (callback) callback (err); } function done (err ) { if (err) final (err); if (++finished == arr.length ) final (); } arr.forEach (function (x ) { fn (x, done); }); }
参考 async 库的源码之后,真心佩服一个 done 方法清楚地表示了这是传递给各个任务的,很好…抄下来,我也用 done…async 库还兼容了浏览器(array.forEach),完事后置 callback 为空函数,done 只准调用一次等等安全措施,我就不搞了…
eachSeries each - parallel eachSeries - series 对应关系
each 和 parallel 后面的任务不用等待前面结束,而 series 和 eachSeries 则是后面等待前面,依次运行 ## 测试代码 测试代码同 each,也就是 3 个 setTimeout,将 each 改为 eachSeries 方法 ## 测试结果
1 2 3 4 5 6 7 1 2 3 4 5 已完成 eachSeries: 1532 ms
可以看到总时间 1500+ms,算算 (1+2+3+4+5)*100 等于多少,即可见 eachSeries 与 each 之间区别
我的实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 function eachSeries (arr, fn, callback ) { if (!arr) final (); var finished = 0 ; function final (err ) { if (callback) callback (err); } function done (err ) { if (err) final (err); if (++finished < arr.length ) next (); else final (); } function next ( ) { var item = arr[finished]; fn (item, done); } next (); }
其他的可以用这两个(each/eachSeries)实现,如 filter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 var each = require ("./my_each.js" ).each ;var async = require ("async" ); filter ( [0 , 1 , 2 , 3 ], function (x, cb ) { cb (x % 2 == 0 ); }, function (results ) { console .log (results); } ); function filter (arr, tester, callback ) { var results = []; each ( arr, function (x, each_cb ) { tester (x, function (bool_val ) { if (bool_val) { results.push (x); } each_cb (); }); }, function (err ) { callback (results); } ); }
先去掉 async 注释看看,async 的效果,是输出[0,2]就是输出偶数,以回调传值,然后自己用 each 实现的 filter…脑子转快点,要不然不够用…我就是,callback 已经不是回调
了,而是来回调
,这个 filter 有点问题,不能保证顺序,在 each 没有提供 index 参数,要顺序只能自己重头写,或者改用 eachSeries,那个是顺序调用…
apply 这个是绑定方法,源码就几行,先来看看啦作用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 async .parallel ([ async .apply (fs.writeFile , "testfile1" , "test1" ), async .apply (fs.writeFile , "testfile2" , "test2" ) ]); async .parallel ([ function (callback ) { fs.writeFile ("testfile1" , "test1" , callback); }, function (callback ) { fs.writeFile ("testfile2" , "test2" , callback); } ]);
此例是 async 的 readme 中的例子,很直观,async.apply(fn)返回一个包装 fn 的函数
源码
1 2 3 4 5 6 async .apply = function (fn ) { var args = Array .prototype .slice .call (arguments , 1 ); return function ( ) { return fn.apply (null , args.concat (Array .prototype .slice .call (arguments ))); }; };
首先,apply(fs.writeFile,”testFile1”,”test1”) 取出 fn = fs.writeFile arg = [“testFile1”,”test1”] 再封装一层 function(xxx){ },再这个 function 内部,arguments 又是它自己的 arguments,所以当 var func = tasks[i]; func(function(err){ if(error) throw err; // < length next // > final }) 就变成了,fs.apply(“testFile1”,”test1”,callback);