并發(fā)編程一直是令人頭疼的編程方式,直到 Clojure 和 Go 的出現(xiàn),徹底改變了我們并發(fā)編程的方式。而對(duì)于單線程的 JavaScript,基于事件循環(huán)的 并發(fā)模型也一直困擾著我們,到底能從 Clojure 學(xué)些什么,可以使我們的前端 并發(fā)編程之路更順暢一些呢?本章將帶你熟悉:
在介紹 CSP 之前首先有兩個(gè)概念需要強(qiáng)調(diào)一下,那就是并發(fā)與并行。 為了便于理解,我會(huì)結(jié)合現(xiàn)實(shí)生活舉一個(gè)例子。
假設(shè)我正在上班寫(xiě)代碼,老板過(guò)來(lái)拍著肩膀說(shuō)明天要發(fā)布,加個(gè)班吧。于是我發(fā)個(gè)短信給老婆說(shuō)晚點(diǎn)回,發(fā)完以后繼續(xù)敲代碼。那么請(qǐng)問(wèn),發(fā)短信和敲代碼兩個(gè)任務(wù)是 并發(fā) 還是并行 ?
但如果我還特別喜歡音樂(lè),所以我邊聽(tīng)音樂(lè)邊敲代碼,那么寫(xiě)代碼和聽(tīng)音樂(lè)兩個(gè)任務(wù)是并發(fā)還是 并行 ?
為了不侮辱讀者的智商,我就不公布答案了。所以說(shuō):
所以說(shuō)到并發(fā),如果不是系統(tǒng)編程,我們大多關(guān)心的只是多線程并發(fā)編程。因?yàn)檫M(jìn)程調(diào)度是需要操作系統(tǒng)更關(guān)心的事情。
繼續(xù)敲代碼這個(gè)例子,假如我現(xiàn)在能 fork 出來(lái)一只手發(fā)來(lái)短信,但是我還是只有一個(gè)腦袋,在發(fā)短信的時(shí)候我的腦子還是只能集中在 如何編一個(gè)理由向老婆請(qǐng)假,而另外兩只手只能放在鍵盤(pán)上什么也干不了,直到短信發(fā)出去,才能繼續(xù)寫(xiě)代碼。
所以多線程開(kāi)銷大至需要長(zhǎng)出(fork)一只手,結(jié)束后縮回去(join),但是這 些代價(jià)并沒(méi)有帶來(lái)時(shí)間上的好處,發(fā)短信時(shí)其它兩只手其實(shí)是閑置(阻塞)著的。
http://wiki.jikexueyuan.com/project/clojure-flavored-javascript/images/multithread.png" alt="" />
圖14 線程任務(wù)到達(dá) CPU 的不確定順序
因此,另外一種更省資源的處理并發(fā)的方式就出現(xiàn)了——異步編程,或者叫 事件驅(qū)動(dòng)模型 。對(duì)的,就是我們?cè)?JavaScript 代碼里經(jīng)常發(fā)的 Ajax 那個(gè)異步。
比如我還是兩只手,我發(fā)完短信繼續(xù)就敲代碼了,這時(shí),老婆給我回了一條短信,那我放下手中的活,拿起手機(jī)看看,老牌居然說(shuō)了“同意”,于是就安心的放下手機(jī)繼續(xù)敲代碼了。
注意這段動(dòng)作與之前多線程的區(qū)別,相對(duì)于多線程的場(chǎng)景下 fork 了第三只手在敲代碼時(shí)一直呆呆的握著手機(jī),異步編程并不需要增加胳膊,資源利用率更高一些。
那么你就要問(wèn)了,你是怎么知道手機(jī)響的,還不是要開(kāi)一個(gè)線程讓耳朵監(jiān)聽(tīng)著。對(duì)的,但是異步只需要很少的有限個(gè)線程就好了。比如我有十個(gè)手機(jī) 要發(fā)給十個(gè)老婆,我還是兩個(gè)線程,相比如果是多線程的話我要 fork 出來(lái)十只手,卻是會(huì)省了不少的資源的。
所以 JavaScript 的并發(fā)模型就是這么實(shí)現(xiàn)的,有一個(gè)專門(mén) 的事件循環(huán)(event loop)線程,就如同我們的耳朵,不停的檢查消息隊(duì)列中是否還有待執(zhí)行的任務(wù)。
JavaScript 的并發(fā)模型主要基于事件循環(huán),運(yùn)行 JavaScript 代碼其實(shí)就是從 event loop 里面取任務(wù),隊(duì)列中任務(wù)的來(lái)源為函數(shù)調(diào)用棧與事件綁定。
f(),都會(huì)被加到消息隊(duì)列中,運(yùn)行該任務(wù)直到調(diào)用棧全部彈空。setTimeout(somefunction,0) 其實(shí)是 注冊(cè)一個(gè)事件句柄(event handler), timer 會(huì)在“0毫秒”后“立刻”往隊(duì)列加入 somefunction (如果不是 0,則是 n 長(zhǎng)時(shí)間后加入隊(duì)列)http://wiki.jikexueyuan.com/project/clojure-flavored-javascript/images/event-loop-model.png" alt="" />
圖15 callback 會(huì)加到消息隊(duì)列末尾
function a(){
console.log('a');
}
function b(){
console.log('b');
}
function timeout(){
console.log('timeout');
}
setTimeout(timeout,0);
a();
b();
// => "a"
// => "b"
// => "timeout"
這個(gè)例子中的 timeout 函數(shù)并沒(méi)有在 a 或 b 之前被調(diào)用,因?yàn)楫?dāng)時(shí)的消息隊(duì)列應(yīng)該是這樣的(處理順序從左至右)
http://wiki.jikexueyuan.com/project/clojure-flavored-javascript/images/message-queue.png" alt="" />
現(xiàn)在,我們可以用的并發(fā)模型來(lái)再實(shí)現(xiàn)一下我們最開(kāi)始的加班寫(xiě)代碼的例子:
let keepDoing = (something, interval) => {
return setInterval(()=>console.log(something), interval);
};
let notify = function(read, callback, yesno){
console.log('dinglingling')
setTimeout(read.bind(callback), 2000)
};
let meSendingText = function(callback) {
console.log('Me sending text');
notify(wifeReadingText, callback)
}
let wifeReadingText = function(callback){
console.log('my wife sending text');
notify(callback, null, 'yes')
};
let working = keepDoing('typing',1000);
let meReadingText = function(msg) {
if(msg!='ok') clearInterval(work);
console.log('I\'m reading text');
}
meSendingText((msg)=>{
if(msg!='ok') clearInterval(work);
else
console.log('continue working');
});
其中 notify 負(fù)責(zé)往事件循環(huán)上放一個(gè)任務(wù),當(dāng)老婆讀了短信,并 notify 我讀回信之后,兩秒后短信發(fā)到了我的手機(jī)上,手機(jī)(包含快來(lái)閱讀短信句柄)的鈴聲通過(guò)我的耳朵傳到我的腦回路中,觸發(fā)我開(kāi)始讀短信。
使用事件循環(huán)回調(diào)的形式看起來(lái)還挺高效的,而且 JavaScript 編程中我們也一直也是這么用的。但是當(dāng)異步調(diào)用多了之后,就會(huì)出現(xiàn) 回調(diào)地獄 (Callback Hell)的現(xiàn)象,為什么說(shuō)是 地獄 呢, 可以想象一下前面例子中如果我有十個(gè)老婆,要向 五個(gè)老婆發(fā)短信申請(qǐng)加班,而且都同意后才能繼續(xù)工作,該是如何實(shí)現(xiàn)呢? #+INDEX 回調(diào)地獄
meSendingText(wife1Reading, (msg)=>{
if(msg=='yes')
metSendingText(wife2Reading, (msg)=>{
if(msg=='yes')
metSendingText(wife3Reading, (msg)=>{
if(msg=='yes')
metSendingText(wife4Reading, (msg)=>{
if(msg=='yes')
metSendingText(wife5Reading, (msg)=>{
if(msg=='yes')
console.log('continue working')
})
})
})
})
})
只要有一個(gè)異步函數(shù)要回調(diào),那么所有依賴于這個(gè)異步函數(shù)結(jié)束的函數(shù)都得放到該函數(shù)的回調(diào)內(nèi)。這是個(gè)比地獄還要深的回調(diào)地獄。 于是前段時(shí)間特別火的 Promise,似乎能夠緩解一下回調(diào)地獄的窘境。但是,Promise 并不是專門(mén)用來(lái)消除回調(diào)地獄的,Promise 更有意義的應(yīng)該是在于 Monadic 編程。對(duì)于回調(diào)地獄,Promise 能做的也只是把這些回調(diào)平鋪開(kāi)而已。
從乘坐手扶電梯下回調(diào)地獄,變成了乘坐直梯下回調(diào)地獄。
meSendingText(wife1Reading)
.then(()=>meSendingText(wife2Reading))
.then(()=>meSendingText(wife3Reading))
.then(()=>meSendingText(wife4Reading))
.then(()=>meSendingText(wife5Reading))
當(dāng)然,如果是使用 Monadic 編程方式來(lái)解決這種問(wèn)題的話,其實(shí)也可以變得非常優(yōu)雅而且函數(shù)式,讀者可以嘗試用 when 實(shí)現(xiàn)一下(請(qǐng)回到第七章,如果你忘了 when 是什么)。
但是本章,我要強(qiáng)調(diào)的是一種更有意思的異步編程方式 CSP。
通信順序進(jìn)程(Communicating Sequential Processes), 是計(jì)算機(jī)科學(xué)中用于一種描述并發(fā)系統(tǒng)中交互的形式語(yǔ)言,簡(jiǎn)稱 CSP,來(lái)源于C.A.R Hoare 1978年的論文。沒(méi)錯(cuò)了,Hoare就是發(fā)明(讓我們熟悉的大學(xué)算法課糾結(jié)得快要掛科的) 快排算法的那位計(jì)算機(jī)科學(xué)家了。
CSP 由于最近 Go 語(yǔ)言的興起突然復(fù)活,Go 給自己的 CSP 實(shí)現(xiàn)起名叫 goroutines and channels 25,由于實(shí)在是太好用了,Clojure 也加入了 CSP 的陣營(yíng),弄了一個(gè)包叫做 core.async 。
CSP 的概念非常簡(jiǎn)單, 想象一下事件循環(huán),類似的:
http://wiki.jikexueyuan.com/project/clojure-flavored-javascript/images/csp.png" alt="" />
圖17 CSP 中的 Channel
這樣就成功的把任務(wù)和異步數(shù)據(jù)成功從回調(diào)地獄中分離開(kāi)來(lái)。還是剛才發(fā)短信的例子,我們來(lái)用 CSP 實(shí)現(xiàn)一遍:
(def working (chan))
(def texting (chan))
(defn boss-yelling []
(go-loop [no 1]
(<! (timeout 1000))
(>! working (str "bose say: work " no))
(recur (+ no 1))))
(defn wife-texting []
(go-loop []
(<! (timeout 4000))
(>! texting "wife say: come home!")
(recur)))
(defn reading-text []
(go-loop []
(println (<! texting) "me: ignore")
(recur)))
(defn work []
(go-loop []
(println (<! working) " me: working")
(recur)))
(boss-yelling)
(wife-texting)
(work)
(reading-text)
working 和 texting 這兩個(gè)channelgo-loop 神奇的地方是,它循環(huán)獲取channel中的數(shù)據(jù),當(dāng)隊(duì)列空時(shí),它的狀態(tài)會(huì)變成 parking,并沒(méi)有阻塞線程,而是保存當(dāng)前狀態(tài),繼續(xù)去試另一個(gè)go語(yǔ)句。work 來(lái)說(shuō), (<! texting) 就是從 channel texting 中取數(shù)據(jù),如果 texting 為空,則parkingwife-texting, (>! texting "wife say: come home!") 是往 channel texting 中加數(shù)據(jù),如果 channel 已滿,則也切到 parking 狀態(tài)。在看明白了 Clojure 是如何使用 channel 來(lái)解耦我的問(wèn)題后,再回過(guò)頭來(lái)看 JavaScript 如何實(shí)現(xiàn)類似的 CSP 編程呢?
先理一下我們都要實(shí)現(xiàn)些什么:
當(dāng)然,首先要實(shí)現(xiàn)的當(dāng)然是最重要的 go block,但是在這之前,讓我們看看實(shí)現(xiàn) go block 的前提 ES6 的一個(gè)的新標(biāo)準(zhǔn)—— generator 。
ES6 終于支持了Generator,目前Firefox與Chrome都已經(jīng)實(shí)現(xiàn)。27 Generator 在每次被調(diào)用時(shí)返回 yield 后邊表達(dá)式的值,并保存狀態(tài),下次調(diào)用時(shí)繼續(xù)運(yùn)行。
這種功能聽(tīng)起來(lái)剛好符合上例中神奇的 parking 的行為,于是,我們可以試試用 generator 來(lái)實(shí)現(xiàn)剛剛 Clojure 的 CSP 版本。
go block 其實(shí)就是一個(gè)狀態(tài)機(jī),generator 為狀態(tài)機(jī)的輸入,根據(jù)不同的輸入使得狀態(tài)機(jī)狀態(tài)轉(zhuǎn)移。所以實(shí)現(xiàn) go block 其實(shí)就是:
function go_(machine, step) {
while(!step.done) {
var arr = step.value(),
state = arr[0],
value = arr[1];
switch (state) {
case "park":
setTimeout(function() { go_(machine, step); },0);
return;
case "continue":
step = machine.next(value);
break;
}
}
}
function go(machine) {
var gen = machine();
go_(gen, gen.next());
}
timeout 是一個(gè)類似于 thread sleep 的功能,想讓任務(wù)能等待個(gè)一段時(shí)間再執(zhí)行, 只需要在 go_中加入一個(gè) timeout 的 case 就好了。
...
case 'timeout':
setTimeout(function(){ go_(machine, machine.next());}, value);
return;
...
如果狀態(tài)是 timeout,那么等待 value 那么長(zhǎng)的時(shí)間再繼續(xù)運(yùn)行 generator。
另外,當(dāng)然還需要一個(gè)返回 timeout channel 的函數(shù):
function timeout(interval){
var chan = [interval];
chan.name = 'timeout';
return chan;
}
每次使用 timeout 都會(huì)生成一個(gè)新的 channel,但是 channel 內(nèi)只有一個(gè)消息,就是 timeout 的 毫秒數(shù)。
當(dāng) generator 從 channel 中 take 數(shù)據(jù)時(shí)的狀態(tài)轉(zhuǎn)移如下:
function take(chan) {
return function() {
if(chan.name === 'timeout'){
return ['timeout', chan.pop()];
}else if(chan.length === 0) {
return ["park", null];
} else {
var val = chan.pop();
return ["continue", val];
}
};
}
當(dāng) generator 往 channel 中 put 消息
function put(chan, val) {
return function() {
if(chan.length === 0) {
chan.unshift(val);
return ["continue", null];
} else {
return ["park", null];
}
};
}
有了 go block 這個(gè)狀態(tài)機(jī)以及使他狀態(tài)轉(zhuǎn)移表之后,終于可以原原本本的將之前的 clojure 的例子翻譯成 JavaScript 了。
function boss_yelling(){
go(function*(){
for(var i=0;;i++){
yield take(timeout(1000));
yield put(work, "boss say: work "+i);
}
});
}
function wife_texting(){
go(function*(){
for(;;){
yield take(timeout(4000));
yield put(text, "wife say: come home");
}
});
}
function working(){
go(function*(){
for(;;){
var task = yield take(work);
console.log(task, "me working");
}
});
}
function texting(){
go(function*(){
for(;;){
var read = yield take(text);
console.log(read, "me ignoring");
}
});
}
boss_yelling();
wife_texting();
working();
texting();
是不是決定跟 Clojure 的例子非常相似呢?注意每一次 yield 都是操作 go block 這個(gè)狀態(tài)機(jī),因此就這個(gè)例子來(lái)說(shuō),我們可以跟蹤一下它的狀態(tài)轉(zhuǎn)移過(guò)程,這樣可能會(huì)對(duì)這個(gè)簡(jiǎn)單的 go block 狀態(tài)機(jī)有更深得理解。
boss_yelling 這個(gè) go 狀態(tài)機(jī),當(dāng)操作為 take(timeout(1000)) 時(shí),狀態(tài)會(huì)切換到 timeout 這樣狀態(tài)機(jī)會(huì)停一個(gè) 1000 毫秒。wife_texting ,同樣的這個(gè)狀態(tài)機(jī)也會(huì)停 4000秒working ,但是 work channel 中并沒(méi)有任何的消息,所以也進(jìn)入 parking 狀態(tài)。texting 狀態(tài)機(jī)也進(jìn)入 parking 狀態(tài)。直到 1000 毫秒后, boss_yelling timeout
bose_yelling 狀態(tài)機(jī)繼續(xù)運(yùn)行,往 work channel 中放了一條消息。working 狀態(tài)機(jī)得以繼續(xù)運(yùn)行,打印消息。
此時(shí)沒(méi)有別的狀態(tài)機(jī)的狀態(tài)可以變化,又過(guò)了 1000 毫秒, working 還會(huì)繼續(xù)打印,直到第 4000 毫秒, wife_texting timeout,狀態(tài)機(jī)繼續(xù)運(yùn)行,往 text channel 添加了一條消息。這時(shí)狀態(tài)機(jī) texting 的狀態(tài)才從 parking 切到 continue,開(kāi)始打印消息。以此類推,就會(huì)得到這樣的結(jié)果:
"boss say: work 0"
"me working"
"boss say: work 1"
"me working"
"boss say: work 2"
"me working"
"boss say: work 3"
"me working"
"boss say: work 4"
"me working"
"wife say: come home"
"me ignoring"
"boss say: work 5"
"me working"
...
之前的實(shí)驗(yàn)性的代碼只是為了說(shuō)明 CSP 的原理和實(shí)現(xiàn)思路之一,更切合實(shí)際的,我們可以通過(guò)一些庫(kù)來(lái)使用到 Clojure 的 core.async。這里我簡(jiǎn)單的介紹一下我從 ClojureScript 的 core.async 移植過(guò)來(lái)的 conjs。
由于 go block 在 Clojure 中是用 macro 生成狀態(tài)機(jī)來(lái)實(shí)現(xiàn)的,要移植過(guò)來(lái)困難不小,因此這里我只將 core.async 的 channel 移植了過(guò)來(lái),但是是以接受回調(diào)函數(shù)的方式。
const _ = require('con.js');
const {async} = _;
var c1 = async.chan()
var c2 = async.chan()
async.doAlts(function(v) {
console.log(v.get(0)); // => c1
console.log(_.equals(c1, v.get(1))) // => true
},[c1,c2]);
async.put$(c1, 'c1');
async.put$(c2, 'c2');
有意思的是,我順帶實(shí)現(xiàn)了 Promise 版本的 core.async,會(huì)比回調(diào)要稍微更方便一些。
async.alts([c1,c2])
.then((v) => {
console.log(v.get(0)); // => c1
console.log(_.equals(c1, v.get(1))) // => true
})
async.put(c1, 'c1').then(_=>{console.log('put c1 into c1')})
async.put(c2, 'c2').then(_=>{console.log('put c2 into c2')})
雖然把 channel 能移植過(guò)來(lái),但是缺少 macro 原生支持的 JavaScript 似乎對(duì) go block 也無(wú)能為力,除非能有 generator 的支持。
由于在實(shí)踐中我們經(jīng)常會(huì)使用到 babel 來(lái)將 ES6 規(guī)范的代碼編譯成 ES5 的代碼。所以順便可以將 ES7 的開(kāi)關(guān)打開(kāi),這樣我們就可以使用 ES7 規(guī)范中的一個(gè)新特性—— async 函數(shù)。 使用 async 函數(shù)實(shí)現(xiàn)我們之前的例子估計(jì)代碼并不會(huì)有大的變化,讓我們使用 async 函數(shù)和 channel 實(shí)現(xiàn)一下 go 經(jīng)典的乒乓球小例子。
1: let _ = require('con.js');
2: let {async} = _;
3:
4: async function player(name, table) {
5: while (true) {
6: var ball = await table.take();
7: ball.hits += 1;
8: console.log(name + " " + ball.hits);
9: await async.timeout(100).take();
10: table.put(ball);
11: }
12: }
13:
14: (async function () {
15: var table = async.chan();
16:
17: player("ping", table);
18: player("pong", table);
19:
20: await table.put({hits: 0});
21: await async.timeout(1000).take();
22: table.close();
23: })();
當(dāng)把球{hist:0} 放到 table channel 上的時(shí)候,阻塞在第6行take 的 player ping 會(huì)先接到球,player ping 擊完球 100ms 之后,球又回到了 tablechannel。之后 player pong 之間來(lái)回?fù)羟蛑?table 在 1000ms 后被關(guān)閉。
所以我們運(yùn)行代碼后看到的間斷性的 100ms 的打印出:
pong 1
ping 2
pong 3
ping 4
pong 5
ping 6
pong 7
ping 8
pong 9
ping 10
pong 11
ping 12
通過(guò) async/await,結(jié)合 conjs 的 channel, 真正讓我們寫(xiě)出了 Clojure core.async 風(fēng)格的代碼。利用 CSP 異步編程的方式,我們可以用同步的思路,去編寫(xiě)實(shí)際運(yùn)行時(shí)異步的代碼。這樣做不僅讓我們的代碼更好推理,更符合簡(jiǎn)單的命令式思維方式,也更容易 debug 和做異常處理。