真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Linux下如何使用管道和消息隊(duì)列

這篇文章將為大家詳細(xì)講解有關(guān)Linux下如何使用管道和消息隊(duì)列,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

成都創(chuàng)新互聯(lián)公司是一家專業(yè)提供白云企業(yè)網(wǎng)站建設(shè),專注與成都做網(wǎng)站、成都網(wǎng)站建設(shè)、HTML5建站、小程序制作等業(yè)務(wù)。10年已為白云眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。

POSIX  的一個(gè)核心目標(biāo)就是線程安全。

請(qǐng)查看一些 mq_open 函數(shù)的 man 頁(yè),這個(gè)函數(shù)屬于內(nèi)存隊(duì)列的 API。這個(gè) man 頁(yè)中有關(guān) 特性 的章節(jié)帶有一個(gè)小表格:

接口特性
mq_open()線程安全MT-Safe

上面的 MT-Safe(MT 指的是多線程multi-threaded)意味著 mq_open 函數(shù)是線程安全的,進(jìn)而暗示是進(jìn)程安全的:一個(gè)進(jìn)程的執(zhí)行和它的一個(gè)線程執(zhí)行的過(guò)程類似,假如競(jìng)爭(zhēng)條件不會(huì)發(fā)生在處于相同進(jìn)程的線程中,那么這樣的條件也不會(huì)發(fā)生在處于不同進(jìn)程的線程中。MT-Safe 特性保證了調(diào)用 mq_open 時(shí)不會(huì)出現(xiàn)競(jìng)爭(zhēng)條件。一般來(lái)說(shuō),基于通道的 IPC 是并發(fā)安全的,盡管在下面例子中會(huì)出現(xiàn)一個(gè)有關(guān)警告的注意事項(xiàng)。

無(wú)名管道

首先讓我們通過(guò)一個(gè)特意構(gòu)造的命令行例子來(lái)展示無(wú)名管道是如何工作的。在所有的現(xiàn)代系統(tǒng)中,符號(hào) | 在命令行中都代表一個(gè)無(wú)名管道。假設(shè)我們的命令行提示符為 %,接下來(lái)考慮下面的命令:

## 寫(xiě)入方在 | 左邊,讀取方在右邊% sleep 5 | echo "Hello, world!"

sleepecho 程序以不同的進(jìn)程執(zhí)行,無(wú)名管道允許它們進(jìn)行通信。但是上面的例子被特意設(shè)計(jì)為沒(méi)有通信發(fā)生。問(wèn)候語(yǔ) “Hello, world!” 出現(xiàn)在屏幕中,然后過(guò)了 5 秒后,命令行返回,暗示 sleepecho 進(jìn)程都已經(jīng)結(jié)束了。這期間發(fā)生了什么呢?

在命令行中的豎線 | 的語(yǔ)法中,左邊的進(jìn)程(sleep)是寫(xiě)入方,右邊的進(jìn)程(echo)為讀取方。默認(rèn)情況下,讀取方將會(huì)阻塞,直到從通道中能夠讀取到字節(jié)數(shù)據(jù),而寫(xiě)入方在寫(xiě)完它的字節(jié)數(shù)據(jù)后,將發(fā)送流已終止end-of-stream的標(biāo)志。(即便寫(xiě)入方過(guò)早終止了,一個(gè)流已終止的標(biāo)志還是會(huì)發(fā)給讀取方。)無(wú)名管道將保持到寫(xiě)入方和讀取方都停止的那個(gè)時(shí)刻。

在上面的例子中,sleep 進(jìn)程并沒(méi)有向通道寫(xiě)入任何的字節(jié)數(shù)據(jù),但在 5 秒后就終止了,這時(shí)將向通道發(fā)送一個(gè)流已終止的標(biāo)志。與此同時(shí),echo 進(jìn)程立即向標(biāo)準(zhǔn)輸出(屏幕)寫(xiě)入問(wèn)候語(yǔ),因?yàn)檫@個(gè)進(jìn)程并不從通道中讀入任何字節(jié),所以它并沒(méi)有等待。一旦 sleepecho 進(jìn)程都終止了,不會(huì)再用作通信的無(wú)名管道將會(huì)消失然后返回命令行提示符。

