"""Scratch-pad of SQLAlchemy 1.4 / 2.0-style usage examples.

Importing or running this module builds an engine for a MySQL database,
runs ``SELECT * FROM user`` once, and creates a module-level ORM session.
Every other example is kept commented out so it can be enabled one at a
time while experimenting.
"""
import configparser
import os
import sys
from urllib.parse import quote_plus

from sqlalchemy import create_engine, text, insert, update, select, delete, engine_from_config
from sqlalchemy.orm import Session, sessionmaker

# Make the parent directory importable so the project-local schema module
# below can be resolved (relative to the current working directory).
sys.path.append(r'..')
from sqlalchemy_14.db_schema import User, Student

# Create the engine.
# Connection settings may be overridden through ORM_DB_* environment
# variables; the defaults are the values this script has always used.
HOST = os.environ.get("ORM_DB_HOST", "home.rogersun.cn")
USER = os.environ.get("ORM_DB_USER", "root")
# SECURITY: a real password should not live in source control. Prefer
# setting ORM_DB_PWD and rotating this default.
PWD = os.environ.get("ORM_DB_PWD", "Sxzgx1209")
PORT = os.environ.get("ORM_DB_PORT", "3306")
DB = os.environ.get("ORM_DB_NAME", "orm_sqlalchemy")
# User and password are URL-escaped so characters such as "@" or "/" cannot
# corrupt the URL; for the default values the result is unchanged.
URI = f"mysql+pymysql://{quote_plus(USER)}:{quote_plus(PWD)}@{HOST}:{PORT}/{DB}?charset=utf8"
engine = create_engine(
    URI,
    echo=True,  # alternative: echo='debug'
    future=True,  # use the 2.0-style interface
    pool_size=5,  # connection pool size; 0 means no limit
    max_overflow=0,  # overflow setting
    # NOTE(review): the flattened source is ambiguous about whether
    # pool_recycle and pool_pre_ping were active or commented out; they are
    # kept active here - confirm against the original file.
    pool_timeout=30,
    pool_recycle=3600,  # time limit after which a connection is automatically dropped
    pool_logging_name='test_pool',
    pool_pre_ping=True,
    isolation_level="AUTOCOMMIT",
    query_cache_size=200,
)

# Build the engine from a configuration file instead
# config = configparser.ConfigParser()
# config.read_file(open('config.ini', 'r'))
# print(config.items('db_source'))
# engine = engine_from_config(dict(config.items('db_source')), pool_size=1, max_overflow=0, pool_timeout=0.1)

# Executing plain SQL
# Without parameters
# stmt = text("SELECT * FROM user")
# with engine.connect() as conn:
#     result = conn.execute(stmt).all()
#     print(result)

# With parameters
# with engine.connect() as conn:
#     stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)")
#     params = [{'name': 'test1', 'password': 'test1pwd'}, {'name': 'test2', 'password': 'test2pwd'}]
#     result = conn.execute(stmt, params)
#     print(result)
#     conn.commit()

# Bound parameters
# The original attempt passed a dict positionally and was marked as not
# working: text().bindparams() takes the values as keyword arguments
# (positional arguments must be bindparam() objects).
# with engine.connect() as conn:
#     stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)").bindparams(
#         name='test3', password='test3pwd')
#     result = conn.execute(stmt)
#     print(result)
#     conn.commit()

# Fetching results
stmt = text("SELECT * FROM user")
with engine.connect() as conn:
    result = conn.execute(stmt)
    # Tuple unpacking
    # for _id, name, password in result:
    #     print(f"name: {name}, password: {password}")

    # Access by column name
    # for row in result:
    #     name, password = row.name, row.password
    #     print(f"name: {name}, password: {password}")

    # Annotating rows with the model type
    # result: [User] = result.fetchall()
    # for row in result:
    #     name = row.name
    #     password = row.password
    #     print(f"name: {name}, password: {password}")

# Creating a Session instance
# with Session(engine) as session:
#     stmt = text("SELECT * FROM user")
#     result = session.execute(stmt)
#     for row in result:
#         print(f"name: {row.name} - password: {row.password}")

