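"""
Assemble AMS shipment header records: join the header extracts with the
bill-of-lading ("billgen") extracts on `identifier` and write the result
as a single CSV file.
"""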
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Columns to keep from the AMS header extracts.
header_cols = [
    "identifier",
    "carrier_code",
    "vessel_country_code",
    "vessel_name",
    "port_of_unlading",
    "estimated_arrival_date",
    "foreign_port_of_lading_qualifier",
    "foreign_port_of_lading",
    "manifest_quantity",
    "manifest_unit",
    "weight",
    "weight_unit",
    "record_status_indicator",
    "place_of_receipt",
    "port_of_destination",
    "foreign_port_of_destination_qualifier",
    "foreign_port_of_destination",
    "conveyance_id_qualifier",
    "conveyance_id",
    "mode_of_transportation",
    "actual_arrival_date",
]

# Columns to keep from the AMS bill-of-lading extracts.
bill_cols = [
    "identifier",
    "master_bol_number",
    "house_bol_number",
    "sub_house_bol_number",
    "voyage_number",
    "bill_type_code",
    "manifest_number",
    "trade_update_date",
    "run_date",
]


def create_spark_session():
    """Create or reuse a SparkSession."""
    spark = SparkSession \
        .builder \
        .getOrCreate()
    return spark
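
# Note: the builder above inherits whatever configuration spark-submit
# supplies; an application name could be set explicitly, e.g. (illustrative):
#   SparkSession.builder.appName("assemble_header").getOrCreate()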


def process_header_data(spark, input_dir, output):
    """
    Join the AMS header data with the bill-of-lading data and write the
    result as a single CSV file.

    Parameters
    ----------
    spark : SparkSession
        Current Spark session
    input_dir : str
        Input directory
    output : str
        Output directory
    """
    # Read the header extracts, keeping only rows with a valid identifier.
    header = spark.read \
        .option("header", True) \
        .option("escape", '"') \
        .option("inferSchema", True) \
        .csv(f"{input_dir}/ams/*/*/ams__header_*__*.csv") \
        .select(*header_cols) \
        .where(col('identifier').isNotNull())

    # Read the bill-of-lading ("billgen") extracts.
    bill = spark.read \
        .option("header", True) \
        .option("escape", '"') \
        .option("inferSchema", True) \
        .csv(f"{input_dir}/ams/*/*/ams__billgen_*__*.csv") \
        .select(*bill_cols)

    # A left join keeps every header row, with or without a matching bill.
    header_full = header.join(bill, ['identifier'], how='left')

    # Repartition to 1 so the output lands as a single CSV file.
    header_full.repartition(1).write.mode('overwrite').format("csv") \
        .option("header", True) \
        .option("escape", '"') \
        .save(f"{output}/header/")


def main(input_dir, output):
    """
    Create a Spark session and process the header data.

    Parameters
    ----------
    input_dir : str
        Input directory
    output : str
        Output directory
    """
    spark = create_spark_session()
    process_header_data(spark, input_dir, output)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input', default='/input')
    parser.add_argument('-o', '--output', default='/output')
    args = parser.parse_args()
    main(args.input, args.output)
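
# Example invocation (paths are illustrative, not fixed by this script):
#
#   spark-submit assemble_header.py --input /data/raw --output /data/assembled
#
# The input directory is expected to contain files matching
# ams/*/*/ams__header_*__*.csv and ams/*/*/ams__billgen_*__*.csv.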