This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

fix python code so it runs in python3 - print command, stringio #17

Open · wants to merge 2 commits into base: master
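For reference, a minimal before-and-after sketch of the two Python 3 changes this PR applies across the files below: the Python 2 print statement becomes the print() function, and StringIO is imported from the io module instead of the removed StringIO module. The snippet is illustrative only; the buf variable and its contents are hypothetical and do not appear in this repository.

# Python 2 form being replaced:
#   import StringIO
#   print "Success: done creating role"
#   buf = StringIO.StringIO()

# Python 3 form introduced by this PR:
from io import StringIO

print("Success: done creating role")  # print is a built-in function in Python 3
buf = StringIO()                      # constructed directly after "from io import StringIO"
buf.write("hello")
print(buf.getvalue())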
10 changes: 5 additions & 5 deletions create-biglambda-role.py
@@ -20,18 +20,18 @@

 try:
 response = client.create_role(RoleName=rn,AssumeRolePolicyDocument=json.dumps(trust_role))
-print response['Role']['Arn']
-print "Success: done creating role"
+print(response['Role']['Arn'])
+print("Success: done creating role")
 except botocore.exceptions.ClientError as e:
-print "Error: {0}".format(e)
+print("Error: {0}".format(e))

 try:
 with open('policy.json') as json_data:
 response = client.put_role_policy(RoleName=rn,PolicyName=rp,
 PolicyDocument=json.dumps(json.load(json_data))
 )
-print "Success: done adding inline policy to role"
+print("Success: done adding inline policy to role")
 except botocore.exceptions.ClientError as e:
-print "Error: {0}".format(e)
+print("Error: {0}".format(e))


8 changes: 4 additions & 4 deletions delete-biglambda-role.py
@@ -6,12 +6,12 @@

 try:
 response = client.delete_role_policy(RoleName=rn,PolicyName=rp)
-print "Success: done deleting role policy"
+print("Success: done deleting role policy")
 except botocore.exceptions.ClientError as e:
-print "Error: {0}".format(e)
+print("Error: {0}".format(e))

 try:
 response = client.delete_role(RoleName=rn)
-print "Success: done deleting role"
+print("Success: done deleting role")
 except botocore.exceptions.ClientError as e:
-print "Error: {0}".format(e)
+print("Error: {0}".format(e))
30 changes: 15 additions & 15 deletions src/python/driver.py
@@ -11,7 +11,7 @@
 import math
 import random
 import re
-import StringIO
+from io import StringIO
 import sys
 import time

@@ -172,7 +172,7 @@ def invoke_lambda(batches, m_id):
 #batch = [k['Key'] for k in batches[m_id-1]]
 batch = [k.key for k in batches[m_id-1]]
 xray_recorder.current_segment().put_annotation("batch_for_mapper_"+str(m_id), str(batch))
-#print "invoking", m_id, len(batch)
+#print("invoking", m_id, len(batch))
 resp = lambda_client.invoke(
 FunctionName = mapper_lambda_name,
 InvocationType = 'RequestResponse',
@@ -186,10 +186,10 @@ def invoke_lambda(batches, m_id):
 )
 out = eval(resp['Payload'].read())
 mapper_outputs.append(out)
-print "mapper output", out
+print("mapper output", out)
 xray_recorder.end_segment()
 # Exec Parallel
-print "# of Mappers ", n_mappers
+print("# of Mappers ", n_mappers)
 pool = ThreadPool(n_mappers)
 Ids = [i+1 for i in range(n_mappers)]
 invoke_lambda_partial = partial(invoke_lambda, batches)
@@ -205,7 +205,7 @@ def invoke_lambda(batches, m_id):
 pool.close()
 pool.join()

-print "all the mappers finished"
+print("all the mappers finished")
 xray_recorder.end_subsegment() #Invoke mappers

# Delete Mapper function
@@ -241,11 +241,11 @@ def invoke_lambda(batches, m_id):
keys = [jk["Key"] for jk in job_keys]
total_s3_size = sum([jk["Size"] for jk in job_keys])

print "check to see if the job is done"
print("check to see if the job is done")

# check job done
if job_id + "/result" in keys:
print "job done"
print("job done")
reducer_lambda_time += float(s3.Object(job_bucket, job_id + "/result").metadata['processingtime'])
for key in keys:
if "task/reducer" in key:
@@ -268,14 +268,14 @@ def invoke_lambda(batches, m_id):
 lambda_cost = total_lambda_secs * 0.00001667 * lambda_memory/ 1024.0
 s3_cost = (s3_get_cost + s3_put_cost + s3_storage_hour_cost)

-# Print costs
-print "Reducer L", reducer_lambda_time * 0.00001667 * lambda_memory/ 1024.0
-print "Lambda Cost", lambda_cost
-print "S3 Storage Cost", s3_storage_hour_cost
-print "S3 Request Cost", s3_get_cost + s3_put_cost
-print "S3 Cost", s3_cost
-print "Total Cost: ", lambda_cost + s3_cost
-print "Total Lines:", total_lines
+# printcosts
+print("Reducer L", reducer_lambda_time * 0.00001667 * lambda_memory/ 1024.0)
+print("Lambda Cost", lambda_cost)
+print("S3 Storage Cost", s3_storage_hour_cost)
+print("S3 Request Cost", s3_get_cost + s3_put_cost)
+print("S3 Cost", s3_cost)
+print("Total Cost: ", lambda_cost + s3_cost)
+print("Total Lines:", total_lines)
 xray_recorder.end_subsegment() #Calculate cost

# Delete Reducer function
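The cost block above re-emits, in Python 3 syntax, the estimate driver.py already computes: Lambda time is billed per GB-second, and 0.00001667 is the per-GB-second rate assumed in the script (lambda_memory is in MB, hence the division by 1024). A small sketch of that formula follows; the 250 seconds and 1536 MB are made-up example inputs, not measurements from this job.

# Hedged sketch of the driver.py cost estimate; rate and formula taken from the context lines above.
def lambda_cost_usd(total_lambda_secs, lambda_memory_mb):
    # seconds * $/GB-second * memory expressed in GB
    return total_lambda_secs * 0.00001667 * lambda_memory_mb / 1024.0

print(lambda_cost_usd(250, 1536))  # hypothetical job: prints roughly 0.00625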
8 changes: 4 additions & 4 deletions src/python/lambdautils.py
@@ -38,7 +38,7 @@ def create_lambda_function(self):
 TracingConfig={'Mode':'PassThrough'}
 )
 self.function_arn = response['FunctionArn']
-print response
+print(response)

def update_function(self):
'''
@@ -53,7 +53,7 @@ def update_function(self):
 # parse arn and remove the release number (:n)
 arn = ":".join(updated_arn.split(':')[:-1])
 self.function_arn = arn
-print response
+print(response)

def update_code_or_create_on_noexist(self):
'''
@@ -73,7 +73,7 @@ def add_lambda_permission(self, sId, bucket):
 StatementId = '%s' % sId,
 SourceArn = 'arn:aws:s3:::' + bucket
 )
-print resp
+print(resp)

def create_s3_eventsource_notification(self, bucket, prefix=None):
if not prefix:
@@ -126,7 +126,7 @@ def compute_batch_size(keys, lambda_memory, concurrent_lambdas):
 else:
 size += key.size
 avg_object_size = size/len(keys)
-print "Dataset size: %s, nKeys: %s, avg: %s" %(size, len(keys), avg_object_size)
+print("Dataset size: %s, nKeys: %s, avg: %s" %(size, len(keys), avg_object_size))
 if avg_object_size < max_mem_for_data and len(keys) < concurrent_lambdas:
 b_size = 1
 else:
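One Python 3 behavior worth flagging near the context line avg_object_size = size/len(keys): on integers, / floor-divides in Python 2 but always returns a float in Python 3, where // is the floor form. A tiny illustration with made-up numbers, not data from this repository:

size, n_keys = 7, 2
print(size / n_keys)   # Python 3: 3.5 (true division)
print(size // n_keys)  # 3, the floor division that Python 2's / would have produced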
6 changes: 3 additions & 3 deletions src/python/mapper.py
@@ -9,7 +9,7 @@
 import json
 import random
 import resource
-import StringIO
+from io import StringIO
 import time

# create an S3 session
@@ -53,7 +53,7 @@ def lambda_handler(event, context):
 output[srcIp] = 0
 output[srcIp] += float(data[3])
 except Exception, e:
-print e
+print(e)
 #err += '%s' % e

time_in_secs = (time.time() - start_time)
@@ -68,7 +68,7 @@ def lambda_handler(event, context):
"memoryUsage": '%s' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
}

print "metadata", metadata
print("metadata", metadata)
write_to_s3(job_bucket, mapper_fname, json.dumps(output), metadata)
return pret

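The mapper and reducer hunks keep except Exception, e: as unchanged context; that spelling only parses on Python 2. For reference only, and not a change made by this PR, the Python 3 form of such a handler is sketched below with made-up sample data:

output = {}
data = ["src", "dst", "port", "not-a-number"]  # hypothetical record, mirroring float(data[3]) above
try:
    output["203.0.113.7"] = float(data[3])
except Exception as e:                         # Python 3 syntax for catching and binding the exception
    print(e)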
6 changes: 3 additions & 3 deletions src/python/reducer.py
@@ -9,7 +9,7 @@
 import json
 import random
 import resource
-import StringIO
+from io import StringIO
 import urllib2
 import time

@@ -55,14 +55,14 @@ def lambda_handler(event, context):
 results[srcIp] = 0
 results[srcIp] += float(val)
 except Exception, e:
-print e
+print(e)

 time_in_secs = (time.time() - start_time)
 #timeTaken = time_in_secs * 1000000000 # in 10^9
 #s3DownloadTime = 0
 #totalProcessingTime = 0
 pret = [len(reducer_keys), line_count, time_in_secs]
-print "Reducer ouputput", pret
+print("Reducer ouputput", pret)

if n_reducers == 1:
# Last reducer file, final result
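In the same spirit, reducer.py's context line import urllib2 names a module that exists only in Python 2; the Python 3 equivalents live under urllib.request and urllib.error. A minimal hedged sketch, offered as reference rather than as part of this diff, with a placeholder URL rather than anything this code actually fetches:

# Python 2: import urllib2; data = urllib2.urlopen(url).read()
from urllib.request import urlopen  # Python 3 location of urlopen

with urlopen("https://example.com/") as resp:  # placeholder URL
    body = resp.read()
print(len(body))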
18 changes: 9 additions & 9 deletions src/python/reducerCoordinator.py
@@ -10,7 +10,7 @@
 import lambdautils
 import random
 import re
-import StringIO
+from io import StringIO
 import time
 import urllib

@@ -127,33 +127,33 @@ def lambda_handler(event, context):
 files = s3_client.list_objects(Bucket=bucket, Prefix=job_id)["Contents"]

 if check_job_done(files) == True:
-print "Job done!!! Check the result file"
+print("Job done!!! Check the result file")
 # TODO: Delete reducer and coordinator lambdas
 return
 else:
 ### Stateless Coordinator logic
 mapper_keys = get_mapper_files(files)
-print "Mappers Done so far ", len(mapper_keys)
+print("Mappers Done so far ", len(mapper_keys))

 if map_count == len(mapper_keys):

 # All the mappers have finished, time to schedule the reducers
 stepInfo = get_reducer_state_info(files, job_id, bucket)

-print "stepInfo", stepInfo
+print("stepInfo", stepInfo)

 step_number = stepInfo[0];
 reducer_keys = stepInfo[1];

 if len(reducer_keys) == 0:
-print "Still waiting to finish Reducer step ", step_number
+print("Still waiting to finish Reducer step ", step_number)
 return

 # Compute this based on metadata of files
 r_batch_size = get_reducer_batch_size(reducer_keys);

-print "Starting the the reducer step", step_number
-print "Batch Size", r_batch_size
+print("Starting the the reducer step", step_number)
+print("Batch Size", r_batch_size)

 # Create Batch params for the Lambda function
 r_batch_params = lambdautils.batch_creator(reducer_keys, r_batch_size);
@@ -180,13 +180,13 @@ def lambda_handler(event, context):
"reducerId": i
})
)
print resp
print(resp)

# Now write the reducer state
fname = "%s/reducerstate.%s" % (job_id, step_id)
write_reducer_state(n_reducers, n_s3, bucket, fname)
else:
print "Still waiting for all the mappers to finish .."
print("Still waiting for all the mappers to finish ..")

'''
ev = {
Expand Down
4 changes: 2 additions & 2 deletions src/python/s3_download_benchmark.py
@@ -25,8 +25,8 @@ def lambda_handler(event, context):
 contents = response['Body'].read()

 time_in_secs = (time.time() - start_time)
-print "Time taken (s)", time_in_secs
-print "Size (MB)", total_bytes / 1024/1024
+print("Time taken (s)", time_in_secs)
+print("Size (MB)", total_bytes / 1024/1024)
return time_in_secs

'''