dingxin_toolbox
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

297 lines
11 KiB

11 months ago
import datetime
11 months ago
import json
11 months ago
import requests
from django.shortcuts import render
from django.http.response import JsonResponse
from mock.utils import mock_service
from mock.models import ZZMockModel
from update.models import Cinema
from django.views.decorators.csrf import csrf_exempt
from pymysql.cursors import DictCursor
import pymysql
from django.core.management.commands.runserver import Command as runserver
import socket
from mock.mock_templates import *
from mock.cinema_sql import *
from dingxin_toolbox_drf.settings import CONFIG
from env import ENV
# Host name of the ZZ test platform.
ZZ_URL = 'https://zzcs.yinghezhong.com'

# Process-wide state filled in by ``enable_mock``; both are keyed by the
# backend API name (the values of ``api_dict``).
mock_config = {}  # API name -> mock switch value sent by the frontend
json_data = {}  # API name -> custom JSON the views return instead of the template
11 months ago
# Maps the API names used by the frontend (snake_case) to the names used by
# the backend (camelCase).
api_dict = {
    'download_film_info': 'downloadFilmInfo',
    'get_cinema_info': 'getCinemaInfo',
    'get_screen_info': 'getScreenInfo',
    'report_ticket': 'reportTicket',
    'report_film_schedule': 'reportFilmSchedule',
    'upload_screen_seat_info': 'uploadScreenSeatInfo',
    'get_overtime_ticket_status': 'getOvertimeTicketStatus',
    'valid_error': 'validError',
}
# Data reporting
# 1. Box-office data reporting endpoint: POST /report/reportTicket
@csrf_exempt
def report_ticket(request):
    """Answer or pass through a box-office (ticket) report request.

    The caller is identified by ``REMOTE_ADDR``.  When the ``reportTicket``
    switch on that address's ``ZZMockModel`` row is ``True``, the response is
    the custom JSON stored under ``json_data['reportTicket']`` if there is
    one, otherwise the result of ``mock_service.mock``.  In every other case
    the request is handed to ``mock_service.bypass``.
    """
    print(request.GET)
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # ``first()`` returns None when no row exists for this address; treat
    # that as "mock disabled" instead of failing on the attribute access.
    if mock_service_switch is None or mock_service_switch.reportTicket is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('reportTicket'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'reportTicket')
# 2. Film schedule reporting endpoint: POST /report/reportFilmSchedule
@csrf_exempt
def report_film_schedule(request):
    """Answer or pass through a film-schedule report request.

    The caller is identified by ``REMOTE_ADDR``.  When the
    ``reportFilmSchedule`` switch on that address's ``ZZMockModel`` row is
    ``True``, the response is the custom JSON stored under
    ``json_data['reportFilmSchedule']`` if there is one, otherwise the result
    of ``mock_service.mock``.  In every other case the request is handed to
    ``mock_service.bypass``.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # ``first()`` returns None when no row exists for this address; treat
    # that as "mock disabled" instead of failing on the attribute access.
    if mock_service_switch is None or mock_service_switch.reportFilmSchedule is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('reportFilmSchedule'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'reportFilmSchedule')
# 3. Seat information reporting endpoint: POST /report/uploadScreenSeatInfo
@csrf_exempt
def upload_screen_seat_info(request):
    """Answer or pass through a screen seat-information upload request.

    The caller is identified by ``REMOTE_ADDR``.  When the
    ``uploadScreenSeatInfo`` switch on that address's ``ZZMockModel`` row is
    ``True``, the response is the custom JSON stored under
    ``json_data['uploadScreenSeatInfo']`` if there is one, otherwise the
    result of ``mock_service.mock``.  In every other case the request is
    handed to ``mock_service.bypass``.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # ``first()`` returns None when no row exists for this address; treat
    # that as "mock disabled" instead of failing on the attribute access.
    if mock_service_switch is None or mock_service_switch.uploadScreenSeatInfo is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('uploadScreenSeatInfo'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'uploadScreenSeatInfo')
# Data download
# 1. Film information download endpoint: GET /data/downloadFilmInfo
def download_film_info(request):
    """Answer or pass through a film-information download request.

    The caller is identified by ``REMOTE_ADDR``.  When the
    ``downloadFilmInfo`` switch on that address's ``ZZMockModel`` row is
    ``True``, the response is the custom JSON stored under
    ``json_data['downloadFilmInfo']`` if there is one, otherwise the result
    of ``mock_service.mock``.  In every other case the request is handed to
    ``mock_service.bypass``.
    """
    ip = request.META.get('REMOTE_ADDR')
    print('REMOTE_ADDR', ip)
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    print(mock_service_switch)
    # ``first()`` returns None when no row exists for this address; treat
    # that as "mock disabled" instead of failing on the attribute access.
    if mock_service_switch is None or mock_service_switch.downloadFilmInfo is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('downloadFilmInfo'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'downloadFilmInfo')
