You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
515 lines
18 KiB
515 lines
18 KiB
from sqlalchemy import create_engine, text, insert, update, select, delete, intersect, engine_from_config, or_, and_, \ |
|
func, any_ |
|
from sqlalchemy import MetaData, Table, Column, Integer, String, Enum, DATE |
|
from sqlalchemy.orm import Session, sessionmaker |
|
import configparser |
|
import sys |
|
|
|
sys.path.append(r'..') |
|
from sqlalchemy_14.db_schema import User, Student |
|
|
|
# 创建引擎 |
|
HOST = "home.rogersun.cn" |
|
USER = "root" |
|
PWD = "Sxzgx1209" |
|
PORT = "3306" |
|
DB = "orm_sqlalchemy" |
|
|
|
URI = f"mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DB}?charset=utf8mb4" |
|
|
|
engine = create_engine( |
|
URI, |
|
echo=True, # echo = 'debug' |
|
future=True, # 使用2.0接口 |
|
pool_size=5, # 连接池设置,0表示无限制 |
|
max_overflow=0, # 设置溢出 |
|
pool_timeout=30, # |
|
pool_recycle=3600, # 设置时间以限制多久自动断开 |
|
pool_logging_name='test_pool', # |
|
pool_pre_ping=True, |
|
isolation_level="AUTOCOMMIT", |
|
query_cache_size=200, |
|
) |
|
|
|
# 从配置文件获取数据库链接 |
|
# config = configparser.ConfigParser() |
|
# config.read_file(open('config.ini', 'r')) |
|
# print(config.items('db_source')) |
|
# engine = engine_from_config(dict(config.items('db_source')), pool_size=1, max_overflow=0, pool_timeout=0.1) |
|
|
|
# 执行纯sql |
|
# 无参数 |
|
# stmt = text("SELECT * FROM user") |
|
# with engine.connect() as conn: |
|
# result = conn.execute(stmt).all() |
|
# print(result) |
|
|
|
# 有参数 |
|
# with engine.connect() as conn: |
|
# stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)") |
|
# params = [{'name': 'test1', 'password': 'test1pwd'}, {'name': 'test2', 'password': 'test2pwd'}] |
|
# result = conn.execute(stmt, params) |
|
# print(result) |
|
# conn.commit() |
|
|
|
# 绑定参数 todo 不成功 |
|
# with engine.connect() as conn: |
|
# stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)").bindparams( |
|
# {'name': 'test3', 'password': 'test3pwd'}) |
|
# result = conn.execute(stmt) |
|
# print(result) |
|
# conn.commit() |
|
|
|
# 获取结果 |
|
# stmt = text("SELECT * FROM user") |
|
# with engine.connect() as conn: |
|
# result = conn.execute(stmt) |
|
# 解包赋值 |
|
# for _id, name, password in result: |
|
# print(f"name: {name}, password: {password}") |
|
# 列指定的名称 |
|
# for row in result: |
|
# name, password = row.name, row.password |
|
# print(f"name: {name}, password: {password}") |
|
# 绑定模型 |
|
# result: [User] = result.fetchall() |
|
# for row in result: |
|
# name = row.name |
|
# password = row.password |
|
# print(f"name: {name}, password: {password}") |
|
|
|
# 创建Session实例 |
|
# with Session(engine) as session: |
|
# stmt = text("SELECT * FROM user") |
|
# result = session.execute(stmt) |
|
# for row in result: |
|
# print(f"name: {row.name} - password: {row.password}") |
|
|
|
# 2.0 增删改查 |
|
db_session = sessionmaker(bind=engine)() |
|
# insert 方式1 |
|
# insert_stmt = insert(User).values({'name': 'sql2_test1', 'password': 'pwd1'}) |
|
# with db_session as sess: |
|
# result = sess.execute(insert_stmt) |
|
# print(result) |
|
# sess.commit() |
|
|
|
# insert 方式2 |
|
insert_stmt2 = insert(User) |
|
insert_data2 = {'name': 'sql2_test2', 'password': 'pwd2'} |
|
# with db_session as sess: |
|
# result = sess.execute(insert_stmt2, insert_data2) |
|
# print(result) |
|
# sess.commit() |
|
|
|
# insert 批量插入 |
|
# insert_data3 = [{'name': 'sql2_test3', 'password': 'pwd3'}, {'name': 'sql2_test4', 'password': 'pwd4'}] |
|
# with db_session as sess: |
|
# result = sess.execute(insert_stmt2, insert_data3) |
|
# sess.commit() |
|
|
|
# insert begin方式 |
|
# insert_data4 = [{'name': 'sql2_test5', 'password': 'pwd5'}, {'name': 'sql2_test6', 'password': 'pwd6'}] |
|
# with db_session.begin(): |
|
# db_session.execute(insert_stmt2, insert_data4) |
|
# db_session.commit() |
|
# |
|
# insert 对像方式 |
|
# insert_obj1 = User(name='sql2_test7', password='pwd7') |
|
# with db_session as sess: |
|
# sess.add(insert_obj1) |
|
# sess.commit() |
|
|
|
# insert 对象方式 批量 |
|
# insert_obj2 = User(name='sql2_test8', password='pwd8') |
|
# with db_session as sess: |
|
# # sess.bulk_save_objects([insert_obj2, insert_obj3]) # 推荐 |
|
# sess.add_all([insert_obj2, insert_obj3]) |
|
# sess.commit() |
|
|
|
# insert 字典方式 批量 |
|
# insert_dict = [{'name': 'dict1', 'password': 'dict1pwd1'}, {'name': 'dict2', 'password': 'dict2pwd2'}] |
|
# with db_session as sess: |
|
# sess.bulk_insert_mappings(User, insert_dict) |
|
# sess.commit() |
|
|
|
|
|
# add |
|
# with db_session as sess: |
|
# user_obj = User(name='add', password='addpwd123') |
|
# sess.add(user_obj) |
|
# sess.flush() # 刷新数据库表 |
|
# print(user_obj.id) # 获取数据对应id |
|
# sess.commit() |
|
|
|
# 查 |
|
|
|
# get 通过id |
|
# result = db_session.get(User, 4) |
|
# print(result) |
|
|
|
# sqlalchemy1.4: session.query(User).all() |
|
# query_cursor = db_session.execute(select(User)) |
|
# print(query_cursor.all()) # 数据指针,当all取过数据后one将得到None |
|
# print(query_cursor.first()) |
|
# for row in query_cursor.scalars(): |
|
# print(row) # row.name格式可以取到数据 |
|
|
|
# limit |
|
# query_cursor = db_session.execute(select(User).limit(10)) |
|
# print(query_cursor.all()) # 数据指针,当all取过数据后one将得到None |
|
# for row in query_cursor.scalars(): |
|
# print(row) # row.name格式可以取到数据 |
|
|
|
# distinct |
|
|
|
|
|
# sqlalchemy1.4: session.query(User).filter_by(name='roger').all() |
|
# sql_stmt = select(User).filter_by(name='roger') |
|
# query_result = db_session.execute(sql_stmt).one() |
|
# print(query_result) |
|
# print(query_result[0].name) |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id == 2).all() |
|
# sql_stmt = select(User).where(User.id == 2) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).first() |
|
# sql_stmt = select(User).where(User.id >= 2) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).order_by(User.id.desc()).all() |
|
# sql_stmt = select(User).where(User.id >= 2).order_by(User.id.desc()) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).filter(User.name.like('%one%')).all() |
|
# sql_stmt = select(User).where(User.id >= 2).where(User.name.like('%one%')) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id.in_([1, 2, 3])).all() |
|
# sql_stmt = select(User).where(User.id.in_([1, 2, 3])) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id.in_(session.query(User.id).filter(User.name.startswith('he')))).all() |
|
# 连写 |
|
# sql_stmt = select(User).where(User.id.in_( |
|
# db_session.execute(select(User.id).where(User.name.startswith('he'))).scalars())) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
# 拆分 |
|
# sql_stmt1 = select(User.id).where(User.name.startswith('he')) |
|
# query_result1 = db_session.execute(sql_stmt1).scalars() |
|
# sql_stmt2 = select(User).where(User.id.in_(query_result1)) |
|
# query_result2 = db_session.execute(sql_stmt2).scalars() |
|
# for row in query_result2: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id.notin_([1, 2])).all() |
|
# sql_stmt = select(User).where(User.id.notin_([1, 2])) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.password is None).all() |
|
# sqlalchemy1.4: session.query(User).filter(User.password.is_(None)).all() |
|
# sql_stmt = select(User).where(User.password.is_(None)) # 此处不支持 is None |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.password is not None).all() |
|
# sqlalchemy1.4: session.query(User).filter(User.password.isnot(None)).all() |
|
# sql_stmt = select(User).where(User.password.isnot(None)) # isnot() |
|
# sql_stmt = select(User).where(User.password.is_not(None)) # is_not() |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(or_(User.id > 2, User.name.ilike('Rog%'))).all() |
|
# sql_stmt = select(User).where(or_(User.id > 2, User.name.ilike('ROG%'))) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(and_(User.id >= 2, User.name.endswith('ei'))) |
|
# sql_stmt = select(User).where(and_(User.id >= 2, User.name.endswith('ei'))) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User.name.label('user_name'), User.password).filter(User.id >= 2).all() |
|
# sql_stmt = select(User.name.label('user_name_label'), User.password.label('user_pwd_label')).where(User.id >= 2) |
|
# query_result = db_session.execute(sql_stmt).all() # 返回 [('honeyhoney', '2222'), ('haha', '123321'), ('hehe', '3333')] |
|
# query_result = db_session.execute(sql_stmt).scalars() # 返回 scalars 对象, 仅包含name |
|
# query_result = db_session.execute(sql_stmt) # 返回 {'user_name_label': 'roger', 'user_pwd_label': 'rogerpwd123'} 建议此情况 |
|
# print(query_result) |
|
# for row in query_result: |
|
# print(row) |
|
# for row in query_result.mappings(): |
|
# print(row['user_name_label'], row['user_pwd_label']) |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id > 1).count() |
|
# sql_stmt = select(func.count()).select_from(User).where(User.id <= 2) |
|
# query_result = db_session.execute(sql_stmt).scalar() |
|
# print(query_result) |
|
|
|
# sqlalchemy1.4: |
|
# try: |
|
# result = session.query(User).filter(User.id > 1).one() # one 当仅查询到一个结果时返回,否则抛出异常 |
|
# except Exception as e: |
|
# print(e) |
|
# try: |
|
# sql_stmt = select(User).where(User.id > 2) |
|
# query_result = db_session.execute(sql_stmt).one() |
|
# except Exception as e: |
|
# print(e) |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id == 1).one_or_none() |
|
# try: |
|
# # sql_stmt = select(User).where(User.id >= 1) # 多结果 |
|
# # sql_stmt = select(User).where(User.id == 1) # 一个结果 |
|
# sql_stmt = select(User).where(User.id == 100) # 无结果 |
|
# query_result = db_session.execute(sql_stmt).one_or_none() |
|
# print(query_result) |
|
# except Exception as e: |
|
# print(e) |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id == 1).scalar() |
|
# sql_stmt = select(User).where(User.id >= 1) |
|
# query_result = db_session.execute(sql_stmt).scalar() |
|
# print(query_result.name) |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id > 1).group_by(User.password) |
|
# sql_stnt = select(User.password).where(User.id > 1).group_by(User.password) |
|
# query_result = db_session.execute(sql_stnt).scalars() |
|
# for row in query_result: |
|
# print(row) |
|
|
|
# sqlalchemy1.4: session.query(func.max(User.id)).all() |
|
# sql_stmt = select(func.max(User.id)) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(row) |
|
|
|
# func |
|
# func.now() |
|
# func.current_timestamp() |
|
# func.current_date() |
|
# func.current_time() |
|
# func.min() |
|
# func.localtime() |
|
# func.localtimestamp() |
|
# func.random() |
|
# func.sum() |
|
|
|
|
|
# sqlalchemy1.4 join filter形式 |
|
# result = session.query(User, Student).filter(User.name == Student.name).all() |
|
|
|
# sql_stmt = select(User, Student).where(User.name == Student.name) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(row) |
|
# print(f'{row.id}-{row.name}-{row.password}') |
|
|
|
# sqlalchemy1.4 join形式 无外键关联的情况 |
|
# result = session.query(User).join(Student, User.name == Student.name).all() |
|
|
|
# sql_stmt = select(User).join(Student, User.name == Student.name).where(User.name.like('roger%')) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(row) |
|
# print(f'{row.id}-{row.name}-{row.password}') |
|
|
|
# join有外键关联的情况(Todo) |
|
|
|
# left_join outerjoin (Todo) |
|
|
|
# union |
|
# sqlalchemy1.4 union |
|
# q1 = session.query(User.name).filter(User.id > 1) |
|
# q2 = session.query(Student.name).filter(Student.id <= 2) |
|
# result = q1.union(q2).all() |
|
|
|
# sql_stmt1 = select(User.name).where(User.id <= 2) |
|
# sql_stmt2 = select(Student.name).where(Student.id <= 4) |
|
# query_result = db_session.execute(sql_stmt1.union(sql_stmt2)).scalars() |
|
# for row in query_result: |
|
# print(row) |
|
|
|
# sqlalchemy1.4 union_all |
|
# q1 = session.query(User.id, User.name) |
|
# q2 = session.query(Student.id.label('id'), Student.name.label('name')) |
|
# result = q1.union_all(q2).filter(User.id <= 2) |
|
# print(dict(result)) |
|
|
|
# sql_stmt1 = select(User.name).where(User.id <= 2) |
|
# sql_stmt2 = select(Student.name).where(Student.id <= 4) |
|
# query_result = db_session.execute(sql_stmt1.union_all(sql_stmt2)).scalars() |
|
# for row in query_result: |
|
# print(row) |
|
|
|
|
|
# select all scalars |
|
# result = db_session.execute(select(User)).scalars().all() # [<1 name:rogersun>, <2 name:honeyhoney>, <3 name:haha>] |
|
# print(result) |
|
# for row in result: |
|
# print(f"{row.name} - {row.password}") |
|
|
|
# select all |
|
# result = db_session.execute(select(User)).all() # [(<1 name:rogersun>,), (<2 name:honeyhoney>,), (<3 name:haha>,)] |
|
# print(result) |
|
# for row in result: |
|
# print(f"{row[0].name} - {row[0].password}") |
|
|
|
# select filter_by |
|
# result = db_session.execute(select(User).filter_by(name='roger')).fetchone() |
|
# print(result[0].name) |
|
|
|
# select filter |
|
# result = db_session.execute(select(User).where(User.id <= 4)).scalars().all() |
|
# print(result) |
|
# print(r[0][0].name) |
|
|
|
# select from_statement |
|
# result = db_session.execute(select(User).from_statement(text('SELECT * FROM user'))).scalars().all() |
|
# for row in result: |
|
# print(row.name) |
|
|
|
# 遍历数据 |
|
|
|
# 元组赋值 all |
|
# result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4)).all() |
|
# print(result) |
|
# for _id, name, password in result: |
|
# print(f"{_id} - {name} - {password}") |
|
|
|
# # 索引 all |
|
# result = db_session.execute(select(User).where(User.id <= 4)).all() |
|
# print(result) |
|
# for row in result: |
|
# _id, name, password = row[0].id, row[0].name, row[0].password |
|
# print(f"{_id} - {name} - {password}") |
|
|
|
# # 属性 scalers |
|
# result = db_session.execute(select(User).where(User.id <= 4)).scalars().all() |
|
# print(result) |
|
# for row in result: |
|
# _id, name, password = row.id, row.name, row.password |
|
# print(f"{_id} - {name} - {password}") |
|
|
|
# # 映射 |
|
# result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4)) |
|
# print(result) |
|
# for dict_row in result.mappings(): |
|
# _id, name, password = dict_row['id'], dict_row['name'], dict_row['password'] |
|
# print(f"{_id} - {name} - {password}") |
|
|
|
# update |
|
# r = db_session.execute(update(User).filter_by(name='roger').values(password='rogerpwd123').execution_options( |
|
# synchronize_session='evaluate')) |
|
# print(r) |
|
# db_session.commit() |
|
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate |
|
# false 表示完全不更新 Python 中的对象 |
|
# fetch 表示重新从数据库中加载一份对象 |
|
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作 |
|
|
|
# delete |
|
# 插入测试数据 |
|
# insert_stmt = insert(User).values([{'name': 'add', 'password': 'pwd1234'}, {'name': 'add', 'password': 'pwd1234'}]) |
|
# insert_result = db_session.execute(insert_stmt) |
|
# db_session.flush() |
|
# db_session.commit() |
|
# print(type(insert_result)) |
|
# print(insert_result.rowcount) |
|
# print(insert_result.is_insert) |
|
# print(insert_result.inserted_primary_key_rows) # 2.x上返回None 这个bug会在未来版本修复 |
|
|
|
# inserted_primary_key_rows 可用方式 |
|
# conn = engine.connect() |
|
# ins = insert(User).values({'name': 'add', 'password': 'pwd1234'}) |
|
# res = conn.execute(ins) |
|
# print(type(res)) |
|
# print(res.inserted_primary_key_rows) |
|
|
|
# 抛异常 不能使用endswith 或 like这种 只能用 == |
|
# del_stmt = delete(User).where(and_(User.name == 'add', User.password == 'pwd1234')) |
|
# del_result = db_session.execute(del_stmt) |
|
# db_session.commit() |
|
# print(type(del_result)) |
|
# print(del_result.rowcount) |
|
|
|
|
|
# 结合select的删除 |
|
# sel_stmt = select(User.id).where(and_(User.name == 'add', User.password.endswith('1234'))) |
|
# sel_result = db_session.execute(sel_stmt).first() |
|
# for row in sel_result: |
|
# print(row) |
|
# sql_stmt = delete(User).where(User.id.in_(sel_result)) |
|
# query_result = db_session.execute(sql_stmt) |
|
# db_session.commit() |
|
# print(query_result) |
|
|
|
# CursorResult |
|
# all(), close(), columns(), fetchall(), fetchmany(), fetchone(), first(), freeze(), inserted_primary_key, |
|
# inserted_primary_key_rows, is_insert, keys(), last_inserted_params(), last_updated_params(), lastrow_has_defaults(), |
|
# lastrowid, mappings(), merge(), one(), one_or_none(), partitions(), postfetch_cols(), prefetch_cols(), |
|
# returned_defaults, returned_defaults_rows, returns_rows, rowcount, scalar(), scalar_one(), scalar_one_or_none(), |
|
# scalars(), splice_horizontally(), splice_vertically(), supports_sane_multi_rowcount(), supports_sane_rowcount(), |
|
# t, tuples(), unique(), yield_per() |
|
|
|
|
|
# connection |
|
# connection = engine.connect() |
|
# trans = connection.begin() |
|
# try: |
|
# ins1 = insert(User).values({'name': 'add1', 'password': 'pwd1234'}) |
|
# ins2 = insert(User).values({'name': 'add2', 'password': 'pwd1234'}) |
|
# connection.execute(ins1) |
|
# connection.execute(ins2) |
|
# trans.commit() |
|
# except Exception as e: |
|
# trans.rollback() |
|
# raise e |
|
|
|
# 元数据表操作 |
|
|
|
metadata = MetaData() # 生成metadata |
|
|
|
user = Table( |
|
'user', |
|
metadata, |
|
Column('id', Integer, primary_key=True), |
|
Column('name', String(32)), |
|
Column('password', String(64)) |
|
) |
|
|
|
student = Table( |
|
'student', |
|
metadata, |
|
Column('id', Integer, primary_key=True), |
|
Column('name', String(32), nullable=False), |
|
Column('gender', Enum('m', 'f'), nullable=False), |
|
Column('date', DATE, nullable=False) |
|
) |
|
|
|
|
|
conn = engine.connect() |
|
# 插入 |
|
# ins = user.insert().values({'name': 'metadata_add1', 'password': 'pwd1234'}) |
|
ins = insert(user).values({'name': 'metadata_add2', 'password': 'pwd1234'}) |
|
print(str(ins)) |
|
ins_result = conn.execute(ins) |
|
print(ins_result.inserted_primary_key) |