基本完成mock的后台逻辑 盲写

main
RogerWork 11 months ago
parent 28e47abe07
commit 7f005cc298
  1. 23
      mock/migrations/0002_zzmockmodel_auditsessioncode_zzmockmodel_auditstatus.py
  2. 5
      mock/mock_templates/__init__.py
  3. 142
      mock/mock_templates/download_film_info_tmp.py
  4. 125
      mock/mock_templates/get_overtime_ticket_status_tmp.py
  5. 42
      mock/mock_templates/get_screen_info_tmp.py
  6. 3
      mock/mock_templates/valid_error_tmp.py
  7. 3
      mock/models.py
  8. 5
      mock/urls.py
  9. 14
      mock/utils/mock_service.py
  10. 86
      mock/views.py

@ -0,0 +1,23 @@
# Generated by Django 4.2.7 on 2023-12-22 03:37
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('mock', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='zzmockmodel',
name='auditSessionCode',
field=models.CharField(help_text='需要审核的过场场次编码', max_length=20, null=True, verbose_name='需要审核的过场场次编码'),
),
migrations.AddField(
model_name='zzmockmodel',
name='auditStatus',
field=models.CharField(help_text='需要审核的过场场次状态', max_length=3, null=True, verbose_name='需要审核的过场场次状态'),
),
]

@ -2,4 +2,7 @@ __all__ = ['get_cinema_info_tmp',
'report_ticket_tmp',
'report_film_schedule_tmp',
'upload_screen_seat_info_tmp',
'get_screen_info_tmp']
'get_screen_info_tmp',
'download_film_info_tmp',
'get_overtime_ticket_status_tmp',
'valid_error_tmp']

@ -0,0 +1,142 @@
import pymysql
from pymysql.cursors import DictCursor
def get():
mock_data = {
'message': '操作成功',
'data':
{
'filmList':
[
{
'filmCode': '001299992022',
'duration': 90,
'aliasName': None,
'keyEndtime': '2050-12-31',
'keyStarttime': '2022-01-01',
'reshowFlag': False,
'publishDate': '2023-01-01',
'filmName': 'Mock测试影片(不要排片!)',
'producer': '',
'publisher': '暂空',
'language': None,
'version': '0'
}
],
'pageable':
{
'size': 20,
'totalPages': 1,
'page': 1
}
},
'code': '200',
'status': 'success'
}
return mock_data
"""
国家编码3+ 发行版本编码1+ 当年流水编码4+ 年份4
例如 001d12342023
国家编码
001 中国
002 中国香港
003 中国台湾
011 朝鲜
012 日本
013 越南
014 泰国
015 缅甸
016 新加坡
017 印尼
018 斯里兰卡
019 印度
020 巴基斯坦
021 蒙古
022 伊拉克
023 黎巴嫩
024 土尔其
025 菲律宾
026 尼泊尔
027 叙利亚
035 澳大利亚
036 新西兰
041 埃及
042 阿尔及利亚
043 突尼斯
044 摩洛哥
051 美国
052 墨西哥
053 委内瑞拉
054 古巴
055 哥伦比亚
056 玻利维亚
058 阿根廷
059 加拿大
060 巴西
073 丹麦
076 荷兰
077 西班牙
078 意大利
079 西德
080 东德
081 瑞士
082 奥地利
083 波兰
084 捷克
085 匈牙利
086 罗马尼亚
087 保加利亚
088 南斯拉夫
089 阿尔巴尼亚
090 希腊
091 俄罗斯
092 比利时
093 瑞典
其他未列出的三位编码由影片编码原则保留
发行版本
0故事片观摩影片
1故事片普通
2故事片普通立体
3故事片IMAX
4故事片IMAX立体
5故事片胶片(进口)
6故事片其他特种电影
7故事片其他
8故事片中国巨幕
9故事片中国巨幕立体
a动画片观摩影片
b动画片普通
o动画片普通立体
d动画片IMAX
e动画片IMAX立体
f动画片胶片(进口)
g动画片其他特种电影
h动画片其他
i动画片中国巨幕
j动画片中国巨幕立体
k纪录片观摩影片
l纪录片普通
m纪录片普通立体
n纪录片IMAX
o纪录片IMAX立体
p纪录片胶片(进口
q纪录片其他特种电影
r纪录片其他
s纪录片中国巨幕
t纪录片中国巨幕立体
u科教片观摩影片
v科教片普逋
w科教片普通立体
x科教片IMAX
y科教片IMAX立体
z科教片胶片(进口)
A科教片其他特种电影
B科教片其他
C科教片中国巨幕
D科教片中国巨幕立体
"""

