PHP Conference Japan 2024

parallel\Runtime 类

(0.8.0)

运行时对象

每个运行时表示一个单独的 PHP 线程,线程在构造时创建(并引导)。然后线程等待调度任务:已调度的任务将按 FIFO 执行,然后线程将恢复等待,直到调度更多任务,或者它被关闭、终止或由 PHP 对象的正常作用域规则销毁。

警告

当运行时由 PHP 对象的正常作用域规则销毁时,它将首先执行所有已调度的任务,并在执行过程中阻塞。

运行时引导

当创建一个新的运行时时,它不与创建它的线程(或进程)共享代码。这意味着它没有加载相同的类和函数,也没有设置相同的自动加载器。在某些情况下,非常轻量级的运行时是可取的,因为要调度的任务不需要访问父线程中的代码。在任务确实需要访问相同代码的情况下,只需将自动加载器设置为引导程序即可。

注意:

预加载可以与 parallel 一起使用,在这种情况下,预加载的代码无需引导即可使用

类概要

final class parallel\Runtime {
/* 创建 */
public __construct()
public __construct(string $bootstrap)
/* 执行 */
public run(Closure $task): ?Future
public run(Closure $task, array $argv): ?Future
/* 加入 */
public close(): void
public kill(): void
}

目录

添加注释

用户贡献的注释 4 条注释

20
Luciano Vettoretti
5 年前
basic-multithreading-test.php

<?php
use parallel\Runtime;
use
parallel\Channel;

$test = "this var is not accesible in a thread";

// 此函数将作为线程
$thread_function = function (int $id, Channel $ch) {
// 延迟第一个线程以模拟更好的多线程
// 第二个线程始终先完成
$sleep = ($id == 2) ? 1 : 2;
sleep($sleep);

// 打印线程 ID
// 因此很清楚第二个线程先执行
// 并且您还可以确保多线程正在工作
var_dump("thread $id sleep $sleep");

// 尝试捕获全局变量,但这是不可能的
echo '$GLOBALS["test"] = ';
@
var_dump($GLOBALS["test"]);

// 共享数据的唯一方法是在通道之间
$ch->send($sleep);
};

try {
// 每个运行时表示一个线程
$r1 = new Runtime();
$r2 = new Runtime();

// 通道,数据将在其中共享
$ch1 = new Channel();

// 将发送到 $thread_function 的参数
$args = array();
$args[0] = null;
$args[1] = $ch1;

// 运行线程 1
$args[0] = 1;
$r1->run($thread_function, $args);

// 运行线程 2
$args[0] = 2;
$r2->run($thread_function, $args);

// 从通道接收数据
$x = $ch1->recv();
$y = $ch1->recv();

// 关闭通道
$ch1->close();

echo
"\nData received by the channel: $x and $y";
} catch (
Error $err) {
echo
"\nError:", $err->getMessage();
} catch (
Exception $e) {
echo
"\nException:", $e->getMessage();
}
5
gam6itko
3 年前
<?php

// 确保这是真正的并行执行(showcase2)。
// 然后你在任务函数中使用 echo,应用程序很有可能挂起。
// 我们应该控制 Futures 来避免这种行为。
// 你每次执行都会得到不同的输出。

use parallel\{Future, Runtime};

const
THREADS_COUNT = 10;
const
THREADS_I_MAX = 100;

// 在线程中执行
$task = static function (int $i, int $to): void {
echo
"[enter$i]";
for (
$j = 0; $j < $to; $j++) {
echo
$i;
}
echo
"[exit$i]";
};

// 创建一些线程
$runtimeList = [];
for (
$i = 0; $i < THREADS_COUNT; $i++) {
$runtimeList[] = new Runtime();
}
// 运行所有线程
$futureList = [];
foreach (
$runtimeList as $i => $runtime) {
echo
"[run$i]";
$futureList[] = $runtime->run($task, [$i, THREADS_I_MAX]);
}

// 等待所有线程完成
// 如果你删除下面的代码,你的脚本将会挂起
do {
usleep(1);
$allDone = array_reduce(
$futureList,
function (
bool $c, Future $future): bool {
return
$c && $future->done();
},
true
);
} while (
false === $allDone);
echo
"done\n";
0
Mikhail Streltsov (my-fantasy dot ru网站管理员)
1 年前
new Runtime() 将重置一些基本的(在 10+ 之前)Linux 信号。

如果您已经通过 pcntl_signal 函数设置了信号处理程序 - 您需要再次执行此操作
0
gam6itko
3 年前
<?php

// 确保这是真正的并行执行
// 你在每次运行脚本时都会得到不同的输出

use parallel\{Channel, Runtime};

const
THREADS_COUNT = 5;
const
THREADS_I_MAX = 10;

$ch = new Channel();

// 在线程中执行
$task = static function (Channel $ch, int $i, int $to): void {
echo
"[enter: $i]\n";
for (
$j = 0; $j < $to; $j++) {
$ch->send($i);
}
echo
"[exit: $i]\n";
};

// 创建一些线程
$runtimeList = [];
for (
$i = 0; $i < THREADS_COUNT; $i++) {
$runtimeList[] = new Runtime();
}
// 运行所有线程
foreach ($runtimeList as $i => $runtime) {
echo
"[run: $i]\n";
$runtime->run($task, [$ch, $i, THREADS_I_MAX]);
}

// 从通道读取消息
$queue = '';
for (
$i = 0; $i < THREADS_COUNT * THREADS_I_MAX; $i++) {
$queue .= $ch->recv();
}
$ch->close();

echo
$queue . PHP_EOL;
To Top