first commit

main
roger_home_pc 2 years ago
commit 5243b564d0
  1. 1
      .gitignore
  2. 8
      .idea/.gitignore
  3. 10
      .idea/Lottery.iml
  4. 6
      .idea/inspectionProfiles/profiles_settings.xml
  5. 4
      .idea/misc.xml
  6. 8
      .idea/modules.xml
  7. 6
      .idea/vcs.xml
  8. 153
      blue_forecast.py
  9. 112
      history_result.py
  10. 31
      new_record_db.py
  11. 54
      random.py
  12. 46
      scrapy.py

1
.gitignore vendored

@ -0,0 +1 @@
/venv/

8
.idea/.gitignore vendored

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (Lottery)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (Lottery)" project-jdk-type="Python SDK" />
</project>

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/Lottery.iml" filepath="$PROJECT_DIR$/.idea/Lottery.iml" />
</modules>
</component>
</project>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

@ -0,0 +1,153 @@
import pymysql
import json
from functools import reduce
from pymysql.cursors import DictCursor
db = {
'host': 'home.rogersun.online',
'user': 'root',
'password': 'Sxzgx1209',
'database': 'lottery'
}
# 获取篮球为指定数值_blue时,未来range期出现各个蓝球号码的次数
def get_rate_one(_blue, _range):
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
cursor.execute('SELECT * FROM history WHERE blue = %s', (_blue,))
result = cursor.fetchall()
id_list = []
for re in result:
for r in range(_range):
id_list.append(re['id'] + r + 1)
id_list = set(id_list)
cursor.execute('SELECT * FROM history WHERE id IN %(ids)s', {'ids': id_list})
result = cursor.fetchall()
cursor.close()
conn.commit()
result_dict = {}
for r in result:
b = r['blue']
if b in result_dict.keys():
result_dict[b] += 1
else:
result_dict[b] = 1
return dict_sort(result_dict, 'val', True)
def dict_sort(_dict, _key, _reverse=False):
if _key == "key":
result = sorted(_dict.items(), key=lambda x: x[0], reverse=_reverse)
else:
result = sorted(_dict.items(), key=lambda x: x[1], reverse=_reverse)
result_val_dict = {}
for rv in result:
result_val_dict[rv[0]] = rv[1]
return result_val_dict
def dict_rate(_dict):
_sum = reduce((lambda x, y: x + y), _dict.values())
for k, v in _dict.items():
_dict[k] = str(round(v / _sum * 100, 2)) + '%'
return _dict
def get_rate(_range):
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
cursor.execute('SELECT * FROM history ORDER BY id DESC LIMIT %s', (_range,))
result = cursor.fetchall()
cursor.close()
conn.commit()
final_dict = {}
for i in range(_range):
blue = result[i]['blue']
_sub_range = i + 1
for k, v in get_rate_one(blue, _sub_range).items():
if k in final_dict:
final_dict[k] += v
else:
final_dict[k] = v
return dict_sort(final_dict, 'val', True)
def update_db(_range):
result = get_rate(_range)
rj = json.dumps(result)
print(result)
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
cursor.execute('SELECT * FROM history ORDER BY id DESC LIMIT %s', (_range,))
db_result = cursor.fetchall()
date_id = db_result[0]['dateId']
# print(date_id, str(_range), rj)
cursor.execute('SELECT * FROM blue_forecast WHERE dateId = %s AND params = %s', (date_id, str(_range)))
if len(cursor.fetchall()) == 0:
cursor.execute('INSERT INTO blue_forecast (dateId, params, blue) VALUES (%s, %s, %s)',
(date_id, str(_range), rj))
else:
print('已经存在对应数据,请检查:')
print(rj)
cursor.close()
conn.commit()
return result
def get_data(_range):
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
cursor.execute('SELECT dateId FROM blue_forecast ORDER BY id DESC LIMIT 1')
date_id = cursor.fetchall()[0]['dateId']
cursor.execute('SELECT * FROM blue_forecast WHERE dateId = %s AND params = %s', (date_id, str(_range)))
data = json.loads(cursor.fetchall()[0]['blue'])
count = 0
for d in data.values():
count += d
for key, val in data.items():
print("%s: %2.2f%%" % (key, (val / count) * 100))
def get_blue_suggest(_range):
result_list = []
for r in range(_range):
result_list.append(update_db(r + 1))
keys = [str(i + 1).rjust(2, '0') for i in range(16)]
# print(keys)
# print(result_list)
result = {}
for k in keys:
for data in result_list:
if k in result.keys() and k in data.keys():
result[k] += data[k]
elif k in result.keys():
result[k] += 0
else:
result[k] = data[k]
print('推荐的篮球结果:')
for k,v in dict_rate(dict_sort(result, 'val', True)).items():
print(f"{k} - {v}")
if __name__ == "__main__":
range_num = 5
get_blue_suggest(range_num)
# get_data(range_num)

