写了点红球代码

main
roger 1 year ago
parent ce0bac6858
commit f95623ec06
  1. 71
      orm/blue_forecast.py
  2. 46
      orm/db.py
  3. 24
      orm/orm_db.py
  4. 107
      orm/red_forecast.py
  5. 51
      orm/util.py

@ -1,10 +1,11 @@
import json
from db import * from db import *
from util import * from util import *
# 获取全部数据 # 获取全部数据
def get_all_data(): def get_all_data():
return db_get_all_data() return db_history_get_all_data()
# 获取所有篮球的统计数据 # 获取所有篮球的统计数据
@ -55,7 +56,7 @@ def get_all_post_id_list(_all_data, _blue_id_list, _num):
# 获取蓝球统计数据 # 获取蓝球统计数据
def get_blue_statistics(_prv_blue_id_list): def get_blue_statistics(_prv_blue_id_list):
_blue_data = db_get_data_in_ids(_prv_blue_id_list) _blue_data = db_history_get_data_in_ids(_prv_blue_id_list)
_blue_dict = dict() _blue_dict = dict()
for _data in _blue_data: for _data in _blue_data:
if _data['blue'] in _blue_dict.keys(): if _data['blue'] in _blue_dict.keys():
@ -65,18 +66,6 @@ def get_blue_statistics(_prv_blue_id_list):
return _blue_dict return _blue_dict
# 获取分析后的差异数据
def get_analysis_data(_raw_blue_data):
total = 0
for _num in _raw_blue_data.values():
total += _num
average = total / len(_raw_blue_data.keys())
_delta_blue_dict = dict()
for _key, _val in _raw_blue_data.items():
_delta_blue_dict[_key] = round(_raw_blue_data[_key] - average, 2)
return _delta_blue_dict
# 根据蓝球获取全部历史中出现过此蓝球的id列表 # 根据蓝球获取全部历史中出现过此蓝球的id列表
def get_blue_id_list_by_blue(_all_data, _last_blue): def get_blue_id_list_by_blue(_all_data, _last_blue):
blue_id_list = [] blue_id_list = []
@ -88,20 +77,17 @@ def get_blue_id_list_by_blue(_all_data, _last_blue):
# 获取预测的数据 # 获取预测的数据
def get_blue_forecast(_all_data, _method, _num): def get_blue_forecast(_all_data, _method, _num):
# last_blue = db_get_last_data()['blue']
# if last_blue is False:
# return False
# blue_id_list = []
# for _data in _all_data:
# if _data['blue'] == last_blue:
# blue_id_list.append(_data['id'])
# # print(blue_id_list)
# min_id, max_id = 1, _all_data[-1]['id'] # 获取最大和最小id
""" """
prv
当_num=3 当_num=3
先取当前一期的蓝球号码然后计算历史上相同蓝球出现的后一期蓝球 先取当前一期的蓝球号码然后计算历史上相同蓝球出现的后一期蓝球
然后往前推一期取这一期蓝球号码统计这个号码后两期的蓝球不包含后一期 然后往前推一期取这一期蓝球号码统计这个号码后两期的蓝球不包含后一期
然后再往前推一期取这一期蓝球号码统计这个号码后三期的蓝球不包含后一期和后两期 然后再往前推一期取这一期蓝球号码统计这个号码后三期的蓝球不包含后一期和后两期
post
当_num=3
先取当前一期的蓝球然后从获取历史中蓝球的id
在此id的基础上累加1,2,3获取到3个id
查询这些id对应蓝球统计蓝球数据
""" """
if _method == 'prv': # 当前一期往前推num期,包含此期数据 if _method == 'prv': # 当前一期往前推num期,包含此期数据
@ -110,7 +96,6 @@ def get_blue_forecast(_all_data, _method, _num):
_raw_blue_data = get_blue_statistics(prv_blue_id_list) # 根据上步查出的id列表,统计出蓝球出现的次数 _raw_blue_data = get_blue_statistics(prv_blue_id_list) # 根据上步查出的id列表,统计出蓝球出现的次数
_result = get_analysis_data(_raw_blue_data) # 根据上步蓝球的出现次数统计,计算偏差值 _result = get_analysis_data(_raw_blue_data) # 根据上步蓝球的出现次数统计,计算偏差值
return dict_sort(_result, 'val', True) return dict_sort(_result, 'val', True)
elif _method == 'post': # 当前一期往后num期,不包含此期的数据 elif _method == 'post': # 当前一期往后num期,不包含此期的数据
last_blue = _all_data[-1]['blue'] # 获取最后一期蓝球号码 last_blue = _all_data[-1]['blue'] # 获取最后一期蓝球号码
blue_id_list = get_blue_id_list_by_blue(_all_data, last_blue) # 获取所以蓝球等于最后一期蓝球的期数id blue_id_list = get_blue_id_list_by_blue(_all_data, last_blue) # 获取所以蓝球等于最后一期蓝球的期数id
@ -118,20 +103,44 @@ def get_blue_forecast(_all_data, _method, _num):
_raw_blue_data = get_blue_statistics(post_blue_id_list) # 根据上步查出的id列表,统计出蓝球出现的次数 _raw_blue_data = get_blue_statistics(post_blue_id_list) # 根据上步查出的id列表,统计出蓝球出现的次数
_result = get_analysis_data(_raw_blue_data) # 根据上步蓝球的出现次数统计,计算偏差值 _result = get_analysis_data(_raw_blue_data) # 根据上步蓝球的出现次数统计,计算偏差值
return dict_sort(_result, 'val', True) return dict_sort(_result, 'val', True)
else: else:
return False return False
def insert_blue_data(_all_data, _history, _prv, _post, _num):
insert_data = dict()
insert_data['dateId'] = _all_data[-1]['dateId']
insert_data['history'] = _history
insert_data['prv'] = _prv
insert_data['post'] = _post
result = db_blue_insert(insert_data, _num)
if result > 0:
print(f"输入数据成功,插入{result}条数据")
else:
print("数据插入失败")
def print_blue_data(_history, _prv_five, _post_five, _prv_three, _post_three):
_history_list = list(_history.items())
_prv_five_list = list(_prv_five.items())
_post_five_list = list(_post_five.items())
_prv_three_list = list(_prv_three.items())
_post_three_list = list(_post_three.items())
for i in range(16):
print(f"{_history_list[i][0]} [{_history_list[i][1]}]\t"
f" {_prv_five_list[i][0]} [{_prv_five_list[i][1]}]\t"
f"{_post_five_list[i][0]} [{_post_five_list[i][1]}]\t"
f"{_prv_three_list[i][0]} [{_prv_three_list[i][1]}]\t"
f"{_post_three_list[i][0]} [{_post_three_list[i][1]}]")
if __name__ == "__main__": if __name__ == "__main__":
all_data = get_all_data() all_data = get_all_data()
history_total = get_delta_blue(get_all_blue(all_data)) history_total = get_delta_blue(get_all_blue(all_data))
prv_five = get_blue_forecast(get_all_data(), 'prv', 5) prv_five = get_blue_forecast(get_all_data(), 'prv', 5)
prv_three = get_blue_forecast(get_all_data(), 'prv', 3)
post_five = get_blue_forecast(get_all_data(), 'post', 5) post_five = get_blue_forecast(get_all_data(), 'post', 5)
prv_three = get_blue_forecast(get_all_data(), 'prv', 3)
post_three = get_blue_forecast(get_all_data(), 'post', 3) post_three = get_blue_forecast(get_all_data(), 'post', 3)
print(history_total) insert_blue_data(all_data, json.dumps(history_total), json.dumps(prv_five), json.dumps(post_five), 5)
print(prv_five) insert_blue_data(all_data, json.dumps(history_total), json.dumps(prv_three), json.dumps(post_three), 3)
print(prv_three) print_blue_data(history_total, prv_five, post_five, prv_three, post_three)
print(post_five)
print(post_three)

