21,SQLachemy基础

SQLAchemy是python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,

简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

ORM框架的作用就是把数据库表的一行记录与一个对象互相做自动转换。 正确使用ORM的前提是了解关系数据库的原理。 ORM就是把数据库表的行与相应的对象建立关联,互相转换。 由于关系数据库的多个表还可以用外键实现一对多、多对多等关联,相应地, ORM框架也可以提供两个对象之间的一对多、多对多等功能。

安装:

pip3 install SQLALchemy

 数据库:

21,SQLachemy基础

在这些URL 中,hostname 表示MySQL 服务所在的主机,可以是本地主机(localhost),
也可以是远程服务器。数据库服务器上可以托管多个数据库,因此database 表示要使用的
数据库名。如果数据库需要进行认证,username 和password 表示数据库用户密令

程序使用的数据库URL 必须保存到Flask 配置对象的SQLALCHEMY_DATABASE_URI 键中。配
置对象中还有一个很有用的选项,即SQLALCHEMY_COMMIT_ON_TEARDOWN 键,将其设为True
时,每次请求结束后都会自动提交数据库中的变动

from flask.ext.sqlalchemy import SQLAlchemy
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] =
'sqlite:///' + os.path.join(basedir, 'data.sqlite')
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
db = SQLAlchemy(app)
# db 对象是SQLAlchemy 类的实例,表示程序使用的数据库,同时还获得了Flask-SQLAlchemy
提供的所有功能。

21,SQLachemy基础

21,SQLachemy基础

class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
def __repr__(self):
return '<Role %r>' % self.name
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)
def __repr__(self):
return '<User %r>' % self.username
示例

一、内部处理

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
  
  
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# )
  
# 新插入行自增ID
# cur.lastrowid
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]
# )
  
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#     host='1.1.1.99', color_id=3
# )
  
# 执行SQL
# cur = engine.execute('select * from hosts')
# 获取第一行数据
# cur.fetchone()
# 获取第n行数据
# cur.fetchmany(3)
# 获取所有数据
# cur.fetchall()

查询 删除和插入类似 都需要先实例一个 sqlalchemy.sql.dml 对象

create_engine() 会返回一个数据库引擎,echo 参数为 True 时,会显示每条执行的 SQL 语句,生产环境下可关闭。
sessionmaker() 会生成一个数据库会话类。这个类的实例可以当成一个数据库连接,它同时还记录了一些查询的数据,并决定什么时候执行 SQL 语句。由于 SQLAlchemy 自己维护了一个数据库连接池(默认 5 个连接)

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine('mysql://root:admin123@127.0.0.1:3306/sqlachemy_test?charset=utf8',echo=True)
db = MetaData(engine) # 绑定元信息
DB_Session = sessionmaker(bind=engine)
session = DB_Session()

sql_create_test = Table('sqlachemy_create_test', db,
      Column('id', Integer, primary_key=True),
      Column('name', String(40)),
      Column('email', String(120))
      )
# 创建表
# sql_create_test.create()

# 拿到一个句柄
create_i = sql_create_test.insert()
print create_i
# 插入
create_i.execute(name='zk', email='yyyxxx@16.com')
# 批量插入
create_i.execute({'name': 'ghost'}, {'name': 'test'})

 二、ORM功能使用

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。

根据类创建对象,对象转换成SQL,执行SQL。

  1、创建表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
 
Base = declarative_base()
 
# 创建单表
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))
 
    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )
 
 
# 一对多
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)
 
 
class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
 
 
# 多对多
class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
 
 
class Server(Base):
    __tablename__ = 'server'
 
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
 
 
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
 
 
def init_db():  # 创建表
    Base.metadata.create_all(engine)
 
 
def drop_db():  # 删除表
    Base.metadata.drop_all(engine)


# drop_db()
# init_db()
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base

DB_CONNECT_STRING = 'mysql://root:admin123@127.0.0.1:3306/sqlachemy_test?charset=utf8'
engine = create_engine(DB_CONNECT_STRING, echo=True)
DB_Session = sessionmaker(bind=engine)
db = DB_Session()
BaseModel = declarative_base()

# primary_key主键
# nullable=False不允许为空

class RHEvEnvModel(BaseModel):
    """
    环境维护模块
    """
    __tablename__ = 'inf_kvm_env'
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)       # kvm名称
    user = Column(String(20), nullable=False)       # kvm用户
    password = Column(String(20), nullable=False)   # kvm密码
    endpoint = Column(String(20), nullable=False)   # kvm地址
    dashboard = Column(String(100), nullable=False) # 管理界面地址
    created = Column(DateTime, nullable=False)      # 创建时间
    removed = Column(DateTime)                      # 删除时间
    zone_id = Column(Integer, ForeignKey('inf_zone.id'))    # zone_id
    desc = Column(String(255))                      # kvm描述信息
    type = Column(String(20))                       # KVM类型

  2、操作表

