此示例展示了 MTP 与 pthreads 的各个方面 - 特别值得注意的是,它与子线程的双向通信。
我找不到关于这方面的任何资料,因此我想向您展示我的研究结果。
<?php
class Model
{
public $id;
public $value;
}
class Connection
extends Worker
{
protected static $link;
public function __construct($hostname, $username, $password, $database, $port = 3306)
{
$this->hostname = $hostname;
$this->username = $username;
$this->password = $password;
$this->database = $database;
$this->port = $port;
}
public function getConnection()
{
if(!self::$link)
{
echo '线程: '. $this->getThreadId() ." 连接到数据库\n";
self::$link = new \PDO(...);
}
return self::$link;
}
}
class QueryTask
extends Threaded
{
public $data;
public $result;
protected $_complete;
public function __construct(Model $data)
{
$this->_complete = false;
$this->data = $data;
}
public function run()
{
$pdo = $this->worker->getConnection();
$text = '线程: '. $this->worker->getThreadId() .' 任务: '. $this->data->id .' 数据: '. $this->data->value;
$t = microtime(true);
$stmt = $pdo->prepare("
INSERT INTO `test` (`id`, `text`) VALUES (NULL, '". $text ."')
");
$stmt->execute();
$dt = microtime(true) - $t;
$result = (int) $stmt->rowCount();
echo $text .' 结果: '. $result .' 执行时间: '. $dt ."s\n";
$this->result = $result;
$this->_complete = true;
}
public function isGarbage() : bool
{
return $this->_complete;
}
}
$t = microtime(true);
$pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]);
$tasks = 10;
for($i=0; $i<$tasks; ++$i)
{
$object = new Model();
$object->id = $i;
$object->value = rand();
$pool->submit(new QueryTask($object));
}
$data = [];
while(1)
{
$newData = [];
$pool->collect(function(QueryTask $task) use (&$newData) {
if($task->isGarbage())
{
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
$newData[ $task->data->id ] = $task->data->value;
}
return $task->isGarbage();
});
$data = array_merge($data, $newData);
if(count($data) == $tasks)
break;
usleep(100000);
}
var_dump($data);
?>
结果
线程: 6796 连接到数据库
线程: 3156 连接到数据库
线程: 9040 连接到数据库
线程: 7748 连接到数据库
线程: 8836 连接到数据库
任务: 0 完成时间: 0.0070011615753174s
任务: 4 完成时间: 0.0069999694824219s
任务: 2 完成时间: 0.0090010166168213s
任务: 3 完成时间: 0.0090010166168213s
任务: 1 完成时间: 0.003000020980835s
任务: 5 完成时间: 0.0069999694824219s
任务: 7 完成时间: 0.0079998970031738s
任务: 6 完成时间: 0.0049998760223389s
任务: 9 完成时间: 0.0079998970031738s
任务: 8 完成时间: 0.0069999694824219s
array(10) {
[0] =>
int(17730)
[1] =>
int(18771)
[2] =>
int(12944)
[3] =>
int(6025)
[4] =>
int(29582)
[5] =>
int(10159)
[6] =>
int(26556)
[7] =>
int(9029)
[8] =>
int(15002)
[9] =>
int(4043)
}
值得注意的地方
1. 为 10 个任务创建 5 个 worker。 5 个最后任务在已经建立数据库连接的现有线程上运行。
2. 通过创建新任务并提交,可以将数据“发送”到线程。
3. 可以使用 collect 函数检索结果。
4. 可以将简单的对象传递给任务构造函数。