Pool 类

(PECL pthreads >= 2.0.0)

简介

Pool 是一个用于容纳和控制可调节数量的 Worker 的容器。

池提供了一个更高层次的 Worker 功能抽象,包括按照 pthreads 要求的方式管理引用。

类概要

class Pool {
/* 属性 */
protected $size;
protected $class;
protected $workers;
protected $ctor;
protected $last;
/* 方法 */
public __construct(int $size, string $class = ?, array $ctor = ?)
public collect(Callable $collector = ?): int
public resize(int $size): void
public shutdown(): void
public submit(Threaded $task): int
public submitTo(int $worker, Threaded $task): int
}

属性

size

此池可使用的 Worker 的最大数量

class

Worker 的类

workers

对 Worker 的引用

ctor

新 Worker 构造函数的参数

last

在 workers 中使用的最后一个 Worker 的偏移量

目录

添加注释

用户贡献的注释 4 个注释

4
meadowsjared at gmail dot com
3 年前
在此示例中,它展示了如何使用池来获取结果数组,使用 pThreads v3.2.1 和 php 7.3.23

<?php
class TestWork extends Threaded {
// 更新后的版本,适用于 pThreads v3.2.1 和 php 7.3.23
protected $complete;
//$pData 是发送到 worker 线程以执行其工作的数据。
public function __construct($pData) {
// 将所有变量转移到局部变量
$this->complete = false;
$this->testData = $pData;
}
// 这是所有工作将要完成的地方。
public function run() {
usleep(2000000); // 休眠 2 秒以模拟大型作业
$this->complete = true;
}
public function
isDone() {
return
$this->complete;
}
}
class
ExamplePool extends Pool {
public
$data = array(); // 用于在完成工作后返回数据
private $numTasks = 0; // 用于知道何时完成的计数器
/**
* 覆盖父类的 submit 函数
* 以跟踪我们的作业
*/
public function submit(Threaded $task) {
$this->numTasks++;
parent::submit($task);
}
/**
* 用于等待所有 worker 完成
*/
public function process() {
// 只要池中有作业,就运行此循环
//
while (count($this->data) < $this->numTasks) {
$this->collect(function (TestWork $task) {
// 如果任务被标记为已完成,则收集其结果
if ($task->isDone()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
// 这是你获取已完成数据的方式 [通过 $pool->process() 访问]
$this->data[] = $tmpObj;
}
return
$task->isDone();
});
}
// 所有作业都已完成
// 我们可以关闭池
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); // 获取所有结果
echo '<pre>';
print_r($retArr); // 返回结果数组(以及可能的错误)
echo '</pre>';
?>
6
meadowsjared at gmail dot com
8 年前
请注意,使用 collect 函数时,重要的是您需要扩展 pool 类,以便您可以继续检查已完成的线程,直到它们全部完成。

<?php
class TestWork extends Threaded {
protected
$complete;
//$pData 是发送到您的工作线程以执行其工作的数据。
public function __construct($pData){
//将所有变量传递到局部变量
$this->complete = false;
$this->testData = $pData;
}
//这里将完成所有工作。
public function run(){
usleep(2000000); //休眠 2 秒以模拟一个大型作业
$this->complete = true;
}
public function
isGarbage() {
return
$this->complete;
}
}
class
ExamplePool extends Pool
{
public
$data = array();
public function
process()
{
// 只要池中有作业,就运行此循环
// 作业
while (count($this->work)) {
$this->collect(function (TestWork $task) {
// 如果某个任务被标记为已完成
// 收集其结果
if ($task->isGarbage()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
// 这是您将完成的数据取回的方式 [通过 $pool->process() 访问]
$this->data[] = $tmpObj;
}
return
$task->isGarbage();
});
}
// 所有作业都已完成
// 我们可以关闭池
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //获取所有结果
echo '<pre>';
print_r($retArr); //返回结果数组(以及可能发生的错误)
echo '</pre>';
?>
2
olavk
9 年前
一个简单的示例,使用 Collectable(基本上是为 Pool 设计的线程)和 Pool

<?php

class job extends Collectable {
public
$val;

public function
__construct($val){
// 初始化一些属性
$this->val = $val;
}
public function
run(){
// 执行一些工作
$this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
$this->setGarbage();
}
}

// 最多 3 个线程将同时工作
$p = new Pool(3);

$tasks = array(
new
job('0'),
new
job('1'),
new
job('2'),
new
job('3'),
new
job('4'),
new
job('5'),
new
job('6'),
new
job('7'),
new
job('8'),
new
job('9'),
new
job('10'),
);
// 将任务添加到池队列
foreach ($tasks as $task) {
$p->submit($task);
}

// shutdown 将等待当前队列完成
$p->shutdown();
// 垃圾回收检查/读取结果
$p->collect(function($checkingTask){
echo
$checkingTask->val;
return
$checkingTask->isGarbage();
});

?>
-5
fajan
10 年前
演示 Pool/Worker 机制用法,以及一些技巧和提示的示例类 ;)
<?php
class Config extends Threaded{ // 共享全局对象
protected $val=0, $val2=0;
protected function
inc(){++$this->val;} // 受对象保护的同步
public function inc2(){++$this->val2;} // 无同步
}
class
WorkerClass extends Worker{
protected static
$worker_id_next = -1;
protected
$worker_id;
protected
$config;
public function
__construct($config){
$this->worker_id = ++static::$worker_id_next; // 静态成员在线程中不可用,但在“主线程”中可用
$this->config = $config;
}
public function
run(){
global
$config;
$config = $this->config; // 注意:通过引用设置不会起作用
global $worker_id;
$worker_id = $this->worker_id;
echo
"working context {$worker_id} is created!\n";
//$this->say_config(); // 全局同步函数。
}
protected function
say_config(){ // 'protected' 受对象保护的同步,因此在多个实例之间不起作用
global $config; // 你可以使用共享的 $config 对象作为同步源。
$config->synchronized(function() use (&$config){ // 注意:你可以在此处使用闭包,但如果你将闭包附加到 Threaded 对象,它将被销毁,因为无法序列化
var_dump($config);
});
}
}
class
Task extends Stackable{ // Stackable 仍然存在,只是它从文档中消失了(可能是错误)。查看旧版本文档以获取更多详细信息。
protected $set;
public function
__construct($set){
$this->set = $set;
}
public function
run(){
global
$worker_id;
echo
"task is running in {$worker_id}!\n";
usleep(mt_rand(1,100)*100);
$config = $this->getConfig();
$val = $config->arr->shift();
$config->arr[] = $this->set;
for (
$i = 0 ; $i < 1000; ++$i){
$config->inc();
$config->inc2();
}
}
public function
getConfig(){
global
$config; // WorkerClass 在线程的范围内设置了这个,可以被任务重用作为额外的异步数据源。(例如:连接池或任务队列到解复用器)
return $config;
}
}

$config = new Config;
$config->arr = new \Threaded();
$config->arr->merge(array(1,2,3,4,5,6));
class
PoolClass extends Pool{
public function
worker_list(){
if (
$this->workers !== null)
return
array_keys($this->workers);
return
null;
}
}
$pool = new PoolClass(3, 'WorkerClass', [$config] );
$pool->worker_list();
//$pool->submitTo(0,new Task(-10)); // submitTo 不会尝试创建 worker

$spammed_id = -1;
for (
$i = 1; $i <= 100; ++$i){ // 添加一些作业
if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){
$spammed_id = $x[2];
echo
"spamming worker {$spammed_id} with lots of tasks from now on\n";
}
if (
$spammed_id != -1 && ($i % 5) == 0) // 每 5 个作业路由到一个 worker,因此它拥有总作业的 20%(使用 3 个 worker 它应该执行约 33%,现在它有 (33+20)%,所以只有在你计划执行平衡时才委派给 worker...)
$pool->submitTo($spammed_id,new Task(10*$i));
else
$pool->submit(new Task(10*$i));
}
$pool->shutdown();
var_dump($config); // "val" 恰好为 100000,"val2" 可能略少
// 此外:如果你禁用垃圾邮件发送者,你会发现 "arr" 的顺序是随机的。
?>
To Top