# -*- coding: utf-8 -*-
import os
import re
import sys
import time
import pymysql
import logging
# import jwt
import json
from datetime import datetime, timezone, timedelta
from mitmproxy import flowfilter
from mitmproxy import http
from mitmproxy import ctx
# from http.cookies import SimpleCookie

sys.path.append('../')
import utils.Utils as Utils
sys.path.pop()

"""
Quick reference for an http.HTTPFlow instance named ``flow``.

flow.request.http_version     # HTTP version
flow.request.headers          # all request headers (Host, User-Agent, Content-Type, ...)
flow.request.cookies          # request cookies
flow.request.url              # full request URL including host and query string, but not body parameters
flow.request.pretty_url       # same as flow.request.url as far as observed
flow.request.host             # host name
flow.request.port             # target port of the request
flow.request.method           # request method: POST, GET, ...
flow.request.scheme           # request scheme, e.g. https
flow.request.path             # request path: the URL without the host part
flow.request.get_text()       # request body; some requests carry their parameters in the body
flow.request.replace()        # regex replacement inside the content
flow.request.query            # MultiDictView of the key/value parameters carried in the URL
flow.request.get_content()    # bytes; same data as flow.request.get_text()
flow.request.raw_content      # bytes; same data as flow.request.get_content()
flow.request.urlencoded_form  # MultiDictView of the parameters when content-type is application/x-www-form-urlencoded (URL parameters excluded)
flow.request.multipart_form   # MultiDictView of the parameters when content-type is multipart/form-data (URL parameters excluded)
flow.request.timestamp_start  # timestamp at which the request started
flow.request.timestamp_end    # timestamp at which the request ended

The accessors above are the common ones for the request; the response works the same way.

flow.response.status_code     # status code
flow.response.headers         # all response headers
flow.response.cookies         # response cookies
flow.response.text            # response body, decoded
flow.response.content         # response body, binary
flow.response.set_text()      # replace the response body, no manual encoding needed
flow.response.replace()       # regex replacement inside the content
flow.response.timestamp_start # timestamp at which the response started
flow.response.timestamp_end   # timestamp at which the response ended
"""


