使用 pyserial 就可以處理串口通信,這個(gè)包是跨平臺(tái)的。
創(chuàng)新互聯(lián)是一家專(zhuān)注于成都網(wǎng)站制作、網(wǎng)站設(shè)計(jì)與策劃設(shè)計(jì),南關(guān)網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專(zhuān)注于網(wǎng)站建設(shè)十載,網(wǎng)設(shè)計(jì)領(lǐng)域的專(zhuān)業(yè)建站公司;建站業(yè)務(wù)涵蓋:南關(guān)等地區(qū)。南關(guān)做網(wǎng)站價(jià)格咨詢(xún):18982081108
示例程序在這里:
import?serial
#?創(chuàng)建serial實(shí)例
serialport?=?serial.Serial()
serialport.port?=?'COM1'
serialport.baudrate?=?9600
serialport.parity?=?'N'
serialport.bytesize?=?8
serialport.stopbits?=?1
serialport.timeout?=?0.6
try:
serialport.open()
serialport.setDTR(True)
serialport.setRTS(True)
except?Exception,?ex:
print?ex
#?發(fā)送數(shù)據(jù)
serialport.write(raw_data)
#?根據(jù)項(xiàng)目要求,可以開(kāi)一個(gè)線(xiàn)程掃描接收數(shù)據(jù)
python里面使用serial庫(kù)來(lái)操作串口,serial的使用流程跟平常的類(lèi)似,也是打開(kāi)、關(guān)閉、讀、寫(xiě)
一般就是設(shè)置端口,波特率。
使用serial.Serial創(chuàng)建實(shí)體的時(shí)候會(huì)去打開(kāi)串口,之后可以使用is_open開(kāi)判斷下是否串口是否打開(kāi)正常。
使用ser.close即可關(guān)閉串口
數(shù)據(jù)的寫(xiě)使用ser.write接口,如果寫(xiě)的是十六進(jìn)制的數(shù)據(jù)使用bytearray來(lái)定義,如 writebuf = bytearray([0x55, 0xaa, 0x00, 0x01, 0x00, 0x00])
讀數(shù)據(jù)使用ser.read接口,一般會(huì)先使用in_waiting來(lái)判斷下是否有數(shù)據(jù),然后開(kāi)始讀
下面舉一個(gè)例子,說(shuō)明下我們?cè)趯?shí)際的使用情況。
一般會(huì)單獨(dú)創(chuàng)建一個(gè)進(jìn)程來(lái)作為數(shù)據(jù)的接收,然后再配合上標(biāo)記位或者信號(hào)量來(lái)處理邏輯
由于測(cè)試工作的需要,在C端產(chǎn)品上經(jīng)常使用串口進(jìn)行通信,而測(cè)試腳本大部分時(shí)候又采用python編寫(xiě),于是就不得不了解并熟悉python下的串口通信實(shí)現(xiàn)方法了,整理如下以備隨時(shí)使用:
一、說(shuō)明
pyserial封裝了python環(huán)境下對(duì)串口的訪問(wèn),其兼容各種平臺(tái),并有統(tǒng)一的操作接口。通過(guò)python屬性訪問(wèn)串口設(shè)置,并可對(duì)串口的各種配置參數(shù)(如串口名,波特率、停止校驗(yàn)位、流控、超時(shí)等等)做修改,再進(jìn)行串口通信的類(lèi)與接口封裝后,非常方便地被調(diào)用和移植。
二、模塊安裝
pip insatll pyserial
三、初始化與參數(shù)說(shuō)明
import serial
ser = serial.Serial('COM3', 115200, timeout=0.5, ....................)
下面看看 serial.Serial 原生類(lèi)
四、不同平臺(tái)下初始化
ser=serial.Serial("/dev/ttyUSB0",9600,timeout=0.5)#使用USB連接串行口ser=serial.Serial("/dev/ttyAMA0",9600,timeout=0.5)#使用樹(shù)莓派的GPIO口連接串行口ser=serial.Serial(1,9600,timeout=0.5)#winsows系統(tǒng)使用COM1口連接串行口ser=serial.Serial("COM1",9600,timeout=0.5)#winsows系統(tǒng)使用COM1口連接串行口ser=serial.Serial("/dev/ttyS1",9600,timeout=0.5)#Linux系統(tǒng)使用COM1口連接串行口
五、串口屬性
ser.name?#串口名稱(chēng)
ser.port?#端口號(hào)
ser.baudrate #波特率
ser.bytesize #字節(jié)大小
ser.parity #校驗(yàn)位N-無(wú)校驗(yàn),E-偶校驗(yàn),O-奇校驗(yàn)
ser.stopbits #停止位
ser.timeout #讀超時(shí)設(shè)置
ser.writeTimeout #寫(xiě)超時(shí)
ser.xonxoff #軟件流控
ser.rtscts #硬件流控
ser.dsrdtr #硬件流控
ser.interCharTimeout #字符間隔超時(shí)?
六、串口常用方法
isOpen():查看端口是否被打開(kāi)。
open() :打開(kāi)端口‘。
close():關(guān)閉端口。
read(size=1):從端口讀字節(jié)數(shù)據(jù)。默認(rèn)1個(gè)字節(jié)。
read_all():從端口接收全部數(shù)據(jù)。
write(data):向端口寫(xiě)數(shù)據(jù)。
readline():讀一行數(shù)據(jù)。
readlines():讀多行數(shù)據(jù)。
in_waiting():返回輸入緩存中的字節(jié)數(shù)。
out_waiting():返回輸出緩存中的字節(jié)數(shù)。
flush():等待所有數(shù)據(jù)寫(xiě)出。
flushInput():丟棄接收緩存中的所有數(shù)據(jù)。
flushOutput():終止當(dāng)前寫(xiě)操作,并丟棄發(fā)送緩存中的數(shù)據(jù)。
sendBreadk(duration=0.25):發(fā)送BREAK條件,并于duration時(shí)間之后返回IDLE
setBreak(level=True):根據(jù)level設(shè)置break條件。
setRTS(level=True):設(shè)置請(qǐng)求發(fā)送(RTS)的控制信號(hào)
setDTR(level=True):設(shè)置數(shù)據(jù)終端準(zhǔn)備就緒的控制信號(hào)
七、類(lèi)與接口封裝
import time
import serial
import serial.tools.list_ports
# 串口操作類(lèi)
class serialCommunication(object):
def __init__(self, port, bps, timeout):# 可配置更多參數(shù)
? ? port_list =self.show_usable_com()
if len(port_list) 0:
if portnot in port_list:
self.port = port_list[0]
else:
self.port = port
else:
print("no usable serial, please plugin your serial board")
return
? ? self.bps = bps
self.timeout = timeout
try:
# 初始化串口,并得到串口對(duì)象,根據(jù)需要可拓展更多參數(shù)
? ? ? ? self.ser = serial.Serial(self.port, self.bps, 8, 'N', 1, timeout=self.timeout, write_timeout=self.timeout)
except Exception as e:# 拋出異常
? ? ? ? print("Exception={}".format(e))
# 顯示可用串口列表
@staticmethod
def show_usable_com():
serialport_list = []
portInfo_list =list(serial.tools.list_ports.comports())
if len(portInfo_list) =0:
print("can not find any serial port!")
else:
print(portInfo_list)
for i in range(len(portInfo_list)):
plist =list(portInfo_list[i])
print(plist)
serialport_list.append(plist[0])
print(serialport_list)
return serialport_list
# 輸出串口基本信息
def serial_infor(self):
print(self.ser.name)# 設(shè)備名字
? ? print(self.ser.port)# 讀或者寫(xiě)端口
? ? print(self.ser.baudrate)# 波特率
? ? print(self.ser.bytesize)# 字節(jié)大小
? ? print(self.ser.parity)# 校驗(yàn)位
? ? print(self.ser.stopbits)# 停止位
? ? print(self.ser.timeout)# 讀超時(shí)設(shè)置
? ? print(self.ser.writeTimeout)# 寫(xiě)超時(shí)
? ? print(self.ser.xonxoff)# 軟件流控
? ? print(self.ser.rtscts)# 軟件流控
? ? print(self.ser.dsrdtr)# 硬件流控
? ? print(self.ser.interCharTimeout)# 字符間隔超時(shí)
# 打開(kāi)串口
def serial_open(self):
try:
if not self.ser.isOpen():
self.ser.open()
except Exception as e:# 拋出異常
? ? ????print("serial_open Exception={}".format(e))
self.ser.close()
# 讀取指定大小的數(shù)據(jù)
# 從串口讀size個(gè)字節(jié)。如果指定超時(shí),則可能在超時(shí)后返回較少的字節(jié);如果沒(méi)有指定超時(shí),則會(huì)一直等到收完指定的字節(jié)數(shù)。
def serial_read_with_size(self, size):
try:
self.serial_open()
return self.ser.read(size).decode("utf-8")
except Exception as e:
print("serial_read_all Exception={}".format(e))
self.ser.close()
# 讀取當(dāng)前串口緩存中的所有數(shù)據(jù)
def serial_read_data(self):
try:
self.serial_open()
datalen =self.ser.inWaiting()
? ? ? ? if datalen ==0:
return None
? ? ? ? return self.ser.read(datalen).decode("utf-8")
except Exception as e:
print("serial_read_data Exception={}".format(e))
self.ser.close()
# 讀串口全部數(shù)據(jù),注意timeout的設(shè)置
# 在設(shè)定的timeout時(shí)間范圍內(nèi),如果讀取的字節(jié)數(shù)據(jù)是有效的(就是非空)那就直接返回,
# 否則一直會(huì)等到設(shè)定的timeout時(shí)間并返回這段時(shí)間所讀的全部字節(jié)數(shù)據(jù)。
def serial_read_all(self):
try:
self.serial_open()
return self.ser.read_all().decode("utf-8")
except Exception as e:
print("serial_read_all Exception={}".format(e))
self.ser.close()
# 讀一行數(shù)據(jù)
# 使用readline()時(shí)應(yīng)該注意:打開(kāi)串口時(shí)應(yīng)該指定超時(shí),否則如果串口沒(méi)有收到新行,則會(huì)一直等待。
# 如果沒(méi)有超時(shí),readline會(huì)報(bào)異常。
def serial_read_line(self):
try:
self.serial_open()
return self.ser.readline().decode("utf-8")
except Exception as e:
print("serial_read_line Exception={}".format(e))
? ? ? ? self.ser.close()
# 讀多行數(shù)據(jù),返回行列表
def serial_read_lines(self):
try:
self.serial_open()
return self.ser.readlines().decode("utf-8")
except Exception as e:
print("serial_read_lines Exception={}".format(e))
self.ser.close()
# 寫(xiě)數(shù)據(jù)
def serial_write_data(self, data):
try:
self.serial_open()
self.ser.flushOutput()
data_len =self.ser.write(data.encode('utf-8'))
return data_len
except Exception as e:
print("serial_write_data Exception={}".format(e))
return 0
# 寫(xiě)行數(shù)據(jù),注意參數(shù)差異
def serial_write_lines(self, lines):
self.ser.writelines(lines)
# 清除串口緩存
def serial_clean(self):
try:
if self.ser.isOpen():
self.ser.flush()
except Exception as e:
print("serial_clean Exception={}".format(e))
# 關(guān)閉串口
def serial_close(self):
try:
if self.ser.isOpen():
self.ser.close()
except Exception as e:
print("serial_clean Exception={}".format(e))
if __name__ =='__main__':
testSerial = serialCommunication("COM10", 1500000, 0.5)
testSerial.serial_open()
testSerial.serial_infor()
testSerial.serial_write_data("ifconfig eth0\n")
time.sleep(0.1)
data = testSerial.serial_read_all()
print(data)
testSerial.serial_close()
八、其他
1)ser.VERSION表示pyserial版本;?另外,ser.name表示設(shè)備名稱(chēng)
2)端口設(shè)置可以被讀入字典,也可從字典加載設(shè)置:
getSettingDict():返回當(dāng)前串口設(shè)置的字典
applySettingDict(d):應(yīng)用字典到串口設(shè)置
3)?Readline()是讀一行,以/n結(jié)束,要是沒(méi)有/n就一直讀,阻塞。注意:打開(kāi)串口時(shí)應(yīng)該指定超時(shí),否則如果串口沒(méi)有收到新行,則會(huì)一直等待。
4)serial.read_all 與?serial.read_all()區(qū)別
serial.read_all:讀取串口所有的參數(shù)信息
serial.read_all():超時(shí)時(shí)間內(nèi)從串口讀取的所有數(shù)據(jù)
5) 異常信息
exception serial.SerialException
exception serial.SerialTimeoutException
用ser.isOpen()查看返回False,說(shuō)明ser.close()起作用了啊。用管理員身份打開(kāi)cmd,再執(zhí)行腳本試試?