# -*- coding: utf-8 -*-
"""Coupon grabbing script for the market-web.ofpay.com activity pages.

Accounts are read from ``elife_accout_data.csv`` and the activity list from
``elife_activities_data.json``.  Each enabled account is processed by an
``OfpayGrabber``; on multi-core machines one worker process is used per
account.  Command-line options can be overridden by environment variables
(see the ``__main__`` section at the bottom of the file).
"""
import os
import argparse
import csv
import requests
import json
from datetime import datetime
from decimal import Decimal
# from http.cookies import SimpleCookie
from cachetools import TTLCache
import pickle
import multiprocessing
from multiprocessing import Manager
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial
import time


def read_csv(filename='results.csv'):
    """Read a UTF-8 CSV file and return its rows as a list of dicts.

    Args:
        filename: path of the CSV file; the first row supplies the keys.

    Returns:
        A list with one dict per data row.  If the file cannot be read the
        error is printed and the rows collected so far (possibly none) are
        returned.
    """
    rows = []
    try:
        # newline='' is what the csv module expects for correct handling of
        # embedded line breaks inside quoted fields.
        with open(filename, mode='r', encoding='utf-8', newline='') as csvfile:
            rows.extend(csv.DictReader(csvfile))
    except IOError as e:
        print(f"read_csv error occurred: {e}")
    return rows


def read_json(filename='results.json'):
    """Load a UTF-8 JSON file.

    Args:
        filename: path of the JSON file.

    Returns:
        The parsed JSON value, or ``None`` when the file cannot be read or
        does not contain valid JSON (the error is printed).
    """
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            return json.load(file)
    except (IOError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"read_json error occurred: {e}")
    return None


def _is_account_enabled(account_row):
    """Return True when the row's ``enable`` field is 1 (int or numeric string)."""
    try:
        return int(account_row.get('enable')) == 1
    except (TypeError, ValueError):
        return False