# 2.0-style create / read / update / delete
db_session = sessionmaker(bind=engine)()

# insert, variant 1
# insert_stmt = insert(User).values({'name': 'sql2_test1', 'password': 'pwd1'})
# with db_session as sess:
#     result = sess.execute(insert_stmt)
#     print(result)
#     sess.commit()

# insert, variant 2
insert_stmt2 = insert(User)
insert_data2 = {'name': 'sql2_test2', 'password': 'pwd2'}
# with db_session as sess:
#     result = sess.execute(insert_stmt2, insert_data2)
#     print(result)
#     sess.commit()

# insert, bulk
# insert_data3 = [{'name': 'sql2_test3', 'password': 'pwd3'}, {'name': 'sql2_test4', 'password': 'pwd4'}]
# with db_session as sess:
#     result = sess.execute(insert_stmt2, insert_data3)
#     sess.commit()

# insert, using begin()
# insert_data4 = [{'name': 'sql2_test5', 'password': 'pwd5'}, {'name': 'sql2_test6', 'password': 'pwd6'}]
# with db_session.begin():
#     db_session.execute(insert_stmt2, insert_data4)
#     db_session.commit()
#
# insert, via an ORM object
# insert_obj1 = User(name='sql2_test7', password='pwd7')
# with db_session as sess:
#     sess.add(insert_obj1)
#     sess.commit()

# insert, via ORM objects in bulk
# insert_obj2 = User(name='sql2_test8', password='pwd8')
# insert_obj3 = User(name='sql2_test9', password='pwd9')
# with db_session as sess:
#     # sess.bulk_save_objects([insert_obj2, insert_obj3])
#     sess.add_all([insert_obj2, insert_obj3])
#     sess.commit()

# Queries
# get by id
# result = db_session.get(User, 4)
# print(result)

# select all, scalars
# result = db_session.execute(select(User)).scalars().all()  # [<1 name:rogersun>, <2 name:honeyhoney>, <3 name:haha>]
# print(result)
# for row in result:
#     print(f"{row.name} - {row.password}")

# select all
# result = db_session.execute(select(User)).all()  # [(<1 name:rogersun>,), (<2 name:honeyhoney>,), (<3 name:haha>,)]
# print(result)
# for row in result:
#     print(f"{row[0].name} - {row[0].password}")

# select filter_by
# result = db_session.execute(select(User).filter_by(name='roger')).fetchone()
# print(result[0].name)

# select filter
# result = db_session.execute(select(User).where(User.id <= 4)).scalars().all()
# print(result)
# print(r[0][0].name)

# select from_statement
# result = db_session.execute(select(User).from_statement(text('SELECT * FROM user'))).scalars().all()
# for row in result:
#     print(row.name)

# Iterating over the data
# Tuple unpacking, all()
# result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4)).all()
# print(result)
# for _id, name, password in result:
#     print(f"{_id} - {name} - {password}")
#
# Indexing, all()
# result = db_session.execute(select(User).where(User.id <= 4)).all()
# print(result)
# for row in result:
#     _id, name, password = row[0].id, row[0].name, row[0].password
#     print(f"{_id} - {name} - {password}")
#
# Attribute access, scalars()
# result = db_session.execute(select(User).where(User.id <= 4)).scalars().all()
# print(result)
# for row in result:
#     _id, name, password = row.id, row.name, row.password
#     print(f"{_id} - {name} - {password}")
#
# Mappings
# result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4))
# print(result)
# for dict_row in result.mappings():
#     _id, name, password = dict_row['id'], dict_row['name'], dict_row['password']
#     print(f"{_id} - {name} - {password}")

# update
# r = db_session.execute(update(User).filter_by(name='roger').values(password='rogerpwd123').execution_options(
#     synchronize_session='evaluate'))
# print(r)
# db_session.commit()

# synchronize_session has three options: False, "fetch" and "evaluate";
# per the original note, the default is "evaluate".
# False: do not update the Python objects at all
# "fetch": reload the objects from the database
# "evaluate": while updating the database, also try to apply the same
#     operation to the objects held in Python