"""Blue-ball frequency statistics backed by the ``lottery`` MySQL database.

The script reads draws from the ``history`` table (columns used here: ``id``,
``blue``, ``dateId``), counts which blue-ball numbers followed past occurrences
of the most recent blue balls, and stores the JSON-encoded counts in the
``blue_forecast`` table (columns used here: ``id``, ``dateId``, ``params``,
``blue``).
"""
import json
from collections import Counter
from contextlib import contextmanager
from functools import reduce  # noqa: F401  (no longer used here; kept for existing importers)

import pymysql
from pymysql.cursors import DictCursor

# NOTE(review): credentials are hard-coded in source control. Values are left
# untouched to preserve behaviour; consider loading them from the environment
# or a secrets store and rotating this password.
db = {
    'host': 'home.rogersun.online',
    'user': 'root',
    'password': 'Sxzgx1209',
    'database': 'lottery'
}


@contextmanager
def _open_cursor():
    """Yield a ``DictCursor`` on a fresh connection and always clean up.

    The transaction is committed when the managed block finishes without an
    exception; the cursor and the connection are closed in every case, so an
    error in the middle of a query no longer leaks the connection.
    """
    conn = pymysql.connect(**db)
    try:
        cursor = conn.cursor(DictCursor)
        try:
            yield cursor
        finally:
            cursor.close()
        conn.commit()
    finally:
        conn.close()


def get_rate_one(_blue, _range):
    """Count the blue balls drawn in the ``_range`` draws after each ``_blue`` draw.

    For every ``history`` row whose ``blue`` equals ``_blue``, the rows with ids
    ``id + 1 .. id + _range`` are collected (each id only once) and their
    ``blue`` values are tallied.
    NOTE(review): this presumes consecutive ids are consecutive draws - confirm
    against the table definition.

    :param _blue: blue-ball value to look up in ``history.blue``.
    :param _range: number of following draws to inspect per occurrence.
    :return: dict mapping blue value -> occurrence count, ordered by count
        descending. Empty when ``_blue`` never occurred or ``_range`` < 1.
    """
    with _open_cursor() as cursor:
        cursor.execute('SELECT * FROM history WHERE blue = %s', (_blue,))
        follow_ids = {
            row['id'] + offset
            for row in cursor.fetchall()
            for offset in range(1, _range + 1)
        }
        if not follow_ids:
            # An empty id collection would be rendered as "IN ()", which MySQL
            # rejects as a syntax error.
            return {}
        cursor.execute(
            'SELECT * FROM history WHERE id IN %(ids)s',
            {'ids': sorted(follow_ids)},
        )
        followers = cursor.fetchall()

    counts = Counter(row['blue'] for row in followers)
    return dict_sort(counts, 'val', True)


def dict_sort(_dict, _key, _reverse=False):
    """Return a new dict with the items of ``_dict`` in sorted order.

    :param _dict: mapping to sort; it is not modified.
    :param _key: ``"key"`` sorts by key; any other value sorts by value.
    :param _reverse: sort descending when true.
    :return: a plain ``dict`` whose insertion order is the sorted order.
    """
    position = 0 if _key == "key" else 1
    return dict(sorted(_dict.items(), key=lambda item: item[position], reverse=_reverse))


def dict_rate(_dict):
    """Replace every value in ``_dict`` with its share of the total, as text.

    Each value becomes a string such as ``'12.5%'`` (percentage rounded to two
    decimals). The dict is modified in place and also returned, as before.
    An empty dict is returned unchanged, and a total of zero yields ``'0.0%'``
    for every key instead of raising.

    :param _dict: mapping of key -> numeric count.
    :return: the same dict object, now holding percentage strings.
    """
    total = sum(_dict.values())
    for key in _dict:
        # Only existing keys are reassigned, so the dict size never changes
        # while it is being iterated.
        share = _dict[key] / total * 100 if total else 0.0
        _dict[key] = str(round(share, 2)) + '%'
    return _dict