class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``decimal.Decimal`` values as floats."""

    def default(self, o):
        if isinstance(o, Decimal):
            return float(o)
        return super().default(o)


# Process-wide cache instance; assigned in main() and in each worker.
g_shared_cache = None


class SharedCache:
    """Small key/value cache persisted to a pickle file.

    NOTE(review): in multi-process mode every worker appears to receive its
    own copy of this object through the manager namespace, so save() calls
    from different workers may overwrite each other's file — confirm whether
    that is acceptable.
    """

    cache_file_path = 'elife.cache.pickle'

    def __init__(self):
        self.init()

    def init(self):
        """Load the cache from disk, or create and persist a fresh TTLCache.

        Any failure is printed and the cache falls back to a plain dict.
        """
        cache = None
        try:
            path = SharedCache.cache_file_path
            if os.path.exists(path):
                # Load the cache object from the file.
                # NOTE(review): pickle.load executes code embedded in the
                # file; only safe while the cache file is trusted.
                with open(path, 'rb') as file:
                    cache = pickle.load(file)
            else:
                cache = TTLCache(maxsize=100, ttl=60 * 60 * 24)
                # Persist the new cache object to the file.
                with open(path, 'wb') as file:
                    pickle.dump(cache, file)
        except Exception as e:
            cache = {}
            print(e)
        self.cache = cache

    def save(self):
        """Write the current cache to the pickle file; errors are printed."""
        try:
            with open(SharedCache.cache_file_path, 'wb') as file:
                pickle.dump(self.cache, file)
        except Exception as e:
            print(e)

    def set(self, key, value):
        """Store ``value`` under ``key`` (in memory only until save())."""
        self.cache[key] = value

    def get(self, key):
        """Return the value stored under ``key``, or ``None`` if absent or on error."""
        try:
            return self.cache.get(key)
        except Exception as e:
            print(e)
            return None


class OfpayGrabber:
    """Performs the purchase flow for one account.

    The three class attributes are global switches that every constructor
    call overwrites from the CLI options.
    """

    FastModeEnable = True
    CheckStockEnable = True
    CheckBuyRepeatEnable = True

    def __init__(self, accout_data, activities_data, cli_options):
        """Set up headers/cookies for one account.

        Args:
            accout_data: dict for one account row (keys read here:
                enable, market_id, event_visitor_id, account, uuid,
                user_agent, authorization, cookies).
            activities_data: cached activity list, or a falsy value to make
                the grabber load/fetch it itself.
            cli_options: dict with the keys 'mode', 'check_stock' and
                'check_repeat'.
        """
        OfpayGrabber.FastModeEnable = cli_options.get('mode') == 'fast'
        OfpayGrabber.CheckStockEnable = cli_options.get('check_stock') == '1'
        OfpayGrabber.CheckBuyRepeatEnable = cli_options.get('check_repeat') == '1'

        self.app_version = 602
        self.award_want_discount_dict = None
        self.activities_data = activities_data
        # CSV values are strings, so compare numerically rather than `== 1`.
        self.enable = _is_account_enabled(accout_data)
        self.accout_data = accout_data
        self.host = 'market-web.ofpay.com'
        self.market_id = accout_data['market_id']
        self.event_visitor_id = accout_data['event_visitor_id']
        self.accout_id = accout_data['account']
        uuid = accout_data['uuid']
        user_agent = accout_data['user_agent']
        authorization = accout_data['authorization']
        # NOTE(review): eval() executes whatever the 'cookies' column
        # contains.  Only safe while the account file is fully trusted;
        # consider ast.literal_eval or JSON if the column is a plain dict.
        self.cookies = eval(accout_data['cookies'])
        self.headers = {
            'Host': self.host,
            'UUID': uuid,
            'Accept': '*/*',
            'Sec-Fetch-Site': 'same-origin',
            'Origin': 'https://market-web.ofpay.com',
            'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Sec-Fetch-Mode': 'cors',
            'Content-Type': 'application/json; charset=utf-8',
            'Connection': 'keep-alive',
            'User-Agent': user_agent,
            'Authorization': authorization,
            'Sec-Fetch-Dest': 'empty',
            'Referer': 'https://market-web.ofpay.com/h5/union/standard/interactiveIGoChoose/index',
            'Cookie': self._cookie_header(),
        }

    def _cookie_header(self):
        """Render self.cookies as a 'k=v; k=v' Cookie header value."""
        return '; '.join([f'{key}={value}' for key, value in self.cookies.items()])

    @staticmethod
    def _decode_json_body(response):
        """Decode a response body as UTF-8 JSON; return (parsed, raw_text)."""
        json_str = response.content.decode('utf-8')
        return json.loads(json_str), json_str

    def start(self):
        """Run the whole flow for this account.

        Returns:
            A list with the award dicts that were bought, or ``None`` when
            the token is invalid or no activity list is available.
        """
        ret_code = self.check_refresh_token()
        print(f"账号[{self.accout_id}]token数据状态:{ret_code}")
        if ret_code < 0:
            print(f"账号[{self.accout_id}]token已失效")
            return None
        if ret_code == 1:
            self.sync_new_token()

        cate_items_list = self.get_market_items_from_cache()
        if not cate_items_list:
            cate_items_list = self.get_market_items_from_svr(self.market_id, self.event_visitor_id)
        if not cate_items_list:
            return None

        # Index the activities by their category code.
        cate_items_dict = {item['outActivityCode']: item for item in cate_items_list}

        all_ret_list = []
        for cate_type, buy_list in self.get_will_market_buy_list_all().items():
            if not buy_list:
                continue
            activity_data = cate_items_dict.get(cate_type)
            if not activity_data:
                continue
            all_ret_list.extend(self.check_to_buy_all(buy_list, activity_data))

        if all_ret_list:
            g_shared_cache.save()
        return all_ret_list

    def _token_is_valid(self):
        """Return True when the account's expire_time lies in the future.

        A missing or unparsable expire_time is treated as expired so that a
        refresh is attempted instead of aborting the account.
        """
        expire_time_str = self.accout_data.get('expire_time')
        try:
            expire_dt = datetime.strptime(expire_time_str, "%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError):
            return False
        return datetime.now() < expire_dt

    def check_refresh_token(self, force_refresh=False):
        """Refresh the login token when it has expired (or when forced).

        Args:
            force_refresh: refresh even if the stored token is still valid.

        Returns:
            0 if the stored token is still valid, 1 if a new token was
            obtained (headers and cookies are updated), -1 if the response
            carried no 'unionToken_interactiveIGoChoose' cookie.
        """
        if not force_refresh and self._token_is_valid():
            return 0

        login_params = self.accout_data['login_params']
        url = f'https://{self.host}/h5/union/interactiveIGoChoose/index?loginParams={login_params}'
        response = self.get_request(url)

        authorization = None
        for cookie in response.cookies:
            if not cookie.name:
                continue
            self.cookies[cookie.name] = cookie.value
            if cookie.name == 'unionToken_interactiveIGoChoose':
                authorization = cookie.value

        if authorization:
            self.headers['Authorization'] = authorization
            self.headers['Cookie'] = self._cookie_header()
            return 1
        return -1

    def sync_new_token(self):
        """Placeholder hook called after a token refresh; only prints a marker."""
        print('sync_new_token')

    def get_will_market_buy_list_all(self):
        """Parse the account's purchase wish list.

        The configuration has the form ``category#name+name|category#name``;
        a built-in default is used when the account has none.

        Returns:
            Dict mapping category code to a list of item names.  Categories
            without item names are omitted.
        """
        def_val = 'eSupermarket#京东E卡|eTravel#滴滴快车|eCoffee#星巴克|eFood#百果园|eTea#霸王茶姬|eMovie#|eBicycle#|eOffice#'
        buy_conf_str = self.accout_data.get('will_buy_list') or def_val

        buy_ret_dict = {}
        for vstr in buy_conf_str.strip().split('|'):
            if not vstr:
                continue
            cate_item = vstr.strip().split('#')
            cate_item_val = cate_item[1] if len(cate_item) > 1 else None
            if cate_item_val:
                buy_ret_dict[cate_item[0]] = cate_item_val.strip().split('+')
        return buy_ret_dict

    def get_award_expected_discount(self, price, prize_name):
        """Return the highest price the account is willing to pay for a prize.

        Args:
            price: reference price used when nothing is configured.
            prize_name: name looked up in the ``name#price|name#price``
                configuration (a built-in default applies when the account
                has none).

        Returns:
            The configured price for ``prize_name``, else ``price - 8.8``.

        Raises:
            ValueError: if a configured price is not a number.
        """
        if self.award_want_discount_dict is None:
            self.award_want_discount_dict = {}
            def_val = '星巴克#20.80|霸王茶姬#10.80|百果园#10.80|京东E卡#10.80|滴滴快车#10.80'
            discount_str = self.accout_data.get('will_discount') or def_val
            for vstr in discount_str.strip().split('|'):
                if not vstr:
                    continue
                vlist = vstr.strip().split('#')
                # Entries without '#price' are ignored instead of raising.
                if len(vlist) > 1 and vlist[1]:
                    self.award_want_discount_dict[vlist[0]] = float(vlist[1].strip())

        configured = self.award_want_discount_dict.get(prize_name)
        if configured is not None:
            return configured
        return price - 8.8

    def get_market_items_from_cache(self):
        """Return the activity list, loading it from the JSON file if unset."""
        if not self.activities_data:
            self.activities_data = read_json('elife_activities_data.json')
        return self.activities_data

    def save_market_items_cache(self, market_id, event_visitor_id, data):
        """Write ``data`` as JSON to elife_activities_data.json.

        ``market_id`` and ``event_visitor_id`` are accepted but not used.
        """
        with open('elife_activities_data.json', 'w', newline='', encoding='utf-8') as f:
            f.write(json.dumps(data, cls=DecimalEncoder, ensure_ascii=False))

    def get_market_items_from_svr(self, market_id, event_visitor_id):
        """Fetch the market activity list from the server.

        Returns:
            The response's 'data' field on success, otherwise ``None``.
        """
        params = {
            'marketId': market_id,
            'eventVisitorId': event_visitor_id,
        }
        url = 'https://%s/h5/union/api/interactiveIGoChoose/indexConfigRebuild' % (self.host)
        print("请求市场商品列表数据")
        response = self.get_request(url, params)
        if response.status_code != 200:
            print("请求市场商品列表发生错误")
            return None
        try:
            result, json_str = self._decode_json_body(response)
            if result.get('code') == 'success':
                print("请求市场商品列表数据成功")
                return result['data']
            # The original logged "成功" (success) on this failure branch.
            print(f"请求市场商品列表数据失败,响应:{json_str}")
        except Exception as e:
            print("请求市场商品列表发生错误")
            print(e)
        return None

    def check_to_buy_all(self, buy_list, activity_data):
        """Try to buy every item name in ``buy_list`` within one activity.

        Errors for a single item are printed and do not stop the others.

        Returns:
            List of award dicts that were bought.
        """
        activity_id = activity_data['activityId']
        sub_activity_id = activity_data.get('subActivityId')
        sub_login_type = activity_data['subLoginType']
        award_list = activity_data['awardList']

        ret_list = []
        for item_name in buy_list:
            try:
                one_ret = self.check_to_buy_one(
                    activity_id, sub_activity_id, sub_login_type, item_name, award_list)
                if one_ret:
                    ret_list.append(one_ret)
            except Exception as e:
                print(e)
        return ret_list

    def check_to_buy_one(self, activity_id, sub_activity_id, sub_login_type, item_name, award_list):
        """Try to buy the first award whose prize name contains ``item_name``.

        Skips the purchase when repeat checking is on and the item was
        already bought today, or when stock checking applies and the award
        has no remaining stock.  Normal mode always checks stock; fast mode
        checks it only when CheckStockEnable is set.

        Returns:
            The award dict on success, otherwise ``None``.
        """
        check_buy_repeat_key = f"lkOfPayBuyItemKey#{item_name}#{self.accout_id}"
        now_string = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        if OfpayGrabber.CheckBuyRepeatEnable:
            last_buy_succ_date = g_shared_cache.get(check_buy_repeat_key)
            # Compare only the date part: a full-timestamp comparison would
            # practically never match, so "already bought today" never fired.
            if last_buy_succ_date and str(last_buy_succ_date)[:10] == now_string[:10]:
                print(f"商品[{item_name}]今日已抢购成功过,跳过~~")
                return None

        award_data = next((a for a in award_list if item_name in a['prizeName']), None)
        if award_data is None:
            return None

        print(f"开始尝试抢购-{award_data['prizeName']}{award_data['prizeDesc']},价格:{award_data['price']},库存:{award_data['remainStock']}")

        fast_mode = OfpayGrabber.FastModeEnable
        must_check_stock = OfpayGrabber.CheckStockEnable or not fast_mode
        if must_check_stock and not award_data['remainStock'] > 0:
            print("库存不足,跳过~")
            return None

        buy = self.item_buy_fast if fast_mode else self.item_buy_normal
        one_ret = buy(activity_id, sub_activity_id, sub_login_type, award_data)
        if one_ret:
            # Purchase succeeded: remember when, for the repeat check.
            g_shared_cache.set(check_buy_repeat_key, now_string)
        return one_ret

    def _order_and_pay(self, award_data):
        """Request pre-payment info for an award and pay for it.

        Returns:
            ``award_data`` when the payment call succeeds, else ``None``.
        """
        activity_id = award_data['activityId']
        event_visitor_id = self.event_visitor_id
        pay_info = self.get_pay_info(
            activity_id, award_data['awardId'], '', '', self.accout_id, event_visitor_id)
        if pay_info and 'detailId' in pay_info:
            if self.pay(activity_id, event_visitor_id, pay_info['detailId']):
                return award_data
        return None

    def item_buy_fast(self, activity_id, sub_activity_id, sub_login_type, award_data):
        """Buy an award directly from the cached award data.

        The activity id is taken from ``award_data``; the ``activity_id``,
        ``sub_activity_id`` and ``sub_login_type`` arguments are not used.

        Returns:
            ``award_data`` on success, otherwise ``None``.
        """
        print("###item_buy_fast")
        return self._order_and_pay(award_data)

    def get_pay_info(self, activity_id, award_id, goods_id, invitation_code, game_account, event_visitor_id):
        """Request the pre-payment data for an award.

        Returns:
            The parsed response dict when its 'code' is '0', else ``None``.
        """
        url = f'https://{self.host}/h5/union/api/draw/interactiveIGoChoose/{activity_id}?awardId={award_id}&goodsId={goods_id}&invitationCode={invitation_code}&gameAccount={game_account}&eventVisitorId={event_visitor_id}'
        print("请求商品预支付数据")
        response = self.get_request(url)
        if response.status_code != 200:
            print("请求商品预支付数据发生错误")
            return None
        try:
            result, json_str = self._decode_json_body(response)
            if result.get('code') == '0':
                print("请求商品预支付数据成功")
                print(json_str)
                return result
            # Code 19 = an order awaiting payment already exists.
            print(f"请求商品预支付数据失败,响应:{json_str}")
        except Exception as e:
            print("请求商品预支付数据发生错误")
            print(e)
        return None

    def pay(self, activity_id, event_visitor_id, detail_id):
        """Submit the payment request for a pre-payment detail id.

        Returns:
            The parsed response dict when its 'code' is 'success', else
            ``None``.
        """
        account_phone = self.accout_id
        post_data = {
            "detailId": detail_id,
            "rechargeAccount": account_phone,
            "account": account_phone,
            "appVersion": self.app_version,
        }
        url = f"https://{self.host}/h5/api/mobile/activity/pay/{activity_id}?eventVisitorId={event_visitor_id}"
        response = self.post_request(url, json.dumps(post_data))
        if response.status_code != 200:
            print("请求下单发生错误")
            return None
        try:
            result, json_str = self._decode_json_body(response)
            if result.get('code') == 'success':
                print("请求下单成功")
                print(json_str)
                return result
            print(f"请求下单失败,响应:{json_str}")
        except Exception as e:
            print("请求下单发生错误")
            print(e)
        return None

    def check_pick_item(self, prize_name, award_list):
        """Return the first award whose name contains ``prize_name``.

        Awards whose name contains '忽略' are skipped.  Returns ``None``
        when nothing matches.
        """
        for award_data in award_list:
            one_prize_name = award_data['prizeName']
            if '忽略' not in one_prize_name and prize_name in one_prize_name:
                return award_data
        return None

    def item_buy_normal(self, activity_id, sub_activity_id, sub_login_type, award_data):
        """Re-fetch the activity's awards and buy if the price is low enough.

        The award is re-selected from freshly fetched data, its price is
        compared with the expected discount price, and the purchase is made
        only when the price does not exceed it.  ``award_data['faceValue']``
        is set as a side effect.

        Returns:
            The (possibly re-selected) award dict on success, else ``None``.
        """
        print("###item_buy_normal")
        activity_id = award_data["activityId"]
        prize_name = award_data["prizeName"]
        event_visitor_id = self.event_visitor_id

        # A failed fetch returns None; fall back to an empty list so the
        # cached award_data is used instead of raising TypeError.
        act_data_list = self.get_activity_items(self.market_id, activity_id, event_visitor_id) or []
        # The result is not used; the request itself is kept as in the
        # original call sequence.
        self.get_activity_data(activity_id, event_visitor_id)
        des_info = self.get_des_decode_info(activity_id, event_visitor_id)

        cate_act_type = sub_login_type
        # Codes: 5 = this week's first-order discount already used,
        # 7 = too many users, retry later.
        if des_info and des_info["code"] != '0':
            cate_act_type = 'subChoose'

        for act_data in act_data_list:
            if act_data["type"] != cate_act_type:
                continue
            new_award_item = self.check_pick_item(prize_name, act_data["awardList"])
            if new_award_item:
                award_data = new_award_item
                break

        third_info = json.loads(award_data["thirdInfo"])
        award_id = award_data["awardId"]
        award_price = float(award_data["price"])
        award_face_value = float(third_info["faceValue"])
        award_data["faceValue"] = award_face_value
        discount_price = self.get_award_expected_discount(award_face_value, prize_name)
        print(f"商品{prize_name}(面值:{award_face_value})\n匹配的最终价格:{award_price}\n预设折扣价格:{discount_price}#{award_id}")

        if award_price > discount_price:
            print(f"未到预设折扣价格{discount_price},跳过~")
            return None
        return self._order_and_pay(award_data)

    def get_activity_items(self, market_id, activity_id, event_visitor_id):
        """Fetch the award groups of one activity.

        Returns:
            The response's 'data' field on success, otherwise ``None``.
        """
        url = f"https://{self.host}/h5/union/interactiveIGoChoose/marketIndexRebuild?marketId={market_id}&activityId={activity_id}&eventVisitorId={event_visitor_id}"
        print("请求活动商品列表数据")
        response = self.get_request(url)
        if response.status_code != 200:
            print("请求活动商品列表发生错误")
            return None
        try:
            result, json_str = self._decode_json_body(response)
            if result.get('code') == 'success':
                print("请求活动商品列表数据成功")
                return result['data']
            print(f"请求活动商品列表数据失败,响应:{json_str}")
        except Exception as e:
            print("请求活动商品列表发生错误")
            print(e)
        return None

    def get_activity_data(self, activity_id, event_visitor_id):
        """Fetch the status data of one activity.

        Returns:
            The response's 'data' field on success, otherwise ``None``.
        """
        url = f"https://{self.host}/h5/api/mobile/activity/data?activityNo={activity_id}&eventVisitorId={event_visitor_id}"
        print("请求活动状态数据")
        response = self.get_request(url)
        if response.status_code != 200:
            print("请求活动状态发生错误")
            return None
        try:
            result, json_str = self._decode_json_body(response)
            if result.get('code') == 'success':
                print("请求活动状态数据成功")
                return result['data']
            print(f"请求活动状态数据失败,响应:{json_str}")
        except Exception as e:
            print("请求活动状态发生错误")
            print(e)
        return None

    def get_des_decode_info(self, activity_id, event_visitor_id):
        """Fetch the activity description info.

        Returns:
            The parsed response dict (whatever its 'code'), or ``None`` on
            an HTTP or parse error.
        """
        url = f"https://{self.host}/h5/union/api/interactiveIGoChoose/getDesDecodeInfo?activityNo={activity_id}&eventVisitorId={event_visitor_id}"
        print("请求活动描述数据")
        response = self.get_request(url)
        if response.status_code != 200:
            print("请求活动描述发生错误")
            return None
        try:
            result, json_str = self._decode_json_body(response)
            print("请求活动描述数据成功")
            print(json_str)
            return result
        except Exception as e:
            print("请求活动描述发生错误")
            print(e)
        return None

    def get_request(self, url, params=None, timeout=6):
        """Send a GET request with this account's headers and cookies."""
        return requests.get(url, headers=self.headers, params=params,
                            cookies=self.cookies, timeout=timeout)

    def post_request(self, url, data, timeout=6):
        """Send a POST request with this account's headers and cookies."""
        return requests.post(url, data=data, headers=self.headers,
                             cookies=self.cookies, timeout=timeout)