@ -0,0 +1,125 @@
import pymysql
from pymysql.cursors import DictCursor
from mock.models import ZZMockModel
# 转换sessionCode为show_id
def handle_session_code(session_code):
while session_code.startswith('0'):
session_code = session_code[1:]
return session_code
def get(cinema_data, request):
# 审核状态映射
# 退票状态 1-未提交 2-审核中 3-审核通过 4-审核失败
# 售票状态 1-已审核 2-已驳回 3-审核中 4-已补登 5-未提交 6-提交失败
# 专资接口 # operation 1售票 2退票 # status 0不通过 1通过 2待审核 99待提交
refund_status_dict = {
'3': '1',
'4': '0'
}
sell_status_dict = {
'1': '1',
'2': '0',
'4': '1',
}
# 定义影院数据库
db_config = {
'host': cinema_data.ip,
'user': cinema_data.db_user,
'password': cinema_data.db_pwd,
'database': 'cine',
'connect_timeout': 5,
}
# 获取审核状态
audit_config = ZZMockModel.objects.filter(ip=cinema_data.ip).first()
target_show_id = audit_config.auditShowId
target_status = audit_config.auditStatus
# 获取请求数据
session_code = request.query_params.get('sessionCode')
overtime_type = request.query_params.get('overtimeType')
# session_code装show_id
request_show_id = handle_session_code(session_code)
# 如果不是要测试的场次直接返回None, 从而执行bypass
if request_show_id != target_show_id:
return None
db_conn = pymysql.Connect(**db_config)
db_cursor = db_conn.cursor(cursor=DictCursor)
ticket_list = []
# 处理过场收票
if overtime_type == '0':
# zz_audit_status 专资审核状态 1-已审核 2-已驳回 3-审核中 4-已补登 5-未提交 6-提交失败
# cinema_sell_add_status 补登状态 1-已审核 2-已驳回 3-审核中,
sell_sql = """
SELECT csad.id,
csad.ticket_no AS ticket_no,
csa.cinema_sell_add_status AS add_status,
csa.zz_audit_status AS audit_status,
csa.cinema_sell_add_showid AS show_id
FROM cinema_sell_add_detail csad
LEFT JOIN cinema_sell_add csa ON csad.cinema_sell_add_id = csa.cinema_sell_add_id
WHERE csad.ticket_no <> ''
AND csa.cinema_sell_add_showid = %s
ORDER BY csad.id DESC;
"""
db_cursor.execute(sell_sql, (target_show_id,))
sell_tickets = db_cursor.fetchall()
for ticket in sell_tickets:
sell_status = str(ticket['audit_status'])
if sell_status in ('5', '6'):
continue
elif sell_status == '3':
ticket_list.append({
'ticketNo': ticket['ticket_no'],
'operation': 2,
'status': target_status
})
else:
ticket_list.append({
'ticketNo': ticket['ticket_no'],
'operation': 2,
'status': sell_status_dict[sell_status]
})
# 处理过场退票
if overtime_type == '1':
# cinema_refund_order_detail
refund_sql = """
SELECT crod.id, crod.cinema_sell_id, crod.refund_status, cslei.ticket_no
FROM cinema_refund_order_detail crod
LEFT JOIN cinema_sell_log_ext_info cslei ON crod.cinema_sell_id = cslei.cinema_sell_id
WHERE refund_order_id IN (SELECT cinema_refund_order.refund_order_id FROM cinema_refund_order WHERE show_id = %s)
ORDER BY id DESC;
"""
db_cursor.execute(refund_sql, (target_show_id,))
refund_tickets = db_cursor.fetchall()
for ticket in refund_tickets:
refund_status = str(ticket['refund_status'])
if refund_status == '1':
continue
elif refund_status == '2':
ticket_list.append({
'ticketNo': ticket['ticket_no'],
'operation': 2,
'status': target_status
})
else:
ticket_list.append({
'ticketNo': ticket['ticket_no'],
'operation': 2,
'status': refund_status_dict[refund_status]
})
print(ticket_list)
mock_data = {'message': '操作成功',
'data': {'ticketList': ticket_list},
'code': '200',
'status': 'success'}
return mock_data

