# spark-lambda-os.py
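"""AWS Lambda handler that runs a Spark executor inside a Lambda function.

On a cold start it downloads a packaged Spark distribution from S3,
extracts it under /tmp/lambda, and launches the executor with the command
line supplied by the driver in the invocation event. Warm containers skip
the download (fast path) and launch the executor directly.
"""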
from __future__ import print_function
import boto3
import glob
import os
import sys
import uuid
import zipfile
import socket
import time
import logging
from subprocess import call

# Silence verbose AWS SDK logging; only errors are reported.
boto3.set_stream_logger(name='boto3', level=logging.ERROR)
boto3.set_stream_logger(name='botocore', level=logging.ERROR)

s3_client = boto3.client('s3')


def list_all_files():
    """Print the contents of /tmp and /tmp/lambda for debugging."""
    for f in os.listdir('/tmp'):
        print('/tmp/' + f)
    for f in os.listdir('/tmp/lambda'):
        print('/tmp/lambda/' + f)
    print("-----------------------")


def run_executor(spark_driver_hostname, spark_driver_port, spark_executor_cmdline,
                 java_partial_cmdline, executor_partial_cmdline, java_extra_options):
    """Assemble the executor command line and run it, blocking until it exits."""
    # The pre-built command line (spark_executor_cmdline) is kept for reference;
    # the command is assembled from its java and executor halves so the
    # per-invocation java_extra_options can be spliced in between.
    #cmdline = spark_executor_cmdline
    cmdline = java_partial_cmdline + java_extra_options + executor_partial_cmdline
    cmdline_arr = [x for x in cmdline.split(' ') if x]  # drop empty tokens
    print("START: Spark executor: " + cmdline)
    print(cmdline_arr)
    call(cmdline_arr)
    print("FINISH: Spark executor")


def handler(event, context):
    print('START: ')
    print(event)
    # Log the Lambda invocation context for debugging and log correlation.
    print(context.function_name)
    print(context.function_version)
    print(context.invoked_function_arn)
    print(context.memory_limit_in_mb)
    print(context.aws_request_id)
    print(context.log_group_name)
    print(context.log_stream_name)
    print(context.identity)

    spark_driver_hostname = event['sparkDriverHostname']
    spark_driver_port = event['sparkDriverPort']
    spark_executor_cmdline = event['sparkCommandLine']
    java_partial_cmdline = event['javaPartialCommandLine']
    executor_partial_cmdline = event['executorPartialCommandLine']
    # Pass the invocation identifiers to the executor JVM as system properties
    # so the driver can locate this executor's CloudWatch logs.
    java_extra_options = '-Dspark.lambda.awsRequestId=' + context.aws_request_id + ' ' + \
                         '-Dspark.lambda.logGroupName=' + context.log_group_name + ' ' + \
                         '-Dspark.lambda.logStreamName=' + context.log_stream_name + ' '
    # Fast path: a warm container already has Spark extracted under /tmp/lambda,
    # so skip the S3 download and launch the executor directly.
    if os.path.isfile("/tmp/lambda/spark-installed"):
        print("FAST PATH: Not downloading spark")
        print("Cleaning up old temporary data /tmp/spark-application*")
        # call() does not invoke a shell, so globs must be expanded in Python.
        for d in glob.glob('/tmp/spark-application*'):
            call(['rm', '-rf', d])
        print('START: executor')
        run_executor(spark_driver_hostname, spark_driver_port, spark_executor_cmdline,
                     java_partial_cmdline, executor_partial_cmdline, java_extra_options)
        print('FINISH: executor')
        return {
            'output': 'Fast path Handler done'
        }
    # Slow path (cold start): download the Spark distribution from S3 and
    # extract it into /tmp/lambda before launching the executor.
    bucket = event['sparkS3Bucket']
    key = event['sparkS3Key']
    # call() does not invoke a shell, so globs must be expanded in Python.
    for f in glob.glob('/tmp/*'):
        call(['rm', '-rf', f])
    call(['mkdir', '-p', '/tmp/lambda'])
    download_path = '/tmp/lambda/spark-lambda.zip'
    print('START: Downloading spark tarball')
    print("Bucket - %s Key - %s" % (bucket, key))
    s3_client.download_file(bucket, key, download_path)
    list_all_files()
    print('Extracting spark tarball')
    with zipfile.ZipFile(download_path, 'r') as zip_ref:
        zip_ref.extractall('/tmp/lambda/')
    list_all_files()
    call(['df', '-h'])
    # Reclaim scarce /tmp space: drop the archive and the parts of the
    # distribution the executor does not need.
    call(['rm', download_path])
    call(['rm', '-rf', '/tmp/lambda/spark/python/test_support/'])
    call(['rm', '-rf', '/tmp/lambda/spark/R/'])
    call(['df', '-h'])
    print('FINISH: Downloading spark tarball')
    print('START: executor')
    run_executor(spark_driver_hostname, spark_driver_port, spark_executor_cmdline,
                 java_partial_cmdline, executor_partial_cmdline, java_extra_options)
    print('FINISH: executor')
    # Mark the container as provisioned so warm invocations take the fast path.
    open('/tmp/lambda/spark-installed', 'a').close()
    print('FINISH')
    return {
        'output': 'Handler done'
    }
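

if __name__ == '__main__':
    # Minimal local smoke-test sketch, not part of the Lambda runtime.
    # All values below are hypothetical placeholders; a real event is built by
    # the Spark driver. Running this performs real S3 and /tmp operations.
    class FakeContext(object):
        """Stub mimicking the Lambda context attributes the handler reads."""
        function_name = 'spark-lambda-os'            # placeholder
        function_version = '$LATEST'
        invoked_function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:spark-lambda-os'  # placeholder
        memory_limit_in_mb = '1536'
        aws_request_id = str(uuid.uuid4())
        log_group_name = '/aws/lambda/spark-lambda-os'  # placeholder
        log_stream_name = 'local-test'
        identity = None

    test_event = {
        'sparkDriverHostname': 'driver.example.com',  # placeholder
        'sparkDriverPort': '7077',                    # placeholder
        'sparkCommandLine': '',                       # unused by run_executor
        'javaPartialCommandLine': 'java -cp /tmp/lambda/spark/jars/* ',  # placeholder
        'executorPartialCommandLine': 'org.apache.spark.executor.CoarseGrainedExecutorBackend ',  # placeholder
        'sparkS3Bucket': 'my-spark-bucket',           # placeholder
        'sparkS3Key': 'spark-lambda.zip',             # placeholder
    }
    print(handler(test_event, FakeContext()))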