OfpayDataSync.py 6.4 KB

# -*- coding: utf-8 -*-
import os
import time
from datetime import datetime
import pymysql
import csv
import json
from decimal import Decimal


# Custom JSON encoder: json.dumps cannot serialize Decimal or datetime values,
# so render Decimals as strings and datetimes as 'YYYY-MM-DD HH:MM:SS'.
class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return str(obj)
        elif isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        return super().default(obj)
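
# Example: json.dumps({'amount': Decimal('9.99'), 'ts': datetime.now()}, cls=CustomEncoder)
# yields {"amount": "9.99", "ts": "<formatted timestamp>"} instead of raising TypeError.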


class OfpayDataSyncer:
    def __init__(self):
        self.db_conn = None
        self.connect_mysql()

    def connect_mysql(self):
        """Open the MySQL connection, retrying every second until it is alive."""
        config = {
            'host': '47.106.225.136',
            'port': 3306,
            'user': 'root',
            'passwd': 'sjojo123456',
            'database': 'mitmproxy',
            'charset': 'utf8'
        }
        db_conn = None
        while True:
            try:
                db_conn = pymysql.connect(**config)
                db_conn.ping(reconnect=True)
            except pymysql.OperationalError as e:
                print(e)
                print('Connection lost, trying to reconnect...')
                if db_conn:
                    db_conn.close()
                db_conn = pymysql.connect(**config)
                time.sleep(1)
            else:
                break
        self.db_conn = db_conn

    def sync_account(self, day=0, filename='results.csv'):
        """Export rows from elife_account_data updated within the last `day` days.

        A '.csv' or '.json' suffix on `filename` picks the format; no suffix writes both files.
        """
        sql_query = f'''
            SELECT * FROM elife_account_data
            WHERE update_time >= CURDATE() - INTERVAL {day} DAY
              AND update_time < CURDATE() + INTERVAL 1 DAY;
        '''
        cursor = self.db_conn.cursor()
        cursor.execute(sql_query)
        results = cursor.fetchall()
        if not results:
            print('Failed to sync account data')
            return
        print(f'Account data synced successfully, {len(results)} records')
        suffix = os.path.splitext(filename)[-1]
        if suffix == '.csv' or suffix == '':
            filepath = filename if suffix != '' else filename + '.csv'
            with open(filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                headers = [desc[0] for desc in cursor.description]
                writer.writerow(headers)
                for row in results:
                    writer.writerow(row)
        if suffix == '.json' or suffix == '':
            filepath = filename if suffix != '' else filename + '.json'
            column_names = [desc[0] for desc in cursor.description]
            data_list = [dict(zip(column_names, row)) for row in results]
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(json.dumps(data_list, cls=CustomEncoder, ensure_ascii=False))

    def sync_activities(self, filename='results.json'):
        """Export activities in group '1' (one per sortNum), with their award rows, to JSON."""
        sql_query = '''
            SELECT * FROM elife_activities WHERE groupId = '1' GROUP BY sortNum;
        '''
        cursor = self.db_conn.cursor()
        cursor.execute(sql_query)
        results = cursor.fetchall()
        if not results:
            print('Failed to sync activity data')
            return
        print('Syncing activity data: step 1/2')
        activity_list = []
        activity_dict = {}
        activity_ids = []
        column_names = [desc[0] for desc in cursor.description]
        for row in results:
            data_dict = dict(zip(column_names, row))
            activity_id = data_dict['activityId']
            activity_list.append(data_dict)
            activity_ids.append(activity_id)
            activity_dict[activity_id] = data_dict
        # Let pymysql expand and quote the id list for the IN (...) clause.
        sql_query = 'SELECT * FROM elife_activity_awards WHERE activityId IN %s;'
        cursor.execute(sql_query, (activity_ids,))
        aw_results = cursor.fetchall()
        print('Syncing activity data: step 2/2')
        column_names = [desc[0] for desc in cursor.description]
        for row in aw_results:
            data_dict = dict(zip(column_names, row))
            activity_id = data_dict['activityId']
            if activity_id in activity_dict:
                entity = activity_dict[activity_id]
                entity.setdefault('awardList', []).append(data_dict)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(json.dumps(activity_list, cls=CustomEncoder, ensure_ascii=False))
        print('Activity data synced successfully')
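
    # Output shape of sync_activities (illustrative): a JSON array of activity objects;
    # activities that have awards carry an 'awardList' array of their elife_activity_awards rows.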

    def read_csv(self, filename='results.csv'):
        data = []
        try:
            with open(filename, mode='r', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    data.append(row)
        except IOError as e:
            print(f"read_csv error occurred: {e}")
        # print(data)
        return data

    def read_json(self, filename='results.json'):
        data = None
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except IOError as e:
            print(f"read_json error occurred: {e}")
        # print(data)
        return data


def main():
    syncer = OfpayDataSyncer()
    syncer.sync_activities('elife_activities_data.json')
    # syncer.read_json('elife_activities_data.json')
    syncer.sync_account(1, 'elife_accout_data')
    # syncer.read_csv('elife_accout_data.csv')
    # syncer.read_json('elife_accout_data.json')


if __name__ == "__main__":
    main()
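
# Note: depends on the PyMySQL driver (pip install pymysql). With the defaults in
# main(), a run writes elife_activities_data.json plus elife_accout_data.csv and
# elife_accout_data.json (both formats, since that filename has no extension).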