parallel\Runtime 类

(0.8.0)

Runtime 对象

每个 runtime 代表一个单独的 PHP 线程,该线程在构造时创建(并引导)。然后,线程等待任务被调度:已调度任务将按先进先出 (FIFO) 顺序执行,然后线程将恢复等待,直到调度更多任务,或者它被关闭、杀死或被 PHP 对象的正常作用域规则销毁。

警告

当 runtime 被 PHP 对象的正常作用域规则销毁时,它将首先执行所有已调度的任务,并在执行过程中阻塞。

Runtime 引导

当创建新的 runtime 时,它不会与创建它的线程(或进程)共享代码。这意味着它没有加载相同的类和函数,也没有设置相同的自动加载器。在某些情况下,一个非常轻量级的 runtime 是可取的,因为要调度的任务不需要访问父线程中的代码。在那些任务需要访问相同代码的情况下,只需设置一个自动加载器作为引导程序即可。

注意:

预加载可以与 parallel 一起使用,在这种情况下,预加载的代码无需引导即可使用

类概要

final class parallel\Runtime {
/* 创建 */
public __construct()
public __construct(string $bootstrap)
/* 执行 */
public run(Closure $task): ?Future
public run(Closure $task, array $argv): ?Future
/* 加入 */
public close(): void
public kill(): void
}

目录

添加注释

用户贡献的注释 4 个注释

20
Luciano Vettoretti
4 年前
basic-multithreading-test.php

<?php
use parallel\Runtime;
use
parallel\Channel;

$test = "this var is not accesible in a thread";

// 此函数将成为线程
$thread_function = function (int $id, Channel $ch) {
// 延迟第一个线程以模拟更好的多线程
// 第二个线程始终先完成
$sleep = ($id == 2) ? 1 : 2;
sleep($sleep);

// 打印线程 ID
// 所以很清楚第二个线程先执行
// 并且你也可以确保多线程正在工作
var_dump("thread $id sleep $sleep");

// 尝试捕获全局变量,但这是不可能的
echo '$GLOBALS["test"] = ';
@
var_dump($GLOBALS["test"]);

// 共享数据的唯一方式是通过通道
$ch->send($sleep);
};

try {
// 每个 runtime 代表一个线程
$r1 = new Runtime();
$r2 = new Runtime();

// 用于共享数据的通道
$ch1 = new Channel();

// 将发送到 $thread_function 的参数
$args = array();
$args[0] = null;
$args[1] = $ch1;

// 运行线程 1
$args[0] = 1;
$r1->run($thread_function, $args);

// 运行线程 2
$args[0] = 2;
$r2->run($thread_function, $args);

// 从通道接收数据
$x = $ch1->recv();
$y = $ch1->recv();

// 关闭通道
$ch1->close();

echo
"\nData received by the channel: $x and $y";
} catch (
Error $err) {
echo
"\nError:", $err->getMessage();
} catch (
Exception $e) {
echo
"\nException:", $e->getMessage();
}
5
gam6itko
2 年前
<?php

// 确保这是真正的并行执行(showcase2)。
// 如果你在任务函数中使用 echo,应用程序有可能会卡死。
// 我们应该控制 Futures 来避免这种情况。
// 每次执行都会得到不同的输出。

use parallel\{Future, Runtime};

const
THREADS_COUNT = 10;
const
THREADS_I_MAX = 100;

// 在线程中执行
$task = static function (int $i, int $to): void {
echo
"[enter$i]";
for (
$j = 0; $j < $to; $j++) {
echo
$i;
}
echo
"[exit$i]";
};

// 创建一些线程
$runtimeList = [];
for (
$i = 0; $i < THREADS_COUNT; $i++) {
$runtimeList[] = new Runtime();
}
// 运行所有线程
$futureList = [];
foreach (
$runtimeList as $i => $runtime) {
echo
"[run$i]";
$futureList[] = $runtime->run($task, [$i, THREADS_I_MAX]);
}

// 等待所有线程完成
// 如果你删除下面的代码,你的脚本将会卡死
do {
usleep(1);
$allDone = array_reduce(
$futureList,
function (
bool $c, Future $future): bool {
return
$c && $future->done();
},
true
);
} while (
false === $allDone);
echo
"done\n";
0
Mikhail Streltsov (admin at my-fantasy dot ru)
1 year ago
new Runtime() 会重置一些基本的 (在 10+ 之前) Linux 信号。

如果你已经通过 pcntl_signal 函数设置了信号处理程序 - 你需要再次进行设置。
0
gam6itko
2 年前
<?php

// 确保这是真正的并行执行
// 你会得到不同的输出,每次脚本运行的结果都会不同

use parallel\{Channel, Runtime};

const
THREADS_COUNT = 5;
const
THREADS_I_MAX = 10;

$ch = new Channel();

// 在线程中执行
$task = static function (Channel $ch, int $i, int $to): void {
echo
"[enter: $i]\n";
for (
$j = 0; $j < $to; $j++) {
$ch->send($i);
}
echo
"[exit: $i]\n";
};

// 创建一些线程
$runtimeList = [];
for (
$i = 0; $i < THREADS_COUNT; $i++) {
$runtimeList[] = new Runtime();
}
// 运行所有线程
foreach ($runtimeList as $i => $runtime) {
echo
"[run: $i]\n";
$runtime->run($task, [$ch, $i, THREADS_I_MAX]);
}

// 从通道读取消息
$queue = '';
for (
$i = 0; $i < THREADS_COUNT * THREADS_I_MAX; $i++) {
$queue .= $ch->recv();
}
$ch->close();

echo
$queue . PHP_EOL;
To Top