You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
296 lines
11 KiB
296 lines
11 KiB
import datetime |
|
import json |
|
|
|
import requests |
|
from django.shortcuts import render |
|
from django.http.response import JsonResponse |
|
from mock.utils import mock_service |
|
from mock.models import ZZMockModel |
|
from update.models import Cinema |
|
from django.views.decorators.csrf import csrf_exempt |
|
from pymysql.cursors import DictCursor |
|
import pymysql |
|
from django.core.management.commands.runserver import Command as runserver |
|
import socket |
|
from mock.mock_templates import * |
|
from mock.cinema_sql import * |
|
from dingxin_toolbox_drf.settings import CONFIG |
|
from env import ENV |
|
|
|
# Host that disable_mock writes back into the cinema database.
ZZ_URL = 'https://zzcs.yinghezhong.com'

# Mock on/off switches keyed by back-end API name; filled by enable_mock.
mock_config = {}

# Custom JSON responses keyed by back-end API name; filled by enable_mock and
# read by the mocked endpoints.
json_data = {}

# Translates the API names used by the front end into the back-end names.
api_dict = dict(
    download_film_info='downloadFilmInfo',
    get_cinema_info='getCinemaInfo',
    get_screen_info='getScreenInfo',
    report_ticket='reportTicket',
    report_film_schedule='reportFilmSchedule',
    upload_screen_seat_info='uploadScreenSeatInfo',
    get_overtime_ticket_status='getOvertimeTicketStatus',
    valid_error='validError',
)
|
|
|
|
|
# Data reporting
# 1. Box-office (ticket) data report endpoint: POST /report/reportTicket
@csrf_exempt
def report_ticket(request):
    """Answer or hand off a ``reportTicket`` call for the requesting client.

    The client address (``REMOTE_ADDR``) selects its ``ZZMockModel`` row.
    When that row's ``reportTicket`` flag is ``True`` the call is answered
    here: a truthy ``json_data['reportTicket']`` entry is returned as JSON,
    otherwise the response comes from ``mock_service.mock``. In every other
    case the request is passed to ``mock_service.bypass``.

    :param request: the incoming Django request.
    :return: the response produced by one of the three paths above.
    """
    print(request.GET)
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # first() yields None for a client without a row; treat that as
    # "mock disabled" instead of failing with AttributeError below.
    if mock_service_switch is None or mock_service_switch.reportTicket is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('reportTicket'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'reportTicket')
|
|
|
|
|
# 2. Film schedule report endpoint: POST /report/reportFilmSchedule
@csrf_exempt
def report_film_schedule(request):
    """Answer or hand off a ``reportFilmSchedule`` call for the client.

    The client address (``REMOTE_ADDR``) selects its ``ZZMockModel`` row.
    When that row's ``reportFilmSchedule`` flag is ``True`` the call is
    answered here: a truthy ``json_data['reportFilmSchedule']`` entry is
    returned as JSON, otherwise the response comes from
    ``mock_service.mock``. In every other case the request is passed to
    ``mock_service.bypass``.

    :param request: the incoming Django request.
    :return: the response produced by one of the three paths above.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # first() yields None for a client without a row; treat that as
    # "mock disabled" instead of failing with AttributeError below.
    if mock_service_switch is None or mock_service_switch.reportFilmSchedule is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('reportFilmSchedule'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'reportFilmSchedule')
|
|
|
|
|
# 3. Seat information report endpoint: POST /report/uploadScreenSeatInfo
@csrf_exempt
def upload_screen_seat_info(request):
    """Answer or hand off an ``uploadScreenSeatInfo`` call for the client.

    The client address (``REMOTE_ADDR``) selects its ``ZZMockModel`` row.
    When that row's ``uploadScreenSeatInfo`` flag is ``True`` the call is
    answered here: a truthy ``json_data['uploadScreenSeatInfo']`` entry is
    returned as JSON, otherwise the response comes from
    ``mock_service.mock``. In every other case the request is passed to
    ``mock_service.bypass``.

    :param request: the incoming Django request.
    :return: the response produced by one of the three paths above.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # first() yields None for a client without a row; treat that as
    # "mock disabled" instead of failing with AttributeError below.
    if mock_service_switch is None or mock_service_switch.uploadScreenSeatInfo is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('uploadScreenSeatInfo'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'uploadScreenSeatInfo')
