orm_sqlalchemy
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

515 lines
18 KiB

from sqlalchemy import create_engine, text, insert, update, select, delete, intersect, engine_from_config, or_, and_, \
func, any_
from sqlalchemy import MetaData, Table, Column, Integer, String, Enum, DATE
from sqlalchemy.orm import Session, sessionmaker
import configparser
import sys
sys.path.append(r'..')
from sqlalchemy_14.db_schema import User, Student
# 创建引擎
HOST = "home.rogersun.cn"
USER = "root"
PWD = "Sxzgx1209"
PORT = "3306"
DB = "orm_sqlalchemy"
URI = f"mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DB}?charset=utf8mb4"
engine = create_engine(
URI,
echo=True, # echo = 'debug'
future=True, # 使用2.0接口
pool_size=5, # 连接池设置,0表示无限制
max_overflow=0, # 设置溢出
pool_timeout=30, #
pool_recycle=3600, # 设置时间以限制多久自动断开
pool_logging_name='test_pool', #
pool_pre_ping=True,
isolation_level="AUTOCOMMIT",
query_cache_size=200,
)
# 从配置文件获取数据库链接
# config = configparser.ConfigParser()
# config.read_file(open('config.ini', 'r'))
# print(config.items('db_source'))
# engine = engine_from_config(dict(config.items('db_source')), pool_size=1, max_overflow=0, pool_timeout=0.1)
# 执行纯sql
# 无参数
# stmt = text("SELECT * FROM user")
# with engine.connect() as conn:
# result = conn.execute(stmt).all()
# print(result)
# 有参数
# with engine.connect() as conn:
# stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)")
# params = [{'name': 'test1', 'password': 'test1pwd'}, {'name': 'test2', 'password': 'test2pwd'}]
# result = conn.execute(stmt, params)
# print(result)
# conn.commit()
# 绑定参数 todo 不成功
# with engine.connect() as conn:
# stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)").bindparams(
# {'name': 'test3', 'password': 'test3pwd'})
# result = conn.execute(stmt)
# print(result)
# conn.commit()
# 获取结果
# stmt = text("SELECT * FROM user")
# with engine.connect() as conn:
# result = conn.execute(stmt)
# 解包赋值
# for _id, name, password in result:
# print(f"name: {name}, password: {password}")
# 列指定的名称
# for row in result:
# name, password = row.name, row.password
# print(f"name: {name}, password: {password}")
# 绑定模型
# result: [User] = result.fetchall()
# for row in result:
# name = row.name
# password = row.password
# print(f"name: {name}, password: {password}")
# 创建Session实例
# with Session(engine) as session:
# stmt = text("SELECT * FROM user")
# result = session.execute(stmt)
# for row in result:
# print(f"name: {row.name} - password: {row.password}")
# 2.0 增删改查
db_session = sessionmaker(bind=engine)()
# insert 方式1
# insert_stmt = insert(User).values({'name': 'sql2_test1', 'password': 'pwd1'})
# with db_session as sess:
# result = sess.execute(insert_stmt)
# print(result)
# sess.commit()
# insert 方式2
insert_stmt2 = insert(User)
insert_data2 = {'name': 'sql2_test2', 'password': 'pwd2'}
# with db_session as sess:
# result = sess.execute(insert_stmt2, insert_data2)
# print(result)
# sess.commit()
# insert 批量插入
# insert_data3 = [{'name': 'sql2_test3', 'password': 'pwd3'}, {'name': 'sql2_test4', 'password': 'pwd4'}]
# with db_session as sess:
# result = sess.execute(insert_stmt2, insert_data3)
# sess.commit()
# insert begin方式
# insert_data4 = [{'name': 'sql2_test5', 'password': 'pwd5'}, {'name': 'sql2_test6', 'password': 'pwd6'}]
# with db_session.begin():
# db_session.execute(insert_stmt2, insert_data4)
# db_session.commit()
#
# insert 对像方式
# insert_obj1 = User(name='sql2_test7', password='pwd7')
# with db_session as sess:
# sess.add(insert_obj1)
# sess.commit()
# insert 对象方式 批量
# insert_obj2 = User(name='sql2_test8', password='pwd8')
# with db_session as sess:
# # sess.bulk_save_objects([insert_obj2, insert_obj3]) # 推荐
# sess.add_all([insert_obj2, insert_obj3])
# sess.commit()
# insert 字典方式 批量
# insert_dict = [{'name': 'dict1', 'password': 'dict1pwd1'}, {'name': 'dict2', 'password': 'dict2pwd2'}]
# with db_session as sess:
# sess.bulk_insert_mappings(User, insert_dict)
# sess.commit()
# add
# with db_session as sess:
# user_obj = User(name='add', password='addpwd123')
# sess.add(user_obj)
# sess.flush() # 刷新数据库表
# print(user_obj.id) # 获取数据对应id
# sess.commit()
# 查
# get 通过id
# result = db_session.get(User, 4)
# print(result)
# sqlalchemy1.4: session.query(User).all()
# query_cursor = db_session.execute(select(User))
# print(query_cursor.all()) # 数据指针,当all取过数据后one将得到None
# print(query_cursor.first())
# for row in query_cursor.scalars():
# print(row) # row.name格式可以取到数据
# limit
# query_cursor = db_session.execute(select(User).limit(10))
# print(query_cursor.all()) # 数据指针,当all取过数据后one将得到None
# for row in query_cursor.scalars():
# print(row) # row.name格式可以取到数据
# distinct
# sqlalchemy1.4: session.query(User).filter_by(name='roger').all()
# sql_stmt = select(User).filter_by(name='roger')
# query_result = db_session.execute(sql_stmt).one()
# print(query_result)
# print(query_result[0].name)
# sqlalchemy1.4: session.query(User).filter(User.id == 2).all()
# sql_stmt = select(User).where(User.id == 2)
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).first()
# sql_stmt = select(User).where(User.id >= 2)
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).order_by(User.id.desc()).all()
# sql_stmt = select(User).where(User.id >= 2).order_by(User.id.desc())
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).filter(User.name.like('%one%')).all()
# sql_stmt = select(User).where(User.id >= 2).where(User.name.like('%one%'))
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(User.id.in_([1, 2, 3])).all()
# sql_stmt = select(User).where(User.id.in_([1, 2, 3]))
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(User.id.in_(session.query(User.id).filter(User.name.startswith('he')))).all()
# 连写
# sql_stmt = select(User).where(User.id.in_(
# db_session.execute(select(User.id).where(User.name.startswith('he'))).scalars()))
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# 拆分
# sql_stmt1 = select(User.id).where(User.name.startswith('he'))
# query_result1 = db_session.execute(sql_stmt1).scalars()
# sql_stmt2 = select(User).where(User.id.in_(query_result1))
# query_result2 = db_session.execute(sql_stmt2).scalars()
# for row in query_result2:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(User.id.notin_([1, 2])).all()
# sql_stmt = select(User).where(User.id.notin_([1, 2]))
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(User.password is None).all()
# sqlalchemy1.4: session.query(User).filter(User.password.is_(None)).all()
# sql_stmt = select(User).where(User.password.is_(None)) # 此处不支持 is None
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(User.password is not None).all()
# sqlalchemy1.4: session.query(User).filter(User.password.isnot(None)).all()
# sql_stmt = select(User).where(User.password.isnot(None)) # isnot()
# sql_stmt = select(User).where(User.password.is_not(None)) # is_not()
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(or_(User.id > 2, User.name.ilike('Rog%'))).all()
# sql_stmt = select(User).where(or_(User.id > 2, User.name.ilike('ROG%')))
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User).filter(and_(User.id >= 2, User.name.endswith('ei')))
# sql_stmt = select(User).where(and_(User.id >= 2, User.name.endswith('ei')))
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(f"{row.id} - {row.name} - {row.password}")
# sqlalchemy1.4: session.query(User.name.label('user_name'), User.password).filter(User.id >= 2).all()
# sql_stmt = select(User.name.label('user_name_label'), User.password.label('user_pwd_label')).where(User.id >= 2)
# query_result = db_session.execute(sql_stmt).all() # 返回 [('honeyhoney', '2222'), ('haha', '123321'), ('hehe', '3333')]
# query_result = db_session.execute(sql_stmt).scalars() # 返回 scalars 对象, 仅包含name
# query_result = db_session.execute(sql_stmt) # 返回 {'user_name_label': 'roger', 'user_pwd_label': 'rogerpwd123'} 建议此情况
# print(query_result)
# for row in query_result:
# print(row)
# for row in query_result.mappings():
# print(row['user_name_label'], row['user_pwd_label'])
# sqlalchemy1.4: session.query(User).filter(User.id > 1).count()
# sql_stmt = select(func.count()).select_from(User).where(User.id <= 2)
# query_result = db_session.execute(sql_stmt).scalar()
# print(query_result)
# sqlalchemy1.4:
# try:
# result = session.query(User).filter(User.id > 1).one() # one 当仅查询到一个结果时返回,否则抛出异常
# except Exception as e:
# print(e)
# try:
# sql_stmt = select(User).where(User.id > 2)
# query_result = db_session.execute(sql_stmt).one()
# except Exception as e:
# print(e)
# sqlalchemy1.4: session.query(User).filter(User.id == 1).one_or_none()
# try:
# # sql_stmt = select(User).where(User.id >= 1) # 多结果
# # sql_stmt = select(User).where(User.id == 1) # 一个结果
# sql_stmt = select(User).where(User.id == 100) # 无结果
# query_result = db_session.execute(sql_stmt).one_or_none()
# print(query_result)
# except Exception as e:
# print(e)
# sqlalchemy1.4: session.query(User).filter(User.id == 1).scalar()
# sql_stmt = select(User).where(User.id >= 1)
# query_result = db_session.execute(sql_stmt).scalar()
# print(query_result.name)
# sqlalchemy1.4: session.query(User).filter(User.id > 1).group_by(User.password)
# sql_stnt = select(User.password).where(User.id > 1).group_by(User.password)
# query_result = db_session.execute(sql_stnt).scalars()
# for row in query_result:
# print(row)
# sqlalchemy1.4: session.query(func.max(User.id)).all()
# sql_stmt = select(func.max(User.id))
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(row)
# func
# func.now()
# func.current_timestamp()
# func.current_date()
# func.current_time()
# func.min()
# func.localtime()
# func.localtimestamp()
# func.random()
# func.sum()
# sqlalchemy1.4 join filter形式
# result = session.query(User, Student).filter(User.name == Student.name).all()
# sql_stmt = select(User, Student).where(User.name == Student.name)
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(row)
# print(f'{row.id}-{row.name}-{row.password}')
# sqlalchemy1.4 join形式 无外键关联的情况
# result = session.query(User).join(Student, User.name == Student.name).all()
# sql_stmt = select(User).join(Student, User.name == Student.name).where(User.name.like('roger%'))
# query_result = db_session.execute(sql_stmt).scalars()
# for row in query_result:
# print(row)
# print(f'{row.id}-{row.name}-{row.password}')
# join有外键关联的情况(Todo)
# left_join outerjoin (Todo)
# union
# sqlalchemy1.4 union
# q1 = session.query(User.name).filter(User.id > 1)
# q2 = session.query(Student.name).filter(Student.id <= 2)
# result = q1.union(q2).all()
# sql_stmt1 = select(User.name).where(User.id <= 2)
# sql_stmt2 = select(Student.name).where(Student.id <= 4)
# query_result = db_session.execute(sql_stmt1.union(sql_stmt2)).scalars()
# for row in query_result:
# print(row)
# sqlalchemy1.4 union_all
# q1 = session.query(User.id, User.name)
# q2 = session.query(Student.id.label('id'), Student.name.label('name'))
# result = q1.union_all(q2).filter(User.id <= 2)
# print(dict(result))
# sql_stmt1 = select(User.name).where(User.id <= 2)
# sql_stmt2 = select(Student.name).where(Student.id <= 4)
# query_result = db_session.execute(sql_stmt1.union_all(sql_stmt2)).scalars()
# for row in query_result:
# print(row)
# select all scalars
# result = db_session.execute(select(User)).scalars().all() # [<1 name:rogersun>, <2 name:honeyhoney>, <3 name:haha>]
# print(result)
# for row in result:
# print(f"{row.name} - {row.password}")
# select all
# result = db_session.execute(select(User)).all() # [(<1 name:rogersun>,), (<2 name:honeyhoney>,), (<3 name:haha>,)]
# print(result)
# for row in result:
# print(f"{row[0].name} - {row[0].password}")
# select filter_by
# result = db_session.execute(select(User).filter_by(name='roger')).fetchone()
# print(result[0].name)
# select filter
# result = db_session.execute(select(User).where(User.id <= 4)).scalars().all()
# print(result)
# print(r[0][0].name)
# select from_statement
# result = db_session.execute(select(User).from_statement(text('SELECT * FROM user'))).scalars().all()
# for row in result:
# print(row.name)
# 遍历数据
# 元组赋值 all
# result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4)).all()
# print(result)
# for _id, name, password in result:
# print(f"{_id} - {name} - {password}")
# # 索引 all
# result = db_session.execute(select(User).where(User.id <= 4)).all()
# print(result)
# for row in result:
# _id, name, password = row[0].id, row[0].name, row[0].password
# print(f"{_id} - {name} - {password}")
# # 属性 scalers
# result = db_session.execute(select(User).where(User.id <= 4)).scalars().all()
# print(result)
# for row in result:
# _id, name, password = row.id, row.name, row.password
# print(f"{_id} - {name} - {password}")
# # 映射
# result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4))
# print(result)
# for dict_row in result.mappings():
# _id, name, password = dict_row['id'], dict_row['name'], dict_row['password']
# print(f"{_id} - {name} - {password}")
# update
# r = db_session.execute(update(User).filter_by(name='roger').values(password='rogerpwd123').execution_options(
# synchronize_session='evaluate'))
# print(r)
# db_session.commit()
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate
# false 表示完全不更新 Python 中的对象
# fetch 表示重新从数据库中加载一份对象
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
# delete
# 插入测试数据
# insert_stmt = insert(User).values([{'name': 'add', 'password': 'pwd1234'}, {'name': 'add', 'password': 'pwd1234'}])
# insert_result = db_session.execute(insert_stmt)
# db_session.flush()
# db_session.commit()
# print(type(insert_result))
# print(insert_result.rowcount)
# print(insert_result.is_insert)
# print(insert_result.inserted_primary_key_rows) # 2.x上返回None 这个bug会在未来版本修复
# inserted_primary_key_rows 可用方式
# conn = engine.connect()
# ins = insert(User).values({'name': 'add', 'password': 'pwd1234'})
# res = conn.execute(ins)
# print(type(res))
# print(res.inserted_primary_key_rows)
# 抛异常 不能使用endswith 或 like这种 只能用 ==
# del_stmt = delete(User).where(and_(User.name == 'add', User.password == 'pwd1234'))
# del_result = db_session.execute(del_stmt)
# db_session.commit()
# print(type(del_result))
# print(del_result.rowcount)
# 结合select的删除
# sel_stmt = select(User.id).where(and_(User.name == 'add', User.password.endswith('1234')))
# sel_result = db_session.execute(sel_stmt).first()
# for row in sel_result:
# print(row)
# sql_stmt = delete(User).where(User.id.in_(sel_result))
# query_result = db_session.execute(sql_stmt)
# db_session.commit()
# print(query_result)
# CursorResult
# all(), close(), columns(), fetchall(), fetchmany(), fetchone(), first(), freeze(), inserted_primary_key,
# inserted_primary_key_rows, is_insert, keys(), last_inserted_params(), last_updated_params(), lastrow_has_defaults(),
# lastrowid, mappings(), merge(), one(), one_or_none(), partitions(), postfetch_cols(), prefetch_cols(),
# returned_defaults, returned_defaults_rows, returns_rows, rowcount, scalar(), scalar_one(), scalar_one_or_none(),
# scalars(), splice_horizontally(), splice_vertically(), supports_sane_multi_rowcount(), supports_sane_rowcount(),
# t, tuples(), unique(), yield_per()
# connection
# connection = engine.connect()
# trans = connection.begin()
# try:
# ins1 = insert(User).values({'name': 'add1', 'password': 'pwd1234'})
# ins2 = insert(User).values({'name': 'add2', 'password': 'pwd1234'})
# connection.execute(ins1)
# connection.execute(ins2)
# trans.commit()
# except Exception as e:
# trans.rollback()
# raise e
# 元数据表操作
metadata = MetaData() # 生成metadata
user = Table(
'user',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String(32)),
Column('password', String(64))
)
student = Table(
'student',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String(32), nullable=False),
Column('gender', Enum('m', 'f'), nullable=False),
Column('date', DATE, nullable=False)
)
conn = engine.connect()
# 插入
# ins = user.insert().values({'name': 'metadata_add1', 'password': 'pwd1234'})
ins = insert(user).values({'name': 'metadata_add2', 'password': 'pwd1234'})
print(str(ins))
ins_result = conn.execute(ins)
print(ins_result.inserted_primary_key)