本篇內容介紹了“怎么使用ZeroMQ消息庫在C和Python間共享數(shù)據(jù)”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
創(chuàng)新互聯(lián)建站專注于上杭企業(yè)網(wǎng)站建設,成都響應式網(wǎng)站建設公司,購物商城網(wǎng)站建設。上杭網(wǎng)站建設公司,為上杭等地區(qū)提供建站服務。全流程按需網(wǎng)站開發(fā),專業(yè)設計,全程項目跟蹤,創(chuàng)新互聯(lián)建站專業(yè)和態(tài)度為您提供的服務
ZeroMQ 提供了一個更簡單的過程:
編寫一小段 C 代碼,從硬件讀取數(shù)據(jù),然后把發(fā)現(xiàn)的東西作為消息發(fā)送出去。
使用 Python 編寫接口,實現(xiàn)新舊基礎設施之間的對接。
Pieter Hintjens 是 ZeroMQ 項目發(fā)起者之一,他是個擁有 有趣視角和作品 的非凡人物。
本教程中,需要:
一個 C 編譯器(例如 GCC 或 Clang)
libzmq 庫
Python 3
ZeroMQ 的 Python 封裝
Fedora 系統(tǒng)上的安裝方法:
$ dnf install clang zeromq zeromq-devel python3 python3-zmq
Debian 和 Ubuntu 系統(tǒng)上的安裝方法:
$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq
如果有問題,參考對應項目的安裝指南(上面附有鏈接)。
因為這里針對的是個設想的場景,本教程虛構了包含兩個函數(shù)的操作庫:
fancyhw_init()
用來初始化(設想的)硬件
fancyhw_read_val()
用于返回從硬件讀取的數(shù)據(jù)
將庫的完整代碼保存到文件 libfancyhw.h
中:
#ifndef LIBFANCYHW_H#define LIBFANCYHW_H #include#include // This is the fictitious hardware interfacing library void fancyhw_init(unsigned int init_param){ srand(init_param);} int16_t fancyhw_read_val(void){ return (int16_t)rand();} #endif
這個庫可以模擬你要在不同語言實現(xiàn)的組件間交換的數(shù)據(jù),中間有個隨機數(shù)發(fā)生器。
下面從包含管理數(shù)據(jù)傳輸?shù)膸扉_始,逐步實現(xiàn) C 接口。
開始先加載必要的庫(每個庫的作用見代碼注釋):
// For printf()#include// For EXIT_*#include // For memcpy()#include // For sleep()#include #include #include "libfancyhw.h"
定義 main
函數(shù)和后續(xù)過程中必要的參數(shù):
int main(void){ const unsigned int INIT_PARAM = 12345; const unsigned int REPETITIONS = 10; const unsigned int PACKET_SIZE = 16; const char *TOPIC = "fancyhw_data"; ...
所有的庫都需要初始化。虛構的那個只需要一個參數(shù):
fancyhw_init(INIT_PARAM);
ZeroMQ 庫需要實打實的初始化。首先,定義對象 context
,它是用來管理全部的套接字的:
void *context = zmq_ctx_new(); if (!context){ printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno)); return EXIT_FAILURE;}
之后定義用來發(fā)送數(shù)據(jù)的套接字。ZeroMQ 支持若干種套接字,各有其用。使用 publish
套接字(也叫 PUB
套接字),可以復制消息并分發(fā)到多個接收端。這使得你可以讓多個接收端接收同一個消息。沒有接收者的消息將被丟棄(即不會入消息隊列)。用法如下:
void *data_socket = zmq_socket(context, ZMQ_PUB);
套接字需要綁定到一個具體的地址,這樣客戶端就知道要連接哪里了。本例中,使用了 TCP 傳輸層(當然也有 其它選項,但 TCP 是不錯的默認選擇):
const int rb = zmq_bind(data_socket, "tcp://*:5555"); if (rb != 0){ printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno)); return EXIT_FAILURE;}
下一步, 計算一些后續(xù)要用到的值。 注意下面代碼中的 TOPIC
,因為 PUB
套接字發(fā)送的消息需要綁定一個主題。主題用于供接收者過濾消息:
const size_t topic_size = strlen(TOPIC);const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t); printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
啟動一個發(fā)送消息的循環(huán),循環(huán) REPETITIONS
次:
for (unsigned int i = 0; i < REPETITIONS; i++){ ...
發(fā)送消息前,先填充一個長度為 PACKET_SIZE
的緩沖區(qū)。本庫提供的是 16 個位的有符號整數(shù)。因為 C 語言中 int
類型占用空間大小與平臺相關,不是確定的值,所以要使用指定寬度的 int
變量:
int16_t buffer[PACKET_SIZE]; for (unsigned int j = 0; j < PACKET_SIZE; j++){ buffer[j] = fancyhw_read_val();} printf("Read %u data values\n", PACKET_SIZE);
消息的準備和發(fā)送的第一步是創(chuàng)建 ZeroMQ 消息,為消息分配必要的內存空間??瞻椎南⑹怯糜诜庋b要發(fā)送的數(shù)據(jù)的:
zmq_msg_t envelope; const int rmi = zmq_msg_init_size(&envelope, envelope_size);if (rmi != 0){ printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno)); zmq_msg_close(&envelope); break;}
現(xiàn)在內存空間已分配,數(shù)據(jù)保存在 ZeroMQ 消息 “信封”中。函數(shù) zmq_msg_data()
返回一個指向封裝數(shù)據(jù)緩存區(qū)頂端的指針。第一部分是主題,之后是一個空格,最后是二進制數(shù)。主題和二進制數(shù)據(jù)之間的分隔符采用空格字符。需要遍歷緩存區(qū)的話,使用類型轉換和 指針算法。(感謝 C 語言,讓事情變得直截了當。)做法如下:
memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t))
通過 data_socket
發(fā)送消息:
const size_t rs = zmq_msg_send(&envelope, data_socket, 0);if (rs != envelope_size){ printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno)); zmq_msg_close(&envelope); break;}
使用數(shù)據(jù)之前要先解除封裝:
zmq_msg_close(&envelope); printf("Message sent; i: %u, topic: %s\n", i, TOPIC);
C 語言不提供 垃圾收集 功能,用完之后記得要自己掃尾。發(fā)送消息之后結束程序之前,需要運行掃尾代碼,釋放分配的內存:
const int rc = zmq_close(data_socket); if (rc != 0){ printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno)); return EXIT_FAILURE;} const int rd = zmq_ctx_destroy(context); if (rd != 0){ printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno)); return EXIT_FAILURE;} return EXIT_SUCCESS;
保存下面完整的接口代碼到本地名為 hw_interface.c
的文件:
// For printf()#include// For EXIT_*#include // For memcpy()#include // For sleep()#include #include #include "libfancyhw.h" int main(void){ const unsigned int INIT_PARAM = 12345; const unsigned int REPETITIONS = 10; const unsigned int PACKET_SIZE = 16; const char *TOPIC = "fancyhw_data"; fancyhw_init(INIT_PARAM); void *context = zmq_ctx_new(); if (!context) { printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno)); return EXIT_FAILURE; } void *data_socket = zmq_socket(context, ZMQ_PUB); const int rb = zmq_bind(data_socket, "tcp://*:5555"); if (rb != 0) { printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno)); return EXIT_FAILURE; } const size_t topic_size = strlen(TOPIC); const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t); printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size); for (unsigned int i = 0; i < REPETITIONS; i++) { int16_t buffer[PACKET_SIZE]; for (unsigned int j = 0; j < PACKET_SIZE; j++) { buffer[j] = fancyhw_read_val(); } printf("Read %u data values\n", PACKET_SIZE); zmq_msg_t envelope; const int rmi = zmq_msg_init_size(&envelope, envelope_size); if (rmi != 0) { printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno)); zmq_msg_close(&envelope); break; } memcpy(zmq_msg_data(&envelope), TOPIC, topic_size); memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1); memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t)); const size_t rs = zmq_msg_send(&envelope, data_socket, 0); if (rs != envelope_size) { printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno)); zmq_msg_close(&envelope); break; } zmq_msg_close(&envelope); printf("Message sent; i: %u, topic: %s\n", i, TOPIC); sleep(1); } const int rc = zmq_close(data_socket); if (rc != 0) { printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno)); return EXIT_FAILURE; } const int rd = zmq_ctx_destroy(context); if (rd != 0) { printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno)); return EXIT_FAILURE; } return EXIT_SUCCESS;}
用如下命令編譯:
$ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface
如果沒有編譯錯誤,你就可以運行這個接口了。貼心的是,ZeroMQ PUB
套接字可以在沒有任何應用發(fā)送或接受數(shù)據(jù)的狀態(tài)下運行,這簡化了使用復雜度,因為這樣不限制進程啟動的次序。
運行該接口:
$ ./hw_interfaceTopic: fancyhw_data; topic size: 12; Envelope size: 45Read 16 data valuesMessage sent; i: 0, topic: fancyhw_dataRead 16 data valuesMessage sent; i: 1, topic: fancyhw_dataRead 16 data values......
輸出顯示數(shù)據(jù)已經(jīng)通過 ZeroMQ 完成發(fā)送,現(xiàn)在要做的是讓一個程序去讀數(shù)據(jù)。
現(xiàn)在已經(jīng)準備好從 C 程序向 Python 應用傳送數(shù)據(jù)了。
需要兩個庫幫助實現(xiàn)數(shù)據(jù)傳輸。首先是 ZeroMQ 的 Python 封裝:
$ python3 -m pip install zmq
另一個就是 struct 庫,用于解碼二進制數(shù)據(jù)。這個庫是 Python 標準庫的一部分,所以不需要使用 pip
命令安裝。
Python 程序的第一部分是導入這些庫:
import zmqimport struct
使用 ZeroMQ 時,只能向常量 TOPIC
定義相同的接收端發(fā)送消息:
topic = "fancyhw_data".encode('ascii') print("Reading messages with topic: {}".format(topic))
下一步,初始化上下文和套接字。使用 subscribe
套接字(也稱為 SUB
套接字),它是 PUB
套接字的天生伴侶。這個套接字發(fā)送時也需要匹配主題。
with zmq.Context() as context: socket = context.socket(zmq.SUB) socket.connect("tcp://127.0.0.1:5555") socket.setsockopt(zmq.SUBSCRIBE, topic) i = 0 ...
啟動一個無限循環(huán),等待接收發(fā)送到 SUB
套接字的新消息。這個循環(huán)會在你按下 Ctrl+C
組合鍵或者內部發(fā)生錯誤時終止:
try: while True: ... # we will fill this in next except KeyboardInterrupt: socket.close() except Exception as error: print("ERROR: {}".format(error)) socket.close()
這個循環(huán)等待 recv()
方法獲取的新消息,然后將接收到的內容從第一個空格字符處分割開,從而得到主題:
binary_topic, data_buffer = socket.recv().split(b' ', 1)
Python 此時尚不知道主題是個字符串,使用標準 ASCII 編解碼器進行解碼:
topic = binary_topic.decode(encoding = 'ascii') print("Message {:d}:".format(i))print("\ttopic: '{}'".format(topic))
下一步就是使用 struct
庫讀取二進制數(shù)據(jù),它可以將二進制數(shù)據(jù)段轉換為明確的數(shù)值。首先,計算數(shù)據(jù)包中數(shù)值的組數(shù)。本例中使用的 16 個位的有符號整數(shù)對應的是 struct
格式字符 中的 h
:
packet_size = len(data_buffer) // struct.calcsize("h") print("\tpacket size: {:d}".format(packet_size))
知道數(shù)據(jù)包中有多少組數(shù)據(jù)后,就可以通過構建一個包含數(shù)據(jù)組數(shù)和數(shù)據(jù)類型的字符串,來定義格式了(比如“16h
”):
struct_format = "{:d}h".format(packet_size)
將二進制數(shù)據(jù)串轉換為可直接打印的一系列數(shù)字:
data = struct.unpack(struct_format, data_buffer) print("\tdata: {}".format(data))
下面是 Python 實現(xiàn)的完整的接收端:
#! /usr/bin/env python3 import zmqimport struct topic = "fancyhw_data".encode('ascii') print("Reading messages with topic: {}".format(topic)) with zmq.Context() as context: socket = context.socket(zmq.SUB) socket.connect("tcp://127.0.0.1:5555") socket.setsockopt(zmq.SUBSCRIBE, topic) i = 0 try: while True: binary_topic, data_buffer = socket.recv().split(b' ', 1) topic = binary_topic.decode(encoding = 'ascii') print("Message {:d}:".format(i)) print("\ttopic: '{}'".format(topic)) packet_size = len(data_buffer) // struct.calcsize("h") print("\tpacket size: {:d}".format(packet_size)) struct_format = "{:d}h".format(packet_size) data = struct.unpack(struct_format, data_buffer) print("\tdata: {}".format(data)) i += 1 except KeyboardInterrupt: socket.close() except Exception as error: print("ERROR: {}".format(error)) socket.close()
將上面的內容保存到名為 online_analysis.py
的文件。Python 代碼不需要編譯,你可以直接運行它。
運行輸出如下:
$ ./online_analysis.pyReading messages with topic: b'fancyhw_data'Message 0: topic: 'fancyhw_data' packet size: 16 data: (20946, -23616, 9865, 31416, -15911, -10845, -5332, 25662, 10955, -32501, -18717, -24490, -16511, -28861, 24205, 26568)Message 1: topic: 'fancyhw_data' packet size: 16 data: (12505, 31355, 14083, -19654, -9141, 14532, -25591, 31203, 10428, -25564, -732, -7979, 9529, -27982, 29610, 30475)......
“怎么使用ZeroMQ消息庫在C和Python間共享數(shù)據(jù)”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質量的實用文章!