PHP 大会日本 2024

Pool::collect

(PECL pthreads >= 2.0.0)

Pool::collect收集已完成任务的引用

描述

public Pool::collect(Callable $collector = ?): int

允许池收集由可选提供的收集器确定为垃圾的引用。

参数

collector

一个 Callable 收集器,它返回一个布尔值,表示任务是否可以被收集。仅在极少数情况下需要使用自定义收集器。

返回值

池中剩余待收集的任务数量。

变更日志

版本 描述
PECL pthreads 3.0.0 现在返回一个整数,并且 collector 参数现在是可选的。

示例

示例 #1 Pool::collect() 的基本示例

<?php
$pool
= new Pool(4);

for (
$i = 0; $i < 15; ++$i) {
$pool->submit(new class extends Threaded {});
}

while (
$pool->collect()); // 阻塞,直到所有任务执行完毕

$pool->shutdown();

添加注释

用户贡献的注释 4 条注释

meadowsjared at gmail dot com
3 年前
在此示例中,它展示了如何使用带池的 Threaded 获取结果数组,使用 pThreads v3.2.1 和 php 7.3.23

<?php
class TestWork extends Threaded {
//更新后的版本,适用于 pThreads v3.2.1 和 php 7.3.23
protected $complete;
//$pData 是发送到您的工作线程以执行其工作的数据。
public function __construct($pData) {
//将所有变量传递到局部变量
$this->complete = false;
$this->testData = $pData;
}
//这是所有工作将完成的地方。
public function run() {
usleep(2000000); //休眠 2 秒以模拟大型作业
$this->complete = true;
}
public function
isDone() {
return
$this->complete;
}
}
class
ExamplePool extends Pool {
public
$data = array(); // 用于在完成工作后返回数据
private $numTasks = 0; // 用于知道何时完成的计数器
/**
* 重写父类的 submit 函数
* 以跟踪我们的作业
*/
public function submit(Threaded $task) {
$this->numTasks++;
parent::submit($task);
}
/**
* 用于等待所有工作线程完成
*/
public function process() {
// 只要池中有作业,就运行此循环
// 作业
while (count($this->data) < $this->numTasks) {
$this->collect(function (TestWork $task) {
// 如果任务被标记为已完成,则收集其结果
if ($task->isDone()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
//这就是您如何获取已完成的数据 [通过 $pool->process() 访问]
$this->data[] = $tmpObj;
}
return
$task->isDone();
});
}
// 所有作业都已完成
// 我们可以关闭池
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //获取所有结果
echo '<pre>';
print_r($retArr); //返回结果数组(以及可能的错误)
echo '</pre>';
?>
your dot brother dot t at hotmail dot com
9 年前
示例代码崩溃并浪费了我 2 天的工作时间
首先,`Stackable` 没有名为 $worker 的属性,或者其访问方法使其无法访问。

其次,`Stackable` 也没有 `getThreadId()` 方法。最佳实践是使用 `Thread` 类来实现线程,因为它拥有更多控制函数。最好使用 `Stackable` 来存储对象,并使用它的 `run()` 方法作为初始化。

工作示例如下:

<?php
class MyWork extends Thread {
protected
$complete;

public function
__construct() {
$this->complete = false;
}

public function
run() {
printf(
"Hello from %s in Thread #%lu\n",
__CLASS__, $this->getThreadId());
$this->complete = true;
}

public function
isComplete() {
return
$this->complete;
}
}

class
Something {}

class
MyWorker extends Worker {

public function
__construct(Something $something) {
$this->something = $something;
}

public function
run() {
/** ... **/
}
}

$pool = new Pool(8, \MyWorker::class, [new Something()]);
$pool->submit(new MyWork());

usleep(1000);

$pool->collect(function($work){
return
$work->isComplete();
});
var_dump($pool);
?>
meadowsjared at gmail dot com
8年前
请注意,使用 `collect` 函数时,务必扩展 `Pool` 类,以便您可以持续检查已完成的线程,直到所有线程都完成。

<?php
class TestWork extends Threaded {
protected
$complete;
//$pData 是发送到工作线程以执行其任务的数据。
public function __construct($pData){
//将所有变量传递到局部变量
$this->complete = false;
$this->testData = $pData;
}
//这是所有工作将完成的地方。
public function run(){
usleep(2000000); //休眠 2 秒以模拟大型作业
$this->complete = true;
}
public function
isGarbage() {
return
$this->complete;
}
}
class
ExamplePool extends Pool
{
public
$data = array();
public function
process()
{
// 只要池中有作业,就运行此循环
// 作业
while (count($this->work)) {
$this->collect(function (TestWork $task) {
// 如果任务被标记为已完成
// 收集其结果
if ($task->isGarbage()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
//这就是您如何获取已完成的数据 [通过 $pool->process() 访问]
$this->data[] = $tmpObj;
}
return
$task->isGarbage();
});
}
// 所有作业都已完成
// 我们可以关闭池
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //获取所有结果
echo '<pre>';
print_r($retArr); //返回结果数组(以及可能的错误)
echo '</pre>';
?>
l00k at protonmail dot com
7年前
此示例演示了使用 pthreads 的 MTP 的各个方面,尤其值得注意的是与子线程的双向通信。
我找不到任何相关信息,因此我想向您展示我的研究成果。



<?php

class Model
{

public
$id;
public
$value;

}

class
Connection
extends Worker
{

protected static
$link;


public function
__construct($hostname, $username, $password, $database, $port = 3306)
{
$this->hostname = $hostname;
$this->username = $username;
$this->password = $password;
$this->database = $database;
$this->port = $port;
}

public function
getConnection()
{
if(!
self::$link)
{
echo
'线程: '. $this->getThreadId() ." 连接到数据库\n";
self::$link = new \PDO(...);
}

return
self::$link;
}

}

/** @property Connection $worker */
class QueryTask
extends Threaded
{

public
$data;
public
$result;

protected
$_complete;


public function
__construct(Model $data)
{
$this->_complete = false;
$this->data = $data;
}

public function
run()
{
/** @var \PDO $pdo */
$pdo = $this->worker->getConnection();

$text = '线程: '. $this->worker->getThreadId() .' 任务: '. $this->data->id .' 数据: '. $this->data->value;

$t = microtime(true);

$stmt = $pdo->prepare("
INSERT INTO `test` (`id`, `text`) VALUES (NULL, '"
. $text ."')
"
);
$stmt->execute();

$dt = microtime(true) - $t;

$result = (int) $stmt->rowCount();

echo
$text .' 结果: '. $result .' 执行时间: '. $dt ."s\n";

$this->result = $result;
$this->_complete = true;
}

public function
isGarbage() : bool
{
return
$this->_complete;
}

}

$t = microtime(true);

// 启动
$pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]);

// 任务
$tasks = 10;

for(
$i=0; $i<$tasks; ++$i)
{
$object = new Model();
$object->id = $i;
$object->value = rand();

$pool->submit(new QueryTask($object));
}

// 等待完成
$data = [];

while(
1)
{
$newData = [];

$pool->collect(function(QueryTask $task) use (&$newData) {
if(
$task->isGarbage())
{
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;

$newData[ $task->data->id ] = $task->data->value;
}

return
$task->isGarbage();
});

$data = array_merge($data, $newData);

if(
count($data) == $tasks)
break;

usleep(100000);
}

var_dump($data);
?>

结果
线程: 6796 连接到数据库
线程: 3156 连接到数据库
线程: 9040 连接到数据库
线程: 7748 连接到数据库
线程: 8836 连接到数据库
任务: 0 完成,耗时: 0.0070011615753174s
任务: 4 完成,耗时: 0.0069999694824219s
任务: 2 完成,耗时: 0.0090010166168213s
任务: 3 完成,耗时: 0.0090010166168213s
任务: 1 完成,耗时: 0.003000020980835s
任务: 5 完成,耗时: 0.0069999694824219s
任务: 7 完成,耗时: 0.0079998970031738s
任务: 6 完成,耗时: 0.0049998760223389s
任务: 9 完成,耗时: 0.0079998970031738s
任务: 8 完成,耗时: 0.0069999694824219s

array(10) {
[0] =>
int(17730)
[1] =>
int(18771)
[2] =>
int(12944)
[3] =>
int(6025)
[4] =>
int(29582)
[5] =>
int(10159)
[6] =>
int(26556)
[7] =>
int(9029)
[8] =>
int(15002)
[9] =>
int(4043)
}

值得注意的地方
1. 为10个任务创建了5个工作线程。最后5个任务在已建立数据库连接的现有线程上运行。
2. 可以通过创建新任务并提交来“发送”数据到线程。
3. 可以通过collect函数检索结果。
4. 可以将简单对象传递到任务构造函数。
To Top