# -*- coding: utf-8 -*- import os import requests import json import csv import base64 from datetime import datetime, timezone, timedelta import pymysql def base64url_decode(base64_str): size = len(base64_str) % 4 if size == 2: base64_str += '==' elif size == 3: base64_str += '=' elif size != 0: raise ValueError('Invalid base64 string') return base64.urlsafe_b64decode(base64_str.encode('utf-8')) def parse_jwt(jwt_token): jwt_token_list = jwt_token.split('.') header = base64url_decode(jwt_token_list[0]).decode() payload = base64url_decode(jwt_token_list[1]).decode() return { 'header': json.loads(header), 'payload': json.loads(payload), 'signature': jwt_token_list[-1] } def read_csv(filename='results.csv'): data = [] try: with open(filename, mode='r', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile); for row in reader: data.append(row); except IOError as e: print(f"read_csv error occurred: {e}"); # print(data); return data; def read_json(filename='results.json'): data = None; try: with open(filename, 'r', encoding='utf-8') as file: data = json.load(file); except IOError as e: print(f"read_json error occurred: {e}"); # print(data); return data; def seconds_to_beijing_time(seconds): utc_time = datetime.fromtimestamp(seconds, tz=timezone.utc); beijing_time = utc_time.astimezone(timezone(timedelta(hours=8))); formatted_time = beijing_time.strftime('%Y-%m-%d %H:%M:%S'); return formatted_time; class OfpayChecker: def __init__(self, accout_data): self.force_refresh = True; self.accout_data = accout_data; self.host = 'market-web.ofpay.com'; self.market_id = accout_data['market_id']; self.event_visitor_id = accout_data['event_visitor_id']; self.accout_id = accout_data['account']; uuid = accout_data['uuid']; user_agent = accout_data['user_agent']; authorization = accout_data['authorization']; self.cookies = eval(accout_data['cookies']); cookies_str = '; '.join([f'{key}={value}' for key, value in self.cookies.items()]); self.headers = { 'Host': self.host, 'UUID': uuid, 'Accept': '*/*', 'Sec-Fetch-Site': 'same-origin', 'Origin': 'https://market-web.ofpay.com', 'Accept-Language': 'zh-CN,zh-Hans;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Sec-Fetch-Mode': 'cors', 'Content-Type': 'application/json; charset=utf-8', 'Connection': 'keep-alive', 'User-Agent': user_agent, 'Accept-Language': 'zh-CN,zh-Hans;q=0.9', 'Authorization': authorization, 'Sec-Fetch-Dest': 'empty', 'Referer': 'https://market-web.ofpay.com/h5/union/standard/interactiveIGoChoose/index', 'Cookie': cookies_str, }; self.connect_mysql(); def connect_mysql(self): config = { 'host':'47.106.225.136', 'port':3306, 'user':'root', 'passwd':'sjojo123456', 'database':'mitmproxy', 'charset':'utf8' }; db_conn = None; while True: try: db_conn = pymysql.connect(**config); db_conn.ping(reconnect=True); except pymysql.OperationalError as e: print(e); print('连接断开,正在尝试重新连接...'); if db_conn: db_conn.close(); db_conn = pymysql.connect(**config); time.sleep(1); else: break; self.db_conn = db_conn; def check_mysql_connect(self): try: with self.db_conn.cursor() as cursor: cursor.execute('SELECT 1'); except pymysql.MySQLError as e: print(e); self.db_conn.close(); print('mysql重连...'); self.connect_mysql(); def check_refresh_token(self, force_refresh=False): expire_time_str = self.accout_data['expire_time']; expire_dt = datetime.strptime(expire_time_str, "%Y-%m-%d %H:%M:%S") now_dt = datetime.now(); is_valid = False; if now_dt < expire_dt: is_valid = True; if not force_refresh and is_valid: return 0; login_params = self.accout_data['login_params']; url = f'https://{self.host}/h5/union/interactiveIGoChoose/index?loginParams={login_params}'; response = self.get_request(url); if response.status_code != 200: return -2; # print(response.content); # cookie_dict = dict(response.cookies); cookie_dict = {} for cookie in response.cookies: if cookie.name: key = cookie.name; cookie_dict[key] = cookie.value; authorization = None; for key in cookie_dict: value = cookie_dict[key]; if not value: continue; self.cookies[key] = value; if key == 'unionToken_interactiveIGoChoose': authorization = self.cookies[key]; if authorization: self.headers['Authorization'] = authorization; cookies_str = '; '.join([f'{key}={value}' for key, value in self.cookies.items()]); self.headers['Cookie'] = cookies_str; return 1; return -1; def start(self): ret_code = self.check_refresh_token(self.force_refresh); if ret_code == 1: self.sync_new_token(); print('刷新token成功'); elif ret_code == 0: print('无需刷新token'); else: print('刷新token失败'); def sync_new_token(self): account = self.accout_id; if not account: return; authorization = self.headers['Authorization']; cookies = self.cookies; sign_time = None expire_time = None; try: jwt_data = parse_jwt(authorization); if not jwt_data: return; payload = jwt_data['payload']; if 'customerInfo' in payload: info_str = payload['customerInfo']; customer_info = json.loads(info_str); payload['customerInfo'] = customer_info; account = customer_info['phone']; sign_time = seconds_to_beijing_time(payload['iat']); expire_time = seconds_to_beijing_time(payload['exp']); except Exception as e: print(e); try: sql_query = f''' UPDATE elife_account_data SET authorization = %s, cookies = %s, update_time = %s, expire_time = %s WHERE account = %s; '''; sql_params = (authorization, repr(cookies), sign_time, expire_time, account); # print(sql_params); self.check_mysql_connect(); cursor = self.db_conn.cursor(); cursor.execute(sql_query, sql_params); self.db_conn.commit(); cursor.close(); except pymysql.OperationalError as e: print(e); def get_request(self, url, params=None): response = requests.get(url, headers=self.headers, params=params, cookies=self.cookies); return response; def post_request(self, url, data): response = requests.post(url, data=data, headers=self.headers, cookies=self.cookies); return response; def main(): accout_data = read_csv('elife_accout_data.csv'); for item in accout_data: print('########账号[%s]开始检查刷新Token########' % item['account']); checker = OfpayChecker(item); checker.start(); print('########################################'); if __name__ == "__main__": main();