import datetime import json import requests from django.shortcuts import render from django.http.response import JsonResponse from mock.utils import mock_service from mock.models import ZZMockModel from update.models import Cinema from django.views.decorators.csrf import csrf_exempt from pymysql.cursors import DictCursor import pymysql from django.core.management.commands.runserver import Command as runserver import socket from mock.mock_templates import * from mock.cinema_sql import * from dingxin_toolbox_drf.settings import CONFIG from env import ENV ZZ_URL = 'https://zzcs.yinghezhong.com' mock_config = dict() json_data = dict() # 用于将前端的接口名转换成后端的 api_dict = { 'download_film_info': 'downloadFilmInfo', 'get_cinema_info': 'getCinemaInfo', 'get_screen_info': 'getScreenInfo', 'report_ticket': 'reportTicket', 'report_film_schedule': 'reportFilmSchedule', 'upload_screen_seat_info': 'uploadScreenSeatInfo', 'get_overtime_ticket_status': 'getOvertimeTicketStatus', 'valid_error': 'validError' } # 数据上报 # 1、票房数据上报接口 POST /report/reportTicket @csrf_exempt def report_ticket(request): print(request.GET) ip = request.META.get('REMOTE_ADDR') mock_service_switch = ZZMockModel.objects.filter(ip=ip).first() if mock_service_switch.reportTicket is True: if customer_json := json_data.get('reportTicket'): return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False}) return mock_service.mock(ip, 'reportTicket') else: return mock_service.bypass(request) # 2、排片数据上报接口 POST /report/reportFilmSchedule @csrf_exempt def report_film_schedule(request): ip = request.META.get('REMOTE_ADDR') mock_service_switch = ZZMockModel.objects.filter(ip=ip).first() if mock_service_switch.reportFilmSchedule is True: if customer_json := json_data.get('reportFilmSchedule'): return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False}) return mock_service.mock(ip, 'reportFilmSchedule') else: return mock_service.bypass(request) # 3、座位信息上报接口 POST /report/uploadScreenSeatInfo @csrf_exempt def upload_screen_seat_info(request): ip = request.META.get('REMOTE_ADDR') mock_service_switch = ZZMockModel.objects.filter(ip=ip).first() if mock_service_switch.uploadScreenSeatInfo is True: if customer_json := json_data.get('uploadScreenSeatInfo'): return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False}) return mock_service.mock(ip, 'uploadScreenSeatInfo') else: return mock_service.bypass(request) # 数据下载 # 1、影片信息下载接口 GET /data/downloadFilmInfo def download_film_info(request): ip = request.META.get('REMOTE_ADDR') print('REMOTE_ADDR', ip) mock_service_switch = ZZMockModel.objects.filter(ip=ip).first() print(mock_service_switch) if mock_service_switch.downloadFilmInfo is True: if customer_json := json_data.get('downloadFilmInfo'): return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False}) return mock_service.mock(ip, 'downloadFilmInfo') else: return mock_service.bypass(request) # 2、影院信息下载接口 GET /data/getCinemaInfo def get_cinema_info(request): ip = request.META.get('REMOTE_ADDR') mock_service_switch = ZZMockModel.objects.filter(ip=ip).first() if mock_service_switch.getCinemaInfo is True: if customer_json := json_data.get('getCinemaInfo'): return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False}) return mock_service.mock(ip, 'getCinemaInfo') else: return mock_service.bypass(request) # 3、影厅信息下载接口 GET /data/getScreenInfo def get_screen_info(request): ip = request.META.get('REMOTE_ADDR') mock_service_switch = ZZMockModel.objects.filter(ip=ip).first() if mock_service_switch.getScreenInfo is True: if customer_json := json_data.get('getScreenInfo'): return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False}) return mock_service.mock(ip, 'getScreenInfo') else: return mock_service.bypass(request) # 超时票务审批 # 1、超时票务受理情况查询接口 GET /data/getOvertimeTicketStatus # 参数 场次id 审核状态 0不通过 1通过 2待审核 99待提交 def get_overtime_ticket_status(request): ip = request.META.get('REMOTE_ADDR') mock_service_switch = ZZMockModel.objects.filter(ip=ip).first() if mock_service_switch.getOvertimeTicketStatus is True: if customer_json := json_data.get('getOvertimeTicketStatus'): return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False}) return mock_service.mock(ip, 'getOvertimeTicketStatus', request=request) else: return mock_service.bypass(request) # 数据查询 # 2、数据清洗错误查询接口 POST /query/validError @csrf_exempt def valid_error(request): ip = request.META.get('REMOTE_ADDR') mock_service_switch = ZZMockModel.objects.filter(ip=ip).first() if mock_service_switch.validError is True: if mock_service_switch.errorCode == '0': if customer_json := json_data.get('validError'): return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False}) return mock_service.mock(ip, 'validError', request=request) else: return mock_service.bypass(request) # 封装数据库操作 def db(ip, sql, params=None): cinema_data = Cinema.objects.filter(ip=ip).first() db_config = { 'host': cinema_data.ip, 'user': cinema_data.db_user, 'password': cinema_data.db_pwd, 'database': 'cine', 'connect_timeout': 5, } db_conn = pymysql.Connect(**db_config) db_cursor = db_conn.cursor(cursor=DictCursor) result, data = 0, [] if sql.startswith('UPDATE'): result = db_cursor.execute(sql, params) data = db_cursor.fetchall() db_conn.commit() if sql.startswith('SELECT'): if params is None: result = db_cursor.execute(sql) else: result = db_cursor.execute(sql, params) data = db_cursor.fetchall() return {'result': result, 'data': data} # 获取过场信息 def get_overtime_show(request): ip = request.GET.dict().get('ip') # 获取过场售票 db_result = db(ip, FROM_SELL_ADD_GET_SHOW_SQL) sell_show_data = db_result['data'] sell_show_list = [] for show in sell_show_data: show_time = datetime.datetime.strftime(show['start_time'], '%Y-%m-%d %H:%M:%S') hall = show['hall_name'] if show['hall_name'] is not None else '影厅信息缺失' sell_show_list.append({'id': show['show_id'], 'show': show['name'] + ' ' + show_time + ' ' + hall }) # 获取过场退票场次 db_result = db(ip, FROM_REFUND_GET_SHOW_SQL) refund_show_data = db_result['data'] refund_show_list = [] for show in refund_show_data: show_time = datetime.datetime.strftime(show['start_time'], '%Y-%m-%d %H:%M:%S') hall = show['hall_name'] if show['hall_name'] is not None else '影厅信息缺失' refund_show_list.append({'id': show['show_id'], 'show': show['name'] + ' ' + show_time + ' ' + hall }) # 专资接口 # operation 1售票 2退票 # status 0不通过 1通过 2待审核 99待提交 audit_status = [{'key': '0', 'status': '不通过'}, {'key': '1', 'status': '通过'}, {'key': '2', 'status': '待审核'}, {'key': '99', 'status': '待提交'}] return JsonResponse({'sell': sell_show_list, 'refund': refund_show_list, 'status': audit_status}, json_dumps_params={'ensure_ascii': False}) # 获取mock数据的json def get_response(request): _ip = request.GET.dict().get('ip') _api = request.GET.dict().get('api') _api = api_dict[_api] if _api == 'getOvertimeTicketStatus': req = mock_service.mock(_ip, _api, request=request, type='getJson') elif _api == 'validError': req = mock_service.mock(_ip, _api, request=request, type='getJson') else: req = mock_service.mock(_ip, _api) return req # 获取本机 IP 地址: def get_local_ip(): ip = CONFIG[ENV]['SERVER_IP'] if CONFIG[ENV]['SERVER_IP'] == '': r = runserver() ip = socket.gethostbyname(socket.gethostname()) print(ip) return ip # { # 'reportTicket': '', # 'reportFilmSchedule': '', # 'uploadScreenSeatInfo': '', # 'downloadFilmInfo': '', # 'getCinemaInfo': '', # 'getScreenInfo': '', # 'getOvertimeTicketStatus': '', # 'validError': '', # 'auditShowId': '', # 'auditStatus': '' # } # 设置超时售退票设置 def set_overtime_config(request): ip = request.GET.dict().get('ip') cinema = ZZMockModel.objects.filter(ip=ip).first() cinema.auditShowId = request.GET.dict().get('show_id') cinema.auditStatus = request.GET.dict().get('audit_status') cinema.save() return JsonResponse({'success': True}) # 设置验票接口的状态 def set_error_code_config(request): ip = request.GET.dict().get('ip') cinema = ZZMockModel.objects.filter(ip=ip).first() cinema.errorCode = request.GET.dict().get('error_code') cinema.save() return JsonResponse({'success': True}) # 启动mock @csrf_exempt def enable_mock(request): ip = request.GET.get('ip') req = json.loads(request.body) req_mock_config = req['mock_config'] req_json_data = req['json_data'] for k, v in req_mock_config.items(): mock_config[api_dict[k]] = v for k, v in req_json_data.items(): json_data[api_dict[k]] = v print(mock_config) print(json_data) server_ip = get_local_ip() current_host = db(ip, GET_ZZ_PLATFORM_HOST_SQL)['data'][0]['cinema_common_info_val'] if current_host == 'https://zzcs.yinghezhong.com' or current_host == 'https://dy.yinghezhong.com': db_result = db(ip, UPDATE_ZZ_PLATFORM_HOST_SQL, (f'http://{server_ip}:8000',)) if db_result['result'] > 0: ZZMockModel.objects.filter(ip=ip).update(**mock_config) return JsonResponse({'status': 'success'}) if current_host == f'http://{server_ip}:8000': ZZMockModel.objects.filter(ip=ip).update(**mock_config) return JsonResponse({'status': 'success'}) return JsonResponse({'status': 'fail'}) # 停用mock服务 def disable_mock(request): ip = request.GET.get('ip') server_ip = get_local_ip() current_host = db(ip, GET_ZZ_PLATFORM_HOST_SQL)['data'][0]['cinema_common_info_val'] if current_host == 'https://zzcs.yinghezhong.com' or current_host == 'https://dy.yinghezhong.com': return JsonResponse({'status': 'success'}) if current_host == f'http://{server_ip}:8000': db_result = db(ip, UPDATE_ZZ_PLATFORM_HOST_SQL, ('https://zzcs.yinghezhong.com',)) if db_result['result'] > 0: return JsonResponse({'status': 'success'}) return JsonResponse({'status': 'fail'})