from sqlalchemy import create_engine, text, insert, update, select, delete, engine_from_config, or_, and_, func, any_ from sqlalchemy.orm import Session, sessionmaker import configparser import sys sys.path.append(r'..') from sqlalchemy_14.db_schema import User, Student # 创建引擎 HOST = "home.rogersun.cn" USER = "root" PWD = "Sxzgx1209" PORT = "3306" DB = "orm_sqlalchemy" URI = f"mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DB}?charset=utf8mb4" engine = create_engine( URI, echo=True, # echo = 'debug' future=True, # 使用2.0接口 pool_size=5, # 连接池设置,0表示无限制 max_overflow=0, # 设置溢出 pool_timeout=30, # pool_recycle=3600, # 设置时间以限制多久自动断开 pool_logging_name='test_pool', # pool_pre_ping=True, isolation_level="AUTOCOMMIT", query_cache_size=200, ) # 从配置文件获取数据库链接 # config = configparser.ConfigParser() # config.read_file(open('config.ini', 'r')) # print(config.items('db_source')) # engine = engine_from_config(dict(config.items('db_source')), pool_size=1, max_overflow=0, pool_timeout=0.1) # 执行纯sql # 无参数 # stmt = text("SELECT * FROM user") # with engine.connect() as conn: # result = conn.execute(stmt).all() # print(result) # 有参数 # with engine.connect() as conn: # stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)") # params = [{'name': 'test1', 'password': 'test1pwd'}, {'name': 'test2', 'password': 'test2pwd'}] # result = conn.execute(stmt, params) # print(result) # conn.commit() # 绑定参数 todo 不成功 # with engine.connect() as conn: # stmt = text("INSERT INTO user (name, password) VALUES (:name, :password)").bindparams( # {'name': 'test3', 'password': 'test3pwd'}) # result = conn.execute(stmt) # print(result) # conn.commit() # 获取结果 # stmt = text("SELECT * FROM user") # with engine.connect() as conn: # result = conn.execute(stmt) # 解包赋值 # for _id, name, password in result: # print(f"name: {name}, password: {password}") # 列指定的名称 # for row in result: # name, password = row.name, row.password # print(f"name: {name}, password: {password}") # 绑定模型 # result: [User] = result.fetchall() # for row in result: # name = row.name # password = row.password # print(f"name: {name}, password: {password}") # 创建Session实例 # with Session(engine) as session: # stmt = text("SELECT * FROM user") # result = session.execute(stmt) # for row in result: # print(f"name: {row.name} - password: {row.password}") # 2.0 增删改查 db_session = sessionmaker(bind=engine)() # insert 方式1 # insert_stmt = insert(User).values({'name': 'sql2_test1', 'password': 'pwd1'}) # with db_session as sess: # result = sess.execute(insert_stmt) # print(result) # sess.commit() # insert 方式2 insert_stmt2 = insert(User) insert_data2 = {'name': 'sql2_test2', 'password': 'pwd2'} # with db_session as sess: # result = sess.execute(insert_stmt2, insert_data2) # print(result) # sess.commit() # insert 批量插入 # insert_data3 = [{'name': 'sql2_test3', 'password': 'pwd3'}, {'name': 'sql2_test4', 'password': 'pwd4'}] # with db_session as sess: # result = sess.execute(insert_stmt2, insert_data3) # sess.commit() # insert begin方式 # insert_data4 = [{'name': 'sql2_test5', 'password': 'pwd5'}, {'name': 'sql2_test6', 'password': 'pwd6'}] # with db_session.begin(): # db_session.execute(insert_stmt2, insert_data4) # db_session.commit() # # insert 对像方式 # insert_obj1 = User(name='sql2_test7', password='pwd7') # with db_session as sess: # sess.add(insert_obj1) # sess.commit() # insert 对象方式 批量 # insert_obj2 = User(name='sql2_test8', password='pwd8') # with db_session as sess: # # sess.bulk_save_objects([insert_obj2, insert_obj3]) # sess.add_all([insert_obj2, insert_obj3]) # sess.commit() # add # with db_session as sess: # user_obj = User(name='add', password='addpwd123') # sess.add(user_obj) # sess.flush() # 刷新数据库表 # print(user_obj.id) # 获取数据对应id # sess.commit() # 查 # get 通过id # result = db_session.get(User, 4) # print(result) # sqlalchemy1.4: session.query(User).all() # query_cursor = db_session.execute(select(User)) # print(query_cursor.all()) # 数据指针,当all取过数据后one将得到None # print(query_cursor.first()) # for row in query_cursor.scalars(): # print(row) # row.name格式可以取到数据 # sqlalchemy1.4: session.query(User).filter_by(name='roger').all() # sql_stmt = select(User).filter_by(name='roger') # query_result = db_session.execute(sql_stmt).one() # print(query_result) # print(query_result[0].name) # sqlalchemy1.4: session.query(User).filter(User.id == 2).all() # sql_stmt = select(User).where(User.id == 2) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(User.id >= 2).first() # sql_stmt = select(User).where(User.id >= 2) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(User.id >= 2).order_by(User.id.desc()).all() # sql_stmt = select(User).where(User.id >= 2).order_by(User.id.desc()) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(User.id >= 2).filter(User.name.like('%one%')).all() # sql_stmt = select(User).where(User.id >= 2).where(User.name.like('%one%')) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(User.id.in_([1, 2, 3])).all() # sql_stmt = select(User).where(User.id.in_([1, 2, 3])) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(User.id.in_(session.query(User.id).filter(User.name.startswith('he')))).all() # 连写 # sql_stmt = select(User).where(User.id.in_( # db_session.execute(select(User.id).where(User.name.startswith('he'))).scalars())) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # 拆分 # sql_stmt1 = select(User.id).where(User.name.startswith('he')) # query_result1 = db_session.execute(sql_stmt1).scalars() # sql_stmt2 = select(User).where(User.id.in_(query_result1)) # query_result2 = db_session.execute(sql_stmt2).scalars() # for row in query_result2: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(User.id.notin_([1, 2])).all() # sql_stmt = select(User).where(User.id.notin_([1, 2])) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(User.password is None).all() # sqlalchemy1.4: session.query(User).filter(User.password.is_(None)).all() # sql_stmt = select(User).where(User.password.is_(None)) # 此处不支持 is None # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(User.password is not None).all() # sqlalchemy1.4: session.query(User).filter(User.password.isnot(None)).all() # sql_stmt = select(User).where(User.password.isnot(None)) # isnot() # sql_stmt = select(User).where(User.password.is_not(None)) # is_not() # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(or_(User.id > 2, User.name.ilike('Rog%'))).all() # sql_stmt = select(User).where(or_(User.id > 2, User.name.ilike('ROG%'))) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User).filter(and_(User.id >= 2, User.name.endswith('ei'))) # sql_stmt = select(User).where(and_(User.id >= 2, User.name.endswith('ei'))) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(f"{row.id} - {row.name} - {row.password}") # sqlalchemy1.4: session.query(User.name.label('user_name'), User.password).filter(User.id >= 2).all() # sql_stmt = select(User.name.label('user_name_label'), User.password.label('user_pwd_label')).where(User.id >= 2) # query_result = db_session.execute(sql_stmt).all() # 返回 [('honeyhoney', '2222'), ('haha', '123321'), ('hehe', '3333')] # query_result = db_session.execute(sql_stmt).scalars() # 返回 scalars 对象, 仅包含name # query_result = db_session.execute(sql_stmt) # 返回 {'user_name_label': 'roger', 'user_pwd_label': 'rogerpwd123'} 建议此情况 # print(query_result) # for row in query_result: # print(row) # for row in query_result.mappings(): # print(row['user_name_label'], row['user_pwd_label']) # sqlalchemy1.4: session.query(User).filter(User.id > 1).count() # sql_stmt = select(func.count()).select_from(User).where(User.id <= 2) # query_result = db_session.execute(sql_stmt).scalar() # print(query_result) # sqlalchemy1.4: # try: # result = session.query(User).filter(User.id > 1).one() # one 当仅查询到一个结果时返回,否则抛出异常 # except Exception as e: # print(e) # try: # sql_stmt = select(User).where(User.id > 2) # query_result = db_session.execute(sql_stmt).one() # except Exception as e: # print(e) # sqlalchemy1.4: session.query(User).filter(User.id == 1).one_or_none() # try: # # sql_stmt = select(User).where(User.id >= 1) # 多结果 # # sql_stmt = select(User).where(User.id == 1) # 一个结果 # sql_stmt = select(User).where(User.id == 100) # 无结果 # query_result = db_session.execute(sql_stmt).one_or_none() # print(query_result) # except Exception as e: # print(e) # sqlalchemy1.4: session.query(User).filter(User.id == 1).scalar() # sql_stmt = select(User).where(User.id >= 1) # query_result = db_session.execute(sql_stmt).scalar() # print(query_result.name) # sqlalchemy1.4: session.query(User).filter(User.id > 1).group_by(User.password) # sql_stnt = select(User.password).where(User.id > 1).group_by(User.password) # query_result = db_session.execute(sql_stnt).scalars() # for row in query_result: # print(row) # sqlalchemy1.4: session.query(func.max(User.id)).all() # sql_stmt = select(func.max(User.id)) # query_result = db_session.execute(sql_stmt).scalars() # for row in query_result: # print(row) # func # func.now() # func.current_timestamp() # func.current_date() # func.current_time() # func.min() # func.localtime() # func.localtimestamp() # func.random() # func.sum() # select all scalars # result = db_session.execute(select(User)).scalars().all() # [<1 name:rogersun>, <2 name:honeyhoney>, <3 name:haha>] # print(result) # for row in result: # print(f"{row.name} - {row.password}") # select all # result = db_session.execute(select(User)).all() # [(<1 name:rogersun>,), (<2 name:honeyhoney>,), (<3 name:haha>,)] # print(result) # for row in result: # print(f"{row[0].name} - {row[0].password}") # select filter_by # result = db_session.execute(select(User).filter_by(name='roger')).fetchone() # print(result[0].name) # select filter # result = db_session.execute(select(User).where(User.id <= 4)).scalars().all() # print(result) # print(r[0][0].name) # select from_statement # result = db_session.execute(select(User).from_statement(text('SELECT * FROM user'))).scalars().all() # for row in result: # print(row.name) # 遍历数据 # 元组赋值 all # result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4)).all() # print(result) # for _id, name, password in result: # print(f"{_id} - {name} - {password}") # # 索引 all # result = db_session.execute(select(User).where(User.id <= 4)).all() # print(result) # for row in result: # _id, name, password = row[0].id, row[0].name, row[0].password # print(f"{_id} - {name} - {password}") # # 属性 scalers # result = db_session.execute(select(User).where(User.id <= 4)).scalars().all() # print(result) # for row in result: # _id, name, password = row.id, row.name, row.password # print(f"{_id} - {name} - {password}") # # 映射 # result = db_session.execute(select(User.id, User.name, User.password).where(User.id <= 4)) # print(result) # for dict_row in result.mappings(): # _id, name, password = dict_row['id'], dict_row['name'], dict_row['password'] # print(f"{_id} - {name} - {password}") # update # r = db_session.execute(update(User).filter_by(name='roger').values(password='rogerpwd123').execution_options( # synchronize_session='evaluate')) # print(r) # db_session.commit() # synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate # false 表示完全不更新 Python 中的对象 # fetch 表示重新从数据库中加载一份对象 # evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作