-
Notifications
You must be signed in to change notification settings - Fork 0
/
ng_test.py
129 lines (96 loc) · 3 KB
/
ng_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import re
from pprint import pprint
from collections import defaultdict
from itertools import tee
from datetime import date
from tika import parser
import pandas as pd
def pairwise(iterable):
    """Return an iterator over overlapping adjacent pairs of *iterable*.

    s -> (s0, s1), (s1, s2), (s2, s3), ...

    An input with fewer than two items produces no pairs.
    """
    leading, trailing = tee(iterable, 2)
    # Move the second copy one item ahead of the first; the None default
    # keeps an empty input from raising StopIteration here.
    next(trailing, None)
    return zip(leading, trailing)
file_path = "/Users/davidciani/Downloads/pay-stubs/2021-02.pdf"
file_data = parser.from_file(file_path)
text = file_data["content"]

## Process Key-Values

# Header fields to pull out of the extracted text. Each value stays None
# until a matching label is found below.
paystub_info = {
    "Name": None,
    "My ID": None,
    "Badge": None,
    "Cost Center": None,
    "SubArea": None,
    "EE Grp": None,
    "EE SGrp": None,
    "Pay Date": None,
    "Pay Period": None,
    "Hours worked": None,
}

# Split the text on runs of two or more whitespace characters and walk the
# adjacent pieces: when a piece, minus its final character, is one of the
# known labels, the piece that follows it is stored as that label's value.
# NOTE(review): the dropped character is presumably a trailing ":" -- confirm
# against a real stub.
for a, b in pairwise(re.split(r"\s{2,}", text)):
    label = a[:-1]
    if label in paystub_info:
        paystub_info[label] = b

# cleanup pay period: replace the raw "Pay Period" string with a start date,
# an end date, and the period number/year.
raw_pay_period = paystub_info["Pay Period"]
if raw_pay_period is None:
    period_match = None
else:
    period_match = re.match(
        r"(\d{2})/(\d{2})/(\d{4})- (\d{2})/(\d{2})/(\d{4}) Period No: (\d{2})/(\d{4})",
        raw_pay_period,
    )
if period_match is None:
    # Fail with a message that names the offending value instead of an
    # opaque TypeError from subscripting None further down.
    raise ValueError(f"Unrecognized 'Pay Period' value: {raw_pay_period!r}")

del paystub_info["Pay Period"]
# date() takes (year, month, day). Groups 1-3 are the first date and groups
# 4-6 the second, each read as month/day/year, so the day comes from
# groups 2 and 5.
paystub_info["Pay Period Start"] = date(
    int(period_match[3]), int(period_match[1]), int(period_match[2])
)
paystub_info["Pay Period End"] = date(
    int(period_match[6]), int(period_match[4]), int(period_match[5])
)
paystub_info["Pay Period Year"] = int(period_match[8])
paystub_info["Pay Period Number"] = int(period_match[7])

pprint(paystub_info)

# Same data as a labelled pandas Series, indexed by field name.
paystub_info_s = pd.Series(
    list(paystub_info.values()),
    index=list(paystub_info.keys()),
    name="paystub_info",
)
print(paystub_info_s)
print()
## Process Tables

# Lines that, on their own, mark the start of a table in the extracted text.
table_sections = [
    "Earnings",
    "Deductions",
    "Taxes",
    "Other Benefits & Information",
    "Quota Information",
    "Distribution of Net Payment",
]

# Key of the table currently being collected, or None when outside a table.
current_table = None
# Maps a table key (e.g. "earnings") to its rows; each row is a list of cells.
tables = defaultdict(list)

for line in text.splitlines():
    # Parse line and skip if blank; a blank line also ends the current table.
    stripped_line = line.strip()
    if not stripped_line:
        current_table = None
        continue

    # Detect section boundaries. The table key is the lower-cased heading
    # with each run of non-word characters replaced by "_".
    if stripped_line in table_sections:
        current_table = re.sub(r"\W+", "_", stripped_line.lower())
        continue

    # Add record to table, splitting cells on runs of 2+ whitespace characters.
    if current_table is not None:
        tables[current_table].append(re.split(r"\s{2,}", stripped_line))

# The cleanup steps below look tables up with .get(): indexing the defaultdict
# directly would insert an empty table for a section the text does not
# contain, and that empty table would then fail at table[0] further down.

# cleanup the taxes table: merge the first two cells of every data row.
taxes = tables.get("taxes", [])
for ix, row in enumerate(taxes[1:], start=1):
    taxes[ix] = [" ".join(row[0:2])] + row[2:]

# cleanup the earnings table: replace the header row.
earnings = tables.get("earnings")
if earnings:
    earnings[0] = ["DESCRIPTION", "CURRENT", "YEAR-TO-DATE", "RETRO DATE"]

# cleanup the deductions table: replace the header row and insert an empty
# REMARK cell into three-cell rows so the last cell lands under YEAR-TO-DATE.
deductions = tables.get("deductions")
if deductions:
    deductions[0] = ["DESCRIPTION", "CURRENT", "REMARK", "YEAR-TO-DATE"]
    for ix, row in enumerate(deductions[1:], start=1):
        if len(row) == 3:
            deductions[ix] = row[0:2] + [""] + row[2:]

# cleanup table record lengths: right-pad short rows with empty strings up to
# the header width. Rows longer than the header are left as they are.
for table in tables.values():
    record_length = len(table[0])
    for ix, row in enumerate(table[1:], start=1):
        table[ix] = row + [""] * (record_length - len(row))

# Build one DataFrame per table: first row as column names, the rest as data.
tables_df = {}
for key, table in tables.items():
    tables_df[key] = pd.DataFrame(table[1:], columns=table[0])
    tables_df[key].name = key

pprint(tables)
for table in tables_df.values():
    print(table)
    print()