@ -13,10 +13,6 @@ def get(cinema_data):
db_conn = pymysql.Connect(**db_config)
db_cursor = db_conn.cursor(cursor=DictCursor)
# cinema_info
cinema_info_sql_str = 'SELECT * FROM cinema_info;'
db_cursor.execute(cinema_info_sql_str)
cinema_info_data = db_cursor.fetchone()
# cinema_set
cinema_set_sql_str = 'SELECT * FROM cinema_set;'
db_cursor.execute(cinema_set_sql_str)
@ -25,21 +21,27 @@ def get(cinema_data):
cinema_hall_info_sql_str = 'SELECT * FROM cinema_hall_info;'
db_cursor.execute(cinema_hall_info_sql_str)
cinema_hall_info_data = db_cursor.fetchall()
hall_num = len(cinema_hall_info_data)
db_conn.close()
mock_data = {'message': '操作成功',
'data': {
'cinemaChainName': cinema_info_data['cinema_chain_name'],
'manager': cinema_info_data['cinema_manager'],
'screens': hall_num,
'managerTel': cinema_info_data['cinema_telephone'],
'cinemaName': cinema_set_data['cinema_platform_name'],
'cinemaCode': cinema_set_data['cinema_num'], # 适配字段
'cinemaLevel': cinema_info_data['cinema_level'],
'businessStatus': '12', # 定义字段
'officialName': cinema_info_data['official_name'],
'fax': cinema_info_data['cinema_fax']
},
'code': '200',
'status': 'success'}
hall_list = []
for hall in cinema_hall_info_data:
hall_dict = {
'isArt': False if hall['cinema_zz_hall_is_art'] == 0 else True,
'hallType': hall['cinema_zz_hall_type'],
'showType': hall['cinema_zz_hall_show_type'],
'isRed': False if hall['cinema_zz_hall_is_red'] == 0 else True,
'screenName': hall['cinema_hall_real_name'],
'seatCount': int(hall['cinema_hall_seat_num']),
'screenCode': hall['cinema_zz_hall_code']
}
hall_list.append(hall_dict)
mock_data = {
'message': '操作成功',
'data':
{
'screenList': hall_list,
'cinemaCode': cinema_set_data['cinema_num']
},
'code': '200',
'status': 'success'
}
return mock_data

@ -0,0 +1,3 @@
def get():
mock_data = {'message': '操作成功', 'data': [], 'code': '200', 'status': 'success'}
return mock_data

@ -15,6 +15,9 @@ class ZZMockModel(models.Model):
getOvertimeTicketStatus = models.BooleanField(verbose_name='超时票务受理情况查询接口', null=False,
help_text='超时票务受理情况查询接口')
validError = models.BooleanField(verbose_name='数据清洗错误查询接口', null=False, help_text='数据清洗错误查询接口')
auditShowId = models.CharField(verbose_name='需要审核的过场场次编码', null=True, max_length=20, help_text='需要审核的过场场次编码')
auditStatus = models.CharField(verbose_name='需要审核的过场场次状态', null=True, max_length=3, help_text='需要审核的过场场次状态')
def __str__(self):
return self.ip

@ -25,4 +25,9 @@ urlpatterns = [
path('report/downloadFilmInfo', download_film_info),
path('data/getCinemaInfo', get_cinema_info),
path('data/getScreenInfo', get_screen_info),
path('data/downloadFilmInfo', download_film_info),
path('query/validError', valid_error),
path('data/getOvertimeTicketStatus', get_overtime_ticket_status),
]

@ -6,7 +6,7 @@ from mock.mock_templates import *
ZZ_URL = 'https://zzcs.yinghezhong.com'
def mock(_ip, _api, *args, **kwargs):
def mock(_ip, _api, **kwargs):
print('mock')
cinema_obj = Cinema.objects.filter(ip=_ip).first()
if _api == 'reportTicket':
@ -24,6 +24,18 @@ def mock(_ip, _api, *args, **kwargs):
if _api == 'getScreenInfo':
mock_data = get_screen_info_tmp.get(cinema_obj)
return JsonResponse(mock_data, json_dumps_params={'ensure_ascii': False})
if _api == 'downloadFilmInfo':
mock_data = download_film_info_tmp.get()
return JsonResponse(mock_data, json_dumps_params={'ensure_ascii': False})
if _api == 'getOvertimeTicketStatus':
mock_data = get_overtime_ticket_status_tmp.get(cinema_obj, kwargs.get('request'))
if mock_data is not None:
return JsonResponse(mock_data, json_dumps_params={'ensure_ascii': False})
else:
bypass(kwargs.get('request'))
if _api == 'validError':
mock_data = valid_error_tmp.get()
return JsonResponse(mock_data, json_dumps_params={'ensure_ascii': False})
def bypass(_r):