@ -1,7 +1,9 @@
import json
from orm_db import History, BlueForecastFiveAll, BlueForecastThreeAll, RedForecastAll from orm_db import History, BlueForecastFiveAll, BlueForecastThreeAll, RedForecastAll
from sqlalchemy import engine from sqlalchemy import engine
from sqlalchemy.orm import sessionmaker, declarative_base from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import select, desc, asc from sqlalchemy import select, insert, desc, asc
# from sqlalchemy import Column, Integer, String, DATE # from sqlalchemy import Column, Integer, String, DATE
@ -11,7 +13,7 @@ db_engine = engine.create_engine(DB_URI, echo=False)
db_session = sessionmaker(bind=db_engine)() db_session = sessionmaker(bind=db_engine)()
def db_get_last_data(): def db_history_get_last_data():
try: try:
get_last_data_stmt = select(History).order_by(desc('id')).limit(1) get_last_data_stmt = select(History).order_by(desc('id')).limit(1)
last_data_result = db_session.execute(get_last_data_stmt).scalar() last_data_result = db_session.execute(get_last_data_stmt).scalar()
@ -21,7 +23,7 @@ def db_get_last_data():
return False return False
def db_get_all_data(): def db_history_get_all_data():
try: try:
get_all_data_stmt = select(History).order_by(asc(History.id)) get_all_data_stmt = select(History).order_by(asc(History.id))
all_data_result = db_session.execute(get_all_data_stmt).scalars() all_data_result = db_session.execute(get_all_data_stmt).scalars()
@ -31,7 +33,7 @@ def db_get_all_data():
return False return False
def db_get_data_by_date_id(_date_id): def db_history_get_data_by_date_id(_date_id):
try: try:
get_by_open_date_stmt = select(History).where(History.dateId == _date_id) get_by_open_date_stmt = select(History).where(History.dateId == _date_id)
open_date_result = db_session.execute(get_by_open_date_stmt).scalar() open_date_result = db_session.execute(get_by_open_date_stmt).scalar()
@ -41,7 +43,7 @@ def db_get_data_by_date_id(_date_id):
return False return False
def db_get_data_in_ids(_ids): def db_history_get_data_in_ids(_ids):
try: try:
get_in_ids_stmt = select(History).where(History.id.in_(_ids)) get_in_ids_stmt = select(History).where(History.id.in_(_ids))
ids_result = db_session.execute(get_in_ids_stmt).scalars() ids_result = db_session.execute(get_in_ids_stmt).scalars()
@ -52,6 +54,40 @@ def db_get_data_in_ids(_ids):
return False return False
def db_blue_insert(_data, _num):
try:
if _num == 5:
_table = BlueForecastFiveAll
elif _num == 3:
_table = BlueForecastThreeAll
else:
print('_num值错误')
blue_insert_stmt = insert(_table).values(_data)
insert_result = db_session.execute(blue_insert_stmt).rowcount
if insert_result > 0:
db_session.commit()
else:
db_session.rollback()
return insert_result
except Exception as e:
print(e)
def db_red_index(_mode):
try:
if _mode == 'last':
index_stmt = select(RedForecastAll.last_random_index_group)
else:
index_stmt = select(RedForecastAll.history_random_index_group)
index_result = db_session.execute(index_stmt).scalars()
index_result_list = []
for index in index_result:
index_result_list.append(json.loads(index))
return index_result_list
except Exception as e:
print(e)
def history_scalar(_scalar): def history_scalar(_scalar):
_data = dict() _data = dict()
_data['id'] = _scalar.id _data['id'] = _scalar.id

