今天就跟大家聊聊有關(guān)PHP中怎么實現(xiàn)協(xié)程,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
主要從事網(wǎng)頁設(shè)計、PC網(wǎng)站建設(shè)(電腦版網(wǎng)站建設(shè))、wap網(wǎng)站建設(shè)(手機版網(wǎng)站建設(shè))、響應(yīng)式網(wǎng)站開發(fā)、程序開發(fā)、微網(wǎng)站、微信平臺小程序開發(fā)等,憑借多年來在互聯(lián)網(wǎng)的打拼,我們在互聯(lián)網(wǎng)網(wǎng)站建設(shè)行業(yè)積累了豐富的成都網(wǎng)站設(shè)計、網(wǎng)站制作、網(wǎng)絡(luò)營銷經(jīng)驗,集策劃、開發(fā)、設(shè)計、營銷、管理等多方位專業(yè)化運作于一體,具備承接不同規(guī)模與類型的建設(shè)項目的能力。
多進程/線程
最早的服務(wù)器端程序都是通過多進程、多線程來解決并發(fā)IO的問題。進程模型出現(xiàn)的最早,從Unix 系統(tǒng)誕生就開始有了進程的概念。最早的服務(wù)器端程序一般都是 Accept 一個客戶端連接就創(chuàng)建一個進程,然后子進程進入循環(huán)同步阻塞地與客戶端連接進行交互,收發(fā)處理數(shù)據(jù)。
多線程模式出現(xiàn)要晚一些,線程與進程相比更輕量,而且線程之間共享內(nèi)存堆棧,所以不同的線程之間交互非常容易實現(xiàn)。比如實現(xiàn)一個聊天室,客戶端連接之間可以交互,聊天室中的玩家可以任意的其他人發(fā)消息。用多線程模式實現(xiàn)非常簡單,線程中可以直接向某一個客戶端連接發(fā)送數(shù)據(jù)。而多進程模式就要用到管道、消息隊列、共享內(nèi)存等等統(tǒng)稱進程間通信(IPC)復(fù)雜的技術(shù)才能實現(xiàn)。
最簡單的多進程服務(wù)端模型
$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) or die("Create server failed");while(1) { $conn = stream_socket_accept($serv);if (pcntl_fork() == 0) { $request = fread($conn);// do something // $response = "hello world"; fwrite($response); fclose($conn);exit(0); } }
多進程/線程模型的流程是:
創(chuàng)建一個 socket
,綁定服務(wù)器端口(bind
),監(jiān)聽端口(listen
),在 PHP 中用 stream_socket_server
一個函數(shù)就能完成上面 3 個步驟,當(dāng)然也可以使用更底層的sockets
擴展分別實現(xiàn)。
進入 while
循環(huán),阻塞在 accept
操作上,等待客戶端連接進入。此時程序會進入睡眠狀態(tài),直到有新的客戶端發(fā)起 connect
到服務(wù)器,操作系統(tǒng)會喚醒此進程。accept
函數(shù)返回客戶端連接的 socket
主進程在多進程模型下通過 fork
(php: pcntl_fork)創(chuàng)建子進程,多線程模型下使用 pthread_create
(php: new Thread)創(chuàng)建子線程。
下文如無特殊聲明將使用進程同時表示進程/線程。
子進程創(chuàng)建成功后進入 while
循環(huán),阻塞在 recv
(php:fread)調(diào)用上,等待客戶端向服務(wù)器發(fā)送數(shù)據(jù)。收到數(shù)據(jù)后服務(wù)器程序進行處理然后使用 send
(php: fwrite)向客戶端發(fā)送響應(yīng)。長連接的服務(wù)會持續(xù)與客戶端交互,而短連接服務(wù)一般收到響應(yīng)就會 close
。
當(dāng)客戶端連接關(guān)閉時,子進程退出并銷毀所有資源,主進程會回收掉此子進程。
這種模式***的問題是,進程創(chuàng)建和銷毀的開銷很大。所以上面的模式?jīng)]辦法應(yīng)用于非常繁忙的服務(wù)器程序。對應(yīng)的改進版解決了此問題,這就是經(jīng)典的 Leader-Follower
模型。
$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) or die("Create server failed");for($i = 0; $i < 32; $i++) {if (pcntl_fork() == 0) {while(1) { $conn = stream_socket_accept($serv);if ($conn == false) continue;// do something$request = fread($conn);// $response = "hello world";fwrite($response); fclose($conn); }exit(0); } }
它的特點是程序啟動后就會創(chuàng)建 N 個進程。每個子進程進入 Accept
,等待新的連接進入。當(dāng)客戶端連接到服務(wù)器時,其中一個子進程會被喚醒,開始處理客戶端請求,并且不再接受新的 TCP 連接。當(dāng)此連接關(guān)閉時,子進程會釋放,重新進入 Accept
,參與處理新的連接。
這個模型的優(yōu)勢是完全可以復(fù)用進程,沒有額外消耗,性能非常好。很多常見的服務(wù)器程序都是基于此模型的,比如 Apache、PHP-FPM。
多進程模型也有一些缺點。
這種模型嚴(yán)重依賴進程的數(shù)量解決并發(fā)問題,一個客戶端連接就需要占用一個進程,工作進程的數(shù)量有多少,并發(fā)處理能力就有多少。操作系統(tǒng)可以創(chuàng)建的進程數(shù)量是有限的。
啟動大量進程會帶來額外的進程調(diào)度消耗。數(shù)百個進程時可能進程上下文切換調(diào)度消耗占 CPU 不到 1% 可以忽略不計,如果啟動數(shù)千甚至數(shù)萬個進程,消耗就會直線上升。調(diào)度消耗可能占到 CPU 的百分之幾十甚至 100%。
談到多進程以及類似同時執(zhí)行多個任務(wù)的模型,就不得不先談?wù)劜⑿泻筒l(fā)。
是指能處理多個同時活動的能力,并發(fā)事件之間不一定要同一時刻發(fā)生。
是指同時刻發(fā)生的兩個并發(fā)事件,具有并發(fā)的含義,但并發(fā)不一定并行。
『并發(fā)』指的是程序的結(jié)構(gòu),『并行』指的是程序運行時的狀態(tài)
『并行』一定是并發(fā)的,『并行』是『并發(fā)』設(shè)計的一種
單線程永遠無法達到『并行』狀態(tài)
正確的并發(fā)設(shè)計的標(biāo)準(zhǔn)是:
使多個操作可以在重疊的時間段內(nèi)進行。
two tasks can start, run, and complete in overlapping time periods
參考:
http://www.vaikan.com/docs/Concurrency-is-not-Parallelism
https://talks.golang.org/2012/waza.slide
在了解 PHP 協(xié)程前,還有 迭代器 和 生成器 這兩個概念需要先認(rèn)識一下。
PHP5 開始內(nèi)置了 Iterator
即迭代器接口,所以如果你定義了一個類,并實現(xiàn)了Iterator
接口,那么你的這個類對象就是 ZEND_ITER_OBJECT
即可迭代的,否則就是 ZEND_ITER_PLAIN_OBJECT
。
對于 ZEND_ITER_PLAIN_OBJECT
的類,foreach
會獲取該對象的默認(rèn)屬性數(shù)組,然后對該數(shù)組進行迭代。
而對于 ZEND_ITER_OBJECT
的類對象,則會通過調(diào)用對象實現(xiàn)的 Iterator
接口相關(guān)函數(shù)來進行迭代。
任何實現(xiàn)了 Iterator
接口的類都是可迭代的,即都可以用 foreach
語句來遍歷。
interface Iterator extends Traversable {// 獲取當(dāng)前內(nèi)部標(biāo)量指向的元素的數(shù)據(jù)public mixed current() // 獲取當(dāng)前標(biāo)量 public scalar key() // 移動到下一個標(biāo)量 public void next() // 重置標(biāo)量 public void rewind() // 檢查當(dāng)前標(biāo)量是否有效 public boolean valid() }
PHP 自帶的 range 函數(shù)原型:
range — 根據(jù)范圍創(chuàng)建數(shù)組,包含指定的元素
array range (mixed $start , mixed $end [, number $step = 1 ])
建立一個包含指定范圍單元的數(shù)組。
在不使用迭代器的情況要實現(xiàn)一個和 PHP 自帶的 range
函數(shù)類似的功能,可能會這么寫:
function range ($start, $end, $step = 1) { $ret = [];for ($i = $start; $i <= $end; $i += $step) { $ret[] = $i; }return $ret; }
需要將生成的所有元素放在內(nèi)存數(shù)組中,如果需要生成一個非常大的集合,則會占用巨大的內(nèi)存。
來看看迭代實現(xiàn)的 range
,我們叫做 xrange
,他實現(xiàn)了 Iterator
接口必須的 5 個方法:
class Xrange implements Iterator {protected $start;protected $limit;protected $step;protected $current;public function __construct($start, $limit, $step = 1) {$this->start = $start;$this->limit = $limit;$this->step = $step; }public function rewind() {$this->current = $this->start; }public function next() {$this->current += $this->step; }public function current() {return $this->current; }public function key() {return $this->current + 1; }public function valid() {return $this->current <= $this->limit; } }
使用時代碼如下:
foreach (new Xrange(0, 9) as $key => $val) {echo $key, ' ', $val, "\n"; }
輸出:
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
看上去功能和 range()
函數(shù)所做的一致,不同點在于迭代的是一個 對象(Object)
而不是數(shù)組:
var_dump(new Xrange(0, 9));
輸出:
object(Xrange)#1 (4) { ["start":protected]=> int(0) ["limit":protected]=> int(9) ["step":protected]=> int(1) ["current":protected]=> NULL}
另外,內(nèi)存的占用情況也完全不同:
// range$startMemory = memory_get_usage(); $arr = range(0, 500000);echo 'range(): ', memory_get_usage() - $startMemory, " bytes\n";unset($arr);// xrange$startMemory = memory_get_usage(); $arr = new Xrange(0, 500000);echo 'xrange(): ', memory_get_usage() - $startMemory, " bytes\n";
輸出:
xrange(): 624 bytesrange(): 72194784 bytes
range()
函數(shù)在執(zhí)行后占用了 50W 個元素內(nèi)存空間,而 xrange
對象在整個迭代過程中只占用一個對象的內(nèi)存。
在喜聞樂見的各種 PHP 框架里有不少生成器的實例,比如 Yii2 中用來構(gòu)建 SQL 語句的 \yii\db\Query
類:
$query = (new \yii\db\Query)->from('user');// yii\db\BatchQueryResultforeach ($query->batch() as $users) {// 每次循環(huán)得到多條 user 記錄}
來看一下 batch()
做了什么:
/** * Starts a batch query. * * A batch query supports fetching data in batches, which can keep the memory usage under a limit. * This method will return a [[BatchQueryResult]] object which implements the [[\Iterator]] interface * and can be traversed to retrieve the data in batches. * * For example, * * * $query = (new Query)->from('user'); * foreach ($query->batch() as $rows) { * // $rows is an array of 10 or fewer rows from user table * } * * * @param integer $batchSize the number of records to be fetched in each batch. * @param Connection $db the database connection. If not set, the "db" application component will be used. * @return BatchQueryResult the batch query result. It implements the [[\Iterator]] interface * and can be traversed to retrieve the data in batches. */public function batch($batchSize = 100, $db = null) { return Yii::createObject([ 'class' => BatchQueryResult::className(), 'query' => $this, 'batchSize' => $batchSize, 'db' => $db, 'each' => false, ]); }
實際上返回了一個 BatchQueryResult
類,類的源碼實現(xiàn)了 Iterator
接口 5 個關(guān)鍵方法:
class BatchQueryResult extends Object implements \Iterator {public $db;public $query;public $batchSize = 100;public $each = false;private $_dataReader;private $_batch;private $_value;private $_key;/** * Destructor. */public function __destruct() {// make sure cursor is closed$this->reset(); }/** * Resets the batch query. * This method will clean up the existing batch query so that a new batch query can be performed. */public function reset() {if ($this->_dataReader !== null) {$this->_dataReader->close(); }$this->_dataReader = null;$this->_batch = null;$this->_value = null;$this->_key = null; }/** * Resets the iterator to the initial state. * This method is required by the interface [[\Iterator]]. */public function rewind() {$this->reset();$this->next(); }/** * Moves the internal pointer to the next dataset. * This method is required by the interface [[\Iterator]]. */public function next() {if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) {$this->_batch = $this->fetchData(); reset($this->_batch); }if ($this->each) {$this->_value = current($this->_batch);if ($this->query->indexBy !== null) {$this->_key = key($this->_batch); } elseif (key($this->_batch) !== null) {$this->_key++; } else {$this->_key = null; } } else {$this->_value = $this->_batch;$this->_key = $this->_key === null ? 0 : $this->_key + 1; } }/** * Fetches the next batch of data. * @return array the data fetched */protected function fetchData() {// ...}/** * Returns the index of the current dataset. * This method is required by the interface [[\Iterator]]. * @return integer the index of the current row. */public function key() {return $this->_key; }/** * Returns the current dataset. * This method is required by the interface [[\Iterator]]. * @return mixed the current dataset. */public function current() {return $this->_value; }/** * Returns whether there is a valid dataset at the current position. * This method is required by the interface [[\Iterator]]. * @return boolean whether there is a valid dataset at the current position. */public function valid() {return !empty($this->_batch); } }
以迭代器的方式實現(xiàn)了類似分頁取的效果,同時避免了一次性取出所有數(shù)據(jù)占用太多的內(nèi)存空間。
使用返回迭代器的包或庫時(如 PHP5 中的 SPL 迭代器)
無法在一次調(diào)用獲取所需的所有元素時
要處理數(shù)量巨大的元素時(數(shù)據(jù)庫中要處理的結(jié)果集內(nèi)容超過內(nèi)存)
…
需要 PHP 5 >= 5.5.0 或 PHP 7
雖然迭代器僅需繼承接口即可實現(xiàn),但畢竟需要定義一整個類然后實現(xiàn)接口的所有方法,實在是不怎么方便。
生成器則提供了一種更簡單的方式來實現(xiàn)簡單的對象迭代,相比定義類來實現(xiàn)
Iterator
接口的方式,性能開銷和復(fù)雜度大大降低。
生成器允許在 foreach
代碼塊中迭代一組數(shù)據(jù)而不需要創(chuàng)建任何數(shù)組。一個生成器函數(shù),就像一個普通的有返回值的自定義函數(shù)類似,但普通函數(shù)只返回一次, 而生成器可以根據(jù)需要通過 yield
關(guān)鍵字返回多次,以便連續(xù)生成需要迭代返回的值。
一個最簡單的例子就是使用生成器來重新實現(xiàn) xrange()
函數(shù)。效果和上面我們用迭代器實現(xiàn)的差不多,但實現(xiàn)起來要簡單的多。
xrange
函數(shù)function xrange($start, $limit, $step = 1) {for ($i = 0; $i < $limit; $i += $step) { yield $i + 1 => $i; } }foreach (xrange(0, 9) as $key => $val) { printf("%d %d \n", $key, $val); }// 輸出// 1 0// 2 1// 3 2// 4 3// 5 4// 6 5// 7 6// 8 7// 9 8
實際上生成器生成的正是一個迭代器對象實例,該迭代器對象繼承了 Iterator
接口,同時也包含了生成器對象自有的接口,具體可以參考 Generator 類的定義以及語法參考。
同時需要注意的是:
一個生成器不可以返回值,這樣做會產(chǎn)生一個編譯錯誤。然而 return 空是一個有效的語法并且它將會終止生成器繼續(xù)執(zhí)行。
需要注意的是 yield
關(guān)鍵字,這是生成器的關(guān)鍵。通過上面的例子可以看出,yield
會將當(dāng)前產(chǎn)生的值傳遞給 foreach
,換句話說,foreach
每一次迭代過程都會從 yield
處取一個值,直到整個遍歷過程不再能執(zhí)行到 yield
時遍歷結(jié)束,此時生成器函數(shù)簡單的退出,而調(diào)用生成器的上層代碼還可以繼續(xù)執(zhí)行,就像一個數(shù)組已經(jīng)被遍歷完了。
yield
最簡單的調(diào)用形式看起來像一個 return
申明,不同的是 yield
暫停當(dāng)前過程的執(zhí)行并返回值,而 return
是中斷當(dāng)前過程并返回值。暫停當(dāng)前過程,意味著將處理權(quán)轉(zhuǎn)交由上一級繼續(xù)進行,直到上一級再次調(diào)用被暫停的過程,該過程又會從上一次暫停的位置繼續(xù)執(zhí)行。這像是什么呢?如果之前已經(jīng)在鳥哥的文章中粗略看過,應(yīng)該知道這很像操作系統(tǒng)的進程調(diào)度,多個進程在一個 CPU 核心上執(zhí)行,在系統(tǒng)調(diào)度下每一個進程執(zhí)行一段指令就被暫停,切換到下一個進程,這樣外部用戶看起來就像是同時在執(zhí)行多個任務(wù)。
但僅僅如此還不夠,yield
除了可以返回值以外,還能接收值,也就是可以在兩個層級間實現(xiàn)雙向通信。
來看看如何傳遞一個值給 yield
:
function printer() {while (true) { printf("receive: %s\n", yield); } } $printer = printer(); $printer->send('hello'); $printer->send('world');// 輸出receive: hello receive: world
根據(jù) PHP 官方文檔的描述可以知道 Generator
對象除了實現(xiàn) Iterator
接口中的必要方法以外,還有一個 send
方法,這個方法就是向 yield
語句處傳遞一個值,同時從 yield
語句處繼續(xù)執(zhí)行,直至再次遇到 yield
后控制權(quán)回到外部。
既然 yield
可以在其位置中斷并返回或者接收一個值,那能不能同時進行接收和返回呢?當(dāng)然,這也是實現(xiàn)協(xié)程的根本。對上述代碼做出修改:
function printer() { $i = 0;while (true) { printf("receive: %s\n", (yield ++$i)); } } $printer = printer(); printf("%d\n", $printer->current()); $printer->send('hello'); printf("%d\n", $printer->current()); $printer->send('world'); printf("%d\n", $printer->current());// 輸出1receive: hello2receive: world3
這是另一個例子:
function gen() { $ret = (yield 'yield1'); var_dump($ret); $ret = (yield 'yield2'); var_dump($ret); } $gen = gen(); var_dump($gen->current()); // string(6) "yield1"var_dump($gen->send('ret1')); // string(4) "ret1" (***個 var_dump) // string(6) "yield2" (繼續(xù)執(zhí)行到第二個 yield,吐出了返回值)var_dump($gen->send('ret2')); // string(4) "ret2" (第二個 var_dump) // NULL (var_dump 之后沒有其他語句,所以這次 ->send() 的返回值為 null)
current
方法是迭代器 Iterator
接口必要的方法,foreach
語句每一次迭代都會通過其獲取當(dāng)前值,而后調(diào)用迭代器的 next
方法。在上述例子里則是手動調(diào)用了 current
方法獲取值。
上述例子已經(jīng)足以表示 yield 能夠作為實現(xiàn)雙向通信的工具,也就是具備了后續(xù)實現(xiàn)協(xié)程的基本條件。
上面的例子如果***次接觸并稍加思考,不免會疑惑為什么一個 yield
既是語句又是表達式,而且這兩種情況還同時存在:
對于所有在生成器函數(shù)中出現(xiàn)的 yield
,首先它都是語句,而跟在 yield
后面的任何表達式的值將作為調(diào)用生成器函數(shù)的返回值,如果 yield
后面沒有任何表達式(變量、常量都是表達式),那么它會返回 NULL
,這一點和 return
語句一致。
yield
也是表達式,它的值就是 send
函數(shù)傳過來的值(相當(dāng)于一個特殊變量,只不過賦值是通過 send
函數(shù)進行的)。只要調(diào)用send方法,并且生成器對象的迭代并未終結(jié),那么當(dāng)前位置的 yield
就會得到 send
方法傳遞過來的值,這和生成器函數(shù)有沒有把這個值賦值給某個變量沒有任何關(guān)系。
這個地方可能需要仔細品味上面兩個 send()
方法的例子才能理解。但可以簡單的記?。?/p>
任何時候 yield 關(guān)鍵詞即是語句:可以為生成器函數(shù)返回值;也是表達式:可以接收生成器對象發(fā)過來的值。
除了 send()
方法,還有一種控制生成器執(zhí)行的方法是 next()
函數(shù):
Next()
,恢復(fù)生成器函數(shù)的執(zhí)行直到下一個 yield
Send()
,向生成器傳入一個值,恢復(fù)執(zhí)行直到下一個 yield
對于單核處理器,多進程實現(xiàn)多任務(wù)的原理是讓操作系統(tǒng)給一個任務(wù)每次分配一定的 CPU 時間片,然后中斷、讓下一個任務(wù)執(zhí)行一定的時間片接著再中斷并繼續(xù)執(zhí)行下一個,如此反復(fù)。由于切換執(zhí)行任務(wù)的速度非常快,給外部用戶的感受就是多個任務(wù)的執(zhí)行是同時進行的。
多進程的調(diào)度是由操作系統(tǒng)來實現(xiàn)的,進程自身不能控制自己何時被調(diào)度,也就是說:
進程的調(diào)度是由外層調(diào)度器搶占式實現(xiàn)的
而協(xié)程要求當(dāng)前正在運行的任務(wù)自動把控制權(quán)回傳給調(diào)度器,這樣就可以繼續(xù)運行其他任務(wù)。這與『搶占式』的多任務(wù)正好相反, 搶占多任務(wù)的調(diào)度器可以強制中斷正在運行的任務(wù), 不管它自己有沒有意愿。『協(xié)作式多任務(wù)』在 Windows 的早期版本 (windows95) 和 Mac OS 中有使用, 不過它們后來都切換到『搶占式多任務(wù)』了。理由相當(dāng)明確:如果僅依靠程序自動交出控制的話,那么一些惡意程序?qū)苋菀渍加萌?CPU 時間而不與其他任務(wù)共享。
協(xié)程的調(diào)度是由協(xié)程自身主動讓出控制權(quán)到外層調(diào)度器實現(xiàn)的
回到剛才生成器實現(xiàn) xrange
函數(shù)的例子,整個執(zhí)行過程的交替可以用下圖來表示:
協(xié)程可以理解為純用戶態(tài)的線程,通過協(xié)作而不是搶占來進行任務(wù)切換。相對于進程或者線程,協(xié)程所有的操作都可以在用戶態(tài)而非操作系統(tǒng)內(nèi)核態(tài)完成,創(chuàng)建和切換的消耗非常低。
簡單的說 Coroutine(協(xié)程) 就是提供一種方法來中斷當(dāng)前任務(wù)的執(zhí)行,保存當(dāng)前的局部變量,下次再過來又可以恢復(fù)當(dāng)前局部變量繼續(xù)執(zhí)行。
我們可以把大任務(wù)拆分成多個小任務(wù)輪流執(zhí)行,如果有某個小任務(wù)在等待系統(tǒng) IO,就跳過它,執(zhí)行下一個小任務(wù),這樣往復(fù)調(diào)度,實現(xiàn)了 IO 操作和 CPU 計算的并行執(zhí)行,總體上就提升了任務(wù)的執(zhí)行效率,這也便是協(xié)程的意義。
PHP 從 5.5 開始支持生成器及 yield
關(guān)鍵字,而 PHP 協(xié)程則由 yield
來實現(xiàn)。
要理解協(xié)程,首先要理解:代碼是代碼,函數(shù)是函數(shù)。函數(shù)包裹的代碼賦予了這段代碼附加的意義:不管是否顯式的指明返回值,當(dāng)函數(shù)內(nèi)的代碼塊執(zhí)行完后都會返回到調(diào)用層。而當(dāng)調(diào)用層調(diào)用某個函數(shù)的時候,必須等這個函數(shù)返回,當(dāng)前函數(shù)才能繼續(xù)執(zhí)行,這就構(gòu)成了后進先出,也就是 Stack
。
而協(xié)程包裹的代碼,不是函數(shù),不完全遵守函數(shù)的附加意義,協(xié)程執(zhí)行到某個點,協(xié)會協(xié)程會 yield
返回一個值然后掛起,而不是 return
一個值然后結(jié)束,當(dāng)再次調(diào)用協(xié)程的時候,會在上次 yield
的點繼續(xù)執(zhí)行。
所以協(xié)程違背了通常操作系統(tǒng)和 x86 的 CPU 認(rèn)定的代碼執(zhí)行方式,也就是 Stack
的這種執(zhí)行方式,需要運行環(huán)境(比如 php,python 的 yield 和 golang 的 goroutine)自己調(diào)度,來實現(xiàn)任務(wù)的中斷和恢復(fù),具體到 PHP,就是靠 yield
來實現(xiàn)。
堆棧式調(diào)用 和 協(xié)程調(diào)用的對比:
結(jié)合之前的例子,可以總結(jié)一下 yield
能做的就是:
實現(xiàn)不同任務(wù)間的主動讓位、讓行,把控制權(quán)交回給任務(wù)調(diào)度器。
通過 send()
實現(xiàn)不同任務(wù)間的雙向通信,也就可以實現(xiàn)任務(wù)和調(diào)度器之間的通信。
yield
就是 PHP 實現(xiàn)協(xié)程的方式。
下面是雄文 Cooperative multitasking using coroutines (in PHP!) 里一個簡單但完整的例子,來展示如何具體的在 PHP 里實現(xiàn)協(xié)程任務(wù)的調(diào)度。
首先是一個任務(wù)類:
Task
class Task {// 任務(wù) IDprotected $taskId;// 協(xié)程對象protected $coroutine;// send() 值protected $sendVal = null;// 是否*** yieldprotected $beforeFirstYield = true;public function __construct($taskId, Generator $coroutine) {$this->taskId = $taskId;$this->coroutine = $coroutine; }public function getTaskId() {return $this->taskId; }public function setSendValue($sendVal) {$this->sendVal = $sendVal; }public function run() {// 如之前提到的在send之前, 當(dāng)?shù)鞅粍?chuàng)建后***次 yield 之前,一個 renwind() 方法會被隱式調(diào)用// 所以實際上發(fā)生的應(yīng)該類似:// $this->coroutine->rewind();// $this->coroutine->send();// 這樣 renwind 的執(zhí)行將會導(dǎo)致***個 yield 被執(zhí)行, 并且忽略了他的返回值.// 真正當(dāng)我們調(diào)用 yield 的時候, 我們得到的是第二個yield的值,導(dǎo)致***個yield的值被忽略。// 所以這個加上一個是否***次 yield 的判斷來避免這個問題if ($this->beforeFirstYield) {$this->beforeFirstYield = false;return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendVal);$this->sendVal = null;return $retval; } }public function isFinished() {return !$this->coroutine->valid(); } }
接下來是調(diào)度器,比 foreach
是要復(fù)雜一點,但好歹也能算個正兒八經(jīng)的 Scheduler
:)
Scheduler
class Scheduler {protected $maxTaskId = 0;protected $taskMap = []; // taskId => taskprotected $taskQueue;public function __construct() {$this->taskQueue = new SplQueue(); }// (使用下一個空閑的任務(wù)id)創(chuàng)建一個新任務(wù),然后把這個任務(wù)放入任務(wù)map數(shù)組里. 接著它通過把任務(wù)放入任務(wù)隊列里來實現(xiàn)對任務(wù)的調(diào)度. 接著run()方法掃描任務(wù)隊列, 運行任務(wù).如果一個任務(wù)結(jié)束了, 那么它將從隊列里刪除, 否則它將在隊列的末尾再次被調(diào)度。public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine);$this->taskMap[$tid] = $task;$this->schedule($task);return $tid; }public function schedule(Task $task) {// 任務(wù)入隊$this->queue->enqueue($task); }public function run() {while (!$this->queue->isEmpty()) {// 任務(wù)出隊$task = $this->queue->dequeue(); $task->run();if ($task->isFinished()) {unset($this->taskMap[$task->getTaskId()]); } else {$this->schedule($task); } } } }
隊列可以使每個任務(wù)獲得同等的 CPU 使用時間,
Demo
function task1() {for ($i = 1; $i <= 10; ++$i) {echo "This is task 1 iteration $i.\n";yield; } }function task2() {for ($i = 1; $i <= 5; ++$i) {echo "This is task 2 iteration $i.\n";yield; } } $scheduler = new Scheduler; $scheduler->newTask(task1()); $scheduler->newTask(task2()); $scheduler->run();
輸出:
This is task 1 iteration 1.This is task 2 iteration 1.This is task 1 iteration 2.This is task 2 iteration 2.This is task 1 iteration 3.This is task 2 iteration 3.This is task 1 iteration 4.This is task 2 iteration 4.This is task 1 iteration 5.This is task 2 iteration 5.This is task 1 iteration 6.This is task 1 iteration 7.This is task 1 iteration 8.This is task 1 iteration 9.This is task 1 iteration 10.
結(jié)果正是我們期待的,最初的 5 次迭代,兩個任務(wù)是交替進行的,而在第二個任務(wù)結(jié)束后,只有***個任務(wù)繼續(xù)執(zhí)行到結(jié)束。
若想真正的發(fā)揮出協(xié)程的作用,那一定是在一些涉及到阻塞 IO 的場景,我們都知道 Web 服務(wù)器最耗時的部分通常都是 socket 讀取數(shù)據(jù)等操作上,如果進程對每個請求都掛起的等待 IO 操作,那處理效率就太低了,接下來我們看個支持非阻塞 IO 的 Scheduler:
taskprotected $queue;// resourceID => [socket, tasks]protected $waitingForRead = [];protected $waitingForWrite = [];public function __construct() {// SPL 隊列$this->queue = new SplQueue(); }public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine);$this->tasks[$tid] = $task;$this->schedule($task);return $tid; }public function schedule(Task $task) {// 任務(wù)入隊$this->queue->enqueue($task); }public function run() {while (!$this->queue->isEmpty()) {// 任務(wù)出隊$task = $this->queue->dequeue(); $task->run();if ($task->isFinished()) {unset($this->tasks[$task->getTaskId()]); } else {$this->schedule($task); } } }public function waitForRead($socket, Task $task) {if (isset($this->waitingForRead[(int)$socket])) {$this->waitingForRead[(int)$socket][1][] = $task; } else {$this->waitingForRead[(int)$socket] = [$socket, [$task]]; } }public function waitForWrite($socket, Task $task) {if (isset($this->waitingForWrite[(int)$socket])) {$this->waitingForWrite[(int)$socket][1][] = $task; } else {$this->waitingForWrite[(int)$socket] = [$socket, [$task]]; } }/** * @param $timeout 0 represent */protected function ioPoll($timeout) { $rSocks = [];foreach ($this->waitingForRead as list($socket)) { $rSocks[] = $socket; } $wSocks = [];foreach ($this->waitingForWrite as list($socket)) { $wSocks[] = $socket; } $eSocks = [];// $timeout 為 0 時, stream_select 為立即返回,為 null 時則會阻塞的等,見 http://php.net/manual/zh/function.stream-select.phpif (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {return; }foreach ($rSocks as $socket) {list(, $tasks) = $this->waitingForRead[(int)$socket];unset($this->waitingForRead[(int)$socket]);foreach ($tasks as $task) {$this->schedule($task); } }foreach ($wSocks as $socket) {list(, $tasks) = $this->waitingForWrite[(int)$socket];unset($this->waitingForWrite[(int)$socket]);foreach ($tasks as $task) {$this->schedule($task); } } }/** * 檢查隊列是否為空,若為空則掛起的執(zhí)行 stream_select,否則檢查完 IO 狀態(tài)立即返回,詳見 ioPoll() * 作為任務(wù)加入隊列后,由于 while true,會被一直重復(fù)的加入任務(wù)隊列,實現(xiàn)每次任務(wù)前檢查 IO 狀態(tài) * @return Generator object for newTask * */protected function ioPollTask() {while (true) {if ($this->taskQueue->isEmpty()) {$this->ioPoll(null); } else {$this->ioPoll(0); }yield; } }/** * $scheduler = new Scheduler; * $scheduler->newTask(Web Server Generator); * $scheduler->withIoPoll()->run(); * * 新建 Web Server 任務(wù)后先執(zhí)行 withIoPoll() 將 ioPollTask() 作為任務(wù)入隊 * * @return $this */public function withIoPoll() {$this->newTask($this->ioPollTask());return $this; } }
這個版本的 Scheduler 里加入一個永不退出的任務(wù),并且通過 stream_select
支持的特性來實現(xiàn)快速的來回檢查各個任務(wù)的 IO 狀態(tài),只有 IO 完成的任務(wù)才會繼續(xù)執(zhí)行,而 IO 還未完成的任務(wù)則會跳過,完整的代碼和例子可以戳這里。
也就是說任務(wù)交替執(zhí)行的過程中,一旦遇到需要 IO 的部分,調(diào)度器就會把 CPU 時間分配給不需要 IO 的任務(wù),等到當(dāng)前任務(wù)遇到 IO 或者之前的任務(wù) IO 結(jié)束才再次調(diào)度 CPU 時間,以此實現(xiàn) CPU 和 IO 并行來提升執(zhí)行效率,類似下圖:
如果想將一個單進程任務(wù)改造成并發(fā)執(zhí)行,我們可以選擇改造成多進程或者協(xié)程:
多進程,不改變?nèi)蝿?wù)執(zhí)行的整體過程,在一個時間段內(nèi)同時執(zhí)行多個相同的代碼段,調(diào)度權(quán)在 CPU,如果一個任務(wù)能獨占一個 CPU 則可以實現(xiàn)并行。
協(xié)程,把原有任務(wù)拆分成多個小任務(wù),原有任務(wù)的執(zhí)行流程被改變,調(diào)度權(quán)在進程自己,如果有 IO 并且可以實現(xiàn)異步,則可以實現(xiàn)并行。
多進程改造
協(xié)程改造
PHP 的協(xié)程或者其他語言中,比如 Python、Lua 等都有協(xié)程的概念,和 Go 協(xié)程有些相似,不過有兩點不同:
Go 協(xié)程意味著并行(或者可以以并行的方式部署,可以用 runtime.GOMAXPROCS()
指定可同時使用的 CPU 個數(shù)),協(xié)程一般來說只是并發(fā)。
Go 協(xié)程通過通道 channel
來通信;協(xié)程通過 yield
讓出和恢復(fù)操作來通信。
Go 協(xié)程比普通協(xié)程更強大,也很容易從協(xié)程的邏輯復(fù)用到 Go 協(xié)程,而且在 Go 的開發(fā)中也使用的極為普遍,有興趣的話可以了解一下作為對比。
看完上述內(nèi)容,你們對PHP中怎么實現(xiàn)協(xié)程有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。