-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsync.py
224 lines (182 loc) · 8.16 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import requests
import json
import datetime
import time
import traceback
import sys
from config import config
from coinflex import CoinFlex
class History:
    """Local mirror of account history fetched through the CoinFlex REST client.

    All state lives in ``self.data``, a JSON-serializable dict:

    * ``'accountinfo'`` holds the decoded ``/v3/account`` response.
    * every synced endpoint name maps to
      ``{'latest_t': <ms timestamp or None>, 'data': [<records>]}``.

    Timestamps are milliseconds since the epoch.  Responses returned by
    ``self.cf.request`` are used through ``status_code``, ``content`` and
    ``json()`` only.
    """

    def __init__(self):
        """Create the API client from ``config`` and load ``endpoints.json``."""
        self.cf = CoinFlex(config['rest_url'], config['rest_path'], config['api_key'], config['api_secret'])
        self.data = {}
        # load endpoints.json
        with open('endpoints.json', 'r') as file:
            self.endpoints = json.load(file)

    def loadFromFile(self, filename):
        """Replace ``self.data`` with the JSON content of ``filename``.

        A missing or invalid file is reported on stdout and leaves
        ``self.data`` unchanged.
        """
        try:
            with open(filename, 'r') as file:
                self.data = json.load(file)
        except (FileNotFoundError, json.decoder.JSONDecodeError):
            print(f"data file '{filename}' not found or not valid JSON data, starting from empty data")

    def dumpToFile(self, filename):
        """Write ``self.data`` to ``filename`` as JSON (indent 1).

        Raises whatever ``json.dumps`` raises for non-serializable data; in
        that case ``filename`` is not touched.
        """
        # Serialize before opening: mode "w" truncates the file immediately,
        # so a serialization error must not happen after the open.
        serialized = json.dumps(self.data, indent=1)
        with open(filename, "w") as outfile:
            outfile.write(serialized)

    # sync "accountinfo" endpoint to self.data['accountinfo']
    def sync_accountinfo(self):
        """Request ``/v3/account`` and store the decoded body under ``'accountinfo'``."""
        print("requesting /v3/account...")
        r = self.cf.request('/v3/account', {})
        self.data['accountinfo'] = r.json()

    # sync data from endpoints given by endpoint_names to self.data
    def sync_endpoints(self, endpoint_names):
        """Sync every endpoint named in ``endpoint_names`` up to the current time.

        Each name must be a key of ``self.endpoints``.  The same "now" (local
        clock, in ms) is used as upper bound for all endpoints.
        """
        t_now = int(time.time() * 1000)
        for name in endpoint_names:
            endpoint = self.endpoints[name]
            self.sync_endpoint(endpoint, t_now)

    # sync data from given endpoint to self.data
    def sync_endpoint(self, endpoint, t_now):
        """Sync one endpoint definition for all of its configured items.

        The start time is, in order of preference: the stored ``latest_t``,
        the largest ``time_field_name`` value among the stored records, or
        ``config['t_account_start']``.  It is clamped to ``t_now``.

        NOTE(review): ``latest_t`` is stored once per endpoint while items are
        synced one after another, so every item restarts from the same value.
        """
        print(f"\n*** syncing endpoint {endpoint} ***\n")
        name = endpoint['name']
        time_field_name = endpoint['time_field_name']

        store = self.data.setdefault(name, {'latest_t': None, 'data': []})
        store.setdefault('data', [])

        # determine latest_t
        if store.get('latest_t'):
            latest_t = store['latest_t']
            print(f"{name}: using latest_t {latest_t} from data")
        elif store['data'] and time_field_name:
            print(f"{name}: using {time_field_name} for latest_t")
            latest_t = max(int(d[time_field_name]) for d in store['data'])
        else:
            latest_t = config['t_account_start']
        if latest_t > t_now:
            latest_t = t_now

        if 'items_config_var' not in endpoint:
            items = ['<all>']
        else:
            items = config[endpoint['items_config_var']]

        print(f"--- endpoint '{name}': syncing items {items} ---")
        for item in items:
            self.sync_endpoint_item(endpoint, item, t_now, latest_t)

    @staticmethod
    def _extract_records(endpoint, received_json):
        """Return the list of records in a decoded response, or None on an API error."""
        # temporary hack to get around behaviour introduced 4/15 2022 that api throws error 20001 when there is no data
        print("received_json", received_json)
        if "success" in received_json and "code" in received_json and "message" in received_json:
            print('looks like error response')
            if (received_json["success"] is False
                    and received_json["code"] == "20001"
                    and received_json["message"] == "result not found, please check your parameters"):
                print('special hack to ignore error code 20001')
                received_json["data"] = []

        if "data" not in received_json:
            print("ERROR from api, response:")
            print(json.dumps(received_json, indent=2))
            return None

        if not endpoint.get("is_wallet_history"):
            return received_json["data"]

        # Wallet history arrives grouped per account: flatten it and tag every
        # entry with the account it belongs to.
        records = []
        for account in received_json["data"]:
            for entry in account["walletHistory"]:
                entry["accountId"] = account["accountId"]
                entry["accountName"] = account["name"]
                records.append(entry)
        return records

    # sync data specified by 'item' from given endpoint to self.data
    def sync_endpoint_item(self, endpoint, item, t_now, latest_t):
        """Page through ``[latest_t, t_now]`` for one item and append the records.

        Requests cover windows of at most ``endpoint['max_period']`` ms.  When
        a response contains exactly ``endpoint['limit']`` records the window
        is halved and requested again; otherwise the records are appended to
        ``self.data[name]['data']``, ``latest_t`` is advanced to the window
        end and the window size is doubled (capped at ``max_period``).

        Raises:
            Exception: on any HTTP status other than 200 and 429.
            SystemExit: when the window shrinks below 1 ms and the limit is
                still hit.
        """
        name = endpoint['name']
        limit = endpoint['limit']
        path = endpoint['path'].format(name=name, item=item)
        max_period = endpoint['max_period']
        extra_params = endpoint.get('params', {})

        current_start_t = latest_t
        current_period = max_period

        print(f"\n--- syncing {name}, item {item}: latest_t = {latest_t} = {datetime.datetime.fromtimestamp(latest_t/1000)} ---\n")

        finished = False
        while not finished:
            params = {
                'limit': limit,
                'startTime': int(current_start_t),
                'endTime': int(current_start_t + current_period)
            }
            for key, template in extra_params.items():
                params[key] = template.format(name=name, item=item)
            if params['endTime'] > t_now:
                params['endTime'] = t_now

            # fire request
            print(f"requesting path {path} with params {params}")
            r = self.cf.request(path, params)
            if r.status_code != 200:
                print(f"status_code {r.status_code}, content: {r.content}")
                if r.status_code != 429:
                    raise Exception(f"HTTP Status Code {r.status_code}, aborting (will store data)")
                # rate limit hit: wait, then request the same window again
                print(f" rate limit encountered, sleeping {endpoint['rate_limit_sleep_s']} seconds...")
                time.sleep(endpoint['rate_limit_sleep_s'])
                continue

            # Evaluated from scratch for every response so that records of an
            # earlier window can never be stored a second time after an error.
            received_data = self._extract_records(endpoint, r.json())

            if received_data is None:
                # NOTE(review): the same window is requested again without any
                # delay; a persistent API error keeps this loop spinning.
                print("no data received (not even empty), probably error")
                print("response.json", json.dumps(r.json(), indent=4))
            else:
                print(f" requested {path} with {params}...")

                # work around issue A6 removing redeem operations still in progress. This can be removed when A6 is fixed by coinflex
                # NOTE: this introduces danger of missing a redeem that is still in progress in case startTime/endTime filter is on requestedAt.
                # In case filter is on redeemedAt it should be fine
                if name == 'redeem':
                    received_data = [d for d in received_data if d["redeemedAt"] != d["requestedAt"]]

                print(f" received {len(received_data)}/{limit} items")

                # adjust time interval parameters
                if len(received_data) == limit:  # limit was hit exactly
                    # The window may hold more records than were returned:
                    # store nothing and try again with a shorter period.
                    current_period /= 2
                    print(f"limit hit, reducing current_period to {current_period} and trying again")
                    if current_period < 1:
                        sys.exit("current_period reduced to <1. Too many trades at that timestamp")
                else:
                    current_period = min(current_period * 2, max_period)

                    # latest_t is set to endTime of request
                    # is this problematic due to possible clock difference local vs. server (TODO)?
                    self.data[name]['latest_t'] = params['endTime']

                    # append data to storage
                    self.data[name]['data'] += received_data

                    # next request can use endTime + 1 as startTime
                    current_start_t = self.data[name]['latest_t'] + 1

            if current_start_t >= t_now:
                finished = True
# Script body: build a History, load the previously stored data, sync from the
# API and write everything back to the same data file.
#
# NOTE(review): nothing here catches errors, so an exception or sys.exit()
# raised while syncing skips the final dumpToFile() call, although the HTTP
# error message raised during sync says "will store data" - confirm the intent.
#
# Manual request snippet kept for debugging:
# cf = CoinFlex(config['rest_url'], config['rest_path'], config['api_key'], config['api_secret'])
# r = cf.request('/v3/wallet', {'limit': 200, 'startTime': 1678867715314, 'endTime': 1678960237226})
# print(r.json())
history = History()

data_filename = config['data_filename']
history.loadFromFile(data_filename)

history.sync_accountinfo()
endpoint_names = config['endpoints_to_sync'].split(",")
history.sync_endpoints(endpoint_names)

history.dumpToFile(data_filename)