"""HTTP views for AI-generated cinema show schedules.

Endpoints:
    manual_general_show    -- trigger the AI scheduling process by hand.
    get_cinema_show_result -- fetch and snapshot the real schedule of one day.
    clear_lock             -- drop today's Redis lock key.
    report                 -- compare the latest AI schedule with the real one.
"""
# NOTE: the import order below is kept exactly as it was. The wildcard import
# from ai.models must stay *before* the stdlib imports so that any name it
# happens to export (e.g. ``datetime``) is rebound by the explicit imports
# that follow it.
from django.http import JsonResponse
from django.db.models import Q
from django.views.decorators.csrf import csrf_exempt
from ai.models import *
import datetime
import json
import pandas as pd
import csv
from io import StringIO
from ai.utils.show_database import GetData
from ai.utils.show_process import show_main_process
from django_redis import get_redis_connection
import logging
from itertools import zip_longest

logger = logging.getLogger(__name__)

_DATE_FORMAT = '%Y-%m-%d'
_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# CSV column that carries the hall id in a stored schedule.
_HALL_ID_COLUMN = '影厅id'

# (key used in the API payload, column name in the stored schedule CSV)
_SHOW_COLUMNS = (
    ('movie_name', '影片别名'),
    ('movie_id', '本地影片id'),
    ('language', '语言'),
    ('show_date', '放映日期'),
    ('start', '开始时间'),
    ('end', '结束时间'),
    ('length', '片长'),
    ('duration', '场间'),
)

# Per-show cells emitted in the ``objects`` rows and in the CSV rows.
_OBJECT_FIELDS = ('movie_name', 'movie_id', 'language', 'start', 'end', 'length', 'duration')
_CSV_FIELDS = ('movie_name', 'language', 'start', 'end', 'length', 'duration')

# Header of the comparison CSV: AI columns, an empty separator, real columns.
_CSV_HEADER = ['影厅别名', '影片别名', '语言', '开始时间', '结束时间', '片长', '场间', '',
               '影厅别名', '影片别名', '语言', '开始时间', '结束时间', '片长', '场间']


# Create your views here.

def _json(payload, status=200):
    """Serialize *payload* as JSON, keeping non-ASCII text unescaped."""
    return JsonResponse(payload, status=status, json_dumps_params={'ensure_ascii': False})


def _parse_show_date(show_date):
    """Return *show_date* ('YYYY-MM-DD') as a datetime, or None if it is missing/invalid."""
    try:
        return datetime.datetime.strptime(show_date, _DATE_FORMAT)
    except (TypeError, ValueError):
        return None


def _bad_show_date_response():
    """Build the JSON failure returned when ``show_date`` cannot be parsed."""
    return _json({'status': 'fail', 'message': '参数错误:show_date 需为 YYYY-MM-DD 格式'}, status=400)


# NOTE(review): every view below is csrf_exempt and several change state
# (run the scheduler, write AiShow rows, delete a Redis key). Confirm that
# access is restricted elsewhere (gateway / middleware).

# Manually trigger the AI scheduling process.
@csrf_exempt
def manual_general_show(request):
    """Run ``show_main_process`` and report whether it returned a truthy result."""
    succeeded = bool(show_main_process())
    return _json({
        'status': 'success' if succeeded else 'fail',
        'message': '生成成功' if succeeded else '生成失败',
    })


# Refresh the schedule of a given date.
@csrf_exempt
def get_cinema_show_result(request):
    """Return the real schedule and income for ``cinema_code`` on ``show_date``.

    Query parameters:
        cinema_code: value matched against ``TestCinema.zz_code``.
        show_date: day to query, formatted ``YYYY-MM-DD``.

    Responds with HTTP 400 and ``status: fail`` when ``show_date`` is missing
    or malformed (previously this surfaced as an unhandled exception).
    """
    zz_code = request.GET.get('cinema_code')
    show_date = request.GET.get('show_date')
    logger.debug('get_cinema_show_result: zz_code=%s show_date=%s', zz_code, show_date)

    if _parse_show_date(show_date) is None:
        return _bad_show_date_response()

    show, income = get_cinema_show_result_func(zz_code, show_date)
    return _json({
        'status': 'success',
        'message': '生成成功',
        'show': show,
        'income': income,
    })


