面向对象的 FIFO 通信进程
<?php
/**
 * Contract for a simple two-way message channel.
 *
 * Incoming data is pulled with receive(), inspected with getData() and
 * discarded with clearData(); outgoing data is pushed with send().
 */
interface Communication {
    /**
     * Try to pull incoming data from the channel.
     *
     * @param int $bytes Maximum number of bytes to read in one call.
     */
    public function receive($bytes = 1024);

    /**
     * Return the incoming data gathered so far.
     */
    public function getData();

    /**
     * Discard the incoming data gathered so far.
     */
    public function clearData();

    /**
     * Push data to the other end of the channel.
     *
     * @param string $data Payload to transmit.
     */
    public function send($data);
}
/**
 * Communication channel built on two named pipes (FIFOs): one read by this
 * process ("input") and one written by this process ("output").
 *
 * Opening a FIFO blocks until the other end is opened too, so blocking
 * operations are delegated to short-lived forked helper processes.
 * Requires the pcntl and posix extensions.
 */
class FIFOCommunication implements Communication {
    /** @var array Per-direction settings ('input' / 'output'): name, mode, use, and for input the open stream. */
    private $fifos;

    /** @var string|null Data accumulated by receive() since the last clearData(). */
    private $data;

    /** @var int[] PIDs of helper processes forked by this class that have not been reaped yet. */
    private static $childPids = array();

    /**
     * Collect the exit status of finished helper processes so they do not
     * linger as zombies. Only PIDs forked by this class are waited on.
     */
    private static function reapChildren() {
        foreach (self::$childPids as $index => $pid) {
            // With WNOHANG, 0 means "still running"; a PID or -1 means there is nothing left to wait for.
            if (pcntl_waitpid($pid, $status, WNOHANG) !== 0) {
                unset(self::$childPids[$index]);
            }
        }
    }

    /**
     * Open a FIFO for reading without waiting for an external writer.
     *
     * A forked helper opens the write end for about one second, which lets
     * the blocking read-side open in the calling process complete.
     *
     * @param string $fifoPath Path of the FIFO; created if it does not exist.
     * @param int    $mode     Permissions used when the FIFO has to be created.
     * @return resource|false  Read stream, or false if the FIFO could not be
     *                         created, the fork failed, or the open failed.
     */
    static public function stream_fifo_open($fifoPath, $mode) {
        // Create the FIFO before forking so the reader can never run ahead of the helper's mkfifo.
        if (! file_exists($fifoPath) && ! posix_mkfifo($fifoPath, $mode)) {
            return false;
        }
        $pid = pcntl_fork();
        if ($pid === -1) {
            // No helper writer exists, so the blocking open below would never return.
            return false;
        }
        if ($pid === 0) {
            // Helper: hold the write end open long enough for the parent's open to succeed.
            $fifo = fopen($fifoPath, 'w');
            sleep(1);
            exit(0);
        }
        self::$childPids[] = $pid;
        return fopen($fifoPath, 'r');
    }

    /**
     * Write data to a FIFO from a forked helper so the caller never blocks
     * waiting for a reader.
     *
     * @param string $fifoPath Path of the FIFO; created if it does not exist.
     * @param int    $mode     Permissions used when the FIFO has to be created.
     * @param string $data     Payload to write.
     * @return bool True if a helper process was started, false otherwise.
     */
    static public function stream_fifo_write($fifoPath, $mode, $data) {
        // Reap helpers from earlier writes before starting a new one.
        self::reapChildren();
        if (! file_exists($fifoPath) && ! posix_mkfifo($fifoPath, $mode)) {
            return false;
        }
        $pid = pcntl_fork();
        if ($pid === -1) {
            return false;
        }
        if ($pid === 0) {
            // Helper: this open blocks until some process opens the FIFO for reading.
            $fifo = fopen($fifoPath, 'w');
            if ($fifo !== false) {
                fwrite($fifo, $data);
                fclose($fifo);
            }
            exit(0);
        }
        self::$childPids[] = $pid;
        return true;
    }