obj = Users(name='alex1', exeven='sd')
session.add(obj)
session.add_all([
    Users(name='alex2', exeven='sd'),
    Users(name='alex3', exeven='sd'),
])
session.commit()

session.query(users.id).filter(Users.id > 2).delete()
session.commit()  

session.query(Users).filter(Users.id > 2).update({"name" : "999"})
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"},
synchronize_session=False)
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()

ret = session.query(Users).all()
ret = session.query(Users.name, Users.extra).all()
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter_by(name='alex').first()

其他

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()
 
 
# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()
 
# 限制
ret = session.query(Users)[1:2]
 
# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
 
# 分组
from sqlalchemy.sql import func
 
ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()
 
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
 
# 连表
 
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
 
ret = session.query(Person).join(Favor).all()
 
ret = session.query(Person).join(Favor, isouter=True).all()
 
 
# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()
 
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

其他2

21,SQLachemy基础

21,SQLachemy基础

from sqlalchemy import func, or_, not_


user = User(name='a')
session.add(user)
user = User(name='b')
session.add(user)
user = User(name='a')
session.add(user)
user = User()
session.add(user)
session.commit()

query = session.query(User)
print query # 显示SQL 语句
print query.statement # 同上
for user in query: # 遍历时查询
    print user.name
print query.all() # 返回的是一个类似列表的对象
print query.first().name # 记录不存在时,first() 会返回 None
# print query.one().name # 不存在,或有多行记录时会抛出异常
print query.filter(User.id == 2).first().name
print query.get(2).name # 以主键获取,等效于上句
print query.filter('id = 2').first().name # 支持字符串

query2 = session.query(User.name)
print query2.all() # 每行是个元组
print query2.limit(1).all() # 最多返回 1 条记录
print query2.offset(1).all() # 从第 2 条记录开始返回
print query2.order_by(User.name).all()
print query2.order_by('name').all()
print query2.order_by(User.name.desc()).all()
print query2.order_by('name desc').all()
print session.query(User.id).order_by(User.name.desc(), User.id).all()

print query2.filter(User.id == 1).scalar() # 如果有记录,返回第一条记录的第一个元素
print session.query('id').select_from(User).filter('id = 1').scalar()
print query2.filter(User.id > 1, User.name != 'a').scalar() # and
query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and
query3 = query3.filter(User.name != 'a')
print query3.scalar()
print query2.filter(or_(User.id == 1, User.id == 2)).all() # or
print query2.filter(User.id.in_((1, 2))).all() # in

query4 = session.query(User.id)
print query4.filter(User.name == None).scalar()
print query4.filter('name is null').scalar()
print query4.filter(not_(User.name == None)).all() # not
print query4.filter(User.name != None).all()

print query4.count()
print session.query(func.count('*')).select_from(User).scalar()
print session.query(func.count('1')).select_from(User).scalar()
print session.query(func.count(User.id)).scalar()
print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表
print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 可以用 limit() 限制 count() 的返回数
print session.query(func.sum(User.id)).scalar()
print session.query(func.now()).scalar() # func 后可以跟任意函数名,只要该数据库支持
print session.query(func.current_timestamp()).scalar()
print session.query(func.md5(User.name)).filter(User.id == 1).scalar()

query.filter(User.id == 1).update({User.name: 'c'})
user = query.get(1)
print user.name

user.name = 'd'
session.flush() # 写数据库,但并不提交
print query.get(1).name

session.delete(user)
session.flush()
print query.get(1)

session.rollback()
print query.get(1).name
query.filter(User.id == 1).delete()
session.commit()
print query.get(1)

三、单表与多表

常用的SQLAlchemy关系选项

21,SQLachemy基础

21,SQLachemy基础

1、一对多

除了一对多之外,还有几种其他的关系类型。一对一关系可以用前面介绍的一对多关系
表示,但调用db.relationship() 时要把uselist 设为False,把“多”变成“一”。多对
一关系也可使用一对多表示,对调两个表即可,或者把外键和db.relationship() 都放在
“多”这一侧。最复杂的关系类型是多对多,需要用到第三张表,这个表称为关系表

# !/usr/bin/env python
# -*- coding:utf-8 -*-
 
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/kong')  # 连接已存在的数据库
 
Base = declarative_base()  # 根据SQL创建一个基类
 
