"""Database access helpers for the lottery history, forecast and index tables.

Every function runs against the single module-level ``db_session``. Errors in
the ``try``-wrapped helpers are printed and reported through the return value
(``False`` for the history readers, ``None`` for the others) rather than raised.
"""
import json
import os

from sqlalchemy import engine
from sqlalchemy import select, insert, update, desc, asc, and_
from sqlalchemy.orm import sessionmaker, declarative_base

from orm_db import History, BlueForecastFiveAll, BlueForecastThreeAll, RedForecastAll, IndexGroup

# from sqlalchemy import Column, Integer, String, DATE

# Initialize the database instance.
# NOTE(review): credentials are embedded in the default below; set the
# LOTTERY_DB_URI environment variable to override them without editing source.
DB_URI = os.environ.get(
    'LOTTERY_DB_URI',
    'mysql+pymysql://root:Sxzgx1209@home.rogersun.cn:3306/lottery',
)
db_engine = engine.create_engine(DB_URI, echo=False)
db_session = sessionmaker(bind=db_engine)()

# Attribute names copied from a row into its dict form, in output key order.
_HISTORY_FIELDS = ('id', 'dateId', 'openDate', 'red', 'blue')
_RED_FIELDS = (
    'id',
    'dateId',
    'history',
    'history_random',
    'history_random_index',
    'history_random_index_group',
    'last',
    'last_random',
    'last_random_index',
    'last_random_index_group',
    'real_red',
    'real_red_index_history',
    'real_red_index_group_history',
    'real_red_index_last',
    'real_red_index_group_last',
)
_BLUE_FIELDS = (
    'id',
    'dateId',
    'real_blue',
    'prv',
    'prv_index',
    'post',
    'post_index',
    'history',
    'history_index',
)


def _execute_write(_stmt):
    """Execute a write statement and settle the transaction.

    Commits when the statement reports at least one affected row, otherwise
    rolls back. Returns the statement's ``rowcount``.
    """
    affected = db_session.execute(_stmt).rowcount
    if affected > 0:
        db_session.commit()
    else:
        db_session.rollback()
    return affected


def _blue_table_for(_num):
    """Return the blue forecast model for ``_num`` (5 or 3).

    Prints an error message and returns ``None`` for any other value.
    """
    if _num == 5:
        return BlueForecastFiveAll
    if _num == 3:
        return BlueForecastThreeAll
    print('_num值错误')
    return None


def db_history_get_last_data():
    """Return the History row with the highest ``id`` as a dict.

    Returns ``False`` if the query or the conversion fails (the conversion
    fails when the query yields no row).
    """
    try:
        get_last_data_stmt = select(History).order_by(desc('id')).limit(1)
        last_data_result = db_session.execute(get_last_data_stmt).scalar()
        return history_scalar(last_data_result)
    except Exception as e:
        print(e)
        return False


def db_history_get_all_data():
    """Return every History row as a list of dicts, ordered by ascending ``id``.

    Returns ``False`` on error.
    """
    try:
        get_all_data_stmt = select(History).order_by(asc(History.id))
        all_data_result = db_session.execute(get_all_data_stmt).scalars()
        return history_scalars(all_data_result)
    except Exception as e:
        print(e)
        return False


def db_history_get_data_by_date_id(_date_id):
    """Return the first History row whose ``dateId`` equals ``_date_id`` as a dict.

    Returns ``False`` if the query or the conversion fails (the conversion
    fails when no row matches).
    """
    try:
        get_by_open_date_stmt = select(History).where(History.dateId == _date_id)
        open_date_result = db_session.execute(get_by_open_date_stmt).scalar()
        return history_scalar(open_date_result)
    except Exception as e:
        print(e)
        return False


def db_history_get_data_in_ids(_ids):
    """Return the History rows whose ``id`` is in ``_ids`` as a list of dicts.

    Returns ``False`` on error.
    """
    try:
        get_in_ids_stmt = select(History).where(History.id.in_(_ids))
        ids_result = db_session.execute(get_in_ids_stmt).scalars()
        return history_scalars(ids_result)
    except Exception as e:
        print(e)
        return False


def db_blue_get_data_by_date_id(_date_id, _num):
    """Return blue forecast rows for ``_date_id`` as a list of dicts.

    ``_num == 3`` reads ``BlueForecastThreeAll``; any other value reads
    ``BlueForecastFiveAll``. Returns ``None`` on error.
    """
    try:
        _table = BlueForecastThreeAll if _num == 3 else BlueForecastFiveAll
        select_stmt = select(_table).where(_table.dateId == _date_id)
        select_result = db_session.execute(select_stmt).scalars()
        return blue_scalars(select_result)
    except Exception as e:
        print(e)
        return None


def db_blue_insert(_data, _num):
    """Insert ``_data`` into the blue forecast table selected by ``_num`` (5 or 3).

    Returns the affected row count, or ``None`` when ``_num`` is invalid or
    the statement fails.
    """
    try:
        _table = _blue_table_for(_num)
        if _table is None:
            # Invalid _num: stop here instead of failing on an unbound table.
            return None
        return _execute_write(insert(_table).values(_data))
    except Exception as e:
        print(e)
        return None