def get_cinema_show_result_func(_zz_code, _show_date):
    """Load the real schedule and income of one cinema day and snapshot them.

    The queried window runs from 06:00:00 on ``_show_date`` to 05:59:59 the
    next morning. When the cinema exists, the result is also stored as an
    ``AiShow`` row with ``is_ai_show=False``; a failure to store is logged and
    does not affect the return value.

    Args:
        _zz_code: value matched against ``TestCinema.zz_code``.
        _show_date: day to query, formatted ``YYYY-MM-DD``.

    Returns:
        ``(show, income)`` as produced by ``GetData``; ``('', '')`` when no
        cinema matches ``_zz_code``.

    Raises:
        TypeError, ValueError: if ``_show_date`` is not a ``YYYY-MM-DD`` string.
    """
    cinema = TestCinema.objects.filter(zz_code=_zz_code).first()

    day = datetime.datetime.strptime(_show_date, _DATE_FORMAT)
    start = (day + datetime.timedelta(hours=6)).strftime(_DATETIME_FORMAT)
    end = (day + datetime.timedelta(hours=29, minutes=59, seconds=59)).strftime(_DATETIME_FORMAT)

    if not cinema:
        return '', ''

    logger.debug('loading real schedule for cinema %s', cinema)
    data = GetData(cinema)
    show = data.get_show_data(start, end)
    income = data.get_total_income(start, end)
    logger.debug('real schedule: %s income: %s', show, income)

    try:
        AiShow.objects.create(
            cinema=cinema.name,
            zz_code=cinema.zz_code,
            show_date=_show_date,
            is_ai_show=False,
            show=show,
            sales=income,
            prompt='',
            result='',
            message='',
            take_times=0,
            take_tokens='0'
        )
    except Exception:
        # Storing the snapshot is best-effort: keep returning the data, but
        # record the full traceback instead of only the exception text.
        logger.exception('failed to store real schedule for %s on %s', _zz_code, _show_date)
    return show, income


# Clear the Redis lock.
@csrf_exempt
def clear_lock(request):
    """Delete today's ``ai_show<YYYYMMDD>`` key from Redis."""
    redis_conn = get_redis_connection()
    redis_key = f'ai_show{datetime.date.today().strftime("%Y%m%d")}'
    # DEL on a missing key is a no-op, so no separate EXISTS round trip is needed.
    redis_conn.delete(redis_key)
    return _json({
        'status': 'success',
        'message': '完成',
    })


def _latest_show(zz_code, show_date, *, is_ai_show, created_after=None):
    """Return the newest ``AiShow`` row (highest id) for the cinema day, or None."""
    condition = Q(is_ai_show=is_ai_show) & Q(zz_code=zz_code) & Q(show_date=show_date)
    if created_after is not None:
        condition &= Q(created_at__gt=created_after)
    return AiShow.objects.filter(condition).order_by('-id').first()


def _group_by_hall(rows):
    """Group parsed schedule rows by their hall-id column, keeping row order."""
    grouped = {}
    for row in rows:
        grouped.setdefault(row.get(_HALL_ID_COLUMN), []).append(row)
    return grouped


def _show_from_row(row):
    """Map one parsed schedule CSV row to the keys used in the report."""
    return {key: row[column] for key, column in _SHOW_COLUMNS}


def _cells(show, fields):
    """Return the *fields* of *show* as a list, or blanks when *show* is None."""
    if show is None:
        return [''] * len(fields)
    return [show[field] for field in fields]


def _pair_hall_rows(hall_name, ai_shows, real_shows):
    """Pair the i-th AI show with the i-th real show of one hall.

    Returns ``(object_rows, csv_rows)``. The shorter side is padded with empty
    cells. The date cell is taken from the AI show when the AI list is at
    least as long as the real list, otherwise from the real show.
    """
    date_from_ai = len(ai_shows) >= len(real_shows)
    object_rows = []
    csv_rows = []
    for ai_show, real_show in zip_longest(ai_shows, real_shows):
        # The longer list never yields None, so the date source always exists.
        date_source = ai_show if date_from_ai else real_show
        object_rows.append(
            [hall_name, date_source['show_date']]
            + _cells(ai_show, _OBJECT_FIELDS)
            + _cells(real_show, _OBJECT_FIELDS)
        )
        csv_rows.append(
            [hall_name]
            + _cells(ai_show, _CSV_FIELDS)
            # Empty separator column, then the right-hand hall name, which is
            # left blank on rows that have no real show.
            + ['', hall_name if real_show is not None else '']
            + _cells(real_show, _CSV_FIELDS)
        )
    return object_rows, csv_rows