|
|
|
|
|
# Data download
# 1. Film information download endpoint: GET /data/downloadFilmInfo
def download_film_info(request):
    """Answer or hand off a ``downloadFilmInfo`` call for the client.

    The client address (``REMOTE_ADDR``) selects its ``ZZMockModel`` row.
    When that row's ``downloadFilmInfo`` flag is ``True`` the call is
    answered here: a truthy ``json_data['downloadFilmInfo']`` entry is
    returned as JSON, otherwise the response comes from
    ``mock_service.mock``. In every other case the request is passed to
    ``mock_service.bypass``.

    :param request: the incoming Django request.
    :return: the response produced by one of the three paths above.
    """
    ip = request.META.get('REMOTE_ADDR')
    print(ip)
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    print(mock_service_switch)
    # first() yields None for a client without a row; treat that as
    # "mock disabled" instead of failing with AttributeError below.
    if mock_service_switch is None or mock_service_switch.downloadFilmInfo is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('downloadFilmInfo'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'downloadFilmInfo')
|
|
|
|
|
# 2. Cinema information download endpoint: GET /data/getCinemaInfo
def get_cinema_info(request):
    """Answer or hand off a ``getCinemaInfo`` call for the client.

    The client address (``REMOTE_ADDR``) selects its ``ZZMockModel`` row.
    When that row's ``getCinemaInfo`` flag is ``True`` the call is answered
    here: a truthy ``json_data['getCinemaInfo']`` entry is returned as JSON,
    otherwise the response comes from ``mock_service.mock``. In every other
    case the request is passed to ``mock_service.bypass``.

    :param request: the incoming Django request.
    :return: the response produced by one of the three paths above.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # first() yields None for a client without a row; treat that as
    # "mock disabled" instead of failing with AttributeError below.
    if mock_service_switch is None or mock_service_switch.getCinemaInfo is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('getCinemaInfo'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'getCinemaInfo')
|
|
|
|
|
# 3. Screen (hall) information download endpoint: GET /data/getScreenInfo
def get_screen_info(request):
    """Answer or hand off a ``getScreenInfo`` call for the client.

    The client address (``REMOTE_ADDR``) selects its ``ZZMockModel`` row.
    When that row's ``getScreenInfo`` flag is ``True`` the call is answered
    here: a truthy ``json_data['getScreenInfo']`` entry is returned as JSON,
    otherwise the response comes from ``mock_service.mock``. In every other
    case the request is passed to ``mock_service.bypass``.

    :param request: the incoming Django request.
    :return: the response produced by one of the three paths above.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # first() yields None for a client without a row; treat that as
    # "mock disabled" instead of failing with AttributeError below.
    if mock_service_switch is None or mock_service_switch.getScreenInfo is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('getScreenInfo'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'getScreenInfo')
|
|
|
|
|
# Overtime ticketing approval
# 1. Overtime-ticket handling status query: GET /data/getOvertimeTicketStatus
# Parameters: show id; audit status 0 = rejected, 1 = approved,
# 2 = pending review, 99 = not yet submitted
def get_overtime_ticket_status(request):
    """Answer or hand off a ``getOvertimeTicketStatus`` call for the client.

    The client address (``REMOTE_ADDR``) selects its ``ZZMockModel`` row.
    When that row's ``getOvertimeTicketStatus`` flag is ``True`` the call is
    answered here: a truthy ``json_data['getOvertimeTicketStatus']`` entry is
    returned as JSON, otherwise the response comes from
    ``mock_service.mock``, which also receives the request. In every other
    case the request is passed to ``mock_service.bypass``.

    :param request: the incoming Django request.
    :return: the response produced by one of the three paths above.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # first() yields None for a client without a row; treat that as
    # "mock disabled" instead of failing with AttributeError below.
    if mock_service_switch is None or mock_service_switch.getOvertimeTicketStatus is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('getOvertimeTicketStatus'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'getOvertimeTicketStatus', request=request)
|
|
|
|
|
# Data query
# 2. Data-cleansing error query endpoint: POST /query/validError
@csrf_exempt
def valid_error(request):
    """Answer or hand off a ``validError`` call for the requesting client.

    The client address (``REMOTE_ADDR``) selects its ``ZZMockModel`` row.
    When that row's ``validError`` flag is ``True`` the call is answered
    here: if the row's ``errorCode`` is ``'0'`` and
    ``json_data['validError']`` is truthy, that JSON is returned; otherwise
    the response comes from ``mock_service.mock``, which also receives the
    request. In every other case the request is passed to
    ``mock_service.bypass``.

    :param request: the incoming Django request.
    :return: the response produced by one of the paths above.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # first() yields None for a client without a row; treat that as
    # "mock disabled" instead of failing with AttributeError below.
    if mock_service_switch is None or mock_service_switch.validError is not True:
        return mock_service.bypass(request)
    # NOTE(review): nesting reconstructed from a flattened source. The custom
    # JSON is assumed to apply only when errorCode is '0', with every other
    # enabled case falling through to mock_service.mock -- confirm.
    if mock_service_switch.errorCode == '0':
        if customer_json := json_data.get('validError'):
            return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'validError', request=request)
