数据库MySQL/Postgres/Redis异步操作 数据库异步操作

基于 aiomysql 异步操作mysql数据库

    异步操作 MySQL 的话,需要使用一个 aiomysql,直接 pip install aiomysql

  • 入门案例
# -*- coding: utf-8 -*-

# 导入异步操作的工具类库
import asyncio
import aiomysql.sa as aio_sa
"""
    # 标记点
    1. 必须使用 关键字 async 定义 需要异步处理的函数
    2. await 处理的函数中,遇到io操作标识
    3. 开启引擎
    4. 执行异步sql操作
    5. 关闭引擎
    6. 开启 事件循环对象
    7. 添加执行的异步函数
    8. 关闭 事件循环对象
    
"""
async def main():
    # 通过 aio_sa 创建 引擎
    engine = await aio_sa.create_engine(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        db='flaskdb',
        connect_timeout=10  # 连接超时 10秒
    )

    # 通过 engine.acquire 获取一个mysql 数据库的连接
    async with engine.acquire() as conn:
        # 异步执行,  返回一个 异步对象
        result = await conn.execute('select * from t1;')
        print(type(result))  # <class 'aiomysql.sa.result.ResultProxy'>

        # 获取一条记录
        data = await result.fetchone()
        print(type(data))  # <class 'aiomysql.sa.result.RowProxy'>

        print(data.keys())
        print(type(data.keys()))# collections.abc.KeysView

        # 转换成列表
        print(list(data.keys()))

        # 关闭引擎
        engine.close()
        await engine.wait_closed()


# 创建循环loop事件队列
loop = asyncio.get_event_loop()
# 添加 要异步执行的函数
loop.run_until_complete(main())
# 当完成 loop队列中的所有事件,关闭事件队列
loop.close()

  • 入门案例2
### 基于 with 开启引擎和开启连接
# -*- coding: utf-8 -*-
from pprint import pprint
import asyncio
import aiomysql.sa as aio_sa

"""
    # 标记点
    1. with 自动关闭 连接和引擎
"""


async def main():
    async  with aio_sa.create_engine(host='127.0.0.1',
                                     port=3306,
                                     user='root',
                                     password='123456',
                                     db='flaskdb',
                                     connect_timeout=10  # 连接超时 10秒
                                     ) as engine:
        async with engine.acquire() as conn:
            result = await conn.execute('select * from t1;')
            data = await result.fetchall()
            pprint(list(map(dict, data)))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

# fetchmany 指定查询的条数
data = await result.fetchmany(2)
# fetchall 查询全部
data = await result.fetchall()
# fetchone 查询一条
data = await result.fetchone()

借助SQLAlchemy,通过 aiomysql 查询数据库SQL

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text

async  def main():
    async  with aio_sa.create_engine(
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123456',
            db='flaskdb',
            connect_timeout=10  # 连接超时 10秒
    ) as engine:
        async with engine.acquire() as conn:
            ###
                # 1. whereclause 条件
                # 2. from_obj  表
                # 3. text 编辑 sql 条件
            
            # 利用 text / Select 生成 SQL语句
            sql = Select([text("id , name , day")],whereclause=text("id != 1"),from_obj=text("t1"))

            # 执行SQL
            result = await  conn.execute(sql)

            data = await result.fetchall()
            pprint(list(map(dict,data)))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

借助SQLAlchemy,通过 aiomysql , 执行新增操作

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table,MetaData,create_engine