def db_blue_update(_date_id, _data, _id, _num):
    """Update the blue forecast row matching both ``_date_id`` and ``_id``.

    The table is selected by ``_num`` (5 or 3). Returns the affected row
    count, or ``None`` when ``_num`` is invalid or the statement fails.
    """
    try:
        _table = _blue_table_for(_num)
        if _table is None:
            # Invalid _num: stop here instead of failing on an unbound table.
            return None
        blue_update_stmt = (
            update(_table)
            .where(and_(_table.dateId == _date_id, _table.id == _id))
            .values(_data)
        )
        return _execute_write(blue_update_stmt)
    except Exception as e:
        print(e)
        return None


# def db_red_get_index():
#     try:
#         index_stmt = select(RedForecastAll.real_red_index_group)
#         index_result = db_session.execute(index_stmt).scalars()
#         index_result_list = []
#         for index in index_result:
#             if index is not None:
#                 index_result_list.append(json.loads(index))
#         return index_result_list
#     except Exception as e:
#         print(e)


def db_red_get_data_by_date_id(_date_id):
    """Return red forecast rows for ``_date_id`` as a list of dicts.

    Returns ``None`` on error.
    """
    try:
        select_stmt = select(RedForecastAll).where(RedForecastAll.dateId == _date_id)
        select_result = db_session.execute(select_stmt).scalars()
        return red_scalars(select_result)
    except Exception as e:
        print(e)
        return None


def db_red_insert(_data):
    """Insert ``_data`` into ``RedForecastAll``.

    Returns the affected row count, or ``None`` on error.
    """
    try:
        return _execute_write(insert(RedForecastAll).values(_data))
    except Exception as e:
        print(e)
        return None


def db_red_update(_id, _date_id, _data):
    """Update the ``RedForecastAll`` row matching both ``_date_id`` and ``_id``.

    Returns the affected row count, or ``None`` on error.
    """
    try:
        # Diagnostic output kept from the original implementation.
        print(_id)
        print(_date_id)
        print(_data)
        update_stmt = (
            update(RedForecastAll)
            .where(and_(RedForecastAll.dateId == _date_id, RedForecastAll.id == _id))
            .values(_data)
        )
        return _execute_write(update_stmt)
    except Exception as e:
        print(e)
        return None


def db_index_get_index_group(_type):
    """Return the JSON-decoded ``index_group`` of every IndexGroup row of ``_type``."""
    index_stmt = select(IndexGroup.index_group).where(IndexGroup.type == _type)
    return [json.loads(row.index_group) for row in db_session.execute(index_stmt)]


def db_index_get_last_date_id(_type):
    """Return the ``dateId`` values of IndexGroup rows of ``_type``, newest ``id`` first."""
    index_stmt = (
        select(IndexGroup.dateId)
        .where(IndexGroup.type == _type)
        .order_by(desc(IndexGroup.id))
    )
    # scalars() already yields the bare dateId values of this one-column
    # select, so they are collected directly rather than read off an attribute.
    return list(db_session.execute(index_stmt).scalars())


def db_index_insert_index_group(_data):
    """Insert ``_data`` into ``IndexGroup``.

    Returns the affected row count, or ``None`` on error.
    """
    try:
        return _execute_write(insert(IndexGroup).values(_data))
    except Exception as e:
        print(e)
        return None


def db_history_get_last_open_id():
    """Return the ``dateId`` values of the two History rows with the highest ``id``."""
    date_id_stmt = select(History.dateId).order_by(desc(History.id)).limit(2)
    return list(db_session.execute(date_id_stmt).scalars())


def db_red_get_last_one_open_id():
    """Return the ``dateId`` of the RedForecastAll row with the highest ``id``."""
    date_id_stmt = select(RedForecastAll.dateId).order_by(desc(RedForecastAll.id)).limit(1)
    return db_session.execute(date_id_stmt).scalar()


def db_blue_get_last_one_open_id():
    """Return the newest ``dateId`` of each blue forecast table.

    The result is ``{'three': ..., 'five': ...}``, taken from
    ``BlueForecastThreeAll`` and ``BlueForecastFiveAll`` respectively.
    """
    date_id_three_stmt = (
        select(BlueForecastThreeAll.dateId).order_by(desc(BlueForecastThreeAll.id)).limit(1)
    )
    date_id_three_result = db_session.execute(date_id_three_stmt).scalar()
    # The 'five' entry must come from the five-number table, not the three-number one.
    date_id_five_stmt = (
        select(BlueForecastFiveAll.dateId).order_by(desc(BlueForecastFiveAll.id)).limit(1)
    )
    date_id_five_result = db_session.execute(date_id_five_stmt).scalar()
    return {'three': date_id_three_result, 'five': date_id_five_result}


def history_scalar(_scalar):
    """Convert one History row object into a plain dict.

    Raises ``AttributeError`` if ``_scalar`` lacks a field (e.g. it is ``None``).
    """
    return {name: getattr(_scalar, name) for name in _HISTORY_FIELDS}


def history_scalars(_scalars):
    """Convert an iterable of History row objects into a list of dicts."""
    return [history_scalar(row) for row in _scalars]


def red_scalars(_scalars):
    """Convert an iterable of red forecast row objects into a list of dicts."""
    return [{name: getattr(row, name) for name in _RED_FIELDS} for row in _scalars]


def blue_scalars(_scalars):
    """Convert an iterable of blue forecast row objects into a list of dicts."""
    return [{name: getattr(row, name) for name in _BLUE_FIELDS} for row in _scalars]