@ -30,11 +30,11 @@ class BlueForecastFiveAll(Base):
id = Column(Integer, autoincrement=True, primary_key=True) id = Column(Integer, autoincrement=True, primary_key=True)
dateId = Column(String(20), nullable=False, comment='期号') dateId = Column(String(20), nullable=False, comment='期号')
real_blue = Column(String(10), comment='真实的蓝球数据') real_blue = Column(String(10), comment='真实的蓝球数据')
prv_five = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中往前推5期,计算蓝球出现概率') prv = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中往前推5期,计算蓝球出现概率')
prv_index = Column(Integer, comment='根据实际结果计算蓝球的index位置') prv_index = Column(Integer, comment='根据实际结果计算蓝球的index位置')
post_five = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中往后推5期,计算蓝球出现概率') post = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中往后推5期,计算蓝球出现概率')
post_index = Column(Integer, comment='根据实际结果计算蓝球的index位置') post_index = Column(Integer, comment='根据实际结果计算蓝球的index位置')
history_total = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中计算每个蓝球出现的概率') history = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中计算每个蓝球出现的概率')
history_index = Column(Integer, comment='历史中的蓝球索引') history_index = Column(Integer, comment='历史中的蓝球索引')
@ -45,12 +45,12 @@ class BlueForecastThreeAll(Base):
id = Column(Integer, autoincrement=True, primary_key=True) id = Column(Integer, autoincrement=True, primary_key=True)
dateId = Column(String(20), nullable=False, comment='期号') dateId = Column(String(20), nullable=False, comment='期号')
real_blue = Column(String(10), comment='真实的蓝球数据') real_blue = Column(String(10), comment='真实的蓝球数据')
prv_three = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中往前推5期,计算蓝球出现概率') prv = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中往前推5期,计算蓝球出现概率')
prv_index = Column(Integer, comment='根据实际结果计算蓝球的index位置') prv_index = Column(Integer, comment='根据实际结果计算蓝球的index位置')
post_three = Column(String(500), nullable=False, post = Column(String(500), nullable=False,
comment='根据当前一期的蓝球,从历史数据中往后推5期,计算蓝球出现概率') comment='根据当前一期的蓝球,从历史数据中往后推5期,计算蓝球出现概率')
post_index = Column(Integer, comment='根据实际结果计算蓝球的index位置') post_index = Column(Integer, comment='根据实际结果计算蓝球的index位置')
history_total = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中计算每个蓝球出现的概率') history = Column(String(500), nullable=False, comment='根据当前一期的蓝球,从历史数据中计算每个蓝球出现的概率')
history_index = Column(Integer, comment='历史中的蓝球索引') history_index = Column(Integer, comment='历史中的蓝球索引')
@ -60,10 +60,14 @@ class RedForecastAll(Base):
# 定义表结构 # 定义表结构
id = Column(Integer, autoincrement=True, primary_key=True) id = Column(Integer, autoincrement=True, primary_key=True)
dateId = Column(String(20), nullable=False, comment='期号-指预测数据根据哪一期数据计算') dateId = Column(String(20), nullable=False, comment='期号-指预测数据根据哪一期数据计算')
red_rate = Column(String(500), nullable=False, comment='根据上一期红球结果计算历史中红球的出现概率') history = Column(String(500), nullable=False, comment='历史中红球的出现概率偏差')
random_red_num = Column(String(256), comment='随机计算的红球数据') history_random = Column(String(256), comment='根据历史差异随机计算的红球数据')
random_red_index = Column(String(256), comment='预测红球数据对应的index') history_random_index = Column(String(256), comment='预测红球数据对应的index')
random_red_index_group = Column(String(256), comment='预测红球数据对应的index分布') history_random_index_group = Column(String(256), comment='预测红球数据对应的index分布')
last = Column(String(500), nullable=False, comment='根据上一期红球结果计算历史中红球的出现概率')
last_random = Column(String(256), comment='随机计算的红球数据')
last_random_index = Column(String(256), comment='预测红球数据对应的index')
last_random_index_group = Column(String(256), comment='预测红球数据对应的index分布')
real_red = Column(String(256), comment='下一期的真实的红球数据') real_red = Column(String(256), comment='下一期的真实的红球数据')
real_red_index = Column(String(100), comment='真实红球在此次预测中的索引位置') real_red_index = Column(String(100), comment='真实红球在此次预测中的索引位置')
real_red_index_group = Column(String(100), comment='真实红球在此次预测中的索引位置的分布') real_red_index_group = Column(String(100), comment='真实红球在此次预测中的索引位置的分布')