async  def main():
    async  with aio_sa.create_engine(
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123456',
            db='flaskdb',
            connect_timeout=10  # 连接超时 10秒
    ) as engine:
        async with engine.acquire() as conn:

            # 创建 SQLAlchemy 中的引擎, 用来将表反射出来
            s_enging = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/flaskdb')

            # 反射表, MetaData是绑定引擎,获取元数据
            t1_table = Table('t1',MetaData(bind=s_enging),autoload=Table)

            inset_sql = t1_table.insert().values(
                [
                    {
                        "id":1236,
                        "name":'alex',
                        "year":'2021',
                        "month":"06",
                        "day":"16",
                        "datetime":"2021-04-28 14:32:46"
                    },   {
                        "id":1237,
                        "name":'wusir',
                        "year":'2021',
                        "month":"06",
                        "day":"16",
                        "datetime":"2021-04-28 14:32:46"
                    },
                ]
            )

            ### 必须开启事务,否则数据是不会插入到数据库中
            async with conn.begin():

                # 执行SQL
                result = await  conn.execute(inset_sql)

                # 查看最后一条记录
                print(result.lastrowid) # 0
                # 查看影响的行数
                print(result.rowcount) # 2

        # 再次查询, 查看是否插入到数据库
        async  with engine.acquire() as conn:
            data = await  (await conn.execute('select * from t1')).fetchall()
            data = list(map(dict,data))
            pprint(data)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

借助SQLAlchemy,通过 aiomysql , 执行更新操作

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, text


async  def main():
    async  with aio_sa.create_engine(
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123456',
            db='flaskdb',
            connect_timeout=10  # 连接超时 10秒
    ) as engine:
        async with engine.acquire() as conn:

            # 创建 SQLAlchemy 中的引擎, 用来将表反射出来
            s_enging = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/flaskdb')

            # 反射表, MetaData是绑定引擎,获取元数据
            t1_table = Table('t1',MetaData(bind=s_enging),autoload=Table)
            update_sql = t1_table.update().where(text('name="alex"')).values({"year":'2222'})
            ### 必须开启事务,否则数据是不会更新到数据库中
            async with conn.begin():

                # 执行SQL
                result = await  conn.execute(update_sql)

                # 查看最后一条记录
                print(result.lastrowid) # 0
                # 查看影响的行数
                print(result.rowcount) # 1

        # 再次查询, 查看是否更新到数据库
        async  with engine.acquire() as conn:
            data = await  (await conn.execute('select * from t1 where name="alex" ')).fetchall()
            data = list(map(dict,data))
            pprint(data)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

借助SQLAlchemy,通过 aiomysql , 执行删除操作

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, text


async  def main():
    async  with aio_sa.create_engine(
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123456',
            db='flaskdb',
            connect_timeout=10  # 连接超时 10秒
    ) as engine:
        async with engine.acquire() as conn:

            # 创建 SQLAlchemy 中的引擎, 用来将表反射出来
            s_enging = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/flaskdb')

            # 反射表, MetaData是绑定引擎,获取元数据
            t1_table = Table('t1',MetaData(bind=s_enging),autoload=Table)
            
            delete_sql = t1_table.delete() # 全部删除

            ### 必须开启事务,否则数据是不会插入到数据库中
            async with conn.begin():

                # 执行SQL
                result = await  conn.execute(delete_sql)

                # 查看最后一条记录
                print(result.lastrowid) # 0
                # 查看影响的行数
                print(result.rowcount) # 1237

        # 再次查询, 查看是否删除了所有数据
        async  with engine.acquire() as conn:
            data = await  (await conn.execute('select * from t1 where name="alex" ')).fetchall()
            data = list(map(dict,data))
            pprint(data) #  []
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

基于 asyncpg 对 PostgreSQL 执行异步操作

    异步操作 PostgreSQL,需要使用一个 asyncpg ,直接 pip install asyncpg . asyncpg相比较于psycopg2 效果更好

异步查询

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import asyncpg

