真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

php中pthreads怎么用

小編給大家分享一下php中pthreads怎么用,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

創(chuàng)新互聯(lián)建站主要從事成都做網(wǎng)站、網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)隴縣,10多年網(wǎng)站建設(shè)經(jīng)驗(yàn),價格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18980820575

php pthreads的使用方法:1、通過“pecl install pthreads”安裝pthreads;2、在需要控制多個線程同一時刻只能有一個線程工作的情況下使用互斥鎖。

本文操作環(huán)境:windows7系統(tǒng)、php7.0.2版,DELL G3電腦

php多線程pthreads的安裝與使用

安裝Pthreads 基本上需要重新編譯PHP,加上 --enable-maintainer-zts 參數(shù),但是用這個文檔很少;bug會很多很有很多意想不到的問題,生成環(huán)境上只能呵呵了,所以這個東西玩玩就算了,真正多線程還是用Python、C等等

以下代碼大部分來自網(wǎng)絡(luò)

一、安裝

這里使用的是 php-7.0.2

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-MySQL=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql


make && make install

安裝pthreads

pecl install pthreads

二、Thread

getThreadId()}\n";                                                                                  
	}   
};

$thread->start() && $thread->join();



#2

class workerThread extends Thread { 
	public function __construct($i){
		$this->i=$i;
	}

	public function run(){
		while(true){
			echo $this->i."\n";
			sleep(1);
		} 
	} 
}

for($i=0;$i<50;$i++){
	$workers[$i]=new workerThread($i);
	$workers[$i]->start();
}

?>

三、 Worker 與 Stackable

Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.

sql = $sql;
	}

	public function run() {
		$dbh  = $this->worker->getConnection();
		$row = $dbh->query($this->sql);
		while($member = $row->fetch(PDO::FETCH_ASSOC)){
			print_r($member);
		}
	}

}

class ExampleWorker extends Worker {
	public static $dbh;
	public function __construct($name) {
	}

	public function run(){
		self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb','root','123456');
	}
	public function getConnection(){
		return self::$dbh;
	}
}

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');
$worker->stack($sql1);

$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>

四、 互斥鎖

什么情況下會用到互斥鎖?在你需要控制多個線程同一時刻只能有一個線程工作的情況下可以使用。一個簡單的計(jì)數(shù)器程序,說明有無互斥鎖情況下的不同

mutex = $mutex;
		$this->handle = fopen("/tmp/counter.txt", "w+");
	}
	public function __destruct(){
		fclose($this->handle);
	}
	public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
	}
}

//沒有互斥鎖
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥鎖
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>
多線程與共享內(nèi)存

在共享內(nèi)存的例子中,沒有使用任何鎖,仍然可能正常工作,可能工作內(nèi)存操作本身具備鎖的功能

shmid = $shmid;
	}
	public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

五、 線程同步

有些場景我們不希望 thread->start() 就開始運(yùn)行程序,而是希望線程等待我們的命令。$thread->wait();測作用是 thread->start()后線程并不會立即運(yùn)行,只有收到 $thread->notify(); 發(fā)出的信號后才運(yùn)行

shmid = $shmid;
	}
	public function run() {

		$this->synchronized(function($thread){
				$thread->wait();
				}, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
			$thread->notify();
			}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

六、線程池

一個Pool類

row = $row;
        $this->sql = null;
    }

    public function run() {

	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}

class Pool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	}
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
		}else{
			return false;
		}
	}
	public function start(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->start();
		}
	}
	public function join(){
		foreach ( $this->pool as $id => $worker){
               $this->pool[$id]->join();
		}
	}
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){
            	unset($this->pool[$id]);
            }
		}
	}
}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new Pool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while(true){
			if($pool->push($member)){ //壓入任務(wù)到池中
				break;
			}else{ //如果池已經(jīng)滿,就開始啟動線程
				$pool->start();
				$pool->join();
				$pool->clean();
			}
		}
	}
	$pool->start();
    $pool->join();

	$dbh = null;

} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系統(tǒng)錯誤', $e->getMessage(), "\n";
}
?>
動態(tài)隊(duì)列線程池

