PHP 大会日本 2024

parallel\Channel 类

(0.9.0)

无缓冲通道

无缓冲通道将在对 parallel\Channel::send() 的调用上阻塞,直到有接收方,并在对 parallel\Channel::recv() 的调用上阻塞,直到有发送方。这意味着无缓冲通道不仅是任务之间共享数据的方式,而且是简单的同步方法。

无缓冲通道是在任务之间共享数据的最快方式,需要最少的复制。

缓冲通道

缓冲通道在对 parallel\Channel::send() 的调用上不会阻塞,直到达到容量,对 parallel\Channel::recv() 的调用将阻塞,直到缓冲区中有数据。

通道上的闭包

并行通道的一个强大功能是它们允许在任务(和运行时)之间交换闭包。

当闭包通过通道发送时,闭包会被缓冲,它不会改变传输闭包的通道的缓冲,但它确实会影响闭包内部的静态作用域:发送到不同运行时或相同运行时的相同闭包不会共享它们的静态作用域。

这意味着每当执行通过通道传输的闭包时,静态状态都将与闭包被缓冲时的状态相同。

匿名通道

匿名通道构造函数允许程序员避免为每个通道分配名称:parallel 将为匿名通道生成唯一名称。

类概要

final class parallel\Channel {
/* 匿名构造函数 */
public __construct()
public __construct(int $capacity)
/* 访问 */
public make(string $name): Channel
public make(string $name, int $capacity): Channel
public open(string $name): Channel
/* 共享 */
public recv(): mixed
public send(mixed $value): void
/* 关闭 */
public close(): void
/* 无限缓冲的常量 */
const Infinite;
}

目录

添加注释

用户贡献的注释 5 条注释

hdvianna
4 年前
这是一个使用通道为消费者生成数据的示例。在此示例中,生产者 Runtime 实例将发送消费者应休眠的秒数。

<?php

use parallel\{Runtime, Channel};

main($argv);

function
main(array $argv)
{
if (
count($argv) !== 3) {
echo
"用法:hello-parallel.php <任务数量> <最大休眠时间(秒)>" . PHP_EOL;
echo
"示例:hello-parallel.php 5 3" . PHP_EOL;
die;
} else {
$numberOfTasks = intval($argv[1]);
$maximumTimeOfSleep = intval($argv[2]);
$t1 = microtime(true);
parallelize($numberOfTasks, $maximumTimeOfSleep);
$endTime = microtime(true) - $t1;
echo
PHP_EOL."完成 $numberOfTasks 个任务,耗时 {$endTime} 秒".PHP_EOL;
}
}

function
parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
$channel = new Channel();

$taskIds = array_map(function () use ($maximumTimeOfSleep) {
return
$id = uniqid("task::");
},
range(0, $numberOfTasks - 1));

$timesToSleep = array_map(function () use ($maximumTimeOfSleep) {
return
rand(1, $maximumTimeOfSleep);
},
$taskIds);

$producer = new Runtime();
$producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
foreach (
$timesToSleep as $timeToSleep) {
$channel->send($timeToSleep);
}
}, [
$channel, $timesToSleep]);

$consumerFutures = array_map(function (string $id) use ($channel) {
$runtime = new Runtime();
return
$runtime->run(function (string $id, Channel $channel) {
$timeToSleep = $channel->recv();
echo
"任务 $id:我将休眠 $timeToSleep 秒。".PHP_EOL;
sleep($timeToSleep);
echo
"任务 $id:已休眠 $timeToSleep 秒。".PHP_EOL;
return
$timeToSleep;
}, [
$id, $channel]);
},
$taskIds);

wait($consumerFutures);
wait([$producerFuture]);
}

function
wait(array $futures)
{
return
array_map(function ($future) {
return
$future->value();
},
$futures);
}
rustysun
5 年前
一个使用无缓冲通道的示例。
<?php

use parallel\{Channel,Runtime};

$sum=function(array $a, Channel $ch) {
$sum=0;
foreach (
$a as $v) {
$sum+=$v;
}
$ch->send($sum);
};
try {
$a=[7, 2, 8, 1, 4, 0, 9, 10];
//无缓冲通道
$runtime=new Runtime;
$ch2=new Channel;
$runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
$runtime->run($sum, [array_slice($a, $num), $ch2]);
//从通道接收
$x=$ch2->recv();
$y=$ch2->recv();
$ch2->close();
echo
"\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(
Error $err) {
echo
"\n错误:", $err->getMessage();
} catch(
Exception $e) {
echo
"\n异常:", $e->getMessage();
}