class Son(Base):
    __tablename__ = 'son'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age = Column(String(16))
 
    father_id = Column(Integer, ForeignKey('father.id'))  # 外键(外键放在哪个类下,哪个就是多)
 
class Father(Base):
    __tablename__ = 'father'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age = Column(String(16))
 
    son = relationship('Son')  # 是取与son关联的数据(通过父关联子找)
    # son = relationship('Son', backfe="father")  # backfe="father"是(“backfe”是关键字通过子关联父找)
 
Base.metadata.create_all(engine)  # 创建所有的表
# Base.metadata.drop_all(engine)  # 删除表
 
Session = sessionmaker(bind=engine)
session = Session()
f1 = Father(name='alvin', age=50)
# session.commit()
w1 = Son(name='little alvin1', age=4)
w2 = Son(name='little alvin2', age=5)
w3 = Son(name='little alvin3', age=5)
f1.son = [w1, w2, w3]
 
 
session.add_all([f1, w1, w2])
session.commit()

 关联查询(relationship)

#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine('mysql+pymysql://root@127.0.0.1:3306/kong')
 
Base = declarative_base()
 
 
class Son(Base):
    __tablename__ = 'son'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age= Column(String(16))
 
    father_id=Column(Integer,ForeignKey('father.id'))  # 外键关系,关联两张表的关系(下面的关联查询)
 
 
class Father(Base):
    __tablename__ ='father'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age= Column(String(16))
     
    son=relationship('Son',backref='father')  # 相当于在father类下写father=relationship('father')和在son类下写son=relationship('son')一样的效果
                                              # 通过儿子关联并找父亲的信息;通过父亲关联并找儿子的信息(这就是relationship的关系)
 
Base.metadata.create_all(engine)
# Base.metadata.drop_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
 
# ret=session.query(Father.name.label('kkk'),Son.name.label('ppp')).join(Son)  # (关联查询)关联儿子并拿出所有的符合条件的数据
# print(ret)                                # Son.name.label('ppp')) 是给son.name起一个名字;label是关键字
 
#f1=session.query(Father).filter_by(id=1).first()  # 查询父亲的信息
# print(f1.son)
# s1=session.query(Son).filter_by(id=2).first()  # # 查询儿子的信息;filter_by是键值对形式的查询;filter是条件的形式查询
# print(s1.father.name,s1.name)
 
f1=session.query(Father).filter_by(id=1).first()  # 不加first这类的索引只能得到sql语句不能得到具体的数据。
w4=Son(name='little alvin4',age=5)  # 创建一条数据(这就是relationship内部帮实现的)
f1.son.append(w4)  # 插入这一条信息
 
 
session.add(f1)
session.commit()

2、多对多 

# !/usr/bin/env python
# -*- coding:utf-8 -*-
 
 
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/kong?charset=utf8')  # 连接已存在的数据库; 插入汉子就要添加编码解析?charset=utf8
 
Base = declarative_base()  # 根据SQL创建ORM的基类
 
class Men_to_Wemon(Base):
    __tablename__ = 'men_to_wemon'
    nid = Column(Integer, primary_key=True)
    id = Column(Integer, primary_key=True)
    men_id = Column(Integer, ForeignKey('men.id'))
    women_id = Column(Integer, ForeignKey('women.id'))
 
class Men(Base):
    __tablename__ = 'men'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age = Column(String(16))
    # gf = relationship("Women", secondary=Men_to_Wemon.__table__)  # 可以在下面的backref='gf'替代,表示关联;
                                                                    # secondary如果有第三张表会自动关联必须加__table__,
 
class Women(Base):
    __tablename__ = 'women'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age = Column(String(16))
    bf = relationship("Men", secondary=Men_to_Wemon.__table__, backref='gf')
 
Base.metadata.create_all(engine)  # 在数据库生成表
Session = sessionmaker(bind=engine)  # 通过激活sessionmaker的__call__方法来return一个Session实例(Session类下提供了增删改查的具体方法)
session = Session()
 
# 下面是插入数据
# m1 = Men(name='alex', age=18)
# m2 = Men(name='wusir', age=18)
# w1 = Women(name='如花', age=26)
# w2 = Women(name='铁蛋', age=30)
# session.add_all([m1, m2, w1, w2])
# session.commit()  # 提交添加的数据
 
# t1 = Men_to_Wemon(men_id=1, women_id=2)  # 第三张表,让之前的两张表创建一个对应关系
 
 
 
m1 = session.query(Men).filter_by(id=2).first()  # 查询Men的信息是(列表)
w1 = session.query(Women).all()  # 查询Women的信息是(列表)
m1.gf = w1  # 让查询的信息创建关系
 
session.add_all([m1])
session.commit()
 
