PHP Conference Japan 2024

Pool::collect

(PECL pthreads >= 2.0.0)

Pool::collect收集已完成任務的參考

說明

public Pool::collect(Callable $collector = ?): int

允許線程池收集被(選擇性提供的)收集器判定為垃圾的參考。

參數

collector

一個 Callable 收集器,它會根據任務是否可以收集而返回一個布林值。只有在極少數情況下才需要使用自定義收集器。

回傳值

線程池中剩餘待收集的任務數量。

更新日誌

版本 說明
PECL pthreads 3.0.0 現在返回一個整數,並且 collector 參數現在是可選的。

範例

範例 #1 Pool::collect() 的基本範例

<?php
$pool
= new Pool(4);

for (
$i = 0; $i < 15; ++$i) {
$pool->submit(new class extends Threaded {});
}

while (
$pool->collect()); // 阻塞直到所有任務執行完畢

$pool->shutdown();

新增註記

使用者貢獻的註記 4 則註記

meadowsjared at gmail dot com
3 年前
在此範例中,展示了如何使用 pThreads v3.2.1 和 php 7.3.23,搭配 Threaded 和 pool 來獲取結果陣列。

<?php
class TestWork extends Threaded {
//updated version that works with pThreads v3.2.1 and php 7.3.23
protected $complete;
//$pData is the data sent to your worker thread to do it's job.
public function __construct($pData) {
//transfer all the variables to local variables
$this->complete = false;
$this->testData = $pData;
}
//This is where all of your work will be done.
public function run() {
usleep(2000000); //sleep 2 seconds to simulate a large job
$this->complete = true;
}
public function
isDone() {
return
$this->complete;
}
}
class
ExamplePool extends Pool {
public
$data = array(); // used to return data after we're done
private $numTasks = 0; // counter used to know when we're done
/**
* override the submit function from the parent
* to keep track of our jobs
*/
public function submit(Threaded $task) {
$this->numTasks++;
parent::submit($task);
}
/**
* used to wait until all workers are done
*/
public function process() {
// Run this loop as long as we have
// jobs in the pool
while (count($this->data) < $this->numTasks) {
$this->collect(function (TestWork $task) {
// If a task was marked as done, collect its results
if ($task->isDone()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
//this is how you get your completed data back out [accessed by $pool->process()]
$this->data[] = $tmpObj;
}
return
$task->isDone();
});
}
// All jobs are done
// we can shutdown the pool
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
your dot brother dot t at hotmail dot com
9 年前
範例程式碼會崩潰,讓我浪費了 2 個工作天。
首先,`Stackable` 沒有名為 $worker 的屬性,或者它的存取方法使其無法存取。

其次,`Stackable` 也沒有 `getThreadId()`。最佳做法是使用 `Thread` 類別來實現執行緒,因為它具有更多控制功能。最好使用 `Stackable` 來儲存物件,並將其 `run()` 作為初始化方法。

可運作的範例如下:

<?php
類別 MyWork 繼承 Thread {
受保護的
$complete;

公開函式
__construct() {
$this->complete = false;
}

公開函式
run() {
printf(
"來自 %s 在執行緒 #%lu 的問候\n",
__CLASS__, $this->getThreadId());
$this->complete = true;
}

公開函式
isComplete() {
返回
$this->complete;
}
}

類別
Something {}

類別
MyWorker 繼承 Worker {

公開函式
__construct(Something $something) {
$this->something = $something;
}

公開函式
run() {
/** ... **/
}
}

$pool = new Pool(8, \MyWorker::class, [new Something()]);
$pool->submit(new MyWork());

usleep(1000);

$pool->collect(function($work){
返回
$work->isComplete();
});
var_dump($pool);
?>
meadowsjared at gmail dot com
8 年前
請注意,使用 collect 函式時,重要的是您必須擴展 pool 類別,以便您可以持續檢查已完成的執行緒,直到它們全部完成。

<?php
class TestWork extends Threaded {
protected
$complete;
//$pData is the data sent to your worker thread to do it's job.
public function __construct($pData){
//transfer all the variables to local variables
$this->complete = false;
$this->testData = $pData;
}
//This is where all of your work will be done.
public function run(){
usleep(2000000); //sleep 2 seconds to simulate a large job
$this->complete = true;
}
public function
isGarbage() {
return
$this->complete;
}
}
class
ExamplePool extends Pool
{
public
$data = array();
public function
process()
{
// Run this loop as long as we have
// jobs in the pool
while (count($this->work)) {
$this->collect(function (TestWork $task) {
// If a task was marked as done
// collect its results
if ($task->isGarbage()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
//this is how you get your completed data back out [accessed by $pool->process()]
$this->data[] = $tmpObj;
}
return
$task->isGarbage();
});
}
// All jobs are done
// we can shutdown the pool
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
l00k at protonmail dot com
7 年前
此範例示範了使用 pthreads 的 MTP 的各個方面 - 特別值得注意的是與子執行緒的雙向通訊。
我找不到任何相關資訊,因此我想向您展示我的研究結果。

<?php

class Model
{

public
$id;
public
$value;

}

class
Connection
extends Worker
{

protected static
$link;


public function
__construct($hostname, $username, $password, $database, $port = 3306)
{
$this->hostname = $hostname;
$this->username = $username;
$this->password = $password;
$this->database = $database;
$this->port = $port;
}

public function
getConnection()
{
if(!
self::$link)
{
echo
'Thread: '. $this->getThreadId() ." Connecting to db\n";
self::$link = new \PDO(...);
}

return
self::$link;
}

}

/** @property Connection $worker */
class QueryTask
extends Threaded
{

public
$data;
public
$result;

protected
$_complete;


public function
__construct(Model $data)
{
$this->_complete = false;
$this->data = $data;
}

public function
run()
{
/** @var \PDO $pdo */
$pdo = $this->worker->getConnection();

$text = 'Thread: '. $this->worker->getThreadId() .' Job: '. $this->data->id .' Data: '. $this->data->value;

$t = microtime(true);

$stmt = $pdo->prepare("
INSERT INTO `test` (`id`, `text`) VALUES (NULL, '"
. $text ."')
"
);
$stmt->execute();

$dt = microtime(true) - $t;

$result = (int) $stmt->rowCount();

echo
$text .' Result: '. $result .' Exec time: '. $dt ."s\n";

$this->result = $result;
$this->_complete = true;
}

public function
isGarbage() : bool
{
return
$this->_complete;
}

}

$t = microtime(true);

// uruchomienie
$pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]);

// zadania
$tasks = 10;

for(
$i=0; $i<$tasks; ++$i)
{
$object = new Model();
$object->id = $i;
$object->value = rand();

$pool->submit(new QueryTask($object));
}

// oczekiwanie na zakonczenie
$data = [];

while(
1)
{
$newData = [];

$pool->collect(function(QueryTask $task) use (&$newData) {
if(
$task->isGarbage())
{
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;

$newData[ $task->data->id ] = $task->data->value;
}

return
$task->isGarbage();
});

$data = array_merge($data, $newData);

if(
count($data) == $tasks)
break;

usleep(100000);
}

var_dump($data);
?>

結果
執行緒:6796 連線到資料庫
執行緒:3156 連線到資料庫
執行緒:9040 連線到資料庫
執行緒:7748 連線到資料庫
執行緒:8836 連線到資料庫
工作:0 完成於:0.0070011615753174 秒
工作:4 完成於:0.0069999694824219 秒
工作:2 完成於:0.0090010166168213 秒
工作:3 完成於:0.0090010166168213 秒
工作:1 完成於:0.003000020980835 秒
工作:5 完成於:0.0069999694824219 秒
工作:7 完成於:0.0079998970031738 秒
工作:6 完成於:0.0049998760223389 秒
工作:9 完成於:0.0079998970031738 秒
工作:8 完成於:0.0069999694824219 秒

陣列(10) {
[0] =>
整數(17730)
[1] =>
整數(18771)
[2] =>
整數(12944)
[3] =>
整數(6025)
[4] =>
整數(29582)
[5] =>
整數(10159)
[6] =>
整數(26556)
[7] =>
整數(9029)
[8] =>
整數(15002)
[9] =>
整數(4043)
}

值得注意的事項
1. 為 10 個任務構建 5 個 worker。最後 5 個任務在已建立資料庫連線的現有執行緒上運行。
2. 您可以透過建立新任務並提交它來將資料「發送」到執行緒。
3. 您可以使用 collect 函式取得結果。
4. 您可以傳遞簡單的物件給任務建構子。
To Top