下面是一个更完整的示例，演示如何使用函数式 API \parallel\run()。
<?php
use \parallel\{Runtime, Future, Channel, Events};
// Bounds of the ID range that is split between the workers.
$minId = 11001;
$maxId = 1382130;
// Number of slices the range is divided into (one task per slice).
$workers = 8;
$totalIds = $maxId - $minId;
// ceil() returns a float; cast once here so every value derived from the
// batch size stays an int and matches the int parameters of the task closure.
$batchSize = (int) ceil($totalIds / $workers);
// Remainder of the total over the batch size; only reported below.
$lastBatch = $totalIds % $batchSize;
// Passed to each task as the width of one simulated fetch window.
$rowsToFetch = 5000;
print "总 ID: " . $totalIds . "\n";
print "批次大小: " . $batchSize . "\n";
print "最后一批: " . $lastBatch . "\n";
/**
 * Simulated worker task: walks the IDs from $startId up to $endId in windows
 * of at most $fetchSize IDs, sleeping for a random time on each window to
 * stand in for real work, and printing progress as it goes.
 *
 * @param int $worker    Label used in the progress messages.
 * @param int $startId   First ID of this worker's range.
 * @param int $endId     End of the range; processing stops once it is reached.
 * @param int $fetchSize Maximum width of one window; must be at least 1.
 *
 * @throws \InvalidArgumentException If $fetchSize is less than 1.
 */
$producer = function (int $worker, int $startId, int $endId, int $fetchSize): void {
    // A non-positive window never advances the cursor and would loop forever.
    if ($fetchSize < 1) {
        throw new \InvalidArgumentException('fetchSize must be at least 1, got ' . $fetchSize);
    }

    print "工作器 " . $worker . " 处理 ID 从 " . $startId . " 到 " . $endId . "\n";

    $fetchCount = 1;
    for ($tempMinId = $startId; $tempMinId < $endId; $tempMinId = $tempMaxId) {
        // Clamp every window, including the first one, to the end of the range
        // so the reported upper bound never exceeds $endId.
        $tempMaxId = min($tempMinId + $fetchSize, $endId);

        // Simulate the time taken to fetch one window (0.5 to 1 second).
        usleep(rand(500000, 1000000));
        print "工作器 " . $worker . " 完成批次 " . $fetchCount . " 从 ID " . $tempMinId . " 到 " . $tempMaxId . "\n";

        // Simulated pause between windows (1 to 5 seconds).
        sleep(rand(1, 5));
        $fetchCount++;
    }

    print "工作器 " . $worker . " 完成\n";
};
// Give each worker a contiguous slice of the ID range and schedule it as a
// parallel task.
for ($i = 0; $i < $workers; $i++) {
    // $batchSize may be a float (it is derived from ceil()), while the task
    // declares int parameters, so normalise the bounds to int here.
    $startId = (int) ($minId + $i * $batchSize);
    // The last worker always runs to $maxId; every other slice is clamped so
    // it cannot overshoot $maxId when there are many workers for a small range.
    $endId = ($i === $workers - 1)
        ? $maxId
        : (int) min($startId + $batchSize, $maxId);
    // Workers are labelled from 1 in the progress output.
    \parallel\run($producer, [$i + 1, $startId, $endId, $rowsToFetch]);
}
?>