parallel\run

(1.0.0)

parallel\run执行

描述

parallel\run(Closure $task): ?Future

应将 task 排入队列以在并行中执行。

parallel\run(Closure $task, array $argv): ?Future

应将 task 排入队列以在并行中执行,在执行时传递 argv

自动调度

如果 \parallel\Runtime 由先前对 parallel\run() 的调用内部创建并缓存,并且处于空闲状态,则将使用它来执行任务。如果 \parallel\Runtime 不处于空闲状态,parallel 将创建并缓存一个 \parallel\Runtime

注意:

程序员创建的 \parallel\Runtime 对象不会用于自动调度。

参数

task

具有特定特征的 Closure

argv

一个具有特定特征的 array,将在执行时传递给 task

任务特征

排入队列以进行并行执行的闭包必须

  • 不接受或通过引用返回
  • 不接受或返回内部对象(参见注释)
  • 执行一组有限的指令

禁止在用于并行执行的闭包中使用的指令包括

  • yield
  • 按引用使用
  • declare class
  • declare named function

注意:

嵌套的闭包可以 yield 或按引用使用,但不能包含类或命名函数声明。

注意:

在任务可能包含的文件中,没有指令是被禁止的。

参数特征

参数必须

  • 不包含引用
  • 不包含资源
  • 不包含内部对象(参见注释)

注意:

在文件流资源的情况下,该资源将尽可能地转换为文件描述符并作为 int 传递,这在 Windows 上不受支持。

内部对象注释

内部对象通常使用自定义结构,不能安全地按值复制,PHP 目前缺乏执行此操作的机制(不进行序列化),因此只有不使用自定义结构的对象可以共享。

一些内部对象不使用自定义结构,例如 parallel\Events\Event,因此可以共享。

闭包是一种特殊的内部对象,支持按值复制,因此可以共享。

通道是编写并行代码的核心,支持并发访问和执行,因此可以共享。

警告

扩展内部类的用户类可能使用由内部类定义的自定义结构,在这种情况下,它们不能安全地按值复制,因此不能共享。

返回值

警告

当任务包含 return 或 throw 语句时,不能忽略返回的 parallel\Future

异常

警告

如果 parallel\Runtime 已关闭,则应抛出 parallel\Runtime\Error\Closed

警告

如果 task 是从内部函数创建的闭包,则应抛出 parallel\Runtime\Error\IllegalFunction

警告

如果 task 包含非法指令,则应抛出 parallel\Runtime\Error\IllegalInstruction

警告

如果 task 接受或 argv 包含非法变量,则应抛出 parallel\Runtime\Error\IllegalParameter

警告

如果 task 非法返回,则应抛出 parallel\Runtime\Error\IllegalReturn

添加注释

用户贡献注释 3 个注释

john_2885 at yahoo dot com
4 年前
这是一个更实质性的关于如何使用 run 函数式 API 的示例。

