# s3_to_sftp.py (forked from yunojuno/lambda-s3-sftp)
"""
AWS Lambda function for transferring files from S3 to SFTP on a create event.
Style note: the specific S3 interactions have been split out into very simple
one line functions - this is to make the code easier to read and test. It would
be perfectly valid to just have a single function that runs the entire thing.
Required env vars:
SSH_HOSTNAME
SSH_USERNAME
SSH_PASSWORD or SSH_PRIVATE_KEY (S3 file path in 'bucket:key' format)
Optional env vars
SSH_PORT - defaults to 22
SSH_DIR - if specified the SFTP client will transfer the files to the
specified directory.
SSH_FILENAME - used as a mask for the remote filename. Supports three
string replacement vars - {bucket}, {key}, {current_date}. Bucket
and key refer to the uploaded S3 file. Current date is in ISO format.
"""
import datetime
import io
import logging
import os
import boto3
import botocore.exceptions
import paramiko
logger = logging.getLogger()
logger.setLevel(os.getenv('LOGGING_LEVEL', 'DEBUG'))
# read in shared properties on module load - will fail hard if any are missing
SSH_HOST = os.environ['SSH_HOST']
SSH_USERNAME = os.environ['SSH_USERNAME']
# must have one of pwd / key - fail hard if both are missing
SSH_PASSWORD = os.getenv('SSH_PASSWORD')
# path to a private key file on S3 in 'bucket:key' format.
SSH_PRIVATE_KEY = os.getenv('SSH_PRIVATE_KEY')
assert SSH_PASSWORD or SSH_PRIVATE_KEY, "Missing SSH_PASSWORD or SSH_PRIVATE_KEY"
# optional
SSH_PORT = int(os.getenv('SSH_PORT', 22))
SSH_DIR = os.getenv('SSH_DIR')
# filename mask used for the remote file
SSH_FILENAME = os.getenv('SSH_FILENAME', 'data_{current_date}')


def on_trigger_event(event, context):
    """
    Transfer uploaded S3 files to the SFTP endpoint.

    This is the Lambda entry point. It receives the event payload and
    processes it. In this case it receives a set of 'Record' dicts which
    should contain details of an S3 file create event. The contents of this
    dict can be found in tests.py::TEST_RECORD - the example comes from the
    Lambda test event rig.

    The only information we use from each record is the `eventName`, which
    must start with 'ObjectCreated', plus the bucket name and object key.

    This function then connects to the SFTP server and copies each file
    across. NB the source S3 object is *not* deleted after the transfer, so
    if the data is sensitive it will remain in the bucket until removed by
    some other process.

    See http://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html

    Args:
        event: dict, the event payload delivered by Lambda.
        context: a LambdaContext object - unused.

    """
    if SSH_PRIVATE_KEY:
        key_obj = get_private_key(*SSH_PRIVATE_KEY.split(':'))
    else:
        key_obj = None
    # prefix all logging statements - otherwise they are impossible to
    # filter in CloudWatch
    logger.info("S3-SFTP: received trigger event")
    sftp_client, transport = connect_to_sftp(
        hostname=SSH_HOST,
        port=SSH_PORT,
        username=SSH_USERNAME,
        password=SSH_PASSWORD,
        pkey=key_obj
    )
    if SSH_DIR:
        sftp_client.chdir(SSH_DIR)
        logger.debug("S3-SFTP: Switched into remote SFTP upload directory")
    with transport:
        for s3_file in s3_files(event):
            filename = sftp_filename(SSH_FILENAME, s3_file)
            try:
                logger.info(f"S3-SFTP: Transferring S3 file '{s3_file.key}'")
                transfer_file(sftp_client, s3_file, filename)
            except botocore.exceptions.BotoCoreError:
                # log the error and continue with any remaining files
                logger.exception(f"S3-SFTP: Error transferring S3 file '{s3_file.key}'.")


def connect_to_sftp(hostname, port, username, password, pkey):
    """Connect to the SFTP server and return the client and transport objects."""
    transport = paramiko.Transport((hostname, port))
    transport.connect(username=username, password=password, pkey=pkey)
    client = paramiko.SFTPClient.from_transport(transport)
    logger.debug("S3-SFTP: Connected to remote SFTP server")
    return client, transport


def get_private_key(bucket, key):
    """
    Return an RSAKey object from a private key stored on S3.

    It will fail hard if the key cannot be read, or is invalid.
    """
    key_obj = boto3.resource('s3').Object(bucket, key)
    key_str = key_obj.get()['Body'].read().decode('utf-8')
    # avoid shadowing the `key` argument (the S3 object key) with the RSA key
    private_key = paramiko.RSAKey.from_private_key(io.StringIO(key_str))
    logger.debug("S3-SFTP: Retrieved private key from S3")
    return private_key
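
# NB get_private_key() assumes an RSA key; a different key type would need the
# matching paramiko class (e.g. paramiko.Ed25519Key.from_private_key).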


def s3_files(event):
    """
    Iterate through the event and yield a boto3.Object for each S3 file created.

    This function loops through all the records in the payload, checks that
    the event is a file creation, and if so, yields a boto3.Object that
    represents the file.

    NB Redshift will trigger an `ObjectCreated:CompleteMultipartUpload` event
    when UNLOADing the data; if you choose to dump a manifest file as well,
    then that will trigger an `ObjectCreated:Put` event.

    Args:
        event: dict, the payload received from the Lambda trigger.
            See tests.py::TEST_RECORD for a sample.

    """
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        event_category, event_subcat = record['eventName'].split(':')
        if event_category == 'ObjectCreated':
            logger.info(f"S3-SFTP: Received '{event_subcat}' trigger on '{key}'")
            yield boto3.resource('s3').Object(bucket, key)
        else:
            logger.warning(f"S3-SFTP: Ignoring invalid event: {record}")
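
# A minimal sketch of the event shape that s3_files() reads - real S3 event
# notifications contain many more fields, and the values here are hypothetical:
#
#   {
#       "Records": [
#           {
#               "eventName": "ObjectCreated:Put",
#               "s3": {
#                   "bucket": {"name": "my-bucket"},
#                   "object": {"key": "exports/data_000.csv"},
#               },
#           }
#       ]
#   }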


def sftp_filename(file_mask, s3_file):
    """Create the destination SFTP filename from the file mask."""
    return file_mask.format(
        bucket=s3_file.bucket_name,
        # note that any literal '_000' fragment is stripped from the key
        key=s3_file.key.replace("_000", ""),
        current_date=datetime.date.today().isoformat()
    )
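
# For illustration, with a hypothetical mask of '{bucket}_{current_date}.csv'
# and an S3 object in bucket 'my-bucket', sftp_filename() would return
# something like 'my-bucket_2023-06-01.csv'.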


def transfer_file(sftp_client, s3_file, filename):
    """
    Transfer an S3 file to the SFTP server.

    The file is streamed from S3 directly into the remote SFTP file object;
    nothing is returned.

    Args:
        sftp_client: paramiko.SFTPClient, connected to SFTP endpoint
        s3_file: boto3.Object representing the S3 file
        filename: string, the remote filename to use

    """
    with sftp_client.file(filename, 'w') as sftp_file:
        s3_file.download_fileobj(Fileobj=sftp_file)
        logger.info(f"S3-SFTP: Transferred '{s3_file.key}' from S3 to SFTP as '{filename}'")
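
# A possible local smoke test, reusing the minimal event sketched above
# s3_files() (hypothetical bucket/key; assumes AWS credentials and a reachable
# SFTP server are configured via the env vars at the top of this module):
#
#   if __name__ == '__main__':
#       sample_event = {
#           "Records": [{
#               "eventName": "ObjectCreated:Put",
#               "s3": {"bucket": {"name": "my-bucket"},
#                      "object": {"key": "exports/data_000.csv"}},
#           }]
#       }
#       on_trigger_event(sample_event, None)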