第一版代码

main
RogerWork 1 month ago
commit fad8ab3392
  1. 2
      .gitignore
  2. 53
      get_jd_timestamp.py
  3. 42
      jd_client.py
  4. 29
      jd_code.py
  5. 46
      jd_playwright.py
  6. 84
      jd_uia.py
  7. 57
      main.py
  8. 39
      main_uia.py
  9. BIN
      requirement.txt
  10. 43
      wx.py

2
.gitignore vendored

@ -0,0 +1,2 @@
/venv/
/__pycache__/

@ -0,0 +1,53 @@
import requests
import time
def get_jd_time():
url = "https://api.m.jd.com/"
payload={}
headers = {
'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
'Accept': '*/*',
'Host': 'api.m.jd.com',
'Connection': 'keep-alive'
}
response = requests.request("GET", url, headers=headers, data=payload)
delay_time = int(response.elapsed.total_seconds()*1000)
jd_timestamp = int(response.headers['X-API-Request-Id'][-13:])
print(jd_timestamp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(jd_timestamp/1000)), ', 响应时间:', int(response.elapsed.total_seconds()*1000), 'ms')
return jd_timestamp, delay_time
def jd_timer(target):
target_timestamp = int(time.mktime(time.strptime(target, '%Y-%m-%d %H:%M:%S'))*1000)
print(target_timestamp)
jd_ts, delay = get_jd_time()
while jd_ts < target_timestamp:
delta = target_timestamp - jd_ts
if delta > 120*1000:
time.sleep(60)
elif delta > 60*1000:
time.sleep(30)
elif delta > 30*1000:
time.sleep(30)
elif delta > 10*1000:
time.sleep(1)
elif delta > 3*1000:
time.sleep(0.5)
elif delta > 1*1000:
time.sleep(0.2)
else:
if delta <= delay:
break
else:
sleep_val = delta - delay - 5
time.sleep(sleep_val/1000)
jd_ts, delay = get_jd_time()
print('done')
if __name__ == '__main__':
target = '2025-01-21 12:17:00'
jd_timer(target)

@ -0,0 +1,42 @@
from appium import webdriver
from appium.options.android import UiAutomator2Options
import time
class JdClient:
def __init__(self, device_name):
self.desired_caps = {
'platformName': 'Android',
'deviceName': device_name,
'platformVersion': '12.0',
'appPackage': 'com.jingdong.app.mall',
'appActivity': 'main.MainActivity',
'noReset': True,
'appium:automationName': 'UiAutomator2',
'newCommandTimeout': 6000,
'udid': device_name,
}
self.device =None
def initial_device(self, port):
print(f'设备:{self.desired_caps}')
self.device = webdriver.Remote(f'http://127.0.0.1:{port}/wd/hub', options=UiAutomator2Options().load_capabilities(self.desired_caps))
time.sleep(5)
x = self.device.get_window_size()['width']
y = self.device.get_window_size()['height']
print(f'{self.desired_caps["deviceName"]}屏幕尺寸:{x}x{y}')
def do(self, msg_dict):
self.device.set_clipboard_text(msg_dict['msg'])
print(self.device.get_clipboard_text())
time.sleep(0.5)
print(f'{self.desired_caps["platformName"]}点击')
self.device.tap([(550, 1263)], 100)
if __name__ == '__main__':
jd_client = JdClient('127.0.0.1:16416')
jd_client.initial_device(4723)
jd_client.do('玩具乐器五折券')

@ -0,0 +1,29 @@
import re
from playwright.sync_api import Playwright, sync_playwright, expect
def run(playwright: Playwright) -> None:
browser = playwright.chromium.launch(headless=False)
context = browser.new_context()
page = context.new_page()
page.goto("https://www.jd.com/")
page.get_by_role("link", name="你好,请登录").click()
page.get_by_placeholder("账号名/手机号/邮箱").click()
page.get_by_placeholder("账号名/手机号/邮箱").fill("18611659484")
page.get_by_placeholder("账号名/手机号/邮箱").press("Tab")
page.get_by_placeholder("密码").fill("Rog129Sunx1532")
page.get_by_role("link", name="    ").click()
page.locator(".JDJRV-slide-bg > div:nth-child(3)").click()
page.locator(".JDJRV-slide-bg > div:nth-child(3)").click()
page.locator(".JDJRV-slide-bg > div:nth-child(3)").click()
page.get_by_text("向右滑动完成拼图").click()
page.locator(".JDJRV-slide-bg > div:nth-child(3)").click()
page.locator("#JDJRV-wrap-loginsubmit a").first.click()
# ---------------------
context.close()
browser.close()
with sync_playwright() as playwright:
run(playwright)