//输出
//ch2:18 23 41
gam6itko
3 年前
<?php

// 用一种非常奇怪的方式计算阶乘 ^_^
// 我们创建一个线程并使用带缓冲区的通道进行同步
// 在 fact 函数中,一次只执行一个线程

use parallel\{Channel, Future, Runtime};

for (
$n = 0; $n <= 10; $n++) {
echo
"!$n = " . factorial($n) . PHP_EOL;
}

/**
* 创建 $n 个线程。
*/
function factorial(int $n): int
{
// 带缓冲区的通道 - 用于同步线程 ^_^
$channel = new Channel(1);
$futureList = [];
for (
$i = 2; $i <= $n; $i++) {
$runtime = new Runtime();
$futureList[] = $runtime->run(
static function (
Channel $channel, $multiplier): void {
$f = $channel->recv();
$channel->send($f * $multiplier);
},
[
$channel, $i]
);
}

$channel->send(1);

// 等待所有线程完成
do {
$allDone = array_reduce(
$futureList,
function (
bool $c, Future $future): bool {

return
$c && $future->done();
},
true
);
} while (
false === $allDone);

return
$channel->recv();
}

// 输出:
// !0 = 1
// !1 = 1
// !2 = 2
// !3 = 6
// !4 = 24
// !5 = 120
// !6 = 720
// !7 = 5040
// !8 = 40320
// !9 = 362880
// !10 = 3628800
rustysun
5 年前
<?php
use parallel\Channel;

function
sum(array $a, Channel $ch) {
$sum=0;
foreach (
$a as $v) {
$sum+=$v;
}
$ch->send($sum);
}

try {
$a=[7, 2, 8, 1, 4, 0, 9, 10];
$ch1=Channel::make('sum', 2);
$ch2=new Channel;
$num=count($a) / 2;
sum(array_slice($a, 0, $num), $ch1);
sum(array_slice($a, $num), $ch1);

// 从通道接收数据
$x=$ch1->recv();
$y=$ch1->recv();
$ch1->close();
echo
"\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(
Error $err) {
echo
"\nError:", $err->getMessage();
} catch(
Exception $e) {
echo
"\nException:", $e->getMessage();
}
thierry at pielo dot net
3 个月前
<?php

/**
* Bzz 重新加载!
* 并行运行两个简单的任务,并使用通道进行同步
*
* parallel\Channel(int $capacity): 带缓冲区的通道
* 创建一个用于任务之间通信的带缓冲区的通道
* @ref https://php.net/manual/en/class.parallel-channel.php
*/

echo "zzz... " . PHP_EOL;

// 创建新的带缓冲区的通道
$channel = new \parallel\Channel(2);

\parallel\run(
function(
$channel) {
$snaps_count = rand (8, 12);
echo
"快照数量:$snaps_count" . PHP_EOL;
for (
$i=1; $i<=$snaps_count; $i++) {
$other_sleep_time = rand(3, 5);
$my_sleep_time = rand(1, 3);
echo
"发送睡眠时间到缓冲区" . PHP_EOL;
$start = microtime(true);
$channel->send($other_sleep_time);
$wait_time = microtime(true) - $start;
if (
$wait_time > .1) {
echo
"缓冲区已满。我等待了 " . round($wait_time) . " 秒" . PHP_EOL;
}
echo
"我睡眠 {$my_sleep_time} 秒" . PHP_EOL;
sleep($my_sleep_time);
}
echo
"我已完成睡眠。关闭通道" . PHP_EOL;
$channel->close();
},
[
$channel]
);

\parallel\run(
function(
$channel) {
try {
while(
true) {
$my_sleep_time = $channel->recv();
echo
"另一个睡眠 {$my_sleep_time} 秒" . PHP_EOL;
sleep($my_sleep_time);
}
} catch(
\parallel\Channel\Error\Closed $e) {
echo
"通道已关闭。另一个退出。";
die;
}
},
[
$channel]
);
To Top