PHP Conference Japan 2024

parallel\run

(1.0.0)

parallel\run执行

描述

parallel\run(Closure $task): ?Future

task 调度到并行执行。

parallel\run(Closure $task, array $argv): ?Future

task 调度到并行执行,并在执行时传递 argv

自动调度

如果之前调用 parallel\run() 内部创建并缓存的 \parallel\Runtime 处于空闲状态,它将用于执行任务。如果没有空闲的 \parallel\Runtime,parallel 将创建并缓存一个 \parallel\Runtime

注意:

程序员创建的 \parallel\Runtime 对象不会用于自动调度。

参数

task

具有特定特征的 Closure

argv

一个 array,包含要在执行时传递给 task 的具有特定特征的参数。

任务特征

计划用于并行执行的闭包不能

  • 按引用传递或返回
  • 接受或返回内部对象(请参阅注释)
  • 执行有限的指令集

禁止在计划用于并行执行的闭包中使用的指令为

  • yield
  • 按引用使用
  • 声明类
  • 声明命名函数

注意:

嵌套闭包可以 yield 或按引用使用,但不能包含类或命名函数声明。

注意:

任务可能包含的文件中没有指令被禁止。

参数特征

参数不能

  • 包含引用
  • 包含资源
  • 包含内部对象(请参阅注释)

注意:

在文件流资源的情况下,该资源将尽可能转换为文件描述符并作为 int 传递,这在 Windows 上不受支持。

内部对象注释

内部对象通常使用无法安全地按值复制的自定义结构,PHP 目前缺乏执行此操作的机制(无需序列化),因此只能共享不使用自定义结构的对象。

一些内部对象不使用自定义结构,例如 parallel\Events\Event,因此可以共享。

闭包是一种特殊的内部对象,支持按值复制,因此可以共享。

通道对于编写并行代码至关重要,并且支持并发访问和执行,因此可以共享。

警告

扩展内部类的用户类可能会使用内部类定义的自定义结构,在这种情况下,它们不能安全地按值复制,因此可能无法共享。

返回值

警告

当任务包含 return 或 throw 语句时,不能忽略返回的 parallel\Future

异常

警告

如果 parallel\Runtime 已关闭,则将抛出 parallel\Runtime\Error\Closed

警告

如果 task 是从内部函数创建的闭包,则将抛出 parallel\Runtime\Error\IllegalFunction

警告

如果 task 包含非法指令,则将抛出 parallel\Runtime\Error\IllegalInstruction

警告

如果 task 接受或 argv 包含非法变量,则将抛出 parallel\Runtime\Error\IllegalParameter

警告

如果 task 非法返回,则将抛出 parallel\Runtime\Error\IllegalReturn

添加注释

用户贡献注释 3 条注释

22
john_2885 at yahoo dot com
4 年前
这是一个关于如何使用 run 函数式 API 的更完整的示例。

