You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
325 lines
12 KiB
325 lines
12 KiB
from sqlalchemy import create_engine, text, insert, update, select, delete, engine_from_config, or_, and_, func |
|
from sqlalchemy.orm import Session, sessionmaker |
|
import configparser |
|
import sys |
|
|
|
sys.path.append(r'..') |
|
from sqlalchemy_14.db_schema import User, Student |
|
|
|
# 创建引擎 |
|
HOST = "home.rogersun.cn" |
|
USER = "root" |
|
PWD = "Sxzgx1209" |
|
PORT = "3306" |
|
DB = "orm_sqlalchemy" |
|
|
|
URI = f"mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DB}?charset=utf8mb4" |
|
|
|
engine = create_engine( |
|
URI, |
|
echo=True, # echo = 'debug' |
|
future=True, # 使用2.0接口 |
|
pool_size=5, # 连接池设置,0表示无限制 |
|
max_overflow=0, # 设置溢出 |
|
pool_timeout=30, # |
|
pool_recycle=3600, # 设置时间以限制多久自动断开 |
|
pool_logging_name='test_pool', # |
|
pool_pre_ping=True, |
|
isolation_level="AUTOCOMMIT", |
|
query_cache_size=200, |
|
) |
|
|
|
# 从配置文件获取数据库链接 |
|
# config = configparser.ConfigParser() |
|
# config.read_file(open('config.ini', 'r')) |
|
# print(config.items('db_source')) |
|
# engine = engine_from_config(dict(config.items('db_source')), pool_size=1, max_overflow=0, pool_timeout=0.1) |
|
|
|
# 执行纯sql |
|
# 无参数 |
|
# stmt = text("SELECT * FROM user") |
|
# with engine.connect() as conn: |
|
# result = conn.execute(stmt).all() |
|
# print(result) |
|
|
|
# 有参数 |
|
# with engine.connect() as conn: |
|
# stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)") |
|
# params = [{'name': 'test1', 'password': 'test1pwd'}, {'name': 'test2', 'password': 'test2pwd'}] |
|
# result = conn.execute(stmt, params) |
|
# print(result) |
|
# conn.commit() |
|
|
|
# 绑定参数 todo 不成功 |
|
# with engine.connect() as conn: |
|
# stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)").bindparams( |
|
# {'name': 'test3', 'password': 'test3pwd'}) |
|
# result = conn.execute(stmt) |
|
# print(result) |
|
# conn.commit() |
|
|
|
# 获取结果 |
|
# stmt = text("SELECT * FROM user") |
|
# with engine.connect() as conn: |
|
# result = conn.execute(stmt) |
|
# 解包赋值 |
|
# for _id, name, password in result: |
|
# print(f"name: {name}, password: {password}") |
|
# 列指定的名称 |
|
# for row in result: |
|
# name, password = row.name, row.password |
|
# print(f"name: {name}, password: {password}") |
|
# 绑定模型 |
|
# result: [User] = result.fetchall() |
|
# for row in result: |
|
# name = row.name |
|
# password = row.password |
|
# print(f"name: {name}, password: {password}") |
|
|
|
# 创建Session实例 |
|
# with Session(engine) as session: |
|
# stmt = text("SELECT * FROM user") |
|
# result = session.execute(stmt) |
|
# for row in result: |
|
# print(f"name: {row.name} - password: {row.password}") |
|
|
|
# 2.0 增删改查 |
|
db_session = sessionmaker(bind=engine)() |
|
# insert 方式1 |
|
# insert_stmt = insert(User).values({'name': 'sql2_test1', 'password': 'pwd1'}) |
|
# with db_session as sess: |
|
# result = sess.execute(insert_stmt) |
|
# print(result) |
|
# sess.commit() |
|
|
|
# insert 方式2 |
|
insert_stmt2 = insert(User) |
|
insert_data2 = {'name': 'sql2_test2', 'password': 'pwd2'} |
|
# with db_session as sess: |
|
# result = sess.execute(insert_stmt2, insert_data2) |
|
# print(result) |
|
# sess.commit() |
|
|
|
# insert 批量插入 |
|
# insert_data3 = [{'name': 'sql2_test3', 'password': 'pwd3'}, {'name': 'sql2_test4', 'password': 'pwd4'}] |
|
# with db_session as sess: |
|
# result = sess.execute(insert_stmt2, insert_data3) |
|
# sess.commit() |
|
|
|
# insert begin方式 |
|
# insert_data4 = [{'name': 'sql2_test5', 'password': 'pwd5'}, {'name': 'sql2_test6', 'password': 'pwd6'}] |
|
# with db_session.begin(): |
|
# db_session.execute(insert_stmt2, insert_data4) |
|
# db_session.commit() |
|
# |
|
# insert 对像方式 |
|
# insert_obj1 = User(name='sql2_test7', password='pwd7') |
|
# with db_session as sess: |
|
# sess.add(insert_obj1) |
|
# sess.commit() |
|
|
|
# insert 对象方式 批量 |
|
# insert_obj2 = User(name='sql2_test8', password='pwd8') |
|
# with db_session as sess: |
|
# # sess.bulk_save_objects([insert_obj2, insert_obj3]) |
|
# sess.add_all([insert_obj2, insert_obj3]) |
|
# sess.commit() |
|
|
|
# add |
|
# with db_session as sess: |
|
# user_obj = User(name='add', password='addpwd123') |
|
# sess.add(user_obj) |
|
# sess.flush() # 刷新数据库表 |
|
# print(user_obj.id) # 获取数据对应id |
|
# sess.commit() |
|
|
|
# 查 |
|
|
|
# get 通过id |
|
# result = db_session.get(User, 4) |
|
# print(result) |
|
|
|
# sqlalchemy1.4: session.query(User).all() |
|
# query_cursor = db_session.execute(select(User)) |
|
# print(query_cursor.all()) # 数据指针,当all取过数据后one将得到None |
|
# print(query_cursor.first()) |
|
# for row in query_cursor.scalars(): |
|
# print(row) # row.name格式可以取到数据 |
|
|
|
# sqlalchemy1.4: session.query(User).filter_by(name='roger').all() |
|
# sql_stmt = select(User).filter_by(name='roger') |
|
# query_result = db_session.execute(sql_stmt).one() |
|
# print(query_result) |
|
# print(query_result[0].name) |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id == 2).all() |
|
# sql_stmt = select(User).where(User.id == 2) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).first() |
|
# sql_stmt = select(User).where(User.id >= 2) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).order_by(User.id.desc()).all() |
|
# sql_stmt = select(User).where(User.id >= 2).order_by(User.id.desc()) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id >= 2).filter(User.name.like('%one%')).all() |
|
# sql_stmt = select(User).where(User.id >= 2).where(User.name.like('%one%')) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id.in_([1, 2, 3])).all() |
|
# sql_stmt = select(User).where(User.id.in_([1, 2, 3])) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id.in_(session.query(User.id).filter(User.name.startswith('he')))).all() |
|
# 连写 |
|
# sql_stmt = select(User).where(User.id.in_( |
|
# db_session.execute(select(User.id).where(User.name.startswith('he'))).scalars())) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
# 拆分 |
|
# sql_stmt1 = select(User.id).where(User.name.startswith('he')) |
|
# query_result1 = db_session.execute(sql_stmt1).scalars() |
|
# sql_stmt2 = select(User).where(User.id.in_(query_result1)) |
|
# query_result2 = db_session.execute(sql_stmt2).scalars() |
|
# for row in query_result2: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id.notin_([1, 2])).all() |
|
# sql_stmt = select(User).where(User.id.notin_([1, 2])) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.password is None).all() |
|
# sqlalchemy1.4: session.query(User).filter(User.password.is_(None)).all() |
|
# sql_stmt = select(User).where(User.password.is_(None)) # 此处不支持 is None |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.password is not None).all() |
|
# sqlalchemy1.4: session.query(User).filter(User.password.isnot(None)).all() |
|
# sql_stmt = select(User).where(User.password.isnot(None)) # isnot() |
|
# sql_stmt = select(User).where(User.password.is_not(None)) # is_not() |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(or_(User.id > 2, User.name.ilike('Rog%'))).all() |
|
# sql_stmt = select(User).where(or_(User.id > 2, User.name.ilike('ROG%'))) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User).filter(and_(User.id >= 2, User.name.endswith('ei'))) |
|
# sql_stmt = select(User).where(and_(User.id >= 2, User.name.endswith('ei'))) |
|
# query_result = db_session.execute(sql_stmt).scalars() |
|
# for row in query_result: |
|
# print(f"{row.id} - {row.name} - {row.password}") |
|
|
|
# sqlalchemy1.4: session.query(User.name.label('user_name'), User.password).filter(User.id >= 2).all() |
|
# sql_stmt = select(User.name.label('user_name_label'), User.password.label('user_pwd_label')).where(User.id >= 2) |
|
# query_result = db_session.execute(sql_stmt).all() # 返回 [('honeyhoney', '2222'), ('haha', '123321'), ('hehe', '3333')] |
|
# query_result = db_session.execute(sql_stmt).scalars() # 返回 scalars 对象, 仅包含name |
|
# query_result = db_session.execute(sql_stmt) # 返回 {'user_name_label': 'roger', 'user_pwd_label': 'rogerpwd123'} 建议此情况 |
|
# print(query_result) |
|
# for row in query_result: |
|
# print(row) |
|
# for row in query_result.mappings(): |
|
# print(row['user_name_label'], row['user_pwd_label']) |
|
|
|
# sqlalchemy1.4: session.query(User).filter(User.id > 1).count() |
|
# sql_stmt = select(func.count()).select_from(User).where(User.id <= 2) |
|
# query_result = db_session.execute(sql_stmt).scalar() |
|
# print(query_result) |
|
|
|
|
|
# try: |
|
# result = session.query(User).filter(User.id > 1).one() # one 当仅查询到一个结果时返回,否则抛出异常 |
|
# except Exception as e: |
|
# print(e) |
|
# result = session.query(User).filter(User.id == 1).one_or_none() # one_or_none 当查询结果仅一条时显示返回结果,当没有结果时返回None,否则抛出异常 |
|
# result = session.query(User).filter(User.id == 1).scalar() # 调用one方法,并在成功时返回行的第一列 |
|
# result = session.query(User).filter(User.id > 1).group_by(User.password) |
|
# result = session.query(func.max(User.id)).all() # func.max |
|
# print(result[0].name, result[0].password) |
|
# print(result) # 定义 __repr__ 后可以打印 |
|
|
|
|
|
# select all scalars |
|
# result = db_session.execute(select(User)).scalars().all() # [<1 name:rogersun>, <2 name:honeyhoney>, <3 name:haha>] |
|
# print(result) |
|
# for row in result: |
|
# print(f"{row.name} - {row.password}") |
|
|
|
# select all |
|
# result = db_session.execute(select(User)).all() # [(<1 name:rogersun>,), (<2 name:honeyhoney>,), (<3 name:haha>,)] |
|
# print(result) |
|
# for row in result: |
|
# print(f"{row[0].name} - {row[0].password}") |
|
|
|
# select filter_by |
|
# result = db_session.execute(select(User).filter_by(name='roger')).fetchone() |
|
# print(result[0].name) |
|
|
|
# select filter |
|
# result = db_session.execute(select(User).where(User.id <= 4)).scalars().all() |
|
# print(result) |
|
# print(r[0][0].name) |
|
|
|
# select from_statement |
|
# result = db_session.execute(select(User).from_statement(text('SELECT * FROM user'))).scalars().all() |
|
# for row in result: |
|
# print(row.name) |
|
|
|
# 遍历数据 |
|
|
|
# 元组赋值 all |
|
# result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4)).all() |
|
# print(result) |
|
# for _id, name, password in result: |
|
# print(f"{_id} - {name} - {password}") |
|
|
|
# # 索引 all |
|
# result = db_session.execute(select(User).where(User.id <= 4)).all() |
|
# print(result) |
|
# for row in result: |
|
# _id, name, password = row[0].id, row[0].name, row[0].password |
|
# print(f"{_id} - {name} - {password}") |
|
|
|
# # 属性 scalers |
|
# result = db_session.execute(select(User).where(User.id <= 4)).scalars().all() |
|
# print(result) |
|
# for row in result: |
|
# _id, name, password = row.id, row.name, row.password |
|
# print(f"{_id} - {name} - {password}") |
|
|
|
# # 映射 |
|
# result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4)) |
|
# print(result) |
|
# for dict_row in result.mappings(): |
|
# _id, name, password = dict_row['id'], dict_row['name'], dict_row['password'] |
|
# print(f"{_id} - {name} - {password}") |
|
|
|
# update |
|
# r = db_session.execute(update(User).filter_by(name='roger').values(password='rogerpwd123').execution_options( |
|
# synchronize_session='evaluate')) |
|
# print(r) |
|
# db_session.commit() |
|
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate |
|
# false 表示完全不更新 Python 中的对象 |
|
# fetch 表示重新从数据库中加载一份对象 |
|
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
|
|
|