@ -0,0 +1,112 @@
import pymysql
from functools import reduce
import json
from pymysql.cursors import DictCursor
from blue_forecast import dict_sort
db = {
'host': 'home.rogersun.online',
'user': 'root',
'password': 'Sxzgx1209',
'database': 'lottery'
}
def dict_rate(_dict):
_sum = reduce((lambda x, y: x + y), _dict.values())
for k, v in _dict.items():
_dict[k] = str(round(v / _sum * 100, 2)) + '%'
return _dict
def get_blue_rate(_date_id, _range):
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
# 获取指定一期的id和篮球号码
cursor.execute('SELECT id, blue FROM history WHERE dateId = %s', (_date_id,))
d = cursor.fetchall()[0]
data_id = d['id']
blue = d['blue']
# 获取指定一期之前的全部篮球相同的数据
cursor.execute('SELECT * FROM history WHERE id <= %s AND blue = %s', (data_id, blue))
history_data = cursor.fetchall()
id_list = []
for data in history_data:
for r in range(_range):
id_list.append(data['id'] + r + 1)
id_list = set(id_list)
cursor.execute('SELECT * FROM history WHERE id IN %(ids)s', {'ids': id_list})
result = cursor.fetchall()
cursor.close()
conn.commit()
result_dict = {}
for r in result:
b = r['blue']
if b in result_dict.keys():
result_dict[b] += 1
else:
result_dict[b] = 1
return dict_sort(result_dict, 'val', True)
def get_result(_date_id, _range):
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
# 获取指定一期的id和篮球号码
cursor.execute('SELECT id, blue FROM history WHERE dateId = %s', (_date_id,))
d = cursor.fetchall()[0]
data_id = d['id']
data_blue = d['blue']
cursor.execute('SELECT * FROM history WHERE id = %s', (data_id + 1,))
result = cursor.fetchall()[0]
result_blue = result['blue']
blue_list = get_blue_rate(_date_id, _range)
index = [blue[0] for blue in blue_list.items()].index(result_blue) + 1
print(
f"{_date_id}: 篮球为{data_blue} 预测下期{json.dumps(dict_rate(blue_list))} 下期篮球实际为{result_blue} 预测索引位置 {index}")
return index
def get_all_history_result(_date_id, _range):
index_rate = {}
conn = pymysql.connect(**db)
cursor = conn.cursor(DictCursor)
# 获取指定一期的id和篮球号码
cursor.execute('SELECT id FROM history WHERE dateId = %s', (_date_id,))
data_id = cursor.fetchall()[0]['id']
if data_id > 1000:
cursor.execute('SELECT dateId FROM history WHERE id <= %s AND id > 1000', (data_id,))
all_date_id_list = cursor.fetchall()
print(len(all_date_id_list))
for _id in all_date_id_list:
_index = str(get_result(_id['dateId'], _range))
if _index in index_rate.keys():
index_rate[_index] += 1
else:
index_rate[_index] = 1
else:
print('采样数据不足')
index_rate_result = dict_sort(index_rate, 'val', True)
print('篮球出现在每次预测的索引位置规律:')
print(json.dumps(dict_rate(index_rate_result)))
if __name__ == "__main__":
# get_result('2006136', 1)
get_all_history_result('2019101', 1)