    /**
     * Open the input FIFO for reading and remember the output FIFO settings.
     *
     * @param string $fifoInputName  Path of the FIFO this process reads from.
     * @param int    $fifoInputMode  Permissions for creating the input FIFO.
     * @param string $fifoOutputName Path of the FIFO this process writes to.
     * @param int    $fifoOutputMode Permissions for creating the output FIFO.
     * @throws RuntimeException If the input FIFO cannot be opened.
     */
    public function __construct(
        $fifoInputName, $fifoInputMode,
        $fifoOutputName, $fifoOutputMode
    ) {
        $inputStream = self::stream_fifo_open($fifoInputName, $fifoInputMode);
        if ($inputStream === false) {
            throw new RuntimeException('Unable to open input FIFO: ' . $fifoInputName);
        }
        $this->fifos = array(
            'input' => array(
                'file' => $inputStream,
                'mode' => $fifoInputMode,
                'name' => $fifoInputName,
                'use' => 'r',
            ),
            'output' => array(
                'mode' => $fifoOutputMode,
                'name' => $fifoOutputName,
                'use' => 'w'
            )
        );
    }

    /**
     * Delete the FIFO files from the filesystem.
     *
     * @param string|null $type 'input' or 'output' to delete only that FIFO;
     *                          any other value deletes both.
     */
    public function remove($type = null) {
        if ($type === 'input' || $type === 'output') {
            $targets = array($type);
        } else {
            $targets = array('input', 'output');
        }
        foreach ($targets as $target) {
            $path = $this->fifos[$target]['name'];
            // A missing file is not an error; any other unlink failure stays visible.
            if (file_exists($path)) {
                unlink($path);
            }
        }
    }

    /**
     * Read pending data from the input FIFO, if any, and append it to the
     * internal buffer.
     *
     * @param int $bytes Maximum number of bytes to read.
     * @return bool True if at least one byte was appended, false otherwise.
     */
    public function receive($bytes = 1024) {
        $input = $this->fifos['input']['file'];
        // stream_select() takes its arrays by reference, so they must be real variables.
        $readers = array($input);
        $writers = null;
        $except = null;
        $chunk = false;
        if (stream_select($readers, $writers, $except, 0, 15) === 1) {
            $chunk = fread($input, $bytes);
        }
        // Compare explicitly: empty() would also reject a legitimate "0" payload.
        if ($chunk === false || $chunk === '') {
            return false;
        }
        $this->data .= $chunk;
        return true;
    }

    /**
     * @return string|null Data received since the last clearData(), or null if none.
     */
    public function getData() {
        return $this->data;
    }

    /**
     * Discard all data received so far.
     */
    public function clearData() {
        $this->data = null;
    }

    /**
     * Send data through the output FIFO without blocking the caller.
     *
     * @param string $data Payload to send.
     * @return bool True if a helper process was started to deliver the data.
     */
    public function send($data) {
        $output = $this->fifos['output'];
        return self::stream_fifo_write($output['name'], $output['mode'], $data);
    }
}
// Echo server: read messages from "<pid>.input" and answer on "<pid>.output"
// until the message "EXIT\n" arrives, then delete both FIFOs.
$pid = getmypid();
$fifoCommunication = new FIFOCommunication(
    $pid . '.input', 0600,
    $pid . '.output', 0600
);
echo "COMMUNICATION STARTED\n";
while (true) {
    if ($fifoCommunication->receive()) {
        $data = $fifoCommunication->getData();
        // Empty the buffer after each message; otherwise earlier messages stay
        // prepended and "EXIT\n" could only match if it were the very first one.
        $fifoCommunication->clearData();
        if ($data === "EXIT\n") {
            break;
        }
        $fifoCommunication->send('RECEIVED: ' . $data);
    }
    sleep(1);
}
$fifoCommunication->remove();
?>