下面這個(gè)更加實(shí)用的示例將使用兩個(gè)無(wú)名管道。我們假定文件 test.dat 的內(nèi)容如下:

thisisthewaytheworldends

下面的命令:

% cat test.dat | sort | uniq

會(huì)將 cat連接concatenate的縮寫(xiě))進(jìn)程的輸出通過(guò)管道傳給 sort 進(jìn)程以生成排序后的輸出,然后將排序后的輸出通過(guò)管道傳給 uniq 進(jìn)程以消除重復(fù)的記錄(在本例中,會(huì)將兩次出現(xiàn)的 “the” 縮減為一個(gè)):

endsisthethiswayworld

下面展示的情景展示的是一個(gè)帶有兩個(gè)進(jìn)程的程序通過(guò)一個(gè)無(wú)名管道通信來(lái)進(jìn)行通信。

示例 1. 兩個(gè)進(jìn)程通過(guò)一個(gè)無(wú)名管道來(lái)進(jìn)行通信
#include  /* wait */#include #include    /* exit functions */#include    /* read, write, pipe, _exit */#include  #define ReadEnd  0#define WriteEnd 1 void report_and_exit(const char* msg) {  [perror][6](msg);  [exit][7](-1);    /** failure **/} int main() {  int pipeFDs[2]; /* two file descriptors */  char buf;       /* 1-byte buffer */  const char* msg = "Nature's first green is gold\n"; /* bytes to write */   if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");  pid_t cpid = fork();                                /* fork a child process */  if (cpid < 0) report_and_exit("fork");              /* check for failure */   if (0 == cpid) {    /*** child ***/                 /* child process */    close(pipeFDs[WriteEnd]);                         /* child reads, doesn't write */     while (read(pipeFDs[ReadEnd], &buf, 1) > 0)       /* read until end of byte stream */      write(STDOUT_FILENO, &buf, sizeof(buf));        /* echo to the standard output */     close(pipeFDs[ReadEnd]);                          /* close the ReadEnd: all done */    _exit(0);                                         /* exit and notify parent at once  */  }  else {              /*** parent ***/    close(pipeFDs[ReadEnd]);                          /* parent writes, doesn't read */     write(pipeFDs[WriteEnd], msg, [strlen][8](msg));       /* write the bytes to the pipe */    close(pipeFDs[WriteEnd]);                         /* done writing: generate eof */     wait(NULL);                                       /* wait for child to exit */    [exit][7](0);                                          /* exit normally */  }  return 0;}

上面名為 pipeUN 的程序使用系統(tǒng)函數(shù) fork 來(lái)創(chuàng)建一個(gè)進(jìn)程。盡管這個(gè)程序只有一個(gè)單一的源文件,在它正確執(zhí)行的情況下將會(huì)發(fā)生多進(jìn)程的情況。