async  def main():

    # 创建 数据库连接驱动
    conn = await asyncpg.connect(
        host='192.168.236.128',
        port=5432,
        user='postgres',
        password='123456',
        database='mydb',
        timeout=10
    )

    # - fetchrow  获取满足调价的单条记录
    row1 = await  conn.fetchrow('select * from company;')
    pprint(row1)
    print(type(row1)) # <class 'asyncpg.Record'>
    ##### <class 'asyncpg.Record'>  Record 对象的用法
    # 1. 通过字典的方式获取数据值
    print(row1['id'],row1['name'])  # 8 Paul

    # 2. 字典的get方式获取数据
    print(row1.get('id'))

    # 3. keys , values ,items
    print(list(row1.keys()))  # tuple_iterator 字段名字
    print(list(row1.values())) # RecordIterator 值
    print(dict(row1.items())) # RecordItemsIterator 字典形式  {字段名:值}
    print(dict(row1))


    # - fetch 执行 获取满足条件的全部记录
    row2 = await  conn.fetch('select * from company;')
    pprint(row2)

    # 关闭连接
    await conn.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

基于sqlalchemy异步PSQL查询

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy import text
from sqlalchemy.sql import Select


async  def main():

    # 创建 数据库连接驱动
    conn = await asyncpg.connect(
        host='192.168.236.128',
        port=5432,
        user='postgres',
        password='123456',
        database='mydb',
        timeout=10
    )
    # 利用 sqlalchemy 工具类 ,组装sql语句
    sql = Select([text('id,name,age')],whereclause=text('id != 1'),from_obj=text('company'))
    rows = await conn.fetch(str(sql))  #  asyncpg  只支持字符串查询
    pprint(list(map(dict,rows)))
    
    # conn.fetch 也支持 占位符  . $ 符号 作为占位符
    rows2 = await conn.fetch("select * from company where id != $1", 1)
    pprint(list(map(dict,rows2)))

    # 关闭连接
    await conn.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

异步PSQL添加 INSERT

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import asyncpg
"""
    # 1. executemany 执行多条sql语句 , 不具备事务功能
    # 2. execute 执行一条SQL语句 , 具有事务功能
"""

async def main():
    # 创建 数据库连接驱动
    conn = await asyncpg.connect(
        host='192.168.236.128',
        port=5432,
        user='postgres',
        password='123456',
        database='mydb',
        timeout=10
    )

    ### 执行 insert 语句
    # execute 执行单条 SQL语句
    row = await conn.execute("insert into company(id,name,age,address,salary) values ($1,$2,$3,$4,$5)", 1,'zhangsan', 18, 'bj',
                             10000)
    pprint(row)  # 'INSERT 0 1'
    pprint(type(row))  # <class 'str'>


    # executemany 执行多条语句 .第一条参数是一个模板,第二条命令是包含多个元组的列表
    rows = await conn.executemany("insert into company(id,name,age,address,salary) values ($1,$2,$3,$4,$5)",
                                  [(2,'lisi', 18, 'sh',
                             2222), (3,'wangwu', 18, 'hn',
                             3333)])

    # 关闭连接
    await conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

异步PSQL批量添加之事务

import asyncio
from pprint import pprint
import asyncpg
"""
 1. 开启事务后,执行多个插入语句 . 遇到异常则自动回滚事务

"""
async def main():
    # 创建 数据库连接驱动
    conn = await asyncpg.connect(
        host='192.168.236.128',
        port=5432,
        user='postgres',
        password='123456',
        database='mydb',
        timeout=10
    )

    # 开启 一步事务 , 会开启一个事务,使用异步上下文管理
    async with conn.transaction():
        await conn.executemany("insert into company(id,name,age,address,salary) values ($1,$2,$3,$4,$5)",
                               [(2, 'lisi', 18, 'sh',2222),
                                (3, 'wangwu', 18, 'hn', 3333)])
    
    # 关闭连接
    await conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

异步PSQL修改

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import asyncpg

async def main():
    # 创建 数据库连接驱动
    conn = await asyncpg.connect(
        host='192.168.236.128',
        port=5432,
        user='postgres',
        password='123456',
        database='mydb',
        timeout=10
    )

    # 修改一条记录
    row = await conn.execute('update company set name = $1 where name = $2 ','zhangsan','geigei')
    print(row)  # UPDATE 0

    # 修改多条记录
    rows = await conn.executemany('update company set name = $1 where name = $2',[('zhangsangeigei','zhangsan'),('lisi','geigei2')])
    print(rows) # None


    #  returning  返回 数据 id
    row = await conn.fetch("update company set name = 'zhangsan' where name ='zhangsan' returning id")
    print(row)

    # 关闭连接
    await conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

