Commit fac4c6c

Merge pull request #56 from obsrvbl/handle-parquet

Handle AWS parquet format
MayCisco authored Jan 25, 2022
2 parents 43cf350 + 0a6e46d commit fac4c6c
Showing 5 changed files with 171 additions and 10 deletions.
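
The upshot for users: S3FlowLogsReader now reads both gzipped CSV logs and parquet-format logs, keyed off the object's file extension. A minimal usage sketch (the bucket name and time window are illustrative, not taken from this PR):

    # Hypothetical bucket and dates; .log.gz and .log.parquet objects
    # under the bucket are now both handled by the same reader.
    from datetime import datetime
    from flowlogs_reader import S3FlowLogsReader

    reader = S3FlowLogsReader(
        'example-bucket',
        start_time=datetime(2015, 8, 12),
        end_time=datetime(2015, 8, 13),
    )
    for record in reader:
        print(record.srcaddr, record.dstaddr, record.bytes)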
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -8,7 +8,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.6, 3.7, 3.8, 3.9.0, 3.10.0-rc.1, pypy3]
+        python-version: [3.6, 3.7, 3.8, 3.9.0, 3.10.1, pypy3]
 
     steps:
       - uses: actions/checkout@v2
30 changes: 22 additions & 8 deletions flowlogs_reader/flowlogs_reader.py
@@ -14,13 +14,15 @@
 
 from calendar import timegm
 from concurrent.futures import ThreadPoolExecutor
-from csv import DictReader
+from csv import DictReader as csv_dict_reader
 from datetime import datetime, timedelta
 from gzip import open as gz_open
 from os.path import basename
+from parquet import DictReader as parquet_dict_reader
 from threading import Lock
 
 import boto3
+import io
 
 from botocore.exceptions import PaginationError
 from dateutil.rrule import rrule, DAILY
@@ -144,7 +146,11 @@ def __init__(self, event_data, EPOCH_32_MAX=2147483647):
             ('traffic_path', int),
         ):
             value = event_data.get(key, '-')
-            value = None if (value == '-') else func(value)
+            if value == '-' or value == 'None' or value is None:
+                value = None
+            else:
+                value = func(value)
+
             setattr(self, key, value)
 
     def __eq__(self, other):
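
The widened check matters because parquet rows can carry real None values or the string 'None', whereas the CSV logs mark missing fields with '-'. A standalone sketch of the same normalization (normalize is a hypothetical helper, not part of the module):

    # Hypothetical helper mirroring the branch above.
    def normalize(value, func):
        if value == '-' or value == 'None' or value is None:
            return None
        return func(value)

    assert normalize('-', int) is None
    assert normalize('None', int) is None
    assert normalize(None, int) is None
    assert normalize('443', int) == 443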
@@ -377,15 +383,23 @@ def __init__(
 
     def _read_file(self, key):
         resp = self.boto_client.get_object(Bucket=self.bucket, Key=key)
-        with gz_open(resp['Body'], mode='rt') as gz_f:
-            reader = DictReader(gz_f, delimiter=' ')
-            reader.fieldnames = [
-                f.replace('-', '_') for f in reader.fieldnames
-            ]
+        if key.endswith('.parquet'):
+            body = resp['Body'].read()
+            reader = parquet_dict_reader(io.BytesIO(body))
             yield from reader
             with THREAD_LOCK:
-                self.bytes_processed += gz_f.tell()
+                self.bytes_processed += len(body)
                 self.compressed_bytes_processed += resp['ContentLength']
+        else:
+            with gz_open(resp['Body'], mode='rt') as gz_f:
+                reader = csv_dict_reader(gz_f, delimiter=' ')
+                reader.fieldnames = [
+                    f.replace('-', '_') for f in reader.fieldnames
+                ]
+                yield from reader
+                with THREAD_LOCK:
+                    self.bytes_processed += gz_f.tell()
+                    self.compressed_bytes_processed += resp['ContentLength']
 
     def _get_keys(self, prefix):
         # S3 keys have a file name like:
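
The parquet branch buffers the whole object in memory because the parquet package's DictReader needs a seekable file (the format's metadata lives in a footer), which a streaming S3 body is not; hence the io.BytesIO wrapper. A minimal sketch of that path in isolation, using the test fixture added by this PR:

    # Reads the fixture shipped in tests/data; parquet.DictReader yields
    # one dict per row.
    import io
    from parquet import DictReader

    with open('tests/data/flows.parquet', 'rb') as f:
        body = f.read()

    for row in DictReader(io.BytesIO(body)):
        print(row)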
3 changes: 2 additions & 1 deletion setup.cfg
@@ -14,7 +14,7 @@
 
 [metadata]
 name = flowlogs_reader
-version = 3.1.0
+version = 3.2.0
 license = Apache
 url = https://github.com/obsrvbl/flowlogs-reader
 description = Reader for AWS VPC Flow Logs
@@ -35,6 +35,7 @@ python_requires = >=3.6
 install_requires =
     boto3>=1.7.75
     botocore>=1.10.75
+    parquet>=1.3.1
     python-dateutil>=2.7.0
 
 [options.packages.find]
Binary file added tests/data/flows.parquet
146 changes: 146 additions & 0 deletions tests/test_flowlogs_reader.py
@@ -99,6 +99,8 @@
     'subnet-0123456789abcdef 7 7 IPv4 5 vpc-04456ab739938ee3f\n'
 )
 
+PARQUET_FILE = 'tests/data/flows.parquet'
+
 
 class FlowRecordTestCase(TestCase):
     def test_parse(self):
@@ -757,6 +759,150 @@ def test_serial_v5(self):
             reader.compressed_bytes_processed, len(compress(V5_FILE.encode()))
         )
 
+    def _test_parquet_reader(self, data, expected):
+        boto_client = boto3.client('s3')
+        with Stubber(boto_client) as stubbed_client:
+            # Accounts call
+            accounts_response = {
+                'ResponseMetadata': {'HTTPStatusCode': 200},
+                'CommonPrefixes': [
+                    # This one is used
+                    {'Prefix': 'AWSLogs/123456789010/'},
+                    # This one is ignored
+                    {'Prefix': 'AWSLogs/123456789011/'},
+                ],
+            }
+            accounts_params = {
+                'Bucket': 'example-bucket',
+                'Delimiter': '/',
+                'Prefix': 'AWSLogs/',
+            }
+            stubbed_client.add_response(
+                'list_objects_v2', accounts_response, accounts_params
+            )
+            # Regions call
+            regions_response = {
+                'ResponseMetadata': {'HTTPStatusCode': 200},
+                'CommonPrefixes': [
+                    # This one is used
+                    {'Prefix': 'AWSLogs/123456789010/vpcflowlogs/pangaea-1/'},
+                    # This one is ignored
+                    {'Prefix': 'AWSLogs/123456789010/vpcflowlogs/pangaea-2/'},
+                ],
+            }
+            regions_params = {
+                'Bucket': 'example-bucket',
+                'Delimiter': '/',
+                'Prefix': 'AWSLogs/123456789010/vpcflowlogs/',
+            }
+            stubbed_client.add_response(
+                'list_objects_v2', regions_response, regions_params
+            )
+            # List objects call
+            list_response = {
+                'ResponseMetadata': {'HTTPStatusCode': 200},
+                'Contents': [
+                    {
+                        'Key': (
+                            'AWSLogs/123456789010/vpcflowlogs/pangaea-1/'
+                            '2015/08/12/'
+                            '123456789010_vpcflowlogs_'
+                            'pangaea-1_fl-102010_'
+                            '20150812T1200Z_'
+                            'h45h.log.parquet'
+                        ),
+                    },
+                ],
+            }
+            list_params = {
+                'Bucket': 'example-bucket',
+                'Prefix': (
+                    'AWSLogs/123456789010/vpcflowlogs/pangaea-1/2015/08/12/'
+                ),
+            }
+            stubbed_client.add_response(
+                'list_objects_v2', list_response, list_params
+            )
+
+            get_response = {
+                'ResponseMetadata': {'HTTPStatusCode': 200},
+                'Body': StreamingBody(BytesIO(data), len(data)),
+                'ContentLength': len(data),
+            }
+            get_params = {
+                'Bucket': 'example-bucket',
+                'Key': (
+                    'AWSLogs/123456789010/vpcflowlogs/pangaea-1/'
+                    '2015/08/12/'
+                    '123456789010_vpcflowlogs_'
+                    'pangaea-1_fl-102010_'
+                    '20150812T1200Z_'
+                    'h45h.log.parquet'
+                ),
+            }
+            stubbed_client.add_response('get_object', get_response, get_params)
+            # Do the deed
+            stubbed_client.activate()
+            reader = S3FlowLogsReader(
+                'example-bucket',
+                start_time=self.start_time,
+                end_time=self.end_time,
+                thread_count=self.thread_count,
+                include_accounts={'123456789010'},
+                include_regions={'pangaea-1'},
+                boto_client=boto_client,
+            )
+            actual = [record.to_dict() for record in reader]
+            self.assertEqual(actual, expected)
+
+        return reader
+
+    def test_serial_parquet(self):
+        expected = [
+            {
+                'version': 2,
+                'account_id': '123456789010',
+                'interface_id': 'eni-102010ab',
+                'srcaddr': '198.51.100.1',
+                'dstaddr': '192.0.2.1',
+                'srcport': 443,
+                'dstport': 49152,
+                'protocol': 6,
+                'packets': 10,
+                'bytes': 840,
+                'start': datetime(2015, 8, 12, 13, 47, 43),
+                'end': datetime(2015, 8, 12, 13, 47, 44),
+                'action': 'ACCEPT',
+                'log_status': 'OK',
+            },
+            {
+                'version': 2,
+                'account_id': '123456789010',
+                'interface_id': 'eni-202010ab',
+                'start': datetime(2015, 8, 12, 13, 47, 43),
+                'end': datetime(2015, 8, 12, 13, 47, 44),
+                'log_status': 'NODATA',
+            },
+            {
+                'version': 2,
+                'account_id': '123456789010',
+                'interface_id': 'eni-202010ab',
+                'srcaddr': '198.51.100.7',
+                'dstaddr': '192.0.2.156',
+                'srcport': 80,
+                'dstport': 100,
+                'protocol': 8080,
+                'start': datetime(2015, 8, 12, 13, 47, 43),
+                'end': datetime(2015, 8, 12, 13, 47, 44),
+                'log_status': 'NODATA',
+            },
+        ]
+        with open(PARQUET_FILE, "rb") as parquet_data:
+            data = parquet_data.read()
+            reader = self._test_parquet_reader(data, expected)
+            self.assertEqual(reader.compressed_bytes_processed, len(data))
+            self.assertEqual(reader.bytes_processed, parquet_data.tell())
+
     def test_threads(self):
         expected = [
             {
