下文內(nèi)容主要給大家?guī)韕ython3多進(jìn)程和協(xié)程處理MySQL數(shù)據(jù)講義,這里所講到的知識,與書籍略有不同,都是創(chuàng)新互聯(lián)專業(yè)技術(shù)人員在與用戶接觸過程中,總結(jié)出來的,具有一定的經(jīng)驗分享價值,希望給廣大讀者帶來幫助。
為茶陵等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計制作服務(wù),及茶陵網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為成都網(wǎng)站設(shè)計、網(wǎng)站建設(shè)、茶陵網(wǎng)站設(shè)計,以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!
python3的多進(jìn)程 + 協(xié)程處理MySQL的數(shù)據(jù),主要邏輯是拉取MySQL的數(shù)據(jù),然后使用flashtext匹配關(guān)鍵字,在存回MySQL,代碼如下(async_mysql.py
):
import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor as Pool
import aiomysql
from flashtext import KeywordProcessor
import click
class AttrDict(dict):
"""可以用"."獲取屬性,沒有該屬性時返回None的字典"""
def __getattr__(self, name):
try:
return self[name]
except KeyError:
return None
def __setattr__(self, name, value):
self[name] = value
class AttrDictCursor(aiomysql.DictCursor):
"""繼承aiomysql的字典cursor"""
dict_type = AttrDict
class MultiProcessMysql(object):
"""用多進(jìn)程和協(xié)程處理MySQL數(shù)據(jù)"""
def __init__(self, workers=2, pool=10, start=0, end=2000):
"""第一段的參數(shù)需要跟隨需求變動"""
self.host = "192.168.0.34"
self.port = 3306
self.user = "root"
self.password = "root"
self.db = "mydb"
self.origin_table = "judgment_main_etl" # main
self.dest_table = "laws_finance1"
self.s_sql = f"select uuid, court_idea, judge_result, reason, plt_claim, dft_rep, crs_exm from {self.origin_table} where %s<=id and id<%s;"
self.i_sql = f"insert into {self.dest_table} (uuid, title, reason, keyword) values (%s, %s, %s, %s)"
self.pool = pool # 協(xié)程數(shù)和MySQL連接數(shù)
self.aionum = self.pool
self.step = 2000 # 一次性從MySQL拉取的行數(shù)
self.workers = workers # 進(jìn)程數(shù)
self.start = start # MySQL開始的行數(shù)
self.end = end # MySQL結(jié)束的行數(shù)
self.keyword = ['非法經(jīng)營支付業(yè)務(wù)', '網(wǎng)絡(luò)洗錢', '資金池', '支付牌照', '清潔算', '網(wǎng)絡(luò)支付', '網(wǎng)上支付', '移動支付', '聚合支付', '保本保息', '擔(dān)保交易', '供應(yīng)鏈金融', '網(wǎng)貸', '網(wǎng)絡(luò)借貸', '網(wǎng)絡(luò)投資', '虛假標(biāo)的', '自融', '資金池', '關(guān)聯(lián)交易', '龐氏騙局', '網(wǎng)絡(luò)金融理財', '線上投資理財', '互聯(lián)網(wǎng)私募', '互聯(lián)網(wǎng)股權(quán)', '非法集資', '合同欺詐', '眾籌投資', '股權(quán)轉(zhuǎn)讓', '互聯(lián)網(wǎng)債權(quán)轉(zhuǎn)讓', '資本自融', '投資騙局', '洗錢', '非法集資', '網(wǎng)絡(luò)傳銷', '虛擬幣泡沫', '網(wǎng)絡(luò)互助金融', '金融欺詐', '網(wǎng)上銀行', '信用卡盜刷', '網(wǎng)絡(luò)釣魚', '信用卡信息竊取', '網(wǎng)上洗錢', '洗錢詐騙', '數(shù)字簽名更改', '支付命令竊取', '金融詐騙', '引誘投資', '隱瞞項目信息', '風(fēng)險披露', '夸大收益', '詐騙保險金', '非法經(jīng)營保險業(yè)務(wù)', '侵占客戶資金', '征信報告竊取', '金融詐騙', '破壞金融管理']
self.kp = KeywordProcessor() # flashtext是一個文本匹配包,在關(guān)鍵詞數(shù)量大時速度遠(yuǎn)大于re
self.kp.add_keywords_from_list(self.keyword)
async def createMysqlPool(self, loop):
"""每個進(jìn)程要有獨立的pool,所以不綁定self"""
pool = await aiomysql.create_pool(
loop=loop, host=self.host, port=self.port, user=self.user,
password=self.password, db=self.db, maxsize=self.pool,
charset='utf8', cursorclass=AttrDictCursor
)
return pool
def cutRange(self, start, end, times):
"""將數(shù)據(jù)區(qū)間分段"""
partition = (end - start) // times
ranges = []
tmp_end = start
while tmp_end < end:
tmp_end += partition
# 剩下的不足以再分
if (end - tmp_end) < partition:
tmp_end = end
ranges.append((start, tmp_end))
start = tmp_end
return ranges
async def findKeyword(self, db, start, end):
"""從MySQL數(shù)據(jù)中匹配出關(guān)鍵字"""
# 隨機休息一定時間,防止數(shù)據(jù)同時到達(dá),同時處理, 應(yīng)該是一部分等待,一部分處理
await asyncio.sleep(random.random() * self.workers * 2)
print("coroutine start")
async with db.acquire() as conn:
async with conn.cursor() as cur:
while start < end:
tmp_end = start + self.step
if tmp_end > end:
tmp_end = end
print("aio start: %s, end: %s" % (start, tmp_end))
# <=id 和 id<
await cur.execute(self.s_sql, (start, tmp_end))
datas = await cur.fetchall()
uuids = []
for data in datas:
if data:
for key in list(data.keys()):
if not data[key]:
data.pop(key)
keyword = self.kp.extract_keywords(
" ".join(data.values()))
if keyword:
keyword = ' '.join(set(keyword)) # 對關(guān)鍵字去重
# print(keyword)
uuids.append(
(data.uuid, data.title, data.reason, keyword))
await cur.executemany(self.i_sql, uuids)
await conn.commit()
start = tmp_end
def singleProcess(self, start, end):
"""單個進(jìn)程的任務(wù)"""
loop = asyncio.get_event_loop()
# 為每個進(jìn)程創(chuàng)建一個pool
db = loop.run_until_complete(asyncio.ensure_future(
self.createMysqlPool(loop)))
tasks = []
ranges = self.cutRange(start, end, self.aionum)
print(ranges)
for start, end in ranges:
tasks.append(self.findKeyword(db, start, end))
loop.run_until_complete(asyncio.gather(*tasks))
def run(self):
"""多進(jìn)程跑"""
tasks = []
ranges = self.cutRange(self.start, self.end, self.workers)
start_time = time.time()
with Pool(max_workers=self.workers) as executor:
for start, end in ranges:
print("processor start: %s, end: %s" % (start, end))
tasks.append(executor.submit(self.singleProcess, start, end))
for task in tasks:
task.result()
print("total time: %s" % (time.time() - start_time))
@click.command(help="運行")
@click.option("-w", "--workers", default=2, help="進(jìn)程數(shù)")
@click.option('-p', "--pool", default=10, help="協(xié)程數(shù)")
@click.option('-s', '--start', default=0, help='MySQL開始的id')
@click.option('-e', "--end", default=2640000, help="MySQL結(jié)束的id")
def main(workers, pool, start, end):
mp = MultiProcessMysql(workers=workers, pool=pool, start=start, end=end)
if workers * pool > 100:
if not click.confirm('MySQL連接數(shù)超過100(%s),確認(rèn)嗎?' % (workers * pool)):
return
mp.run()
if __name__ == "__main__":
main()
運行如下:$ python3 async_mysql.py -w 2 # 可以指定其他參數(shù),也可使用默認(rèn)值
對于以上關(guān)于python3多進(jìn)程和協(xié)程處理MySQL數(shù)據(jù)講義,如果大家還有更多需要了解的可以持續(xù)關(guān)注我們創(chuàng)新互聯(lián)的行業(yè)推新,如需獲取專業(yè)解答,可在官網(wǎng)聯(lián)系售前售后的,希望該文章可給大家?guī)硪欢ǖ闹R更新。