<?php
/*********************************************
* 并行函数式 API 示例
*
* 场景
* -------------------------------------------
* 给定大量需要处理的数据行,将工作分配给
* 一组工作线程。每个工作线程负责
* 完成其分配的任务。
*
* 在下面的代码中,假设我们有任意
* 的起始和结束 ID(行) - 我们将尝试
* 将 ID(行)的数量平均分配
* 给 8 个工作线程。工作线程将获得
* 以下批次以完成处理:
*
* ID(行)总数:1371129
* 每个工作线程将处理 171392 个 ID
*
* 工作线程 1:ID 从 11001 到 182393
* 工作线程 2:ID 从 182393 到 353785
* 工作线程 3:ID 从 353785 到 525177
* 工作线程 4:ID 从 525177 到 696569
* 工作线程 5:ID 从 696569 到 867961
* 工作线程 6:ID 从 867961 到 1039353
* 工作线程 7:ID 从 1039353 到 1210745
* 工作线程 8:ID 从 1210745 到 1382130
*
* 然后,每个工作线程一次处理 5000 行
* 直到他们完成分配的工作
*
*********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// 尝试将 ID 平均分配到工作线程数量中
$batchSize = ceil($totalIds / $workers);
// 最后一批获得剩余的任何内容
$lastBatch = $totalIds % $batchSize;
// 要将整体任务划分为子批次的 ID(行)数量
// 任务
$rowsToFetch = 5000;

print
"总 ID 数: " . $totalIds . "\n";
print
"批次大小: " . $batchSize . "\n";
print
"最后一批: " . $lastBatch . "\n";

$producer = function(int $worker, int $startId, int $endId, int $fetchSize) {
$tempMinId = $startId;
$tempMaxId = $tempMinId + $fetchSize;
$fetchCount = 1;

print
"工作线程 " . $worker . " 正在处理 ID 从 " . $startId . " 到 " . $endId . "\n";

while(
$tempMinId < $endId) {
for(
$i = $tempMinId; $i < $tempMaxId; $i++) {
$usleep = rand(500000, 1000000);
usleep($usleep);
print
"工作线程 " . $worker . " 完成了批次 " . $fetchCount . " 从 ID " . $tempMinId . " 到 " . $tempMaxId . "\n";
// 完成后需要显式地跳出 for 循环,否则它将永远只处理第一个子批次
break;
}

// 现在我们继续处理此工作线程的下一个子批次
$tempMinId = $tempMaxId;
$tempMaxId = $tempMinId + $fetchSize;
if(
$tempMaxId > $endId) {
$tempMaxId = $endId;
}
// 引入一些时间随机性
$sleep = rand(1,5);
sleep($sleep);
$fetchCount++;
}

// 此工作线程已完成其整个批次
print "工作线程 " . $worker . " 完成\n";

};

// 创建我们的工作线程并让他们开始处理他们的任务
// 在这种情况下,它是一组需要处理的 171392 个 ID
for($i = 0; $i < $workers; $i++) {
$startId = $minId + ($i * $batchSize);
$endId = $startId + $batchSize;
if(
$i == ($workers - 1)) {
$endId = $maxId;
}
\parallel\run($producer, array(($i+1), $startId, $endId, $rowsToFetch));
}

?>
9
匿名用户
3 年前
虽然在线程执行代码中不允许函数声明,但允许包含文件。因此,如果我们想要声明一个函数,我们可以编写另一个包含该函数的文件并将其包含进来。
# main.php
<?php
$runtime
= new parallel\Runtime ();
$future = $runtime->run ( function () {
$future = $runtime->run ( function () {
include
"included.php";
return
add (1, 3);
}, [ ] );
echo
$future->value ();
# 输出:4
# included.php
<?php
function add($a, $b){
return
$a + $b;
}
1
Thierry Kauffmann
3 年前
<?php

/**
* 示例并行函数式 API
* 使用生成器而不是静态项目列表进行处理
*
* 并行处理的项目来自生成器
* 它可以是任何东西:例如获取 MySQL 数组、DirectoryIterator 等...
* 因此,并行处理的项目数量事先未知
*
* 此算法动态地将项目分配给每个并行线程
* 线程完成工作后
* 它会被分配一个新的项目来处理
* 直到所有项目都被处理(生成器关闭)
*
* 在此示例中,我们使用 5 个并行线程处理 50 个项目
* 它以这种形式输出结果(输出在每次运行时都会更改):
*
* ThreadId: 1 => Item: 1 (Start)
* ThreadId: 2 => Item: 2 (Start)
* ThreadId: 3 => Item: 3 (Start)
* ThreadId: 4 => Item: 4 (Start)
* ThreadId: 5 => Item: 5 (Start)
* ThreadId: 5 => Item: 5 Sleep: 3s (End)
* ThreadId: 5 => Item: 6 (Start)
* ThreadId: 3 => Item: 3 Sleep: 4s (End)
* ThreadId: 3 => Item: 7 (Start)
* ThreadId: 2 => Item: 2 Sleep: 6s (End)
* ThreadId: 2 => Item: 8 (Start)
* ...
* ThreadId: 4 => Item: 44 Sleep: 6s (End)
* ThreadId: 4 => Item: 49 (Start)
* ThreadId: 3 => Item: 46 Sleep: 5s (End)
* ThreadId: 3 => Item: 50 (Start)
* ThreadId: 2 => Item: 43 Sleep: 9s (End)
* Destroy ThreadId: 2
* ThreadId: 1 => Item: 47 Sleep: 5s (End)
* Destroy ThreadId: 1
* ThreadId: 4 => Item: 49 Sleep: 7s (End)
* Destroy ThreadId: 4
* ThreadId: 5 => Item: 48 Sleep: 10s (End)
* Destroy ThreadId: 5
* ThreadId: 3 => Item: 50 Sleep: 10s (End)
* Destroy ThreadId: 3
*/

use \parallel\{Runtime, Future, Channel, Events};

// 使用生成器生成要处理的项目列表
function generator(int $item_count) {
for (
$i=1; $i <= $item_count; $i++) {
yield
$i;
}
}

function
testConcurrency(int $concurrency, int $item_count) {

$generator = generator($item_count);

// 在每个线程中执行的函数。例如,随机休眠一段时间!
$producer = function (int $item_id) {
$seconds = rand(1, 10);
sleep($seconds);
return [
'item_id' => $item_id, 'sleep_seconds' => $seconds];
};

// 使用初始“非活动”状态填充线程
$threads = array_fill(1, $concurrency, ['is_active' => false]);

while (
true) {
// 循环遍历线程,直到所有线程都完成
foreach ($threads as $thread_id => $thread) {
if (!
$thread['is_active'] and $generator->valid()) {
// 线程处于非活动状态且生成器仍有值:在线程中运行某些内容
$item_id = $generator->current();
$threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
echo
"ThreadId: $thread_id => Item: $item_id (Start)\n";
$threads[$thread_id]['is_active'] = true;
$generator->next();
} elseif (!isset(
$threads[$thread_id]['run'])) {
// 如果生成器关闭得比线程数量早,则销毁辅助线程
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} elseif (
$threads[$thread_id]['run']->done()) {
// 线程已完成。获取结果
$item = $threads[$thread_id]['run']->value();
echo
"ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

if (!
$generator->valid()) {
// 生成器已关闭,然后销毁线程
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} else {
// 线程已准备好再次运行
$threads[$thread_id]['is_active'] = false;
}
}
}

// 当所有线程都被销毁时退出循环
if (empty($threads)) break;
}
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>
To Top