用于处理彩票的大数据算法
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

153 lines
4.2 KiB

import pymysql
import json
from functools import reduce
from pymysql.cursors import DictCursor
db = {
'host': 'home.rogersun.online',
'user': 'root',
'password': 'Sxzgx1209',
'database': 'lottery'
}
# 获取篮球为指定数值_blue时,未来range期出现各个蓝球号码的次数
def get_rate_one(_blue, _range):
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
cursor.execute('SELECT * FROM history WHERE blue = %s', (_blue,))
result = cursor.fetchall()
id_list = []
for re in result:
for r in range(_range):
id_list.append(re['id'] + r + 1)
id_list = set(id_list)
cursor.execute('SELECT * FROM history WHERE id IN %(ids)s', {'ids': id_list})
result = cursor.fetchall()
cursor.close()
conn.commit()
result_dict = {}
for r in result:
b = r['blue']
if b in result_dict.keys():
result_dict[b] += 1
else:
result_dict[b] = 1
return dict_sort(result_dict, 'val', True)
def dict_sort(_dict, _key, _reverse=False):
if _key == "key":
result = sorted(_dict.items(), key=lambda x: x[0], reverse=_reverse)
else:
result = sorted(_dict.items(), key=lambda x: x[1], reverse=_reverse)
result_val_dict = {}
for rv in result:
result_val_dict[rv[0]] = rv[1]
return result_val_dict
def dict_rate(_dict):
_sum = reduce((lambda x, y: x + y), _dict.values())
for k, v in _dict.items():
_dict[k] = str(round(v / _sum * 100, 2)) + '%'
return _dict
def get_rate(_range):
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
cursor.execute('SELECT * FROM history ORDER BY id DESC LIMIT %s', (_range,))
result = cursor.fetchall()
cursor.close()
conn.commit()
final_dict = {}
for i in range(_range):
blue = result[i]['blue']
_sub_range = i + 1
for k, v in get_rate_one(blue, _sub_range).items():
if k in final_dict:
final_dict[k] += v
else:
final_dict[k] = v
return dict_sort(final_dict, 'val', True)
def update_db(_range):
result = get_rate(_range)
rj = json.dumps(result)
print(result)
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
cursor.execute('SELECT * FROM history ORDER BY id DESC LIMIT %s', (_range,))
db_result = cursor.fetchall()
date_id = db_result[0]['dateId']
# print(date_id, str(_range), rj)
cursor.execute('SELECT * FROM blue_forecast WHERE dateId = %s AND params = %s', (date_id, str(_range)))
if len(cursor.fetchall()) == 0:
cursor.execute('INSERT INTO blue_forecast (dateId, params, blue) VALUES (%s, %s, %s)',
(date_id, str(_range), rj))
else:
print('已经存在对应数据,请检查:')
print(rj)
cursor.close()
conn.commit()
return result
def get_data(_range):
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
cursor.execute('SELECT dateId FROM blue_forecast ORDER BY id DESC LIMIT 1')
date_id = cursor.fetchall()[0]['dateId']
cursor.execute('SELECT * FROM blue_forecast WHERE dateId = %s AND params = %s', (date_id, str(_range)))
data = json.loads(cursor.fetchall()[0]['blue'])
count = 0
for d in data.values():
count += d
for key, val in data.items():
print("%s: %2.2f%%" % (key, (val / count) * 100))
def get_blue_suggest(_range):
result_list = []
for r in range(_range):
result_list.append(update_db(r + 1))
keys = [str(i + 1).rjust(2, '0') for i in range(16)]
# print(keys)
# print(result_list)
result = {}
for k in keys:
for data in result_list:
if k in result.keys() and k in data.keys():
result[k] += data[k]
elif k in result.keys():
result[k] += 0
else:
result[k] = data[k]
print(f'推荐的篮球结果,未来{str(_range)}期历史数据统计总和:')
for k,v in dict_rate(dict_sort(result, 'val', True)).items():
print(f"{k} - {v}")
if __name__ == "__main__":
range_num = 5
get_blue_suggest(range_num)
# get_data(range_num)