這篇文章將為大家詳細(xì)講解有關(guān)Linux下如何使用管道和消息隊(duì)列,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
成都創(chuàng)新互聯(lián)公司是一家專業(yè)提供白云企業(yè)網(wǎng)站建設(shè),專注與成都做網(wǎng)站、成都網(wǎng)站建設(shè)、HTML5建站、小程序制作等業(yè)務(wù)。10年已為白云眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。
POSIX 的一個(gè)核心目標(biāo)就是線程安全。
請(qǐng)查看一些 mq_open 函數(shù)的 man 頁(yè),這個(gè)函數(shù)屬于內(nèi)存隊(duì)列的 API。這個(gè) man 頁(yè)中有關(guān) 特性 的章節(jié)帶有一個(gè)小表格:
接口 | 特性 | 值 |
---|---|---|
mq_open() | 線程安全 | MT-Safe |
上面的 MT-Safe(MT 指的是多線程)意味著 mq_open
函數(shù)是線程安全的,進(jìn)而暗示是進(jìn)程安全的:一個(gè)進(jìn)程的執(zhí)行和它的一個(gè)線程執(zhí)行的過(guò)程類似,假如競(jìng)爭(zhēng)條件不會(huì)發(fā)生在處于相同進(jìn)程的線程中,那么這樣的條件也不會(huì)發(fā)生在處于不同進(jìn)程的線程中。MT-Safe 特性保證了調(diào)用 mq_open
時(shí)不會(huì)出現(xiàn)競(jìng)爭(zhēng)條件。一般來(lái)說(shuō),基于通道的 IPC 是并發(fā)安全的,盡管在下面例子中會(huì)出現(xiàn)一個(gè)有關(guān)警告的注意事項(xiàng)。
首先讓我們通過(guò)一個(gè)特意構(gòu)造的命令行例子來(lái)展示無(wú)名管道是如何工作的。在所有的現(xiàn)代系統(tǒng)中,符號(hào) |
在命令行中都代表一個(gè)無(wú)名管道。假設(shè)我們的命令行提示符為 %
,接下來(lái)考慮下面的命令:
## 寫(xiě)入方在 | 左邊,讀取方在右邊% sleep 5 | echo "Hello, world!"
sleep
和 echo
程序以不同的進(jìn)程執(zhí)行,無(wú)名管道允許它們進(jìn)行通信。但是上面的例子被特意設(shè)計(jì)為沒(méi)有通信發(fā)生。問(wèn)候語(yǔ) “Hello, world!” 出現(xiàn)在屏幕中,然后過(guò)了 5 秒后,命令行返回,暗示 sleep
和 echo
進(jìn)程都已經(jīng)結(jié)束了。這期間發(fā)生了什么呢?
在命令行中的豎線 |
的語(yǔ)法中,左邊的進(jìn)程(sleep
)是寫(xiě)入方,右邊的進(jìn)程(echo
)為讀取方。默認(rèn)情況下,讀取方將會(huì)阻塞,直到從通道中能夠讀取到字節(jié)數(shù)據(jù),而寫(xiě)入方在寫(xiě)完它的字節(jié)數(shù)據(jù)后,將發(fā)送流已終止的標(biāo)志。(即便寫(xiě)入方過(guò)早終止了,一個(gè)流已終止的標(biāo)志還是會(huì)發(fā)給讀取方。)無(wú)名管道將保持到寫(xiě)入方和讀取方都停止的那個(gè)時(shí)刻。
在上面的例子中,sleep
進(jìn)程并沒(méi)有向通道寫(xiě)入任何的字節(jié)數(shù)據(jù),但在 5 秒后就終止了,這時(shí)將向通道發(fā)送一個(gè)流已終止的標(biāo)志。與此同時(shí),echo
進(jìn)程立即向標(biāo)準(zhǔn)輸出(屏幕)寫(xiě)入問(wèn)候語(yǔ),因?yàn)檫@個(gè)進(jìn)程并不從通道中讀入任何字節(jié),所以它并沒(méi)有等待。一旦 sleep
和 echo
進(jìn)程都終止了,不會(huì)再用作通信的無(wú)名管道將會(huì)消失然后返回命令行提示符。
下面這個(gè)更加實(shí)用的示例將使用兩個(gè)無(wú)名管道。我們假定文件 test.dat
的內(nèi)容如下:
thisisthewaytheworldends
下面的命令:
% cat test.dat | sort | uniq
會(huì)將 cat
(連接的縮寫(xiě))進(jìn)程的輸出通過(guò)管道傳給 sort
進(jìn)程以生成排序后的輸出,然后將排序后的輸出通過(guò)管道傳給 uniq
進(jìn)程以消除重復(fù)的記錄(在本例中,會(huì)將兩次出現(xiàn)的 “the” 縮減為一個(gè)):
endsisthethiswayworld
下面展示的情景展示的是一個(gè)帶有兩個(gè)進(jìn)程的程序通過(guò)一個(gè)無(wú)名管道通信來(lái)進(jìn)行通信。
#include/* wait */#include #include /* exit functions */#include /* read, write, pipe, _exit */#include #define ReadEnd 0#define WriteEnd 1 void report_and_exit(const char* msg) { [perror][6](msg); [exit][7](-1); /** failure **/} int main() { int pipeFDs[2]; /* two file descriptors */ char buf; /* 1-byte buffer */ const char* msg = "Nature's first green is gold\n"; /* bytes to write */ if (pipe(pipeFDs) < 0) report_and_exit("pipeFD"); pid_t cpid = fork(); /* fork a child process */ if (cpid < 0) report_and_exit("fork"); /* check for failure */ if (0 == cpid) { /*** child ***/ /* child process */ close(pipeFDs[WriteEnd]); /* child reads, doesn't write */ while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */ write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */ close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */ _exit(0); /* exit and notify parent at once */ } else { /*** parent ***/ close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */ write(pipeFDs[WriteEnd], msg, [strlen][8](msg)); /* write the bytes to the pipe */ close(pipeFDs[WriteEnd]); /* done writing: generate eof */ wait(NULL); /* wait for child to exit */ [exit][7](0); /* exit normally */ } return 0;}
上面名為 pipeUN
的程序使用系統(tǒng)函數(shù) fork
來(lái)創(chuàng)建一個(gè)進(jìn)程。盡管這個(gè)程序只有一個(gè)單一的源文件,在它正確執(zhí)行的情況下將會(huì)發(fā)生多進(jìn)程的情況。
下面的內(nèi)容是對(duì)庫(kù)函數(shù)
fork
如何工作的一個(gè)簡(jiǎn)要回顧:
fork
函數(shù)由父進(jìn)程調(diào)用,在失敗時(shí)返回-1
給父進(jìn)程。在pipeUN
這個(gè)例子中,相應(yīng)的調(diào)用是:函數(shù)調(diào)用后的返回值也被保存下來(lái)了。在這個(gè)例子中,保存在整數(shù)類型
pid_t
的變量cpid
中。(每個(gè)進(jìn)程有它自己的進(jìn)程 ID,這是一個(gè)非負(fù)的整數(shù),用來(lái)標(biāo)記進(jìn)程)。復(fù)刻一個(gè)新的進(jìn)程可能會(huì)因?yàn)槎喾N原因而失敗,包括進(jìn)程表滿了的原因,這個(gè)結(jié)構(gòu)由系統(tǒng)維持,以此來(lái)追蹤進(jìn)程狀態(tài)。明確地說(shuō),僵尸進(jìn)程假如沒(méi)有被處理掉,將可能引起進(jìn)程表被填滿的錯(cuò)誤。
pid_t cpid = fork(); /* called in parent */
假如
fork
調(diào)用成功,則它將創(chuàng)建一個(gè)新的子進(jìn)程,向父進(jìn)程返回一個(gè)值,向子進(jìn)程返回另外的一個(gè)值。在調(diào)用fork
后父進(jìn)程和子進(jìn)程都將執(zhí)行相同的代碼。(子進(jìn)程繼承了到此為止父進(jìn)程中聲明的所有變量的拷貝),特別地,一次成功的fork
調(diào)用將返回如下的東西:
向子進(jìn)程返回
0
向父進(jìn)程返回子進(jìn)程的進(jìn)程 ID
在一次成功的
fork
調(diào)用后,一個(gè)if
/else
或等價(jià)的結(jié)構(gòu)將會(huì)被用來(lái)隔離針對(duì)父進(jìn)程和子進(jìn)程的代碼。在這個(gè)例子中,相應(yīng)的聲明為:
if (0 == cpid) { /*** child ***/
...
}
else { /*** parent ***/
...
}
假如成功地復(fù)刻出了一個(gè)子進(jìn)程,pipeUN
程序?qū)⑾裣旅孢@樣去執(zhí)行。在一個(gè)整數(shù)的數(shù)列里:
int pipeFDs[2]; /* two file descriptors */
來(lái)保存兩個(gè)文件描述符,一個(gè)用來(lái)向管道中寫(xiě)入,另一個(gè)從管道中寫(xiě)入。(數(shù)組元素 pipeFDs[0]
是讀端的文件描述符,元素 pipeFDs[1]
是寫(xiě)端的文件描述符。)在調(diào)用 fork
之前,對(duì)系統(tǒng) pipe
函數(shù)的成功調(diào)用,將立刻使得這個(gè)數(shù)組獲得兩個(gè)文件描述符:
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
父進(jìn)程和子進(jìn)程現(xiàn)在都有了文件描述符的副本。但分離關(guān)注點(diǎn)模式意味著每個(gè)進(jìn)程恰好只需要一個(gè)描述符。在這個(gè)例子中,父進(jìn)程負(fù)責(zé)寫(xiě)入,而子進(jìn)程負(fù)責(zé)讀取,盡管這樣的角色分配可以反過(guò)來(lái)。在 if
子句中的***個(gè)語(yǔ)句將用于關(guān)閉管道的讀端:
close(pipeFDs[WriteEnd]); /* called in child code */
在父進(jìn)程中的 else
子句將會(huì)關(guān)閉管道的讀端:
close(pipeFDs[ReadEnd]); /* called in parent code */
然后父進(jìn)程將向無(wú)名管道中寫(xiě)入某些字節(jié)數(shù)據(jù)(ASCII 代碼),子進(jìn)程讀取這些數(shù)據(jù),然后向標(biāo)準(zhǔn)輸出中回放它們。
在這個(gè)程序中還需要澄清的一點(diǎn)是在父進(jìn)程代碼中的 wait
函數(shù)。一旦被創(chuàng)建后,子進(jìn)程很大程度上獨(dú)立于它的父進(jìn)程,正如簡(jiǎn)短的 pipeUN
程序所展示的那樣。子進(jìn)程可以執(zhí)行任意的代碼,而它們可能與父進(jìn)程完全沒(méi)有關(guān)系。但是,假如當(dāng)子進(jìn)程終止時(shí),系統(tǒng)將會(huì)通過(guò)一個(gè)信號(hào)來(lái)通知父進(jìn)程。
要是父進(jìn)程在子進(jìn)程之前終止又該如何呢?在這種情形下,除非采取了預(yù)防措施,子進(jìn)程將會(huì)變成在進(jìn)程表中的一個(gè)僵尸進(jìn)程。預(yù)防措施有兩大類型:***種是讓父進(jìn)程去通知系統(tǒng),告訴系統(tǒng)它對(duì)子進(jìn)程的終止沒(méi)有任何興趣:
signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */
第二種方法是在子進(jìn)程終止時(shí),讓父進(jìn)程執(zhí)行一個(gè) wait
。這樣就確保了父進(jìn)程可以獨(dú)立于子進(jìn)程而存在。在 pipeUN
程序中使用了第二種方法,其中父進(jìn)程的代碼使用的是下面的調(diào)用:
wait(NULL); /* called in parent */
這個(gè)對(duì) wait
的調(diào)用意味著一直等待直到任意一個(gè)子進(jìn)程的終止發(fā)生,因此在 pipeUN
程序中,只有一個(gè)子進(jìn)程。(其中的 NULL
參數(shù)可以被替換為一個(gè)保存有子程序退出狀態(tài)的整數(shù)變量的地址。)對(duì)于更細(xì)粒度的控制,還可以使用更靈活的 waitpid
函數(shù),例如特別指定多個(gè)子進(jìn)程中的某一個(gè)。
pipeUN
將會(huì)采取另一個(gè)預(yù)防措施。當(dāng)父進(jìn)程結(jié)束了等待,父進(jìn)程將會(huì)調(diào)用常規(guī)的 exit
函數(shù)去退出。對(duì)應(yīng)的,子進(jìn)程將會(huì)調(diào)用 _exit
變種來(lái)退出,這類變種將快速跟蹤終止相關(guān)的通知。在效果上,子進(jìn)程會(huì)告訴系統(tǒng)立刻去通知父進(jìn)程它的這個(gè)子進(jìn)程已經(jīng)終止了。
假如兩個(gè)進(jìn)程向相同的無(wú)名管道中寫(xiě)入內(nèi)容,字節(jié)數(shù)據(jù)會(huì)交錯(cuò)嗎?例如,假如進(jìn)程 P1 向管道寫(xiě)入內(nèi)容:
foo bar
同時(shí)進(jìn)程 P2 并發(fā)地寫(xiě)入:
baz baz
到相同的管道,***的結(jié)果似乎是管道中的內(nèi)容將會(huì)是任意錯(cuò)亂的,例如像這樣:
baz foo baz bar
只要沒(méi)有寫(xiě)入超過(guò) PIPE_BUF
字節(jié),POSIX 標(biāo)準(zhǔn)就能確保寫(xiě)入不會(huì)交錯(cuò)。在 Linux 系統(tǒng)中, PIPE_BUF
的大小是 4096 字節(jié)。對(duì)于管道我更喜歡只有一個(gè)寫(xiě)入方和一個(gè)讀取方,從而繞過(guò)這個(gè)問(wèn)題。
無(wú)名管道沒(méi)有備份文件:系統(tǒng)將維持一個(gè)內(nèi)存緩存來(lái)將字節(jié)數(shù)據(jù)從寫(xiě)方傳給讀方。一旦寫(xiě)方和讀方終止,這個(gè)緩存將會(huì)被回收,進(jìn)而無(wú)名管道消失。相反的,命名管道有備份文件和一個(gè)不同的 API。
下面讓我們通過(guò)另一個(gè)命令行示例來(lái)了解命名管道的要點(diǎn)。下面是具體的步驟:
開(kāi)啟兩個(gè)終端。這兩個(gè)終端的工作目錄應(yīng)該相同。
在其中一個(gè)終端中,鍵入下面的兩個(gè)命令(命令行提示符仍然是 %
,我的注釋以 ##
打頭。):
在最開(kāi)始,沒(méi)有任何東西會(huì)出現(xiàn)在終端中,因?yàn)榈浆F(xiàn)在為止沒(méi)有在命名管道中寫(xiě)入任何東西。
% mkfifo tester ## 創(chuàng)建一個(gè)備份文件,名為 tester
% cat tester ## 將管道的內(nèi)容輸出到 stdout
在第二個(gè)終端中輸入下面的命令:
無(wú)論在這個(gè)終端中輸入什么,它都會(huì)在另一個(gè)終端中顯示出來(lái)。一旦鍵入 Ctrl+C
,就會(huì)回到正常的命令行提示符,因?yàn)楣艿酪呀?jīng)被關(guān)閉了。
% cat > tester ## redirect keyboard input to the pipe
hello, world! ## then hit Return key
bye, bye ## ditto
通過(guò)移除實(shí)現(xiàn)命名管道的文件來(lái)進(jìn)行清理:
% unlink tester
正如 mkfifo
程序的名字所暗示的那樣,命名管道也被叫做 FIFO,因?yàn)?**個(gè)進(jìn)入的字節(jié),就會(huì)***個(gè)出,其他的類似。有一個(gè)名為 mkfifo
的庫(kù)函數(shù),用它可以在程序中創(chuàng)建一個(gè)命名管道,它將在下一個(gè)示例中被用到,該示例由兩個(gè)進(jìn)程組成:一個(gè)向命名管道寫(xiě)入,而另一個(gè)從該管道讀取。
#include#include #include #include #include #include #include #define MaxLoops 12000 /* outer loop */#define ChunkSize 16 /* how many written at a time */#define IntsPerChunk 4 /* four 4-byte ints per chunk */#define MaxZs 250 /* max microseconds to sleep */ int main() { const char* pipeName = "./fifoChannel"; mkfifo(pipeName, 0666); /* read/write for user/group/others */ int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */ if (fd < 0) return -1; /** error **/ int i; for (i = 0; i < MaxLoops; i++) { /* write MaxWrites times */ int j; for (j = 0; j < ChunkSize; j++) { /* each time, write ChunkSize bytes */ int k; int chunk[IntsPerChunk]; for (k = 0; k < IntsPerChunk; k++) chunk[k] = [rand][9](); write(fd, chunk, sizeof(chunk)); } usleep(([rand][9]() % MaxZs) + 1); /* pause a bit for realism */ } close(fd); /* close pipe: generates an end-of-file */ unlink(pipeName); /* unlink from the implementing file */ [printf][10]("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk); return 0;}
上面的 fifoWriter
程序可以被總結(jié)為如下:
首先程序創(chuàng)建了一個(gè)命名管道用來(lái)寫(xiě)入數(shù)據(jù):
其中的 pipeName
是備份文件的名字,傳遞給 mkfifo
作為它的***個(gè)參數(shù)。接著命名管道通過(guò)我們熟悉的 open
函數(shù)調(diào)用被打開(kāi),而這個(gè)函數(shù)將會(huì)返回一個(gè)文件描述符。
mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY);
在實(shí)現(xiàn)層面上,fifoWriter
不會(huì)一次性將所有的數(shù)據(jù)都寫(xiě)入,而是寫(xiě)入一個(gè)塊,然后休息隨機(jī)數(shù)目的微秒時(shí)間,接著再循環(huán)往復(fù)??偟膩?lái)說(shuō),有 768000 個(gè) 4 字節(jié)整數(shù)值被寫(xiě)入到命名管道中。
在關(guān)閉命名管道后,fifoWriter
也將使用 unlink
取消對(duì)該文件的連接。
一旦連接到管道的每個(gè)進(jìn)程都執(zhí)行了 unlink
操作后,系統(tǒng)將回收這些備份文件。在這個(gè)例子中,只有兩個(gè)這樣的進(jìn)程 fifoWriter
和 fifoReader
,它們都做了 unlink
操作。
close(fd); /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */
這個(gè)兩個(gè)程序應(yīng)該在不同終端的相同工作目錄中執(zhí)行。但是 fifoWriter
應(yīng)該在 fifoReader
之前被啟動(dòng),因?yàn)樾枰?fifoWriter
去創(chuàng)建管道。然后 fifoReader
才能夠獲取到剛被創(chuàng)建的命名管道。
#include#include #include #include #include unsigned is_prime(unsigned n) { /* not pretty, but gets the job done efficiently */ if (n <= 3) return n > 1; if (0 == (n % 2) || 0 == (n % 3)) return 0; unsigned i; for (i = 5; (i * i) <= n; i += 6) if (0 == (n % i) || 0 == (n % (i + 2))) return 0; return 1; /* found a prime! */} int main() { const char* file = "./fifoChannel"; int fd = open(file, O_RDONLY); if (fd < 0) return -1; /* no point in continuing */ unsigned count = 0, total = 0, primes_count = 0; while (1) { int next; int i; ssize_t count = read(fd, &next, sizeof(int)); if (0 == count) break; /* end of stream */ else if (count == sizeof(int)) { /* read a 4-byte int value */ total++; if (is_prime(next)) primes_count++; } } close(fd); /* close pipe from read end */ unlink(file); /* unlink from the underlying file */ [printf][10]("Received ints: %u, primes: %u\n", total, primes_count); return 0;}
上面的 fifoReader
的內(nèi)容可以總結(jié)為如下:
因?yàn)?fifoWriter
已經(jīng)創(chuàng)建了命名管道,所以 fifoReader
只需要利用標(biāo)準(zhǔn)的 open
調(diào)用來(lái)通過(guò)備份文件來(lái)獲取到管道中的內(nèi)容:
這個(gè)文件的是以只讀打開(kāi)的。
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
然后這個(gè)程序進(jìn)入一個(gè)潛在的***循環(huán),在每次循環(huán)時(shí),嘗試讀取 4 字節(jié)的塊。read
調(diào)用:
返回 0 來(lái)暗示該流的結(jié)束。在這種情況下,fifoReader
跳出循環(huán),關(guān)閉命名管道,并在終止前 unlink
備份文件。
ssize_t count = read(fd, &next, sizeof(int));
在讀入 4 字節(jié)整數(shù)后,fifoReader
檢查這個(gè)數(shù)是否為質(zhì)數(shù)。這個(gè)操作代表了一個(gè)生產(chǎn)級(jí)別的讀取器可能在接收到的字節(jié)數(shù)據(jù)上執(zhí)行的邏輯操作。在示例運(yùn)行中,在接收到的 768000 個(gè)整數(shù)中有 37682 個(gè)質(zhì)數(shù)。
重復(fù)運(yùn)行示例, fifoReader
將成功地讀取 fifoWriter
寫(xiě)入的所有字節(jié)。這不是很讓人驚訝的。這兩個(gè)進(jìn)程在相同的機(jī)器上執(zhí)行,從而可以不用考慮網(wǎng)絡(luò)相關(guān)的問(wèn)題。命名管道是一個(gè)可信且高效的 IPC 機(jī)制,因而被廣泛使用。
下面是這兩個(gè)程序的輸出,它們?cè)诓煌慕K端中啟動(dòng),但處于相同的工作目錄:
% ./fifoWriter768000 ints sent to the pipe.###% ./fifoReaderReceived ints: 768000, primes: 37682
管道有著嚴(yán)格的先入先出行為:***個(gè)被寫(xiě)入的字節(jié)將會(huì)***個(gè)被讀,第二個(gè)寫(xiě)入的字節(jié)將第二個(gè)被讀,以此類推。消息隊(duì)列可以做出相同的表現(xiàn),但它又足夠靈活,可以使得字節(jié)塊可以不以先入先出的次序來(lái)接收。
正如它的名字所提示的那樣,消息隊(duì)列是一系列的消息,每個(gè)消息包含兩部分:
荷載,一個(gè)字節(jié)序列(在 C 中是 char)
類型,以一個(gè)正整數(shù)值的形式給定,類型用來(lái)分類消息,為了更靈活的回收
看一下下面對(duì)一個(gè)消息隊(duì)列的描述,每個(gè)消息由一個(gè)整數(shù)類型標(biāo)記:
+-+ +-+ +-+ +-+sender--->|3|--->|2|--->|2|--->|1|--->receiver +-+ +-+ +-+ +-+
在上面展示的 4 個(gè)消息中,標(biāo)記為 1 的是開(kāi)頭,即最接近接收端,然后另個(gè)標(biāo)記為 2 的消息,***接著一個(gè)標(biāo)記為 3 的消息。假如按照嚴(yán)格的 FIFO 行為執(zhí)行,消息將會(huì)以 1-2-2-3 這樣的次序被接收。但是消息隊(duì)列允許其他收取次序。例如,消息可以被接收方以 3-2-1-2 的次序接收。
mqueue
示例包含兩個(gè)程序,sender
將向消息隊(duì)列中寫(xiě)入數(shù)據(jù),而 receiver
將從這個(gè)隊(duì)列中讀取數(shù)據(jù)。這兩個(gè)程序都包含的頭文件 queue.h
如下所示:
#define ProjectId 123#define PathName "queue.h" /* any existing, accessible file would do */#define MsgLen 4#define MsgCount 6 typedef struct { long type; /* must be of type long */ char payload[MsgLen + 1]; /* bytes in the message */ } queuedMessage;
上面的頭文件定義了一個(gè)名為 queuedMessage
的結(jié)構(gòu)類型,它帶有 payload
(字節(jié)數(shù)組)和 type
(整數(shù))這兩個(gè)域。該文件也定義了一些符號(hào)常數(shù)(使用 #define
語(yǔ)句),前兩個(gè)常數(shù)被用來(lái)生成一個(gè) key
,而這個(gè) key
反過(guò)來(lái)被用來(lái)獲取一個(gè)消息隊(duì)列的 ID。ProjectId
可以是任何正整數(shù)值,而 PathName
必須是一個(gè)存在的、可訪問(wèn)的文件,在這個(gè)示例中,指的是文件 queue.h
。在 sender
和 receiver
中,它們都有的設(shè)定語(yǔ)句為:
key_t key = ftok(PathName, ProjectId); /* generate key */int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */
ID qid
在效果上是消息隊(duì)列文件描述符的對(duì)應(yīng)物。
#include#include #include #include #include #include "queue.h" void report_and_exit(const char* msg) { [perror][6](msg); [exit][7](-1); /* EXIT_FAILURE */} int main() { key_t key = ftok(PathName, ProjectId); if (key < 0) report_and_exit("couldn't get key..."); int qid = msgget(key, 0666 | IPC_CREAT); if (qid < 0) report_and_exit("couldn't get queue id..."); char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"}; int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */ int i; for (i = 0; i < MsgCount; i++) { /* build the message */ queuedMessage msg; msg.type = types[i]; [strcpy][11](msg.payload, payloads[i]); /* send the message */ msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */ [printf][10]("%s sent as type %i\n", msg.payload, (int) msg.type); } return 0;}
上面的 sender
程序?qū)l(fā)送出 6 個(gè)消息,每?jī)蓚€(gè)為一個(gè)類型:前兩個(gè)是類型 1,接著的連個(gè)是類型 2,***的兩個(gè)為類型 3。發(fā)送的語(yǔ)句:
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);
被配置為非阻塞的(IPC_NOWAIT
標(biāo)志),是因?yàn)檫@里的消息體量上都很小。唯一的危險(xiǎn)在于一個(gè)完整的序列將可能導(dǎo)致發(fā)送失敗,而這個(gè)例子不會(huì)。下面的 receiver
程序也將使用 IPC_NOWAIT
標(biāo)志來(lái)接收消息。
#include#include #include #include #include "queue.h" void report_and_exit(const char* msg) { [perror][6](msg); [exit][7](-1); /* EXIT_FAILURE */} int main() { key_t key= ftok(PathName, ProjectId); /* key to identify the queue */ if (key < 0) report_and_exit("key not gotten..."); int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */ if (qid < 0) report_and_exit("no access to queue..."); int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */ int i; for (i = 0; i < MsgCount; i++) { queuedMessage msg; /* defined in queue.h */ if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0) [puts][12]("msgrcv trouble..."); [printf][10]("%s received as type %i\n", msg.payload, (int) msg.type); } /** remove the queue **/ if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */ report_and_exit("trouble removing queue..."); return 0; }
這個(gè) receiver
程序不會(huì)創(chuàng)建消息隊(duì)列,盡管 API 盡管建議那樣。在 receiver
中,對(duì)
int qid = msgget(key, 0666 | IPC_CREAT);
的調(diào)用可能因?yàn)閹в?IPC_CREAT
標(biāo)志而具有誤導(dǎo)性,但是這個(gè)標(biāo)志的真實(shí)意義是如果需要就創(chuàng)建,否則直接獲取。sender
程序調(diào)用 msgsnd
來(lái)發(fā)送消息,而 receiver
調(diào)用 msgrcv
來(lái)接收它們。在這個(gè)例子中,sender
以 1-1-2-2-3-3 的次序發(fā)送消息,但 receiver
接收它們的次序?yàn)?3-1-2-1-3-2,這顯示消息隊(duì)列沒(méi)有被嚴(yán)格的 FIFO 行為所拘泥:
% ./sendermsg1 sent as type 1msg2 sent as type 1msg3 sent as type 2msg4 sent as type 2msg5 sent as type 3msg6 sent as type 3 % ./receivermsg5 received as type 3msg1 received as type 1msg3 received as type 2msg2 received as type 1msg6 received as type 3msg4 received as type 2
上面的輸出顯示 sender
和 receiver
可以在同一個(gè)終端中啟動(dòng)。輸出也顯示消息隊(duì)列是持久的,即便 sender
進(jìn)程在完成創(chuàng)建隊(duì)列、向隊(duì)列寫(xiě)數(shù)據(jù)、然后退出的整個(gè)過(guò)程后,該隊(duì)列仍然存在。只有在 receiver
進(jìn)程顯式地調(diào)用 msgctl
來(lái)移除該隊(duì)列,這個(gè)隊(duì)列才會(huì)消失:
if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */
關(guān)于“Linux下如何使用管道和消息隊(duì)列”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。