# 需要注意的地方:
#    1 查询时如果不加all,first等,得到的是sql语句,加上后,才是具体的结果;而all的结果是一个列表。
#    2 m1.gf是一个列表,里面存放着符合条件的对象。
#    3 filter与filter_by的区别:filter是拿键值对的参数,filter_by是拿条件判断的参数。

数据库事务:

数据库会话db.session和Flasksession 对象没有关系。数据库会话也称为事务。

数据库会话能保证数据库的一致性。提交操作使用原子方式把会话中的对象全部写入数据库。

如果在写入会话的过程中发生了错误,整个会话都会失效。如果你始终把相关改动放
在会话中提交,就能避免因部分更新导致的数据库不一致性。
数据库会话也可回滚。调用db.session.rollback() 后,添加到数据库会话中的所有对象都会还原到它们在数据库时的状态。

 补充

如何查看查询生成的原生SQL:

>>> str(User.query.filter_by(role=user_role))
'SELECT users.id AS users_id, users.username AS users_username,
users.role_id AS users_role_id FROM users WHERE :param_1 = users.role_id'

如何批量插入大批数据?
可以使用非 ORM 的方式:

session.execute(
    User.__table__.insert(),
    [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()

 如何让执行的 SQL 语句增加前缀?
使用 query 对象的 prefix_with() 方法:

session.query(User.name).prefix_with('HIGH_PRIORITY').all()
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})

 如何替换一个已有主键的记录?
使用 session.merge() 方法替代 session.add(),其实就是 SELECT + UPDATE:

user = User(id=1, name='ooxx')
session.merge(user)
session.commit()

 如何使用无符号整数?
可以使用 MySQL:

from sqlalchemy.dialects.mysql import INTEGER
id = Column(INTEGER(unsigned=True), primary_key=True)

 如何指定使用 InnoDB,以及使用 UTF-8 编码?
最简单的方式就是修改数据库的默认配置。如果非要在代码里指定的话,可以这样:

class User(BaseModel):
    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8'
    }

 如何设置外键约束?

from random import randint
from sqlalchemy import ForeignKey


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    age = Column(Integer)


class Friendship(BaseModel):
    __tablename__ = 'friendship'

    id = Column(Integer, primary_key=True)
    user_id1 = Column(Integer, ForeignKey('user.id'))
    user_id2 = Column(Integer, ForeignKey('user.id'))

更新之后拿到更新数据的ID

    def test_sqlachemy_select():
        obj = db.session.query(VMWareEnvModel).filter_by(name='changename')
        # <class 'sqlalchemy.orm.query.Query'>
        # .first后 <class 'app.scheduler.rhev.env.models.VMWareEnvModel'>
        obj.update({VMWareEnvModel.user: "SQLTEST", VMWareEnvModel.password: "abc123456"})
        db.session.commit()
        obj_id = obj.first().id
        print obj_id

 为什么无法删除 in 操作查询出来的记录?

session.query(User).filter(User.id.in_((1, 2, 3))).delete()

 抛出这样的异常:

sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python.  Specify 'fetch' or False for the synchronize_session parameter.

但这样是没问题的:

session.query(User).filter(or_(User.id == 1, User.id == 2, User.id == 3)).delete()
几种常见sqlalchemy查询:

    #简单查询
    print(session.query(User).all())
    print(session.query(User.name, User.fullname).all())
    print(session.query(User, User.name).all())
    
    #带条件查询
    print(session.query(User).filter_by(name='user1').all())
    print(session.query(User).filter(User.name == "user").all())
    print(session.query(User).filter(User.name.like("user%")).all())
    
    #多条件查询
    print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())
    print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())
    
    #sql过滤
    print(session.query(User).filter("id>:id").params(id=1).all())
    
    #关联查询 
    print(session.query(User, Address).filter(User.id == Address.user_id).all())
    print(session.query(User).join(User.addresses).all())
    print(session.query(User).outerjoin(User.addresses).all())
    
    #聚合查询
    print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())
    print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())
    
    #子查询
    stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
    print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())
    
    #exists
    print(session.query(User).filter(exists().where(Address.user_id == User.id)))
    print(session.query(User).filter(User.addresses.any()))
限制返回字段查询

person = session.query(Person.name, Person.created_at,                     
             Person.updated_at).filter_by(name="zhongwei").order_by(            
             Person.created_at).first()
记录总数查询:

from sqlalchemy import func

# count User records, without
# using a subquery.
session.query(func.count(User.id))

# return count of user "id" grouped
# by "name"
session.query(func.count(User.id)).
        group_by(User.name)

from sqlalchemy import distinct

# count distinct "name" values
session.query(func.count(distinct(User.name)))

  

 
 
 

