import datetime
import csv
import json
# NOTE: the wildcard import stays in its original position on purpose. It can
# rebind any name imported above it, so moving it could change which objects
# the names below resolve to.
from ai.models import *
from collections import Counter, defaultdict
from io import StringIO
from itertools import zip_longest
from django_redis import get_redis_connection
from ai.models import CinemaHall


# Show fields emitted per side (AI / real) in the flat list output.
_LIST_FIELDS = ('movie_name', 'movie_id', 'language', 'start', 'end', 'length', 'duration')
# Show fields emitted per side in the CSV output (no movie id).
_CSV_FIELDS = ('movie_name', 'language', 'start', 'end', 'length', 'duration')


# Clear the redis lock
def clear_lock_func():
    """Delete today's ``ai_show<YYYYMMDD>`` key from Redis and report success.

    Nothing is printed when the key does not exist.
    """
    redis_conn = get_redis_connection()
    redis_key = f'ai_show{datetime.date.today().strftime("%Y%m%d")}'
    # DEL returns the number of keys it removed, so a separate EXISTS call is
    # unnecessary and would leave a window between the check and the delete.
    if redis_conn.delete(redis_key):
        print(f'清除{redis_key}成功')


# Convert CSV text into a list of dicts
def csv_to_obj(csv_data):
    """Parse CSV text into a list of row dicts keyed by the header row.

    :param csv_data: CSV content as a single string; the first row is the header
    :return: one dict per data row
    """
    return list(csv.DictReader(StringIO(csv_data)))


# Convert a list of rows into CSV text
def list_to_csv(rows, include_header=True):
    """Serialise a nested list into a CSV string using CRLF line endings.

    :param rows: input data, e.g. [['col1', 'col2'], ['val1', 'val2'], ...]
    :param include_header: accepted for backward compatibility. The header row
        is written exactly like any other row, so this flag does not change
        the output.
    :return: the CSV text ('' when rows is empty)
    """
    output = StringIO()
    writer = csv.writer(output, lineterminator='\r\n')
    writer.writerows(rows)
    return output.getvalue()


