這篇文章主要講解了“怎么用Python編寫簡單的聊天程序”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么用Python編寫簡單的聊天程序”吧!
成都創(chuàng)新互聯(lián)是一家專業(yè)提供臨安企業(yè)網站建設,專注與成都網站設計、成都做網站、H5頁面制作、小程序制作等業(yè)務。10年已為臨安眾多企業(yè)、政府機構等服務。創(chuàng)新互聯(lián)專業(yè)網站制作公司優(yōu)惠進行中。
x01 服務端的建立
首先,在服務端,使用socket進行消息的接受,每接受一個socket的請求,就開啟一個新的線程來管理消息的分發(fā)與接受,同時,又存在一個handler來管理所有的線程,從而實現對聊天室的各種功能的處理
x02 客戶端的建立
客戶端的建立就要比服務端簡單多了,客戶端的作用只是對消息的發(fā)送以及接受,以及按照特定的規(guī)則去輸入特定的字符從而實現不同的功能的使用,因此,在客戶端這里,只需要去使用兩個線程,一個是專門用于接受消息,一個是專門用于發(fā)送消息的
至于為什么不用一個呢,那是因為,只用一個的話,當接受了消息,在發(fā)送之前接受消息的處于阻塞狀態(tài),同理,發(fā)送消息也是,那么要是將這兩個功能放在一個地方實現,就會導致沒有辦法連續(xù)發(fā)送或者接受消息了
服務端實現
import json import threading from socket import * from time import ctime class PyChattingServer: __socket = socket(AF_INET, SOCK_STREAM, 0) __address = ('', 12231) __buf = 1024 def __init__(self): self.__socket.bind(self.__address) self.__socket.listen(20) self.__msg_handler = ChattingHandler() def start_session(self): print('等待客戶連接...\r\n') try: while True: cs, caddr = self.__socket.accept() # 利用handler來管理線程,實現線程之間的socket的相互通信 self.__msg_handler.start_thread(cs, caddr) except socket.error: pass class ChattingThread(threading.Thread): __buf = 1024 def __init__(self, cs, caddr, msg_handler): super(ChattingThread, self).__init__() self.__cs = cs self.__caddr = caddr self.__msg_handler = msg_handler # 使用多線程管理會話 def run(self): try: print('...連接來自于:', self.__caddr) data = '歡迎你到來PY_CHATTING!請輸入你的很cooooool的昵稱(不能帶有空格喲`)\r\n' self.__cs.sendall(bytes(data, 'utf-8')) while True: data = self.__cs.recv(self.__buf).decode('utf-8') if not data: break self.__msg_handler.handle_msg(data, self.__cs) print(data) except socket.error as e: print(e.args) pass finally: self.__msg_handler.close_conn(self.__cs) self.__cs.close() class ChattingHandler: __help_str = "[ SYSTEM ]\r\n" \ "輸入/ls,即可獲得所有登陸用戶信息\r\n" \ "輸入/h,即可獲得幫助\r\n" \ "輸入@用戶名 (注意用戶名后面的空格)+消息,即可發(fā)動單聊\r\n" \ "輸入/i,即可屏蔽群聊信息\r\n" \ "再次輸入/i,即可取消屏蔽\r\n" \ "所有首字符為/的信息都不會發(fā)送出去" __buf = 1024 __socket_list = [] __user_name_to_socket = {} __socket_to_user_name = {} __user_name_to_broadcast_state = {} def start_thread(self, cs, caddr): self.__socket_list.append(cs) chat_thread = ChattingThread(cs, caddr, self) chat_thread.start() def close_conn(self, cs): if cs not in self.__socket_list: return # 去除socket的記錄 nickname = "SOMEONE" if cs in self.__socket_list: self.__socket_list.remove(cs) # 去除socket與username之間的映射關系 if cs in self.__socket_to_user_name: nickname = self.__socket_to_user_name[cs] self.__user_name_to_socket.pop(self.__socket_to_user_name[cs]) self.__socket_to_user_name.pop(cs) self.__user_name_to_broadcast_state.pop(nickname) nickname += " " # 廣播某玩家退出聊天室 self.broadcast_system_msg(nickname + "離開了PY_CHATTING") # 管理用戶輸入的信息 def handle_msg(self, msg, cs): js = json.loads(msg) if js['type'] == "login": if js['msg'] not in self.__user_name_to_socket: if ' ' in js['msg']: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '賬號不能夠帶有空格' }), cs) else: self.__user_name_to_socket[js['msg']] = cs self.__socket_to_user_name[cs] = js['msg'] self.__user_name_to_broadcast_state[js['msg']] = True self.send_to(json.dumps({ 'type': 'login', 'success': True, 'msg': '昵稱建立成功,輸入/ls可查看所有在線的人,輸入/help可以查看幫助(所有首字符為/的消息都不會發(fā)送)' }), cs) # 廣播其他人,他已經進入聊天室 self.broadcast_system_msg(js['msg'] + "已經進入了聊天室") else: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '賬號已存在' }), cs) # 若玩家處于屏蔽模式,則無法發(fā)送群聊消息 elif js['type'] == "broadcast": if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: self.broadcast(js['msg'], cs) else: self.send_to(json.dumps({ 'type': 'broadcast', 'msg': '屏蔽模式下無法發(fā)送群聊信息' }), cs) elif js['type'] == "ls": self.send_to(json.dumps({ 'type': 'ls', 'msg': self.get_all_login_user_info() }), cs) elif js['type'] == "help": self.send_to(json.dumps({ 'type': 'help', 'msg': self.__help_str }), cs) elif js['type'] == "sendto": self.single_chatting(cs, js['nickname'], js['msg']) elif js['type'] == "ignore": self.exchange_ignore_state(cs) def exchange_ignore_state(self, cs): if cs in self.__socket_to_user_name: state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] if state: state = False else: state = True self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs]) self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: msg = "通常模式" else: msg = "屏蔽模式" self.send_to(json.dumps({ 'type': 'ignore', 'success': True, 'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切換成功,現在是" + msg) }), cs) else: self.send_to({ 'type': 'ignore', 'success': False, 'msg': '切換失敗' }, cs) def single_chatting(self, cs, nickname, msg): if nickname in self.__user_name_to_socket: msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % ( ctime(), self.__socket_to_user_name[cs], nickname, msg) self.send_to_list(json.dumps({ 'type': 'single', 'msg': msg }), self.__user_name_to_socket[nickname], cs) else: self.send_to(json.dumps({ 'type': 'single', 'msg': '該用戶不存在' }), cs) print(nickname) def send_to_list(self, msg, *cs): for i in range(len(cs)): self.send_to(msg, cs[i]) def get_all_login_user_info(self): login_list = "[ SYSTEM ] ALIVE USER : \r\n" for key in self.__socket_to_user_name: login_list += self.__socket_to_user_name[key] + ",\r\n" return login_list def send_to(self, msg, cs): if cs not in self.__socket_list: self.__socket_list.append(cs) cs.sendall(bytes(msg, 'utf-8')) def broadcast_system_msg(self, msg): data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg) js = json.dumps({ 'type': 'system_msg', 'msg': data }) # 屏蔽了群聊的玩家也可以獲得系統(tǒng)的群發(fā)信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def broadcast(self, msg, cs): data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg) js = json.dumps({ 'type': 'broadcast', 'msg': data }) # 沒有的登陸的玩家無法得知消息,屏蔽了群聊的玩家也沒辦法獲取信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name \ and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def main(): server = PyChattingServer() server.start_session() main()
客戶端的實現
import json import threading from socket import * is_login = False is_broadcast = True class ClientReceiveThread(threading.Thread): __buf = 1024 def __init__(self, cs): super(ClientReceiveThread, self).__init__() self.__cs = cs def run(self): self.receive_msg() def receive_msg(self): while True: msg = self.__cs.recv(self.__buf).decode('utf-8') if not msg: break js = json.loads(msg) if js['type'] == "login": if js['success']: global is_login is_login = True print(js['msg']) elif js['type'] == "ignore": if js['success']: global is_broadcast if is_broadcast: is_broadcast = False else: is_broadcast = True print(js['msg']) else: if not is_broadcast: print("[現在處于屏蔽模式]") print(js['msg']) class ClientSendMsgThread(threading.Thread): def __init__(self, cs): super(ClientSendMsgThread, self).__init__() self.__cs = cs def run(self): self.send_msg() # 根據不同的輸入格式來進行不同的聊天方式 def send_msg(self): while True: js = None msg = input() if not is_login: js = json.dumps({ 'type': 'login', 'msg': msg }) elif msg[0] == "@": data = msg.split(' ') if not data: print("請重新輸入") break nickname = data[0] nickname = nickname.strip("@") if len(data) == 1: data.append(" ") js = json.dumps({ 'type': 'sendto', 'nickname': nickname, 'msg': data[1] }) elif msg == "/help": js = json.dumps({ 'type': 'help', 'msg': None }) elif msg == "/ls": js = json.dumps({ 'type': 'ls', 'msg': None }) elif msg == "/i": js = json.dumps({ 'type': 'ignore', 'msg': None }) else: if msg[0] != '/': js = json.dumps({ 'type': 'broadcast', 'msg': msg }) if js is not None: self.__cs.sendall(bytes(js, 'utf-8')) def main(): buf = 1024 # 改變這個的地址,變成服務器的地址,那么只要部署到服務器上就可以全網使用了 address = ("127.0.0.1", 12231) cs = socket(AF_INET, SOCK_STREAM, 0) cs.connect(address) data = cs.recv(buf).decode("utf-8") if data: print(data) receive_thread = ClientReceiveThread(cs) receive_thread.start() send_thread = ClientSendMsgThread(cs) send_thread.start() while True: pass main()
感謝各位的閱讀,以上就是“怎么用Python編寫簡單的聊天程序”的內容了,經過本文的學習后,相信大家對怎么用Python編寫簡單的聊天程序這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關知識點的文章,歡迎關注!