SQLAchemy是python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,

简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

ORM框架的作用就是把数据库表的一行记录与一个对象互相做自动转换。 正确使用ORM的前提是了解关系数据库的原理。 ORM就是把数据库表的行与相应的对象建立关联,互相转换。 由于关系数据库的多个表还可以用外键实现一对多、多对多等关联,相应地, ORM框架也可以提供两个对象之间的一对多、多对多等功能。

安装:

pip3 install SQLALchemy

 数据库:

21,SQLachemy基础

在这些URL 中,hostname 表示MySQL 服务所在的主机,可以是本地主机(localhost),
也可以是远程服务器。数据库服务器上可以托管多个数据库,因此database 表示要使用的
数据库名。如果数据库需要进行认证,username 和password 表示数据库用户密令

程序使用的数据库URL 必须保存到Flask 配置对象的SQLALCHEMY_DATABASE_URI 键中。配
置对象中还有一个很有用的选项,即SQLALCHEMY_COMMIT_ON_TEARDOWN 键,将其设为True
时,每次请求结束后都会自动提交数据库中的变动

from flask.ext.sqlalchemy import SQLAlchemy
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] =
'sqlite:///' + os.path.join(basedir, 'data.sqlite')
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
db = SQLAlchemy(app)
# db 对象是SQLAlchemy 类的实例,表示程序使用的数据库,同时还获得了Flask-SQLAlchemy
提供的所有功能。

21,SQLachemy基础

21,SQLachemy基础

class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
def __repr__(self):
return '<Role %r>' % self.name
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)
def __repr__(self):
return '<User %r>' % self.username
示例

一、内部处理

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
  
  
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# )
  
# 新插入行自增ID
# cur.lastrowid
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]
# )
  
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#     host='1.1.1.99', color_id=3
# )
  
# 执行SQL
# cur = engine.execute('select * from hosts')
# 获取第一行数据
# cur.fetchone()
# 获取第n行数据
# cur.fetchmany(3)
# 获取所有数据
# cur.fetchall()

查询 删除和插入类似 都需要先实例一个 sqlalchemy.sql.dml 对象

create_engine() 会返回一个数据库引擎,echo 参数为 True 时,会显示每条执行的 SQL 语句,生产环境下可关闭。
sessionmaker() 会生成一个数据库会话类。这个类的实例可以当成一个数据库连接,它同时还记录了一些查询的数据,并决定什么时候执行 SQL 语句。由于 SQLAlchemy 自己维护了一个数据库连接池(默认 5 个连接)

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine('mysql://root:admin123@127.0.0.1:3306/sqlachemy_test?charset=utf8',echo=True)
db = MetaData(engine) # 绑定元信息
DB_Session = sessionmaker(bind=engine)
session = DB_Session()

sql_create_test = Table('sqlachemy_create_test', db,
      Column('id', Integer, primary_key=True),
      Column('name', String(40)),
      Column('email', String(120))
      )
# 创建表
# sql_create_test.create()

# 拿到一个句柄
create_i = sql_create_test.insert()
print create_i
# 插入
create_i.execute(name='zk', email='yyyxxx@16.com')
# 批量插入
create_i.execute({'name': 'ghost'}, {'name': 'test'})

 二、ORM功能使用

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。

根据类创建对象,对象转换成SQL,执行SQL。

  1、创建表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
 
Base = declarative_base()
 
# 创建单表
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))
 
    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )
 
 
# 一对多
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)
 
 
class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
 
 
# 多对多
class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
 
 
class Server(Base):
    __tablename__ = 'server'
 
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
 
 
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
 
 
def init_db():  # 创建表
    Base.metadata.create_all(engine)
 
 
def drop_db():  # 删除表
    Base.metadata.drop_all(engine)


# drop_db()
# init_db()
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base

DB_CONNECT_STRING = 'mysql://root:admin123@127.0.0.1:3306/sqlachemy_test?charset=utf8'
engine = create_engine(DB_CONNECT_STRING, echo=True)
DB_Session = sessionmaker(bind=engine)
db = DB_Session()
BaseModel = declarative_base()

# primary_key主键
# nullable=False不允许为空

class RHEvEnvModel(BaseModel):
    """
    环境维护模块
    """
    __tablename__ = 'inf_kvm_env'
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)       # kvm名称
    user = Column(String(20), nullable=False)       # kvm用户
    password = Column(String(20), nullable=False)   # kvm密码
    endpoint = Column(String(20), nullable=False)   # kvm地址
    dashboard = Column(String(100), nullable=False) # 管理界面地址
    created = Column(DateTime, nullable=False)      # 创建时间
    removed = Column(DateTime)                      # 删除时间
    zone_id = Column(Integer, ForeignKey('inf_zone.id'))    # zone_id
    desc = Column(String(255))                      # kvm描述信息
    type = Column(String(20))                       # KVM类型

  2、操作表