@ -0,0 +1,107 @@
import random
from db import *
from util import *
import json
# 获取所有数据
def get_all_data():
return db_history_get_all_data()
# 获取最后一期红球号码
def get_last_red_data(_all_data):
return json.loads(_all_data[-1]['red'])
# 获取历史上全部红球出现次数
def get_all_red(_all_data):
_all_red_dict = dict()
for _red_list in _all_data:
for _red in json.loads(_red_list['red']):
if _red in _all_red_dict.keys():
_all_red_dict[_red] += 1
else:
_all_red_dict[_red] = 1
return _all_red_dict
# 获取历史红球的偏差值数据
def get_all_red_delta(_all_data):
return dict_sort(get_analysis_data(get_all_red(_all_data)), 'val', True)
def get_all_red_in_last(_all_data):
last_red_list = get_last_red_data(_all_data)
_include_last_red_next_id_list = []
for _data in _all_data:
for _red in last_red_list:
if _red in json.loads(_data['red']):
_include_last_red_next_id_list.append(_data['id'] + 1)
break
_last_red_dict = dict()
for _data in db_history_get_data_in_ids(_include_last_red_next_id_list):
for _red in json.loads(_data['red']):
if _red in json.loads(_data['red']):
if _red in _last_red_dict.keys():
_last_red_dict[_red] += 1
else:
_last_red_dict[_red] = 1
return _last_red_dict
def get_last_red_delta(_last_red):
return dict_sort(get_analysis_data(_last_red), 'val', True)
def get_random_index():
# 产生随机数
random_high, random_middle, random_low = -1, -1, -1
while random_high < 0 or random_middle < 0 or random_low < 0:
random_high = random.randint(0, 2)
random_low = random.randint(1, 6 - random_high)
random_middle = 6 - random_high - random_low
print('分布区间(高概率区,中概率区,低概率区):', random_high, random_middle, random_low)
high_red_index_list = [(n[0] - 1) for n in new_num(1, 11, random_high)[0:random_high]]
middle_red_index_list = [(n[0] - 1) for n in new_num(12, 22, random_middle)[0:random_middle]]
low_red_index_list = [(n[0] - 1) for n in new_num(23, 33, random_low)[0:random_low]]
print(high_red_index_list)
print(middle_red_index_list)
print(low_red_index_list)
red_index_list = sorted(high_red_index_list + middle_red_index_list + low_red_index_list)
print(f"排序后结果")
print(red_index_list)
return red_index_list
def get_index_from_db(_mode):
index_list = db_red_index(_mode)
def get_forecast_red(_red_data):
index_list = get_random_index()
forecast_red_list = []
_red_data_list = list(_red_data.items())
for i in index_list:
forecast_red_list.append(_red_data_list[i][0])
forecast_red_list = sorted(forecast_red_list)
print('随机红球推荐:')
print(forecast_red_list)
return forecast_red_list
def get_forecast_red_by_db_index(_red_data, _mode='history'):
pass
if __name__ == "__main__":
all_data = get_all_data()
all_red_delta = get_all_red_delta(all_data)
# print(all_red_delta)
last_red_delta = get_last_red_delta(get_all_red_in_last(all_data))
# print(last_red_delta)
print_data([all_red_delta, last_red_delta])
forecast_red_history = get_forecast_red(all_red_delta)
forecast_red_last = get_forecast_red(last_red_delta)

