1. Sqlalchemy 查询¶
sqlachemy的查询是非常强大,越是强大的东西越是复杂。 查询是通过 session 的 query() 实现。它可以接受任何数量的任何类和描述符的组合。
这里也只有一些常用的,需要更详细的,查看 官方文档 Query
1.1. query的 参数控制返回¶
如果跟的是模型类,那返回的就是这个这个类的实例或是列表。如:
session.query(User)如果都给了表字段,那结果是元祖列表,如:
session.query(User.name, User.fullname)如果同时给了模型类和表字段,那返回的是named tuples,如:
session.query(User, User.name)你可以 label 方法控制列返回的名称。如:
session.query(User.name.label('name_label'))你可以用aliased方法控制类返回的名称,如:
session.query(aliased(User, name='user_alias'))可以用 limit,offset来控制返回的条数。如:
session.query(User).limit(10), session.query(User).offset(1,20) # offset跟list的切片效果类似。过滤数据使用 filter_by,filter
filter_by 参数为关键字参数,这种过滤功能有限 如:
filter_by(id=1)filter 的参数是更像python的操作符,过滤功能很强大,如:
filter(User.id.in_([1,2])``可以有多个过滤,也就是可以多个filter或是filter_by连着写。如:
session.query(User).filter(type=1).filter_by(User.id.in_([1,2,3]))
1.2. filter - 基本的操作符¶
数据过滤是通过filter来实现的,支持数据库里所有的操作符。
等于:
query.filter(User.name == 'ed')
不等于:
query.filter(User.name != 'ed')
like:
query.filter(User.name.like('%ed'))
in:
query.filter(User.id.in_([1,2,3]))
not in:
query.filter(User.id.notin_([1,2,3]))
is NULL:
query.filter(User.name == None) query.filter(User.name.is_(None))
is not NULL:
query.filter(User.name != None) query.filter(User.name.isnot(None))
and:
from sqlalchemy import and_ query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')) query.filter(User.name == 'ed', User.fullname == 'Ed Jones') query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
or:
from sqlalchemy import or_ query.filter(or_(User.name == 'ed', User.name == 'wendy'))
match or contains:
query.filter(User.name.match('wendy'))
1.3. order_by - 排序¶
很简单的排序:
query.filter(User.name.match('wendy')).order_by(User.id) query.filter(User.name.match('wendy')).order_by('id desc')
1.4. group_by - 分组¶
Hint
SQL 的 group by 语句用于结合合计函数,根据一个或多个列对结果集进行分组。
多和统计函数一起使用,如 count(计数),sum(求和),avg(平均)
下面统计每个user_id 有多少个地址:
from sqlalchemy import func query(Address.user_id, func.count('*')).group_by(Address.user_id)
having 过滤统计数据,必须和
goup_by一起使用,下面返回了user 地址大于1的user:from sqlalchemy import func query(Address.user_id, func.count('*')).group_by(Address.user_id).having(func.count('*'))
1.5. text - 直接写sql¶
在text里写sql语句,并在
filter和order_by中使用。看了下面几个例子就知道了:from sqlalchemy import text session.query(User).filter(text("id<224")).order_by(text("id")).all()
text里可以用
:name传动态参数,并params传值,如:session.query(User).filter(text("id<:value and name=:name")). \ params(value=224, name='fred').order_by(User.id).one()
text里也可以给完整的sql语句,然后传给
from_statement如下面这样匹配所有的列:session.query(User).from_statement(text("SELECT * FROM user where name=:name")). \ params(name='ed').all()
如果用from_statement中不是给的所有字段,那可用 columns 将值赋给字段,如:
stmt = text("SELECT name, id, fullname, password FROM users where name=:name") stmt = stmt.columns(User.name, User.id, User.fullname, User.password) session.query(User).from_statement(stmt).params(name='ed').all()
1.6. JOIN or OUTER JOIN - 更精简,效率更高¶
多张表联合查询的时候,可以这样写:
session.query(User, Address).filter(User.id==Address.user_id).\ filter(Address.email_address=='jack@google.com').\ all()但是用
join则更好
有外键关联:
session.query(User).join(Address).\ filter(Address.email_address=='jack@google.com').all()没有外键,则需要手动添加 join 关系:
session.query(User).join(Address,User.id==Address.user_id).\ filter(Address.email_address=='jack@google.com').all()
1.7. Aliases - 别名¶
别名可以在这样的情况下使用:
from sqlalchemy.orm import aliased adalias1 = aliased(Address) # 定义别名 adalias2 = aliased(Address) # 定义别名 for username, email1, email2 in \ session.query(User.name, adalias1.email_address, adalias2.email_address).\ join(adalias1, User.addresses).\ join(adalias2, User.addresses).\ filter(adalias1.email_address=='jack@google.com').\ filter(adalias2.email_address=='j25@yahoo.com'): print(username, email1, email2)
1.8. Subqueries - 子查询¶
要实现下面的sql:
SELECT users.*, adr_count.address_count FROM users LEFT OUTER JOIN (SELECT user_id, count(*) AS address_count FROM addresses GROUP BY user_id) AS adr_count ON users.id=adr_count.user_id就需要用到子查询了:
from sqlalchemy.sql import func stmt = session.query(Address.user_id, func.count('*').\ label('address_count')).\ group_by(Address.user_id).subquery() #定义子查询 session.query(User, stmt.c.address_count).\ outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id) # 这样使用
1.9. exists - 高效的子查询¶
Hint
EXISTS用于检查子查询是否至少会返回一行数据,该子查询实际上并不返回任何数据,而是返回值True或False
那怎么在 Sqlalchemy 写出 exists的 sql呢?
直接使用
exists()方法:from sqlalchemy.sql import exists stmt = exists().where(Address.user_id==User.id) session.query(User.name).filter(stmt)
使用
any()方法,用于 一对多/多对多 关系,可在前面加~号表示not exists:session.query(User.name).filter(User.addresses.any(Address.email_address.like('%google%')))
使用
has()方法,用于 多对一,同样可在前面加~号表示not exists:session.query(Address).filter(~Address.user.has(User.name=='jack')).all()
使用
contains()方法,用于 一对多 关系:session.query.filter(User.addresses.contains(someaddress_object))
使用
with_parent()方法,可用于 任何关系session.query(Address).with_parent(someuser, ‘addresses’)
1.10. subqueryload - 子查询加载¶
Hint
当查询的表有关联的表时,它是关联的表的字段缓一步加载,也就是分两次查询一个query的数据,多和 first() limit() offset() order_by() 一起使用。
这对于数据量大的表来说很有用:
session.query(User).\
options(subqueryload(User.addresses)).\
filter_by(name='jack').one()
1.11. 返回结果大小控制¶
- all() 返回所有
- first() 查询并返回第一条,没有数据为空
- one() 查询所有并严格返回一条数据,如果查询到多条数据或没有数据,都会报错
- one_or_none 同 one,没有数据会返回None,不会报错,其他一样。
- scalar 同 one,但是只返回那条数据的第一个字段。