def init_data_pre_excute():
    """No-op placeholder."""
    pass


def thread_worker_func(index, shared_namespace, account_info, activities_data):
    """Process one account inside a worker process.

    Args:
        index: sequence number of the account, used for logging.
        shared_namespace: namespace carrying ``g_shared_cache`` and
            ``cli_options``.
        account_info: dict for one account row.
        activities_data: cached activity list (may be falsy).

    Returns:
        ``{'account_info': ..., 'results': ...}`` where results is whatever
        ``OfpayGrabber.start`` returned, or ``None`` if it raised.
    """
    global g_shared_cache
    g_shared_cache = shared_namespace.g_shared_cache
    cli_options = shared_namespace.cli_options
    print('########[%d]账号[%s]开始抢券工作########' % (index, account_info['account']))

    # Must be bound before the try: a failure inside it used to leave
    # `results` undefined and raise UnboundLocalError below.
    results = None
    try:
        grabber = OfpayGrabber(account_info, activities_data, cli_options)
        results = grabber.start()
    except Exception as e:
        print(e)

    if results:
        print(results)
    print(f"账号[{account_info['account']}]任务完成")
    print('###############################################\n')
    return {'account_info': account_info, 'results': results}


# Watchdog wrapper
def thread_daemon(func, *args, **kwargs):
    """Run ``func(*args)`` in a helper thread and give up after a timeout.

    Args:
        func: callable to run.
        *args: positional arguments for ``func``.
        **kwargs: only ``timeout`` (seconds, default ``None`` = wait
            forever) is read.

    Returns:
        The result of ``func``, or ``(args[0], 444)`` when the timeout
        expires.
    """
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait up to `timeout` seconds for func to complete.
        return res.get(timeout=timeout)
    except multiprocessing.TimeoutError:
        print("Aborting due to timeout={}".format(args[0]))
        return (args[0], 444)


