# -*- coding: utf-8 -*-
import csv
import json
import os
import time
from datetime import datetime
from decimal import Decimal

import pymysql


# Custom JSON encoder: Decimal and datetime are not JSON-serializable by
# default, so render both as strings.
class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        return super().default(obj)
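
# Illustrative sketch of the encoder's effect (not part of the sync flow;
# the values are made up):
#
#   >>> json.dumps({'price': Decimal('9.90'),
#   ...             'ts': datetime(2024, 1, 1, 12, 0, 0)}, cls=CustomEncoder)
#   '{"price": "9.90", "ts": "2024-01-01 12:00:00"}'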

class OfpayDataSyncer:
    def __init__(self):
        self.db_conn = None
        self.connect_mysql()

    def connect_mysql(self):
        config = {
            'host': '47.106.225.136',
            'port': 3306,
            'user': 'root',
            'passwd': 'sjojo123456',
            'database': 'mitmproxy',
            'charset': 'utf8',
        }
        db_conn = None
        # Retry until a connection is established; ping(reconnect=True)
        # verifies the link is actually usable before we keep it.
        while True:
            try:
                db_conn = pymysql.connect(**config)
                db_conn.ping(reconnect=True)
            except pymysql.OperationalError as e:
                print(e)
                print('Connection lost, retrying...')
                if db_conn:
                    db_conn.close()
                time.sleep(1)
            else:
                break
        self.db_conn = db_conn
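
    # Usage sketch: construction blocks in the loop above until a connection
    # succeeds (assuming the MySQL host is eventually reachable):
    #
    #   syncer = OfpayDataSyncer()  # returns only once self.db_conn is live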

    def sync_account(self, day=0, filename='results.csv'):
        # Rows updated within the last `day` days, up to the end of today.
        sql_query = (
            'SELECT * FROM elife_account_data '
            'WHERE update_time >= CURDATE() - INTERVAL %s DAY '
            'AND update_time < CURDATE() + INTERVAL 1 DAY'
        )
        cursor = self.db_conn.cursor()
        cursor.execute(sql_query, (day,))
        results = cursor.fetchall()
        if not results:
            print('Account data sync failed: no rows returned')
            return
        print(f'Account data synced, {len(results)} rows')
        suffix = os.path.splitext(filename)[-1]
        # An empty suffix means "write both formats".
        if suffix in ('.csv', ''):
            filepath = filename if suffix else filename + '.csv'
            with open(filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow([desc[0] for desc in cursor.description])
                writer.writerows(results)
        if suffix in ('.json', ''):
            filepath = filename if suffix else filename + '.json'
            column_names = [desc[0] for desc in cursor.description]
            data_list = [dict(zip(column_names, row)) for row in results]
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data_list, f, cls=CustomEncoder, ensure_ascii=False)
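
    # Example calls (hypothetical filenames): an explicit extension selects
    # one format, while a bare name falls through both branches above.
    #
    #   syncer.sync_account(day=7, filename='accounts.csv')  # CSV only
    #   syncer.sync_account(day=7, filename='accounts')      # CSV and JSON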

    def sync_activities(self, filename='results.json'):
        sql_query = "SELECT * FROM elife_activities WHERE groupId = '1' GROUP BY sortNum"
        cursor = self.db_conn.cursor()
        cursor.execute(sql_query)
        results = cursor.fetchall()
        if not results:
            print('Activity data sync failed: no rows returned')
            return
        print('Activity sync progress: 1/2')
        activity_list = []
        activity_dict = {}
        activity_ids = []
        column_names = [desc[0] for desc in cursor.description]
        for row in results:
            data_dict = dict(zip(column_names, row))
            activity_id = data_dict['activityId']
            activity_list.append(data_dict)
            activity_ids.append(activity_id)
            activity_dict[activity_id] = data_dict
        # pymysql expands a sequence parameter into a parenthesized value
        # list, so "IN %s" works without manual quoting (and without the
        # trailing comma that repr(tuple(...)) leaves on one-element lists).
        sql_query = 'SELECT * FROM elife_activity_awards WHERE activityId IN %s'
        cursor.execute(sql_query, (activity_ids,))
        aw_results = cursor.fetchall()
        print('Activity sync progress: 2/2')
        column_names = [desc[0] for desc in cursor.description]
        for row in aw_results:
            data_dict = dict(zip(column_names, row))
            activity_id = data_dict['activityId']
            # Attach each award to its parent activity.
            if activity_id in activity_dict:
                entity = activity_dict[activity_id]
                entity.setdefault('awardList', []).append(data_dict)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(activity_list, f, cls=CustomEncoder, ensure_ascii=False)
        print('Activity data synced')
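
    # Shape of the JSON written above (field names come from elife_activities
    # and elife_activity_awards; the values are illustrative only):
    #
    #   [{"activityId": "...", "sortNum": 1, ...,
    #     "awardList": [{"activityId": "...", ...}]}]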

    def read_csv(self, filename='results.csv'):
        data = []
        try:
            with open(filename, mode='r', encoding='utf-8') as csvfile:
                data = list(csv.DictReader(csvfile))
        except IOError as e:
            print(f'read_csv error occurred: {e}')
        return data

    def read_json(self, filename='results.json'):
        data = None
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except IOError as e:
            print(f'read_json error occurred: {e}')
        return data
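
# Note on round-tripping: read_csv yields dicts of strings (CSV is untyped),
# while read_json returns whatever json.load parses; Decimal and datetime
# columns come back as the strings CustomEncoder wrote.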

def main():
    syncer = OfpayDataSyncer()
    syncer.sync_activities('elife_activities_data.json')
    # syncer.read_json('elife_activities_data.json')
    # No extension: writes both elife_account_data.csv and .json.
    syncer.sync_account(1, 'elife_account_data')
    # syncer.read_csv('elife_account_data.csv')
    # syncer.read_json('elife_account_data.json')


if __name__ == '__main__':
    main()