You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
65 lines
2.8 KiB
65 lines
2.8 KiB
from random import random |
|
|
|
from django_redis import get_redis_connection |
|
import json |
|
import random |
|
import xmltodict |
|
|
|
|
|
# Helper that resolves cached interface (API) response data from Redis.
def get_data_from_redis(redis_key, raw=False, *, redis_conn=None):
    """Look up cached interface data for ``redis_key``.

    Three Redis keys are consulted, in priority order:

    1. ``redis_key + '_user_data'`` -- the first item of its ``user_data``
       entry is returned.
    2. ``redis_key + '_random'`` -- same payload layout as above.
    3. ``redis_key`` -- the cached response under ``handled_data``; an item
       is picked at random from ``handled_data['res']['data']`` (or the
       whole list is returned when ``raw`` is ``True``).

    Each key is read with a single ``GET`` instead of ``EXISTS`` followed by
    ``GET``: a key that disappears (expires or is deleted) between the two
    calls would make ``GET`` return ``None`` and ``json.loads(None)`` raise
    ``TypeError``.

    Args:
        redis_key: Base Redis key of the cached interface.
        raw: When exactly ``True``, return the full list of data items from
            the cached response instead of one randomly chosen item. Has no
            effect on the ``_user_data`` / ``_random`` lookups.
        redis_conn: Optional connection-like object exposing ``get(key)``.
            Defaults to ``get_redis_connection()``; lets callers reuse an
            existing connection.

    Returns:
        A 4-tuple ``(ok, format, payload, timestamp)``. On success ``ok`` is
        ``True`` and ``payload`` is the selected data. On failure ``ok`` is
        ``False``, ``payload`` is a user-facing hint message and
        ``timestamp`` is ``0``.

    Raises:
        KeyError / IndexError: If a cached payload lacks the expected fields
            or its ``user_data`` entry is empty.
    """
    if redis_conn is None:
        redis_conn = get_redis_connection()

    # User-supplied data wins over the "_random" entry; both share a layout.
    for suffix in ('_user_data', '_random'):
        cached = redis_conn.get(redis_key + suffix)
        if cached is None:
            continue
        redis_data = json.loads(cached)
        user_data = redis_data['user_data']
        print('get_data_from_redis' + suffix, json.dumps(user_data))
        return True, redis_data['format'], user_data[0], redis_data['timestamp']

    # No override stored: fall back to the cached interface response.
    cached = redis_conn.get(redis_key)
    if cached is None:
        return False, 'json', '请手动输入参数,或先请求接口:', 0

    redis_data = json.loads(cached)
    resp = redis_data['handled_data']
    print('get_data_from_redis', json.dumps(resp))
    resp_result = resp['res']['status']
    print('resp_result', resp_result)
    print('resp_result-type', type(resp_result))

    # The status may be stored as int or str, so compare its string form.
    if str(resp_result) != '1':
        print('get_data_from_redis-2')
        return False, redis_data['format'], '请检查接口返回值或手动输入参数:', 0

    print('get_data_from_redis-1')
    data = resp['res']['data']
    # Normalise a single object into a one-element list.
    resp_data = data if isinstance(data, list) else [data]
    print('get_data_from_redis-resp_data', resp_data)

    if not resp_data:
        print('get_data_from_redis-3')
        return False, redis_data['format'], '接口返回数据为空,请手动输入参数:', 0

    # Identity check kept on purpose: only the literal True enables raw mode.
    if raw is True:
        return True, redis_data['format'], resp_data, redis_data['timestamp']
    return True, redis_data['format'], random.choice(resp_data), redis_data['timestamp']
|
|
|
def get_param_from_redis(redis_key, *, redis_conn=None):
    """Return the request parameters cached under ``redis_key``.

    The key is read with a single ``GET`` rather than ``EXISTS`` followed by
    ``GET``, so a key that disappears between the two calls can no longer
    make ``json.loads`` fail on ``None``.

    Args:
        redis_key: Redis key holding the JSON-encoded cache entry.
        redis_conn: Optional connection-like object exposing ``get(key)``.
            Defaults to ``get_redis_connection()``.

    Returns:
        The value of the entry's ``params`` field, or ``False`` when the key
        is not present.

    Raises:
        KeyError: If the cached entry has no ``params`` field.
    """
    if redis_conn is None:
        redis_conn = get_redis_connection()

    cached = redis_conn.get(redis_key)
    if cached is None:
        return False
    return json.loads(cached)['params']
|
|
|
def get_timestamp_from_redis(redis_key, *, redis_conn=None):
    """Return the timestamp stored in the cache entry under ``redis_key``.

    The key is read with a single ``GET`` rather than ``EXISTS`` followed by
    ``GET``, so a key that disappears between the two calls can no longer
    make ``json.loads`` fail on ``None``.

    Args:
        redis_key: Redis key holding the JSON-encoded cache entry.
        redis_conn: Optional connection-like object exposing ``get(key)``.
            Defaults to ``get_redis_connection()``.

    Returns:
        The value of the entry's ``timestamp`` field, or ``0`` when the key
        is not present.

    Raises:
        KeyError: If the cached entry has no ``timestamp`` field.
    """
    if redis_conn is None:
        redis_conn = get_redis_connection()

    cached = redis_conn.get(redis_key)
    if cached is None:
        return 0
    return json.loads(cached)['timestamp']