|
|
|
|
|
# Database access helper
def db(ip, sql, params=None):
    """Run one statement against the ``cine`` MySQL database of a cinema.

    Connection credentials come from the ``Cinema`` row whose ``ip`` matches.
    Only statements whose text starts with ``UPDATE`` or ``SELECT``
    (case-sensitive, no leading whitespace) are executed; anything else is
    skipped and reported as ``{'result': 0, 'data': []}``. ``UPDATE``
    statements are committed.

    :param ip: cinema address; used both to look up credentials and as the
        database host.
    :param sql: statement text, optionally with driver placeholders.
    :param params: values for the placeholders, or ``None``.
    :return: ``{'result': <value returned by cursor.execute>,
        'data': <cursor.fetchall() rows as dicts>}``.
    :raises AttributeError: if no ``Cinema`` row exists for ``ip``.
    """
    cinema_data = Cinema.objects.filter(ip=ip).first()
    db_config = {
        'host': cinema_data.ip,
        'user': cinema_data.db_user,
        'password': cinema_data.db_pwd,
        'database': 'cine',
        'connect_timeout': 5,
    }
    db_conn = pymysql.Connect(**db_config)
    result, data = 0, []
    # Close the cursor and the connection on every path; previously each
    # call left both open.
    try:
        with db_conn.cursor(cursor=DictCursor) as db_cursor:
            is_update = sql.startswith('UPDATE')
            if is_update or sql.startswith('SELECT'):
                # execute(sql, None) is the same as execute(sql), so one call
                # covers both the parameterised and the plain form.
                result = db_cursor.execute(sql, params)
                data = db_cursor.fetchall()
                if is_update:
                    db_conn.commit()
    finally:
        db_conn.close()
    return {'result': result, 'data': data}
|
|
|
|
|
# Overtime show information
def _build_show_options(shows):
    """Convert show rows into ``{'id', 'show'}`` entries for the front end.

    Each row must provide ``show_id``, ``name``, ``start_time`` (a datetime)
    and ``hall_name``; a ``hall_name`` of ``None`` is replaced by a
    placeholder text.
    """
    options = []
    for show in shows:
        show_time = datetime.datetime.strftime(show['start_time'], '%Y-%m-%d %H:%M:%S')
        hall = show['hall_name'] if show['hall_name'] is not None else '影厅信息缺失'
        options.append({
            'id': show['show_id'],
            'show': show['name'] + ' ' + show_time + ' ' + hall,
        })
    return options


def get_overtime_show(request):
    """Return the shows with overtime sales or refunds plus the audit states.

    Reads the cinema address from the ``ip`` query parameter, runs
    ``FROM_SELL_ADD_GET_SHOW_SQL`` and ``FROM_REFUND_GET_SHOW_SQL`` through
    ``db`` and formats both result sets the same way.

    :param request: Django request carrying ``ip`` in its query string.
    :return: ``JsonResponse`` with keys ``sell`` and ``refund`` (lists of
        ``{'id', 'show'}``) and ``status`` (the selectable audit states).
    """
    ip = request.GET.dict().get('ip')
    # Shows that had tickets sold after the show started.
    sell_show_list = _build_show_options(db(ip, FROM_SELL_ADD_GET_SHOW_SQL)['data'])
    # Shows that had tickets refunded after the show started.
    refund_show_list = _build_show_options(db(ip, FROM_REFUND_GET_SHOW_SQL)['data'])
    # ZZ platform interface: operation 1 = sale, 2 = refund;
    # status 0 = rejected, 1 = approved, 2 = pending review, 99 = not yet submitted.
    audit_status = [
        {'key': '0', 'status': '不通过'},
        {'key': '1', 'status': '通过'},
        {'key': '2', 'status': '待审核'},
        {'key': '99', 'status': '待提交'},
    ]
    return JsonResponse(
        {'sell': sell_show_list, 'refund': refund_show_list, 'status': audit_status},
        json_dumps_params={'ensure_ascii': False},
    )
