2024 年 PHP Conference Japan

parallel\run

(1.0.0)

parallel\run執行

說明

parallel\run(Closure $task): ?Future

將排程 task 以平行方式執行。

parallel\run(Closure $task, array $argv): ?Future

應排程 task 並行執行,並在執行時傳遞 argv

自動排程

如果先前呼叫 parallel\run() 時建立並快取的 \parallel\Runtime 處於閒置狀態,則會使用它來執行任務。如果沒有閒置的 \parallel\Runtime,parallel 將會建立並快取一個 \parallel\Runtime

注意事項:

由程式設計師建立的 \parallel\Runtime 物件不會用於自動排程。

參數

task

具有特定特性的 Closure (閉包)。

argv

一個具有特定特性的 array (陣列) 參數,將在執行時傳遞給 task

任務特性

排程用於並行執行的閉包不得

  • 以傳址方式接收或返回
  • 接收或返回內部物件(參見注意事項)
  • 執行一組有限的指令

禁止在用於並行執行的閉包中使用的指令如下:

  • yield
  • 以傳址方式使用
  • 宣告類別
  • 宣告具名函式

注意事項:

巢狀閉包可以使用 yield 或傳址方式使用,但不能包含類別或具名函式宣告。

注意事項:

任務可能包含的檔案中沒有任何指令是被禁止的。

參數特性

參數不得

  • 包含參考
  • 包含資源
  • 包含內部物件(參見注意事項)

注意事項:

如果是檔案串流資源,資源將會被轉換為檔案描述符,並盡可能以 int (整數) 形式傳遞,Windows 不支援此功能。

內部物件注意事項

內部物件通常使用自訂結構,無法安全地以值複製,PHP 目前缺乏執行此操作的機制(不使用序列化),因此只能共享不使用自訂結構的物件。

某些內部物件不使用自訂結構,例如 parallel\Events\Event,因此可以共享。

閉包是一種特殊的內部物件,支援以值複製,因此可以共享。

通道是編寫並行程式碼的核心,並且必然支援並行存取和執行,因此可以共享。

警告

繼承內部類別的使用者類別可能會使用由內部類別定義的自訂結構,在這種情況下,它們無法安全地以值複製,因此可能無法共享。

返回值

警告

當任務包含 return 或 throw 陳述式時,不得忽略返回的 parallel\Future

例外

警告

如果 parallel\Runtime 已關閉,將會拋出 parallel\Runtime\Error\Closed

警告

如果 task 是從內部函式建立的閉包,將會拋出 parallel\Runtime\Error\IllegalFunction

警告

如果 task 包含非法指令,將會拋出 parallel\Runtime\Error\IllegalInstruction

警告

如果 task 接收或 argv 包含非法變數,將會拋出 parallel\Runtime\Error\IllegalParameter

警告

如果 task 非法返回,將會拋出 parallel\Runtime\Error\IllegalReturn

另請參閱

新增註解

使用者貢獻的註解 3 則註解

john_2885 at yahoo dot com
4 年前
這裡有一個更具體的例子,說明如何使用 run 函式 API。

