本篇內(nèi)容介紹了“怎么在Python中異步操作數(shù)據(jù)庫”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
公司主營業(yè)務(wù):網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出河源免費(fèi)做網(wǎng)站回饋大家。
異步操作 MySQL 的話,需要使用一個 aiomysql,直接 pip install aiomysql 即可。
aiomysql 底層依賴于 pymysql,所以 aiomysql 并沒有單獨(dú)實(shí)現(xiàn)相應(yīng)的連接驅(qū)動,而是在 pymysql 之上進(jìn)行了封裝。
下面先來看看如何查詢記錄。
import asyncio import aiomysql.sa as aio_sa async def main(): # 創(chuàng)建一個異步引擎 engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) # 通過 engine.acquire() 獲取一個連接 async with engine.acquire() as conn: # 異步執(zhí)行, 返回一個對象 result = await conn.execute("SELECT * FROM girl") # 通過 await result.fetchone() 可以獲取滿足條件的第一條記錄, 一個對象 data = await result.fetchone() # 可以將對象想象成一個字典 print(data.keys())# KeysView((1, '古明地覺', 16, '地靈殿')) print(list(data.keys()))# ['id', 'name', 'age', 'place'] print(data.values())# ValuesView((1, '古明地覺', 16, '地靈殿')) print(list(data.values()))# [1, '古明地覺', 16, '地靈殿'] print(data.items())# ItemsView((1, '古明地覺', 16, '地靈殿')) print(list(data.items()))# [('id', 1), ('name', '古明地覺'), ('age', 16), ('place', '地靈殿')] # 直接轉(zhuǎn)成字典也是可以的 print(dict(data))# {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'} # 最后別忘記關(guān)閉引擎, 當(dāng)然你在創(chuàng)建引擎的時(shí)候也可以通過 async with aio_sa.create_engine 的方式創(chuàng)建 # async with 語句結(jié)束后會自動執(zhí)行下面兩行代碼 engine.close() await engine.wait_closed() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
怎么樣,是不是很簡單呢,和同步庫的操作方式其實(shí)是類似的。但是很明顯,我們在獲取記錄的時(shí)候不會只獲取一條,而是會獲取多條,獲取多條的話使用 await result.fetchall() 即可。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa async def main(): # 通過異步上下文管理器的方式創(chuàng)建, 會自動幫我們關(guān)閉引擎 async with aio_sa.create_engine(host="xx.xxx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: result = await conn.execute("SELECT * FROM girl") # 此時(shí)的 data 是一個列表, 列表里面是對象 data = await result.fetchall() # 將里面的元素轉(zhuǎn)成字典 pprint(list(map(dict, data))) """ [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'}, {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}, {'age': 400, 'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
除了 fetchone、fetchall 之外,還有一個 fetchmany,可以獲取指定記錄的條數(shù)。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa async def main(): # 通過異步上下文管理器的方式創(chuàng)建, 會自動幫我們關(guān)閉引擎 async with aio_sa.create_engine(host="xx.xxx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: result = await conn.execute("SELECT * FROM girl") # 默認(rèn)是獲取一條, 得到的仍然是一個列表 data = await result.fetchmany(2) pprint(list(map(dict, data))) """ [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'}, {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
以上就是通過 aiomysql 查詢數(shù)據(jù)庫中的記錄,沒什么難度。但是值得一提的是,await conn.execute 里面除了可以傳遞一個原生的 SQL 語句之外,我們還可以借助 SQLAlchemy。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa from sqlalchemy.sql.selectable import Select from sqlalchemy import text async def main(): async with aio_sa.create_engine(host="xx.xxx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl")) result = await conn.execute(sql) data = await result.fetchall() pprint(list(map(dict, data))) """ [{'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}, {'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
然后是添加記錄,我們同樣可以借助 SQLAlchemy 幫助我們拼接 SQL 語句。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa from sqlalchemy import Table, MetaData, create_engine async def main(): async with aio_sa.create_engine(host="xx.xx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: # 我們還需要創(chuàng)建一個 SQLAlchemy 中的引擎, 然后將表反射出來 s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser") tbl = Table("girl", MetaData(bind=s_engine), autoload=True insert_sql = tbl.insert().values( [{"name": "十六夜咲夜", "age": 17, "place": "紅魔館"}, {"name": "琪露諾", "age": 60, "place": "霧之湖"}]) # 注意: 執(zhí)行的執(zhí)行必須開啟一個事務(wù), 否則數(shù)據(jù)是不會進(jìn)入到數(shù)據(jù)庫中的 async with conn.begin(): # 同樣會返回一個對象 # 盡管我們插入了多條, 但只會返回最后一條的插入信息 result = await conn.execute(insert_sql) # 返回最后一條記錄的自增 id print(result.lastrowid) # 影響的行數(shù) print(result.rowcount) # 重新查詢, 看看記錄是否進(jìn)入到數(shù)據(jù)庫中 async with engine.acquire() as conn: data = await (await conn.execute("select * from girl")).fetchall() data = list(map(dict, data)) pprint(data) """ [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'}, {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}, {'age': 400, 'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}, {'age': 17, 'id': 16, 'name': '十六夜咲夜', 'place': '紅魔館'}, {'age': 60, 'id': 17, 'name': '琪露諾', 'place': '霧之湖'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
還是很方便的,但是插入多條記錄的話只會返回插入的最后一條記錄的信息,所以如果你希望獲取每一條的信息,那么就一條一條插入。
修改記錄和添加記錄是類似的,我們來看一下。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa from sqlalchemy import Table, MetaData, create_engine, text async def main(): async with aio_sa.create_engine(host="xx.xx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser") tbl = Table("girl", MetaData(bind=s_engine), autoload=True) update_sql = tbl.update().where(text("name = '古明地覺'")).values({"place": "東方地靈殿"}) # 同樣需要開啟一個事務(wù) async with conn.begin(): result = await conn.execute(update_sql) print(result.lastrowid)# 0 print(result.rowcount) # 1 # 查詢結(jié)果 async with engine.acquire() as conn: data = await (await conn.execute("select * from girl where name = '古明地覺'")).fetchall() data = list(map(dict, data)) pprint(data) """ [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '東方地靈殿'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
可以看到,記錄被成功的修改了。
刪除記錄就更簡單了,直接看代碼。
import asyncio import aiomysql.sa as aio_sa from sqlalchemy import Table, MetaData, create_engine, text async def main(): async with aio_sa.create_engine(host="xx.xx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser") tbl = Table("girl", MetaData(bind=s_engine), autoload=True) update_sql = tbl.delete()# 全部刪除 # 同樣需要開啟一個事務(wù) async with conn.begin(): result = await conn.execute(update_sql) # 返回最后一條記錄的自增 id, 我們之前修改了 id = 0 記錄, 所以它跑到最后了 print(result.lastrowid)# 0 # 受影響的行數(shù) print(result.rowcount) # 6 loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
此時(shí)數(shù)據(jù)庫中的記錄已經(jīng)全部被刪除了。
整體來看還是比較簡單的,并且支持的功能也比較全面。
異步操作 PostgreSQL 的話,我們有兩個選擇,一個是 asyncpg 庫,另一個是 aiopg 庫。
asyncpg 是自己實(shí)現(xiàn)了一套連接驅(qū)動,而 aiopg 則是對 psycopg2 進(jìn)行了封裝,個人更推薦 asyncpg,性能和活躍度都比 aiopg 要好。
下面來看看如何使用 asyncpg,首先是安裝,直接 pip install asyncpg 即可。
首先是查詢記錄。
import asyncio from pprint import pprint import asyncpg async def main(): # 創(chuàng)建連接數(shù)據(jù)庫的驅(qū)動 conn = await asyncpg.connect(host="localhost", port=5432, user="postgres", password="zgghyys123", database="postgres", timeout=10) # 除了上面的方式,還可以使用類似于 SQLAlchemy 的方式創(chuàng)建 # await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") # 調(diào)用 await conn.fetchrow 執(zhí)行 select 語句,獲取滿足條件的單條記錄 # 調(diào)用 await conn.fetch 執(zhí)行 select 語句,獲取滿足條件的全部記錄 row1 = await conn.fetchrow("select * from girl") row2 = await conn.fetch("select * from girl") # 返回的是一個 Record 對象,這個 Record 對象等于將返回的記錄進(jìn)行了一個封裝 # 至于怎么用后面會說 print(row1)#pprint(row2) """ [,,] """ # 關(guān)閉連接 await conn.close() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
以上我們演示了如何使用 asyncpg 來獲取數(shù)據(jù)庫中的記錄,我們看到執(zhí)行 select 語句的話,我們可以使用 conn.fetchrow(query) 來獲取滿足條件的單條記錄,conn.fetch(query) 來獲取滿足條件的所有記錄。
Record 對象
我們說使用 conn.fetchone 查詢得到的是一個 Record 對象,使用 conn.fetch 查詢得到的是多個 Record 對象組成的列表,那么這個 Rcord 對象怎么用呢?
import asyncio import asyncpg async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") row = await conn.fetchrow("select * from girl") print(type(row))#print(row)## 這個 Record 對象可以想象成一個字典 # 我們可以將返回的字段名作為 key, 通過字典的方式進(jìn)行獲取 print(row["id"], row["name"])# 1 古明地覺 # 除此之外,還可以通過 get 獲取,獲取不到的時(shí)候會返回默認(rèn)值 print(row.get("id"), row.get("name"))# 1 古明地覺 print(row.get("xxx"), row.get("xxx", "不存在的字段"))# None 不存在的字段 # 除此之外還可以調(diào)用 keys、values、items,這個不用我說,都應(yīng)該知道意味著什么 # 只不過返回的是一個迭代器 print(row.keys())#print(row.values())#print(row.items())## 我們需要轉(zhuǎn)成列表或者元組 print(list(row.keys()))# ['id', 'name', 'age', 'place'] print(list(row.values()))# [1, '古明地覺', 16, '地靈殿'] print(dict(row.items()))# {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'} print(dict(row))# {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'} # 關(guān)閉連接 await conn.close() if __name__ == '__main__': asyncio.run(main())
當(dāng)然我們也可以借助 SQLAlchemy 幫我們拼接 SQL 語句。
import asyncio from pprint import pprint import asyncpg from sqlalchemy.sql.selectable import Select from sqlalchemy import text async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl")) # 我們不能直接傳遞一個 Select 對象, 而是需要將其轉(zhuǎn)成原生的字符串才可以 rows = await conn.fetch(str(sql)) pprint(list(map(dict, rows))) """ [{'id': 2, 'name': '椎名真白', 'place': '櫻花莊'}, {'id': 3, 'name': '古明地戀', 'place': '地靈殿'}] """ # 關(guān)閉連接 await conn.close() if __name__ == '__main__': asyncio.run(main())
此外,conn.fetch 里面還支持占位符,使用百分號加數(shù)字的方式,舉個例子:
import asyncio from pprint import pprint import asyncpg async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") rows = await conn.fetch("select * from girl where id != $1", 1) pprint(list(map(dict, rows))) """ [{'age': 16, 'id': 2, 'name': '椎名真白', 'place': '櫻花莊'}, {'age': 15, 'id': 3, 'name': '古明地戀', 'place': '地靈殿'}] """ # 關(guān)閉連接 await conn.close() if __name__ == '__main__': asyncio.run(main())
還是推薦使用 SQLAlchemy 的方式,這樣更加方便一些,就像 aiomysql 一樣。但是對于 asyncpg 而言,實(shí)際上接收的是一個原生的 SQL 語句,是一個字符串,因此它不能像 aiomysql 一樣自動識別 Select 對象,我們還需要手動將其轉(zhuǎn)成字符串。而且這樣還存在一個問題,至于是什么我們下面介紹添加記錄的時(shí)候說。
然后是添加記錄,我們看看如何往庫里面添加數(shù)據(jù)。
import asyncio from pprint import pprint import asyncpg from sqlalchemy.sql.selectable import Select from sqlalchemy import text async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") # 執(zhí)行 insert 語句我們可以使用 execute row = await conn.execute("insert into girl(name, age, place) values ($1, $2, $3)", '十六夜咲夜', 17, '紅魔館') pprint(row)# INSERT 0 1 pprint(type(row))#await conn.close() if __name__ == '__main__': asyncio.run(main())
通過 execute 可以插入單條記錄,同時(shí)返回相關(guān)信息,但是說實(shí)話這個信息沒什么太大用。除了 execute 之外,還有 executemany,用來執(zhí)行多條插入語句。
import asyncio import asyncpg async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") # executemany:第一條參數(shù)是一個模板,第二條命令是包含多個元組的列表 # 執(zhí)行多條記錄的話,返回的結(jié)果為 None rows = await conn.executemany("insert into girl(name, age, place) values ($1, $2, $3)", [('十六夜咲夜', 17, '紅魔館'), ('琪露諾', 60, '霧之湖')]) print(rows)# None # 關(guān)閉連接 await conn.close() if __name__ == '__main__': asyncio.run(main())
注意:如果是執(zhí)行大量 insert 語句的話,那么 executemany 要比 execute 快很多,但是 executemany 不具備事務(wù)功。
“怎么在Python中異步操作數(shù)據(jù)庫”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!