@ -0,0 +1,46 @@
import re
from playwright.sync_api import Playwright, sync_playwright, expect
import time
from get_jd_timestamp import jd_timer
target: str = '2025-01-21 15:41:00'
target_timestamp = int(time.mktime(time.strptime(target, '%Y-%m-%d %H:%M:%S'))*1000)
AC_URL = f"https://h5static.m.jd.com/mall/active/xXzWGzxuJ6HRksjXms2x3cQsh25/index.html?utm_term=Wxfriends&utm_user=plusmember&utm_source=iosapp&utm_campaign=t_335139774&utm_medium=appshare&_ts={str(target_timestamp)}&ad_od=share&gxd=RnAowmYKPGXfnp4Sq4B_W578vOMp4E7JgUugKDcomXTOIlSPI-BCnvuytD0G7kc&gx=RnAomDgLPTTeyp5Z_sA9"
def run(playwright: Playwright) -> None:
browser = playwright.chromium.launch(headless=False)
context = browser.new_context()
page = context.new_page()
page.goto("https://www.jd.com/")
page.get_by_role("link", name="你好,请登录").click()
page.get_by_placeholder("账号名/手机号/邮箱").click()
page.get_by_placeholder("账号名/手机号/邮箱").fill("18611659484")
page.get_by_placeholder("密码").fill("Rog129Sunx1532")
page.get_by_role("link", name="    ").click()
time.sleep(30)
if page.get_by_text("hehedehehe"):
# 打开活动页面
print('登录成功!')
print(AC_URL)
page2 = context.new_page()
page2.goto(AC_URL)
# 倒计时
jd_timer(target)
# 点击领取
page2.click(".coupon_wrapper")
target_name = target.replace(':', '-').replace(' ', '_')
page2.screenshot(path=f"{target_name}.png")
print('点击立即领取!')
time.sleep(100)
# ---------------------
context.close()
browser.close()
with sync_playwright() as playwright:
run(playwright)

@ -0,0 +1,84 @@
import uiautomator2
import threading
class JdClient:
def __init__(self, device):
self.device = device
self.dev = uiautomator2.connect(device)
def close_app(self):
self.dev.app_stop('com.jingdong.app.mall')
def set_clipboard(self, text):
self.dev.set_clipboard(text)
def open_app(self):
self.dev.app_start('com.jingdong.app.mall')
def open_and_click(self):
self.dev.app_start('com.jingdong.app.mall')
n = 30
while n > 0:
if self.dev(text='跳过').exists:
self.dev(text='跳过').click()
break
else:
self.dev.implicitly_wait(0.1)
n -= 1
while True:
if self.dev(text='立即查看').exists:
print(self.device, '点击立即查看')
self.dev.implicitly_wait(1)
self.dev(text='立即查看').click()
break
else:
self.dev.implicitly_wait(1)
# print('等待1秒')
while True:
if self.dev(text='为好友助力').exists:
print(self.device, '为好友助力')
self.dev.implicitly_wait(1)
self.dev(text='为好友助力').click()
break
else:
self.dev.implicitly_wait(1)
# print('等待1秒')
def full_steps(self, text):
self.set_clipboard(text)
self.open_and_click()
if __name__ == '__main__':
device_list = [
'127.0.0.1:16416',
'127.0.0.1:16448',
'127.0.0.1:16480',
'127.0.0.1:16512',
# '127.0.0.1:16544',
# '127.0.0.1:16576',
'127.0.0.1:16608',
# '127.0.0.1:16640',
# '127.0.0.1:16672',
# '127.0.0.1:16704',
# 'b189a9ab', # 一加
]
# device_list = ['127.0.0.1:16416',
# '127.0.0.1:16448']
jd_client_list = []
for dev in device_list:
jd_client = JdClient(dev)
jd_client_list.append(jd_client)
for jd_client in jd_client_list:
t = threading.Thread(target=jd_client.close_app, args=())
t.start()
for jd_client in jd_client_list:
t = threading.Thread(target=jd_client.full_steps, args=('16:/¥FCWK99OZFKV¥ MU5104,⇥⤴ι🆖▾東【京东超市】分享助力赢玩具乐器五折券',))
t.start()