异步PSQL删除

# -*- coding: utf-8 -*-
import asyncio
from pprint import pprint
import asyncpg

async def main():
    # 创建 数据库连接驱动
    conn = await asyncpg.connect(
        host='192.168.236.128',
        port=5432,
        user='postgres',
        password='123456',
        database='mydb',
        timeout=10
    )

    async with conn.transaction():

        # 删除一条
        row = await conn.execute("delete from company where name in ($1,$2)","张三","李四")
        print(row)

        # 删除多条
        rows = await conn.executemany("delete from company where id = $1 ",[(2,),(3,)])
        print(rows)

        # 返回 id
        rows = await  conn.fetch("delete from company where name in ($1,$2) returning id ",'zhangsan','lisi' )
        print(rows)
    # 关闭连接
    await conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

连接池


# -*- coding: utf-8 -*-

import asyncio
import asyncpg

# asyncpg 还提供了连接池,需要的话往池子里面去取即可。

async def main():
    pool = await asyncpg.create_pool(
        host='192.168.236.128',
        port=5432,
        user='postgres',
        password='123456',
        database='mydb',
        timeout=10,
        # 连接池初始化时默认的最小连接数, 默认为 10
        min_size=10,
        # 连接池最大数量 默认是 10,
        max_size=10,
        # 每个链接最大查询数量, 超过了就换新的连接, 默认 5000
        max_queries=5000,
        # 最大不活跃时间, 默认 300.0, 超过这个时间的连接就会被关闭, 传入 0 的话则永不关闭
        max_inactive_connection_lifetime=300
    )

    async with pool.acquire() as conn:
        async with conn.transaction():
            row = await conn.fetchrow("select  '100'::int + '200'  ") # <Record ?column?=300>
            print(row)

            # 起别名  as
            row = await conn.fetchrow("select '100'::int + '200' as result ")
            print(row)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

异步版PSQL


# -*- coding: utf-8 -*-
import time
import asyncio
import asyncpg


async def run_sql(conn, query_list):
    result = []
    for query in query_list:
        result.append(await conn.fetch(query))
    await conn.close()
    return len(result)


async def main():
    async with asyncpg.create_pool(
            host='192.168.236.128',
            port=5432,
            user='postgres',
            password='123456',
            database='mydb',
            timeout=10,
            # 连接池初始化时默认的最小连接数, 默认为 10
            min_size=10,
            # 连接池最大数量 默认是 10,
            max_size=10,
            # 每个链接最大查询数量, 超过了就换新的连接, 默认 5000
            max_queries=5000,
            # 最大不活跃时间, 默认 300.0, 超过这个时间的连接就会被关闭, 传入 0 的话则永不关闭
            max_inactive_connection_lifetime=300,
    ) as pool:
        query_list = [('select * from company;') for _ in range(20)]

        # 1. 创建 4 个 异步连接
        count = len(query_list) // 5

        # 2. 切片, 将任务分成 5 份
        query_list = [query_list[c * 4:(c + 1) * 4] for c in range(count + 1)]

        tasks = []
        for q in query_list:
            conn = await pool.acquire()
            tasks.append(run_sql(conn, q))
        # gather通常被用来阶段性的一个操作,做完第一步才能做第二步
        results = await  asyncio.gather(*tasks)
        return results


if __name__ == '__main__':
    start = time.perf_counter()
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main())
    end = time.perf_counter()

    for res in results:
        print(res)
    print(f'耗时: {end - start}')

异步操作 Redis


import asyncio
import aioredis

async def main():
    conn = await aioredis.create_connection('redis://:password@localhost:6379')
    # 指令: ex 超时
    data = await conn.execute("set","name","alex","ex",10)
    
    await conn.close()
    print(conn.closed) # True
    
asyncio.run(main())