dingxin_toolbox
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

168 lines
8.0 KiB

import pymysql
from pymysql.cursors import DictCursor
from ai.models import *
from ai.models import TestCinema
from ai.utils.sql import *
from ai.utils.basic_func import *
import datetime
import pandas as pd
import json
class GetData:
def __init__(self, cinema):
self.conn = pymysql.connect(**json.loads(cinema.db_config))
self.cur = self.conn.cursor(cursor=DictCursor)
self.zz_code = cinema.zz_code
def exit(self):
try:
self.cur.close()
self.conn.close()
except Exception as e:
print(e)
# 获取指定日期范围的排片数据
def get_show_data(self, start_datetime, end_datetime):
# print(self.cur.mogrify(GET_SHOW_DATA, (start_datetime, end_datetime)))
self.cur.execute(GET_SHOW_DATA, (start_datetime, end_datetime))
show_data = self.cur.fetchall()
if len(show_data) == 0:
return False
show_data_mapping = {
'hall_name': '影厅别名',
'hall_id': '影厅id',
'movie_name': '影片别名',
'movie_id': '本地影片id',
'language': '语言',
'show_date': '放映日期',
'start_time': '开始时间',
'end_time': '结束时间',
'length': '片长',
'duration': '场间',
}
return self.format_to_csv(show_data, show_data_mapping)
# 获取指定范围的影片销售数据
def get_sell_data(self, start_datetime, end_datetime):
# print(self.cur.mogrify(GET_SELL_DATA, (start_datetime, end_datetime)))
self.cur.execute(GET_SELL_DATA, (start_datetime, end_datetime, start_datetime, end_datetime))
sell_data = self.cur.fetchall()
sell_data_mapping = {
'movie_name': '影片别名',
'movie_id': '本地影片id',
'show_date': '放映日期',
'start_time': '开始时间',
'hall_name': '影厅别名',
'hall_id': '影厅id',
'total_count': '场次观影人数',
'total_income': '场次收入',
'seat_num': '影厅座位数',
'average_price': '平均价',
'rate': '上座率(单位%',
}
return self.format_to_csv(sell_data, sell_data_mapping)
# 获取指定范围的影片销售数据
def get_total_income(self, start_datetime, end_datetime):
2 weeks ago
# print(self.cur.mogrify(GET_TOTAL_INCOME, (start_datetime, end_datetime,start_datetime, end_datetime)))
self.cur.execute(GET_TOTAL_INCOME, (start_datetime, end_datetime, start_datetime, end_datetime))
return self.cur.fetchone()['total_income']
# 获取指定日期的可放映影片
def get_available_movie(self, date):
start_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=29, minutes=59,
seconds=59)
end_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=6)
self.cur.execute(GET_AVAILABLE_MOVIE, (start_datetime, end_datetime))
available_movie = self.cur.fetchall()
available_movie_list = [
f"\t{m['cinema_movie_alias']}》 - 影片id:{m['cinema_movie_id']},影片时长:{m['cinema_movie_time']},语言:{m['language']},制式:{m['media_type']}, 最早排片开始时间:{m['cinema_movie_start_datetime']}, 最晚排片截止时间:{m['cinema_movie_end_datetime']}"
for m in available_movie]
return '\n'.join(available_movie_list)
# 获取指定日期的可用于排片的影片
def get_handle_movie(self, date):
start_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=29, minutes=59,
seconds=59)
end_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=6)
bl_bool, bl_str, bl_list = get_black_list(self.zz_code)
if bl_bool:
sql = GET_HANDLE_MOVIE + bl_str + ');'
self.cur.execute(sql, (start_datetime, end_datetime, *bl_list))
print(self.cur.mogrify(sql, (start_datetime, end_datetime, *bl_list)))
else:
sql = GET_HANDLE_MOVIE + bl_str + ');'
self.cur.execute(sql, (start_datetime, end_datetime))
print(self.cur.mogrify(sql, (start_datetime, end_datetime)))
available_movie = self.cur.fetchall()
available_movie_list = [
f"\t{m['cinema_movie_alias']}》 - 影片id:{m['cinema_movie_id']},影片时长:{m['cinema_movie_time']},语言:{m['language']},制式:{m['media_type']}, 最早排片开始时间:{m['cinema_movie_start_datetime']}, 最晚排片截止时间:{m['cinema_movie_end_datetime']}"
for m in available_movie]
return '\n'.join(available_movie_list)
# 获取新上映的影片
def get_new_movie(self, date):
start_datetime = datetime.datetime.strptime(date, '%Y-%m-%d')
end_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=29, minutes=59,
seconds=59)
self.cur.execute(GET_NEW_MOVIE, (start_datetime, end_datetime))
new = self.cur.fetchall()
new_list = [f"{m['cinema_movie_alias']}" for m in new]
return new_list
# 获取影厅支持的制式
def get_hall_media_type(self):
self.cur.execute(GET_HALL_MEDIA_TYPE)
hall_data = self.cur.fetchall()
# print(hall_data)
hall_dict = dict()
# 处理成对应的字典格式 {"影厅别名": {"hall_id":影厅id,"media_type": 影片制式列表}}
for hall in hall_data:
if hall['cinema_hall_name'] not in hall_dict.keys():
hall_dict[hall['cinema_hall_name']] = {
'hall_id': hall['cinema_hall_id'],
'media_type': [hall['media_type'], ]
}
else:
hall_dict[hall['cinema_hall_name']]['media_type'].append(hall['media_type'])
# 拼装字符串
hall_str_list = []
for hall, val in hall_dict.items():
hall_str_list.append(f"\t{hall} - 影厅id:{val['hall_id']},支持制式:{''.join(val['media_type'])}")
return '\n'.join(hall_str_list)
# 获取模板日期
def get_template_date(self, start_datetime, end_datetime):
basic_val = 4
self.cur.execute(GET_TEMPLATE_DATE, (start_datetime, end_datetime, basic_val))
template_date = self.cur.fetchall()[0]['template_date']
return template_date
# 数据转csv格式字符串
@staticmethod
def format_to_csv(dict_data, mapping):
# print(dict_data)
new_dict = [{mapping[k]: v for k, v in d.items()} for d in dict_data]
return pd.DataFrame(new_dict).to_csv(index=False)
def get_media_type(self):
self.cur.execute(GET_HALL_MEDIA_TYPE)
def get_movie_info(self, date):
start_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=29, minutes=59,
seconds=59)
end_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=6)
bl_bool, bl_str, bl_list = get_black_list(self.zz_code)
if bl_bool:
sql = GET_MOVIE_INFO_1 + bl_str + GET_MOVIE_INFO_2
self.cur.execute(sql, (start_datetime, end_datetime, *bl_list))
print(self.cur.mogrify(sql, (start_datetime, end_datetime, *bl_list)))
else:
sql = GET_MOVIE_INFO_1 + bl_str + GET_MOVIE_INFO_2
self.cur.execute(sql, (start_datetime, end_datetime))
print(self.cur.mogrify(sql, (start_datetime, end_datetime)))
self.cur.execute(sql, (start_datetime, end_datetime, *bl_list))
return self.cur.fetchall()