<?php
/*********************************************
* Sample parallel functional API
*
* Scenario
* -------------------------------------------
* Given a large number of rows of
* data to process, divide the work amongst
* a set of workers. Each worker is responsible
* for finishing their assigned task.
*
* In the code below, assume we have arbitrary
* start and end IDs (rows) - we will try to
* divide the number of IDs (rows) evenly
* across 8 workers. The workers will get the
* following batches to process to completion:
*
* Total number of IDs (rows): 1371129
* Each worker will get 171392 IDs to process
*
* Worker 1: IDs from 11001 to 182393
* Worker 2: IDs from 182393 to 353785
* Worker 3: IDs from 353785 to 525177
* Worker 4: IDs from 525177 to 696569
* Worker 5: IDs from 696569 to 867961
* Worker 6: IDs from 867961 to 1039353
* Worker 7: IDs from 1039353 to 1210745
* Worker 8: IDs from 1210745 to 1382130
*
* Each worker then processes 5000 rows at a time
* until they are done with their assigned work
*
*********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers
$batchSize = ceil($totalIds / $workers);
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) to divide the overall
// task into sub-batches
$rowsToFetch = 5000;

print
"Total IDs: " . $totalIds . "\n";
print
"Batch Size: " . $batchSize . "\n";
print
"Last Batch: " . $lastBatch . "\n";

$producer = function(int $worker, int $startId, int $endId, int $fetchSize) {
$tempMinId = $startId;
$tempMaxId = $tempMinId + $fetchSize;
$fetchCount = 1;

print
"Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";

while(
$tempMinId < $endId) {
for(
$i = $tempMinId; $i < $tempMaxId; $i++) {
$usleep = rand(500000, 1000000);
usleep($usleep);
print
"Worker " . $worker . " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";
// Need to explicitly break out of the for loop once complete or else it will forever process only the first sub-batch
break;
}

// Now we move on to the next sub-batch for this worker
$tempMinId = $tempMaxId;
$tempMaxId = $tempMinId + $fetchSize;
if(
$tempMaxId > $endId) {
$tempMaxId = $endId;
}
// Introduce some timing randomness
$sleep = rand(1,5);
sleep($sleep);
$fetchCount++;
}

// This worker has completed their entire batch
print "Worker " . $worker . " finished\n";

};

// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for($i = 0; $i < $workers; $i++) {
$startId = $minId + ($i * $batchSize);
$endId = $startId + $batchSize;
if(
$i == ($workers - 1)) {
$endId = $maxId;
}
\parallel\run($producer, array(($i+1), $startId, $endId, $rowsToFetch));
}

?>
匿名用戶
3 年前
雖然在執行緒執行程式碼中不允許函式宣告,但允許 include。因此,如果我們想要宣告一個函式,我們可以編寫另一個包含該函式的檔案並將其 include 進來。
# main.php
<?php
$runtime
= new parallel\Runtime ();
$future = $runtime->run ( function () {
$future = $runtime->run ( function () {
include
"included.php";
return
add (1, 3);
}, [ ] );
echo
$future->value ();
# 輸出:4
# included.php
<?php
function add($a, $b){
return
$a + $b;
}
Thierry Kauffmann
3 年前
<?php

/**
* Sample parralel functional API
* using a generator instead of a static list of items to process
*
* Items to process in parallel come from a generator
* It could be anything : e.g fetch a mysql array, a DirectoryIterator...
* Thus the number of items to process in parallel is NOT known in advance
*
* This algorithm attributes items to each parallel thread dynamically
* As soon as a thread has finished working
* It is assigned a new item to process
* until all items are processed (generator closes)
*
* In this example we process 50 items in 5 parallel threads
* It produces output in this form (output changes at each run) :
*
* ThreadId: 1 => Item: 1 (Start)
* ThreadId: 2 => Item: 2 (Start)
* ThreadId: 3 => Item: 3 (Start)
* ThreadId: 4 => Item: 4 (Start)
* ThreadId: 5 => Item: 5 (Start)
* ThreadId: 5 => Item: 5 Sleep: 3s (End)
* ThreadId: 5 => Item: 6 (Start)
* ThreadId: 3 => Item: 3 Sleep: 4s (End)
* ThreadId: 3 => Item: 7 (Start)
* ThreadId: 2 => Item: 2 Sleep: 6s (End)
* ThreadId: 2 => Item: 8 (Start)
* ...
* ThreadId: 4 => Item: 44 Sleep: 6s (End)
* ThreadId: 4 => Item: 49 (Start)
* ThreadId: 3 => Item: 46 Sleep: 5s (End)
* ThreadId: 3 => Item: 50 (Start)
* ThreadId: 2 => Item: 43 Sleep: 9s (End)
* Destroy ThreadId: 2
* ThreadId: 1 => Item: 47 Sleep: 5s (End)
* Destroy ThreadId: 1
* ThreadId: 4 => Item: 49 Sleep: 7s (End)
* Destroy ThreadId: 4
* ThreadId: 5 => Item: 48 Sleep: 10s (End)
* Destroy ThreadId: 5
* ThreadId: 3 => Item: 50 Sleep: 10s (End)
* Destroy ThreadId: 3
*/

use \parallel\{Runtime, Future, Channel, Events};

// Generate list of items to process with a generator
function generator(int $item_count) {
for (
$i=1; $i <= $item_count; $i++) {
yield
$i;
}
}

function
testConcurrency(int $concurrency, int $item_count) {

$generator = generator($item_count);

// Function executing in each thread. Have a snap for a random time for example !
$producer = function (int $item_id) {
$seconds = rand(1, 10);
sleep($seconds);
return [
'item_id' => $item_id, 'sleep_seconds' => $seconds];
};

// Fill up threads with initial 'inactive' state
$threads = array_fill(1, $concurrency, ['is_active' => false]);

while (
true) {
// Loop through threads until all threads are finished
foreach ($threads as $thread_id => $thread) {
if (!
$thread['is_active'] and $generator->valid()) {
// Thread is inactive and generator still has values : run something in the thread
$item_id = $generator->current();
$threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
echo
"ThreadId: $thread_id => Item: $item_id (Start)\n";
$threads[$thread_id]['is_active'] = true;
$generator->next();
} elseif (!isset(
$threads[$thread_id]['run'])) {
// Destroy supplementary threads in case generator closes sooner than number of threads
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} elseif (
$threads[$thread_id]['run']->done()) {
// Thread finished. Get results
$item = $threads[$thread_id]['run']->value();
echo
"ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

if (!
$generator->valid()) {
// Generator is closed then destroy thread
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} else {
// Thread is ready to run again
$threads[$thread_id]['is_active'] = false;
}
}
}

// Escape loop when all threads are destroyed
if (empty($threads)) break;
}
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>
To Top