@csrf_exempt
def report(request):
    """Compare the latest AI schedule with the real schedule of one cinema day.

    Query parameters:
        cinema_code: value matched against ``zz_code``.
        show_date: day to report on, formatted ``YYYY-MM-DD``.

    If no real snapshot was created later than ``show_date`` + 30 hours, a
    fresh one is fetched first. The response pairs AI and real shows hall by
    hall, both as a list of rows (``objects``) and as CSV text (``csv``).

    Responds with ``status: fail`` and HTTP 400 for a bad ``show_date``, or
    HTTP 404 when either the AI or the real schedule is missing (both cases
    previously raised an unhandled exception).
    """
    zz_code = request.GET.get('cinema_code')
    show_date = request.GET.get('show_date')
    logger.debug('report: zz_code=%s show_date=%s', zz_code, show_date)

    day = _parse_show_date(show_date)
    if day is None:
        return _bad_show_date_response()

    # Fetch the real schedule and sales data if no snapshot is recent enough.
    check_point = (day + datetime.timedelta(hours=30)).strftime(_DATETIME_FORMAT)
    if not _latest_show(zz_code, show_date, is_ai_show=False, created_after=check_point):
        get_cinema_show_result_func(zz_code, show_date)

    last_ai_data = _latest_show(zz_code, show_date, is_ai_show=True)
    last_real_data = _latest_show(zz_code, show_date, is_ai_show=False)
    if last_ai_data is None or last_real_data is None:
        return _json({'status': 'fail', 'message': '未找到对应的排片数据'}, status=404)

    ai_rows = scv_to_obj(last_ai_data.show)
    real_rows = scv_to_obj(last_real_data.show)
    logger.debug('ai rows: %s real rows: %s', ai_rows, real_rows)
    ai_by_hall = _group_by_hall(ai_rows)
    real_by_hall = _group_by_hall(real_rows)

    # Collect the shows per hall, keyed by hall id.
    data = {}
    for hall in CinemaHall.objects.filter(cinema_code=zz_code):
        hall_key = str(hall.hall_id)
        data[hall.hall_id] = {
            'hall_name': hall.hall_name,
            'hall_id': hall.hall_id,
            'ai': [_show_from_row(row) for row in ai_by_hall.get(hall_key, ())],
            'real': [_show_from_row(row) for row in real_by_hall.get(hall_key, ())],
        }

    # Format the output.
    output_list = []
    output_csv = [list(_CSV_HEADER)]
    for hall_data in data.values():
        object_rows, csv_rows = _pair_hall_rows(
            hall_data['hall_name'], hall_data['ai'], hall_data['real'])
        output_list.extend(object_rows)
        output_csv.extend(csv_rows)

    result_dict = {
        'status': 'success',
        'message': '完成',
        'data': {
            'cinema_name': last_ai_data.cinema,
            'zz_code': last_ai_data.zz_code,
            'real_sales': last_real_data.sales,
            'ai_sales': last_ai_data.sales,
            'take_times': last_ai_data.take_times,
            'take_tokens': last_ai_data.take_tokens,
            'csv': list_to_csv(output_csv),
            'objects': output_list,
            'prompt': last_ai_data.prompt
        }
    }
    logger.debug('report result: %s', result_dict)
    return _json(result_dict)


# Convert CSV text into a list of dicts.
def scv_to_obj(csv_data):
    """Parse CSV text whose first line is a header into a list of row dicts."""
    return list(csv.DictReader(StringIO(csv_data)))


def list_to_csv(rows, include_header=True):
    """Convert a nested list into a CSV string with ``\\r\\n`` line endings.

    :param rows: input data, shaped like [['col1', 'col2'], ['val1', 'val2'], ...]
    :param include_header: kept for backward compatibility. ``rows[0]`` is
        written as the first line either way, so the flag does not change the
        output.
    """
    output = StringIO()
    csv.writer(output, lineterminator='\r\n').writerows(rows)
    return output.getvalue()