完成基本架构

main
roger 1 year ago
parent f1bbed6066
commit 5c73d4c0cf
  1. 78
      orm/blue_forecast.py
  2. 62
      orm/db.py
  3. 4
      orm/orm_db.py
  4. 10
      orm/util.py

@ -1,12 +1,66 @@
from orm_db import *
from sqlalchemy import select, desc
try:
get_last_data_stmt = select(History).order_by(desc('id')).limit(1)
last_data_result = db_session.execute(get_last_data_stmt).scalar()
print(last_data_result)
except Exception as e:
print(e)
last_blue = last_data_result.blue
print(last_blue)
from db import *
from util import *
# last_data = get_last_data()
# print(last_data)
#
# date_id_data = get_data_by_date_id('2023068')
# print(date_id_data)
# 获取全部数据
def get_all_data():
return db_get_all_data()
# 获取所有篮球的统计数据
def get_all_blue():
all_blue = dict()
all_data = db_get_all_data()
if all_data is not False:
for data in all_data:
if data['blue'] in all_blue.keys():
all_blue[data['blue']] += 1
else:
all_blue[data['blue']] = 1
return dict_sort(all_blue)
else:
return False
# 获取蓝球统计次数与平均值的额偏差
def get_delta_blue(_blue_data):
total = 0
if _blue_data is not False:
for _num in _blue_data.values():
total += _num
average = total / len(_blue_data.keys())
print(average)
_delta_blue_dict = dict()
for _key, _val in _blue_data.items():
_delta_blue_dict[_key] = _blue_data[_key] - average
return _delta_blue_dict
print(get_delta_blue(get_all_blue()))
# 获取预测的数据
def get_blue_forecast(_blue_data, _method, _num):
last_blue = db_get_last_data()['blue']
if last_blue is False:
return False
if _method == 'prv': # 当前一期往前推num期,包含此期数据
prv_blue_id_list = []
for _data in _blue_data:
if _data['blue'] == last_blue:
print(_data)
prv_blue_id_list.append(_data['id'])
print(prv_blue_id_list)
elif _method == 'post': # 当前一期往后num期,不包含此期的数据
pass
else:
return False
get_blue_forecast(get_all_data(), 'prv', 5)

@ -0,0 +1,62 @@
from orm_db import History, BlueForecastFiveAll, BlueForecastThreeAll, RedForecastAll
from sqlalchemy import engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, desc, asc
# 初始化数据库实例
DB_URI = 'mysql+pymysql://root:Sxzgx1209@home.rogersun.cn:3306/lottery'
db_engine = engine.create_engine(DB_URI, encoding='utf8', echo=True)
db_session = sessionmaker(bind=db_engine)()
def db_get_last_data():
try:
get_last_data_stmt = select(History).order_by(desc('id')).limit(1)
last_data_result = db_session.execute(get_last_data_stmt).scalar()
return history_scalar(last_data_result)
except Exception as e:
print(e)
return False
def db_get_all_data():
try:
get_all_data_stmt = select(History).order_by(asc(History.id))
all_data_result = db_session.execute(get_all_data_stmt).scalars()
return history_scalars(all_data_result)
except Exception as e:
print(e)
return False
def db_get_data_by_date_id(_date_id):
try:
get_by_open_date_stmt = select(History).where(History.dateId == _date_id)
open_date_result = db_session.execute(get_by_open_date_stmt).scalar()
return history_scalar(open_date_result)
except Exception as e:
print(e)
return False
def history_scalar(_scalar):
_data = dict()
_data['id'] = _scalar.id
_data['dateId'] = _scalar.dateId
_data['openDate'] = _scalar.openDate
_data['red'] = _scalar.red
_data['blue'] = _scalar.blue
return _data
def history_scalars(_scalars):
_list = []
for row in _scalars:
_data = dict()
_data['id'] = row.id
_data['dateId'] = row.dateId
_data['openDate'] = row.openDate
_data['red'] = row.red
_data['blue'] = row.blue
_list.append(_data)
return _list

@ -72,5 +72,5 @@ class RedForecastAll(Base):
# 创建数据表,下面部分没有联想
Base.metadata.create_all(engine)
Session_class = sessionmaker(bind=engine) # 创建与数据引擎的绑定会话类
db_session = Session_class() # 实例化session连接
# Session_class = sessionmaker(bind=engine) # 创建与数据引擎的绑定会话类
# db_session = Session_class() # 实例化session连接

@ -0,0 +1,10 @@
def dict_sort(_dict, _sort_by='key', _reverse=False):
if _sort_by == 'key':
_result = sorted(_dict.items(), key=lambda x: x[0], reverse=_reverse)
else:
_result = sorted(_dict.items(), key=lambda x: x[1], reverse=_reverse)
_result_dict = dict()
for r in _result:
_result_dict[r[0]] = r[1]
return _result_dict
Loading…
Cancel
Save