@ -3,7 +3,11 @@ from django.shortcuts import render
from django.http.response import JsonResponse
from mock.utils import mock_service
from mock.models import ZZMockModel
from update.models import Cinema
from django.views.decorators.csrf import csrf_exempt
from pymysql.cursors import DictCursor
import pymysql
from mock.mock_templates import *
ZZ_URL = 'https://zzcs.yinghezhong.com'
@ -75,8 +79,88 @@ def get_screen_info(request):
else:
return mock_service.bypass(request)
# 超时票务审批
# 1、超时票务受理情况查询接口 GET /data/getOvertimeTicketStatus
#
# 参数 场次id 审核状态 0不通过 1通过 2待审核 99待提交
def get_overtime_ticket_status(request):
ip = request.META.get('REMOTE_ADDR')
mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
if mock_service_switch.getOvertimeTicketStatus is True:
return mock_service.mock(ip, 'getOvertimeTicketStatus', request=request)
else:
return mock_service.bypass(request)
# 数据查询
# 2、数据清洗错误查询接口 POST /query/validError
def valid_error(request):
ip = request.META.get('REMOTE_ADDR')
mock_service_switch = ZZMockModel.objects.filter(ip=ip).first()
if mock_service_switch.validError is True:
return mock_service.mock(ip, 'validError')
else:
return mock_service.bypass(request)
# 获取过场信息
def get_overtime_show(request):
query_params = request.query_params.dict()
ip = query_params.get('ip')
cinema_data = Cinema.objects.filter(ip=ip).first()
db_config = {
'host': cinema_data.ip,
'user': cinema_data.db_user,
'password': cinema_data.db_pwd,
'database': 'cine',
'connect_timeout': 5,
}
# 获取过场售票
sql_str = '''
SELECT csa.cinema_sell_add_showid AS show_id,
cms.cinema_movie_name AS name,
cms.cinema_movie_show_start_time AS start_time,
chi.cinema_hall_name AS hall_name
FROM cinema_sell_add csa
LEFT JOIN cinema_hall_info chi ON csa.cinema_sell_add_hall = chi.cinema_hall_id
LEFT JOIN cinema_movie_show cms ON csa.cinema_sell_add_showid = cms.cinema_movie_show_id
GROUP BY csa.cinema_sell_add_showid ORDER BY cms.cinema_movie_show_start_time DESC;
'''
db_conn = pymysql.Connect(**db_config)
db_cursor = db_conn.cursor(cursor=DictCursor)
db_cursor.execute(sql_str)
show_data = db_cursor.fetchall()
sell_show_list = []
for show in show_data:
sell_show_list.append({'id': show['show_id'],
'show': show['name'] + ' ' + show['start_time'] + ' ' + show[
'hall_name'] if show['hall_name'] is not None else ''
})
# 获取过场退票场次
sql_str = '''
SELECT csl.cinema_movie_show_id AS show_id,
csl.cinema_movie_name AS name,
csl.cinema_movie_show_start_time AS start_time,
chi.cinema_hall_name AS hall_name
FROM cinema_sell_log csl
LEFT JOIN cinema_hall_info chi ON csl.cinema_hall_id = chi.cinema_hall_id
WHERE csl.cinema_sell_id IN (SELECT cinema_sell_id FROM cinema_refund_order_detail)
GROUP BY cinema_movie_id ORDER BY csl.cinema_movie_show_start_time DESC;
'''
db_conn = pymysql.Connect(**db_config)
db_cursor = db_conn.cursor(cursor=DictCursor)
db_cursor.execute(sql_str)
show_data = db_cursor.fetchall()
refund_show_list = []
for show in show_data:
refund_show_list.append({'id': show['show_id'],
'show': show['name'] + ' ' + show['start_time'] + ' ' + show[
'hall_name'] if show['hall_name'] is not None else ''
})
return JsonResponse({'sell': sell_show_list, 'refund': refund_show_list}, json_dumps_params={'ensure_ascii': False})
# 获取mock数据的json
def get_response(request, _api):
_ip = request.query_params.dict().get('ip')
return mock_service.mock(_ip, _api)

Loading…
Cancel
Save