SQLAlchmey使用
分类:
IT文章
•
2022-05-24 20:24:14
一、介绍:
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
二、安装 pip3 install sqlalchemy
三、使用:
1、连接数据库的方式
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
1 MySQL-Python
2 mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
3
4 pymysql
5 mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
6
7 MySQL-Connector
8 mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
9
10 cx_Oracle
11 oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
12
13 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
2、执行原生的sql语句
1 import time
2 import threading
3 import sqlalchemy
4 from sqlalchemy import create_engine
5 from sqlalchemy.engine.base import Engine
6
7 # 示例一:
8 engine = create_engine(
9 "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
10 max_overflow=2, # 超过连接池大小外最多创建的连接
11 pool_size=5, # 连接池大小
12 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
13 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
14 )
15
16 conn = engine.raw_connection()
17 cursor = conn.cursor()
18 cursor.execute(
19 "select * from t1"
20 )
21 result = cursor.fetchall()
22 cursor.close()
23 conn.close()
24
25 # 示例二:
26 engine = create_engine(
27 "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
28 max_overflow=0, # 超过连接池大小外最多创建的连接
29 pool_size=5, # 连接池大小
30 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
31 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
32 )
33
34
35 def task(arg):
36 conn = engine.raw_connection()
37 cursor = conn.cursor()
38 cursor.execute(
39 "select sleep(2)" # 睡两秒
40 )
41 result = cursor.fetchall()
42 cursor.close()
43 conn.close()
44
45
46 for i in range(20):
47 t = threading.Thread(target=task, args=(i,))
48 t.start()
原生sql
3、创建表结构
1 # !/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 import datetime
4 from sqlalchemy import create_engine
5 from sqlalchemy.ext.declarative import declarative_base
6 from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
7
8 Base = declarative_base()
9
10
11 class Users(Base):
12 __tablename__ = 'users' # 数据库表名称
13 id = Column(Integer, primary_key=True) # id 主键
14 name = Column(String(32), index=True, nullable=False) # name列,index创建索引
15
16
17 def init_db():
18 """
19 根据类创建数据库表
20 :return:
21 """
22 engine = create_engine(
23 "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
24 max_overflow=0, # 超过连接池大小外最多创建的连接
25 pool_size=5, # 连接池大小
26 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
27 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
28 )
29
30 Base.metadata.create_all(engine)
31
32
33 def drop_db():
34 """
35 根据类删除数据库表
36 :return:
37 """
38 engine = create_engine(
39 "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
40 max_overflow=0, # 超过连接池大小外最多创建的连接
41 pool_size=5, # 连接池大小
42 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
43 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
44 )
45
46 Base.metadata.drop_all(engine)
47
48
49 if __name__ == '__main__':
50 # drop_db()
51 # init_db()
52 pass
简单表结构
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
email = Column(String(32), unique=True)
ctime = Column(DateTime, default=datetime.datetime.now) # now不要加()
extra = Column(Text, nullable=True)
__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'),
# Index('ix_id_name', 'name', 'email'),
)
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
hobby_id = Column(Integer, ForeignKey("hobby.id"))
class b2g(Base):
__tablename__ = 'b2g'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))
class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/sqlalchemy_text1?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# Base.metadata.create_all(engine)
# Base.metadata.drop_all(engine)
创建有约束的表关系
4、连接数据库
# !/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
import models
# 1.创建连接池
engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/sqlalchemy_text1?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
# 2.从连接池中获取数据库连接
session = Session()
# 3.执行ORM操作
obj1 = models.Users(name="alex1",email='alex1@xx.com')
session.add(obj1)
session.commit()
# 4.关闭数据库连接(将连接放回连接池)
session.close()
方式1
# !/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
import models
# 1.创建连接池
engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/sqlalchemy_text1?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
# 2.从连接池中获取数据库连接
# session = Session()
session = scoped_session(Session)
# 3.执行ORM操作
obj1 = models.Users(name="alex2",email='alex2@xx.com')
session.add(obj1)
session.commit()
# 4.关闭数据库连接(将连接放回连接池)
session.close()
基于scoped_session实现线程安全
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from models import Users
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/sqlalchemy_text1", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
def task(arg):
session = Session()
obj1 = Users(name="alex1")
session.add(obj1)
session.commit()
for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start()
多线程下实现
5、基础增删改查操作
1 # !/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 from sqlalchemy.ext.declarative import declarative_base
4 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
5 from sqlalchemy.orm import sessionmaker, relationship
6 from sqlalchemy import create_engine
7 from sqlalchemy.sql import text
8 import time
9 import threading
10 import models
11
12 # 1.创建连接池
13 engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/sqlalchemy_text1?charset=utf8", max_overflow=0, pool_size=5)
14 Session = sessionmaker(bind=engine)
15
16 # 2.从连接池中获取数据库连接
17 session = Session()
18
19 # 3.执行ORM操作
20
21 # 1.添加数据
22 """
23 # 添加一条数据
24 obj1 = models.Users(name="alex1",email='alex2@xx.com', age=20)
25 session.add(obj1)
26
27 # 添加多条数据
28 session.add_all([
29 models.Users(name="egon1",email='egon3@xx.com', age=20),
30 models.Users(name="egon2",email='egon4@xx.com', age=20),
31 ])
32
33 '''
34 结果:
35 mysql> select * from users;
36 +----+-------+--------------+---------------------+-------+
37 | id | name | email | ctime | extra |
38 +----+-------+--------------+---------------------+-------+
39 | 1 | alex1 | alex1@xx.com | 2018-03-30 16:11:51 | NULL |
40 | 2 | alex2 | alex2@xx.com | 2018-03-30 16:11:51 | NULL |
41 | 3 | alex3 | alex3@xx.com | 2018-03-30 16:11:51 | NULL |
42 | 4 | alex4 | alex4@xx.com | 2018-03-30 16:11:51 | NULL |
43 | 5 | alex5 | alex5@xx.com | 2018-03-30 16:11:51 | NULL |
44 | 6 | egon1 | egon1@xx.com | 2018-03-30 16:14:08 | NULL |
45 | 7 | egon2 | egon2@xx.com | 2018-03-30 16:14:08 | NULL |
46 +----+-------+--------------+---------------------+-------+
47 7 rows in set (0.00 sec)
48
49 '''
50
51 session.commit()
52 """
53
54 # 2.查询数据
55 """
56 # r1 = session.query(models.Users).all() # 查询到所有数据
57 # print(r1) # 拿到所有的对象
58
59 # r2 = session.query(models.Users.name.label('xx'), models.Users.email).all() # label起别名
60 # print(r2)
61
62 # r3 = session.query(models.Users).filter(models.Users.name == 'alex1').all() # 查询名字是alex1的数据,
63 # r4 = session.query(models.Users).filter_by(name='alex1').all() # 查询名字是alex1的所有数据,
64 # r5 = session.query(models.Users).filter_by(name='alex1').first() # 查询名字是alex1的第一条数据
65 # print(r3)
66 # print(r4)
67 # print(r5)
68
69 # 查询id小于3,name='alex2'的数据
70 # r6 = session.query(models.Users).filter(text('id<:value and name=:name')).params(value=3,name='alex2').all()
71 # 查询id小于3,name='alex2'的数据,并按照id排序
72 # r7 = session.query(models.Users).filter(text('id<:value')).params(value=3).order_by(models.Users.id).all()
73 # print(r6)
74 # print(r7)
75
76 # 执行原生的sql语句
77 # r8 = session.query(models.Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='egon1').all()
78 # print(r8)
79 """
80
81 # 3.修改数据
82 """
83 # session.query(models.Users).filter(models.Users.id > 1).update({'name':'egon'})
84 # 修改name
85 # session.query(models.Users).filter(models.Users.id <= 2).update({models.Users.name:models.Users.name+'aha'},synchronize_session=False)
86 # 修改age
87 session.query(models.Users).filter(models.Users.id > 0).update({"age": models.Users.age + 1}, synchronize_session="evaluate")
88 '''
89 结果:
90 mysql> select * from users;
91 +----+----------+-----+--------------+---------------------+-------+
92 | id | name | age | email | ctime | extra |
93 +----+----------+-----+--------------+---------------------+-------+
94 | 1 | alex1aha | 21 | alex1@xx.com | 2018-03-30 16:48:54 | NULL |
95 | 2 | egonaha | 21 | egon1@xx.com | 2018-03-30 16:48:54 | NULL |
96 | 3 | egon | 21 | egon2@xx.com | 2018-03-30 16:48:54 | NULL |
97 +----+----------+-----+--------------+---------------------+-------+
98 '''
99
100 session.commit()
101 """
102
103 # 4.删除数据
104 # session.query(models.Users).filter(models.Users.name=='egon').delete()
105 # session.commit()
106
107
108 # 4.关闭数据库连接(将连接放回连接池)
109 session.close()
基础数据操作
6、常用的查询操作
1 # !/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 from sqlalchemy.ext.declarative import declarative_base
4 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
5 from sqlalchemy.orm import sessionmaker, relationship
6 from sqlalchemy import create_engine
7 from sqlalchemy.sql import text
8 import time
9 import threading
10 import models
11
12 # 1.创建连接池
13 engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/sqlalchemy_text1?charset=utf8", max_overflow=0, pool_size=5)
14 Session = sessionmaker(bind=engine)
15
16 # 2.从连接池中获取数据库连接
17 session = Session()
18
19 # 3.执行ORM操作
20 # 1.条件查询
21 """
22 # ret = session.query(models.Users).filter_by(name='alex1aha').all()
23
24 # , 表示and关系 between:区间
25 # ret = session.query(models.Users).filter(models.Users.id > 1, models.Users.name == 'egonaha').all()
26 # ret = session.query(models.Users).filter(models.Users.id.between(1, 7), models.Users.name == 'egon1').all()
27
28 # in_,表示在[]中
29 # ret = session.query(models.Users).filter(models.Users.id.in_([1,5,6])).all()
30
31 # ~:表示非
32 # ret = session.query(models.Users).filter(~models.Users.id.in_([5,6,7])).all()
33
34 # 嵌套子查询:session.query(models.Users.id).filter_by(name='egon1'))
35 # ret = session.query(models.Users).filter(models.Users.id.in_(session.query(models.Users.id).filter_by(name='egon1'))).all()
36 # for row in ret:
37 # print(row)
38
39 # and_和or_查询
40 from sqlalchemy import and_, or_
41
42 # and_:且
43 # ret = session.query(models.Users).filter(and_(models.Users.id > 3, models.Users.name == 'alex1')).all()
44
45 # or_:或
46 # ret = session.query(models.Users).filter(or_(models.Users.id < 2, models.Users.name == 'egon1')).all()
47
48 # and_和or_嵌套使用
49 # ret = session.query(models.Users).filter(
50 # or_(
51 # models.Users.id < 2, # or的内部是或的关系
52 # and_(models.Users.name == 'egon1', models.Users.id > 3), # 在and内部是且的关系
53 # models.Users.extra != ""
54 # )).all()
55 # for row in ret:
56 # print(row)
57 """
58
59
60 # 2.通配符
61 """
62 # 以e开头的
63 # ret = session.query(models.Users).filter(models.Users.name.like('e%')).all()
64 # 不是以e开头的
65 # ret = session.query(models.Users).filter(~models.Users.name.like('e%')).all()
66 # for row in ret:
67 # print(row)
68
69 """
70
71
72 # 3.限制
73 """
74 # 类似原生sql中的limit,不取最后
75 # ret = session.query(models.Users)[1:3]
76 # for row in ret:
77 # print(row)
78
79 """
80
81
82 # 4.排序
83 """
84 # 倒序
85 # ret = session.query(models.Users).order_by(models.Users.name.desc()).all()
86 # 先按name倒序,然后按id正序
87 # ret = session.query(models.Users).order_by(models.Users.name.desc(), models.Users.id.asc()).all()
88 # for row in ret:
89 # print(row.name)
90 """
91
92
93 # 5.分组
94 """
95 from sqlalchemy.sql import func
96
97 # 根据年龄分组
98 # ret = session.query(models.Users).group_by(models.Users.age).all()
99
100 # 根据年龄分组,并求组内最大id,id的和,最小id
101 # ret = session.query(
102 # func.max(models.Users.id),
103 # func.sum(models.Users.id),
104 # func.min(models.Users.id)).group_by(models.Users.age).all()
105
106 # ret = session.query(
107 # func.max(models.Users.id),
108 # func.sum(models.Users.id),
109 # func.min(models.Users.id)).group_by(models.Users.age).having(func.min(models.Users.id) >2).all()
110 #
111 # for row in ret:
112 # print(row)
113
114 """
115
116
117 # 6.连表
118 """
119 # 先查询再连表
120 # ret = session.query(models.Person, models.Hobby).filter(models.Person.hobby_id == models.Hobby.id).all()
121
122 # 先连表查询,默认是inner join
123 # ret = session.query(models.Person).join(models.Hobby).all()
124 # isouter:True;表示是左查询,SQLAlchemy没有右查询
125 # ret = session.query(models.Person).join(models.Hobby, isouter=True).all()
126
127 # for row in ret:
128 # print(row.name)
129 """
130
131
132 # 7.组合
133 """
134 # 查询到person的id大于2的person对象的名字
135 # q1 = session.query(models.Person.name).filter(models.Person.nid > 2)
136 # 查询到hobby的id小于5的hobby对象的名称
137 # q2 = session.query(models.Hobby.caption).filter(models.Hobby.id < 5)
138 # 组合查询把他们放到一起,自动去重
139 # ret = q1.union(q2).all()
140
141 # for row in ret:
142 # print(row)
143
144 # q1 = session.query(models.Person.name).filter(models.Person.nid > 2)
145 # q2 = session.query(models.Hobby.caption).filter(models.Hobby.id < 5)
146 # 重复字段也会显示
147 # ret = q1.union_all(q2).all()
148 #
149 # for row in ret:
150 # print(row)
151 """
152
153
154 # 4.关闭数据库连接(将连接放回连接池)
155 session.close()
常用查询操作
7、执行原生sql
1 from sqlalchemy.orm import sessionmaker, relationship
2 from sqlalchemy import create_engine
3 from sqlalchemy.sql import text
4 from sqlalchemy.engine.result import ResultProxy
5
6 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/sqlalchemy_text1?charset=utf8", max_overflow=0, pool_size=5)
7 Session = sessionmaker(bind=engine)
8
9 session = Session()
10
11 """
12 # 查询
13 cursor = session.execute('select * from users')
14 result = cursor.fetchall()
15 print(result)
16
17 # 添加
18 cursor = session.execute('insert into users(name,email,age) values(:name,:email,:age)',params={"name":'wupeiqi','email':'wpq@xx.com','age':18})
19 session.commit()
20 print(cursor.lastrowid)
21
22 """
23
24 session.close()
执行原生sql
8、基于relationship操作一对多关联字段
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 import time
4 import threading
5
6 from sqlalchemy.ext.declarative import declarative_base
7 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
8 from sqlalchemy.orm import sessionmaker, relationship
9 from sqlalchemy import create_engine
10 from sqlalchemy.sql import text
11 from sqlalchemy.engine.result import ResultProxy
12 from models import Users, Hosts, Hobby, Person
13
14 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/sqlalchemy_text1?charset=utf8", max_overflow=0, pool_size=5)
15 Session = sessionmaker(bind=engine)
16 session = Session()
17 # 1.添加
18 """
19 # session.add_all([
20 # Hobby(caption='乒乓球'),
21 # Hobby(caption='羽毛球'),
22 # Person(name='张三', hobby_id=7),
23 # Person(name='李四', hobby_id=8),
24 # ])
25 # 这样不仅会创建一条person表数据,还会在hobby表中生成一条数据
26 # person = Person(name='张九', hobby=Hobby(caption='姑娘'))
27 # session.add(person)
28
29 '''
30 结果:
31 mysql> select * from hobby;
32 +----+-----------+
33 | id | caption |
34 +----+-----------+
35 | 7 | 乒乓球 |
36 | 8 | 羽毛球 |
37 | 9 | 姑娘 |
38 +----+-----------+
39 3 rows in set (0.00 sec)
40
41 mysql> select * from person;
42 +-----+--------+----------+
43 | nid | name | hobby_id |
44 +-----+--------+----------+
45 | 4 | 张三 | 7 |
46 | 5 | 李四 | 8 |
47 | 6 | 张九 | 9 |
48 +-----+--------+----------+
49 3 rows in set (0.00 sec)
50 '''
51
52 # 创建hobby对象,然后添加person对象
53 # hb = Hobby(caption='动漫')
54 # hb.pers = [Person(name='文飞'), Person(name='博雅')]
55 # session.add(hb)
56 '''
57 结果:
58 mysql> select * from hobby;
59 +----+-----------+
60 | id | caption |
61 +----+-----------+
62 | 7 | 乒乓球 |
63 | 8 | 羽毛球 |
64 | 9 | 姑娘 |
65 | 10 | 动漫 |
66 +----+-----------+
67 4 rows in set (0.00 sec)
68
69 mysql> select * from person;
70 +-----+--------+----------+
71 | nid | name | hobby_id |
72 +-----+--------+----------+
73 | 4 | 张三 | 7 |
74 | 5 | 李四 | 8 |
75 | 6 | 张九 | 9 |
76 | 7 | 文飞 | 10 |
77 | 8 | 博雅 | 10 |
78 +-----+--------+----------+
79 5 rows in set (0.00 sec)
80 '''
81
82 session.commit()
83
84 """
85
86 # 2.使用relationship正向查询
87 # v = session.query(Person).first()
88 # print(v.name)
89 # print(v.hobby.caption)
90
91 # 3.使用relationship反向查询
92 # v = session.query(Hobby).first()
93 # print(v.caption)
94 # print(v.pers[0].name)
95
96 session.close()
操作一对多关联字段
9、基于relationship操作多对多关联字段
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 import time
4 import threading
5
6 from sqlalchemy.ext.declarative import declarative_base
7 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
8 from sqlalchemy.orm import sessionmaker, relationship
9 from sqlalchemy import create_engine
10 from sqlalchemy.sql import text
11 from sqlalchemy.engine.result import ResultProxy
12 from models import Users, Hosts, Hobby, Person, Girl, B2g, Boy
13
14 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/sqlalchemy_text1?charset=utf8", max_overflow=0, pool_size=5)
15 Session = sessionmaker(bind=engine)
16 session = Session()
17 # 1.添加
18 """
19 # 给两张多对多的表添加数据
20 # session.add_all([
21 # Boy(name='b1'),
22 # Boy(name='b2'),
23 # Girl(name='g1'),
24 # Girl(name='g2'),
25 # ])
26 # session.commit()
27
28 # 给关系表添加数据
29 # s2g = B2g(boy_id=1, girl_id=1)
30 # session.add(s2g)
31 # session.commit()
32
33 # 通过relationship添加数据
34 # gp = Girl(name='G3')
35 # gp.Boys = [Boy(name='B3'),Boy(name='B4')]
36 # session.add(gp)
37 # session.commit()
38
39 # 通过relationship添加数据
40 # ser = Boy(name='B5')
41 # ser.Girls = [Girl(name='G4'),Girl(name='G5')]
42 # session.add(ser)
43 # session.commit()
44
45 """
46
47 # 2.使用relationship正向查询
48 # v = session.query(Girl).first()
49 # print(v.name)
50 # print(v.boy)
51
52 # 3.使用relationship反向查询
53 # v = session.query(Boy).first()
54 # print(v.name)
55 # print(v.girl)
56
57
58 session.close()
多对多关系操作
10、关联子查询
1 from sqlalchemy.orm import sessionmaker, relationship
2 from sqlalchemy import create_engine
3 from sqlalchemy.sql import text, func
4 from models import Girl,Boy,B2g
5
6 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/sqlalchemy_text1?charset=utf8", max_overflow=0, pool_size=5)
7 Session = sessionmaker(bind=engine)
8 session = Session()
9
10 # 关联子查询
11 # 一个查询语句中包含了子查询,子查询中,包含了一个关联表查询操作
12 subqry = session.query(func.count(Boy.id).label("bid")).filter(Boy.id == Girl.id).correlate(Girl).as_scalar()
13 result = session.query(Girl.name, subqry)
14 print(result) # 查看原生sql语句
15
16 """
17 SELECT girl.name AS girl_name, (SELECT count(boy.id) AS bid FROM boy WHERE boy.id = girl.id) AS anon_1 FROM girl
18 """
关联子查询