parallel\Events 类

(0.9.0)

事件循环

事件循环监视一组 future 或 channel(目标)的状态,以便在目标可用且操作可在不阻塞事件循环的情况下执行时,执行读取(parallel\Future::value()parallel\Channel::recv())和写入(parallel\Channel::send())操作。

类概要

final class parallel\Events implements Countable, Traversable {
/* 输入 */
public setInput(Input $input): void
/* 目标 */
public addChannel(parallel\Channel $channel): void
public addFuture(string $name, parallel\Future $future): void
public remove(string $target): void
/* 行为 */
public setBlocking(bool $blocking): void
public setTimeout(int $timeout): void
/* 轮询 */
}

目录

添加笔记

用户贡献笔记 3 笔记

16
s dot laufer at homegear dot email
4 年前
<?php
/**
展示事件使用的示例。

文档目前还很薄弱,所以我不确定这个示例是否是最佳解决方案。但它确实有效。
*/
use parallel\{Channel,Runtime,Events,Events\Event};

$myThread = function(Channel $channel) {
$events = new Events();
$events->addChannel($channel);
//$events->setBlocking(false); //取消注释以不阻塞 Events::poll()
$events->setTimeout(1000000); //在不阻塞时注释

while(true)
{
/*
...
你的代码。
...
*/

//读取所有可用事件
try
{
$event = NULL;
do
{
$event = $events->poll(); //如果有事件则返回非空值
if($event && $event->source == 'myChannel')
{
//看起来,目标在返回事件后被删除,
//所以再次添加它。
$events->addChannel($channel);
if(
$event->type == Event\Type::Read)
{
if(
is_array($event->value) && count($event->value) > 0)
{
if(
$event->value['name'] == 'stop')
{
echo
'Stopping thread';
return;
//停止
}
else
{
echo
'Event: '.$event->value['name'].' => '.$event->value['value'].PHP_EOL;
}
}
}
else if(
$event->type == Event\Type::Close) return; //停止
}
}
while(
$event);
}
catch(
Events\Error\Timeout $ex)
{
//超时
echo 'Timeout'.PHP_EOL;
}
}
};

class
MyClass {
private
$runtime;
private
$future;
private
$channel;

public function
start() {
//创建运行时
$this->runtime = new Runtime();

//创建缓冲通道。
//缓冲通道不会阻塞 Channel::send()。
//注意,目标名称在进程中需要是唯一的。
$this->channel = Channel::make('myChannel', Channel::Infinite);

global
$myThread;
$this->future = $this->runtime->run($myThread, [$this->channel]);
}

public function
stop() {
$this->channel->send(['name' => 'stop', 'value' => true]);

$this->future->value(); //等待线程完成
$this->channel->close();
}

public function
emit(string $name, $value)
{
$this->channel->send(['name' => $name, 'value' => $value]);
}
}

$a = new MyClass();
$a->start();

for(
$i = 0; $i < 5; $i++)
{
$a->emit('test', $i);
sleep(0.5);
}

sleep(2);

for(
$i = 5; $i < 10; $i++)
{
$a->emit('test', $i);
sleep(0.5);
}

$a->stop();
?>
1
gam6itko
2 years ago
<?php

// 这个示例展示了 Events 如何处理 Future 事件

use parallel\{Events, Events\Event, Runtime};

$events = new Events();

$runtime = new Runtime();

// 读取 (类型: 1)
$future = $runtime->run(
static function (
string $name) {
return
"Future#$name result";
},
[
'Read']
);
$events->addFuture("Future#Read", $future);

// 取消 (类型: 4)
$future = $runtime->run(
static function (
string $name) {
throw new
\Exception("Exception#$name");
},
[
'Cancel']
);
$events->addFuture("Future#Cancel", $future);

// 杀死 (类型: 5)
$future = $runtime->run(
static function () {
sleep(100000);
},
[]
);
$events->addFuture("Future#Kill", $future);
$future->cancel(); // 杀死它

// 错误 (类型: 6)
$future = $runtime->run(
static function () {
$memoryEater = [];
$i = 0;
while (++
$i) {
$memoryEater[] = $i;
}
}
);
$events->addFuture("Future#Error", $future);

// 读取事件

/** @var Event $event */
foreach ($events as $i => $event) {
echo
str_pad('', 50, '=') . " EVENT_$i\n";
var_dump($event);
echo
"\n";
}
0
gam6itko
2 years ago
<?php

// 下面的例子展示了如何从主线程停止子线程

use parallel\{Channel, Events, Events\Event\Type, Runtime};

$fnThread = static function (Channel $channel) {
$events = new Events();
$events->addChannel($channel);
$events->setBlocking(false); // 不要在 Events::poll() 上阻塞

while (true) {
if (
$event = $events->poll()) {
// 似乎,目标在返回事件后被删除,所以需要再次添加。
$events->addChannel($channel);

if (
Type::Read === $event->type) {
echo
"- 收到值: ".$event->value.PHP_EOL;
} elseif (
Type::Close === $event->type) {
echo
"- 接收关闭事件".PHP_EOL;
return
"我完成了";
}

}
echo
"- \n";
usleep(500_000);
}
};

// 主线程
$runtime = new Runtime();
$channel = new Channel();
$future = $runtime->run($fnThread, [$channel]);

$channel->send('message1');
sleep(2);
$channel->send('message2');
sleep(2);

echo
"关闭通道\n";
$channel->close();
echo
"future 说: ".$future->value();
echo
PHP_EOL;
To Top