# -*- coding: utf-8 -*-
"""Export rows from the ``elife_*`` MySQL tables to local CSV / JSON files."""
import csv
import json
import os
import time
from datetime import datetime
from decimal import Decimal

import pymysql


# Custom JSON encoder
class CustomEncoder(json.JSONEncoder):
    """JSON encoder that also serializes ``Decimal`` and ``datetime`` values."""

    def default(self, obj):
        """Return a JSON-compatible stand-in for *obj*.

        ``Decimal`` becomes its string form and ``datetime`` becomes a
        ``'YYYY-MM-DD HH:MM:SS'`` string; anything else is delegated to the
        base class, which raises ``TypeError``.
        """
        if isinstance(obj, Decimal):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        return super().default(obj)


class OfpayDataSyncer:
    """Dump account and activity tables from MySQL into CSV / JSON files."""

    # NOTE(review): live credentials are committed in source. They are kept as
    # the default so existing callers keep working, but they should be moved
    # to configuration / a secret store and rotated.
    DEFAULT_DB_CONFIG = {
        'host': '47.106.225.136',
        'port': 3306,
        'user': 'root',
        'passwd': 'sjojo123456',
        'database': 'mitmproxy',
        'charset': 'utf8',
    }

    def __init__(self, config=None):
        """Connect to MySQL right away.

        Args:
            config: Optional mapping of ``pymysql.connect`` keyword arguments.
                When omitted, ``DEFAULT_DB_CONFIG`` is used.
        """
        self.db_conn = None
        self.connect_mysql(config)

    def connect_mysql(self, config=None):
        """Open the MySQL connection, retrying once per second until it works.

        The established connection is stored on ``self.db_conn``.

        Args:
            config: Optional mapping of ``pymysql.connect`` keyword arguments.
                When omitted, ``DEFAULT_DB_CONFIG`` is used.
        """
        config = dict(self.DEFAULT_DB_CONFIG if config is None else config)
        while True:
            db_conn = None
            try:
                db_conn = pymysql.connect(**config)
                db_conn.ping(reconnect=True)
            except pymysql.OperationalError as e:
                print(e)
                print('连接断开,正在尝试重新连接...')
                if db_conn is not None:
                    try:
                        db_conn.close()
                    except pymysql.Error:
                        # The half-open connection is being discarded anyway;
                        # a failure to close it must not abort the retry loop.
                        pass
                # Back off, then let the loop make the next attempt so that a
                # repeated failure is caught by the same handler.
                time.sleep(1)
            else:
                break
        self.db_conn = db_conn

    def _fetch_all(self, sql_query, params=None):
        """Execute *sql_query* and return ``(column_names, rows)``.

        The cursor is closed before returning.
        """
        with self.db_conn.cursor() as cursor:
            cursor.execute(sql_query, params)
            rows = cursor.fetchall()
            column_names = [desc[0] for desc in cursor.description]
        return column_names, rows

    @staticmethod
    def _write_json(filepath, payload):
        """Serialize *payload* with ``CustomEncoder`` and write it as UTF-8."""
        # Encode first so a serialization error does not leave a truncated file.
        json_str = json.dumps(payload, cls=CustomEncoder, ensure_ascii=False)
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            f.write(json_str)

    def sync_account(self, day=0, filename='results.csv'):
        """Export recently updated ``elife_account_data`` rows.

        Rows whose ``update_time`` falls between midnight *day* days ago and
        the end of today are selected.

        Args:
            day: How many days before today the window starts (0 = today only).
            filename: Output path. A ``.csv`` or ``.json`` extension selects
                that format; with no extension both ``<filename>.csv`` and
                ``<filename>.json`` are written. Any other extension writes
                nothing.

        Returns:
            None. Prints a status line; nothing is written when the query
            returns no rows.
        """
        # ``CURDATE() - n`` is integer arithmetic on YYYYMMDD in MySQL, which
        # breaks across month boundaries; INTERVAL performs real date math.
        sql_query = '''
            SELECT * FROM elife_account_data
            WHERE update_time >= CURDATE() - INTERVAL %s DAY
            AND update_time < CURDATE() + INTERVAL 1 DAY;
        '''
        column_names, results = self._fetch_all(sql_query, (int(day),))
        if not results:
            print('同步用户数据失败')
            return
        print(f'同步用户数据成功,记录条数:{len(results)}')

        suffix = os.path.splitext(filename)[-1]
        if suffix in ('.csv', ''):
            csv_path = filename if suffix else filename + '.csv'
            with open(csv_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(column_names)
                writer.writerows(results)
        if suffix in ('.json', ''):
            json_path = filename if suffix else filename + '.json'
            data_list = [dict(zip(column_names, row)) for row in results]
            self._write_json(json_path, data_list)

    def sync_activities(self, filename='results.json'):
        """Export group ``'1'`` activities, each with its awards, as JSON.

        Every activity row becomes a dict; rows from ``elife_activity_awards``
        with a matching ``activityId`` are appended to that dict's
        ``'awardList'`` (the key is absent when an activity has no awards).

        Args:
            filename: Path of the JSON file to write.

        Returns:
            None. Prints progress lines; nothing is written when the activity
            query returns no rows.
        """
        # NOTE(review): SELECT * combined with GROUP BY sortNum presumably
        # relies on ONLY_FULL_GROUP_BY being disabled on the server - confirm.
        column_names, results = self._fetch_all('''
            SELECT * FROM elife_activities
            WHERE groupId = '1'
            GROUP BY sortNum;
        ''')
        if not results:
            print('同步活动数据失败')
            return
        print('同步活动数据进度1/2')

        activity_list = [dict(zip(column_names, row)) for row in results]
        activity_dict = {item['activityId']: item for item in activity_list}

        # One placeholder per id: lets the driver escape the values and avoids
        # the trailing comma that repr() of a one-element tuple would inject.
        activity_ids = list(activity_dict)
        placeholders = ', '.join(['%s'] * len(activity_ids))
        award_columns, aw_results = self._fetch_all(
            'SELECT * FROM elife_activity_awards '
            f'WHERE activityId IN ({placeholders});',
            activity_ids,
        )
        print('同步活动数据进度2/2')

        for row in aw_results:
            award = dict(zip(award_columns, row))
            entity = activity_dict.get(award['activityId'])
            if entity is not None:
                entity.setdefault('awardList', []).append(award)

        self._write_json(filename, activity_list)
        print('同步活动数据成功')

    def read_csv(self, filename='results.csv'):
        """Read a CSV file into a list of dicts keyed by the header row.

        Returns:
            The list of rows; whatever was read so far (possibly empty) when
            an I/O error occurs, in which case the error is printed.
        """
        data = []
        try:
            # newline='' is required by the csv module so that newlines inside
            # quoted fields are parsed correctly.
            with open(filename, mode='r', newline='', encoding='utf-8') as csvfile:
                data.extend(csv.DictReader(csvfile))
        except OSError as e:
            print(f"read_csv error occurred: {e}")
        return data

    def read_json(self, filename='results.json'):
        """Load and return the content of a JSON file.

        Returns:
            The decoded JSON value, or ``None`` when the file cannot be read
            (the error is printed). Malformed JSON is not caught here.
        """
        data = None
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except OSError as e:
            print(f"read_json error occurred: {e}")
        return data


def main():
    """Export the activity data, then the account data updated since yesterday."""
    syncer = OfpayDataSyncer()
    try:
        syncer.sync_activities('elife_activities_data.json')
        # No extension: writes both the .csv and the .json file. The file name
        # spelling is kept unchanged because it is the on-disk output name.
        syncer.sync_account(1, 'elife_accout_data')
    finally:
        syncer.db_conn.close()


if __name__ == "__main__":
    main()