這篇文章將為大家詳細(xì)講解有關(guān)怎么使用Python實現(xiàn)SQL注入檢測插件,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
發(fā)展壯大離不開廣大客戶長期以來的信賴與支持,我們將始終秉承“誠信為本、服務(wù)至上”的服務(wù)理念,堅持“二合一”的優(yōu)良服務(wù)模式,真誠服務(wù)每家企業(yè),認(rèn)真做好每個細(xì)節(jié),不斷完善自我,成就企業(yè),實現(xiàn)共贏。行業(yè)涉及墻體彩繪等,在重慶網(wǎng)站建設(shè)公司、全網(wǎng)營銷推廣、WAP手機(jī)網(wǎng)站、VI設(shè)計、軟件開發(fā)等項目上具有豐富的設(shè)計經(jīng)驗。掃描器需要實現(xiàn)的功能思維導(dǎo)圖
爬蟲編寫思路
首先需要開發(fā)一個爬蟲用于收集網(wǎng)站的鏈接,爬蟲需要記錄已經(jīng)爬取的鏈接和待爬取的鏈接,并且去重,用 Python 的set()就可以解決,大概流程是:
輸入 URL
下載解析出 URL
URL 去重,判斷是否為本站
加入到待爬列表
重復(fù)循環(huán)
SQL 判斷思路
通過在 URL 后面加上AND %d=%d或者OR NOT (%d>%d)
%d后面的數(shù)字是隨機(jī)可變的
然后搜索網(wǎng)頁中特殊關(guān)鍵詞,比如:
MySQL 中是 SQL syntax.*MySQL
Microsoft SQL Server 是 Warning.*mssql_
Microsoft Access 是 Microsoft Access Driver
Oracle 是 Oracle error
IBM DB2 是 DB2 SQL error
SQLite 是 SQLite.Exception
...
通過這些關(guān)鍵詞就可以判斷出所用的數(shù)據(jù)庫
還需要判斷一下 waf 之類的東西,有這種東西就直接停止。簡單的方法就是用特定的 URL 訪問,如果出現(xiàn)了像IP banned,fierwall之類的關(guān)鍵詞,可以判斷出是waf。具體的正則表達(dá)式是(?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)
開發(fā)準(zhǔn)備展開目錄
請安裝這些庫
pip install requests pip install beautifulsoup4
實驗環(huán)境是 Linux,創(chuàng)建一個Code目錄,在其中創(chuàng)建一個work文件夾,將其作為工作目錄
目錄結(jié)構(gòu)
/w8ay.py // 項目啟動主文件
/lib/core // 核心文件存放目錄
/lib/core/config.py // 配置文件
/script // 插件存放
/exp // exp和poc存放
步驟
SQL 檢測腳本編寫
DBMS_ERRORS = { 'MySQL': (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."), "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."), "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."), "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"), "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"), "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("), "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"), "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"), }
通過正則表達(dá)式就可以判斷出是哪個數(shù)據(jù)庫了
for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]): if (re.search(regex,_content)): return True
下面是我們測試語句的payload
BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
用報錯語句返回正確的內(nèi)容和錯誤的內(nèi)容進(jìn)行對比
for test_payload in BOOLEAN_TESTS: # Right Page RANDINT = random.randint(1, 255) _url = url + test_payload % (RANDINT, RANDINT) content["true"] = Downloader.get(_url) _url = url + test_payload % (RANDINT, RANDINT + 1) content["false"] = Downloader.get(_url) if content["origin"] == content["true"] != content["false"]: return "sql found: %" % url
這句
content["origin"] == content["true"] != content["false"]
意思就是當(dāng)原始網(wǎng)頁等于正確的網(wǎng)頁不等于錯誤的網(wǎng)頁內(nèi)容時,就可以判定這個地址存在注入漏洞
完整代碼:
import re, random from lib.core import Download def sqlcheck(url): if (not url.find("?")): # Pseudo-static page return false; Downloader = Download.Downloader() BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)") DBMS_ERRORS = { # regular expressions used for DBMS recognition based on error message response "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."), "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."), "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."), "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"), "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"), "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("), "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"), "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"), } _url = url + "%29%28%22%27" _content = Downloader.get(_url) for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]): if (re.search(regex,_content)): return True content = {} content['origin'] = Downloader.get(_url) for test_payload in BOOLEAN_TESTS: # Right Page RANDINT = random.randint(1, 255) _url = url + test_payload % (RANDINT, RANDINT) content["true"] = Downloader.get(_url) _url = url + test_payload % (RANDINT, RANDINT + 1) content["false"] = Downloader.get(_url) if content["origin"] == content["true"] != content["false"]: return "sql found: %" % url
將這個文件命名為sqlcheck.py,放在/script目錄中。代碼的第 4 行作用是查找 URL 是否包含?,如果不包含,比方說偽靜態(tài)頁面,可能不太好注入,因此需要過濾掉
爬蟲的編寫
爬蟲的思路上面講過了,先完成 URL 的管理,我們單獨將它作為一個類,文件保存在/lib/core/UrlManager.py
#-*- coding:utf-8 -*- class UrlManager(object): def __init__(self): self.new_urls = set() self.old_urls = set() def add_new_url(self, url): if url is None: return if url not in self.new_urls and url not in self.old_urls: self.new_urls.add(url) def add_new_urls(self, urls): if urls is None or len(urls) == 0: return for url in urls: self.add_new_url(url) def has_new_url(self): return len(self.new_urls) != 0 def get_new_url(self): new_url = self.new_urls.pop() self.old_urls.add(new_url) return new_url
為了方便,我們也將下載功能單獨作為一個類使用,文件保存在lib/core/Downloader.py
#-*- coding:utf-8 -*- import requests class Downloader(object): def get(self, url): r = requests.get(url, timeout = 10) if r.status_code != 200: return None _str = r.text return _str def post(self, url, data): r = requests.post(url, data) _str = r.text return _str def download(self, url, htmls): if url is None: return None _str = {} _str["url"] = url try: r = requests.get(url, timeout = 10) if r.status_code != 200: return None _str["html"] = r.text except Exception as e: return None htmls.append(_str)
特別說明,因為我們要寫的爬蟲是多線程的,所以類中有個download方法是專門為多線程下載專用的
在lib/core/Spider.py中編寫爬蟲
#-*- coding:utf-8 -*- from lib.core import Downloader, UrlManager import threading from urllib import parse from urllib.parse import urljoin from bs4 import BeautifulSoup class SpiderMain(object): def __init__(self, root, threadNum): self.urls = UrlManager.UrlManager() self.download = Downloader.Downloader() self.root = root self.threadNum = threadNum def _judge(self, domain, url): if (url.find(domain) != -1): return True return False def _parse(self, page_url, content): if content is None: return soup = BeautifulSoup(content, 'html.parser') _news = self._get_new_urls(page_url, soup) return _news def _get_new_urls(self, page_url, soup): new_urls = set() links = soup.find_all('a') for link in links: new_url = link.get('href') new_full_url = urljoin(page_url, new_url) if (self._judge(self.root, new_full_url)): new_urls.add(new_full_url) return new_urls def craw(self): self.urls.add_new_url(self.root) while self.urls.has_new_url(): _content = [] th = [] for i in list(range(self.threadNum)): if self.urls.has_new_url() is False: break new_url = self.urls.get_new_url() ## sql check try: if (sqlcheck.sqlcheck(new_url)): print("url:%s sqlcheck is valueable" % new_url) except: pass print("craw:" + new_url) t = threading.Thread(target = self.download.download, args = (new_url, _content)) t.start() th.append(t) for t in th: t.join() for _str in _content: if _str is None: continue new_urls = self._parse(new_url, _str["html"]) self.urls.add_new_urls(new_urls)
爬蟲通過調(diào)用craw()方法傳入一個網(wǎng)址進(jìn)行爬行,然后采用多線程的方法下載待爬行的網(wǎng)站,下載之后的源碼用_parse方法調(diào)用BeautifulSoup進(jìn)行解析,之后將解析出的 URL 列表丟入 URL 管理器,這樣循環(huán),最后只要爬完了網(wǎng)頁,爬蟲就會停止
threading庫可以自定義需要開啟的線程數(shù),線程開啟后,每個線程會得到一個 url 進(jìn)行下載,然后線程會阻塞,阻塞完畢后線程放行
爬蟲和 SQL 檢查的結(jié)合
在lib/core/Spider.py文件引用一下from script import sqlcheck,在craw()方法中,取出新的 URL 地方調(diào)用一下
##sql check try: if(sqlcheck.sqlcheck(new_url)): print("url:%s sqlcheck is valueable"%new_url) except: pass
用try檢測可能出現(xiàn)的異常,繞過它,在文件w8ay.py中進(jìn)行測試
#-*- coding:utf-8 -*- ''' Name: w8ayScan Author: mathor Copyright (c) 2019 ''' import sys from lib.core.Spider import SpiderMain def main(): root = "https://wmathor.com" threadNum = 50 w8 = SpiderMain(root, threadNum) w8.craw() if __name__ == "__main__": main()
很重要的一點!為了使得lib和script文件夾中的.py文件可以可以被認(rèn)作是模塊,請在lib、lib/core和script文件夾中創(chuàng)建__init__.py文件,文件中什么都不需要寫
關(guān)于“怎么使用Python實現(xiàn)SQL注入檢測插件”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。