SQLAlchemy
分类:
IT文章
•
2022-09-01 16:33:12
一.、介绍
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
- Engine,框架的引擎
- Connection Pooling ,数据库连接池
- Dialect,选择连接数据库的DB API种类
- Schema/Types,架构和类型
- SQL Exprression Language,SQL表达式语言
SQLAlchemy本身无法操作数据库,其必须依赖pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
# MySQL-Python
mysql+mysqldb://user:password@host[:port]/dbname
# pymysql
mysql+pymysql://username:password@host/dbname[?options]
# MySQL-Connector
mysql+mysqlconnector://user:password@host[:port]/dbname
# cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
# 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
二、使用
2.1 执行原生SQL语句
import threading
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
def task(arg):
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from userinfo"
)
ret = cursor.fetchall()
print(ret)
cursor.close()
conn.close()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
View Code
2.2 ORM
创建数据库表
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer, String, Date, UniqueConstraint, Index
from sqlalchemy import create_engine
Base = declarative_base()
class UserInfo(Base):
__tablename__ = "UserInfo"
id = Column(Integer, primary_key=True)
name = Column(String(16), index=True, unique=True, nullable=False)
age = Column(Integer)
birthday = Column(Date)
__table_args__ = (
# UniqueConstraint("id", "name", name="unic_id_name"), # 联合唯一索引
# Index("idx_age_birthday", "age", "birthday"), # 联合索引
)
def init_db():
"""根据类创建数据库表"""
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.create_all(engine)
def drop_db():
"""根据类删除数据库表"""
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0,
pool_size=5,
pool_timeout=20,
pool_recycle=-1
)
Base.metadata.drop_all(engine)
if __name__ == "__main__":
drop_db()
init_db()
创建单表
操作数据库表(简单示例)
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import UserInfo
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = SessionFactory()
obj = UserInfo(name="pd", age=18, birthday=datetime.date.today())
session.add(obj)
# 提交事务
session.commit()
# 关闭session
session.close()
PS:要想查看SQL语句,去掉all(),打印查询语句即可。
基本增删改查操作
import datetime
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.orm import sessionmaker
from models import UserInfo
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = SessionFactory()
#################### 增 ####################
"""
# 新增单条数据
obj = UserInfo(name="pd", age=18, birthday=datetime.date.today())
session.add(obj)
# 新增多条数据
session.add_all([
UserInfo(name="盲僧", age=10, birthday=datetime.date.today()),
UserInfo(name="妖姬", age=20, birthday=datetime.date.today())
])
"""
#################### 删 ####################
"""
session.query(UserInfo).filter(UserInfo.id==1).delete()
"""
#################### 改 ####################
"""
session.query(UserInfo).filter(UserInfo.name=="pd").update({UserInfo.name: "xx"})
session.query(UserInfo).filter(UserInfo.name=="pd").update({"name": "xx"})
session.query(UserInfo).filter(UserInfo.name=="pd").update({"age": UserInfo.age+10})
session.query(UserInfo).filter(UserInfo.name=="pd").update({"name": UserInfo.name+"真帅"}, synchronize_session=False) # 连接字符串
"""
#################### 查 ####################
"""
ret1 = session.query(UserInfo).all()
ret2 = session.query(UserInfo.name.label("n"), UserInfo.age).all() # label相当于起别名
for row in ret2:
print(row.n, row.age) # [('pd', 10), ('盲僧', 10), ('妖姬', 20)]
ret3 = session.query(UserInfo).filter(UserInfo.age==10).all()
ret4 = session.query(UserInfo).filter_by(age=10).all()
ret5 = session.query(UserInfo).filter_by(age=10).first()
ret6 = session.query(UserInfo).from_statement(text("select * from userinfo where name=:name")).params(name="pd").all()
for row in ret3:
print(row.name)
"""
# 提交事务(查询操作不需要)
# session.commit()
# 关闭session
session.close()
View Code
常用操作(条件、与、或、排序、分组......)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import UserInfo
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = SessionFactory()
#################### 指定查询哪列 ####################
ret1 = session.query(UserInfo.id.label("nid"), UserInfo.name).all()
for row in ret1:
print(row.nid, row.name)
#################### 条件 ####################
ret1 = session.query(UserInfo).filter_by(name="盲僧").all()
ret2 = session.query(UserInfo).filter(UserInfo.id > 1, UserInfo.name == "盲僧").all()
ret3 = session.query(UserInfo).filter(UserInfo.id.between(1, 3), UserInfo.name == "盲僧").all()
ret4 = session.query(UserInfo).filter(UserInfo.id.in_([1, 3])).all() # 查询id在什么范围
ret5 = session.query(UserInfo).filter(~UserInfo.id.in_([1, 3])).all() # 查询id不在什么范围
#################### 子查询 ####################
ret1 = session.query(UserInfo).filter(UserInfo.id.in_(session.query(UserInfo.id).filter_by(name="劫"))).all()
#################### 与、或 ####################
from sqlalchemy import and_, or_
ret1 = session.query(UserInfo).filter(and_(UserInfo.id > 3, UserInfo.name == "提莫")).all()
ret2 = session.query(UserInfo).filter(or_(UserInfo.id < 3, UserInfo.name == "提莫")).all()
for row in ret2:
print(row.name)
ret3 = session.query(UserInfo).filter(
or_(
UserInfo.id < 3,
and_(UserInfo.id > 3, UserInfo.name == "提莫")
)
).all()
for row in ret3:
print(row.name)
#################### 通配符 ####################
ret1 = session.query(UserInfo).filter(UserInfo.name.like("盲%")).first()
ret2 = session.query(UserInfo).filter(UserInfo.name.like("盲%")).all()
ret3 = session.query(UserInfo).filter(~UserInfo.name.like("盲%")).all()
################### 限制(切片) ####################
ret1 = session.query(UserInfo)[1:2] # 顾头不顾尾
#################### 排序 ####################
# desc()倒序 asc()正序
ret1 = session.query(UserInfo).order_by(UserInfo.id.desc()).all()
ret2 = session.query(UserInfo).order_by(UserInfo.id.desc(), UserInfo.name.asc()).all()
#################### 分组 ####################
from sqlalchemy.sql import func
ret1 = session.query(UserInfo).group_by(UserInfo.age).all()
ret2 = session.query(
func.max(UserInfo.id),
func.min(UserInfo.id)).group_by(UserInfo.age).all()
for item in ret2:
print(item)
ret3 = session.query(
UserInfo.depart_id,
func.count(UserInfo.id)
).group_by(UserInfo.depart_id).having(func.count(UserInfo.id) >= 2).all()
for item in ret3:
print(item)
#################### 连表 ####################
session.query(Book).join(Author).all()
session.query(tb1).join(tb2).join(tb3).all()
session.query(Book, Author).join(Author).all()
session.query(Book, Author).join(Author, Author.id == Book.author_id).all()
session.query(Book.id, Book.title, Author.name).join(Author).all()
session.query(Book.id, Book.title, Author.name).join(Author, Author.id == Book.author_id).all()
session.query(Book, Author).join(Author, isouter=True).all()
#################### 组合(两张表的数据组合到一起) ####################
# union()去重 union_all()不去重
q1 = session.query(UserInfo.name).filter(UserInfo.id < 3)
q2 = session.query(UserInfo.name).filter(UserInfo.id > 3)
ret1 = q1.union(q2).all()
q3 = session.query(UserInfo.name).filter(UserInfo.id < 3 )
q4 = session.query(UserInfo.name).filter(UserInfo.id < 3 )
ret2 = q3.union_all(q4).all()
# 提交事务(查询操作不需要)
# session.commit()
# 关闭session
session.close()
View Code
创建多个表并包含Fk关系以及基于relationship操作FK
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship
Base = declarative_base()
class Publisher(Base):
__tablename__ = "publisher"
id = Column(Integer, primary_key=True)
name = Column(String(16), index=True, nullable=False)
class Book(Base):
__tablename__ = "book"
id = Column(Integer, primary_key=True)
title = Column(String(16), index=True, nullable=False)
publisher_id = Column(Integer, ForeignKey("publisher.id"))
# 与生成表结构无关,不会在数据库生成列,仅用于查询方便
publisher = relationship("Publisher", backref="books")
def init_db():
"""根据类创建数据库表"""
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.create_all(engine)
def drop_db():
"""根据类删除数据库表"""
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0,
pool_size=5,
pool_timeout=20,
pool_recycle=-1
)
Base.metadata.drop_all(engine)
if __name__ == "__main__":
drop_db()
init_db()
models.py
from models import Book, Publisher
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = SessionFactory()
# 查询所有书籍
ret = session.query(Book).all()
for row in ret:
print(row.id, row.title)
#################### 基于join操作FK ####################
# 查询所有书籍及其出版社
# 方式1:
ret1 = session.query(Book, Publisher).join(Publisher).all()
# ret1 = session.query(Book, Publisher).join(Publisher, Publisher.id == Book.publisher_id).all()
for row in ret1:
print(row[0].id, row[0].title, row[1].name)
# 方式2:
ret2= session.query(Book.id, Book.title, Publisher.name).join(Publisher).all()
# ret2 = session.query(Book.id, Book.title, Publisher.name).join(Publisher, Publisher.id == Book.publisher_id).all()
for row in ret2:
print(row.id, row.title, row.name)
#################### 基于relationship操作FK ####################
# 查询所有书籍及其出版社名
ret3 = session.query(Book).all()
for row in ret3:
print(row.id, row.title, row.publisher.name)
# 查询"苹果出版社"出版的所有书籍
obj = session.query(Publisher).filter(Publisher.name == "苹果出版社").first()
for row in obj.books:
print(row.id, row.title, obj.name)
#################### 基于relationship的增操作 ####################
# 添加"橘子出版社",再给该出版社添加一本书"你在干嘛?"
obj = Book(title="你在干嘛?", publisher=Publisher(name="橘子出版社"))
session.add(obj)
# 添加"樱桃出版社",再给该出版社添加多本书"你看不到我"、"我看到你了"
# 可以根据relationship中的backref反向操作
obj = Publisher(name="樱桃出版社")
obj.books = [Book(title="你看不到我"), Book(title="我看到你了")]
session.add(obj)
session.commit()
# 关闭session
session.close()
View Code
创建多个表并包含M2M关系以及基于relationship操作M2M
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship
Base = declarative_base()
class Author(Base):
__tablename__ = "author"
id = Column(Integer, primary_key=True)
name = Column(String(16), index=True, nullable=False)
# 与生成表结构无关,不会在数据库生成列,仅用于查询方便
book_list = relationship("Book", secondary="author2book", backref="author_list")
class Book(Base):
__tablename__ = "book"
id = Column(Integer, primary_key=True)
title = Column(String(16), index=True, nullable=False)
class Author2Book(Base):
__tablename__ = "author2book"
id = Column(Integer, primary_key=True, autoincrement=True)
author_id = Column(Integer, ForeignKey("author.id"))
book_id = Column(Integer, ForeignKey("book.id"))
__table_args__ = (
UniqueConstraint("author_id", "book_id", name="uic_ar_bk"), # 联合唯一索引
# Index("idx_id_name", "id", "name"), # 联合索引
)
def init_db():
"""根据类创建数据库表"""
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.create_all(engine)
def drop_db():
"""根据类删除数据库表"""
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0,
pool_size=5,
pool_timeout=20,
pool_recycle=-1
)
Base.metadata.drop_all(engine)
if __name__ == "__main__":
drop_db()
init_db()
models.py
from models import Book, Author, Author2Book
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = SessionFactory()
#################### 普通操作M2M ####################
# 添加数据
"""
session.add_all([
Author(name="熊大"),
Author(name="光头强"),
Book(title="Python"),
Book(title="Java"),
])
session.commit()
"""
"""
session.add_all([
Author2Book(author_id=1, book_id=1),
Author2Book(author_id=1, book_id=2),
Author2Book(author_id=2, book_id=1),
])
session.commit()
"""
# 三张表关联
"""
ret = session.query(Author2Book.id, Author.name, Book.title).join(Author, Author.id == Author2Book.author_id).join(Book, Book.id == Author2Book.book_id).order_by(Author2Book.id.asc()).all()
for row in ret:
print(row.id, row.name, row.title)
# 1 熊大 Python
# 2 熊大 Java
# 3 光头强 Python
"""
#################### 基于relationship操作M2M ####################
# 查询"熊大"的所有书籍
obj = session.query(Author).filter(Author.name == "熊大").first()
for item in obj.book_list:
print(item.id, item.title)
# 查询"Python"这本书的所有作者
obj = session.query(Book).filter(Book.title == "Python").first()
for item in obj.author_list:
print(item.id, item.name)
# 添加作者"佩奇",再添加两本书"PHP"、"C",然后这两本书关联这个作者
obj = Author(name="佩奇")
obj.book_list = [Book(title="PHP"), Book(title="C")]
session.add(obj)
# 添加书籍"C++", 再添加两位作者"蜡笔小新"、"机器猫",然后这本书关联这两位作者
obj = Book(title="C++")
obj.author_list = [Author(name="蜡笔小新"), Author(name="机器猫")]
session.add(obj)
session.commit()
session.close()
View Code
基于scoped_session实现线程安全
import threading
from models import Author
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = scoped_session(SessionFactory)
def task(arg):
ret = session.query(Author).all()
for row in ret:
print(row.name)
session.remove()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
View Code
执行原生sql(多种方式)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = SessionFactory()
cursor = session.execute("insert into author(name) values(:value)", params={"value": "花木兰"})
print(cursor.lastrowid)
session.commit()
session.close()
################################################################################
import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)
def task(arg):
cursor = session.execute("insert into author(name) value(:value)", params={"value": "佩奇"})
print(cursor.lastrowid)
session.commit()
session.remove()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
################################################################################
import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
"mysql+pymysql://root:""@127.0.0.1:3306/sqlalchemy_db?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=20, # 池中没有连接最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
def task(arg):
session = SessionFactory()
cursor = session.execute("insert into author(name) value(:value)", params={"value": "乔治"})
print(cursor.lastrowid)
session.commit()
session.close()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
View Code