PHP 会议日本 2024

Pool 类

(PECL pthreads >= 2.0.0)

介绍

池是用于调整数量的工作器的容器和控制器。

池化提供了工作器功能的更高级别的抽象,包括以 pthreads 所需的方式管理引用。

类概要

class Pool {
/* 属性 */
protected $size;
protected $class;
protected $workers;
protected $ctor;
protected $last;
/* 方法 */
public __construct(int $size, string $class = ?, array $ctor = ?)
public collect(Callable $collector = ?): int
public resize(int $size): void
public shutdown(): void
public submit(Threaded $task): int
public submitTo(int $worker, Threaded $task): int
}

属性

size

此池可以使用的工作器的最大数量

class

工作器的类

workers

对工作器的引用

ctor

新工作器构造函数的参数

last

workers 中最后一个使用的 Worker 的偏移量

目录

添加注释

用户贡献的注释 4 条注释

4
meadowsjared at gmail dot com
3 年前
在此示例中,它展示了如何使用池获取结果数组,使用 pThreads v3.2.1 和 php 7.3.23

<?php
class TestWork extends Threaded {
//更新后的版本,适用于 pThreads v3.2.1 和 php 7.3.23
protected $complete;
//$pData 是发送到工作线程的数据,用于执行其任务。
public function __construct($pData) {
//将所有变量传递到局部变量
$this->complete = false;
$this->testData = $pData;
}
//这是所有工作将完成的地方。
public function run() {
usleep(2000000); //休眠 2 秒以模拟大型任务
$this->complete = true;
}
public function
isDone() {
return
$this->complete;
}
}
class
ExamplePool extends Pool {
public
$data = array(); // 用于在完成后返回数据
private $numTasks = 0; // 用于知道何时完成的计数器
/**
* 重写父类的 submit 函数
* 以跟踪我们的作业
*/
public function submit(Threaded $task) {
$this->numTasks++;
parent::submit($task);
}
/**
* 用于等待所有工作线程完成
*/
public function process() {
// 只要池中还有作业
// 就运行此循环
while (count($this->data) < $this->numTasks) {
$this->collect(function (TestWork $task) {
// 如果任务标记为已完成,则收集其结果
if ($task->isDone()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
// 这是您获取已完成数据的方法 [通过 $pool->process() 访问]
$this->data[] = $tmpObj;
}
return
$task->isDone();
});
}
// 所有作业都已完成
// 我们可以关闭池
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); // 获取所有结果
echo '<pre>';
print_r($retArr); // 返回结果数组(可能还有错误)
echo '</pre>';
?>
6
meadowsjared at gmail dot com
8 年前
请注意,使用 collect 函数时,务必扩展 pool 类,以便您可以持续检查已完成的线程,直到它们全部完成。

<?php
class TestWork extends Threaded {
protected
$complete;
//$pData 是发送到工作线程的数据,用于执行其任务。
public function __construct($pData){
//将所有变量传递到局部变量
$this->complete = false;
$this->testData = $pData;
}
//这是所有工作将完成的地方。
public function run(){
usleep(2000000); //休眠 2 秒以模拟大型任务
$this->complete = true;
}
public function
isGarbage() {
return
$this->complete;
}
}
class
ExamplePool extends Pool
{
public
$data = array();
public function
process()
{
// 只要池中还有作业
// 就运行此循环
while (count($this->work)) {
$this->collect(function (TestWork $task) {
// 如果任务标记为已完成
// 收集其结果
if ($task->isGarbage()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
// 这是您获取已完成数据的方法 [通过 $pool->process() 访问]
$this->data[] = $tmpObj;
}
return
$task->isGarbage();
});
}
// 所有作业都已完成
// 我们可以关闭池
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); // 获取所有结果
echo '<pre>';
print_r($retArr); // 返回结果数组(可能还有错误)
echo '</pre>';
?>
2
olavk
9 年前
使用可收集线程(基本上是为池设计的线程)和池的简单示例

<?php

class job extends Collectable {
public
$val;

public function
__construct($val){
// 初始化一些属性
$this->val = $val;
}
public function
run(){
// 执行一些工作
$this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
$this->setGarbage();
}
}

// 最多3个线程同时工作
$p = new Pool(3);

$tasks = array(
new
job('0'),
new
job('1'),
new
job('2'),
new
job('3'),
new
job('4'),
new
job('5'),
new
job('6'),
new
job('7'),
new
job('8'),
new
job('9'),
new
job('10'),
);
// 将任务添加到池队列
foreach ($tasks as $task) {
$p->submit($task);
}

// shutdown 将等待当前队列完成
$p->shutdown();
// 垃圾回收检查/读取结果
$p->collect(function($checkingTask){
echo
$checkingTask->val;
return
$checkingTask->isGarbage();
});

?>
-5
fajan
10年前
示例类,用于演示 Pool/Worker 机制的用法,以及展示一些技巧和提示;)
<?php
class Config extends Threaded{ // 共享全局对象
protected $val=0, $val2=0;
protected function
inc(){++$this->val;} // protected 通过对象同步
public function inc2(){++$this->val2;} // 没有同步
}
class
WorkerClass extends Worker{
protected static
$worker_id_next = -1;
protected
$worker_id;
protected
$config;
public function
__construct($config){
$this->worker_id = ++static::$worker_id_next; // 静态成员在线程中不可用,但在“主线程”中可用
$this->config = $config;
}
public function
run(){
global
$config;
$config = $this->config; // 注意:通过引用设置将不起作用
global $worker_id;
$worker_id = $this->worker_id;
echo
"工作上下文 {$worker_id} 已创建!\n";
//$this->say_config(); // 全局同步函数。
}
protected function
say_config(){ // 'protected' 通过对象同步,因此在多个实例之间不起作用
global $config; // 可以使用共享的 $config 对象作为同步源。
$config->synchronized(function() use (&$config){ // 注意:这里可以使用闭包,但如果将闭包附加到 Threaded 对象,则它将被销毁,因为无法序列化
var_dump($config);
});
}
}
class
Task extends Stackable{ // Stackable 仍然存在,只是在文档中消失了(可能是错误)。有关更多详细信息,请参阅旧版本的文档。
protected $set;
public function
__construct($set){
$this->set = $set;
}
public function
run(){
global
$worker_id;
echo
"任务正在 {$worker_id} 中运行!\n";
usleep(mt_rand(1,100)*100);
$config = $this->getConfig();
$val = $config->arr->shift();
$config->arr[] = $this->set;
for (
$i = 0 ; $i < 1000; ++$i){
$config->inc();
$config->inc2();
}
}
public function
getConfig(){
global
$config; // WorkerClass 在线程的范围内设置了这一点,可以由 Tasks 重用作为额外的异步数据源。(例如:连接池或任务队列到解复用器)
return $config;
}
}

$config = new Config;
$config->arr = new \Threaded();
$config->arr->merge(array(1,2,3,4,5,6));
class
PoolClass extends Pool{
public function
worker_list(){
if (
$this->workers !== null)
return
array_keys($this->workers);
return
null;
}
}
$pool = new PoolClass(3, 'WorkerClass', [$config] );
$pool->worker_list();
//$pool->submitTo(0,new Task(-10)); // submitTo 不会尝试创建工作进程

$spammed_id = -1;
for (
$i = 1; $i <= 100; ++$i){ // 添加一些作业
if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){
$spammed_id = $x[2];
echo
"持续向工作进程 {$spammed_id} 发送大量任务\n";
}
if (
$spammed_id != -1 && ($i % 5) == 0) // 每 5 个作业路由到一个工作进程,因此它拥有总作业的 20%(使用 3 个工作进程,它应该执行 ~33%,现在它有 (33+20)%,因此只有在计划执行平衡时才委派给工作进程……)
$pool->submitTo($spammed_id,new Task(10*$i));
else
$pool->submit(new Task(10*$i));
}
$pool->shutdown();
var_dump($config); // “val” 正好是 100000,“val2” 可能略小
// 此外:如果禁用垃圾邮件发送器,你会发现“arr”的顺序是随机的。
?>
To Top