-
Notifications
You must be signed in to change notification settings - Fork 3
/
zenpal.py
executable file
·136 lines (113 loc) · 4.99 KB
/
zenpal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
import csv
import datetime
import sys
import forex_python.converter
from argparse import ArgumentParser
ACCOUNT_CURR = 'USD'
CONVERSION = 'General Currency Conversion'
DT, AMT = 1, 0
Currency = 'Currency'
Amount = 'Amount'
Date = 'Date'
Time = 'Time'
Type = 'Type'
Name = 'Name'
def to_unix_time(data_row):
return int(datetime.datetime.strptime(data_row[Date] + ' ' + data_row[Time], "%d/%m/%Y %H:%M:%S").timestamp())
def to_iso_date(s):
return str(datetime.datetime.strptime(s, "%d/%m/%Y"))
def convert_cb(amount, curr, dt):
converter = forex_python.converter.CurrencyRates()
dt = datetime.datetime.utcfromtimestamp(dt)
rate = converter.get_rate(curr, ACCOUNT_CURR, dt)
return "{0:.2f}".format(rate * float(amount))
def load(filename):
def pre_process(data):
for data_row in data:
yield data_row.replace('\ufeff', '')
with open(filename) as csv_file:
csv_reader = csv.reader(pre_process(csv_file), delimiter=',')
line_count = 0
header = {}
out_lines = []
conv = {}
pending_trans = []
pending_conv = {}
for row in csv_reader:
if not header:
header = row
continue
row = dict(zip(header, row))
row[Amount] = row[Amount].replace(",", "")
if row[Type] == CONVERSION:
dkey = to_unix_time(row)
pending_conv[dkey] = pending_conv.get(dkey, {})
pending_conv[dkey][row[Currency]] = row[Amount]
if len(pending_conv[dkey]) == 2:
if ACCOUNT_CURR not in pending_conv[dkey]:
print("WARN: unparseable conversion {}".format(pending_conv[dkey]), file=sys.stderr)
else:
curr = [x for x in pending_conv[dkey].keys() if x != ACCOUNT_CURR][0]
usd_amount = pending_conv[dkey][ACCOUNT_CURR]
rub_amount = pending_conv[dkey][curr].replace("-", "")
conv[curr] = conv.get(curr, {})
t = conv[curr].get(rub_amount, [])
t.append([usd_amount, dkey])
conv[curr][rub_amount] = t
else:
if row[Currency] == ACCOUNT_CURR:
out_lines.append([to_iso_date(row[Date]), row[Name], row[Type], row[Amount]])
else:
pending_trans.append(row)
line_count += 1
for row in pending_trans:
abs_amount = row[Amount].replace("-", "")
usd_amount = None
if abs_amount in conv.get(row[Currency], {}):
conv_lists = conv[row[Currency]][abs_amount]
tr_time = to_unix_time(row)
if len(conv_lists) == 1:
conv_list = conv_lists[0]
else:
conv_list = conv_lists[0]
curr_diff = abs(conv_list[DT] - tr_time)
for x in conv_lists:
if abs(x[DT] - tr_time) < curr_diff:
curr_diff = abs(x[DT] - tr_time)
conv_list = x
if abs(conv_list[DT] - tr_time) <= 60 * 60 * 24:
usd_amount = conv_list[AMT]
if not usd_amount:
print("WARN: fetching {} {} from forex".format(row[Amount], row[Currency]), file=sys.stderr)
usd_amount = convert_cb(row[Amount], row[Currency], to_unix_time(row))
out_lines.append([to_iso_date(row[Date]), row[Name], row[Type] + " (" + row[Amount] + " " + row[Currency] + ")",
usd_amount])
return out_lines
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument('-f', '--file', required=True, help='Path or name of the file you wish to edit')
parser.add_argument('-o', '--output_file', required=False, help='Path or name of the desired output file. Optional')
parser.add_argument('-a', '--append', type=bool, required=False,
help="'True' will add to existing file\n 'False' will create a new output file")
args = parser.parse_args()
infile = args.file
outfile = args.output_file
output_writer = csv.writer(sys.stdout, delimiter=';', quoting=csv.QUOTE_MINIMAL)
try:
if args.output_file:
if args.append:
with open(outfile, 'at', newline='') as f:
file_writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_MINIMAL)
for line in load(infile):
file_writer.writerow(line)
else:
with open(outfile, 'wt', newline='') as f:
file_writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_MINIMAL)
for line in load(infile):
file_writer.writerow(line)
else:
for line in load(infile):
output_writer.writerow(line)
except FileNotFoundError:
parser.error("{} file or path does not exist please try again.".format(args.file))