用于处理彩票的大数据算法
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

154 lines
5.7 KiB

import random
from db import *
from util import *
import json
def get_all_data():
    """Fetch all data by delegating to ``db_history_get_all_data``.

    Returns:
        Whatever ``db_history_get_all_data()`` returns, passed through
        unchanged. Other functions in this module index the result with
        ``[-1]`` and read ``'red'``, ``'id'`` and ``'dateId'`` from its items.
    """
    history_rows = db_history_get_all_data()
    return history_rows
def get_last_red_data(_all_data):
    """Return the red-ball numbers of the last record in ``_all_data``.

    Args:
        _all_data: Indexable sequence of records; the final record must map
            ``'red'`` to a JSON-encoded string.

    Returns:
        The value decoded by ``json.loads`` from the last record's ``'red'``.

    Raises:
        IndexError: If ``_all_data`` is empty.
    """
    latest_record = _all_data[-1]
    encoded_reds = latest_record['red']
    return json.loads(encoded_reds)
def get_all_red(_all_data):
    """Count how often each red-ball value occurs across all records.

    Args:
        _all_data: Iterable of records, each mapping ``'red'`` to a
            JSON-encoded list of values.

    Returns:
        A plain ``dict`` mapping each decoded value to its number of
        occurrences, with keys in first-seen order. Empty input gives ``{}``.
    """
    red_counts = {}
    for record in _all_data:
        for red in json.loads(record['red']):
            # dict.get folds the "first time seen" and "seen before" cases
            # into one statement, with no separate membership test.
            red_counts[red] = red_counts.get(red, 0) + 1
    return red_counts
def get_all_red_delta(_all_data):
    """Build the sorted deviation data for the red-ball counts in ``_all_data``.

    The counts from ``get_all_red`` are passed through ``get_analysis_data``
    and then ordered by ``dict_sort(..., 'val', True)``.

    NOTE(review): ``get_analysis_data`` and ``dict_sort`` live outside this
    file. Presumably ``'val'`` is the sort key and ``True`` selects descending
    order; confirm against the util module.

    Args:
        _all_data: Records accepted by ``get_all_red``.

    Returns:
        The result of ``dict_sort``.
    """
    occurrence_counts = get_all_red(_all_data)
    analysed = get_analysis_data(occurrence_counts)
    return dict_sort(analysed, 'val', True)
def get_all_red_in_last(_all_data):
    """Count red values in the records that follow a "matching" record.

    A record matches when it shares at least one red value with the last
    record of ``_all_data``. For every match, ``id + 1`` is collected. The
    records with those ids are then loaded via ``db_history_get_data_in_ids``
    and their red values are tallied.

    Args:
        _all_data: Sequence of records with an integer-like ``'id'`` and a
            JSON-encoded ``'red'`` list.

    Returns:
        A ``dict`` mapping each red value found in the follow-up records to
        its number of occurrences.
    """
    last_red_list = get_last_red_data(_all_data)

    next_ids = []
    for record in _all_data:
        # Decode once per record rather than once per candidate value.
        record_reds = json.loads(record['red'])
        if any(red in record_reds for red in last_red_list):
            # NOTE(review): the last record always matches itself, so its
            # id + 1 is requested too. Presumably the db helper ignores ids
            # that have no stored record; confirm.
            next_ids.append(record['id'] + 1)

    follow_up_counts = {}
    for record in db_history_get_data_in_ids(next_ids):
        # The original re-tested each value against the list it was drawn
        # from, which is always true, so every value is simply counted.
        for red in json.loads(record['red']):
            follow_up_counts[red] = follow_up_counts.get(red, 0) + 1
    return follow_up_counts
def get_last_red_delta(_last_red):
    """Build the sorted deviation data for already-computed red counts.

    ``_last_red`` is passed through ``get_analysis_data`` and the result is
    ordered by ``dict_sort(..., 'val', True)``.

    NOTE(review): both helpers are defined outside this file; their exact
    output shape should be confirmed there.

    Args:
        _last_red: Count mapping, e.g. the result of ``get_all_red_in_last``.

    Returns:
        The result of ``dict_sort``.
    """
    analysed = get_analysis_data(_last_red)
    return dict_sort(analysed, 'val', True)
def get_index_from_db(_index_type):
    """Pick an index group, preferring stored groups when enough exist.

    Args:
        _index_type: Passed to ``db_index_get_index_group`` to select the
            stored groups.

    Returns:
        A random element of the stored groups when more than 20 are
        available; otherwise a fresh group from ``get_random_index_group``.
    """
    stored_groups = db_index_get_index_group(_index_type)
    # With 20 or fewer stored groups, fall back to a generated one.
    if len(stored_groups) <= 20:
        return get_random_index_group()
    return random.choice(stored_groups)
def get_random_index_group():
    """Randomly split six picks into high / middle / low counts.

    ``high`` is drawn from 0..2 and ``low`` from 1..(6 - high); ``middle``
    takes the remainder, so the three always sum to 6 and none is negative.
    The original wrapped these draws in a retry loop guarded by "any value
    < 0", which could never repeat, so the loop and its -1 sentinels are
    removed. The random calls and their order are unchanged.

    Returns:
        ``[high, middle, low]`` as a list of ints.
    """
    high = random.randint(0, 2)
    low = random.randint(1, 6 - high)
    middle = 6 - high - low
    return [high, middle, low]
