流程控制库
尾触发和next
尾触发最多的应用是在Connect的中间件
var app = connect()
app.use(connect.staticCache())
app.use(connect.static(__dirname + '/public'))
app.use(conect.cokkieParser())
app.use(connect.session())
app.use(connect.query())
app.use(connect.bodyParser())
app.use(connect.csrf())
app.listen(3001)
通过use方法注册一系列的中间件,监听端口上的请求,中间件利用了尾触发的机制。
function (req, res, next) {
// 中间件
}
每个中间件传递请求对象,响应对象和尾触发的函数,通过队列形成一个处理流
中间件机制使得在处理网络请求的时候,可以向切面编程一样过滤,验证,日志等共嗯那个。不会与具体业务产生关联
Connect的核心代码
// 创建了http服务器的request事件处理函数
function createServer() {
function app(req, res) {
app.handle(req, res)
}
utils.merge(app, proto)
utils.merge(app, EventEmitter.prototype)
app.route = '/'
app.stack = []
for (var i = 0; i < arguments.length; i++) {
app.use(arguments[i])
}
return app
}
function app(req, res) {app.handle(req, res)}
通过这个代码创建了http服务器的request事件处理函数。真正核心的代码是app.stack=[]
stack属性是服务器内部维护的中间件队列,通过调用use方法可以将中间件放到队列中。
app.use = function(route,fn) {
this.stack.push({route: route, handle fn})
return this
}
此时就建立好了模型,通过整合原生的http模块就可以实现监听。监听函数的实现
app.listen = function() {
var serve = http.createServer(this)
return server.listen.apply(serve, arguments)
}
app.handle = function(req, res, out) {
next()
}
原始的next()方法较为复杂,简化后: 取出队列中的中间件并执行,同时传入当前方法实现链式调用。达到持续触发的目的
function next(node) {
layer = stack[index++]
layer.handle(req, res, next)
}
可以参考connect的流式处理模式。尽管中间件这种尾触发的模式不要求每个中间方法都是异步的,但是如果每个步骤都可以采用异步来完成,实际上只是串行化的处理,没有办法通过并行的异步调用来提升业务的处理效率,流式处理只是可以将一些串行的逻辑扁平化,但是并行逻辑处理还是需要搭配事件或者promise完成。
在connect中,尾触发适合处理网络请求的场景。将复杂的处理逻辑拆解为简介,单一的处理单元,逐层次的处理请求对象和响应对象。
async
async是流程控制模块,流程控制是开发过程中的基本需求,async提供了20多种的方法用于处理异步的各种写作模块
async提供了series()方法来实现一组任务的串行执行。
async.series([
function(callback) {
fs.readFile('file1.txt', 'utf-8', callback)
}
function(callback) {
fs.readFile('file2.txt', 'utf-8', callback)
}
], function(err, result) {
// results => [file1.txt, file2.txt]
})
等价于
fs.readFile('file1.txt','utf-8', function(err, content) {
if (err) {
return callback(err)
}
fs.readFile('file2.txt','utf-8', function(err, data) {
if(err) {
return callback(err)
}
callback(null, [content, data])
})
})
series()
方法中传入的函数callback()并非是由使用者指定,事实上,此处的回调函数通过async经过高阶函数的方式注入,这里隐含了特殊的逻辑,每个callback()执行的时候都会将结果保存起来,然后执行到下一个调用,直到结束所有的调用,最终的回调函数执行的时候,队列中的异步调用保存到结果通过数组的方式传入。
异步的并行执行
当需要通过并行来提升性能的时候,async提供了parallel方法,通过并行执行一些异步操作
async.parallel([
function(callback) {
fs.readFile('file1.txt', 'utf-8', callback)
},
function(callback) {
fs.readFile('file2.txt', 'utf-8', callback)
}
],function(err, results) {
// results => ['file1.txt', 'file2.txt']
})
// 等价于
var counter = 2
var results = []
var done = function(index, value) {
results[index] = value
counter--
if(counter == 0) {
callback(null, results)
}
}
var hasErr = false
var fail = function(err) {
if(!hasErr) {
hasErr = true
callback(err)
}
}
fs.readFile('file.txt', 'utf-8', function(err, content) {
if (err) {
return fail(err)
}
done(0, data)
})
fs.readFile('file2.txt','utf-8', function(err, data) {
if (err) {
return fail(err)
}
done(1, data)
})
同样,通过async编写的代码既没有深度的嵌套,也灭有复杂的状态判断,parallel()方法对于异常的判断是一旦某个异步调用产生了异常,就会将异常作为第一个参数传入给最终的回调函数,只有所有的异步调用都正常完成的时候,才会将结果以数组的形式传入。
EventProxy[前几篇中有] 是基于事件发布/订阅模式设计的,也用到了async相同的原理,通过特殊的回调函数来返回隐含返回值的处理,不同的是在async这个架构中,这个回调函数是通过async封装后传递出来的,但是EventProxy是通过done和fail方法来产生新的回调函数,这两种方法都是高阶函数的应用
异步调用的依赖处理
series()适合无依赖的异步串行。当前一个是后一个调用的输入,series()无法满足需求。async提供了waterfall()方法。
async.waterfall([
function(callback) {
fs.readFile('file1.txt', 'utf-8', function(err,data) {
callback(err, content)
})
},
function (arg1, callback) {
fs.readFile(arg1, 'utf-8', function(err, content) {
callback(err, content)
})
}
], function (err, result) {
// result => file4.txt
})
等价于
fs.readFile('file1.txt', 'utf-8', function(err, data) {
if (err) {
return callback(err)
}
fs.readFile(data1, 'utf-8', function (err, data2) {
if (err) {
return callback(err)
}
fs.readFile(data2, 'utf-8', function (err, data3) {
if (err) {
return callback(err);
}
callback(null, data3)
})
})
})
自动依赖处理
业务如下
1. 从磁盘读取配置文件
2. 链接mongodb
3. 配置文件链接redis
4. 编译静态文件
5. 上传cdn
6. 启动服务器
伪代码
readConfig: function() {}
connectMongoDB: function() {}
connectRedis: function() {}
complieAsserts: function() {}
uploadAsserts: function() {}
startup: function() {}
代码实现
var deps = {
readConfig: function (callback) {
// read config file
callback();
},
connectMongoDB: [
"readConfig",
function (callback) {
// connect to mongodb
callback();
},
],
connectRedis: [
"readconfig",
function (callback) {
// connect to redis
},
],
complieAsserts: function (callback) {
// complie asserts
callback();
},
uploadAsserts: [
"complieAsserts",
function (callback) {
// upload to assert
callback();
},
],
startup: [
"connectMongoDB",
"connectRedis",
"uploadAsserts",
function (callback) {
// startup
},
],
};
auto()方法可以根据依赖关系自动分析,以最佳的顺序执行上面的业务async.auto(deps)
proxy.assp('readtheconfig', function () {
// read config file
proxy.emit('readConfig');
}).on('readConfig', function () {
// connect to mongodb
proxy.emit('connectMongoDB');
}).on('readConfig', function () {
// connect to redis
proxy.emit('connectRedis');
}).assp('complietheasserts', function () {
// complie asserts
proxy.emit('complieAsserts');
}).on('complieAsserts', function () {
// upload to assert
proxy.emit('uploadAsserts');
}).all('connectMongoDB', 'connectRedis', 'uploadAsserts', function () {
// Startup
});
step
Step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8',this)
},
function readFile2(err,content) {
fs.readFile('file2.txt', 'utf-8',this)
},
function done(err,content) {
console.log(content)
}
)
step 用到了this,是step内部的一个next()方法,将异步调用的结果传递给下一个任务作为参数。
并行任务执行
this具有一个parallel()方法,告诉step,需要等待所有的任务完成才能执行下一个任务。
Step(
function readFile1() {
fs.readFile("file1.txt", "utf-8", this.parallel());
fs.readFile("file2.txt", "utf-8", this.parallel());
},
function done(err, content1, content2) {
// content1 => file1
// content2 => file2
console.log(arguments);
}
);
wind 不加赘述
小结
- 事件发布/订阅模式相对算是一种较为原始的方式,Promise/Deferred模式贡献了一个非常不错的异步任务模型的抽象。promise/deferred重点在于封装异步的调用
- 流程控制库在于会低啊函数的注入