看起来 msg_receive() 分配大小为 $maxsize 的内存,然后才尝试将队列中的消息接收到底层内存中。因为我的脚本在 $maxsize = 1 Gib 时崩溃,但在 $maxsize = 10 Kib 时工作。
(PHP 4 >= 4.3.0, PHP 5, PHP 7, PHP 8)
msg_receive — 从消息队列接收消息
$queue
,$desired_message_type
,&$received_message_type
,$max_message_size
,&$message
,$unserialize
= true
,$flags
= 0,&$error_code
= null
msg_receive() 将接收来自指定 queue
的第一个类型为 desired_message_type
的消息。
queue
消息队列。
desired_message_type
如果 desired_message_type
为 0,则返回队列前端的消息。如果 desired_message_type
大于 0,则返回该类型的第一个消息。如果 desired_message_type
小于 0,则读取队列中类型小于或等于 desired_message_type
绝对值的第一个消息。如果没有任何消息符合条件,脚本将等待直到有合适的到消息到达队列。可以通过在 flags
参数中指定 MSG_IPC_NOWAIT
来防止脚本阻塞。
received_message_type
接收到的消息的类型将存储在此参数中。
max_message_size
max_message_size
指定了要接受的消息的最大大小;如果队列中的消息大于此大小,则函数将失败(除非您按如下所述设置 flags
)。
message
除非接收消息时出错,否则接收到的消息将存储在 message
中。
unserialize
如果设置为 true
,则消息的处理方式与会话模块使用的序列化机制相同。消息将被反序列化,然后返回到您的脚本。这允许您轻松地从其他 PHP 脚本接收数组或复杂的 对象结构,或者如果您使用 WDDX 序列化器,则可以从任何与 WDDX 兼容的源接收。
如果 unserialize
为 false
,则消息将作为二进制安全字符串返回。
flags
可选的 flags
允许您将标志传递给底层的 msgrcv 系统调用。它默认为 0,但您可以指定一个或多个以下值(通过添加或 OR 运算组合它们)。
MSG_IPC_NOWAIT |
如果没有 desired_message_type 的消息,则立即返回,并且不等待。函数将失败并返回与 MSG_ENOMSG 对应的整数值。 |
MSG_EXCEPT |
将此标志与大于 0 的 desired_message_type 组合使用,将使函数接收第一个不等于 desired_message_type 的消息。 |
MSG_NOERROR |
如果消息长度超过 max_message_size ,则设置此标志将截断消息到 max_message_size ,并且不会发出错误信号。 |
error_code
如果函数失败,则可选的 error_code
将设置为系统 errno 变量的值。
成功完成后,消息队列数据结构将如下更新:msg_lrpid
设置为调用进程的进程 ID,msg_qnum
减 1,msg_rtime
设置为当前时间。
看起来 msg_receive() 分配大小为 $maxsize 的内存,然后才尝试将队列中的消息接收到底层内存中。因为我的脚本在 $maxsize = 1 Gib 时崩溃,但在 $maxsize = 10 Kib 时工作。
<?php error_reporting(E_ALL);
/**
* 通过 System V 消息队列发送和接收消息的示例
*
* 要尝试此脚本,请同步/异步运行两次。一次使用 ?typ=send,一次使用 ?typ=receive
*
* @author Thomas Eimers - Mehrkanal GmbH
*
* 本文档的发布旨在希望能有所帮助,但不提供任何担保;
* 甚至不包含对适销性或特定用途适用性的暗示担保。
*/
header('Content-Type: text/plain; charset=ISO-8859-1');
echo "开始...\n";
// 创建 System V 消息队列。整数值是队列的编号
$queue = msg_get_queue(100379);
// 发送选项
$message='nachricht'; // 传输数据
$serialize_needed=false; // 传输数据是否需要序列化?
$block_send=false; // 如果消息无法发送(队列已满…)(true/false)则阻塞
$msgtype_send=1; // 任何大于 0 的整数。它为每条消息签名。因此,您可以在一个队列中处理多条消息
// 类型。
// 接收选项
$msgtype_receive=1; // 我们要接收哪种类型的消息?(这里,类型与我们发送的类型相同,
// 但如果您将其设置为 0,则接收队列中任何类型的下一条消息。
$maxsize=100; // 您想要接收的最大数据长度。
$option_receive=MSG_IPC_NOWAIT; // 如果队列中没有所需类型的消息,则继续运行而无需等待。
// 如果设置为 NULL,则等待消息。
// 发送或接收 20 条消息
for ($i=0;$i<20;$i++) {
sleep(1);
// 此处发送消息
if ($_GET['typ']=='send') {
if(msg_send($queue,$msgtype_send, $message,$serialize_needed, $block_send,$err)===true) {
echo "消息已发送。\n";
} else {
var_dump($err);
}
// 此处接收消息
} else {
$queue_status=msg_stat_queue($queue);
echo '队列中的消息数量: '.$queue_status['msg_qnum']."\n";
// 注意:仅仅因为前一行代码中队列中还有消息,并不意味着现在仍然如此!
if ($queue_status['msg_qnum']>0) {
if (msg_receive($queue,$msgtype_receive ,$msgtype_erhalten,$maxsize,$daten,$serialize_needed, $option_receive, $err)===true) {
echo "接收到的数据".$daten."\n";
} else {
var_dump($err);
}
}
}
}
?>
似乎 2MB 的最大大小是 php 的某种阈值,超过这个值,msg_receive() 就会开始占用大量 CPU(对于持续推送消息的发送者,接收 10000 条消息在我的计算机上会从 0.01 秒跳到 1.5 秒),所以如果可以的话,尽量保持在该阈值以下。
例如考虑一下 Linux 的情况
<?php
// 文件 send.php
$ip = msg_get_queue(12340);
msg_send($ip,8,"abcd",false,false,$err);
//-----------------------------------------------------
<?php
// 文件 receive.php
$ip = msg_get_queue(12340);
msg_receive($ip,0,$msgtype,4,$data,false,null,$err);
echo "msgtype "{$msgtype}" data "{$data}"\n";
msg_receive($ip,0,$msgtype,4,$data,false,null,$err);
echo "msgtype "{$msgtype}" data "{$data}"\n";
?>
现在运行
在终端 #1 中:php5 receive.php
在终端 #2 中:php5 receive.php
在终端 #3 中:php5 send.php
显示来自队列的消息将交替出现。这意味着您运行一次 send.php,消息将显示在终端 #1 中。第二次运行它将在 t#2 中,第三次在 #1 中,依此类推。
这应该以您的 apache 用户身份在终端中运行,在 msg_send 的注释中调用脚本,它们将进行通信。
#! /usr/bin/env php
<?php
$MSGKEY = 519051; // 消息
$msg_id = msg_get_queue ($MSGKEY, 0600);
while (1) {
if (msg_receive ($msg_id, 1, $msg_type, 16384, $msg, true, 0, $msg_error)) {
if ($msg == 'Quit') break;
echo "$msg\n";
} else {
echo "接收 $msg_error 获取消息失败\n";
break;
}
}
msg_remove_queue ($msg_id);
?>