Production systems generate large volumes of system logs, application logs, security logs and so on. By analysing these logs we can understand server load and health, analyse the distribution and behaviour of clients, and even make predictions based on that analysis.
A typical collection pipeline:
Log generation --> collection --> storage --> analysis --> storage --> visualization.
Collection tools: logstash, flume (Apache), scribe (Facebook).
Open-source real-time log analysis, the ELK stack:
logstash collects the logs and stores them in an Elasticsearch cluster; kibana queries the data from ES, builds charts, and returns them to the browser.
Offline analysis.
Online analysis: one copy of the log is written out as usual, another copy is fed to a big-data real-time processing service.
Real-time processing technologies: Storm, Spark.
Prerequisites for analysis:
Semi-structured data: logs are semi-structured data. They are organized and formatted, can be split into rows and columns, can be treated as a table, and the data inside them can be analysed.
Text analysis: logs are text files, so extracting the data we need relies on file I/O, string operations, regular expressions and similar techniques.
Example:
123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
Extracting the data:
1. Split on spaces;
   Approach 1: a plain split on whitespace;
   Approach 2: split on spaces first, then treat fields wrapped in "" or [] specially;
2. Extract with a regular expression.
1. Splitting on spaces:

import datetime

logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')
ops = (None, None, None,
       lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
       lambda request: dict(zip(['method', 'url', 'protocol'], request.split())),
       int, int, None, None)

def extract(line):
    fields = []
    flag = False    # True while we are inside a field wrapped in [...] or "..."
    tmp = ''
    for field in line.split():
        if not flag and (field.startswith('[') or field.startswith('"')):
            if field.endswith(']') or field.endswith('"'):
                fields.append(field.strip())    # single-token bracketed field, e.g. "-"
            else:
                tmp += field[1:]
                flag = True
            continue
        if flag:
            if field.endswith(']') or field.endswith('"'):
                tmp += ' ' + field[:-1]
                fields.append(tmp)
                flag = False
                tmp = ''
            else:
                tmp += ' ' + field
            continue
        fields.append(field)
    print(fields)
    info = {}
    for i, field in enumerate(fields):
        name = names[i]
        op = ops[i]
        if op:
            info[name] = (op(field), op)
    return info

print(extract(logs))
Output:
['123.125.71.36', '-', '-', '06/Apr/2017:18:09:25 +0800', 'GET / HTTP/1.1', '200', '8642', '"-"', 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)']
{'datetime': (datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), <function <lambda> at 0x...>),
 'length': (8642, int),
 'request': ({'method': 'GET', 'protocol': 'HTTP/1.1', 'url': '/'}, <function <lambda> at 0x...>),
 'status': (200, int)}
2. Extracting with a regular expression.
A first version with plain groups:
((?:\d{1,3}\.){3}\d{1,3}) - - \[([/:+ \w]+)\] "(\w+) (\S+) ([/\.\w\d]+)" (\d+) (\d+) .+ "(.+)"
The code below uses the same pattern with named groups, so matcher.groupdict() maps each field name to its value:
import datetime
import re

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int
}

pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line) -> dict:
    matcher = regex.match(line)
    info = None
    if matcher:
        info = {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    return info

# print(extract(logs))

def load(path: str):    # load a log file
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d    # generator function
            else:
                continue   # malformed line, skip it; the TODO marker shows up in PyCharm's TODO panel

g = load('access.log')
print(next(g))
print(next(g))
print(next(g))
# for i in g:
#     print(i)
Output:
{'remote': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}
{'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}
{'remote': '119.123.183.219', 'datetime': datetime.datetime(2017, 4, 6, 20, 59, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}
Note:
If the code runs in Jupyter, the contents of the logs string must stay on a single line.
Sliding window:
Also called a time window, or time window function; it is extremely important in data analysis.
Much data, such as logs, is time-related and produced in time order, and analysis is therefore evaluated along the time axis.
interval: the time between two consecutive evaluations.
width: the width of the time window, i.e. the span of data taken into one evaluation; the amount of data per window is not necessarily uniform.
When width > interval:
    consecutive windows overlap.
When width = interval:
    windows are evaluated back to back, with no overlap.
When width < interval:
    generally not used, because data falling between windows is missed
    (for example, with 10 million business records, missing a few records per pass does not change the statistical trend).
c2 = c1 - delta, i.e. the next window starts delta before the current cut-off point;
delta = width - interval;
when delta = 0, width = interval (a small numeric sketch follows below).
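As a small illustration (my own sketch, not from the original notes): with width=10 and interval=5, each window re-uses the last delta=5 seconds of data from the previous one, while width=5 and interval=5 gives back-to-back windows.

# Illustrative only: print the [start, end) span of the first few windows for a given
# width/interval, so the effect of delta = width - interval on overlap is visible.
def window_spans(width, interval, n=3, t0=0):
    delta = width - interval
    for k in range(n):
        start = t0 + k * interval
        end = start + width
        print(f'window {k}: [{start}s, {end}s)  overlap with previous window: {max(delta, 0)}s')

window_spans(width=10, interval=5)   # overlapping windows (delta = 5)
window_spans(width=5, interval=5)    # non-overlapping windows (delta = 0)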
Time-series data: in an operations environment, logs, monitoring metrics and the like are produced and recorded in time order; such time-related data is generally analysed along the time dimension.
Basic structure of a data-analysis program:
Example:
a function that endlessly generates random numbers, producing time-related data, returning timestamp + random number;
take 3 items at a time and compute their average.

import random
import datetime

# First attempt, yielding (timestamp, value) tuples:
# def source():
#     while True:
#         yield datetime.datetime.now(), random.randint(1, 100)
#
# i = 0
# for x in source():
#     print(x)
#     i += 1
#     if i > 100:
#         break
#
# for _ in range(100):
#     print(next(source()))    # note: this builds a fresh generator on every call

def source():
    while True:
        yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}

src = source()

# lst = []
# lst.append(next(src))
# lst.append(next(src))
# lst.append(next(src))
lst = [next(src) for _ in range(3)]

def handler(iterable):
    values = [x['value'] for x in iterable]
    return sum(values) // len(values)

print(lst)
print(handler(lst))
Window function:

import datetime
import re

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int
}

pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line) -> dict:
    matcher = regex.match(line)
    info = None
    if matcher:
        info = {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    return info

# print(extract(logs))

def load(path: str):
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            else:
                continue

# g = load('access.log')
# print(next(g))
# print(next(g))
# print(next(g))
# for i in g:
#     print(i)

def window(src, handler, width: int, interval: int):
    # src yields dicts like the extract() output above, e.g.
    # {'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, ...), 'status': 200, ...}
    start = datetime.datetime.strptime('1970/01/01 01:01:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)   # pass seconds=; the first positional argument of timedelta is days
    buffer = []
    for x in src:
        if x:
            buffer.append(x)
            current = x['datetime']
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            # print(ret)
            start = current
            # keep only the records still inside the overlapping part of the window
            buffer = [i for i in buffer if i['datetime'] > current - delta]

def donothing_handler(iterable: list):
    print(iterable)
    return iterable

def handler(iterable: list):
    pass    # TODO

def size_handler(iterable: list):
    pass    # TODO

# window(load('access.log'), donothing_handler, 8, 5)
# window(load('access.log'), donothing_handler, 10, 5)
window(load('access.log'), donothing_handler, 5, 5)
Output:
[{'remote': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}]
[{'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}]
[{'remote': '119.123.183.219', 'datetime': datetime.datetime(2017, 4, 6, 20, 59, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}]
Dispatching:
The producer-consumer model:
A monitoring system has to process a lot of data, logs included;
it needs both data collection and data analysis.
The monitored objects are the producers of the data; the data-processing programs are its consumers.
In the traditional producer-consumer model the producer produces and the consumer consumes directly. That couples the code too tightly: it is hard to scale when production grows, and production and consumption speeds are hard to keep matched.
A queue works like lining up for food in a canteen.
Producer-consumer is like selling baozi: ideally consumption speed >= production speed. The solution is a queue, whose job is decoupling (between programs, and between services) and buffering.
Note:
zeromq is used as a low-level communication library;
most *MQ products are message queues;
Kafka offers extremely high throughput;
FIFO: first in, first out;
LIFO: last in, first out.
Data production is uneven, so short bursts (surges) of data need to be buffered;
consumers have different capacities, some fast and some slow, and each consumer decides for itself how it consumes the data in the buffer.
On a single machine, the built-in queue module can build an in-process queue that covers producer-consumer needs between threads;
large systems use third-party message middleware: RabbitMQ, RocketMQ, Kafka.
The queue module:
queue.Queue(maxsize=0): the module provides a FIFO queue class Queue; the constructor builds a FIFO queue and returns the Queue object. With maxsize <= 0 the queue length is unbounded.
q = queue.Queue()
q.get(block=True, timeout=None): remove an element from the queue and return it; once an element has been fetched with get, it is gone from the queue.
block controls blocking, timeout the waiting time:
if block=True and timeout=None, it blocks indefinitely; if timeout is a number, it blocks for that many seconds and then raises queue.Empty;
if block=False, it does not block and timeout is ignored: it either returns an element immediately or raises queue.Empty.
q.get_nowait(): equivalent to q.get(block=False) or q.get(False), i.e. it either returns an element immediately or raises queue.Empty. The blocking behaviour is easiest to demonstrate with multiple threads (see the threaded sketch after the example below).
q.put(item, block=True, timeout=None): put an element into the queue;
block=True, timeout=None: block until there is room for the element;
block=True, timeout=5: block for up to 5 seconds, then raise queue.Full;
block=False: timeout is ignored; return immediately, inserting the element if there is room and raising queue.Full otherwise.
q.put_nowait(item): equivalent to q.put(item, False).
Note:
the length of a Queue is an approximation, not exact, because production and consumption keep running while it is measured;
q.get() removes the data permanently: once fetched, it is gone. In Kafka, by contrast, the data remains on the broker after being read; consumers simply keep track of how far they have read.
Example:

from queue import Queue
import random

q = Queue()
q.put(random.randint(1, 100))
q.put(random.randint(1, 100))
print(q.get())
print(q.get())
# print(q.get())           # would block forever
print(q.get(timeout=3))    # blocks for 3 seconds, then raises queue.Empty
Output:
2
35
Traceback (most recent call last):
File "/home/python/magedu/projects/cmdb/queue_Queue.py", line 12, in
print(q.get(timeout=3))
File "/ane/python3.6/lib/python3.6/queue.py", line 172, in get
raise Empty
queue.Empty
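To make the blocking behaviour of get() and put() visible, here is a minimal two-thread producer-consumer sketch (my own illustration, not part of the original notes): the consumer blocks on q.get() whenever the queue is empty, and the producer would block on q.put() if the small queue filled up.

import random
import threading
import time
from queue import Queue

q = Queue(maxsize=3)    # small bound so put() can also block when the queue is full

def producer():
    for _ in range(5):
        time.sleep(random.uniform(0.1, 0.5))   # uneven production speed
        q.put(random.randint(1, 100))          # blocks if the queue is full

def consumer():
    for _ in range(5):
        item = q.get()                         # blocks until an item is available
        print('consumed', item)

t_prod = threading.Thread(target=producer)
t_cons = threading.Thread(target=consumer)
t_cons.start()
t_prod.start()
t_prod.join()
t_cons.join()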
Implementing the dispatcher:
The producer (the data source) produces data and buffers it in message queues.
Data-processing flow: load --> extract --> analyse (sliding window function).
When processing a large amount of data, a single data source needs multiple consumers; how is the data handed out?
A dispatcher (scheduler) is needed to distribute the data to the different consumers.
Each consumer has its own processing function for the data it receives, so some kind of registration mechanism is needed.
Load --> extract --> dispatch --> analysis function 1 | analysis function 2: one record goes through the dispatcher to n consumers; the analysis functions are different handlers with different window widths and intervals.
How to dispatch?
One-to-many, copy-style delivery (each record is sent to all n consumers), done by looping over all registered consumers.
MQ?
Put message queues between the producer and the consumers. Should all consumers share one queue (then contention has to be handled), or should each consumer have its own queue (simpler)? Here each consumer gets its own queue.
Registration?
The dispatcher keeps an internal record of which consumers exist and of each consumer's queue.
Threads?
Since every record is handled by several different registered handlers, the most convenient approach is one thread per handler.
Note:
import threading
t = threading.Thread(target=window, args=(src, handler, width, interval))   # target: the function run in the thread; args: its positional arguments as a tuple
t.start()
Analysis functions:
Log analysis matters: by analysing massive amounts of data we can tell whether the site is under attack, whether we are in a crawling peak, whether content is being hot-linked.
The analysis logic goes into the handlers;
window() should only move the time window and hand the data over; do not pile extra features into it. If some processing has to be shared, factor it out into a separate function.
Note:
crawlers: Baiduspider, Googlebot, SEO, HTTP, request, response.
Status-code analysis:
Status codes carry a lot of information:
304: the server sees that the requested resource has not changed and tells the browser to use its cached copy of the static resource;
404: the server cannot find the requested resource.
A large share of 304 responses means static caching is clearly effective;
a large share of 404 responses points to broken links, or to someone probing the site for resources;
if the share of 400- and 500-series responses suddenly starts to grow, something is definitely wrong with the site.
import datetime
import re
from queue import Queue
import threading

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int
}

pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line) -> dict:
    matcher = regex.match(line)
    info = None
    if matcher:
        info = {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    return info

# print(extract(logs))

def load(path: str):
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            else:
                continue

# g = load('access.log')
# print(next(g))
# print(next(g))
# print(next(g))
# for i in g:
#     print(i)
def window(src: Queue, handler, width: int, interval: int):
    start = datetime.datetime.strptime('1970/01/01 00:01:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)   # pass seconds=; the first positional argument of timedelta is days
    buffer = []
    while True:
        data = src.get()    # blocks until the dispatcher delivers the next record
        if data:
            buffer.append(data)
            current = data['datetime']
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            # print(ret)
            start = current
            buffer = [i for i in buffer if i['datetime'] > current - delta]

def donothing_handler(iterable: list):
    print(iterable)
    return iterable

def handler(iterable: list):
    pass    # TODO

def size_handler(iterable: list):
    pass    # TODO

def status_handler(iterable: list):
    d = {}
    for item in iterable:
        key = item['status']
        if key not in d.keys():
            d[key] = 0
        d[key] += 1
    total = sum(d.values())
    print({k: v / total * 100 for k, v in d.items()})    # could also return the dict

def dispatcher(src):
    queues = []
    threads = []

    def reg(handler, width, interval):
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        for t in threads:
            t.start()
        for x in src:       # copy every record into every registered consumer's queue
            for q in queues:
                q.put(x)

    return reg, run

reg, run = dispatcher(load('access.log'))
reg(status_handler, 8, 5)
run()
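size_handler is left as a TODO above; a possible sketch (my assumption of what it could compute, not the original implementation) is the total and average response length inside the current window:

# Hypothetical size_handler: traffic statistics for the records in one window.
def size_handler(iterable: list):
    lengths = [item['length'] for item in iterable]
    if not lengths:
        return None
    stats = {
        'count': len(lengths),
        'total_bytes': sum(lengths),
        'avg_bytes': sum(lengths) / len(lengths),
    }
    print(stats)
    return stats

# It would be registered like the other handlers, e.g. reg(size_handler, 10, 5).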
Loading log files:
Change load() to accept a batch of paths:
iterate over every path in the batch;
if a path is a regular file, read it line by line (assuming it is a log file);
if a path is a directory, walk the regular files directly inside it and process each one line by line, without recursing into subdirectories (a usage sketch follows the code below).
from pathlib import Path

def openfile(path: str):
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            else:
                continue

def load(*paths):
    for file in paths:
        p = Path(file)
        if not p.exists():
            continue
        if p.is_dir():
            for x in p.iterdir():
                if x.is_file():
                    # for y in openfile(str(x)):
                    #     yield y
                    yield from openfile(str(x))
        elif p.is_file():
            # for y in openfile(str(p)):
            #     yield y
            yield from openfile(str(p))
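Hypothetical usage of the new load() (the directory name below is only an example): any mix of files and directories can be passed, and directories are scanned one level deep for regular files.

# Assuming access.log exists and /var/log/nginx is a directory of log files:
for record in load('access.log', '/var/log/nginx'):
    print(record['status'], record['request'])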
Offline log-analysis project:
files or directories can be specified, and the logs in them are analysed;
analysis functions can be registered dynamically;
data can be dispatched to different analysis handlers.
Key steps:
handling the data source (processing the data line by line);
processing the loaded data (analysing it in small batches via the window function);
the dispatcher (the bridge between producer and consumers).
Browser analysis:
useragent: a string, in a certain format, with which client software identifies itself to the remote server.
In HTTP, the User-Agent header carries this string; its value can be modified freely (a client can impersonate anything it likes).
Format: ([platform details]) [extensions]
For example: "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36"
Note:
in Chrome --> Console, navigator.userAgent shows the current string; it can be copied into another browser's (e.g. Maxthon's) custom UserAgent setting.
Modules for extracting the information:
user-agents, pyyaml, ua-parser;
]$ pip install user-agents pyyaml ua-parser
Example:

from user_agents import parse

u = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36'
ua = parse(u)
print(ua.browser)
print(ua.browser.family)
print(ua.browser.version_string)
Output:
Browser(family='Chrome', version=(28, 0, 1500), version_string='28.0.1500')
Chrome
28.0.1500
Putting it all together, the complete code:

import datetime
import re
from queue import Queue
import threading
from pathlib import Path
from user_agents import parse
from collections import defaultdict

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int,
    'request': lambda request: dict(zip(('method', 'url', 'protocol'), request.split())),
    'useragent': lambda useragent: parse(useragent)
}

# The earlier pattern captured method/request/protocol as separate groups:
# pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
# Now the whole request line is one group, split afterwards by ops['request']:
pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<request>\w+ \S+ [/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line) -> dict:
    matcher = regex.match(line)
    info = None
    if matcher:
        info = {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
        # print(info)
    return info

# print(extract(logs))
def openfile(path: str):
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            else:
                continue

def load(*paths):
    for file in paths:
        p = Path(file)
        if not p.exists():
            continue
        if p.is_dir():
            for x in p.iterdir():
                if x.is_file():
                    # for y in openfile(str(x)):
                    #     yield y
                    yield from openfile(str(x))
        elif p.is_file():
            # for y in openfile(str(p)):
            #     yield y
            yield from openfile(str(p))

# g = load('access.log')
# print(next(g))
# print(next(g))
# print(next(g))
# for i in g:
#     print(i)
def window(src: Queue, handler, width: int, interval: int):
    start = datetime.datetime.strptime('1970/01/01 00:01:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)   # pass seconds=; the first positional argument of timedelta is days
    buffer = []
    while True:
        data = src.get()    # blocks until the dispatcher delivers the next record
        if data:
            buffer.append(data)
            current = data['datetime']
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            # print(ret)
            start = current
            buffer = [i for i in buffer if i['datetime'] > current - delta]

def donothing_handler(iterable: list):
    print(iterable)
    return iterable

def handler(iterable: list):
    pass    # TODO

def size_handler(iterable: list):
    pass    # TODO

def status_handler(iterable: list):
    d = {}
    for item in iterable:
        key = item['status']
        if key not in d.keys():
            d[key] = 0
        d[key] += 1
    total = sum(d.values())
    print({k: v / total * 100 for k, v in d.items()})    # could also return the dict

browsers = defaultdict(lambda: 0)

def browser_handler(iterable: list):
    # browsers = {}
    for item in iterable:
        ua = item['useragent']
        key = (ua.browser.family, ua.browser.version_string)
        # browsers[key] = browsers.get(key, 0) + 1
        browsers[key] += 1
    return browsers

def dispatcher(src):
    queues = []
    threads = []

    def reg(handler, width, interval):
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        for t in threads:
            t.start()
        for x in src:
            for q in queues:
                q.put(x)

    return reg, run

reg, run = dispatcher(load('access.log'))
reg(status_handler, 8, 5)
reg(browser_handler, 5, 5)
run()
print(browsers)    # note: the window threads keep blocking on q.get(), so the process has to be stopped manually
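As an optional follow-up (my own sketch, not part of the original notes), the accumulated browsers counter could be turned into a sorted report once run() has finished feeding the data:

# Hypothetical reporting step: print the ten most common (family, version) pairs.
def report_browsers(counter, top=10):
    ranked = sorted(counter.items(), key=lambda kv: kv[1], reverse=True)
    for (family, version), count in ranked[:top]:
        print(f'{family} {version}: {count}')

report_browsers(browsers)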