def _group_rows_by_hall(rows):
    """Bucket raw schedule rows by their '影厅id' value, preserving row order."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row['影厅id']].append(row)
    return grouped


def _to_show(row):
    """Map one raw schedule row (Chinese column names) to the internal show dict."""
    return {
        'movie_name': row['影片别名'],
        'movie_id': row['本地影片id'],
        'language': row['语言'],
        'show_date': row['放映日期'],
        'start': row['开始时间'],
        'end': row['结束时间'],
        'length': row['片长'],
        'duration': row['场间'],
    }


def _cells(show, fields):
    """Return the requested fields of a show, or blank cells when show is None."""
    if show is None:
        return [''] * len(fields)
    return [show[field] for field in fields]


# Format the schedule data
def format_show_data(cinema_code: str, show_date: str, data_left: list, data_right: list):
    """Lay out the AI schedule and the real schedule side by side, per hall.

    :param cinema_code: cinema whose ``CinemaHall`` rows define the halls
    :param show_date: value placed in the second column of every list row
    :param data_left: AI schedule rows (dicts keyed by the Chinese column names)
    :param data_right: real schedule rows, same shape as data_left
    :return: tuple ``(output_list, output_csv)``. ``output_list`` rows have 16
        cells (hall name, show date, 7 AI cells, 7 real cells).
        ``output_csv`` starts with a header row and has 15 cells per row
        (hall name + 6 AI cells, an empty separator, hall name + 6 real cells).
    """
    # Fetch the halls of this cinema
    halls = list(CinemaHall.objects.filter(cinema_code=cinema_code))

    # Group each dataset once instead of rescanning it for every hall. The
    # datasets are only touched when at least one hall exists.
    ai_by_hall = _group_rows_by_hall(data_left) if halls else {}
    real_by_hall = _group_rows_by_hall(data_right) if halls else {}

    # Prepare the per-hall data. Keyed by hall_id, so a repeated hall_id keeps
    # only the last hall seen.
    data = dict()
    for h in halls:
        # Rows carry the hall id as text, hence the str() on the model value.
        hall_key = str(h.hall_id)
        data[h.hall_id] = {
            'hall_name': h.hall_name,
            'hall_id': h.hall_id,
            'ai': [_to_show(row) for row in ai_by_hall.get(hall_key, [])],
            'real': [_to_show(row) for row in real_by_hall.get(hall_key, [])],
        }

    # Format the output
    output_list = []
    output_csv = [
        ['影厅别名', '影片别名', '语言', '开始时间', '结束时间', '片长', '场间', '',
         '影厅别名', '影片别名', '语言', '开始时间', '结束时间', '片长', '场间']]
    for v in data.values():
        hall_name = v['hall_name']
        if not v['ai'] and not v['real']:
            # A hall without any show still gets one placeholder row, with
            # the hall name on both sides of the CSV.
            output_list.append([hall_name, show_date] + [''] * (2 * len(_LIST_FIELDS)))
            output_csv.append(
                [hall_name, *_cells(None, _CSV_FIELDS), '', hall_name, *_cells(None, _CSV_FIELDS)]
            )
            continue
        # Pair the i-th AI show with the i-th real show; the shorter side is
        # padded with None, which _cells renders as blank cells.
        for a, r in zip_longest(v['ai'], v['real']):
            output_list.append(
                [hall_name, show_date, *_cells(a, _LIST_FIELDS), *_cells(r, _LIST_FIELDS)]
            )
            # The right-hand hall name is left blank when there is no real
            # show, while the left-hand hall name is always written.
            right_name = hall_name if r is not None else ''
            output_csv.append(
                [hall_name, *_cells(a, _CSV_FIELDS), '', right_name, *_cells(r, _CSV_FIELDS)]
            )
    return output_list, output_csv


# Get the hall types
def get_hall_type(cinema_code):
    """Return a ``{hall_name: hall_type}`` mapping for the halls of a cinema."""
    halls = CinemaHall.objects.filter(cinema_code=cinema_code)
    return {h.hall_name: h.hall_type for h in halls}


# Get each film's share of the schedule
def count_show(show_data):
    """Summarise how often each film appears in the schedule.

    :param show_data: sized collection of row dicts with a '影片别名' key
    :return: one line per film, most frequent first (ties keep first-seen
        order), giving its percentage of all shows (2 decimals) and its count;
        '' when show_data is empty
    """
    print('count_show', show_data)
    counts = Counter(show['影片别名'] for show in show_data)
    total = len(show_data)
    # most_common() is a stable descending sort on the count, so films with
    # equal counts stay in first-seen order.
    return '\n'.join(
        f"{name}:{round(count / total * 100, 2)}%,共排{count}场"
        for name, count in counts.most_common()
    )


# Handle the movie blacklist
def get_black_list(cinema_code: str):
    """Load the movie blacklist from a cinema's JSON ``user_config``.

    :param cinema_code: matched against ``TestCinema.zz_code``
    :return: tuple ``(has_entries, placeholders, black_list)``. ``placeholders``
        is a comma-joined run of ``%s`` markers, one per entry (presumably for
        a parameterised query; confirm with callers), or the literal ``' '``
        when the list is empty.
    :raises KeyError: if the config has no 'movie_black_list' entry
    """
    cinema = TestCinema.objects.filter(zz_code=cinema_code).first()
    # NOTE(review): first() returns None when no cinema matches, so an unknown
    # code fails on the next line with AttributeError. Confirm this is intended.
    user_config = json.loads(cinema.user_config)
    black_list = user_config['movie_black_list']
    if len(black_list) > 0:
        return True, ','.join(['%s'] * len(black_list)), black_list
    return False, '\' \'', black_list