這篇文章主要介紹如何使用Python編寫一個高效的端口掃描器,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
創(chuàng)新互聯(lián)企業(yè)建站,10年網站建設經驗,專注于網站建設技術,精于網頁設計,有多年建站和網站代運營經驗,設計師為客戶打造網絡企業(yè)風格,提供周到的建站售前咨詢和貼心的售后服務。對于網站建設、成都網站制作中不同領域進行深入了解和探索,創(chuàng)新互聯(lián)在網站建設中充分了解客戶行業(yè)的需求,以靈動的思維在網頁中充分展現,通過對客戶行業(yè)精準市場調研,為客戶提供的解決方案。python的五大特點是什么python的五大特點:1.簡單易學,開發(fā)程序時,專注的是解決問題,而不是搞明白語言本身。2.面向對象,與其他主要的語言如C++和Java相比, Python以一種非常強大又簡單的方式實現面向對象編程。3.可移植性,Python程序無需修改就可以在各種平臺上運行。4.解釋性,Python語言寫的程序不需要編譯成二進制代碼,可以直接從源代碼運行程序。5.開源,Python是 FLOSS(自由/開放源碼軟件)之一。
PyPortScannerpython多線程端口掃描器。
輸出示例:
Github此端口掃描器的源碼,文檔及詳細調用方法見Github PythonPortScanner by Yaokai。
背景有時候,在進行網絡相關的研究的時候,我們需要執(zhí)行一些有目的的參數測量。而端口掃描就是其中比較普遍也比較重要的一項。所謂的端口掃描,就是指通過TCP握手或者別的方式來判別一個給定主機上的某些端口是否處理開放,或者說監(jiān)聽的狀態(tài)?,F有的使用比較廣泛的端口掃描工具是nmap。毋庸置疑,nmap是一款非常強大且易于使用的軟件。但nmap是一款運行于terminal中的軟件,有時在別的代碼中調用并不是很方便,甚至沒有相應的庫。另外,nmap依賴的其他庫較多,在較老的系統(tǒng)中可能無法使用較新的nmap,這樣會造成掃描的不便。另外,nmap在掃描時需要root權限。基于這個原因,我用python2.7自帶的庫開發(fā)了一款高效的多線程端口掃描器來滿足使用需要。
具體實現I. 利用TCP握手連接掃描一個給定的(ip,port)地址對
為了實現端口掃描,我們首先明白如何使用python socket
與給定的(ip, port)
進行TCP握手。為了完成TCP握手,我們需要先初始化一個TCP socket。在python
中新建一個TCP socket的代碼如下:
TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #(1) TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) #(2) TCP_sock.settimeout(delay) #(3)
其中(1)
是初始化socket的代碼,socket.AF_INTE
參數表示IPv4 socket
,socket.SOCK_STREAM
參數表示TCP socket
。這樣我們就初始化了一個使用IPv4,TCP
協(xié)議的socket。
(2)
使用了socket.setsockopt()
來設置socket的另一些參數。socket.SOL_SOCKET
指定當前socket將使用setsockopt()
中后面的參數。socket.SO_REUSEPORT
表明當前socket使用了可復用端口的設置。socket.SO_REUSEPORT
具體含義可以參考我的另一篇文章。
(3)
將socket的連接超時時間設置為delay
變量所對應的時間(以秒為單位)。這么做是為了防止我們在一個連接上等待太久。
了解了如何新建一個socket,我們就可以開始對給定的(ip,port)
對進行TCP連接。代碼如下:
try: result = TCP_sock.connect_ex((ip, int(port_number))) # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE if result == 0: output[port_number] = 'OPEN' else: output[port_number] = 'CLOSE' TCP_sock.close() except socket.error as e: output[port_number] = 'CLOSE' pass
因為這是一個I/O操作,為了處理可能出現的異常,我們需要在try,except
塊處理這部分操作。其次,我們根據socket.connect_ex()
方法連接目標地址,通過該方法返回的狀態(tài)代碼來判斷連接是否成功。該方法返回0
代表連接成功。所以當返回值為0
的時候將當前端口記錄為打開狀態(tài)。反之記錄為關閉。另外,當連接操作出現異常的時候,我們也將端口記錄為關閉狀態(tài),因為其并不能被成功連接(可能因為防火墻或者數據包被過濾等原因)。
需要注意的是,在連接完成后我們一定要調用socket.close()
方法來關閉與遠程端口之間的TCP連接。否則的話我們的掃描操作可能會引起所謂的TCP連接懸掛問題(Hanging TCP connection)。
總結起來,TCP握手掃描的整體代碼如下:
""" Perform status checking for a given port on a given ip address using TCP handshake Keyword arguments: ip -- the ip address that is being scanned port_number -- the port that is going to be checked delay -- the time in seconds that a TCP socket waits until timeout output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE') """ def __TCP_connect(ip, port_number, delay, output): # Initilize the TCP socket object TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) TCP_sock.settimeout(delay) try: result = TCP_sock.connect_ex((ip, int(port_number))) # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE if result == 0: output[port_number] = 'OPEN' else: output[port_number] = 'CLOSE' TCP_sock.close() except socket.error as e: output[port_number] = 'CLOSE' pass
II. 多線程掃描端口
單線程掃描雖然邏輯簡單,但無疑是及其低效的。因為在掃描過程中要進行大量的數據包的發(fā)送和接受,所以這是一個I/O密集型的操作。如果只是用單線程進行掃描的話,程序會在等待回復的過程中浪費大量的時間。因此多線程的操作是很有必要的。這里,一個很自然的思路就是為每一個端口單獨開一個線程進行掃描。
在這里我們將需要掃描的端口列表定為從Nmap中得到的前1000個使用頻率最高的端口:
復制代碼 代碼如下:
__port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563,...]
對于一個給定的ip地址,掃描的過程是這樣的:
1. 取出一個端口
2. 新建一條線程,利用__TCP_connect()
函數對該(ip,port)
進行連接操作。
3. 調用thread.start()
和thread.join()
方法,使掃描的子線程開始工作并且命令主線程等待子線程死亡后再結束。
4. 重復這個過程直到所有的端口都被掃描過。
根據以上思路,多線程掃描的代碼如下:
""" Open multiple threads to perform port scanning Keyword arguments: ip -- the ip address that is being scanned delay -- the time in seconds that a TCP socket waits until timeout output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE') """ def __scan_ports_helper(ip, delay, output): ''' Multithreading port scanning ''' port_index = 0 while port_index < len(__port_list): # Ensure that the number of cocurrently running threads does not exceed the thread limit while threading.activeCount() < __thread_limit and port_index < len(__port_list): # Start threads thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output)) thread.start() # lock the thread until all threads complete thread.join() port_index = port_index + 1
其中__thread_limit參數是用來限制線程數目的。output是一個字典,以(port: status)
的形式保存了掃描的結果。thread.join()
保證了主線程只有在所有子線程都結束之后才會繼續(xù)執(zhí)行,從而確保了我們一定會掃描全部的端口。
III. 多線程掃描多個網站
在多線程掃描端口的同時,如果我們能夠多線程掃描多個網站,那么掃描的效率還將進一步提高。為了達到這個目的,我們需要另一個線程去管理一個網站對應的對其端口進行掃描的所有子線程。
除此之外,在這種情況下,我們必須刪去__scan_ports_helper()
中的thread.join()
。否則主線程就會被端口掃描子線程阻塞,我們也就無法多線程掃描多個網站了。
在不使用join()
的情況下,我們如何確保一個網站的掃描線程只有在完成對其全部端口的掃描之后才會返回呢?這里我使用的方法是檢測output
字典的長度。因為在全部掃描完成后,output
的長度一定與__port_list
的長度一致。
改變后的代碼如下:
def __scan_ports_helper(ip, delay, output): ''' Multithreading port scanning ''' port_index = 0 while port_index < len(__port_list): # Ensure that the number of cocurrently running threads does not exceed the thread limit while threading.activeCount() < __thread_limit and port_index < len(__port_list): # Start threads thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output)) thread.start() port_index = port_index + 1 while (len(output) < len(self.target_ports)): continue
根據以上掃描線程的代碼,端口掃描的管理線程的代碼如下所示:
""" Controller of the __scan_ports_helper() function Keyword arguments: ip -- the ip address that is being scanned delay -- the time in seconds that a TCP socket waits until timeout """ def __scan_ports(websites, output_ip, delay): scan_result = {} for website in websites: website = str(website) scan_result[website] = {} thread = threading.Thread(target = __scan_ports_helper, args = (ip, delay, scan_result[website])) thread.start() # lock the script until all threads complete thread.join() return scan_result
至此,我們就完成了一個多線程端口掃描器的全部代碼。
IV. 總結!利用這些代碼掃描給定網站并輸出結果
處于輸出方便的考慮,我并沒有使用多線程掃描多個網站,同時對每個網站多線程掃描多個端口的方法。在這個例子中只進行了多線程掃描端口,但同時只掃描一個網站的操作。整合起來的代碼如下:
import sys import subprocess import socket import threading import time class PortScanner: # default ports to be scanned # or put any ports you want to scan here! __port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563] # default thread number limit __thread_limit = 1000 # default connection timeout time inseconds __delay = 10 """ Constructor of a PortScanner object Keyword arguments: target_ports -- the list of ports that is going to be scanned (default self.__port_list) """ def __init__(self, target_ports = None): # If target ports not given in the arguments, use default ports # If target ports is given in the arguments, use given port lists if target_ports is None: self.target_ports = self.__port_list else: self.target_ports = target_ports """ Return the usage information for invalid input host name. """ def __usage(self): print('python Port Scanner v0.1') print('please make sure the input host name is in the form of "something.com" or "http://something.com!"\n') """ This is the function need to be called to perform port scanning Keyword arguments: host_name -- the hostname that is going to be scanned message -- the message that is going to be included in the scanning packets, in order to prevent ethical problem (default: '') """ def scan(self, host_name, message = ''): if 'http://' in host_name or 'https://' in host_name: host_name = host_name[host_name.find('://') + 3 : ] print('*' * 60 + '\n') print('start scanning website: ' + str(host_name)) try: server_ip = socket.gethostbyname(str(host_name)) print('server ip is: ' + str(server_ip)) except socket.error as e: # If the DNS resolution of a website cannot be finished, abort that website. #print(e) print('hostname %s unknown!!!' % host_name) self.__usage() return {} # May need to return specificed values to the DB in the future start_time = time.time() output = self.__scan_ports(server_ip, self.__delay, message) stop_time = time.time() print('host %s scanned in %f seconds' %(host_name, stop_time - start_time)) print('finish scanning!\n') return output """ Set the maximum number of thread for port scanning Keyword argument: num -- the maximum number of thread running concurrently (default 1000) """ def set_thread_limit(self, num): num = int(num) if num <= 0 or num > 50000: print('Warning: Invalid thread number limit! Please make sure the thread limit is within the range of (1, 50,000)!') print('The scanning process will use default thread limit!') return self.__thread_limit = num """ Set the time out delay for port scanning in seconds Keyword argument: delay -- the time in seconds that a TCP socket waits until timeout (default 10) """ def set_delay(self, delay): delay = int(delay) if delay <= 0 or delay > 100: print('Warning: Invalid delay value! Please make sure the input delay is within the range of (1, 100)') print('The scanning process will use the default delay time') return self.__delay = delay """ Print out the list of ports being scanned """ def show_target_ports(self): print ('Current port list is:') print (self.target_ports) """ Print out the delay in seconds that a TCP socket waits until timeout """ def show_delay(self): print ('Current timeout delay is :%d' %(int(self.__delay))) """ Open multiple threads to perform port scanning Keyword arguments: ip -- the ip address that is being scanned delay -- the time in seconds that a TCP socket waits until timeout output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE') message -- the message that is going to be included in the scanning packets, in order to prevent ethical problem (default: '') """ def __scan_ports_helper(self, ip, delay, output, message): ''' Multithreading port scanning ''' port_index = 0 while port_index < len(self.target_ports): # Ensure that the number of cocurrently running threads does not exceed the thread limit while threading.activeCount() < self.__thread_limit and port_index < len(self.target_ports): # Start threads thread = threading.Thread(target = self.__TCP_connect, args = (ip, self.target_ports[port_index], delay, output, message)) thread.start() port_index = port_index + 1 """ Controller of the __scan_ports_helper() function Keyword arguments: ip -- the ip address that is being scanned delay -- the time in seconds that a TCP socket waits until timeout message -- the message that is going to be included in the scanning packets, in order to prevent ethical problem (default: '') """ def __scan_ports(self, ip, delay, message): output = {} thread = threading.Thread(target = self.__scan_ports_helper, args = (ip, delay, output, message)) thread.start() # Wait until all port scanning threads finished while (len(output) < len(self.target_ports)): continue # Print openning ports from small to large for port in self.target_ports: if output[port] == 'OPEN': print(str(port) + ': ' + output[port] + '\n') return output """ Perform status checking for a given port on a given ip address using TCP handshake Keyword arguments: ip -- the ip address that is being scanned port_number -- the port that is going to be checked delay -- the time in seconds that a TCP socket waits until timeout output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE') message -- the message that is going to be included in the scanning packets, in order to prevent ethical problem (default: '') """ def __TCP_connect(self, ip, port_number, delay, output, message): # Initilize the TCP socket object TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) TCP_sock.settimeout(delay) # Initilize a UDP socket to send scanning alert message if there exists an non-empty message if message != '': UDP_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) UDP_sock.sendto(str(message), (ip, int(port_number))) try: result = TCP_sock.connect_ex((ip, int(port_number))) if message != '': TCP_sock.sendall(str(message)) # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE if result == 0: output[port_number] = 'OPEN' else: output[port_number] = 'CLOSE' TCP_sock.close() except socket.error as e: output[port_number] = 'CLOSE' pass
以上是“如何使用Python編寫一個高效的端口掃描器”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道!