下面的內(nèi)容是對(duì)庫(kù)函數(shù) fork 如何工作的一個(gè)簡(jiǎn)要回顧:

  • fork 函數(shù)由進(jìn)程調(diào)用,在失敗時(shí)返回 -1 給父進(jìn)程。在 pipeUN 這個(gè)例子中,相應(yīng)的調(diào)用是:


    函數(shù)調(diào)用后的返回值也被保存下來(lái)了。在這個(gè)例子中,保存在整數(shù)類型 pid_t 的變量 cpid 中。(每個(gè)進(jìn)程有它自己的進(jìn)程 ID,這是一個(gè)非負(fù)的整數(shù),用來(lái)標(biāo)記進(jìn)程)。復(fù)刻一個(gè)新的進(jìn)程可能會(huì)因?yàn)槎喾N原因而失敗,包括進(jìn)程表滿了的原因,這個(gè)結(jié)構(gòu)由系統(tǒng)維持,以此來(lái)追蹤進(jìn)程狀態(tài)。明確地說(shuō),僵尸進(jìn)程假如沒(méi)有被處理掉,將可能引起進(jìn)程表被填滿的錯(cuò)誤。

    1. pid_t cpid = fork(); /* called in parent */

  • 假如 fork 調(diào)用成功,則它將創(chuàng)建一個(gè)新的子進(jìn)程,向父進(jìn)程返回一個(gè)值,向子進(jìn)程返回另外的一個(gè)值。在調(diào)用 fork 后父進(jìn)程和子進(jìn)程都將執(zhí)行相同的代碼。(子進(jìn)程繼承了到此為止父進(jìn)程中聲明的所有變量的拷貝),特別地,一次成功的 fork 調(diào)用將返回如下的東西:

    • 向子進(jìn)程返回 0

    • 向父進(jìn)程返回子進(jìn)程的進(jìn)程 ID

  • 在一次成功的 fork 調(diào)用后,一個(gè) if/else 或等價(jià)的結(jié)構(gòu)將會(huì)被用來(lái)隔離針對(duì)父進(jìn)程和子進(jìn)程的代碼。在這個(gè)例子中,相應(yīng)的聲明為:


    1. if (0 == cpid) { /*** child ***/

    2. ...

    3. }

    4. else { /*** parent ***/

    5. ...

    6. }

假如成功地復(fù)刻出了一個(gè)子進(jìn)程,pipeUN 程序?qū)⑾裣旅孢@樣去執(zhí)行。在一個(gè)整數(shù)的數(shù)列里:

int pipeFDs[2]; /* two file descriptors */

來(lái)保存兩個(gè)文件描述符,一個(gè)用來(lái)向管道中寫(xiě)入,另一個(gè)從管道中寫(xiě)入。(數(shù)組元素 pipeFDs[0] 是讀端的文件描述符,元素 pipeFDs[1] 是寫(xiě)端的文件描述符。)在調(diào)用 fork 之前,對(duì)系統(tǒng) pipe 函數(shù)的成功調(diào)用,將立刻使得這個(gè)數(shù)組獲得兩個(gè)文件描述符:

if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");

父進(jìn)程和子進(jìn)程現(xiàn)在都有了文件描述符的副本。但分離關(guān)注點(diǎn)模式意味著每個(gè)進(jìn)程恰好只需要一個(gè)描述符。在這個(gè)例子中,父進(jìn)程負(fù)責(zé)寫(xiě)入,而子進(jìn)程負(fù)責(zé)讀取,盡管這樣的角色分配可以反過(guò)來(lái)。在 if 子句中的***個(gè)語(yǔ)句將用于關(guān)閉管道的讀端:

close(pipeFDs[WriteEnd]); /* called in child code */

在父進(jìn)程中的 else 子句將會(huì)關(guān)閉管道的讀端:

close(pipeFDs[ReadEnd]); /* called in parent code */

然后父進(jìn)程將向無(wú)名管道中寫(xiě)入某些字節(jié)數(shù)據(jù)(ASCII 代碼),子進(jìn)程讀取這些數(shù)據(jù),然后向標(biāo)準(zhǔn)輸出中回放它們。

在這個(gè)程序中還需要澄清的一點(diǎn)是在父進(jìn)程代碼中的 wait 函數(shù)。一旦被創(chuàng)建后,子進(jìn)程很大程度上獨(dú)立于它的父進(jìn)程,正如簡(jiǎn)短的 pipeUN 程序所展示的那樣。子進(jìn)程可以執(zhí)行任意的代碼,而它們可能與父進(jìn)程完全沒(méi)有關(guān)系。但是,假如當(dāng)子進(jìn)程終止時(shí),系統(tǒng)將會(huì)通過(guò)一個(gè)信號(hào)來(lái)通知父進(jìn)程。