上面的例子是當(dāng)線程池滿后執(zhí)行start統(tǒng)一啟動,下面的例子是只要線程池中有空閑便立即創(chuàng)建新線程。

row = $row;
		$this->sql = null;
		//print_r($this->row);
	}

	public function run() {

		if(strlen($this->row['bankno']) > 100 ){
			$bankno = safenet_decrypt($this->row['bankno']);
		}else{
			$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
			file_put_contents("bankno_error.log", $error, FILE_APPEND);
		}

		if( strlen($bankno) > 7 ){
			$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

			$this->sql = $sql;
		}

		printf("%s\n",$this->sql);
	}

}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
				PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
				PDO::MYSQL_ATTR_COMPRESS => true
				)
			);

	$sql     = "select id,bankno from members order by id desc limit 50";

	$row = $dbh->query($sql);
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{
		$id 	= $member['id'];
		while (true){
			if(count($pool) < 5){
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}
		}

	}

	$dbh = null;

} catch (Exception $e) {
	echo '【' , date('H:i:s') , '】', '【系統(tǒng)錯誤】', $e->getMessage(), "\n";
}
?> 
pthreads Pool類
logger = $logger;
	}

	protected $loger;
}

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;
	}

	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
					__CLASS__, $this->worker->getThreadId());
		$this->complete = true;
	}

	protected $complete;
}

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
					"{$message}\n", $args);
		}
	}
}


$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();

$pool->collect(function($work){
		return $work->isComplete();
		});

var_dump($pool);			

七、多線程文件安全讀寫

  • LOCK_SH 取得共享鎖定(讀取的程序)

  • LOCK_EX 取得獨(dú)占鎖定(寫入的程序

  • LOCK_UN 釋放鎖定(無論共享或獨(dú)占)

  • LOCK_NB 如果不希望 flock() 在鎖定時堵塞

八、多線程與數(shù)據(jù)連接

pthreads 與 pdo 同時使用是,需要注意一點(diǎn),需要靜態(tài)聲明public static $dbh;并且通過單例模式訪問數(shù)據(jù)庫連接。

Worker 與 PDO
worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>
Pool 與 PDO

在線程池中鏈接數(shù)據(jù)庫

# cat pool.php
logger = $logger;
	}

	protected $logger;
}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($number) {
		$this->number = $number;
	}
	public function run() {
                $dbhost = 'db.example.com';               // 數(shù)據(jù)庫服務(wù)器
                $dbuser = 'example.com';                 // 數(shù)據(jù)庫用戶名
                $dbpw = 'password';                               // 數(shù)據(jù)庫密碼
                $dbname = 'example_real';
		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                        PDO::MYSQL_ATTR_COMPRESS => true,
			PDO::ATTR_PERSISTENT => true
                        )
                );
		$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";
		#echo $sql;
		$row = $dbh->query($sql);
		$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);
		if($mt4_trades){

			$row = null;

			$sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";
			$dbh->query($sql);
			#printf("%s\n",$sql);
		}
		$dbh = null;
		printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);

	}
}

class Logging extends Stackable {
	protected  static $dbh;
	public function __construct() {
		$dbhost = 'db.example.com';			// 數(shù)據(jù)庫服務(wù)器
	        $dbuser = 'example.com';                 // 數(shù)據(jù)庫用戶名
        	$dbpw = 'password';                               // 數(shù)據(jù)庫密碼
		$dbname = 'example_real';			// 數(shù)據(jù)庫名

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
			PDO::MYSQL_ATTR_COMPRESS => true
			)
		);

	}
	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);
		}
	}

	protected function getConnection(){
                return self::$dbh;
        }
}

$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = 'db.example.com';                      // 數(shù)據(jù)庫服務(wù)器
$dbuser = 'example.com';                 // 數(shù)據(jù)庫用戶名
$dbpw = 'password';                               // 數(shù)據(jù)庫密碼
$dbname = 'db_example';
$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                        PDO::MYSQL_ATTR_COMPRESS => true
                        )
                );
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";