def get_rate(_range):
    """Accumulate follower counts for the most recent ``_range`` draws.

    The i-th most recent draw (1-based) contributes
    ``get_rate_one(its blue value, i)``; all contributions are summed per blue
    value. If ``history`` holds fewer than ``_range`` rows, only the available
    rows are used instead of raising ``IndexError``.

    :param _range: number of most recent draws to take into account.
    :return: dict mapping blue value -> summed count, ordered by count descending.
    """
    with _open_cursor() as cursor:
        cursor.execute('SELECT * FROM history ORDER BY id DESC LIMIT %s', (_range,))
        recent = cursor.fetchall()

    totals = Counter()
    for window, row in enumerate(recent, start=1):
        # Counter.update adds the counts of a mapping to the running totals.
        totals.update(get_rate_one(row['blue'], window))
    return dict_sort(totals, 'val', True)


def update_db(_range):
    """Compute ``get_rate(_range)`` and store it for the latest draw if missing.

    The counts are printed, JSON-encoded and inserted into ``blue_forecast``
    under the newest ``history.dateId`` with ``params = str(_range)``. When a
    row for that (dateId, params) pair already exists, nothing is written and a
    notice plus the JSON payload is printed instead.

    :param _range: window size forwarded to :func:`get_rate`.
    :return: the dict produced by :func:`get_rate`.
    :raises IndexError: if ``history`` contains no rows.
    """
    result = get_rate(_range)
    rj = json.dumps(result)
    print(result)

    with _open_cursor() as cursor:
        # Only the newest draw's dateId is needed, so one row is enough.
        cursor.execute('SELECT dateId FROM history ORDER BY id DESC LIMIT 1')
        latest = cursor.fetchone()
        if latest is None:
            raise IndexError('history table is empty; no dateId to store the forecast under')
        date_id = latest['dateId']
        params = str(_range)

        cursor.execute(
            'SELECT * FROM blue_forecast WHERE dateId = %s AND params = %s',
            (date_id, params),
        )
        if cursor.fetchone() is None:
            cursor.execute(
                'INSERT INTO blue_forecast (dateId, params, blue) VALUES (%s, %s, %s)',
                (date_id, params, rj),
            )
        else:
            print('已经存在对应数据,请检查:')
            print(rj)
    return result


def get_data(_range):
    """Print the stored forecast for the newest ``dateId`` as percentages.

    Looks up the ``dateId`` of the newest ``blue_forecast`` row, loads the JSON
    stored for that ``dateId`` with ``params = str(_range)`` and prints one
    ``key: xx.xx%`` line per entry.

    :param _range: window size whose stored forecast should be shown.
    :raises IndexError: if ``blue_forecast`` is empty or holds no row for the
        newest ``dateId`` with this ``_range``.
    """
    with _open_cursor() as cursor:
        cursor.execute('SELECT dateId FROM blue_forecast ORDER BY id DESC LIMIT 1')
        newest = cursor.fetchone()
        if newest is None:
            raise IndexError('blue_forecast table is empty')
        cursor.execute(
            'SELECT * FROM blue_forecast WHERE dateId = %s AND params = %s',
            (newest['dateId'], str(_range)),
        )
        stored = cursor.fetchone()
        if stored is None:
            raise IndexError(
                'no blue_forecast row for dateId=%s and params=%s' % (newest['dateId'], _range)
            )

    data = json.loads(stored['blue'])
    count = sum(data.values())
    for key, val in data.items():
        # A zero total would divide by zero; report 0% for every entry instead.
        share = (val / count) * 100 if count else 0.0
        print("%s: %2.2f%%" % (key, share))


def get_blue_suggest(_range):
    """Print a blue-ball suggestion aggregated over window sizes ``1.._range``.

    Runs :func:`update_db` for every window size from 1 to ``_range``, sums the
    counts per blue number ``'01'``..``'16'`` and prints each number with its
    share of the total, highest first. A number missing from a window simply
    contributes 0 (previously a number absent from the first window raised
    ``KeyError``).
    NOTE(review): the lookup uses zero-padded string keys, which presumes
    ``history.blue`` holds such strings - confirm against the table definition.

    :param _range: largest window size to evaluate.
    """
    result_list = [update_db(window) for window in range(1, _range + 1)]
    keys = [str(number).rjust(2, '0') for number in range(1, 17)]
    result = {k: sum(data.get(k, 0) for data in result_list) for k in keys}

    print(f'推荐的篮球结果,未来{str(_range)}期历史数据统计总和:')
    for k, v in dict_rate(dict_sort(result, 'val', True)).items():
        print(f"{k} - {v}")


if __name__ == "__main__":
    range_num = 5
    get_blue_suggest(range_num)
    # Call get_data(range_num) instead to print the forecast already stored.