這篇文章給大家分享的是有關(guān)使用python實現(xiàn)簡單聊天室功能的案例的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
成都創(chuàng)新互聯(lián)公司專注于網(wǎng)站建設(shè)|網(wǎng)站維護公司|優(yōu)化|托管以及網(wǎng)絡(luò)推廣,積累了大量的網(wǎng)站設(shè)計與制作經(jīng)驗,為許多企業(yè)提供了網(wǎng)站定制設(shè)計服務(wù),案例作品覆蓋成都木屋等行業(yè)。能根據(jù)企業(yè)所處的行業(yè)與銷售的產(chǎn)品,結(jié)合品牌形象的塑造,量身策劃品質(zhì)網(wǎng)站。具體內(nèi)容如下
公共模塊
首先寫一個公共類,用字典的形式對數(shù)據(jù)的收發(fā),并且進行封裝,導(dǎo)入struct解決了TCP的粘包問題,并在公共類中進行了異常處理
import socket,struct,json def send_dic(c,dic): dic_json=json.dumps(dic) dic_json_length=len(dic_json.encode('utf-8')) struct_dic_json_length=struct.pack('q',dic_json_length) c.send(struct_dic_json_length) c.send(dic_json.encode('utf-8')) def get_dic(c): try: dic_length=struct.unpack('q',c.recv(8))[0] except: return {'msg':'exit'} try: dic_json=c.recv(dic_length).decode('utf-8') except: return {'msg':'exit'} dic=json.loads(dic_json) return dic
服務(wù)器端
import socket from concurrent.futures import ThreadPoolExecutor import lib.common #導(dǎo)入寫在lib里面的公共模塊,代碼在上面 import re #進行開啟服務(wù)器等一系列操作 s=socket.socket() ip_host=('127.0.0.1',8000) s.bind(ip_host) s.listen() #創(chuàng)建一個列表,用來保存客戶端及其信息 c_list=[] def get_send_msg(c,addr,c_list): while True: tag=False dic=lib.common.get_dic(c) if dic['msg']=='exit': #如果接受出異常,或是客戶端主動輸入為exit,在列表中移除客戶端信息 for i in c_list: if i['addr']==addr: c_list.remove(i) break if dic['is_siliao']==True: #客戶端發(fā)來的字典里面如果is_siliao==True,進入私聊代碼 for i in c_list: #遍歷列表,并用正則表達式截取信息 li=re.findall('(.*?)@%s(.*)'%i['name'],dic['msg']) if len(li)!=0: dic['msg']=li[0][0]+li[0][1] lib.common.send_dic(i['client'],dic) tag=True break if tag: continue #如果不是私聊,進入下面代碼,在聊天室進行群聊 for i in c_list: if i['addr']!=addr: lib.common.send_dic(i['client'],dic) while True: #用線程池,進行多次連接 print('客戶端等待連接') c,addr=s.accept() print('%s連接了服務(wù)器'%addr[1]) name=c.recv(1024).decode('utf-8')#進行第一次接受,接受客戶端的名字,為私聊的功能做準備 c_dic={'addr':addr,'client':c,'name':name}#將客戶端的信息保存在字典中 c_list.append(c_dic)#將字典加入列表 t=ThreadPoolExecutor() t.submit(get_send_msg,c,addr,c_list)
客戶端:
import lib.common from concurrent.futures import ThreadPoolExecutor c=socket.socket() ip_host=('127.0.0.1',8000) c.connect(ip_host) def send_msg(c,name): while True: msg = input ('>>:').strip () is_siliao=False if not msg: continue # if msg.startswith('@'): if '@'in msg: is_siliao=True dic = {'msg': msg,'name':name,'is_siliao':is_siliao} lib.common.send_dic(c,dic) if msg=='exit': c.close () break def get_msg(c): while True: dic=lib.common.get_dic(c) if dic['is_siliao']==True: print('來自%s的私聊:%s'%(dic['name'],dic['msg'])) continue print('%s:%s'%(dic['name'],dic['msg'])) t=ThreadPoolExecutor() name=input('你的聊天名字:').strip() c.send(name.encode('utf-8')) t.submit(send_msg,c,name) t.submit(get_msg,c)
運行代碼截圖:
感謝各位的閱讀!關(guān)于“使用python實現(xiàn)簡單聊天室功能的案例”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!