@ -0,0 +1,31 @@
import json
import pymysql
new = {
'id': '2023015',
'date': '2023-02-12',
'red': '["02", "03", "14", "21", "29", "32"]',
'blue': '08'
}
db = {
'host': 'home.rogersun.online',
'user': 'root',
'password': 'Sxzgx1209',
'database': 'lottery'
}
conn = pymysql.connect(**db)
cursor = conn.cursor()
# with open('num_new.txt', 'r') as f:
# lines = f.readlines()
# for line in lines:
# j = json.loads(line)
# print(j)
# cursor.execute('INSERT INTO history (`dateId`,`openDate`,`red`,`blue`) VALUES (%s,%s,%s,%s)',
# (j['id'], j['date'], json.dumps(j['red']), j['blue']))
# f.close()
cursor.execute('INSERT INTO history (`dateId`,`openDate`,`red`,`blue`) VALUES (%s,%s,%s,%s)',
(new['id'], new['date'], new['red'], new['blue']))
cursor.close()
conn.commit()

@ -0,0 +1,54 @@
import numpy
import datetime
def new_num(max_num, random_num):
num = {}
d = [i for i in range(1, max_num + 1)]
n = 100000
while n > 0:
result = numpy.random.choice(d, random_num, replace=False)
# print(sorted(r))
for r in result:
if r in num.keys():
num[r] += 1
else:
num[r] = 0
n -= 1
sorted_result = sorted(num.items(), key=lambda x: x[0])
red_result_dict = {}
for item in sorted_result:
red_result_dict[item[0]] = item[1]
# print(red_result_dict)
sorted_result_by_value = sorted(num.items(), key=lambda x: x[1], reverse=True)
# print(sorted_result_by_value)
return sorted_result_by_value
def get_num(num, choice, count):
choice_list = [c[0] for c in choice[0:count]]
red_list = [num[c - 1][0] for c in choice_list]
return sorted(red_list)
def print_num(color, num):
if color == 'red':
print('红球')
elif color == 'blue':
print('篮球')
else:
print('啥也不是!')
print(', '.join([str(i) for i in num]))
def print_time():
print('时间:' + datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'))
if __name__ == "__main__":
print_time()
print_num('red', get_num(new_num(33, 6), new_num(33, 6), 6))
print_num('blue', get_num(new_num(16, 1), new_num(16, 1), 1))

@ -0,0 +1,46 @@
import json
import time
import requests as req
from bs4 import BeautifulSoup
import lxml
import copy
base_url = 'http://kaijiang.zhcw.com/zhcw/html/ssq/list'
data = {'date': '', 'id': '', 'red': [], 'blue': ''}
data_list = []
for n in range(0, 148):
time.sleep(0.5)
if n == 0:
u = base_url + '.html'
else:
u = base_url + '_' + str(n + 1) + '.html'
print(u)
response = req.get(u)
# print(response.text)
soup = BeautifulSoup(response.text, 'lxml')
table = soup.find('table')
item = table.findAll('tr')
for i in item:
# print(i)
tds = i.findAll('td')
# print(tds)
ems = i.findAll('em')
# print(ems)
if len(tds) and len(ems):
d = copy.deepcopy(data)
d['date'] = tds[0].text
d['id'] = tds[1].text
d['red'] = [e.text for e in ems[0:6]]
d['blue'] = ems[6].text
data_list.append(json.dumps(d)+'\n')
data_list.reverse()
with open('num_new.txt', 'a') as f:
f.writelines(data_list)
f.close()
Loading…
Cancel
Save