看起來 msg_receive() 會分配大小為 $maxsize 的記憶體,然後才嘗試從佇列接收訊息到已分配的記憶體中。因為我的腳本在 $maxsize = 1 Gib 時會失效,但在 $maxsize = 10 Kib 時可以正常運作。
(PHP 4 >= 4.3.0, PHP 5, PHP 7, PHP 8)
msg_receive — 從訊息佇列接收訊息
$queue
,$desired_message_type
,&$received_message_type
,$max_message_size
,&$message
,$unserialize
= true
,$flags
= 0,&$error_code
= null
msg_receive() 會從指定的 queue
中接收第一個訊息,訊息類型由 desired_message_type
指定。
queue
訊息佇列。
desired_message_type
如果 desired_message_type
為 0,則會返回佇列最前面的訊息。如果 desired_message_type
大於 0,則會返回該類型的第一個訊息。如果 desired_message_type
小於 0,則會讀取佇列上類型小於或等於 desired_message_type
絕對值的第一個訊息。如果沒有符合條件的訊息,您的腳本將會等待,直到佇列中出現合適的訊息。您可以透過在 flags
參數中指定 MSG_IPC_NOWAIT
來防止腳本阻塞。
received_message_type
接收到的訊息類型將會儲存在此參數中。
max_message_size
可接受的訊息最大大小由 max_message_size
指定;如果佇列中的訊息大於此大小,則函式將會失敗(除非您設定 flags
,如下所述)。
message
接收到的訊息將會儲存在 message
中,除非接收訊息時發生錯誤。
unserialize
如果設定為 true
(真),則訊息會被視為已使用與工作階段模組相同的機制序列化。訊息將會被反序列化,然後返回到您的腳本。這讓您可以輕鬆地從其他 PHP 腳本接收陣列或複雜的物件結構,或者如果您使用的是 WDDX 序列化器,則可以從任何與 WDDX 相容的來源接收。
如果 unserialize
為 false
(假),則訊息將會以二進位安全字串的形式返回。
flags
選用的 flags
參數允許您將旗標傳遞給底層的 msgrcv 系統呼叫。它的預設值為 0,但您可以指定下列一個或多個值(透過將它們相加或進行 OR 運算)。
MSG_IPC_NOWAIT |
如果沒有 desired_message_type 類型的訊息,則立即返回,不要等待。函式將會失敗,並返回一個對應於 MSG_ENOMSG 的整數值。 |
MSG_EXCEPT |
將此旗標與大於 0 的 desired_message_type 結合使用,將會導致函式接收第一個不等於 desired_message_type 的訊息。 |
MSG_NOERROR |
如果訊息長度大於 max_message_size ,設定此旗標將會把訊息截斷為 max_message_size ,並且不會發出錯誤訊號。 |
error_code
如果函式失敗,選用的 error_code
參數將會被設定為系統 errno 變數的值。
成功完成後,訊息佇列資料結構將會更新如下:msg_lrpid
設定為呼叫程序的程序 ID,msg_qnum
減 1,msg_rtime
設定為目前時間。
看起來 msg_receive() 會分配大小為 $maxsize 的記憶體,然後才嘗試從佇列接收訊息到已分配的記憶體中。因為我的腳本在 $maxsize = 1 Gib 時會失效,但在 $maxsize = 10 Kib 時可以正常運作。
<?php error_reporting(E_ALL);
/**
* Example for sending and receiving Messages via the System V Message Queue
*
* To try this script run it synchron/asynchron twice times. One time with ?typ=send and one time with ?typ=receive
*
* @author Thomas Eimers - Mehrkanal GmbH
*
* This document is distributed in the hope that it will be useful, but without any warranty;
* without even the implied warranty of merchantability or fitness for a particular purpose.
*/
header('Content-Type: text/plain; charset=ISO-8859-1');
echo "Start...\n";
// Create System V Message Queue. Integer value is the number of the Queue
$queue = msg_get_queue(100379);
// Sendoptions
$message='nachricht'; // Transfering Data
$serialize_needed=false; // Must the transfer data be serialized ?
$block_send=false; // Block if Message could not be send (Queue full...) (true/false)
$msgtype_send=1; // Any Integer above 0. It signeds every Message. So you could handle multible message
// type in one Queue.
// Receiveoptions
$msgtype_receive=1; // Whiche type of Message we want to receive ? (Here, the type is the same as the type we send,
// but if you set this to 0 you receive the next Message in the Queue with any type.
$maxsize=100; // How long is the maximal data you like to receive.
$option_receive=MSG_IPC_NOWAIT; // If there are no messages of the wanted type in the Queue continue without wating.
// If is set to NULL wait for a Message.
// Send or receive 20 Messages
for ($i=0;$i<20;$i++) {
sleep(1);
// This one sends
if ($_GET['typ']=='send') {
if(msg_send($queue,$msgtype_send, $message,$serialize_needed, $block_send,$err)===true) {
echo "Message sendet.\n";
} else {
var_dump($err);
}
// This one received
} else {
$queue_status=msg_stat_queue($queue);
echo 'Messages in the queue: '.$queue_status['msg_qnum']."\n";
// WARNUNG: nur weil vor einer Zeile Code noch Nachrichten in der Queue waren, muss das jetzt nciht mehr der Fall sein!
if ($queue_status['msg_qnum']>0) {
if (msg_receive($queue,$msgtype_receive ,$msgtype_erhalten,$maxsize,$daten,$serialize_needed, $option_receive, $err)===true) {
echo "Received data".$daten."\n";
} else {
var_dump($err);
}
}
}
}
?>
2Mb 的 maxsize 似乎是 PHP 的某種門檻值,超過這個值,msg_receive() 就會開始大量使用 CPU(在我的電腦上,如果發送端持續推送訊息,接收 10000 個訊息的時間會從 0.01 秒跳到 1.5 秒),所以如果可以的話,請盡量保持在這個門檻值以下。
例如,考慮以下 Linux 情況
<?php
//檔案 send.php
$ip = msg_get_queue(12340);
msg_send($ip,8,"abcd",false,false,$err);
//-----------------------------------------------------
<?php
//檔案 receive.php
$ip = msg_get_queue(12340);
msg_receive($ip,0,$msgtype,4,$data,false,null,$err);
echo "訊息類型 {$msgtype} 資料 {$data}\n";
msg_receive($ip,0,$msgtype,4,$data,false,null,$err);
echo "訊息類型 {$msgtype} 資料 {$data}\n";
?>
現在執行
在終端機 #1 輸入 php5 receive.php
在終端機 #2 輸入 php5 receive.php
在終端機 #3 輸入 php5 send.php
顯示來自佇列的訊息將會交替出現。這表示您執行一次 send.php,訊息將會顯示在終端機 #1。第二次執行它會在終端機 #2,第三次 #1,依此類推。
這應該以您的 Apache 使用者身分在終端機中執行,在 msg_send 的註解中呼叫腳本,它們就會進行通訊。
#! /usr/bin/env php
<?php
$MSGKEY = 519051; // 訊息
$msg_id = msg_get_queue ($MSGKEY, 0600);
while (1) {
if (msg_receive ($msg_id, 1, $msg_type, 16384, $msg, true, 0, $msg_error)) {
if ($msg == 'Quit') break;
echo "$msg\n";
} else {
echo "擷取訊息時發生 $msg_error 錯誤\n";
break;
}
}
msg_remove_queue ($msg_id);
?>