<?php
/*********************************************
* 并行函数式 API 示例
*
* 场景
* -------------------------------------------
* 假设要处理大量数据行,将工作分配给
* 一组工作器。每个工作器负责
* 完成其分配的任务。
*
* 在下面的代码中,假设我们有任意
* 起始和结束 ID(行)- 我们将尝试
* 将 ID 数量(行)平均分配
* 给 8 个工作器。工作器将获得以下批次
* 以完成处理:
*
* 总 ID 数量(行):1371129
* 每个工作器将获得 171392 个 ID 来处理
*
* 工作器 1:ID 从 11001 到 182393
* 工作器 2:ID 从 182393 到 353785
* 工作器 3:ID 从 353785 到 525177
* 工作器 4:ID 从 525177 到 696569
* 工作器 5:ID 从 696569 到 867961
* 工作器 6:ID 从 867961 到 1039353
* 工作器 7:ID 从 1039353 到 1210745
* 工作器 8:ID 从 1210745 到 1382130
*
* 每个工作器然后一次处理 5000 行
* 直到他们完成他们分配的工作
*
*********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// 尝试将 ID 均匀分配给工作器数量
$batchSize = ceil($totalIds / $workers);
// 最后一批获得剩余部分
$lastBatch = $totalIds % $batchSize;
// 用于将整体任务划分为子批次的 ID(行)数量
// 任务
$rowsToFetch = 5000;

print
"总 ID: " . $totalIds . "\n";
print
"批次大小: " . $batchSize . "\n";
print
"最后一批: " . $lastBatch . "\n";

$producer = function(int $worker, int $startId, int $endId, int $fetchSize) {
$tempMinId = $startId;
$tempMaxId = $tempMinId + $fetchSize;
$fetchCount = 1;

print
"工作器 " . $worker . " 处理 ID 从 " . $startId . " 到 " . $endId . "\n";

while(
$tempMinId < $endId) {
for(
$i = $tempMinId; $i < $tempMaxId; $i++) {
$usleep = rand(500000, 1000000);
usleep($usleep);
print
"工作器 " . $worker . " 完成批次 " . $fetchCount . " 从 ID " . $tempMinId . " 到 " . $tempMaxId . "\n";
// 完成后需要显式退出 for 循环,否则它将永远只处理第一个子批次
break;
}

// 现在我们继续处理此工作器的下一个子批次
$tempMinId = $tempMaxId;
$tempMaxId = $tempMinId + $fetchSize;
if(
$tempMaxId > $endId) {
$tempMaxId = $endId;
}
// 引入一些时间随机性
$sleep = rand(1,5);
sleep($sleep);
$fetchCount++;
}

// 此工作器已完成其整个批次
print "工作器 " . $worker . " 完成\n";

};

// 创建我们的工作器并让他们开始处理他们的任务
// 在这种情况下,它是一组 171392 个 ID 要处理
for($i = 0; $i < $workers; $i++) {
$startId = $minId + ($i * $batchSize);
$endId = $startId + $batchSize;
if(
$i == ($workers - 1)) {
$endId = $maxId;
}
\parallel\run($producer, array(($i+1), $startId, $endId, $rowsToFetch));
}

?>
anonymous user
3 年前
虽然在线程执行代码中不允许函数声明,但允许包含。因此,如果我们想声明一个函数,我们可以编写另一个包含该函数的文件并将其包含在内。
# main.php
<?php
$runtime
= new parallel\Runtime ();
$future = $runtime->run ( function () {
$future = $runtime->run ( function () {
include
"included.php";
return
add (1, 3);
}, [ ] );
echo
$future->value ();
# output: 4
# included.php
<?php
function add($a, $b){
return
$a + $b;
}
Thierry Kauffmann
2 年前
<?php

/**
* 示例并行函数式 API
* 使用生成器而不是静态项目列表来进行处理
*
* 并行处理的项目来自生成器
* 它可以是任何东西:例如获取 MySQL 数组、DirectoryIterator...
* 因此,并行处理的项目数量事先未知
*
* 此算法动态地将项目分配给每个并行线程
* 只要一个线程完成工作
* 它会被分配一个新的项目进行处理
* 直到所有项目都处理完毕(生成器关闭)
*
* 在此示例中,我们在 5 个并行线程中处理 50 个项目
* 它以以下形式产生输出(输出在每次运行时都会发生变化):
*
* ThreadId: 1 => Item: 1 (Start)
* ThreadId: 2 => Item: 2 (Start)
* ThreadId: 3 => Item: 3 (Start)
* ThreadId: 4 => Item: 4 (Start)
* ThreadId: 5 => Item: 5 (Start)
* ThreadId: 5 => Item: 5 Sleep: 3s (End)
* ThreadId: 5 => Item: 6 (Start)
* ThreadId: 3 => Item: 3 Sleep: 4s (End)
* ThreadId: 3 => Item: 7 (Start)
* ThreadId: 2 => Item: 2 Sleep: 6s (End)
* ThreadId: 2 => Item: 8 (Start)
* ...
* ThreadId: 4 => Item: 44 Sleep: 6s (End)
* ThreadId: 4 => Item: 49 (Start)
* ThreadId: 3 => Item: 46 Sleep: 5s (End)
* ThreadId: 3 => Item: 50 (Start)
* ThreadId: 2 => Item: 43 Sleep: 9s (End)
* Destroy ThreadId: 2
* ThreadId: 1 => Item: 47 Sleep: 5s (End)
* Destroy ThreadId: 1
* ThreadId: 4 => Item: 49 Sleep: 7s (End)
* Destroy ThreadId: 4
* ThreadId: 5 => Item: 48 Sleep: 10s (End)
* Destroy ThreadId: 5
* ThreadId: 3 => Item: 50 Sleep: 10s (End)
* Destroy ThreadId: 3
*/

use \parallel\{Runtime, Future, Channel, Events};

// 使用生成器生成要处理的项目列表
function generator(int $item_count) {
for (
$i=1; $i <= $item_count; $i++) {
yield
$i;
}
}

function
testConcurrency(int $concurrency, int $item_count) {

$generator = generator($item_count);

// 在每个线程中执行的函数。例如,有一个随机时间快照!
$producer = function (int $item_id) {
$seconds = rand(1, 10);
sleep($seconds);
return [
'item_id' => $item_id, 'sleep_seconds' => $seconds];
};

// 用初始的“非活动”状态填充线程
$threads = array_fill(1, $concurrency, ['is_active' => false]);

while (
true) {
// 循环遍历线程,直到所有线程都完成
foreach ($threads as $thread_id => $thread) {
if (!
$thread['is_active'] and $generator->valid()) {
// 线程处于非活动状态,并且生成器仍然有值:在线程中运行某些东西
$item_id = $generator->current();
$threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
echo
"ThreadId: $thread_id => Item: $item_id (Start)\n";
$threads[$thread_id]['is_active'] = true;
$generator->next();
} elseif (!isset(
$threads[$thread_id]['run'])) {
// 如果生成器比线程数量更早关闭,则销毁辅助线程
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} elseif (
$threads[$thread_id]['run']->done()) {
// 线程完成。获取结果
$item = $threads[$thread_id]['run']->value();
echo
"ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

if (!
$generator->valid()) {
// 生成器已关闭,然后销毁线程
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} else {
// 线程已准备好再次运行
$threads[$thread_id]['is_active'] = false;
}
}
}

// 当所有线程都被销毁时退出循环
if (empty($threads)) break;
}
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>
To Top