@ -1,3 +1,7 @@
import numpy
# 字典排序
def dict_sort(_dict, _sort_by='key', _reverse=False): def dict_sort(_dict, _sort_by='key', _reverse=False):
if _sort_by == 'key': if _sort_by == 'key':
_result = sorted(_dict.items(), key=lambda x: x[0], reverse=_reverse) _result = sorted(_dict.items(), key=lambda x: x[0], reverse=_reverse)
@ -8,3 +12,50 @@ def dict_sort(_dict, _sort_by='key', _reverse=False):
for r in _result: for r in _result:
_result_dict[r[0]] = r[1] _result_dict[r[0]] = r[1]
return _result_dict return _result_dict
# 计算偏差值
def get_analysis_data(_raw_dict):
total = 0
for _num in _raw_dict.values():
total += _num
average = total / len(_raw_dict.keys())
_delta_dict = dict()
for _key, _val in _raw_dict.items():
_delta_dict[_key] = round(_raw_dict[_key] - average, 2)
return _delta_dict
def print_data(_data_list):
new_data_list = [list(d.items()) for d in _data_list]
for i in range(len(new_data_list[0])):
print_str = f""
for data in new_data_list:
print_str += f"{data[i][0]} [{data[i][1]}]\t"
print(print_str)
def new_num(min_num, max_num, random_num):
num = {}
d = [i for i in range(min_num, max_num + 1)]
n = 100000
while n > 0:
result = numpy.random.choice(d, random_num, replace=False)
# print(sorted(r))
for r in result:
if r in num.keys():
num[r] += 1
else:
num[r] = 1
n -= 1
sorted_result = sorted(num.items(), key=lambda x: x[0])
red_result_dict = {}
for item in sorted_result:
red_result_dict[item[0]] = item[1]
# print(red_result_dict)
sorted_result_by_value = sorted(num.items(), key=lambda x: x[1], reverse=True)
# print(sorted_result_by_value)
return sorted_result_by_value

Loading…
Cancel
Save