小編給大家分享一下php中pthreads怎么用,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創新互聯建站主要從事成都做網站、網站建設、網頁設計、企業做網站、公司建網站等業務。立足成都服務隴縣,10多年網站建設經驗,價格優惠、服務專業,歡迎來電咨詢建站服務:18980820575
php pthreads的使用方法:1、通過“pecl install pthreads”安裝pthreads;2、在需要控制多個線程同一時刻只能有一個線程工作的情況下使用互斥鎖。
本文操作環境:windows7系統、php7.0.2版,DELL G3電腦
php多線程pthreads的安裝與使用
安裝Pthreads 基本上需要重新編譯PHP,加上 --enable-maintainer-zts 參數,但是這方面的文檔很少;bug會很多,還有很多意想不到的問題,生產環境上只能呵呵了,所以這個東西玩玩就算了,真正多線程還是用Python、C等等
以下代碼大部分來自網絡
這里使用的是 php-7.0.2
# Configure, build and install PHP 7 under /usr/local/php7 as a thread-safe
# (ZTS) build: --enable-maintainer-zts is the switch pthreads depends on.
# Each option sits on its own line with a trailing backslash so the shell
# reads one continued ./configure command; make runs as a separate command.
# NOTE(review): ext/mysql was removed in PHP 7.0, so --with-MySQL is presumably
# reported as an unrecognized option and ignored by configure (mysqli and
# pdo-mysql remain the usable drivers) — confirm before relying on it.
./configure \
    --prefix=/usr/local/php7 \
    --with-config-file-path=/etc \
    --with-config-file-scan-dir=/etc/php.d \
    --enable-debug \
    --enable-maintainer-zts \
    --enable-pcntl \
    --enable-fpm \
    --enable-opcache \
    --enable-embed=shared \
    --enable-json=shared \
    --enable-phpdbg \
    --with-curl=shared \
    --with-MySQL=/usr/local/mysql \
    --with-mysqli=/usr/local/mysql/bin/mysql_config \
    --with-pdo-mysql
