123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- # -*- coding: utf-8 -*-
- import os
- import requests
- import json
- import csv
- import base64
- from datetime import datetime, timezone, timedelta
- import pymysql
- def base64url_decode(base64_str):
- size = len(base64_str) % 4
- if size == 2:
- base64_str += '=='
- elif size == 3:
- base64_str += '='
- elif size != 0:
- raise ValueError('Invalid base64 string')
- return base64.urlsafe_b64decode(base64_str.encode('utf-8'))
- def parse_jwt(jwt_token):
- jwt_token_list = jwt_token.split('.')
- header = base64url_decode(jwt_token_list[0]).decode()
- payload = base64url_decode(jwt_token_list[1]).decode()
- return {
- 'header': json.loads(header),
- 'payload': json.loads(payload),
- 'signature': jwt_token_list[-1]
- }
- def read_csv(filename='results.csv'):
- data = []
- try:
- with open(filename, mode='r', encoding='utf-8') as csvfile:
- reader = csv.DictReader(csvfile);
- for row in reader:
- data.append(row);
- except IOError as e:
- print(f"read_csv error occurred: {e}");
- # print(data);
- return data;
- def read_json(filename='results.json'):
- data = None;
- try:
- with open(filename, 'r', encoding='utf-8') as file:
- data = json.load(file);
- except IOError as e:
- print(f"read_json error occurred: {e}");
- # print(data);
- return data;
- def seconds_to_beijing_time(seconds):
- utc_time = datetime.fromtimestamp(seconds, tz=timezone.utc);
- beijing_time = utc_time.astimezone(timezone(timedelta(hours=8)));
- formatted_time = beijing_time.strftime('%Y-%m-%d %H:%M:%S');
- return formatted_time;
- class OfpayChecker:
- def __init__(self, accout_data):
- self.force_refresh = True;
- self.accout_data = accout_data;
- self.host = 'market-web.ofpay.com';
- self.market_id = accout_data['market_id'];
- self.event_visitor_id = accout_data['event_visitor_id'];
- self.accout_id = accout_data['account'];
- uuid = accout_data['uuid'];
- user_agent = accout_data['user_agent'];
- authorization = accout_data['authorization'];
- self.cookies = eval(accout_data['cookies']);
- cookies_str = '; '.join([f'{key}={value}' for key, value in self.cookies.items()]);
- self.headers = {
- 'Host': self.host,
- 'UUID': uuid,
- 'Accept': '*/*',
- 'Sec-Fetch-Site': 'same-origin',
- 'Origin': 'https://market-web.ofpay.com',
- 'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
- 'Accept-Encoding': 'gzip, deflate, br',
- 'Sec-Fetch-Mode': 'cors',
- 'Content-Type': 'application/json; charset=utf-8',
- 'Connection': 'keep-alive',
- 'User-Agent': user_agent,
- 'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
- 'Authorization': authorization,
- 'Sec-Fetch-Dest': 'empty',
- 'Referer': 'https://market-web.ofpay.com/h5/union/standard/interactiveIGoChoose/index',
- 'Cookie': cookies_str,
- };
- self.connect_mysql();
- def connect_mysql(self):
- config = {
- 'host':'47.106.225.136',
- 'port':3306,
- 'user':'root',
- 'passwd':'sjojo123456',
- 'database':'mitmproxy',
- 'charset':'utf8'
- };
- db_conn = None;
- while True:
- try:
- db_conn = pymysql.connect(**config);
- db_conn.ping(reconnect=True);
- except pymysql.OperationalError as e:
- print(e);
- print('连接断开,正在尝试重新连接...');
- if db_conn:
- db_conn.close();
- db_conn = pymysql.connect(**config);
- time.sleep(1);
- else:
- break;
- self.db_conn = db_conn;
- def check_mysql_connect(self):
- try:
- with self.db_conn.cursor() as cursor:
- cursor.execute('SELECT 1');
- except pymysql.MySQLError as e:
- print(e);
- self.db_conn.close();
- print('mysql重连...');
- self.connect_mysql();
- def check_refresh_token(self, force_refresh=False):
- expire_time_str = self.accout_data['expire_time'];
- expire_dt = datetime.strptime(expire_time_str, "%Y-%m-%d %H:%M:%S")
- now_dt = datetime.now();
- is_valid = False;
- if now_dt < expire_dt:
- is_valid = True;
- if not force_refresh and is_valid:
- return 0;
- login_params = self.accout_data['login_params'];
- url = f'https://{self.host}/h5/union/interactiveIGoChoose/index?loginParams={login_params}';
- response = self.get_request(url);
- if response.status_code != 200:
- return -2;
- # print(response.content);
- # cookie_dict = dict(response.cookies);
- cookie_dict = {}
- for cookie in response.cookies:
- if cookie.name:
- key = cookie.name;
- cookie_dict[key] = cookie.value;
- authorization = None;
- for key in cookie_dict:
- value = cookie_dict[key];
- if not value:
- continue;
- self.cookies[key] = value;
- if key == 'unionToken_interactiveIGoChoose':
- authorization = self.cookies[key];
- if authorization:
- self.headers['Authorization'] = authorization;
- cookies_str = '; '.join([f'{key}={value}' for key, value in self.cookies.items()]);
- self.headers['Cookie'] = cookies_str;
- return 1;
- return -1;
- def start(self):
- ret_code = self.check_refresh_token(self.force_refresh);
- if ret_code == 1:
- self.sync_new_token();
- print('刷新token成功');
- elif ret_code == 0:
- print('无需刷新token');
- else:
- print('刷新token失败');
- def sync_new_token(self):
- account = self.accout_id;
- if not account:
- return;
- authorization = self.headers['Authorization'];
- cookies = self.cookies;
- sign_time = None
- expire_time = None;
- try:
- jwt_data = parse_jwt(authorization);
- if not jwt_data:
- return;
- payload = jwt_data['payload'];
- if 'customerInfo' in payload:
- info_str = payload['customerInfo'];
- customer_info = json.loads(info_str);
- payload['customerInfo'] = customer_info;
- account = customer_info['phone'];
- sign_time = seconds_to_beijing_time(payload['iat']);
- expire_time = seconds_to_beijing_time(payload['exp']);
- except Exception as e:
- print(e);
- try:
- sql_query = f'''
- UPDATE elife_account_data
- SET
- authorization = %s,
- cookies = %s,
- update_time = %s,
- expire_time = %s
- WHERE account = %s;
- ''';
- sql_params = (authorization, repr(cookies), sign_time, expire_time, account);
- # print(sql_params);
- self.check_mysql_connect();
- cursor = self.db_conn.cursor();
- cursor.execute(sql_query, sql_params);
- self.db_conn.commit();
- cursor.close();
- except pymysql.OperationalError as e:
- print(e);
- def get_request(self, url, params=None):
- response = requests.get(url, headers=self.headers, params=params, cookies=self.cookies);
- return response;
- def post_request(self, url, data):
- response = requests.post(url, data=data, headers=self.headers, cookies=self.cookies);
- return response;
- def main():
- accout_data = read_csv('elife_accout_data.csv');
- for item in accout_data:
- print('########账号[%s]开始检查刷新Token########' % item['account']);
- checker = OfpayChecker(item);
- checker.start();
- print('########################################');
- if __name__ == "__main__":
- main();
|