@ -0,0 +1,57 @@
from wx import get_user_message, listen_msg
from jd_client import JdClient
import multiprocessing, threading
device_list = ['127.0.0.1:16416',
'127.0.0.1:16448']
# device_list = ['127.0.0.1:16416',
# '127.0.0.1:16448',
# '127.0.0.1:16480',
# '127.0.0.1:16512',
# '127.0.0.1:16544',
# '127.0.0.1:16576',
# '127.0.0.1:16608',
# '127.0.0.1:16640']
listen_list = ['Honey', '分身199']
# listen_list = ['Honey']
# for device in device_list:
# print(f'设备:{device}')
# jd_client = JdClient(device)
# jd_client_list.append(jd_client)
port = 4723
device_obj_list = []
for device in device_list:
print(device)
jd_client = JdClient(device)
device_obj_list.append(jd_client)
for i, jd_client in enumerate(device_obj_list):
sub_port = port + i*2
print(i, sub_port)
jd_client.initial_device(sub_port)
# t = threading.Thread(target=jd_client.initial_device, args=(sub_port,))
# t.start()
msg = listen_msg(listen_list, '玩具乐器五折券')
print('准备完成,等待消息')
process_list = []
for jd_client in device_obj_list:
# p = multiprocessing.Process(target=jd_client.do, args=({'msg':msg},))
t = threading.Thread(target=jd_client.do, args=({'msg':msg},))
t.start()
# process_list.append(p)
# for p in process_list:
# p.join()
print('-----------------------------------')

@ -0,0 +1,39 @@
from wx import get_user_message, listen_msg
from jd_uia import JdClient
import multiprocessing, threading
# device_list = ['127.0.0.1:16416',
# '127.0.0.1:16448']
device_list = device_list = [
'127.0.0.1:16416',
'127.0.0.1:16448',
'127.0.0.1:16480',
'127.0.0.1:16512',
# '127.0.0.1:16544',
# '127.0.0.1:16576',
'127.0.0.1:16608',
# '127.0.0.1:16640',
# '127.0.0.1:16672',
# '127.0.0.1:16704',
# 'b189a9ab', # 一加
]
# listen_list = ['Honey', '分身199']
listen_list = ['Honey']
jd_client_list = []
for dev in device_list:
jd_client = JdClient(dev)
jd_client_list.append(jd_client)
t = threading.Thread(target=jd_client.close_app, args=())
t.start()
msg = listen_msg(listen_list, '玩具乐器五折券')
print(f'收到消息:{msg}')
for jd_client in jd_client_list:
t = threading.Thread(target=jd_client.full_steps, args=(msg,))
t.start()

Binary file not shown.

43
wx.py

@ -0,0 +1,43 @@
from wxauto import WeChat
import time
wx_client = WeChat()
wait = 0.5
def get_user_message(name, keyword=None):
wx_client.ChatWith(name)
msgs = wx_client.GetAllMessage()
for msg in msgs:
print(msg)
if keyword in msg:
print(f"找到信息:{msg}")
return msg
def listen_msg_by_person(keyword=None):
print('准备就绪等待消息...')
while True:
msgs = wx_client.GetListenMessage()
for chat in msgs:
who = chat.who
last_msgs = msgs.get(chat)
for msg in last_msgs:
content = msg.content
print(f'{who}: {content}')
if keyword in content:
print(f"找到信息:{content}")
return content
time.sleep(wait)
def listen_msg(listen_list=['Honey'], keyword=None):
for person in listen_list:
wx_client.AddListenChat(who=person, savepic=False)
return listen_msg_by_person(keyword)
if __name__ == '__main__':
listen_list = ['Honey', '分身199']
# listen_list = ['Honey']
# get_user_message('Honey', '玩具乐器五折券')
listen_msg(listen_list, '玩具乐器五折券')
Loading…
Cancel
Save