import pymysql from pymysql.cursors import DictCursor from ai.models import * from ai.models import TestCinema from ai.utils.sql import * import datetime import pandas as pd import json class GetData: def __init__(self, cinema): self.conn = pymysql.connect(**json.loads(cinema.db_config)) self.cur = self.conn.cursor(cursor=DictCursor) self.zz_code = cinema.zz_code def exit(self): try: self.cur.close() self.conn.close() except Exception as e: print(e) # 获取指定日期范围的排片数据 def get_show_data(self, start_datetime, end_datetime): # print(self.cur.mogrify(GET_SHOW_DATA, (start_datetime, end_datetime))) self.cur.execute(GET_SHOW_DATA, (start_datetime, end_datetime)) show_data = self.cur.fetchall() show_data_mapping = { 'hall_name': '影厅别名', 'hall_id': '影厅id', 'movie_name': '影片别名', 'movie_id': '本地影片id', 'language': '语言', 'show_date': '放映日期', 'start_time': '开始时间', 'end_time': '结束时间', 'length': '片长', 'duration': '场间', } return self.format_to_csv(show_data, show_data_mapping) # 获取指定范围的影片销售数据 def get_sell_data(self, start_datetime, end_datetime): # print(self.cur.mogrify(GET_SELL_DATA, (start_datetime, end_datetime))) self.cur.execute(GET_SELL_DATA, (start_datetime, end_datetime, start_datetime, end_datetime)) sell_data = self.cur.fetchall() sell_data_mapping = { 'movie_name': '影片别名', 'movie_id': '本地影片id', 'show_date': '放映日期', 'start_time': '开始时间', 'hall_name': '影厅别名', 'hall_id': '影厅id', 'total_count': '场次观影人数', 'total_income': '场次收入', 'seat_num': '影厅座位数', 'average_price': '平均价', 'rate': '上座率(单位%)', } return self.format_to_csv(sell_data, sell_data_mapping) # 获取指定范围的影片销售数据 def get_total_income(self, start_datetime, end_datetime): # print(self.cur.mogrify(GET_TOTAL_INCOME, (start_datetime, end_datetime,start_datetime, end_datetime))) self.cur.execute(GET_TOTAL_INCOME, (start_datetime, end_datetime, start_datetime, end_datetime)) return self.cur.fetchone()['total_income'] # 获取指定日期的可放映影片 def get_available_movie(self, date): start_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=29, minutes=59, seconds=59) end_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=6) self.cur.execute(GET_AVAILABLE_MOVIE, (start_datetime, end_datetime)) available_movie = self.cur.fetchall() available_movie_list = [ f"\t《{m['cinema_movie_alias']}》 - 影片id:{m['cinema_movie_id']},影片时长:{m['cinema_movie_time']},语言:{m['language']},制式:{m['media_type']}, 最早排片开始时间:{m['cinema_movie_start_datetime']}, 最晚排片截止时间:{m['cinema_movie_end_datetime']};" for m in available_movie] return '\n'.join(available_movie_list) # 获取指定日期的可用于排片的影片 def get_handle_movie(self, date): start_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=29, minutes=59, seconds=59) end_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=6) black_list = json.loads(TestCinema.objects.filter(zz_code=self.zz_code).first().user_config)['movie_black_list'] print(black_list) sql = GET_HANDLE_MOVIE + ','.join(['%s']*len(black_list)) + ');' self.cur.execute(sql, (start_datetime, end_datetime, *black_list)) # print(self.cur.mogrify(sql, (start_datetime, end_datetime, *black_list))) available_movie = self.cur.fetchall() available_movie_list = [ f"\t《{m['cinema_movie_alias']}》 - 影片id:{m['cinema_movie_id']},影片时长:{m['cinema_movie_time']},语言:{m['language']},制式:{m['media_type']}, 最早排片开始时间:{m['cinema_movie_start_datetime']}, 最晚排片截止时间:{m['cinema_movie_end_datetime']};" for m in available_movie] return '\n'.join(available_movie_list) # 获取新上映的影片 def get_new_movie(self, date): start_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') end_datetime = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(hours=29, minutes=59, seconds=59) self.cur.execute(GET_NEW_MOVIE, (start_datetime, end_datetime)) new = self.cur.fetchall() new_list = [f"《{m['cinema_movie_alias']}》" for m in new] return new_list # 获取影厅支持的制式 def get_hall_media_type(self): self.cur.execute(GET_HALL_MEDIA_TYPE) hall_data = self.cur.fetchall() # print(hall_data) hall_dict = dict() # 处理成对应的字典格式 {"影厅别名": {"hall_id":影厅id,"media_type": 影片制式列表}} for hall in hall_data: if hall['cinema_hall_name'] not in hall_dict.keys(): hall_dict[hall['cinema_hall_name']] = { 'hall_id': hall['cinema_hall_id'], 'media_type': [hall['media_type'], ] } else: hall_dict[hall['cinema_hall_name']]['media_type'].append(hall['media_type']) # 拼装字符串 hall_str_list = [] for hall, val in hall_dict.items(): hall_str_list.append(f"\t{hall} - 影厅id:{val['hall_id']},支持制式:{','.join(val['media_type'])};") return '\n'.join(hall_str_list) # 获取模板日期 def get_template_date(self, start_datetime, end_datetime): basic_val = 4 self.cur.execute(GET_TEMPLATE_DATE, (start_datetime, end_datetime, basic_val)) template_date = self.cur.fetchall()[0]['template_date'] return template_date # 数据转csv格式字符串 @staticmethod def format_to_csv(dict_data, mapping): # print(dict_data) new_dict = [{mapping[k]: v for k, v in d.items()} for d in dict_data] return pd.DataFrame(new_dict).to_csv(index=False) def get_media_type(self): self.cur.execute(GET_HALL_MEDIA_TYPE)