# 2. Cinema information download endpoint: GET /data/getCinemaInfo
def get_cinema_info(request):
    """Answer or pass through a cinema-information download request.

    The caller is identified by ``REMOTE_ADDR``.  When the ``getCinemaInfo``
    switch on that address's ``ZZMockModel`` row is ``True``, the response is
    the custom JSON stored under ``json_data['getCinemaInfo']`` if there is
    one, otherwise the result of ``mock_service.mock``.  In every other case
    the request is handed to ``mock_service.bypass``.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # ``first()`` returns None when no row exists for this address; treat
    # that as "mock disabled" instead of failing on the attribute access.
    if mock_service_switch is None or mock_service_switch.getCinemaInfo is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('getCinemaInfo'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'getCinemaInfo')
# 3. Screen (hall) information download endpoint: GET /data/getScreenInfo
def get_screen_info(request):
    """Answer or pass through a screen-information download request.

    The caller is identified by ``REMOTE_ADDR``.  When the ``getScreenInfo``
    switch on that address's ``ZZMockModel`` row is ``True``, the response is
    the custom JSON stored under ``json_data['getScreenInfo']`` if there is
    one, otherwise the result of ``mock_service.mock``.  In every other case
    the request is handed to ``mock_service.bypass``.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # ``first()`` returns None when no row exists for this address; treat
    # that as "mock disabled" instead of failing on the attribute access.
    if mock_service_switch is None or mock_service_switch.getScreenInfo is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('getScreenInfo'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'getScreenInfo')