obj = Users(name='alex1', exeven='sd')
session.add(obj)
session.add_all([
    Users(name='alex2', exeven='sd'),
    Users(name='alex3', exeven='sd'),
])
session.commit()

session.query(users.id).filter(Users.id > 2).delete()
session.commit()  

session.query(Users).filter(Users.id > 2).update({"name" : "999"})
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"},
synchronize_session=False)
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()

ret = session.query(Users).all()
ret = session.query(Users.name, Users.extra).all()
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter_by(name='alex').first()

其他

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()
 
 
# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()
 
# 限制
ret = session.query(Users)[1:2]
 
# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
 
# 分组
from sqlalchemy.sql import func
 
ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()
 
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
 
# 连表
 
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
 
ret = session.query(Person).join(Favor).all()
 
ret = session.query(Person).join(Favor, isouter=True).all()
 
 
# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()
 
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

其他2

21,SQLachemy基础

21,SQLachemy基础

from sqlalchemy import func, or_, not_


user = User(name='a')
session.add(user)
user = User(name='b')
session.add(user)
user = User(name='a')
session.add(user)
user = User()
session.add(user)
session.commit()

query = session.query(User)
print query # 显示SQL 语句
print query.statement # 同上
for user in query: # 遍历时查询
    print user.name
print query.all() # 返回的是一个类似列表的对象
print query.first().name # 记录不存在时,first() 会返回 None
# print query.one().name # 不存在,或有多行记录时会抛出异常
print query.filter(User.id == 2).first().name
print query.get(2).name # 以主键获取,等效于上句
print query.filter('id = 2').first().name # 支持字符串

query2 = session.query(User.name)
print query2.all() # 每行是个元组
print query2.limit(1).all() # 最多返回 1 条记录
print query2.offset(1).all() # 从第 2 条记录开始返回
print query2.order_by(User.name).all()
print query2.order_by('name').all()
print query2.order_by(User.name.desc()).all()
print query2.order_by('name desc').all()
print session.query(User.id).order_by(User.name.desc(), User.id).all()

print query2.filter(User.id == 1).scalar() # 如果有记录,返回第一条记录的第一个元素
print session.query('id').select_from(User).filter('id = 1').scalar()
print query2.filter(User.id > 1, User.name != 'a').scalar() # and
query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and
query3 = query3.filter(User.name != 'a')
print query3.scalar()
print query2.filter(or_(User.id == 1, User.id == 2)).all() # or
print query2.filter(User.id.in_((1, 2))).all() # in

query4 = session.query(User.id)
print query4.filter(User.name == None).scalar()
print query4.filter('name is null').scalar()
print query4.filter(not_(User.name == None)).all() # not
print query4.filter(User.name != None).all()

print query4.count()
print session.query(func.count('*')).select_from(User).scalar()
print session.query(func.count('1')).select_from(User).scalar()
print session.query(func.count(User.id)).scalar()
print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表
print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 可以用 limit() 限制 count() 的返回数
print session.query(func.sum(User.id)).scalar()
print session.query(func.now()).scalar() # func 后可以跟任意函数名,只要该数据库支持
print session.query(func.current_timestamp()).scalar()
print session.query(func.md5(User.name)).filter(User.id == 1).scalar()

query.filter(User.id == 1).update({User.name: 'c'})
user = query.get(1)
print user.name

user.name = 'd'
session.flush() # 写数据库,但并不提交
print query.get(1).name

session.delete(user)
session.flush()
print query.get(1)

session.rollback()
print query.get(1).name
query.filter(User.id == 1).delete()
session.commit()
print query.get(1)

三、单表与多表

常用的SQLAlchemy关系选项

21,SQLachemy基础

21,SQLachemy基础

1、一对多

除了一对多之外,还有几种其他的关系类型。一对一关系可以用前面介绍的一对多关系
表示,但调用db.relationship() 时要把uselist 设为False,把“多”变成“一”。多对
一关系也可使用一对多表示,对调两个表即可,或者把外键和db.relationship() 都放在
“多”这一侧。最复杂的关系类型是多对多,需要用到第三张表,这个表称为关系表

# !/usr/bin/env python
# -*- coding:utf-8 -*-
 
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/kong')  # 连接已存在的数据库
 
Base = declarative_base()  # 根据SQL创建一个基类
 
