forked from Sparcc/sap-collibra-transformation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransform.py
202 lines (176 loc) · 10.3 KB
/
transform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
from openpyxl import load_workbook
import sys, os
class SapDataParser:
    """Convert rows of an SAP metadata workbook into an import workbook.

    The source sheet is read row by row.  Each row may introduce a new
    info area (columns ``Parent``/``Child``), a new table
    (``DD_TABLENAME``) and a column of that table (``DD_FIELDNAME``).
    For every new asset one row is appended to the output sheet, whose
    column layout is taken from the header row of ``template.xlsx``.

    Typical use::

        parser = SapDataParser('source.xlsx', 'output.xlsx')
        parser.setDataLength(1000)   # optional; see start()
        parser.start()
    """

    # Exclusive upper bound of the source rows to process.  0 means
    # "not set"; start() then derives it from the source sheet.
    upperRange = 0
    # Last info area / table name seen, used to detect a change.
    currentInfoArea = ''
    currentTable = ''
    # Next free row of the output sheet (row 1 holds the headers).
    outputRowNum = 2
    sourceFileName = ''
    outputFileName = 'output.xlsx'
    # Whether the row currently being processed names an info area / table.
    hasInfoArea = False
    hasTable = False
    # Constant values written to every generated asset row.
    domain = 'SAFYR SAP Test'
    community = 'Technical Metadata Community'
    domainType = 'Physical Data Dictionary'

    # Source column letter for each logical source field.
    _SOURCE_COLUMNS = (
        ('Parent', 'A'),
        ('Child', 'B'),
        ('DD_TABLENAME', 'C'),
        ('LONG_DESC', 'D'),
        ('DD_TABLETYPE', 'E'),
        ('DD_FIELDNAME', 'F'),
        ('SHORT_DESC', 'G'),
        ('POSIT', 'H'),
        ('MANDATORY', 'I'),
        ('DD_DATATYPE_ERP', 'J'),
        ('DATA_LENGTH', 'K'),
        ('DATA_DECIMALS', 'L'),
        ('KEY_FLAG', 'M'),
    )

    # Lower-cased spellings recognised by convertToCommonTerm().
    _TRUE_TERMS = frozenset(('yes', 'true', 't', 'y'))
    _FALSE_TERMS = frozenset(('no', 'false', 'f', 'n', 'none', ''))

    def __init__(self, input, output):
        """Open the source workbook and prepare a fresh output workbook.

        Args:
            input: Path of the source workbook (the name shadows the
                builtin but is kept for keyword-argument compatibility).
            output: File name of the output workbook.  A pristine copy is
                taken from the ``emptyOutput`` directory before loading.
        """
        print('Loading Excel Files')
        self.sourceFileName = input
        self.wb = load_workbook(filename=input)
        self.ws = self.wb.active
        # The output file must exist before it can be loaded.
        self.resetOutputFile(output)
        self.outputFileName = output
        self.output = load_workbook(output)
        self.sOutput = self.output.active
        self.buildFieldMap()
        self.buildHeaders()

    def resetOutputFile(self, fileName='output.xlsx'):
        """Replace ``./<fileName>`` with the copy in ``./emptyOutput``.

        Uses a plain file copy instead of shelling out to ``del``/``copy``
        so it works on every platform and the file name is never
        interpreted by a shell.

        Raises:
            OSError: If the empty template cannot be read or the target
                cannot be written.
        """
        import shutil  # local import keeps this block self-contained

        emptyCopy = os.path.join('.', 'emptyOutput', fileName)
        target = os.path.join('.', fileName)
        # copyfile overwrites an existing target, so no separate delete.
        shutil.copyfile(emptyCopy, target)

    def buildHeaders(self):
        """Write every template header into row 1 of the output sheet."""
        for header, column in self.fieldTemp.items():
            self.sOutput[column + '1'] = header

    def setDataLength(self, upperRange=49646):
        """Set the last source row to process (inclusive).

        Args:
            upperRange: Number of the last source row; stored plus one
                because start() treats the bound as exclusive.
        """
        self.upperRange = upperRange + 1

    def buildFieldMap(self):
        """Build the source-field and output-header column lookups.

        ``fieldSrc`` maps a source field name to its column letter.
        ``fieldTemp`` maps each header found in the first row of
        ``template.xlsx`` to the letter of the column it sits in.
        """
        print('Building Map')
        self.fieldSrc = dict(self._SOURCE_COLUMNS)

        templateSheet = load_workbook(filename='template.xlsx').active
        self.fieldTemp = {}
        for columnCells in templateSheet.iter_cols():
            headerCell = columnCells[0]
            self.fieldTemp[headerCell.value] = self._column_letter(headerCell)

    @staticmethod
    def _column_letter(cell):
        """Return the column letter of *cell* on old and new openpyxl.

        Newer openpyxl exposes the letter as ``column_letter`` and makes
        ``column`` an integer; older releases only have ``column``, which
        is the letter there.
        """
        letter = getattr(cell, 'column_letter', None)
        return letter if letter is not None else cell.column

    def _src(self, field, rowNum):
        """Return the value of source *field* in source row *rowNum*."""
        return self.ws[self.fieldSrc[field] + str(rowNum)].value

    def _put(self, header, value):
        """Write *value* under *header* in the current output row."""
        self.sOutput[self.fieldTemp[header] + str(self.outputRowNum)] = value

    def _write_common(self, assetType):
        """Write the columns shared by every generated asset row."""
        self._put('Status', 'Candidate')
        self._put('Type', assetType)
        self._put('Domain', self.domain)
        self._put('Community', self.community)
        self._put('Domain Type', self.domainType)

    def _write_relation(self, relation, targetType, targetName):
        """Write the five ``<relation> > ...`` columns for one relation."""
        self._put(relation + ' > ' + targetType, targetName)
        self._put(relation + ' > Type', targetType)
        self._put(relation + ' > Community', self.community)
        self._put(relation + ' > Domain Type', self.domainType)
        self._put(relation + ' > Domain', self.domain)

    def start(self, startingRow=2, limit=0):
        """Process the source rows and save the output workbook.

        Args:
            startingRow: First source row to process.
            limit: When positive, the exclusive upper row bound to use
                instead of the one set through setDataLength().

        If neither *limit* nor setDataLength() supplied a bound, the
        bound is derived from the source sheet so that all of its rows
        are processed instead of none.
        """
        print('Starting transformation')
        if limit > 0:
            self.upperRange = limit
        elif self.upperRange <= 0:
            self.upperRange = self.ws.max_row + 1

        for rowNum in range(startingRow, self.upperRange):
            self.processRow(rowNum)

        print('Complete! Saving File...')
        self.output.save(self.outputFileName)
        print('Done')

    def processRow(self, rowNum):
        """Emit the assets introduced by source row *rowNum*.

        A change of the ``Child`` value emits the parent and the child
        info area, a change of ``DD_TABLENAME`` emits a table, and a
        non-empty ``DD_FIELDNAME`` on a row that names a table emits a
        column.
        """
        print('Current Row is: ' + str(rowNum))

        # Info area: emit parent and child rows when the child changes.
        infoArea = self._src('Child', rowNum)
        self.hasInfoArea = infoArea is not None
        if self.hasInfoArea and infoArea != self.currentInfoArea:
            print('Creating new info area {info} with parent {par}'.format(
                info=infoArea, par=self._src('Parent', rowNum)))
            self.currentInfoArea = infoArea
            self.createNewInfoArea(rowNum)
            self.createNewInfoArea(rowNum, isChild=True)

        # Table: emit a row when the table name changes.
        tableName = self._src('DD_TABLENAME', rowNum)
        self.hasTable = tableName is not None
        if not self.hasTable:
            print('No more of this table exists')
        elif tableName != self.currentTable:
            self.currentTable = tableName
            self.createNewTable(rowNum)

        # Column: only rows that name a table can carry a column.
        fieldName = self._src('DD_FIELDNAME', rowNum)
        if self.hasTable and fieldName is not None:
            print('Creating column {col} under table: {tab}'.format(
                col=fieldName, tab=tableName))
            self.createNewColumn(rowNum)

    def createNewInfoArea(self, rowNum, isChild=False):
        """Append an info-area row built from source row *rowNum*.

        Args:
            rowNum: Source row to read ``Parent``/``Child`` from.
            isChild: When true the row is named after ``Child`` and is
                related to ``Parent``; otherwise it is named after
                ``Parent`` and gets no relation.
        """
        self._write_common('Info Area')
        if isChild:
            self._put('Name', self._src('Child', rowNum))
            self._write_relation('is a child of [Info Area]', 'Info Area',
                                 self._src('Parent', rowNum))
        else:
            self._put('Name', self._src('Parent', rowNum))
        self.outputRowNum += 1

    def createNewTable(self, rowNum):
        """Append a table row built from source row *rowNum*.

        The table is related to the row's ``Child`` info area when the
        row names one and an info area has been seen.
        """
        self._write_common('Table')
        self._put('Name', self._src('DD_TABLENAME', rowNum))
        self._put('Table Type', self._src('DD_TABLETYPE', rowNum))
        self._put('Description', self._src('LONG_DESC', rowNum))
        # Some tables have no info area; those get no relation columns.
        if (self.hasInfoArea and self.currentInfoArea != ''
                and self.currentInfoArea is not None):
            self._write_relation('is captured in [Info Area]', 'Info Area',
                                 self._src('Child', rowNum))
        self.outputRowNum += 1

    def createNewColumn(self, rowNum):
        """Append a column row built from source row *rowNum*.

        The asset is named ``<table>::<field>`` and related to its table.
        """
        tableName = self._src('DD_TABLENAME', rowNum)
        fieldName = self._src('DD_FIELDNAME', rowNum)

        # format() instead of '+' so numeric cell values do not raise.
        self._put('Name', '{}::{}'.format(tableName, fieldName))
        self._write_common('Column')

        # Attributes.
        # NOTE(review): "Is Nullable" is filled straight from MANDATORY,
        # which looks inverted - confirm against the source export.
        self._put('Is Nullable',
                  self.convertToCommonTerm(self._src('MANDATORY', rowNum)))
        self._put('Description', self._src('SHORT_DESC', rowNum))
        self._put('Is Primary Key',
                  self.convertToCommonTerm(self._src('KEY_FLAG', rowNum)))
        self._put('Number of Fractional Digits',
                  self._src('DATA_DECIMALS', rowNum))
        self._put('Size', self._src('DATA_LENGTH', rowNum))
        self._put('Column Position', self._src('POSIT', rowNum))
        self._put('Technical Data Type', self._src('DD_DATATYPE_ERP', rowNum))

        # Relation to the owning table.
        self._write_relation('is part of [Table]', 'Table', tableName)
        self.outputRowNum += 1

    def convertToCommonTerm(self, v):
        """Normalise a yes/no style value to ``'True'`` or ``'False'``.

        Args:
            v: Any value; it is converted with ``str`` and compared
                case-insensitively.

        Returns:
            ``'True'`` for yes/true/t/y, ``'False'`` for no/false/f/n,
            ``None`` or an empty string, and otherwise the unchanged
            text of *v*.
        """
        text = str(v)
        lowered = text.lower()
        if lowered in self._TRUE_TERMS:
            return 'True'
        if lowered in self._FALSE_TERMS:
            return 'False'
        return text