def get_random_index(_mode='history', _index_type='history'):
    """Choose a (high, middle, low) group and derive zero-based indices.

    Args:
        _mode: ``'history'`` takes the group from ``get_index_from_db``; any
            other value uses ``get_random_index_group``.
        _index_type: Forwarded to ``get_index_from_db`` in history mode.

    Returns:
        A tuple ``(red_index_group, red_index_list)``: the three counts as
        ints, and the sorted zero-based indices as ints.

    NOTE(review): ``new_num`` is defined outside this file. The code assumes
    it returns a sliceable sequence whose items expose a number at position
    0; presumably that number lies in the given 1-based range. Confirm.
    """
    # Decide how many indices to take from each of the three ranges.
    if _mode == 'history':
        group = get_index_from_db(_index_type)
    else:
        group = get_random_index_group()
    random_high, random_middle, random_low = group
    print('分布区间(高概率区,中概率区,低概率区):', random_high, random_middle, random_low)
    red_index_group = [int(random_high), int(random_middle), int(random_low)]

    def _zero_based(start, stop, count):
        # Keep at most `count` picks and shift them from 1-based to 0-based.
        picks = new_num(start, stop, count)[0:count]
        return [pick[0] - 1 for pick in picks]

    high_indices = _zero_based(1, 11, random_high)
    middle_indices = _zero_based(12, 22, random_middle)
    low_indices = _zero_based(23, 33, random_low)
    for bucket in (high_indices, middle_indices, low_indices):
        print(bucket)

    merged = high_indices + middle_indices + low_indices
    merged.sort()
    red_index_list = [int(value) for value in merged]
    print("排序后索引")
    print(red_index_list)
    return red_index_group, red_index_list
def get_forecast_red(_red_data, _index_type):
    """Pick forecast red values from ``_red_data`` at randomly chosen positions.

    Args:
        _red_data: Mapping whose keys are the candidate values; its item
            order decides which key each index selects.
        _index_type: Forwarded to ``get_random_index`` (always called in
            ``'history'`` mode).

    Returns:
        A dict with ``'forecast'`` (the sorted selected keys), ``'group'``
        (the index group) and ``'index'`` (the index list used).

    Raises:
        IndexError: If an index exceeds the number of items in ``_red_data``.
    """
    index_group, index_list = get_random_index('history', _index_type)
    ranked_entries = list(_red_data.items())
    # Each entry is a (key, value) pair; only the key is the forecast value.
    forecast_red_list = [ranked_entries[position][0] for position in index_list]
    forecast_red_list.sort()
    print('随机红球推荐:')
    print(forecast_red_list)
    return {
        'forecast': forecast_red_list,
        'group': index_group,
        'index': index_list,
    }
def insert_red_data(_all_data, _history, _history_random, _last, _last_random):
    """Serialize a forecast row and store it through ``db_red_insert``.

    Args:
        _all_data: Sequence of records; the last one supplies ``'dateId'``.
        _history: JSON-serializable data for the ``'history'`` column.
        _history_random: Dict with ``'forecast'``, ``'index'`` and ``'group'``.
        _last: JSON-serializable data for the ``'last'`` column.
        _last_random: Dict with ``'forecast'``, ``'index'`` and ``'group'``.

    Returns:
        None. The two forecast columns and a success or failure message are
        printed; success means ``db_red_insert`` returned a value above 0.
    """
    row = {
        'dateId': _all_data[-1]['dateId'],
        'history': json.dumps(_history),
        'history_random': json.dumps(_history_random['forecast']),
        'history_random_index': json.dumps(_history_random['index']),
        'history_random_index_group': json.dumps(_history_random['group']),
        'last': json.dumps(_last),
        'last_random': json.dumps(_last_random['forecast']),
        'last_random_index': json.dumps(_last_random['index']),
        'last_random_index_group': json.dumps(_last_random['group']),
    }
    inserted = db_red_insert(row)
    print('预测红球数据:')
    print(row['history_random'])
    print(row['last_random'])
    outcome = f"输入数据成功,插入{inserted}条数据" if inserted > 0 else "数据插入失败"
    print(outcome)
def red_forecast():
    """Run the full red-ball forecast pipeline and store the result.

    Steps: load all data, build the deviation data for the whole history and
    for the records following matches with the last record, print both,
    derive one forecast from each, and hand everything to
    ``insert_red_data``.

    Returns:
        None.
    """
    draws = get_all_data()
    history_delta = get_all_red_delta(draws)
    follow_up_counts = get_all_red_in_last(draws)
    last_delta = get_last_red_delta(follow_up_counts)
    print_data([history_delta, last_delta])
    history_pick = get_forecast_red(history_delta, 'history')
    last_pick = get_forecast_red(last_delta, 'last')
    insert_red_data(draws, history_delta, history_pick, last_delta, last_pick)
if __name__ == "__main__":
    # Running this module directly is a no-op. The statements that used to
    # be commented out here repeated the body of red_forecast(); call
    # red_forecast() to run the pipeline.
    pass