-
-
Notifications
You must be signed in to change notification settings - Fork 522
/
cryptocom.py
567 lines (537 loc) · 24.4 KB
/
cryptocom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
import csv
import functools
import logging
from collections import defaultdict
from pathlib import Path
from typing import Any
from rotkehlchen.accounting.structures.balance import Balance
from rotkehlchen.accounting.structures.base import HistoryEvent
from rotkehlchen.accounting.structures.types import HistoryEventSubType, HistoryEventType
from rotkehlchen.assets.converters import asset_from_cryptocom
from rotkehlchen.constants import ONE, ZERO
from rotkehlchen.constants.assets import A_USD
from rotkehlchen.constants.prices import ZERO_PRICE
from rotkehlchen.data_import.utils import BaseExchangeImporter, UnsupportedCSVEntry, hash_csv_row
from rotkehlchen.db.drivers.gevent import DBCursor
from rotkehlchen.errors.asset import UnknownAsset
from rotkehlchen.errors.misc import InputError
from rotkehlchen.errors.serialization import DeserializationError
from rotkehlchen.exchanges.data_structures import AssetMovement, Trade
from rotkehlchen.logging import RotkehlchenLogsAdapter
from rotkehlchen.serialization.deserialize import (
deserialize_asset_amount,
deserialize_asset_amount_force_positive,
deserialize_timestamp_from_date,
)
from rotkehlchen.types import (
AssetAmount,
AssetMovementCategory,
Fee,
Location,
Price,
Timestamp,
TradeType,
)
from rotkehlchen.utils.misc import ts_sec_to_ms
# Module-level logger, wrapped in the rotkehlchen logs adapter; the rest of
# this module logs through ``log``.
logger = logging.getLogger(__name__)
log = RotkehlchenLogsAdapter(logger)
# Prefix prepended to the CSV row hash to build the ``event_identifier`` of
# every history event created by this importer.
CRYPTOCOM_PREFIX = 'CCM_'
class CryptocomImporter(BaseExchangeImporter):
    """Importer for crypto.com CSV exports.

    Rows are turned into trades, asset movements or history events depending
    on their ``Transaction Kind`` column. Kinds that come in pairs
    (``*_debited`` / ``*_credited``) are matched by timestamp and imported
    as a single trade by ``_import_cryptocom_associated_entries``.
    """

    def _add_cryptocom_event(
            self,
            write_cursor: DBCursor,
            csv_row: dict[str, Any],
            timestamp: Timestamp,
            event_type: HistoryEventType,
            asset: Any,
            amount: Any,
            notes: str,
    ) -> None:
        """Store one history event whose identifier is derived from ``csv_row``.

        ``timestamp`` is given in seconds and converted to milliseconds here.
        """
        event = HistoryEvent(
            event_identifier=f'{CRYPTOCOM_PREFIX}{hash_csv_row(csv_row)}',
            sequence_index=0,
            timestamp=ts_sec_to_ms(timestamp),
            location=Location.CRYPTOCOM,
            event_type=event_type,
            event_subtype=HistoryEventSubType.NONE,
            balance=Balance(amount=amount),
            asset=asset,
            notes=notes,
        )
        self.add_history_events(write_cursor, [event])

    def _add_cryptocom_asset_movement(
            self,
            write_cursor: DBCursor,
            timestamp: Timestamp,
            category: AssetMovementCategory,
            asset: Any,
            amount: AssetAmount,
            fee_asset: Any,
    ) -> None:
        """Store one fee-less deposit/withdrawal at the crypto.com location."""
        asset_movement = AssetMovement(
            location=Location.CRYPTOCOM,
            category=category,
            address=None,
            transaction_id=None,
            timestamp=timestamp,
            asset=asset,
            amount=amount,
            # the export carries no fee information, so the fee is always zero
            fee=Fee(ZERO),
            fee_asset=fee_asset,
            link='',
        )
        self.add_asset_movement(write_cursor, asset_movement)

    def _consume_cryptocom_entry(
            self,
            write_cursor: DBCursor,
            csv_row: dict[str, Any],
            timestamp_format: str = '%Y-%m-%d %H:%M:%S',
    ) -> None:
        """Consume one crypto.com CSV row and add it to the database.

        Depending on the row's ``Transaction Kind`` the row is stored as a
        trade, an asset movement or a history event, or is silently ignored.

        Can raise:
            - DeserializationError if something is wrong with the format of
              the expected values
            - UnsupportedCSVEntry if importing of this entry is not supported
            - KeyError if an expected CSV key is missing
            - UnknownAsset if one of the assets found in the entry is not
              supported
        """
        row_type = csv_row['Transaction Kind']
        timestamp = deserialize_timestamp_from_date(
            date=csv_row['Timestamp (UTC)'],
            formatstr=timestamp_format,
            location='cryptocom',
        )
        description = csv_row['Transaction Description']
        notes = f'{description}\nSource: crypto.com (CSV import)'

        # No fees info until (Nov 2020) on crypto.com
        # fees are not displayed in the export data
        fee = Fee(ZERO)
        fee_currency = A_USD  # whatever (used only if there is no fee)

        if row_type in (
                'crypto_purchase',
                'crypto_exchange',
                'viban_purchase',
                'crypto_viban_exchange',
                'recurring_buy_order',
                'card_top_up',
        ):
            # variable mapping to raw data
            currency = csv_row['Currency']
            to_currency = csv_row['To Currency']
            native_currency = csv_row['Native Currency']
            amount = csv_row['Amount']
            to_amount = csv_row['To Amount']
            native_amount = csv_row['Native Amount']

            trade_type = TradeType.BUY if to_currency != native_currency else TradeType.SELL

            if row_type in (
                    'crypto_exchange',
                    'crypto_viban_exchange',
                    'recurring_buy_order',
                    'viban_purchase',
            ):
                # trades (fiat, crypto) to (crypto, fiat)
                base_asset = asset_from_cryptocom(to_currency)
                quote_asset = asset_from_cryptocom(currency)
                if quote_asset is None:
                    raise DeserializationError('Got a trade entry with an empty quote asset')
                base_amount_bought = deserialize_asset_amount(to_amount)
                quote_amount_sold = deserialize_asset_amount(amount)
            elif row_type == 'card_top_up':
                quote_asset = asset_from_cryptocom(currency)
                base_asset = asset_from_cryptocom(native_currency)
                base_amount_bought = deserialize_asset_amount_force_positive(native_amount)
                quote_amount_sold = deserialize_asset_amount_force_positive(amount)
            else:
                base_asset = asset_from_cryptocom(currency)
                quote_asset = asset_from_cryptocom(native_currency)
                base_amount_bought = deserialize_asset_amount(amount)
                quote_amount_sold = deserialize_asset_amount(native_amount)

            # Guard against a zero base amount the same way the paired-entries
            # import does, instead of letting the division blow up the import.
            if base_amount_bought != ZERO:
                rate = Price(abs(quote_amount_sold / base_amount_bought))
            else:
                rate = ZERO_PRICE

            trade = Trade(
                timestamp=timestamp,
                location=Location.CRYPTOCOM,
                base_asset=base_asset,
                quote_asset=quote_asset,
                trade_type=trade_type,
                amount=base_amount_bought,
                rate=rate,
                fee=fee,
                fee_currency=fee_currency,
                link='',
                notes=notes,
            )
            self.add_trade(write_cursor, trade)
        elif row_type in (
                'crypto_withdrawal',
                'crypto_deposit',
                'viban_deposit',
                'viban_card_top_up',
        ):
            if row_type in ('crypto_withdrawal', 'viban_deposit', 'viban_card_top_up'):
                category = AssetMovementCategory.WITHDRAWAL
                movement_amount = deserialize_asset_amount_force_positive(csv_row['Amount'])
            else:
                category = AssetMovementCategory.DEPOSIT
                movement_amount = deserialize_asset_amount(csv_row['Amount'])

            asset = asset_from_cryptocom(csv_row['Currency'])
            self._add_cryptocom_asset_movement(
                write_cursor=write_cursor,
                timestamp=timestamp,
                category=category,
                asset=asset,
                amount=movement_amount,
                fee_asset=asset,
            )
        elif row_type in (
                'airdrop_to_exchange_transfer',
                'mco_stake_reward',
                'crypto_payment_refund',
                'rewards_platform_deposit_credited',
                'pay_checkout_reward',
                'transfer_cashback',
                'supercharger_reward_to_app_credited',
                'referral_card_cashback',
                'referral_gift',
                'referral_bonus',
                'crypto_earn_interest_paid',
                'reimbursement',
        ):
            asset = asset_from_cryptocom(csv_row['Currency'])
            received = deserialize_asset_amount(csv_row['Amount'])
            self._add_cryptocom_event(
                write_cursor=write_cursor,
                csv_row=csv_row,
                timestamp=timestamp,
                event_type=HistoryEventType.RECEIVE,
                asset=asset,
                amount=received,
                notes=notes,
            )
        elif row_type in ('crypto_payment', 'reimbursement_reverted', 'card_cashback_reverted'):
            asset = asset_from_cryptocom(csv_row['Currency'])
            spent = abs(deserialize_asset_amount(csv_row['Amount']))
            self._add_cryptocom_event(
                write_cursor=write_cursor,
                csv_row=csv_row,
                timestamp=timestamp,
                event_type=HistoryEventType.SPEND,
                asset=asset,
                amount=spent,
                notes=notes,
            )
        elif row_type in ('invest_deposit', 'invest_withdrawal'):
            asset = asset_from_cryptocom(csv_row['Currency'])
            movement_amount = deserialize_asset_amount(csv_row['Amount'])
            self._add_cryptocom_asset_movement(
                write_cursor=write_cursor,
                timestamp=timestamp,
                category=(
                    AssetMovementCategory.DEPOSIT
                    if row_type == 'invest_deposit'
                    else AssetMovementCategory.WITHDRAWAL
                ),
                asset=asset,
                amount=movement_amount,
                fee_asset=fee_currency,
            )
        elif row_type == 'crypto_transfer':
            asset = asset_from_cryptocom(csv_row['Currency'])
            transferred = deserialize_asset_amount(csv_row['Amount'])
            # the sign of the amount tells the direction of the transfer
            if transferred < 0:
                event_type = HistoryEventType.SPEND
                transferred = abs(transferred)
            else:
                event_type = HistoryEventType.RECEIVE
            self._add_cryptocom_event(
                write_cursor=write_cursor,
                csv_row=csv_row,
                timestamp=timestamp,
                event_type=event_type,
                asset=asset,
                amount=transferred,
                notes=notes,
            )
        elif row_type in (
                'crypto_earn_program_created',
                'crypto_earn_program_withdrawn',
                'lockup_lock',
                'lockup_unlock',
                'dynamic_coin_swap_bonus_exchange_deposit',
                'crypto_wallet_swap_debited',
                'crypto_wallet_swap_credited',
                'lockup_swap_debited',
                'lockup_swap_credited',
                'lockup_swap_rebate',
                # we don't handle crypto.com exchange yet
                'crypto_to_exchange_transfer',
                'exchange_to_crypto_transfer',
                # supercharger actions
                'supercharger_deposit',
                'supercharger_withdrawal',
                # already handled using _import_cryptocom_associated_entries
                'dynamic_coin_swap_debited',
                'dynamic_coin_swap_credited',
                'dust_conversion_debited',
                'dust_conversion_credited',
                'interest_swap_credited',
                'interest_swap_debited',
                # The user has received an airdrop but can't claim it yet
                'airdrop_locked',
        ):
            # those types are ignored because it doesn't affect the wallet balance
            # or are not handled here
            return
        else:
            raise UnsupportedCSVEntry(
                f'Unknown entrype type "{row_type}" encountered during '
                f'cryptocom data import. Ignoring entry',
            )

    def _import_cryptocom_associated_entries(
            self,
            write_cursor: DBCursor,
            data: Any,
            tx_kind: str,
            timestamp_format: str = '%Y-%m-%d %H:%M:%S',
    ) -> None:
        """Look for events that have associated entries and handle them as trades.

        This method looks for `*_debited` and `*_credited` entries using the
        same timestamp to handle them as one trade. It also collects the
        `*_deposit` and `*_withdrawal` entries of ``tx_kind`` per currency and
        stores a RECEIVE event for every withdrawal whose amount covers the
        deposits made since the previous profitable withdrawal.

        Known kind: 'dynamic_coin_swap' or 'dust_conversion'

        ``data`` is any iterable of CSV rows (dicts keyed by column name).

        May raise:
            - UnknownAsset if an unknown asset is encountered in the imported files
            - KeyError if a row contains unexpected data entries
        """
        def row_timestamp(entry: dict[str, Any]) -> Timestamp:
            """Deserialize the 'Timestamp (UTC)' column of a row."""
            return deserialize_timestamp_from_date(
                date=entry['Timestamp (UTC)'],
                formatstr=timestamp_format,
                location='cryptocom',
            )

        debited_kind = f'{tx_kind}_debited'
        credited_kind = f'{tx_kind}_credited'
        deposit_kind = f'{tx_kind}_deposit'
        withdrawal_kind = f'{tx_kind}_withdrawal'

        multiple_rows: dict[Any, dict[str, Any]] = {}
        investments_deposits: dict[str, list[Any]] = defaultdict(list)
        investments_withdrawals: dict[str, list[Any]] = defaultdict(list)
        expects_debited = False
        credited_timestamp = None
        for row in data:
            log.debug(f'Processing cryptocom row at {row["Timestamp (UTC)"]} and type {tx_kind}')
            kind = row['Transaction Kind']

            # If we don't have the corresponding debited entry ignore them
            # and warn the user
            if expects_debited is True and kind != debited_kind:
                self.db.msg_aggregator.add_warning(
                    f'Error during cryptocom CSV import consumption. Found {tx_kind}_credited '
                    f'but no amount debited afterwards at date {row["Timestamp (UTC)"]}',
                )
                # Pop the last credited event as it's invalid. We always assume to be at least
                # one debited event and one credited event. If we don't find the debited event
                # we have to remove the credit at the right timestamp or our logic will break.
                # We notify the user about this issue so (s)he can take actions.
                multiple_rows.pop(credited_timestamp, None)
                # reset expects_debited value
                expects_debited = False

            if kind == debited_kind:
                timestamp = row_timestamp(row)
                if expects_debited is False and timestamp != credited_timestamp:
                    self.db.msg_aggregator.add_warning(
                        f'Error during cryptocom CSV import consumption. Found {tx_kind}_debited'
                        f' but no amount credited before at date {row["Timestamp (UTC)"]}',
                    )
                    continue
                multiple_rows.setdefault(timestamp, {}).setdefault('debited', []).append(row)
                expects_debited = False
            elif kind == credited_kind:
                # There is only one credited row per timestamp
                timestamp = row_timestamp(row)
                grouped = multiple_rows.setdefault(timestamp, {})
                expects_debited = True
                credited_timestamp = timestamp
                grouped['credited'] = row
            elif kind == deposit_kind:
                investments_deposits[row['Currency']].append(row)
            elif kind == withdrawal_kind:
                investments_withdrawals[row['Currency']].append(row)

        for timestamp, m_row in multiple_rows.items():
            # When we convert multiple assets dust to CRO
            # in one time, it will create multiple debited rows with
            # the same timestamp
            try:
                debited_rows = m_row['debited']
                credited_row = m_row['credited']
            except KeyError as e:
                self.db.msg_aggregator.add_warning(
                    f'Failed to get {e!s} event at timestamp {timestamp}.',
                )
                continue

            total_debited_usd = functools.reduce(
                lambda acc, entry:
                    acc +
                    deserialize_asset_amount(entry['Native Amount (in USD)']),
                debited_rows,
                ZERO,
            )

            # If the value of the transaction is too small (< 0,01$),
            # crypto.com will display 0 as native amount
            # if we have multiple debited rows, we can't import them
            # since we can't compute their dedicated rates, so we skip them.
            # Only this group is skipped: the remaining groups and the
            # investment profit computation below must still run.
            if len(debited_rows) > 1 and total_debited_usd == 0:
                continue

            for debited_row in debited_rows:
                description = credited_row['Transaction Description']
                notes = f'{description}\nSource: crypto.com (CSV import)'
                # No fees here
                fee = Fee(ZERO)
                fee_currency = A_USD

                base_asset = asset_from_cryptocom(credited_row['Currency'])
                quote_asset = asset_from_cryptocom(debited_row['Currency'])
                # share of the credited amount attributed to this debited row,
                # weighted by its USD value
                part_of_total = (
                    ONE
                    if len(debited_rows) == 1
                    else deserialize_asset_amount(
                        debited_row['Native Amount (in USD)'],
                    ) / total_debited_usd
                )
                quote_amount_sold = deserialize_asset_amount(
                    debited_row['Amount'],
                ) * part_of_total
                base_amount_bought = deserialize_asset_amount(
                    credited_row['Amount'],
                ) * part_of_total
                if base_amount_bought != ZERO:
                    rate = Price(abs(quote_amount_sold / base_amount_bought))
                else:
                    rate = ZERO_PRICE

                trade = Trade(
                    timestamp=timestamp,
                    location=Location.CRYPTOCOM,
                    base_asset=base_asset,
                    quote_asset=quote_asset,
                    trade_type=TradeType.BUY,
                    amount=AssetAmount(base_amount_bought),
                    rate=rate,
                    fee=fee,
                    fee_currency=fee_currency,
                    link='',
                    notes=notes,
                )
                self.add_trade(write_cursor, trade)

        # Compute investments profit
        for asset, asset_withdrawals in investments_withdrawals.items():
            asset_object = asset_from_cryptocom(asset)
            if asset not in investments_deposits:
                log.error(
                    f'Investment withdrawal without deposit at crypto.com. Ignoring '
                    f'staking info for asset {asset_object}',
                )
                continue

            # Sort by date in ascending order
            withdrawals_rows = sorted(asset_withdrawals, key=row_timestamp)
            investments_rows = sorted(investments_deposits[asset], key=row_timestamp)
            # deposit dates do not change between withdrawals, parse them once
            dated_deposits = [(row_timestamp(entry), entry) for entry in investments_rows]

            last_date = Timestamp(0)
            for withdrawal in withdrawals_rows:
                withdrawal_date = row_timestamp(withdrawal)
                amount_deposited = ZERO
                for deposit_date, deposit in dated_deposits:
                    if last_date < deposit_date <= withdrawal_date:
                        # Amount is negative
                        amount_deposited += deserialize_asset_amount(deposit['Amount'])
                amount_withdrawal = deserialize_asset_amount(withdrawal['Amount'])
                # Compute profit
                profit = amount_withdrawal + amount_deposited
                if profit >= ZERO:
                    last_date = withdrawal_date
                    self._add_cryptocom_event(
                        write_cursor=write_cursor,
                        csv_row=withdrawal,
                        timestamp=withdrawal_date,
                        event_type=HistoryEventType.RECEIVE,
                        asset=asset_object,
                        amount=profit,
                        notes=f'Staking profit for {asset}',
                    )

    def _import_csv(self, write_cursor: DBCursor, filepath: Path, **kwargs: Any) -> None:
        """Import every supported row of the crypto.com CSV at ``filepath``.

        The paired/investment kinds are imported first, one pass per kind,
        then every row is consumed individually. Rows with an unknown asset,
        a deserialization problem or an unsupported kind are reported through
        the message aggregator and skipped.

        May raise:
            - InputError if one of the rows is malformed
        """
        # Read the rows once instead of rewinding the file for every pass:
        # rewinding needed a `next()` to skip the header, which raised
        # StopIteration on an empty file.
        with open(filepath, encoding='utf-8-sig') as csvfile:
            rows = list(csv.DictReader(csvfile))

        try:
            # Notice: Crypto.com csv export gathers all swapping entries (`lockup_swap_*`,
            # `crypto_wallet_swap_*`, ...) into one entry named `dynamic_coin_swap_*`.
            for tx_kind in ('dynamic_coin_swap', 'dust_conversion', 'interest_swap', 'invest'):
                self._import_cryptocom_associated_entries(
                    write_cursor=write_cursor,
                    data=rows,
                    tx_kind=tx_kind,
                    **kwargs,
                )
        except KeyError as e:
            raise InputError(f'Crypto.com csv missing entry for {e!s}') from e
        except UnknownAsset as e:
            raise InputError(f'Encountered unknown asset {e!s} at crypto.com csv import') from e  # noqa: E501

        for row in rows:
            try:
                self._consume_cryptocom_entry(write_cursor, row, **kwargs)
            except UnknownAsset as e:
                self.db.msg_aggregator.add_warning(
                    f'During cryptocom CSV import found action with unknown '
                    f'asset {e.identifier}. Ignoring entry',
                )
                continue
            except DeserializationError as e:
                self.db.msg_aggregator.add_warning(
                    f'Error during cryptocom CSV import deserialization. '
                    f'Error was {e!s}. Ignoring entry',
                )
                continue
            except UnsupportedCSVEntry as e:
                self.db.msg_aggregator.add_warning(str(e))
                continue
            except KeyError as e:
                raise InputError(f'Could not find key {e!s} in csv row {row!s}') from e