本節(jié)筆者將按照Pocsuite框架結(jié)構(gòu)以及工程化實(shí)踐,來實(shí)現(xiàn)一款自己的PoC框架。為了開一個(gè)好頭,我們先取一個(gè)好聽的名字,想威武霸氣一些可以取上古神器之類的,諸如軒轅夏禹赤霄干將,若懷著對(duì)游戲的熱愛也可以有山丘之王(Mountain King)劍圣(BladeMaster)月之女神(Priess Of the moon)。由于筆者比較懶,我們就取一個(gè)樸素的名字: AirPoc ,中文名叫它"空氣炮"吧。
成都創(chuàng)新互聯(lián)公司主要從事網(wǎng)站設(shè)計(jì)、做網(wǎng)站、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)翁牛特,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18980820575
名稱取好了,我們還要幻想一下大餅。這里請(qǐng)充分發(fā)揮想象力,幻想它的功能,你要記住,沒有我們實(shí)現(xiàn)不了的功能,如果有,打死產(chǎn)品manager即可。
這里不妨開下腦洞,為了組建兔子安全聯(lián)盟,我們計(jì)劃開發(fā)一款基于區(qū)塊鏈的PoC驗(yàn)證框架AirPoc,限定只對(duì)"兔子安全聯(lián)盟”范圍內(nèi)的網(wǎng)站進(jìn)行安全檢查,由一個(gè)AirPoc節(jié)點(diǎn)檢查出了存在漏洞的地址,將URL和PoC共享到區(qū)塊中,再由隨機(jī)的其他節(jié)點(diǎn)驗(yàn)證,驗(yàn)證成功則獲得"空氣幣",而被檢測(cè)到的網(wǎng)站所有者則需要支付"空氣幣"作為報(bào)酬。
雖然只是暫時(shí)的幻想,但是產(chǎn)品小哥哥也略帶激動(dòng)整理出了我們需要的功能。
使用簡(jiǎn)單,不要有太多的命令,可以跨平臺(tái)使用
人多力量大,能讓更多人參與進(jìn)來的
能簡(jiǎn)單操作就能內(nèi)置到其他產(chǎn)品上
驗(yàn)證速度與驗(yàn)證準(zhǔn)確率極高!
我也不知道什么好,總之你跑起來能出東西就行!
當(dāng)然,這位產(chǎn)品小哥哥可能怕被打,沒有將分布式,區(qū)塊鏈的概念加入進(jìn)來。
具體細(xì)節(jié)
下面就由筆者來具體實(shí)現(xiàn)由筆者兼職的產(chǎn)品manager隨便一想(挖坑)的東西。我們逐一分析問題,并給出最后的解決方案。
說到使用簡(jiǎn)單,我們就任性的選擇使用Python了,不信你看看Python之父的頭發(fā)。在安裝了Python之后,也可以一份代碼多處使用,但為了足夠的簡(jiǎn)單與原生,我們決定盡量少使用Python的第三方包。而目前Python最新版為3.7,我們就以此為例。
國外的眾多開源安全項(xiàng)目都有不少人參與,像Metasploit
Sqlmap
Routersploit
能貢獻(xiàn)一份代碼到上面可能是安全研究人員最想做的事情吧。
所以筆者有個(gè)想法是AirPoc的PoC倉庫可以開源到GitHub,并且能夠在線調(diào)用上面的PoC,這樣也不會(huì)為了PoC的更新而煩惱了。
內(nèi)置到其他產(chǎn)品也更是容易,如果是Python類的軟件,可以直接把AirPoc當(dāng)做包來調(diào)用,如果其他軟件,AirPoc可以開放一個(gè)RPC接口提供使用,如果不想要Python的環(huán)境,也可以通過pyinstaller之類的工具打包,我們的設(shè)計(jì)原則是盡量不依賴其他第三方庫,所以也會(huì)避免很多奇奇怪怪的問題。
想要實(shí)現(xiàn)驗(yàn)證速度與驗(yàn)證準(zhǔn)確率極高,我們要做好多線程或協(xié)程的并發(fā)模型,這里我們會(huì)在后面在詳細(xì)敘述。
最后,"我也不知道什么好,總之你跑起來能出東西就行!",如果上面的事情我們都做好了,這個(gè)應(yīng)該就是水到渠成的了~
AirPoc的框架
在完成這個(gè)"宏偉計(jì)劃"之前,我們也需要設(shè)計(jì)一下整體的代碼框架。作為一名代碼潔癖患者,一個(gè)良好的代碼結(jié)構(gòu),是萬里長(zhǎng)征的第一步。我們建立如下的目錄結(jié)構(gòu),env是虛擬環(huán)境,建立兩個(gè)目錄 lib 、 pocs , lib 用于存儲(chǔ)之后的相關(guān)核心文件, pocs 用于存儲(chǔ)poc文件,和一個(gè)文件 main.py 用作初始入口。
就像蓋大樓需要打好地基,接下來完成基礎(chǔ)框架,我們可以先不用寫具體的功能,但是了解作為"地基"的函數(shù)的意義。如下,在 main.py 文件中如下代碼,一個(gè)初始的框架就完成了。
import os import time def banner(): msg = ''' ___ _ _____ _____ _____ _____ / | | | | _ \ | _ \ / _ \ / ___| / /| | | | | |_| | | |_| | | | | | | | / / | | | | | _ / | ___/ | | | | | | / / | | | | | | \ \ | | | |_| | | |___ /_/ |_| |_| |_| \_\ |_| \_____/ \_____| {} '''.format(version) print(msg) def init(config: dict): print("[*] target:{}".format(config["url"])) def end(): print("[*] shutting down at {0}".format(time.strftime("%X"))) def start(): pass def main(): banner() config = { "url": "https://www.seebug.org/" } init(config) start() end() if __name__ == '__main__': version = "v0.00000001" main()
但是,正如你所見,版本號(hào)和我的比特幣錢包的數(shù)字竟然差不多,我們還要給它加些料。
在我們軟件的初始化的工程中,我們需要得到很多環(huán)境相關(guān)的信息。比如當(dāng)前執(zhí)行的路徑是哪? poc目錄在哪?我們輸出結(jié)果文件輸出到哪個(gè)路徑等等。
它們有一個(gè)共同的特定是,它們只需要加載一次,在后面使用中直接拿來用就行了。這種模式在軟件設(shè)計(jì)模式中有一個(gè)單獨(dú)的名詞,"單例模式"。
幸運(yùn)的是python的模塊就是天然的單例模式,因?yàn)槟K在第一次導(dǎo)入時(shí),會(huì)生成 .pyc 文件,當(dāng)?shù)诙螌?dǎo)入時(shí),就會(huì)直接加載 .pyc 文件,而不會(huì)再次執(zhí)行模塊代碼。因此,我們只需把相關(guān)的函數(shù)和數(shù)據(jù)定義在一個(gè)模塊中,就可以獲得一個(gè)單例對(duì)象了。
我們?cè)? lib 目錄里面新建一個(gè) data.py 用于存儲(chǔ)這些信息。同時(shí)將版本信息也放到這里來。
import os PATHS_ROOT = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../") PATHS_POCS = os.path.join(PATHS_ROOT, "pocs") PATHS_OUTPUT = os.path.join(PATHS_ROOT, "output") VERSION = "v0.0000001"
為了更好的來表示這些常量,我們用PEP8標(biāo)準(zhǔn)里的規(guī)范,統(tǒng)一約定用大寫和下劃線來表示常量。為了說明與之前的區(qū)別,我們象征性的將VERSION減一個(gè)0,來表達(dá)我們的比特幣又增長(zhǎng)了10倍。
在解決完我們相關(guān)的環(huán)境問題后,我們?cè)诳纯慈绾蝿?dòng)態(tài)加載模塊。在具體細(xì)節(jié)里我們說過,我們期望PoC能夠從本地或者遠(yuǎn)程網(wǎng)站(如GitHub)上加載。
這里又得分成兩種情況,如果是通過文件路徑加載動(dòng)態(tài)加載的模塊,可以直接用 __import__() 來加載,但是如果要遠(yuǎn)程加載,可能就又會(huì)復(fù)雜一點(diǎn),根據(jù)python的相關(guān)文檔,我們要自己實(shí)現(xiàn)"查找器"與"加載器" https://docs.python.org/zh-cn/3/reference/import.html 。
當(dāng)然,你也可以從遠(yuǎn)程保存到本地后,按照本地加載模式進(jìn)行加載。但是Pocsuite已經(jīng)有完整的加載器代碼了,我們可以直接拿來用。
新建 lib/loader.py 文件
import hashlib import importlib from importlib.abc import Loader def get_md5(value): if isinstance(value, str): value = value.encode(encoding='UTF-8') return hashlib.md5(value).hexdigest() def load_string_to_module(code_string, fullname=None): try: module_name = 'pocs_{0}'.format(get_md5(code_string)) if fullname is None else fullname file_path = 'airpoc://{0}'.format(module_name) poc_loader = PocLoader(module_name, file_path) poc_loader.set_data(code_string) spec = importlib.util.spec_from_file_location(module_name, file_path, loader=poc_loader) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) return mod except ImportError: error_msg = "load module '{0}' failed!".format(fullname) print(error_msg) raise class PocLoader(Loader): def __init__(self, fullname, path): self.fullname = fullname self.path = path self.data = None def set_data(self, data): self.data = data def get_filename(self, fullname): return self.path def get_data(self, filename): if filename.startswith('airpoc://') and self.data: data = self.data else: with open(filename, encoding='utf-8') as f: data = f.read() return data def exec_module(self, module): filename = self.get_filename(self.fullname) poc_code = self.get_data(filename) obj = compile(poc_code, filename, 'exec', dont_inherit=True, optimize=-1) exec(obj, module.__dict__)
具體如何實(shí)現(xiàn)的我們可以不用關(guān)心,我們只需要知道,其中我們可以用 load_string_to_module 來從源碼中加載模塊了。如果你有興趣了解具體的實(shí)現(xiàn),可以參考上面的python官方文檔。
規(guī)則的制定
從文件或者遠(yuǎn)程加載好模塊后,就可以準(zhǔn)備運(yùn)行的相關(guān)事宜了。我們需要對(duì)PoC做一個(gè)規(guī)則的統(tǒng)一約定,讓程序更好的調(diào)用它們。
你可以將規(guī)則定義的詳細(xì),也可以一切從簡(jiǎn),主要是看使用場(chǎng)景。而前面也提到,為了保護(hù)"安全聯(lián)盟"的安全問題,所以我們需要PoC更夠比較簡(jiǎn)單的快速編寫。
同時(shí)我們還需要考慮如果PoC需要多個(gè)參數(shù)如何處理?筆者的規(guī)則是這樣定義的。
def verify(arg, **kwargs): result = {} if requests.get(arg).status_code == 200: result = { "name":"漏洞名稱", "url":arg } return result
在PoC文件中定義一個(gè) verify 函數(shù)用作驗(yàn)證使用,arg作為普通的參數(shù)傳遞,當(dāng)需要傳遞較多的參數(shù)時(shí),從kwargs中接收。在PoC驗(yàn)證成功后,也只需要返回一個(gè)字典即可,如果驗(yàn)證失敗,返回 False 或 None 即可。字典內(nèi)容由PoC編寫者制定,給予編寫者最大的靈活空間。
但是注意!PoC的質(zhì)量就需要依靠編寫者的維護(hù)。
V0.01
我們最終要實(shí)現(xiàn)的目標(biāo)是,設(shè)置好目標(biāo),程序自動(dòng)加載指定的一個(gè)或多個(gè)PoC或全部的PoC,逐個(gè)檢測(cè)目標(biāo)。剩下的部分就是怎樣將這些功能串聯(lián)在一起了。
前面我們已經(jīng)實(shí)現(xiàn)了AirPoc的基礎(chǔ)框架,現(xiàn)在只需要在其基礎(chǔ)上具體實(shí)現(xiàn)功能即可。
為了測(cè)試的方便,我們先在 pocs 目錄下按照之前定義的規(guī)則建立兩個(gè)簡(jiǎn)陋的PoC。
現(xiàn)在, main.py 中的代碼如下
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # @Time : 2019/4/25 3:13 PM # @Author : w7ay # @File : main.py import os import time from lib.data import VERSION, PATHS_POCS, POCS from lib.loader import load_string_to_module def banner(): msg = ''' ___ _ _____ _____ _____ _____ / | | | | _ \ | _ \ / _ \ / ___| / /| | | | | |_| | | |_| | | | | | | | / / | | | | | _ / | ___/ | | | | | | / / | | | | | | \ \ | | | |_| | | |___ /_/ |_| |_| |_| \_\ |_| \_____/ \_____| {} '''.format(VERSION) print(msg) def init(config: dict): print("[*] target:{}".format(config["url"])) # 加載poc,首先遍歷出路徑 _pocs = [] for root, dirs, files in os.walk(PATHS_POCS): files = filter(lambda x: not x.startswith("__") and x.endswith(".py") and x not in config.get("poc", []), files) # 過濾掉__init__.py文件以及指定poc文件 _pocs.extend(map(lambda x: os.path.join(root, x), files)) # 根據(jù)路徑加載PoC for poc in _pocs: with open(poc, 'r') as f: model = load_string_to_module(f.read()) POCS.append(model) def end(): print("[*] shutting down at {0}".format(time.strftime("%X"))) def start(config: dict): url_list = config.get("url", []) # 循環(huán)url_list與pocs,逐一對(duì)應(yīng)執(zhí)行。 for i in url_list: for poc in POCS: try: ret = poc.verify(i) except Exception as e: ret = None print(e) if ret: print(ret) def main(): banner() config = { "url": ["https://www.seebug.org/", "https://paper.seebug.org/"], "poc": [] } init(config) start(config) end() if __name__ == '__main__': main()
我們的版本也來到了0.01,它已經(jīng)是一個(gè)"成熟的”能自己跑PoC的框架了。
多線程模型
為了讓我們的框架運(yùn)行得更快一點(diǎn),我們使用多線程來處理每個(gè)PoC,因?yàn)槲覀兲幚淼娜蝿?wù)大多是I/O密集型任務(wù),所以我們也不用太糾結(jié)python是不是偽線程這個(gè)問題。
多線程模型中最簡(jiǎn)單的一種是生產(chǎn)者/消費(fèi)者的模型,啟動(dòng)多個(gè)線程來共同消費(fèi)一個(gè)隊(duì)列就行了。新建 lib/threads.py
import threading import time def exception_handled_function(thread_function, args=()): try: thread_function(*args) except KeyboardInterrupt: raise except Exception as ex: print("thread {0}: {1}".format(threading.currentThread().getName(), str(ex))) def run_threads(num_threads, thread_function, args: tuple = ()): threads = [] # 啟動(dòng)多個(gè)線程 for num_threads in range(num_threads): thread = threading.Thread(target=exception_handled_function, name=str(num_threads), args=(thread_function, args)) thread.setDaemon(True) try: thread.start() except Exception as ex: err_msg = "error occurred while starting new thread ('{0}')".format(str(ex)) print(err_msg) break threads.append(thread) # 等待所有線程完畢 alive = True while alive: alive = False for thread in threads: if thread.isAlive(): alive = True time.sleep(0.1)
值得注意的一點(diǎn)是,我們并沒有使用Python線程中推薦的 join() 來阻塞線程,因?yàn)槭褂? join() 的話,python將無法響應(yīng)用戶輸入的消息了,會(huì)導(dǎo)致Ctrl+C退出時(shí)沒有任何響應(yīng),所以以while循環(huán)的方式來阻塞線程。
接著將主程序改造成多線程的模式,將原 start() 中的"消費(fèi)者"提取出來,單獨(dú)用作一個(gè)函數(shù),用隊(duì)列接收數(shù)據(jù)即可。如下
def worker(): if not WORKER.empty(): arg, poc = WORKER.get() try: ret = poc.verify(arg) except Exception as e: ret = None print(e) if ret: print(ret) def start(config: dict): url_list = config.get("url", []) # 生產(chǎn) for arg in url_list: for poc in POCS: WORKER.put((arg, poc)) # 消費(fèi) run_threads(10, worker)
另外,線程數(shù)量是我們可配置的,我們將它改成從配置中讀取。
run_threads(config.get("thread_num", 10), worker)
再次運(yùn)行,會(huì)發(fā)現(xiàn)比以前快很多!
統(tǒng)一網(wǎng)絡(luò)請(qǐng)求
這是我們整個(gè)框架的最后一個(gè)部分,如何來統(tǒng)一網(wǎng)絡(luò)請(qǐng)求。有時(shí)我們需要讓我們的PoC框架發(fā)出的網(wǎng)絡(luò)請(qǐng)求中統(tǒng)一一下代理,UA頭等等的設(shè)置,這需要我們框架進(jìn)行統(tǒng)一的處理。在實(shí)現(xiàn)我們的目的之前,我們還需要在框架里做一個(gè)約定,約定我們的網(wǎng)絡(luò)請(qǐng)求都需要統(tǒng)一使用 requests 來進(jìn)行發(fā)包。開始時(shí)我們說到,我們會(huì)盡量不使用第三方模塊,但是 requests 模塊實(shí)在太好用了,我們將它排除在外...
Python語言動(dòng)態(tài)的機(jī)制,我們可以很容易在使用一個(gè)函數(shù)之前Hook它,將它原始的方法重定向到我們自定義的方法中,這是我們能夠統(tǒng)一網(wǎng)絡(luò)請(qǐng)求的一個(gè)前提。
def hello(arg): return "hello " + arg def hook(arg): arg = arg.upper() return "hello " + arg hello = hook print(hello("aa"))
通過hook一個(gè)函數(shù)來達(dá)到我們自己的目的。
像sqlmap這類工具,基于python內(nèi)置的 urllib 模塊,但是有大量的代碼都在處理在了網(wǎng)絡(luò)請(qǐng)求方面,甚至為了處理 chunked 發(fā)包的問題,hook重寫了更底層的 httplib 庫。
pocsuite為了統(tǒng)一調(diào)度網(wǎng)絡(luò)請(qǐng)求,hook了 requests 模塊的相關(guān)方法。我們可以具體參考其中的代碼。
pocsuite3/lib/request/patch/__init__.py 代碼很清晰的說明了hook的函數(shù)
from .remove_ssl_verify import remove_ssl_verify from .remove_warnings import disable_warnings from .hook_request import patch_session from .add_httpraw import patch_addraw from .hook_request_redirect import patch_redirect def patch_all(): disable_warnings() # 禁用了warning提示 remove_ssl_verify() # 禁用ssl驗(yàn)證 patch_session() # hook seesion函數(shù) patch_addraw() # 添加raw原生發(fā)包支持 patch_redirect() # hook 重定向函數(shù)
如果你看過requests的源碼,會(huì)知道這里面的重點(diǎn)是看它如何hook seesion函數(shù)的。
pocsuite3/lib/request/patch/hook_request.py
from pocsuite3.lib.core.data import conf from requests.models import Request from requests.sessions import Session from requests.sessions import merge_setting, merge_cookies from requests.cookies import RequestsCookieJar from requests.utils import get_encodings_from_content def session_request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None, timeout=conf.timeout if 'timeout' in conf else None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=False, cert=None, json=None): # Create the Request merged_cookies = merge_cookies(merge_cookies(RequestsCookieJar(), self.cookies), cookies or (conf.cookie if 'cookie' in conf else None)) req = Request( method=method.upper(), url=url, headers=merge_setting(headers, conf.http_headers if 'http_headers' in conf else {}), files=files, data=data or {}, json=json, params=params or {}, auth=auth, cookies=merged_cookies, hooks=hooks, ) prep = self.prepare_request(req) proxies = proxies or (conf.proxies if 'proxies' in conf else {}) settings = self.merge_environment_settings( prep.url, proxies, stream, verify, cert ) # Send the request. send_kwargs = { 'timeout': timeout, 'allow_redirects': allow_redirects, } send_kwargs.update(settings) resp = self.send(prep, **send_kwargs) if resp.encoding == 'ISO-8859-1': encodings = get_encodings_from_content(resp.text) if encodings: encoding = encodings[0] else: encoding = resp.apparent_encoding resp.encoding = encoding return resp def patch_session(): Session.request = session_request
它重寫了 session_request 函數(shù)的方法,讓其中可以自定義我們自定義的文件頭等信息。上述代碼可能需要你看過requests才會(huì)對(duì)他有所理解,不過沒關(guān)系,我們還是以拿來主義的精神直接用即可。
為了達(dá)到此目的以及更好的優(yōu)化框架結(jié)構(gòu),我們還需要做一些小調(diào)整。
新建 lib/requests.py
from lib.data import CONF from requests.models import Request from requests.sessions import Session from requests.sessions import merge_setting, merge_cookies from requests.cookies import RequestsCookieJar from requests.utils import get_encodings_from_content def session_request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None, timeout=None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=False, cert=None, json=None): # Create the Request. conf = CONF.get("requests", {}) if timeout is None and "timeout" in conf: timeout = conf["timeout"] merged_cookies = merge_cookies(merge_cookies(RequestsCookieJar(), self.cookies), cookies or (conf.cookie if 'cookie' in conf else None)) req = Request( method=method.upper(), url=url, headers=merge_setting(headers, conf["headers"] if 'headers' in conf else {}), files=files, data=data or {}, json=json, params=params or {}, auth=auth, cookies=merged_cookies, hooks=hooks, ) prep = self.prepare_request(req) proxies = proxies or (conf["proxies"] if 'proxies' in conf else {}) settings = self.merge_environment_settings( prep.url, proxies, stream, verify, cert ) # Send the request. send_kwargs = { 'timeout': timeout, 'allow_redirects': allow_redirects, } send_kwargs.update(settings) resp = self.send(prep, **send_kwargs) if resp.encoding == 'ISO-8859-1': encodings = get_encodings_from_content(resp.text) if encodings: encoding = encodings[0] else: encoding = resp.apparent_encoding resp.encoding = encoding return resp def patch_session(): Session.request = session_request
同時(shí)在config中預(yù)留requests的接口
以及init的時(shí)候執(zhí)行我們的hook。
我們新編寫一個(gè)PoC,用這個(gè)網(wǎng)站測(cè)試一下 最后的效果 http://www.httpbin.org/get
pocs/poc.py
import requests def verify(arg, **kwargs): r = requests.get(arg) if r.status_code == 200: return {"url": arg, "text": r.text}
效果很好,但是如果加上https的網(wǎng)站,就有一個(gè)警告信息。
同樣參考Pocsuite的方法禁用掉warning信息
from urllib3 import disable_warnings disable_warnings()
最后有儀式感的將版本號(hào)變更為 0.1 ,AirPoc的框架部分大體完成了。
最 后
AirPoc的很多結(jié)構(gòu)思想都來源于Pocsuite,如果直接閱讀Pocsuite,也許能收獲很多東西。目前AirPoc v0.1基礎(chǔ)框架已經(jīng)差不多完成了,已經(jīng)可以從本地加載一個(gè)或多個(gè)PoC,進(jìn)行批量測(cè)試。后面我們?cè)賴L試些更好玩的,如何驗(yàn)證無回顯的情況,如何生成shellcode,以及如何操作回連的shell,敬請(qǐng)期待下節(jié)《功能篇》~。
AirPoc下載: https://images.seebug.org/archive/airpoc.zip