class Son(Base):
    __tablename__ = 'son'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age = Column(String(16))
 
    father_id = Column(Integer, ForeignKey('father.id'))  # 外键(外键放在哪个类下,哪个就是多)
 
class Father(Base):
    __tablename__ = 'father'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age = Column(String(16))
 
    son = relationship('Son')  # 是取与son关联的数据(通过父关联子找)
    # son = relationship('Son', backfe="father")  # backfe="father"是(“backfe”是关键字通过子关联父找)
 
Base.metadata.create_all(engine)  # 创建所有的表
# Base.metadata.drop_all(engine)  # 删除表
 
Session = sessionmaker(bind=engine)
session = Session()
f1 = Father(name='alvin', age=50)
# session.commit()
w1 = Son(name='little alvin1', age=4)
w2 = Son(name='little alvin2', age=5)
w3 = Son(name='little alvin3', age=5)
f1.son = [w1, w2, w3]
 
 
session.add_all([f1, w1, w2])
session.commit()

 关联查询(relationship)

#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine('mysql+pymysql://root@127.0.0.1:3306/kong')
 
Base = declarative_base()
 
 
class Son(Base):
    __tablename__ = 'son'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age= Column(String(16))
 
    father_id=Column(Integer,ForeignKey('father.id'))  # 外键关系,关联两张表的关系(下面的关联查询)
 
 
class Father(Base):
    __tablename__ ='father'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age= Column(String(16))
     
    son=relationship('Son',backref='father')  # 相当于在father类下写father=relationship('father')和在son类下写son=relationship('son')一样的效果
                                              # 通过儿子关联并找父亲的信息;通过父亲关联并找儿子的信息(这就是relationship的关系)
 
Base.metadata.create_all(engine)
# Base.metadata.drop_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
 
# ret=session.query(Father.name.label('kkk'),Son.name.label('ppp')).join(Son)  # (关联查询)关联儿子并拿出所有的符合条件的数据
# print(ret)                                # Son.name.label('ppp')) 是给son.name起一个名字;label是关键字
 
#f1=session.query(Father).filter_by(id=1).first()  # 查询父亲的信息
# print(f1.son)
# s1=session.query(Son).filter_by(id=2).first()  # # 查询儿子的信息;filter_by是键值对形式的查询;filter是条件的形式查询
# print(s1.father.name,s1.name)
 
f1=session.query(Father).filter_by(id=1).first()  # 不加first这类的索引只能得到sql语句不能得到具体的数据。
w4=Son(name='little alvin4',age=5)  # 创建一条数据(这就是relationship内部帮实现的)
f1.son.append(w4)  # 插入这一条信息
 
 
session.add(f1)
session.commit()

2、多对多 

# !/usr/bin/env python
# -*- coding:utf-8 -*-
 
 
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/kong?charset=utf8')  # 连接已存在的数据库; 插入汉子就要添加编码解析?charset=utf8
 
Base = declarative_base()  # 根据SQL创建ORM的基类
 
class Men_to_Wemon(Base):
    __tablename__ = 'men_to_wemon'
    nid = Column(Integer, primary_key=True)
    id = Column(Integer, primary_key=True)
    men_id = Column(Integer, ForeignKey('men.id'))
    women_id = Column(Integer, ForeignKey('women.id'))
 
class Men(Base):
    __tablename__ = 'men'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age = Column(String(16))
    # gf = relationship("Women", secondary=Men_to_Wemon.__table__)  # 可以在下面的backref='gf'替代,表示关联;
                                                                    # secondary如果有第三张表会自动关联必须加__table__,
 
class Women(Base):
    __tablename__ = 'women'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    age = Column(String(16))
    bf = relationship("Men", secondary=Men_to_Wemon.__table__, backref='gf')
 
Base.metadata.create_all(engine)  # 在数据库生成表
Session = sessionmaker(bind=engine)  # 通过激活sessionmaker的__call__方法来return一个Session实例(Session类下提供了增删改查的具体方法)
session = Session()
 
# 下面是插入数据
# m1 = Men(name='alex', age=18)
# m2 = Men(name='wusir', age=18)
# w1 = Women(name='如花', age=26)
# w2 = Women(name='铁蛋', age=30)
# session.add_all([m1, m2, w1, w2])
# session.commit()  # 提交添加的数据
 
# t1 = Men_to_Wemon(men_id=1, women_id=2)  # 第三张表,让之前的两张表创建一个对应关系
 
 
 
m1 = session.query(Men).filter_by(id=2).first()  # 查询Men的信息是(列表)
w1 = session.query(Women).all()  # 查询Women的信息是(列表)
m1.gf = w1  # 让查询的信息创建关系
 
session.add_all([m1])
session.commit()
 
