此範例示範了使用 pthreads 的 MTP 的各個方面 - 特別值得注意的是與子執行緒的雙向通訊。
我找不到任何相關資訊,因此我想向您展示我的研究結果。
<?php
class Model
{
public $id;
public $value;
}
class Connection
extends Worker
{
protected static $link;
public function __construct($hostname, $username, $password, $database, $port = 3306)
{
$this->hostname = $hostname;
$this->username = $username;
$this->password = $password;
$this->database = $database;
$this->port = $port;
}
public function getConnection()
{
if(!self::$link)
{
echo 'Thread: '. $this->getThreadId() ." Connecting to db\n";
self::$link = new \PDO(...);
}
return self::$link;
}
}
class QueryTask
extends Threaded
{
public $data;
public $result;
protected $_complete;
public function __construct(Model $data)
{
$this->_complete = false;
$this->data = $data;
}
public function run()
{
$pdo = $this->worker->getConnection();
$text = 'Thread: '. $this->worker->getThreadId() .' Job: '. $this->data->id .' Data: '. $this->data->value;
$t = microtime(true);
$stmt = $pdo->prepare("
INSERT INTO `test` (`id`, `text`) VALUES (NULL, '". $text ."')
");
$stmt->execute();
$dt = microtime(true) - $t;
$result = (int) $stmt->rowCount();
echo $text .' Result: '. $result .' Exec time: '. $dt ."s\n";
$this->result = $result;
$this->_complete = true;
}
public function isGarbage() : bool
{
return $this->_complete;
}
}
$t = microtime(true);
$pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]);
$tasks = 10;
for($i=0; $i<$tasks; ++$i)
{
$object = new Model();
$object->id = $i;
$object->value = rand();
$pool->submit(new QueryTask($object));
}
$data = [];
while(1)
{
$newData = [];
$pool->collect(function(QueryTask $task) use (&$newData) {
if($task->isGarbage())
{
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
$newData[ $task->data->id ] = $task->data->value;
}
return $task->isGarbage();
});
$data = array_merge($data, $newData);
if(count($data) == $tasks)
break;
usleep(100000);
}
var_dump($data);
?>
結果
執行緒:6796 連線到資料庫
執行緒:3156 連線到資料庫
執行緒:9040 連線到資料庫
執行緒:7748 連線到資料庫
執行緒:8836 連線到資料庫
工作:0 完成於:0.0070011615753174 秒
工作:4 完成於:0.0069999694824219 秒
工作:2 完成於:0.0090010166168213 秒
工作:3 完成於:0.0090010166168213 秒
工作:1 完成於:0.003000020980835 秒
工作:5 完成於:0.0069999694824219 秒
工作:7 完成於:0.0079998970031738 秒
工作:6 完成於:0.0049998760223389 秒
工作:9 完成於:0.0079998970031738 秒
工作:8 完成於:0.0069999694824219 秒
陣列(10) {
[0] =>
整數(17730)
[1] =>
整數(18771)
[2] =>
整數(12944)
[3] =>
整數(6025)
[4] =>
整數(29582)
[5] =>
整數(10159)
[6] =>
整數(26556)
[7] =>
整數(9029)
[8] =>
整數(15002)
[9] =>
整數(4043)
}
值得注意的事項
1. 為 10 個任務構建 5 個 worker。最後 5 個任務在已建立資料庫連線的現有執行緒上運行。
2. 您可以透過建立新任務並提交它來將資料「發送」到執行緒。
3. 您可以使用 collect 函式取得結果。
4. 您可以傳遞簡單的物件給任務建構子。