# Overtime ticketing approval
# 1. Overtime ticket handling status query endpoint: GET /data/getOvertimeTicketStatus
# Parameters: show id; audit status 0 = rejected, 1 = approved,
# 2 = pending review, 99 = not yet submitted
def get_overtime_ticket_status(request):
    """Answer or pass through an overtime-ticket status query.

    The caller is identified by ``REMOTE_ADDR``.  When the
    ``getOvertimeTicketStatus`` switch on that address's ``ZZMockModel`` row
    is ``True``, the response is the custom JSON stored under
    ``json_data['getOvertimeTicketStatus']`` if there is one, otherwise the
    result of ``mock_service.mock`` (which also receives the request).  In
    every other case the request is handed to ``mock_service.bypass``.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # ``first()`` returns None when no row exists for this address; treat
    # that as "mock disabled" instead of failing on the attribute access.
    if mock_service_switch is None or mock_service_switch.getOvertimeTicketStatus is not True:
        return mock_service.bypass(request)
    if customer_json := json_data.get('getOvertimeTicketStatus'):
        return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'getOvertimeTicketStatus', request=request)
# Data query
# 2. Data-cleansing error query endpoint: POST /query/validError
@csrf_exempt
def valid_error(request):
    """Answer or pass through a data-cleansing error query.

    The caller is identified by ``REMOTE_ADDR``.  When the ``validError``
    switch on that address's ``ZZMockModel`` row is ``True``:

    * if the row's ``errorCode`` is ``'0'`` and custom JSON is stored under
      ``json_data['validError']``, that JSON is returned;
    * otherwise the result of ``mock_service.mock`` is returned (it also
      receives the request).

    In every other case the request is handed to ``mock_service.bypass``.
    """
    ip = request.META.get('REMOTE_ADDR')
    mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
    # ``first()`` returns None when no row exists for this address; treat
    # that as "mock disabled" instead of failing on the attribute access.
    if mock_service_switch is None or mock_service_switch.validError is not True:
        return mock_service.bypass(request)
    # Custom JSON is only honoured for error code '0'; any other code is left
    # to the mock service.
    if mock_service_switch.errorCode == '0':
        if customer_json := json_data.get('validError'):
            return JsonResponse(customer_json, json_dumps_params={'ensure_ascii': False})
    return mock_service.mock(ip, 'validError', request=request)
# Thin wrapper around database access
def db(ip, sql, params=None):
    """Run one SQL statement against a cinema's ``cine`` MySQL database.

    Args:
        ip: Address of the cinema.  It selects the ``Cinema`` row holding the
            database credentials; that row's ``ip`` is used as the MySQL host.
        sql: Statement text.  Only statements that start with ``UPDATE`` or
            ``SELECT`` (exact case, no leading whitespace) are executed.
        params: Optional parameters forwarded to ``cursor.execute``.

    Returns:
        ``{'result': <value returned by execute>, 'data': <fetchall() rows>}``.
        For a statement that is not executed this is
        ``{'result': 0, 'data': []}``.

    Raises:
        ValueError: if no ``Cinema`` row exists for ``ip``.
    """
    cinema_data = Cinema.objects.filter(ip=ip).first()
    if cinema_data is None:
        raise ValueError(f'no cinema is registered for ip {ip!r}')
    db_config = {
        'host': cinema_data.ip,
        'user': cinema_data.db_user,
        'password': cinema_data.db_pwd,
        'database': 'cine',
        'connect_timeout': 5,
    }
    db_conn = pymysql.Connect(**db_config)
    result, data = 0, []
    try:
        # The cursor context manager closes the cursor on exit.
        with db_conn.cursor(cursor=DictCursor) as db_cursor:
            is_update = sql.startswith('UPDATE')
            if is_update or sql.startswith('SELECT'):
                # ``execute(sql, None)`` behaves like ``execute(sql)``.
                result = db_cursor.execute(sql, params)
                data = db_cursor.fetchall()
                if is_update:
                    db_conn.commit()
    finally:
        # Always release the connection, even when the statement fails.
        db_conn.close()
    return {'result': result, 'data': data}
# Fetch information about shows whose start time has passed
def get_overtime_show(request):
    """Return the past shows with sales and with refunds for one cinema.

    The cinema is chosen by the ``ip`` query parameter.  The JSON response
    has three keys: ``sell`` and ``refund`` (lists of ``{'id', 'show'}``
    entries built from the two SQL queries) and ``status`` (the fixed list of
    audit status options).
    """
    ip = request.GET.dict().get('ip')

    def build_entries(rows):
        """Turn query rows into ``{'id', 'show'}`` option dictionaries."""
        entries = []
        for row in rows:
            started_at = datetime.datetime.strftime(row['start_time'], '%Y-%m-%d %H:%M:%S')
            # A show may have no hall attached; use a placeholder label.
            hall_label = '影厅信息缺失' if row['hall_name'] is None else row['hall_name']
            entries.append({
                'id': row['show_id'],
                'show': ' '.join((row['name'], started_at, hall_label)),
            })
        return entries

    # Past shows that still have ticket sales
    sell_show_list = build_entries(db(ip, FROM_SELL_ADD_GET_SHOW_SQL)['data'])
    # Past shows that have refunds
    refund_show_list = build_entries(db(ip, FROM_REFUND_GET_SHOW_SQL)['data'])

    # ZZ platform interface: operation 1 = sale, 2 = refund;
    # status 0 = rejected, 1 = approved, 2 = pending review, 99 = not yet submitted
    status_labels = (('0', '不通过'), ('1', '通过'), ('2', '待审核'), ('99', '待提交'))
    audit_status = [{'key': key, 'status': label} for key, label in status_labels]

    payload = {'sell': sell_show_list, 'refund': refund_show_list, 'status': audit_status}
    return JsonResponse(payload, json_dumps_params={'ensure_ascii': False})
# Fetch the JSON of the mock data
def get_response(request):
    """Return the mock response for the API named in the query string.

    Reads the ``ip`` and ``api`` query parameters, translates the frontend
    API name through ``api_dict`` (an unknown name raises ``KeyError``) and
    returns whatever ``mock_service.mock`` produces.  The two APIs whose mock
    depends on the request are called with ``request`` and ``type='getJson'``.
    """
    _ip = request.GET.dict().get('ip')
    _api = api_dict[request.GET.dict().get('api')]
    if _api in ('getOvertimeTicketStatus', 'validError'):
        return mock_service.mock(_ip, _api, request=request, type='getJson')
    return mock_service.mock(_ip, _api)
# Get the IP address of this machine
def get_local_ip():
    """Return the IP address of this server.

    Uses ``CONFIG[ENV]['WEB_SERVER_IP']`` when it is set.  When that setting
    is an empty string, falls back to the address the local host name
    resolves to.

    Returns:
        The IP address as a string.
    """
    ip = CONFIG[ENV]['WEB_SERVER_IP']
    if ip == '':
        ip = socket.gethostbyname(socket.gethostname())
    print(ip)
    return ip
# {
# 'reportTicket': '',
# 'reportFilmSchedule': '',
# 'uploadScreenSeatInfo': '',
# 'downloadFilmInfo': '',
# 'getCinemaInfo': '',
# 'getScreenInfo': '',
# 'getOvertimeTicketStatus': '',
# 'validError': '',
# 'auditShowId': '',
# 'auditStatus': ''
# }
11 months ago
# Configure the overtime sale / refund audit settings
def set_overtime_config(request):
    """Store the audited show id and audit status for one cinema.

    Query parameters: ``ip`` selects the ``ZZMockModel`` row; ``show_id`` and
    ``audit_status`` are written to its ``auditShowId`` and ``auditStatus``
    fields.

    Returns:
        ``{'success': True}`` after saving, or ``{'success': False}`` with
        HTTP status 404 when no row exists for ``ip``.
    """
    params = request.GET.dict()
    cinema = ZZMockModel.objects.filter(ip=params.get('ip')).first()
    if cinema is None:
        # Nothing to update; report it instead of failing on ``None``.
        return JsonResponse({'success': False}, status=404)
    cinema.auditShowId = params.get('show_id')
    cinema.auditStatus = params.get('audit_status')
    cinema.save()
    return JsonResponse({'success': True})
# Configure the error code used by the validation endpoint
def set_error_code_config(request):
    """Store the error code for one cinema.

    Query parameters: ``ip`` selects the ``ZZMockModel`` row; ``error_code``
    is written to its ``errorCode`` field.

    Returns:
        ``{'success': True}`` after saving, or ``{'success': False}`` with
        HTTP status 404 when no row exists for ``ip``.
    """
    params = request.GET.dict()
    cinema = ZZMockModel.objects.filter(ip=params.get('ip')).first()
    if cinema is None:
        # Nothing to update; report it instead of failing on ``None``.
        return JsonResponse({'success': False}, status=404)
    cinema.errorCode = params.get('error_code')
    cinema.save()
    return JsonResponse({'success': True})
# Enable the mock service
@csrf_exempt
def enable_mock(request):
    """Point a cinema at this mock server and store its mock switches.

    The cinema is chosen by the ``ip`` query parameter.  The JSON body must
    contain ``mock_config`` and ``json_data``, both keyed by frontend API
    name (see ``api_dict``).

    Steps:
      1. Record the switches and custom JSON in the module-level
         ``mock_config`` / ``json_data`` dictionaries.
      2. Read the platform host currently configured in the cinema database.
      3. If it is one of the two official hosts, rewrite it to
         ``http://<server ip>``; if it already is that address, leave it.
      4. Save the switches sent in this request on the cinema's
         ``ZZMockModel`` row.

    Returns:
        ``{'status': 'success'}`` when the cinema points at this server and
        the switches were saved, otherwise ``{'status': 'fail'}``.
    """
    ip = request.GET.get('ip')
    req = json.loads(request.body)
    # Translate only what this request sent, so switches accumulated from
    # earlier requests are not written to this cinema's row.
    requested_switches = {api_dict[k]: v for k, v in req['mock_config'].items()}
    requested_json = {api_dict[k]: v for k, v in req['json_data'].items()}
    # NOTE(review): these module-level dictionaries are shared by every
    # cinema served by this process - confirm that is intended.
    mock_config.update(requested_switches)
    json_data.update(requested_json)
    print(mock_config)
    print(json_data)

    server_ip = get_local_ip()
    mock_host = f'http://{server_ip}'
    host_rows = db(ip, GET_ZZ_PLATFORM_HOST_SQL)['data']
    if not host_rows:
        # The cinema database has no platform host entry to redirect.
        return JsonResponse({'status': 'fail'})
    current_host = host_rows[0]['cinema_common_info_val']

    if current_host in ('https://zzcs.yinghezhong.com', 'https://dy.yinghezhong.com'):
        db_result = db(ip, UPDATE_ZZ_PLATFORM_HOST_SQL, (mock_host,))
        if db_result['result'] <= 0:
            return JsonResponse({'status': 'fail'})
    elif current_host != mock_host:
        # Unknown host: do not overwrite a value this tool did not set.
        return JsonResponse({'status': 'fail'})

    ZZMockModel.objects.filter(ip=ip).update(**requested_switches)
    return JsonResponse({'status': 'success'})
# Disable the mock service
def disable_mock(request):
    """Point a cinema back at the official test platform.

    The cinema is chosen by the ``ip`` query parameter.  If its database
    already names one of the two official hosts nothing is changed.  If it
    names this server, the host is rewritten to
    ``https://zzcs.yinghezhong.com``.

    Returns:
        ``{'status': 'success'}`` when the cinema ends up on an official
        host, otherwise ``{'status': 'fail'}``.
    """
    ip = request.GET.get('ip')
    server_ip = get_local_ip()
    host_rows = db(ip, GET_ZZ_PLATFORM_HOST_SQL)['data']
    if not host_rows:
        # The cinema database has no platform host entry to restore.
        return JsonResponse({'status': 'fail'})
    current_host = host_rows[0]['cinema_common_info_val']

    if current_host in ('https://zzcs.yinghezhong.com', 'https://dy.yinghezhong.com'):
        return JsonResponse({'status': 'success'})
    # ``enable_mock`` writes the address without a port, so accept that form
    # as well as the ``:8000`` form this function checked before.
    if current_host in (f'http://{server_ip}', f'http://{server_ip}:8000'):
        db_result = db(ip, UPDATE_ZZ_PLATFORM_HOST_SQL, ('https://zzcs.yinghezhong.com',))
        if db_result['result'] > 0:
            return JsonResponse({'status': 'success'})
    return JsonResponse({'status': 'fail'})