def collect_result(result):
    """Callback for finished pool tasks; currently ignores the result."""
    pass


def _report_pool_error(error):
    """Print an exception raised by a pool task so it is not lost silently."""
    print(error)


def main(cli_options):
    """Process every enabled account from elife_accout_data.csv.

    With more than one CPU each account runs in its own worker process
    (60 second limit per account); otherwise accounts run one after another.

    Args:
        cli_options: dict with the keys 'mode', 'check_stock' and
            'check_repeat'.
    """
    global g_shared_cache
    g_shared_cache = SharedCache()
    g_shared_cache.set('test', 10)

    cpu_threads = multiprocessing.cpu_count()
    print(f"当前脚本模式 {cli_options.get('mode')}")
    print("fast -此模式快速,不会重新拉取商品列表,直接进行购买操作")
    print("normal-此模式正常,每次重新拉取商品列表,再进行购买操作")
    print(f"多线程检查: CPU支持 {cpu_threads} 个线程/核心")

    activities_data = read_json('elife_activities_data.json')
    accout_data = read_csv('elife_accout_data.csv')

    if cpu_threads > 1:
        # Shared namespace handing the cache and options to the workers.
        manager = Manager()
        shared_namespace = manager.Namespace()
        shared_namespace.g_shared_cache = g_shared_cache
        shared_namespace.cli_options = cli_options

        # Process pool; every worker process handles a single task.
        pool = multiprocessing.Pool(maxtasksperchild=1)
        index = 0
        for item in accout_data:
            if not _is_account_enabled(item):
                print('########账号[%s]抢券功能关闭########' % item['account'])
                continue
            abortable_func = partial(thread_daemon, thread_worker_func, timeout=60)
            pool.apply_async(
                abortable_func,
                args=[index, shared_namespace, item, activities_data],
                callback=collect_result,
                error_callback=_report_pool_error,
            )
            index += 1
        # Close the pool: no further tasks are accepted.
        pool.close()
        # Wait for all tasks to finish.
        pool.join()
    else:
        for item in accout_data:
            if not _is_account_enabled(item):
                print('########账号[%s]抢券功能关闭########' % item['account'])
                continue
            print('########账号[%s]开始抢券工作########' % item['account'])
            try:
                grabber = OfpayGrabber(item, activities_data, cli_options)
                grabber.start()
            except Exception as e:
                # One failing account must not stop the remaining ones.
                print(e)
            print('########################################')


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-m', '--mode', type=str, default='fast', help='mode(normal,fast)')
    parser.add_argument('-cs', '--check_stock', type=str, default='1', help='check stock')
    parser.add_argument('-cr', '--check_repeat', type=str, default='1', help='check repeat')
    args = vars(parser.parse_args())

    # Environment variables, when set, take precedence over the CLI values.
    args_key_map = {
        'ELIFE_OFPAY_GRAB_MODE': 'mode',
        'ELIFE_OFPAY_CHECK_STOCK': 'check_stock',
        'ELIFE_OFPAY_CHECK_REPEAT': 'check_repeat',
    }
    for env_name, option_name in args_key_map.items():
        value = os.getenv(env_name)
        if value is not None:
            args[option_name] = value
    main(args)