|
|
|
|
|
# Fetch the JSON that the mock would return
def get_response(request):
    """Return the mock response for the API named in the query string.

    Query parameters: ``ip`` (cinema address) and ``api`` (front-end API
    name, translated through ``api_dict``). For ``getOvertimeTicketStatus``
    and ``validError`` the request is also passed to ``mock_service.mock``
    together with ``type='getJson'``.

    :param request: Django request carrying ``ip`` and ``api``.
    :return: whatever ``mock_service.mock`` returns, or a 400
        ``JsonResponse`` when ``api`` is missing or not a key of ``api_dict``.
    """
    query = request.GET.dict()
    _ip = query.get('ip')
    requested_api = query.get('api')
    _api = api_dict.get(requested_api)
    if _api is None:
        # Previously an unknown or missing name raised KeyError (HTTP 500).
        return JsonResponse(
            {'status': 'fail', 'error': f'unknown api: {requested_api}'},
            status=400,
        )
    if _api in ('getOvertimeTicketStatus', 'validError'):
        return mock_service.mock(_ip, _api, request=request, type='getJson')
    return mock_service.mock(_ip, _api)
|
|
|
|
|
# Determine this server's IP address
def get_local_ip():
    """Return the address cinemas should use to reach this server.

    Uses ``CONFIG[ENV]['SERVER_IP']`` when it is not the empty string;
    otherwise resolves the local host name and prints the resolved address.

    :return: the IP address as a string.
    """
    ip = CONFIG[ENV]['SERVER_IP']
    if ip == '':
        # NOTE(review): resolving the host name may yield a loopback address
        # on some hosts -- confirm this fallback is reachable from cinemas.
        ip = socket.gethostbyname(socket.gethostname())
        print(ip)
    return ip
|
|
|
|
|
# { |
|
# 'reportTicket': '', |
|
# 'reportFilmSchedule': '', |
|
# 'uploadScreenSeatInfo': '', |
|
# 'downloadFilmInfo': '', |
|
# 'getCinemaInfo': '', |
|
# 'getScreenInfo': '', |
|
# 'getOvertimeTicketStatus': '', |
|
# 'validError': '', |
|
# 'auditShowId': '', |
|
# 'auditStatus': '' |
|
# } |
|
|
|
# Configure the overtime sale/refund mock
def set_overtime_config(request):
    """Store the audited show id and audit status for one cinema.

    Query parameters: ``ip`` selects the ``ZZMockModel`` row; ``show_id`` and
    ``audit_status`` are written to its ``auditShowId`` and ``auditStatus``
    fields.

    :param request: Django request carrying the three query parameters.
    :return: ``JsonResponse({'success': True})`` after saving, or
        ``{'success': False}`` with status 404 when no row matches ``ip``.
    """
    query = request.GET.dict()
    cinema = ZZMockModel.objects.filter(ip=query.get('ip')).first()
    if cinema is None:
        # Previously this fell through to AttributeError (HTTP 500).
        return JsonResponse({'success': False}, status=404)
    cinema.auditShowId = query.get('show_id')
    cinema.auditStatus = query.get('audit_status')
    cinema.save()
    return JsonResponse({'success': True})