要是父進(jìn)程在子進(jìn)程之前終止又該如何呢?在這種情形下,除非采取了預(yù)防措施,子進(jìn)程將會(huì)變成在進(jìn)程表中的一個(gè)僵尸進(jìn)程。預(yù)防措施有兩大類型:***種是讓父進(jìn)程去通知系統(tǒng),告訴系統(tǒng)它對(duì)子進(jìn)程的終止沒(méi)有任何興趣:

signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

第二種方法是在子進(jìn)程終止時(shí),讓父進(jìn)程執(zhí)行一個(gè) wait。這樣就確保了父進(jìn)程可以獨(dú)立于子進(jìn)程而存在。在 pipeUN 程序中使用了第二種方法,其中父進(jìn)程的代碼使用的是下面的調(diào)用:

wait(NULL); /* called in parent */

這個(gè)對(duì) wait 的調(diào)用意味著一直等待直到任意一個(gè)子進(jìn)程的終止發(fā)生,因此在 pipeUN 程序中,只有一個(gè)子進(jìn)程。(其中的 NULL 參數(shù)可以被替換為一個(gè)保存有子程序退出狀態(tài)的整數(shù)變量的地址。)對(duì)于更細(xì)粒度的控制,還可以使用更靈活的 waitpid 函數(shù),例如特別指定多個(gè)子進(jìn)程中的某一個(gè)。

pipeUN 將會(huì)采取另一個(gè)預(yù)防措施。當(dāng)父進(jìn)程結(jié)束了等待,父進(jìn)程將會(huì)調(diào)用常規(guī)的 exit 函數(shù)去退出。對(duì)應(yīng)的,子進(jìn)程將會(huì)調(diào)用 _exit 變種來(lái)退出,這類變種將快速跟蹤終止相關(guān)的通知。在效果上,子進(jìn)程會(huì)告訴系統(tǒng)立刻去通知父進(jìn)程它的這個(gè)子進(jìn)程已經(jīng)終止了。

假如兩個(gè)進(jìn)程向相同的無(wú)名管道中寫(xiě)入內(nèi)容,字節(jié)數(shù)據(jù)會(huì)交錯(cuò)嗎?例如,假如進(jìn)程 P1 向管道寫(xiě)入內(nèi)容:

foo bar

同時(shí)進(jìn)程 P2 并發(fā)地寫(xiě)入:

baz baz

到相同的管道,***的結(jié)果似乎是管道中的內(nèi)容將會(huì)是任意錯(cuò)亂的,例如像這樣:

baz foo baz bar

只要沒(méi)有寫(xiě)入超過(guò) PIPE_BUF 字節(jié),POSIX 標(biāo)準(zhǔn)就能確保寫(xiě)入不會(huì)交錯(cuò)。在 Linux 系統(tǒng)中, PIPE_BUF 的大小是 4096 字節(jié)。對(duì)于管道我更喜歡只有一個(gè)寫(xiě)入方和一個(gè)讀取方,從而繞過(guò)這個(gè)問(wèn)題。

命名管道

無(wú)名管道沒(méi)有備份文件:系統(tǒng)將維持一個(gè)內(nèi)存緩存來(lái)將字節(jié)數(shù)據(jù)從寫(xiě)方傳給讀方。一旦寫(xiě)方和讀方終止,這個(gè)緩存將會(huì)被回收,進(jìn)而無(wú)名管道消失。相反的,命名管道有備份文件和一個(gè)不同的 API。

