用于处理彩票的大数据算法
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

122 lines
3.5 KiB

import json
import os

from sqlalchemy import engine
from sqlalchemy import select, insert, desc, asc
from sqlalchemy.orm import sessionmaker, declarative_base

from orm_db import History, BlueForecastFiveAll, BlueForecastThreeAll, RedForecastAll
# from sqlalchemy import Column, Integer, String, DATE
# Initialise the module-wide database engine and session.
# SECURITY NOTE: the fallback URI below embeds real credentials in source
# control. It is kept only so existing deployments keep working unchanged;
# set the LOTTERY_DB_URI environment variable to override it, and rotate the
# embedded password as soon as possible.
DB_URI = os.environ.get(
    'LOTTERY_DB_URI',
    'mysql+pymysql://root:Sxzgx1209@home.rogersun.cn:3306/lottery',
)
db_engine = engine.create_engine(DB_URI, echo=False)
# A single shared Session instance, used by the query/insert helpers below.
db_session = sessionmaker(bind=db_engine)()
def db_history_get_last_data():
    """Return the History row with the highest id as a plain dict.

    Returns:
        A dict with the keys ``id``, ``dateId``, ``openDate``, ``red`` and
        ``blue`` (built by ``history_scalar``), or ``False`` when the table
        has no rows or the query raises (the exception is printed).
    """
    try:
        # Order by the mapped column rather than the bare string 'id', the
        # same way db_history_get_all_data does.
        get_last_data_stmt = select(History).order_by(desc(History.id)).limit(1)
        last_data_result = db_session.execute(get_last_data_stmt).scalar()
        if last_data_result is None:
            # Empty table: report "no data" explicitly instead of relying on
            # an AttributeError raised by history_scalar(None).
            return False
        return history_scalar(last_data_result)
    except Exception as e:
        print(e)
        return False
def db_history_get_all_data():
    """Fetch every History row in ascending id order.

    Returns:
        A list of dicts produced by ``history_scalars``, or ``False`` if
        anything raises along the way (the exception is printed).
    """
    try:
        ordered_query = select(History).order_by(asc(History.id))
        return history_scalars(db_session.execute(ordered_query).scalars())
    except Exception as err:
        print(err)
    return False
def db_history_get_data_by_date_id(_date_id):
    """Return the History row whose ``dateId`` equals ``_date_id``.

    Args:
        _date_id: value compared against ``History.dateId``.

    Returns:
        A dict built by ``history_scalar`` for the first matching row, or
        ``False`` when no row matches or the query raises (the exception is
        printed).
    """
    try:
        get_by_open_date_stmt = select(History).where(History.dateId == _date_id)
        open_date_result = db_session.execute(get_by_open_date_stmt).scalar()
        if open_date_result is None:
            # No matching row: return the "not found" value explicitly rather
            # than via an AttributeError raised by history_scalar(None).
            return False
        return history_scalar(open_date_result)
    except Exception as e:
        print(e)
        return False
def db_history_get_data_in_ids(_ids):
    """Fetch the History rows whose id is contained in ``_ids``.

    Args:
        _ids: collection of ids passed to ``History.id.in_``.

    Returns:
        A list of dicts produced by ``history_scalars``, or ``False`` if
        anything raises along the way (the exception is printed).
    """
    try:
        id_filter = History.id.in_(_ids)
        matching_rows = db_session.execute(select(History).where(id_filter)).scalars()
        return history_scalars(matching_rows)
    except Exception as err:
        print(err)
    return False
def db_blue_insert(_data, _num):
    """Insert ``_data`` into the blue-forecast table selected by ``_num``.

    Args:
        _data: values handed to ``insert(table).values(...)``.
        _num: ``5`` selects ``BlueForecastFiveAll``, ``3`` selects
            ``BlueForecastThreeAll``.

    Returns:
        The statement's ``rowcount``. The session is committed when it is
        greater than zero and rolled back otherwise. ``None`` is returned
        when ``_num`` is neither 3 nor 5, or when an exception occurs (the
        exception is printed).
    """
    try:
        if _num == 5:
            _table = BlueForecastFiveAll
        elif _num == 3:
            _table = BlueForecastThreeAll
        else:
            print('_num值错误')
            # Stop here: continuing would reference an unbound `_table`.
            return None
        blue_insert_stmt = insert(_table).values(_data)
        insert_result = db_session.execute(blue_insert_stmt).rowcount
        if insert_result > 0:
            db_session.commit()
        else:
            db_session.rollback()
        return insert_result
    except Exception as e:
        print(e)
        # Discard the failed transaction so the shared session stays usable
        # for later calls.
        db_session.rollback()
        return None
def db_red_get_index():
    """Load and JSON-decode every ``real_red_index_group`` value.

    Returns:
        A list with one ``json.loads`` result per non-NULL
        ``RedForecastAll.real_red_index_group`` value, or ``None`` if
        anything raises along the way (the exception is printed).
    """
    try:
        group_query = select(RedForecastAll.real_red_index_group)
        raw_groups = db_session.execute(group_query).scalars()
        # NULL column values are skipped; everything else must be valid JSON.
        return [json.loads(raw) for raw in raw_groups if raw is not None]
    except Exception as err:
        print(err)
    return None
def db_red_insert(_data):
    """Insert ``_data`` into ``RedForecastAll``.

    Args:
        _data: values handed to ``insert(RedForecastAll).values(...)``.

    Returns:
        The statement's ``rowcount``. The session is committed when it is
        greater than zero and rolled back otherwise. ``None`` is returned
        when an exception occurs (the exception is printed).
    """
    try:
        insert_stmt = insert(RedForecastAll).values(_data)
        insert_result = db_session.execute(insert_stmt).rowcount
        if insert_result > 0:
            db_session.commit()
        else:
            db_session.rollback()
        return insert_result
    except Exception as e:
        print(e)
        # Discard the failed transaction so the shared session stays usable
        # for later calls.
        db_session.rollback()
        return None
def history_scalar(_scalar):
    """Copy the fields of one history row object into a plain dict.

    Args:
        _scalar: object exposing ``id``, ``dateId``, ``openDate``, ``red``
            and ``blue`` attributes.

    Returns:
        A dict with those five keys, in that order.

    Raises:
        AttributeError: if ``_scalar`` lacks one of the attributes (for
            example when it is ``None``).
    """
    return {
        'id': _scalar.id,
        'dateId': _scalar.dateId,
        'openDate': _scalar.openDate,
        'red': _scalar.red,
        'blue': _scalar.blue,
    }
def history_scalars(_scalars):
    """Convert an iterable of history row objects into a list of dicts.

    Args:
        _scalars: iterable of objects exposing ``id``, ``dateId``,
            ``openDate``, ``red`` and ``blue`` attributes.

    Returns:
        A list with one dict per row, holding those five keys, in iteration
        order. An empty iterable gives an empty list.
    """
    field_names = ('id', 'dateId', 'openDate', 'red', 'blue')
    return [
        {name: getattr(row, name) for name in field_names}
        for row in _scalars
    ]