$row = $dbh->query($sql);
while($account = $row->fetch(PDO::FETCH_ASSOC))
{
        $pool->submit(new Work($account));
}

$pool->shutdown();

?> 

進(jìn)一步改進(jìn)上面程序,我們使用單例模式 $this->worker->getInstance(); 全局僅僅做一次數(shù)據(jù)庫連接,線程使用共享的數(shù)據(jù)庫連接

logger = $logger;
	#}

	#protected $logger;
	protected  static $dbh;
	public function __construct() {

	}
	public function run(){
		$dbhost = 'db.example.com';			// 數(shù)據(jù)庫服務(wù)器
	    $dbuser = 'example.com';        	// 數(shù)據(jù)庫用戶名
        $dbpw = 'password';             	// 數(shù)據(jù)庫密碼
		$dbname = 'example';				// 數(shù)據(jù)庫名

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
			PDO::MYSQL_ATTR_COMPRESS => true,
			PDO::ATTR_PERSISTENT => true
			)
		);

	}
	protected function getInstance(){
        return self::$dbh;
    }

}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($data) {
		$this->data = $data;
		#print_r($data);
	}

	public function run() {
		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );

		try {
			$dbh  = $this->worker->getInstance();
			#print_r($dbh);
               		$id = $this->data['id'];
			$mobile = safenet_decrypt($this->data['mobile']);
			#printf("%d, %s \n", $id, $mobile);
			if(strlen($mobile) > 11){
				$mobile = substr($mobile, -11);
			}
			if($mobile == 'null'){
			#	$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";
			#	printf("%s\n",$sql);
			#	$dbh->query($sql);
				$mobile = '';
				$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
			}else{
				$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";
			}
			$sth = $dbh->prepare($sql);
			$sth->bindValue(':mobile', $mobile);
			$sth->bindValue(':id', $id);
			$sth->execute();
			#echo $sth->debugDumpParams();
		}
		catch(PDOException $e) {
			$error = sprintf("%s,%s\n", $mobile, $id );
			file_put_contents("mobile_error.log", $error, FILE_APPEND);
		}

		#$dbh = null;
		printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);
		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);
	}
}

$pool = new Pool(100, \ExampleWorker::class, []);

#foreach (range(0, 100) as $number) {
#	$pool->submit(new Work($number));
#}

$dbhost = 'db.example.com';                     // 數(shù)據(jù)庫服務(wù)器
$dbuser = 'example.com';                 		// 數(shù)據(jù)庫用戶名
$dbpw = 'password';                             // 數(shù)據(jù)庫密碼
$dbname = 'example';
$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                        PDO::MYSQL_ATTR_COMPRESS => true
                        )
                );
#print_r($dbh);

#$sql = "select id, mobile from members where id < :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(':id',300);
#$sth->execute();
#$result = $sth->fetchAll();
#print_r($result);
#
#$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(':mobile', 'aa');
#$sth->bindValue(':id','272');
#echo $sth->execute();
#echo $sth->queryString;
#echo $sth->debugDumpParams();


$sql = "select id, mobile from members order by id asc"; // limit 1000";
$row = $dbh->query($sql);
while($members = $row->fetch(PDO::FETCH_ASSOC))
{
        #$order =  $account['order'];
        #printf("%s\n",$order);
        //print_r($members);
        $pool->submit(new Work($members));
		#unset($account['order']);
}

$pool->shutdown();

?>	
多線程中操作數(shù)據(jù)庫總結(jié)

總的來說 pthreads 仍然處在發(fā)展中,仍有一些不足的地方,我們也可以看到pthreads的git在不斷改進(jìn)這個項(xiàng)目

數(shù)據(jù)庫持久鏈接很重要,否則每個線程都會開啟一次數(shù)據(jù)庫連接,然后關(guān)閉,會導(dǎo)致很多鏈接超時。

 true
));
?>

以上是“php中pthreads怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


網(wǎng)站標(biāo)題:php中pthreads怎么用
網(wǎng)站地址:http://weahome.cn/article/podpos.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部