下面讓我們通過(guò)另一個(gè)命令行示例來(lái)了解命名管道的要點(diǎn)。下面是具體的步驟:

  • 開(kāi)啟兩個(gè)終端。這兩個(gè)終端的工作目錄應(yīng)該相同。

  • 在其中一個(gè)終端中,鍵入下面的兩個(gè)命令(命令行提示符仍然是 %,我的注釋以 ## 打頭。):


    在最開(kāi)始,沒(méi)有任何東西會(huì)出現(xiàn)在終端中,因?yàn)榈浆F(xiàn)在為止沒(méi)有在命名管道中寫(xiě)入任何東西。

    1. % mkfifo tester ## 創(chuàng)建一個(gè)備份文件,名為 tester

    2. % cat tester    ## 將管道的內(nèi)容輸出到 stdout

  • 在第二個(gè)終端中輸入下面的命令:


    無(wú)論在這個(gè)終端中輸入什么,它都會(huì)在另一個(gè)終端中顯示出來(lái)。一旦鍵入 Ctrl+C,就會(huì)回到正常的命令行提示符,因?yàn)楣艿酪呀?jīng)被關(guān)閉了。

    1. % cat > tester ## redirect keyboard input to the pipe

    2. hello, world!  ## then hit Return key

    3. bye, bye       ## ditto

    4.    ## terminate session with a Control-C

  • 通過(guò)移除實(shí)現(xiàn)命名管道的文件來(lái)進(jìn)行清理:


    1. % unlink tester

正如 mkfifo 程序的名字所暗示的那樣,命名管道也被叫做 FIFO,因?yàn)?**個(gè)進(jìn)入的字節(jié),就會(huì)***個(gè)出,其他的類似。有一個(gè)名為 mkfifo 的庫(kù)函數(shù),用它可以在程序中創(chuàng)建一個(gè)命名管道,它將在下一個(gè)示例中被用到,該示例由兩個(gè)進(jìn)程組成:一個(gè)向命名管道寫(xiě)入,而另一個(gè)從該管道讀取。

示例 2. fifoWriter 程序
#include #include #include  #include #include #include #include  #define MaxLoops         12000   /* outer loop */#define ChunkSize           16   /* how many written at a time */#define IntsPerChunk         4   /* four 4-byte ints per chunk */#define MaxZs              250   /* max microseconds to sleep */ int main() {  const char* pipeName = "./fifoChannel";  mkfifo(pipeName, 0666);                      /* read/write for user/group/others */  int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */  if (fd < 0) return -1;                       /** error **/    int i;  for (i = 0; i < MaxLoops; i++) {          /* write MaxWrites times */    int j;    for (j = 0; j < ChunkSize; j++) {       /* each time, write ChunkSize bytes */      int k;      int chunk[IntsPerChunk];      for (k = 0; k < IntsPerChunk; k++)     chunk[k] = [rand][9]();                   write(fd, chunk, sizeof(chunk));     }    usleep(([rand][9]() % MaxZs) + 1);           /* pause a bit for realism */  }   close(fd);                                /* close pipe: generates an end-of-file */  unlink(pipeName);                         /* unlink from the implementing file */  [printf][10]("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);   return 0;}

上面的 fifoWriter 程序可以被總結(jié)為如下:

  • 首先程序創(chuàng)建了一個(gè)命名管道用來(lái)寫(xiě)入數(shù)據(jù):


    其中的 pipeName 是備份文件的名字,傳遞給 mkfifo 作為它的***個(gè)參數(shù)。接著命名管道通過(guò)我們熟悉的 open 函數(shù)調(diào)用被打開(kāi),而這個(gè)函數(shù)將會(huì)返回一個(gè)文件描述符。

    1. mkfifo(pipeName, 0666); /* read/write perms for user/group/others */

    2. int fd = open(pipeName, O_CREAT | O_WRONLY);

  • 在實(shí)現(xiàn)層面上,fifoWriter 不會(huì)一次性將所有的數(shù)據(jù)都寫(xiě)入,而是寫(xiě)入一個(gè)塊,然后休息隨機(jī)數(shù)目的微秒時(shí)間,接著再循環(huán)往復(fù)??偟膩?lái)說(shuō),有 768000 個(gè) 4 字節(jié)整數(shù)值被寫(xiě)入到命名管道中。

  • 在關(guān)閉命名管道后,fifoWriter 也將使用 unlink 取消對(duì)該文件的連接。


    一旦連接到管道的每個(gè)進(jìn)程都執(zhí)行了 unlink 操作后,系統(tǒng)將回收這些備份文件。在這個(gè)例子中,只有兩個(gè)這樣的進(jìn)程 fifoWriterfifoReader,它們都做了 unlink 操作。

    1. close(fd); /* close pipe: generates end-of-stream marker */

    2. unlink(pipeName); /* unlink from the implementing file */