make && make install
安裝pthreads
# Install the pthreads extension from PECL. The PHP build it is installed
# into must have been configured with --enable-maintainer-zts.
pecl install pthreads
getThreadId()}\n"; } }; $thread->start() && $thread->join(); #2 class workerThread extends Thread { public function __construct($i){ $this->i=$i; } public function run(){ while(true){ echo $this->i."\n"; sleep(1); } } } for($i=0;$i<50;$i++){ $workers[$i]=new workerThread($i); $workers[$i]->start(); } ?>
Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.
sql = $sql; } public function run() { $dbh = $this->worker->getConnection(); $row = $dbh->query($this->sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } public function run(){ self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb','root','123456'); } public function getConnection(){ return self::$dbh; } } $worker = new ExampleWorker("My Worker Thread"); $sql1 = new SQLQuery('select * from test order by id desc limit 1,5'); $worker->stack($sql1); $sql2 = new SQLQuery('select * from test order by id desc limit 5,5'); $worker->stack($sql2); $worker->start(); $worker->shutdown(); ?>
什么情況下會用到互斥鎖?在你需要控制多個線程同一時刻只能有一個線程工作的情況下可以使用。一個簡單的計數器程序,說明有無互斥鎖情況下的不同
mutex = $mutex; $this->handle = fopen("/tmp/counter.txt", "w+"); } public function __destruct(){ fclose($this->handle); } public function run() { if($this->mutex) $locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex) Mutex::unlock($this->mutex); } } //沒有互斥鎖 for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread(); $threads[$i]->start(); } //加入互斥鎖 $mutex = Mutex::create(true); for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread($mutex); $threads[$i]->start(); } Mutex::unlock($mutex); for ($i=0;$i<50;$i++){ $threads[$i]->join(); } Mutex::destroy($mutex); ?>
在共享內存的例子中,沒有使用任何鎖,仍然可能正常工作,可能工作內存操作本身具備鎖的功能
shmid = $shmid; } public function run() { $counter = shm_get_var( $this->shmid, 1 ); $counter++; shm_put_var( $this->shmid, 1, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); } } for ($i=0;$i<100;$i++){ $threads[] = new CounterThread($shmid); } for ($i=0;$i<100;$i++){ $threads[$i]->start(); } for ($i=0;$i<100;$i++){ $threads[$i]->join(); } shm_remove( $shmid ); shm_detach( $shmid ); ?>
有些場景我們不希望 thread->start() 就開始運行程序,而是希望線程等待我們的命令。$thread->wait(); 的作用是 thread->start() 后線程并不會立即運行,只有收到 $thread->notify(); 發出的信號后才運行
shmid = $shmid; } public function run() { $this->synchronized(function($thread){ $thread->wait(); }, $this); $counter = shm_get_var( $this->shmid, 1 ); $counter++; shm_put_var( $this->shmid, 1, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); } } for ($i=0;$i<100;$i++){ $threads[] = new CounterThread($shmid); } for ($i=0;$i<100;$i++){ $threads[$i]->start(); } for ($i=0;$i<100;$i++){ $threads[$i]->synchronized(function($thread){ $thread->notify(); }, $threads[$i]); } for ($i=0;$i<100;$i++){ $threads[$i]->join(); } shm_remove( $shmid ); shm_detach( $shmid ); ?>
一個Pool類
row = $row; $this->sql = null; } public function run() { if(strlen($this->row['bankno']) > 100 ){ $bankno = safenet_decrypt($this->row['bankno']); }else{ $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']); file_put_contents("bankno_error.log", $error, FILE_APPEND); } if( strlen($bankno) > 7 ){ $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql; } printf("%s\n",$this->sql); } } class Pool { public $pool = array(); public function __construct($count) { $this->count = $count; } public function push($row){ if(count($this->pool) < $this->count){ $this->pool[] = new Update($row); return true; }else{ return false; } } public function start(){ foreach ( $this->pool as $id => $worker){ $this->pool[$id]->start(); } } public function join(){ foreach ( $this->pool as $id => $worker){ $this->pool[$id]->join(); } } public function clean(){ foreach ( $this->pool as $id => $worker){ if(! $worker->isRunning()){ unset($this->pool[$id]); } } } } try { $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select id,bankno from members order by id desc"; $row = $dbh->query($sql); $pool = new Pool(5); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while(true){ if($pool->push($member)){ //壓入任務(wù)到池中 break; }else{ //如果池已經(jīng)滿,就開始啟動線程 $pool->start(); $pool->join(); $pool->clean(); } } } $pool->start(); $pool->join(); $dbh = null; } catch (Exception $e) { echo '[' , date('H:i:s') , ']', '系統(tǒng)錯誤', $e->getMessage(), "\n"; } ?>
上面的例子是當線程池滿后執行start統一啟動,下面的例子是只要線程池中有空閑便立即創建新線程。
row = $row; $this->sql = null; //print_r($this->row); } public function run() { if(strlen($this->row['bankno']) > 100 ){ $bankno = safenet_decrypt($this->row['bankno']); }else{ $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']); file_put_contents("bankno_error.log", $error, FILE_APPEND); } if( strlen($bankno) > 7 ){ $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql; } printf("%s\n",$this->sql); } } try { $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select id,bankno from members order by id desc limit 50"; $row = $dbh->query($sql); $pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { $id = $member['id']; while (true){ if(count($pool) < 5){ $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ if(! $worker->isRunning()){ unset($pool[$name]); } } } } } $dbh = null; } catch (Exception $e) { echo '【' , date('H:i:s') , '】', '【系統(tǒng)錯誤】', $e->getMessage(), "\n"; } ?>
logger = $logger; } protected $loger; } class WebWork extends Stackable { public function isComplete() { return $this->complete; } public function run() { $this->worker ->logger ->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId()); $this->complete = true; } protected $complete; } class SafeLog extends Stackable { protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf( "{$message}\n", $args); } } } $pool = new Pool(8, \WebWorker::class, [new SafeLog()]); $pool->submit($w=new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->shutdown(); $pool->collect(function($work){ return $work->isComplete(); }); var_dump($pool);
LOCK_SH 取得共享鎖定(讀取的程序)
LOCK_EX 取得獨占鎖定(寫入的程序)
LOCK_UN 釋放鎖定(無論共享或獨(dú)占)
LOCK_NB 如果不希望 flock() 在鎖定時堵塞
pthreads 與 pdo 同時使用時,需要注意一點:需要靜態聲明 public static $dbh; 并且通過單例模式訪問數據庫連接。
worker->getConnection(); $sql = "select id,name from members order by id desc limit 50"; $row = $dbh->query($sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } /* * The run method should just prepare the environment for the work that is coming ... */ public function run(){ self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456'); } public function getConnection(){ return self::$dbh; } } $worker = new ExampleWorker("My Worker Thread"); $work=new Work(); $worker->stack($work); $worker->start(); $worker->shutdown(); ?>
在線程池中鏈接數據庫
# cat pool.php logger = $logger; } protected $logger; } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($number) { $this->number = $number; } public function run() { $dbhost = 'db.example.com'; // 數(shù)據(jù)庫服務(wù)器 $dbuser = 'example.com'; // 數(shù)據(jù)庫用戶名 $dbpw = 'password'; // 數(shù)據(jù)庫密碼 $dbname = 'example_real'; $dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true ) ); $sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'"; #echo $sql; $row = $dbh->query($sql); $mt4_trades = $row->fetch(PDO::FETCH_ASSOC); if($mt4_trades){ $row = null; $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';"; $dbh->query($sql); #printf("%s\n",$sql); } $dbh = null; printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']); } } class Logging extends Stackable { protected static $dbh; public function __construct() { $dbhost = 'db.example.com'; // 數(shù)據(jù)庫服務(wù)器 $dbuser = 'example.com'; // 數(shù)據(jù)庫用戶名 $dbpw = 'password'; // 數(shù)據(jù)庫密碼 $dbname = 'example_real'; // 數(shù)據(jù)庫名 self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); } protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf("{$message}\n", $args); } } protected function getConnection(){ return self::$dbh; } } $pool = new Pool(200, \ExampleWorker::class, [new Logging()]); $dbhost = 'db.example.com'; // 數(shù)據(jù)庫服務(wù)器 $dbuser = 'example.com'; 
// 數(shù)據(jù)庫用戶名 $dbpw = 'password'; // 數(shù)據(jù)庫密碼 $dbname = 'db_example'; $dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select `order`,name from accounts where deposit_time is null order by id desc"; $row = $dbh->query($sql); while($account = $row->fetch(PDO::FETCH_ASSOC)) { $pool->submit(new Work($account)); } $pool->shutdown(); ?>
進一步改進上面程序,我們使用單例模式 $this->worker->getInstance(); 全局僅僅做一次數據庫連接,線程使用共享的數據庫連接
logger = $logger; #} #protected $logger; protected static $dbh; public function __construct() { } public function run(){ $dbhost = 'db.example.com'; // 數(shù)據(jù)庫服務(wù)器 $dbuser = 'example.com'; // 數(shù)據(jù)庫用戶名 $dbpw = 'password'; // 數(shù)據(jù)庫密碼 $dbname = 'example'; // 數(shù)據(jù)庫名 self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true ) ); } protected function getInstance(){ return self::$dbh; } } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($data) { $this->data = $data; #print_r($data); } public function run() { #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() ); try { $dbh = $this->worker->getInstance(); #print_r($dbh); $id = $this->data['id']; $mobile = safenet_decrypt($this->data['mobile']); #printf("%d, %s \n", $id, $mobile); if(strlen($mobile) > 11){ $mobile = substr($mobile, -11); } if($mobile == 'null'){ # $sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'"; # printf("%s\n",$sql); # $dbh->query($sql); $mobile = ''; $sql = "UPDATE members_digest SET mobile = :mobile where id = :id"; }else{ $sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id"; } $sth = $dbh->prepare($sql); $sth->bindValue(':mobile', $mobile); $sth->bindValue(':id', $id); $sth->execute(); #echo $sth->debugDumpParams(); } catch(PDOException $e) { $error = sprintf("%s,%s\n", $mobile, $id ); file_put_contents("mobile_error.log", $error, FILE_APPEND); } #$dbh = null; printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id); #printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number); } } $pool = new Pool(100, \ExampleWorker::class, []); #foreach (range(0, 100) as $number) { # $pool->submit(new Work($number)); #} 
$dbhost = 'db.example.com'; // 數(shù)據(jù)庫服務(wù)器 $dbuser = 'example.com'; // 數(shù)據(jù)庫用戶名 $dbpw = 'password'; // 數(shù)據(jù)庫密碼 $dbname = 'example'; $dbh = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); #print_r($dbh); #$sql = "select id, mobile from members where id < :id"; #$sth = $dbh->prepare($sql); #$sth->bindValue(':id',300); #$sth->execute(); #$result = $sth->fetchAll(); #print_r($result); # #$sql = "UPDATE members_digest SET mobile = :mobile where id = :id"; #$sth = $dbh->prepare($sql); #$sth->bindValue(':mobile', 'aa'); #$sth->bindValue(':id','272'); #echo $sth->execute(); #echo $sth->queryString; #echo $sth->debugDumpParams(); $sql = "select id, mobile from members order by id asc"; // limit 1000"; $row = $dbh->query($sql); while($members = $row->fetch(PDO::FETCH_ASSOC)) { #$order = $account['order']; #printf("%s\n",$order); //print_r($members); $pool->submit(new Work($members)); #unset($account['order']); } $pool->shutdown(); ?>
總的來說 pthreads 仍然處在發展中,仍有一些不足的地方,我們也可以看到pthreads的git在不斷改進這個項目
數據庫持久鏈接很重要,否則每個線程都會開啟一次數據庫連接,然后關閉,會導致很多鏈接超時。
true )); ?>
以上是“php中pthreads怎么用”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注創新互聯行業資訊頻道!