# 需要注意的地方:
#    1 查询时如果不加all,first等,得到的是sql语句,加上后,才是具体的结果;而all的结果是一个列表。
#    2 m1.gf是一个列表,里面存放着符合条件的对象。
#    3 filter与filter_by的区别:filter是拿键值对的参数,filter_by是拿条件判断的参数。

数据库事务:

数据库会话db.session和Flasksession 对象没有关系。数据库会话也称为事务。

数据库会话能保证数据库的一致性。提交操作使用原子方式把会话中的对象全部写入数据库。

如果在写入会话的过程中发生了错误,整个会话都会失效。如果你始终把相关改动放
在会话中提交,就能避免因部分更新导致的数据库不一致性。
数据库会话也可回滚。调用db.session.rollback() 后,添加到数据库会话中的所有对象都会还原到它们在数据库时的状态。

 补充

如何查看查询生成的原生SQL:

>>> str(User.query.filter_by(role=user_role))
'SELECT users.id AS users_id, users.username AS users_username,
users.role_id AS users_role_id FROM users WHERE :param_1 = users.role_id'

如何批量插入大批数据?
可以使用非 ORM 的方式:

session.execute(
    User.__table__.insert(),
    [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()

 如何让执行的 SQL 语句增加前缀?
使用 query 对象的 prefix_with() 方法:

session.query(User.name).prefix_with('HIGH_PRIORITY').all()
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})

 如何替换一个已有主键的记录?
使用 session.merge() 方法替代 session.add(),其实就是 SELECT + UPDATE:

user = User(id=1, name='ooxx')
session.merge(user)
session.commit()

 如何使用无符号整数?
可以使用 MySQL:

from sqlalchemy.dialects.mysql import INTEGER
id = Column(INTEGER(unsigned=True), primary_key=True)

 如何指定使用 InnoDB,以及使用 UTF-8 编码?
最简单的方式就是修改数据库的默认配置。如果非要在代码里指定的话,可以这样:

class User(BaseModel):
    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8'
    }

 如何设置外键约束?

from random import randint
from sqlalchemy import ForeignKey


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    age = Column(Integer)


class Friendship(BaseModel):
    __tablename__ = 'friendship'

    id = Column(Integer, primary_key=True)
    user_id1 = Column(Integer, ForeignKey('user.id'))
    user_id2 = Column(Integer, ForeignKey('user.id'))

更新之后拿到更新数据的ID

    def test_sqlachemy_select():
        obj = db.session.query(VMWareEnvModel).filter_by(name='changename')
        # <class 'sqlalchemy.orm.query.Query'>
        # .first后 <class 'app.scheduler.rhev.env.models.VMWareEnvModel'>
        obj.update({VMWareEnvModel.user: "SQLTEST", VMWareEnvModel.password: "abc123456"})
        db.session.commit()
        obj_id = obj.first().id
        print obj_id

 为什么无法删除 in 操作查询出来的记录?

session.query(User).filter(User.id.in_((1, 2, 3))).delete()

 抛出这样的异常:

sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python.  Specify 'fetch' or False for the synchronize_session parameter.

但这样是没问题的:

session.query(User).filter(or_(User.id == 1, User.id == 2, User.id == 3)).delete()
几种常见sqlalchemy查询:

    #简单查询
    print(session.query(User).all())
    print(session.query(User.name, User.fullname).all())
    print(session.query(User, User.name).all())
    
    #带条件查询
    print(session.query(User).filter_by(name='user1').all())
    print(session.query(User).filter(User.name == "user").all())
    print(session.query(User).filter(User.name.like("user%")).all())
    
    #多条件查询
    print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())
    print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())
    
    #sql过滤
    print(session.query(User).filter("id>:id").params(id=1).all())
    
    #关联查询 
    print(session.query(User, Address).filter(User.id == Address.user_id).all())
    print(session.query(User).join(User.addresses).all())
    print(session.query(User).outerjoin(User.addresses).all())
    
    #聚合查询
    print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())
    print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())
    
    #子查询
    stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
    print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())
    
    #exists
    print(session.query(User).filter(exists().where(Address.user_id == User.id)))
    print(session.query(User).filter(User.addresses.any()))
限制返回字段查询

person = session.query(Person.name, Person.created_at,                     
             Person.updated_at).filter_by(name="zhongwei").order_by(            
             Person.created_at).first()
记录总数查询:

from sqlalchemy import func

# count User records, without
# using a subquery.
session.query(func.count(User.id))

# return count of user "id" grouped
# by "name"
session.query(func.count(User.id)).
        group_by(User.name)

from sqlalchemy import distinct

# count distinct "name" values
session.query(func.count(distinct(User.name)))