這個(gè)兩個(gè)程序應(yīng)該在不同終端的相同工作目錄中執(zhí)行。但是 fifoWriter 應(yīng)該在 fifoReader 之前被啟動(dòng),因?yàn)樾枰?fifoWriter 去創(chuàng)建管道。然后 fifoReader 才能夠獲取到剛被創(chuàng)建的命名管道。

示例 3. fifoReader 程序
#include #include #include #include #include   unsigned is_prime(unsigned n) { /* not pretty, but gets the job done efficiently */  if (n <= 3) return n > 1;  if (0 == (n % 2) || 0 == (n % 3)) return 0;   unsigned i;  for (i = 5; (i * i) <= n; i += 6)     if (0 == (n % i) || 0 == (n % (i + 2))) return 0;   return 1; /* found a prime! */} int main() {  const char* file = "./fifoChannel";  int fd = open(file, O_RDONLY);   if (fd < 0) return -1; /* no point in continuing */  unsigned count = 0, total = 0, primes_count = 0;   while (1) {    int next;    int i;    ssize_t count = read(fd, &next, sizeof(int));        if (0 == count) break;                  /* end of stream */    else if (count == sizeof(int)) {        /* read a 4-byte int value */      total++;      if (is_prime(next)) primes_count++;    }     }   close(fd);       /* close pipe from read end */  unlink(file);    /* unlink from the underlying file */  [printf][10]("Received ints: %u, primes: %u\n", total, primes_count);     return 0;}

上面的 fifoReader 的內(nèi)容可以總結(jié)為如下:

  • 因?yàn)?fifoWriter 已經(jīng)創(chuàng)建了命名管道,所以 fifoReader 只需要利用標(biāo)準(zhǔn)的 open 調(diào)用來(lái)通過(guò)備份文件來(lái)獲取到管道中的內(nèi)容:


    這個(gè)文件的是以只讀打開(kāi)的。

    1. const char* file = "./fifoChannel";

    2. int fd = open(file, O_RDONLY);

  • 然后這個(gè)程序進(jìn)入一個(gè)潛在的***循環(huán),在每次循環(huán)時(shí),嘗試讀取 4 字節(jié)的塊。read 調(diào)用:


    返回 0 來(lái)暗示該流的結(jié)束。在這種情況下,fifoReader 跳出循環(huán),關(guān)閉命名管道,并在終止前 unlink 備份文件。

    1. ssize_t count = read(fd, &next, sizeof(int));

  • 在讀入 4 字節(jié)整數(shù)后,fifoReader 檢查這個(gè)數(shù)是否為質(zhì)數(shù)。這個(gè)操作代表了一個(gè)生產(chǎn)級(jí)別的讀取器可能在接收到的字節(jié)數(shù)據(jù)上執(zhí)行的邏輯操作。在示例運(yùn)行中,在接收到的 768000 個(gè)整數(shù)中有 37682 個(gè)質(zhì)數(shù)。

重復(fù)運(yùn)行示例, fifoReader 將成功地讀取 fifoWriter 寫(xiě)入的所有字節(jié)。這不是很讓人驚訝的。這兩個(gè)進(jìn)程在相同的機(jī)器上執(zhí)行,從而可以不用考慮網(wǎng)絡(luò)相關(guān)的問(wèn)題。命名管道是一個(gè)可信且高效的 IPC 機(jī)制,因而被廣泛使用。

