看起来 msg_receive() 会分配一个大小为 $maxsize 的内存,然后才尝试从队列中接收消息到分配的内存中。因为我的脚本在 $maxsize = 1 Gib 时会死掉,但在 $maxsize = 10 Kib 时可以正常工作。
(PHP 4 >= 4.3.0, PHP 5, PHP 7, PHP 8)
msg_receive — 从消息队列接收消息
$queue
,$desired_message_type
,&$received_message_type
,$max_message_size
,&$message
,$unserialize
= true
,$flags
= 0,&$error_code
= null
msg_receive() 将从指定的 queue
中接收第一个类型为 desired_message_type
的消息。
queue
消息队列。
desired_message_type
如果 desired_message_type
为 0,则返回队列前端的消息。如果 desired_message_type
大于 0,则返回该类型的第一条消息。如果 desired_message_type
小于 0,则读取队列中类型小于或等于 desired_message_type
绝对值的第一个消息。如果没有消息匹配条件,您的脚本将等待直到队列中出现合适的消息。您可以通过在 flags
参数中指定 MSG_IPC_NOWAIT
来阻止脚本阻塞。
received_message_type
接收到的消息类型将存储在此参数中。
max_message_size
max_message_size
指定了要接受的最大消息大小;如果队列中的消息大于此大小,则函数将失败(除非您按如下所示设置 flags
)。
message
接收到的消息将存储在 message
中,除非在接收消息时出现错误。
unserialize
如果设置为 true
,则消息将被视为使用与会话模块相同的机制序列化。消息将被反序列化,然后返回到您的脚本。这使您可以轻松地从其他 PHP 脚本接收数组或复杂的对象结构,或者如果您正在使用 WDDX 序列化器,则可以从任何与 WDDX 兼容的源接收。
如果 unserialize
为 false
,则消息将作为二进制安全字符串返回。
flags
可选的 flags
允许您将标志传递给低级 msgrcv 系统调用。它默认为 0,但您可以指定以下一个或多个值(通过将它们加在一起或进行 OR 运算)。
MSG_IPC_NOWAIT |
如果没有 desired_message_type 的消息,立即返回,不要等待。函数将失败并返回一个整数值,对应于 MSG_ENOMSG 。 |
MSG_EXCEPT |
在与大于 0 的 desired_message_type 结合使用时,此标志将导致函数接收第一个不等于 desired_message_type 的消息。 |
MSG_NOERROR |
如果消息长度超过 max_message_size ,设置此标志将截断消息为 max_message_size ,并且不会发出错误信号。 |
error_code
如果函数失败,可选的 error_code
将设置为系统 errno 变量的值。
成功完成后,消息队列数据结构将更新如下:msg_lrpid
设置为调用进程的进程 ID,msg_qnum
递减 1,msg_rtime
设置为当前时间。
看起来 msg_receive() 会分配一个大小为 $maxsize 的内存,然后才尝试从队列中接收消息到分配的内存中。因为我的脚本在 $maxsize = 1 Gib 时会死掉,但在 $maxsize = 10 Kib 时可以正常工作。
<?php error_reporting(E_ALL);
/**
* 使用 System V 消息队列发送和接收消息的示例
*
* 要尝试此脚本,请同步/异步运行它两次。 一次使用 ?typ=send,一次使用 ?typ=receive
*
* @author Thomas Eimers - Mehrkanal GmbH
*
* 本文档以“按现状”提供,不附带任何形式的明示或暗示保证,包括但不限于对适销性或特定用途的适用性的暗示保证。
*/
header('Content-Type: text/plain; charset=ISO-8859-1');
echo "开始...\n";
// 创建 System V 消息队列。 整数是队列号
$queue = msg_get_queue(100379);
// 发送选项
$message='nachricht'; // 传输数据
$serialize_needed=false; // 传输数据是否需要序列化?
$block_send=false; // 如果消息无法发送(队列已满...),则阻塞(true/false)
$msgtype_send=1; // 任何大于 0 的整数。 它为每条消息签名。 因此,您可以在一个队列中处理多条消息
// 类型。
// 接收选项
$msgtype_receive=1; // 我们想要接收哪种类型的消息? (这里,类型与发送的类型相同,
// 但是,如果您将其设置为 0,则会接收队列中下一个任何类型的消息。
$maxsize=100; // 您希望接收的最大数据长度。
$option_receive=MSG_IPC_NOWAIT; // 如果队列中没有所需类型的消息,则在不等待的情况下继续。
// 如果设置为 NULL,则等待消息。
// 发送或接收 20 条消息
for ($i=0;$i<20;$i++) {
sleep(1);
// 此操作发送
if ($_GET['typ']=='send') {
if(msg_send($queue,$msgtype_send, $message,$serialize_needed, $block_send,$err)===true) {
echo "消息已发送。\n";
} else {
var_dump($err);
}
// 此操作接收
} else {
$queue_status=msg_stat_queue($queue);
echo '队列中的消息: '.$queue_status['msg_qnum']."\n";
// 警告:即使在上一行代码之前队列中还有消息,现在也可能不再是这种情况!
if ($queue_status['msg_qnum']>0) {
if (msg_receive($queue,$msgtype_receive ,$msgtype_erhalten,$maxsize,$daten,$serialize_needed, $option_receive, $err)===true) {
echo "已接收数据".$daten."\n";
} else {
var_dump($err);
}
}
}
}
?>
似乎 2Mb 的最大大小是 php 的某种阈值,超过该阈值,msg_receive() 将开始使用大量 CPU(对于不断推送消息的发送方,接收 10000 条消息在我的计算机上从 0.01 秒跳到 1.5 秒)所以,如果可以,请尽量保持在该阈值以下。
例如,考虑这种 Linux 环境
<?php
// 文件 send.php
$ip = msg_get_queue(12340);
msg_send($ip,8,"abcd",false,false,$err);
//-----------------------------------------------------
<?php
// 文件 receive.php
$ip = msg_get_queue(12340);
msg_receive($ip,0,$msgtype,4,$data,false,null,$err);
echo "msgtype {$msgtype} data {$data}\n";
msg_receive($ip,0,$msgtype,4,$data,false,null,$err);
echo "msgtype {$msgtype} data {$data}\n";
?>
现在运行
在终端 #1 中,运行 php5 receive.php
在终端 #2 中,运行 php5 receive.php
在终端 #3 中,运行 php5 send.php
显示来自队列的消息将交替出现。 也就是说,您运行一次 send.php,消息将显示在终端 #1 中。 第二次运行,它将显示在 t#2 中,第三次运行在 #1 中,依此类推。
这应该在终端中作为您的 apache 用户运行,在 msg_send 的笔记中调用脚本,它们将进行通信。
#! /usr/bin/env php
<?php
$MSGKEY = 519051; // 消息
$msg_id = msg_get_queue ($MSGKEY, 0600);
while (1) {
if (msg_receive ($msg_id, 1, $msg_type, 16384, $msg, true, 0, $msg_error)) {
if ($msg == 'Quit') break;
echo "$msg\n";
} else {
echo "收到 $msg_error 获取消息\n";
break;
}
}
msg_remove_queue ($msg_id);
?>