class OfpayHelper:
    """mitmproxy addon that records login data of ``market-web.ofpay.com`` flows.

    Responses of three endpoints are inspected: the login page, the activity
    list and the order list.  Account, token and cookie data taken from those
    flows are written to MySQL, and a marker order is injected at the top of
    the order-list response to report whether the capture succeeded.
    """

    # Name of the cookie that carries the JWT when no Authorization header is sent.
    TOKEN_COOKIE = 'unionToken_interactiveIGoChoose'

    # Accounts whose activity list is allowed to be synchronised to the database.
    ACTIVITY_SYNC_ACCOUNTS = ('13430389115', '17607571132')

    # Template of the marker order injected into the order-list response.
    # It is shared by every flow, so handlers must work on a copy of it.
    order_simple_data = {
        "awardId": "W1155090378949787660",
        "activityId": "A923605206137307136",
        "activityName": "采集",
        "activityState": "2",
        "activityStartTime": "2022-01-01 00:00",
        "activityEndTime": "2888-12-31 23:59",
        "businessType": "4005",
        "outActivityCode": "eCoffee",
        "mobile": "",
        "prizeId": "sku14117",
        "prizeName": "数据采集成功",
        "prizeAlias": "",
        "prizeDesc": "",
        "prizeDescUrl": "https://mstatic.ofpay.com/marketing/upload/ca2ed3a05b2846b7909debf2df8e3495.png",
        "prizeBannerUrl": "https://mstatic.ofpay.com/marketing/upload/c4d1a0b94b50462eb0f040306a9badf4.png",
        "categoryId": "1",
        "rechargeType": "09",
        "goodsScene": "0",
        "goodsList": [],
        "orderNum": 1,
        "createTime": "",
        "imgUrl": "https://mstatic.ofpay.com/marketing/upload/fc0cc0a86db64f638d4e193913d7efea.png",
        "orderStatus": "3",
        "detailId": "T123456789",
        "clientAccount": "13430389115",
        "redeemCode": "",
        "redeemCodeStatus": "",
        "dynamicCodeSign": "1",
        "startEffectTime": "",
        "endEffectTime": "",
        "toExpireFlag": "0",
        "faceVal": "",
        "orderId": "T240311090006428",
        "tenantId": "0000000191",
        "price": "30",
        "awardPrice": "20.8",
        "salePrice": "20.8",
        "rechargeId": "R1216679598197055488",
        "rechargeTime": "2024-01-01 00:00:00",
        "payStatus": "2",
        "discountPrice": "",
        "activityPrice": "",
        "customerInfo": "{\"device_id\":\"D29ED082-549A-4882-98FC-8BB881D1552B\",\"loginType\":\"interactiveIGoChoose\",\"gameAccount\":\"13430389115\",\"city_code\":\"440100\",\"cisno\":\"ZbHv0CEM2cGjx0DB9DXVJg==\",\"isNewUser\":\"0\",\"marketId\":\"M923156289016692736\",\"city_name\":\"广州市\",\"phone\":\"13430389115\",\"fromEntry\":\"APP\",\"currentTimeMillis\":\"1710119786257\",\"userUuid\":\"Pfd6kjTSmjCfQ8boswe1PpAmfgZW0acz\",\"cust_id\":\"Pfd6kjTSmjCfQ8boswe1PpAmfgZW0acz\",\"invitationCode\":\"BGCKWC\"}",
        "callbackOrder": "",
        "activityRechargeEffectStartTime": "",
        "activityRechargeEffectEndTime": "",
        "accountType": "",
        "payFlag": "1",
        "activityPayFlag": True,
        "thirdInfo": "{\"faceValue\":\"30.00\",\"customGatewayId\":\"ZDY_ICBC_ZJWN\",\"showSign\":\"1\",\"xcxShowSign\":\"2\",\"order\":\"28\",\"toBPrice\":\"30.00\",\"showPhone\":\"1\",\"pointActivity\":\"HD0460132E7oLMG1mH\",\"stockShowSign\":\"2\"}",
        "vendorVoucher": "",
        "productUseMsg": "",
        "proof": "",
        "amount": 1,
        "parentActivityNo": "",
        "parentDetailId": "",
        "subOrderExt": "{\"orderStatus\":\"\",\"payStatus\":\"\"}",
        "logisticsNo": "",
        "company": "",
        "promoteId": "",
        "version": 1,
        "gateWayId": "",
        "payType": "",
        "needRechargeNum": "0",
    }

    def __init__(self):
        """Resolve the target host and open the MySQL connection."""
        self.domain_name = 'market-web.ofpay.com'
        # Flows may address the server by IP instead of by name, so both are accepted.
        self.host_ip = Utils.get_ip_address(self.domain_name) or None
        self.db_conn = None
        self.connect_mysql()

    # ------------------------------------------------------------------
    # MySQL connection handling
    # ------------------------------------------------------------------
    @staticmethod
    def _close_quietly(db_conn):
        """Close ``db_conn`` if there is one, ignoring errors from a dead connection."""
        if db_conn is None:
            return
        try:
            db_conn.close()
        except pymysql.MySQLError:
            # Closing an already-closed connection raises; nothing left to release.
            pass

    def connect_mysql(self):
        """Connect to MySQL, retrying once per second until it succeeds.

        On success the connection is stored in ``self.db_conn``.  Only
        ``pymysql.OperationalError`` triggers a retry; other errors propagate.
        """
        # NOTE(review): host and root credentials are hard-coded in source;
        # they should be moved to configuration / secret storage.
        config = {
            'host': '47.106.225.136',
            'port': 3306,
            'user': 'root',
            'passwd': 'sjojo123456',
            'database': 'mitmproxy',
            'charset': 'utf8',
        }
        while True:
            db_conn = None
            try:
                db_conn = pymysql.connect(**config)
                db_conn.ping(reconnect=True)
            except pymysql.OperationalError as e:
                print(e)
                print('连接断开,正在尝试重新连接...')
                # Release the half-open connection; the next loop iteration
                # creates a fresh one, so nothing is opened (or leaked) here.
                self._close_quietly(db_conn)
                time.sleep(1)
            else:
                break
        self.db_conn = db_conn

    def check_mysql_connect(self):
        """Probe the connection with ``SELECT 1`` and reconnect if the probe fails."""
        try:
            with self.db_conn.cursor() as cursor:
                cursor.execute('SELECT 1')
        except pymysql.MySQLError as e:
            print(e)
            self._close_quietly(self.db_conn)
            print('mysql重连...')
            self.connect_mysql()

    # ------------------------------------------------------------------
    # mitmproxy hooks
    # ------------------------------------------------------------------
    def check_host_pass(self, host):
        """Return True when ``host`` is the target domain or its resolved IP."""
        if host == self.domain_name:
            return True
        return bool(self.host_ip) and host == self.host_ip

    def request(self, flow: http.HTTPFlow):
        """Request hook; requests are not modified by this addon."""
        if not self.check_host_pass(flow.request.host):
            return

    def response(self, flow: http.HTTPFlow):
        """Response hook: dispatch flows of the target host by request path."""
        if not self.check_host_pass(flow.request.host):
            return
        path = flow.request.path
        print("###[OfpayHelper]path=%s" % path)
        if path.startswith('/h5/union/interactiveIGoChoose/index'):
            self.handle_login(flow)
        elif path.startswith('/h5/union/api/interactiveIGoChoose/indexConfigRebuild'):
            self.handle_activitylist(flow)
        elif path.startswith('/h5/union/api/interactiveIGoChoose/orderList'):
            self.handle_orderlist(flow)

    # ------------------------------------------------------------------
    # JWT helpers
    # ------------------------------------------------------------------
    def _decode_token(self, jwt_str):
        """Parse ``jwt_str`` and expand its JSON-encoded ``customerInfo`` claim.

        Returns the structure produced by ``Utils.parse_jwt`` with
        ``payload['customerInfo']`` replaced by the decoded object, or ``None``
        when the token is empty or cannot be parsed.
        """
        if not jwt_str:
            return None
        try:
            jwt_data = Utils.parse_jwt(jwt_str)
            if not jwt_data:
                return None
            payload = jwt_data['payload']
            info = payload.get('customerInfo')
            if isinstance(info, str):
                payload['customerInfo'] = json.loads(info)
        except Exception as e:
            # Utils.parse_jwt is opaque here, so every failure is treated as
            # "no usable token" instead of crashing the hook.
            print('jwt token解析失败')
            print(e)
            return None
        return jwt_data

    @staticmethod
    def _phone_from_payload(payload):
        """Return ``payload['customerInfo']['phone']`` or ``None`` when absent."""
        customer_info = payload.get('customerInfo')
        if isinstance(customer_info, dict):
            return customer_info.get('phone')
        return None

    def get_jwt_token_data(self, flow: http.HTTPFlow):
        """Return the decoded JWT of the request, or ``None``.

        The token is taken from the ``Authorization`` header when that header
        is present, otherwise from the ``unionToken_interactiveIGoChoose``
        cookie.
        """
        request = flow.request
        # Look the header up on the Headers object itself: it is
        # case-insensitive, whereas dict(headers) misses lowercase HTTP/2 names.
        if 'Authorization' in request.headers:
            jwt_str = request.headers['Authorization']
        else:
            jwt_str = dict(request.cookies).get(self.TOKEN_COOKIE)
        return self._decode_token(jwt_str)

    # ------------------------------------------------------------------
    # Endpoint handlers
    # ------------------------------------------------------------------
    def handle_login(self, flow: http.HTTPFlow):
        """Store the refreshed token and cookies of a login response.

        The request cookies are merged with the cookies set by the response,
        the token cookie of the merged set is decoded, and the matching row of
        ``elife_account_data`` is updated.  Nothing is written when the
        response sets no cookies, when no token can be decoded, or when no
        account (phone number) can be determined.
        """
        ctx.log.info('###handle_login###')
        request = flow.request
        response = flow.response
        if not response.cookies:
            return
        jwt_data = self.get_jwt_token_data(flow)
        if not jwt_data:
            return
        # Account of the token sent with the request; used when the refreshed
        # token carries no customerInfo of its own.
        account = self._phone_from_payload(jwt_data['payload'])
        login_params = request.query.get('loginParams')

        # Merge request cookies with the non-empty cookies set by the response.
        cookies = dict(request.cookies)
        for name, morsel in response.cookies.items():
            value = morsel[0]
            if value:
                cookies[name] = value

        authorization = cookies.get(self.TOKEN_COOKIE)
        if not authorization:
            return
        new_jwt = self._decode_token(authorization)
        if not new_jwt:
            return
        payload = new_jwt['payload']
        account = self._phone_from_payload(payload) or account
        if not account:
            return

        sign_time = None
        expire_time = None
        try:
            sign_time = Utils.seconds_to_beijing_time(payload['iat'])
            expire_time = Utils.seconds_to_beijing_time(payload['exp'])
        except Exception as e:
            # Best effort: the row is still updated, with empty timestamps.
            print(e)

        sql_query = '''
            UPDATE elife_account_data
            SET authorization = %s, cookies = %s, update_time = %s,
                expire_time = %s, login_params = %s
            WHERE account = %s;
        '''
        sql_params = (authorization, repr(cookies), sign_time, expire_time,
                      login_params, account)
        try:
            self.check_mysql_connect()
            with self.db_conn.cursor() as cursor:
                cursor.execute(sql_query, sql_params)
            self.db_conn.commit()
        except pymysql.MySQLError as e:
            print(e)

    def handle_activitylist(self, flow: http.HTTPFlow):
        """Synchronise the activity list and its awards to the database.

        Only flows of the accounts in ``ACTIVITY_SYNC_ACCOUNTS`` whose response
        reports ``code == 'success'`` are processed.  Each activity is passed
        to ``UpdateElifeActivities`` and each of its awards to
        ``UpdateElifeActivityAwards``.
        """
        ctx.log.info('###handle_activitylist###')
        response = flow.response
        jwt_data = self.get_jwt_token_data(flow)
        if not jwt_data:
            return
        account = self._phone_from_payload(jwt_data['payload'])
        if not account:
            return
        if account not in self.ACTIVITY_SYNC_ACCOUNTS:
            return

        rsp_params = json.loads(response.get_text())
        if rsp_params['code'] != 'success':
            return

        sql_activity_query = '''
            CALL UpdateElifeActivities(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        '''
        sql_award_query = '''
            CALL UpdateElifeActivityAwards(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        '''
        sql_activity_params = []
        sql_award_params = []
        # The position in the response is stored as the activity's sort number.
        for activity_sort_num, activity_info in enumerate(rsp_params['data']):
            sql_activity_params.append((
                '1',
                activity_info['activityId'],
                activity_info['activityAlias'],
                activity_info['outActivityCode'],
                activity_info['activityTitle'],
                activity_info['activityState'],
                activity_info['activityIcon'],
                activity_info['activityLink'],
                activity_info['activityBanner'],
                activity_info['subLoginType'],
                activity_info.get('subActivityId', ''),
                activity_info['activityDesc'],
                activity_sort_num,
            ))
            for award_info in activity_info['awardList']:
                sql_award_params.append((
                    award_info['awardId'],
                    award_info['prizeName'],
                    award_info['activityId'],
                    award_info['prizeId'],
                    award_info['prizeDesc'],
                    award_info['prizeBannerUrl'],
                    award_info['prizeDescUrl'],
                    award_info['imgUrl'],
                    int(award_info['stockNum']),
                    float(award_info['price']),
                    award_info['categoryType'],
                    award_info['goodsScene'],
                    award_info['rechargeType'],
                    int(award_info['useStock']),
                    int(award_info['remainStock']),
                    int(award_info['cycleStock']),
                    int(award_info['cycleRemainStock']),
                    int(award_info['orderNum']),
                    award_info['payFlag'],
                    award_info['thirdInfo'],
                    award_info['awardType'],
                    int(award_info['firstSignAward']),
                    int(award_info['renewSignAward']),
                    award_info.get('prizeAlias', ''),
                    award_info.get('parentAwardId', ''),
                ))

        try:
            self.check_mysql_connect()
            with self.db_conn.cursor() as cursor:
                cursor.executemany(sql_activity_query, sql_activity_params)
                cursor.executemany(sql_award_query, sql_award_params)
            self.db_conn.commit()
        except pymysql.MySQLError as e:
            print(e)

    def handle_orderlist(self, flow: http.HTTPFlow):
        """Record the account's credentials and inject a marker order.

        The account, UUID, token, cookies, user agent and query identifiers of
        the request are passed to ``UpdateElifeAccountData`` together with a
        freshly generated capture code.  When the response reports
        ``code == 'success'`` a marker order is inserted at the top of its
        list; the marker's name tells whether the database write succeeded and
        its description carries the capture code.
        """
        ctx.log.info('###handle_orderlist###')
        request = flow.request
        response = flow.response
        cookies = dict(request.cookies)  # cookies as a plain dict

        jwt_data = self.get_jwt_token_data(flow)
        if not jwt_data:
            return
        payload = jwt_data['payload']
        account = self._phone_from_payload(payload)
        if not account:
            return

        # Case-insensitive lookups; without all three headers the record would
        # be incomplete, so the flow is left untouched.
        uuid = request.headers.get('UUID')
        authorization = request.headers.get('Authorization')
        user_agent = request.headers.get('User-Agent')
        if uuid is None or authorization is None or user_agent is None:
            ctx.log.info('###handle_orderlist### required request header missing, skipped')
            return
        market_id = request.query.get('marketId')
        event_visitor_id = request.query.get('eventVisitorId')

        update_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        expire_time = Utils.seconds_to_beijing_time(payload['exp'])
        capture_code = Utils.generate_random_code(6)

        # Work on a private copy: the template is class-level state shared by
        # every flow, and the marker becomes part of this response.
        simple_data = dict(OfpayHelper.order_simple_data)
        simple_data['goodsList'] = list(simple_data['goodsList'])
        simple_data['prizeDesc'] = capture_code
        simple_data['clientAccount'] = account
        # The marker shows the token's expiry time as its creation/expiry time.
        simple_data['createTime'] = expire_time
        simple_data['endEffectTime'] = expire_time

        rsp_params = json.loads(response.get_text())
        if rsp_params['code'] == 'success':
            rsp_params['data']['list'].insert(0, simple_data)

        sql_query = '''
            CALL UpdateElifeAccountData(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        '''
        sql_params = (account, uuid, authorization, repr(cookies), user_agent,
                      market_id, event_visitor_id, capture_code, update_time,
                      expire_time)
        try:
            self.check_mysql_connect()
            with self.db_conn.cursor() as cursor:
                cursor.execute(sql_query, sql_params)
            self.db_conn.commit()
            simple_data['prizeName'] = '数据采集成功'
        except pymysql.MySQLError as e:
            print(e)
            simple_data['prizeName'] = '数据采集失败'
            simple_data['prizeDesc'] = ''

        response.set_text(json.dumps(rsp_params))