下面是這兩個(gè)程序的輸出,它們?cè)诓煌慕K端中啟動(dòng),但處于相同的工作目錄:

% ./fifoWriter768000 ints sent to the pipe.###% ./fifoReaderReceived ints: 768000, primes: 37682

消息隊(duì)列

管道有著嚴(yán)格的先入先出行為:***個(gè)被寫(xiě)入的字節(jié)將會(huì)***個(gè)被讀,第二個(gè)寫(xiě)入的字節(jié)將第二個(gè)被讀,以此類推。消息隊(duì)列可以做出相同的表現(xiàn),但它又足夠靈活,可以使得字節(jié)塊可以不以先入先出的次序來(lái)接收。

正如它的名字所提示的那樣,消息隊(duì)列是一系列的消息,每個(gè)消息包含兩部分:

  • 荷載,一個(gè)字節(jié)序列(在 C 中是 char)

  • 類型,以一個(gè)正整數(shù)值的形式給定,類型用來(lái)分類消息,為了更靈活的回收

看一下下面對(duì)一個(gè)消息隊(duì)列的描述,每個(gè)消息由一個(gè)整數(shù)類型標(biāo)記:

          +-+    +-+    +-+    +-+sender--->|3|--->|2|--->|2|--->|1|--->receiver          +-+    +-+    +-+    +-+

在上面展示的 4 個(gè)消息中,標(biāo)記為 1 的是開(kāi)頭,即最接近接收端,然后另個(gè)標(biāo)記為 2 的消息,***接著一個(gè)標(biāo)記為 3  的消息。假如按照嚴(yán)格的 FIFO 行為執(zhí)行,消息將會(huì)以 1-2-2-3 這樣的次序被接收。但是消息隊(duì)列允許其他收取次序。例如,消息可以被接收方以  3-2-1-2 的次序接收。

mqueue 示例包含兩個(gè)程序,sender 將向消息隊(duì)列中寫(xiě)入數(shù)據(jù),而 receiver 將從這個(gè)隊(duì)列中讀取數(shù)據(jù)。這兩個(gè)程序都包含的頭文件 queue.h 如下所示:

示例 4. 頭文件 queue.h
#define ProjectId 123#define PathName  "queue.h" /* any existing, accessible file would do */#define MsgLen    4#define MsgCount  6 typedef struct {   long type;                 /* must be of type long */   char payload[MsgLen + 1];  /* bytes in the message */  } queuedMessage;

上面的頭文件定義了一個(gè)名為 queuedMessage 的結(jié)構(gòu)類型,它帶有 payload(字節(jié)數(shù)組)和 type(整數(shù))這兩個(gè)域。該文件也定義了一些符號(hào)常數(shù)(使用 #define 語(yǔ)句),前兩個(gè)常數(shù)被用來(lái)生成一個(gè) key,而這個(gè) key 反過(guò)來(lái)被用來(lái)獲取一個(gè)消息隊(duì)列的 ID。ProjectId 可以是任何正整數(shù)值,而 PathName 必須是一個(gè)存在的、可訪問(wèn)的文件,在這個(gè)示例中,指的是文件 queue.h。在 senderreceiver 中,它們都有的設(shè)定語(yǔ)句為:

key_t key = ftok(PathName, ProjectId); /* generate key */int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */

ID qid 在效果上是消息隊(duì)列文件描述符的對(duì)應(yīng)物。

示例 5. sender 程序
#include  #include  #include #include #include #include "queue.h" void report_and_exit(const char* msg) {  [perror][6](msg);  [exit][7](-1); /* EXIT_FAILURE */} int main() {  key_t key = ftok(PathName, ProjectId);   if (key < 0) report_and_exit("couldn't get key...");    int qid = msgget(key, 0666 | IPC_CREAT);   if (qid < 0) report_and_exit("couldn't get queue id...");   char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};  int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */  int i;  for (i = 0; i < MsgCount; i++) {    /* build the message */    queuedMessage msg;    msg.type = types[i];    [strcpy][11](msg.payload, payloads[i]);     /* send the message */    msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */    [printf][10]("%s sent as type %i\n", msg.payload, (int) msg.type);  }  return 0;}

上面的 sender 程序?qū)l(fā)送出 6 個(gè)消息,每?jī)蓚€(gè)為一個(gè)類型:前兩個(gè)是類型 1,接著的連個(gè)是類型 2,***的兩個(gè)為類型 3。發(fā)送的語(yǔ)句:

msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);

被配置為非阻塞的(IPC_NOWAIT 標(biāo)志),是因?yàn)檫@里的消息體量上都很小。唯一的危險(xiǎn)在于一個(gè)完整的序列將可能導(dǎo)致發(fā)送失敗,而這個(gè)例子不會(huì)。下面的 receiver 程序也將使用 IPC_NOWAIT 標(biāo)志來(lái)接收消息。

示例 6. receiver 程序
#include  #include  #include #include #include "queue.h" void report_and_exit(const char* msg) {  [perror][6](msg);  [exit][7](-1); /* EXIT_FAILURE */}  int main() {   key_t key= ftok(PathName, ProjectId); /* key to identify the queue */  if (key < 0) report_and_exit("key not gotten...");    int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */  if (qid < 0) report_and_exit("no access to queue...");   int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */  int i;  for (i = 0; i < MsgCount; i++) {    queuedMessage msg; /* defined in queue.h */    if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)      [puts][12]("msgrcv trouble...");    [printf][10]("%s received as type %i\n", msg.payload, (int) msg.type);  }   /** remove the queue **/  if (msgctl(qid, IPC_RMID, NULL) < 0)  /* NULL = 'no flags' */    report_and_exit("trouble removing queue...");    return 0; }

這個(gè) receiver 程序不會(huì)創(chuàng)建消息隊(duì)列,盡管 API 盡管建議那樣。在 receiver 中,對(duì)

int qid = msgget(key, 0666 | IPC_CREAT);

的調(diào)用可能因?yàn)閹в?IPC_CREAT 標(biāo)志而具有誤導(dǎo)性,但是這個(gè)標(biāo)志的真實(shí)意義是如果需要就創(chuàng)建,否則直接獲取sender 程序調(diào)用 msgsnd 來(lái)發(fā)送消息,而 receiver 調(diào)用 msgrcv 來(lái)接收它們。在這個(gè)例子中,sender 以 1-1-2-2-3-3 的次序發(fā)送消息,但 receiver 接收它們的次序?yàn)?3-1-2-1-3-2,這顯示消息隊(duì)列沒(méi)有被嚴(yán)格的 FIFO 行為所拘泥:

% ./sendermsg1 sent as type 1msg2 sent as type 1msg3 sent as type 2msg4 sent as type 2msg5 sent as type 3msg6 sent as type 3 % ./receivermsg5 received as type 3msg1 received as type 1msg3 received as type 2msg2 received as type 1msg6 received as type 3msg4 received as type 2

上面的輸出顯示 senderreceiver 可以在同一個(gè)終端中啟動(dòng)。輸出也顯示消息隊(duì)列是持久的,即便 sender 進(jìn)程在完成創(chuàng)建隊(duì)列、向隊(duì)列寫(xiě)數(shù)據(jù)、然后退出的整個(gè)過(guò)程后,該隊(duì)列仍然存在。只有在 receiver 進(jìn)程顯式地調(diào)用 msgctl 來(lái)移除該隊(duì)列,這個(gè)隊(duì)列才會(huì)消失:

if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */

關(guān)于“Linux下如何使用管道和消息隊(duì)列”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。


新聞名稱:Linux下如何使用管道和消息隊(duì)列
文章來(lái)源:http://weahome.cn/article/jscjcg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部