|
|
|
|
|
# Configure the status returned by the ticket validation endpoint
def set_error_code_config(request):
    """Store the error code to use for one cinema.

    Query parameters: ``ip`` selects the ``ZZMockModel`` row; ``error_code``
    is written to its ``errorCode`` field.

    :param request: Django request carrying the two query parameters.
    :return: ``JsonResponse({'success': True})`` after saving, or
        ``{'success': False}`` with status 404 when no row matches ``ip``.
    """
    query = request.GET.dict()
    cinema = ZZMockModel.objects.filter(ip=query.get('ip')).first()
    if cinema is None:
        # Previously this fell through to AttributeError (HTTP 500).
        return JsonResponse({'success': False}, status=404)
    cinema.errorCode = query.get('error_code')
    cinema.save()
    return JsonResponse({'success': True})
|
|
|
|
|
# Enable the mock
@csrf_exempt
def enable_mock(request):
    """Point a cinema at this server and store its mock switches.

    The cinema address comes from the ``ip`` query parameter. The JSON body
    must contain ``mock_config`` and ``json_data`` objects keyed by front-end
    API names (keys of ``api_dict``). Both are merged into the module-level
    ``mock_config`` / ``json_data`` dictionaries.

    The platform host stored in the cinema database is then read. If it is
    one of the two official hosts it is replaced by
    ``http://<server ip>:8000``; if it already is that URL nothing is
    rewritten. In both cases the switches from this request are saved to the
    cinema's ``ZZMockModel`` row.

    :param request: Django request with ``ip`` and the JSON body.
    :return: ``JsonResponse({'status': 'success'})`` or
        ``JsonResponse({'status': 'fail'})``.
    :raises KeyError: if the body lacks ``mock_config``/``json_data`` or
        uses an API name that is not in ``api_dict``.
    """
    ip = request.GET.get('ip')
    req = json.loads(request.body)
    # Translate first so a bad key leaves the shared dictionaries untouched.
    requested_switches = {api_dict[k]: v for k, v in req['mock_config'].items()}
    requested_json = {api_dict[k]: v for k, v in req['json_data'].items()}
    mock_config.update(requested_switches)
    json_data.update(requested_json)
    print(mock_config)
    print(json_data)

    server_ip = get_local_ip()
    mock_host = f'http://{server_ip}:8000'
    rows = db(ip, GET_ZZ_PLATFORM_HOST_SQL)['data']
    if not rows:
        # No host setting found; previously this raised IndexError.
        return JsonResponse({'status': 'fail'})
    current_host = rows[0]['cinema_common_info_val']

    if current_host in (ZZ_URL, 'https://dy.yinghezhong.com'):
        redirected = db(ip, UPDATE_ZZ_PLATFORM_HOST_SQL, (mock_host,))['result'] > 0
    else:
        redirected = current_host == mock_host
    if not redirected:
        return JsonResponse({'status': 'fail'})

    # Apply only this request's switches: the module-level mock_config also
    # holds values from earlier requests, possibly for other cinemas.
    ZZMockModel.objects.filter(ip=ip).update(**requested_switches)
    return JsonResponse({'status': 'success'})
|
|
|
|
|
# Disable the mock service
def disable_mock(request):
    """Point a cinema back at the platform host stored in ``ZZ_URL``.

    The cinema address comes from the ``ip`` query parameter. If the host
    stored in the cinema database already is one of the two official hosts,
    nothing is changed. If it is ``http://<server ip>:8000`` it is replaced
    by ``ZZ_URL``.

    :param request: Django request carrying ``ip``.
    :return: ``JsonResponse({'status': 'success'})`` when the cinema ends up
        on an official host, otherwise ``JsonResponse({'status': 'fail'})``.
    """
    ip = request.GET.get('ip')
    server_ip = get_local_ip()
    rows = db(ip, GET_ZZ_PLATFORM_HOST_SQL)['data']
    if not rows:
        # No host setting found; previously this raised IndexError.
        return JsonResponse({'status': 'fail'})
    current_host = rows[0]['cinema_common_info_val']

    if current_host in (ZZ_URL, 'https://dy.yinghezhong.com'):
        return JsonResponse({'status': 'success'})
    if current_host == f'http://{server_ip}:8000':
        # NOTE(review): always restores ZZ_URL, even if the cinema used the
        # other official host before the mock was enabled -- confirm intent.
        db_result = db(ip, UPDATE_ZZ_PLATFORM_HOST_SQL, (ZZ_URL,))
        if db_result['result'] > 0:
            return JsonResponse({'status': 'success'})
    return JsonResponse({'status': 'fail'})
|
|
|