Commit
Merge pull request #53 from obsrvbl/verison-3.0.0
Verison 3.0.0
bbayles authored May 25, 2021
2 parents 336001e + c2bc177 commit ceda052
Showing 6 changed files with 151 additions and 91 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -91,6 +91,7 @@ Other command line switches:

For CloudWatch Logs locations:

* `flowlogs_reader --fields='${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}'` - use the given `fields` to prevent the module from querying EC2 for the log line format
* `flowlogs_reader --filter-pattern='REJECT' location` - use the given [filter pattern](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/FilterAndPatternSyntax.html) to have the server limit the output

For S3 locations:
@@ -149,13 +150,12 @@ By default these classes will yield records from the last hour.

You can control what's retrieved with these parameters:
* `start_time` and `end_time` are Python `datetime.datetime` objects
* `region_name` is a string like `'us-east-1'`. This will be used to create a [boto3 Session object](http://boto3.readthedocs.io/en/latest/reference/core/session.html#boto3.session.Session).
* `profile_name` is a string like `'my-profile'`
* `boto_client_kwargs` is a dictionary of parameters to pass when creating the [boto3 client](http://boto3.readthedocs.io/en/latest/reference/core/session.html#boto3.session.Session.client).
* `boto_client` is a boto3 client object. This overrides `region_name`, `profile_name`, and `boto_client_kwargs`.
* `region_name` is a string like `'us-east-1'`.
* `boto_client` is a boto3 client object.

When using `FlowLogsReader` with CloudWatch Logs:

* The `fields` keyword is a tuple like `('version', 'account-id')`. If it is not supplied, the EC2 API will be queried to determine the log format.
* The `filter_pattern` keyword is a string like `REJECT` or `443` used to filter the logs. See the examples below.
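
A minimal sketch of the new keywords used together, assuming AWS credentials are configured and that `flowlog_group` is a hypothetical log group name:

    from datetime import datetime

    from flowlogs_reader import FlowLogsReader

    # Passing fields explicitly skips the EC2 describe_flow_logs lookup;
    # fields=None would trigger it instead. filter_pattern is applied
    # server-side by CloudWatch Logs.
    reader = FlowLogsReader(
        'flowlog_group',
        fields=('version', 'account-id', 'srcaddr', 'dstaddr'),
        filter_pattern='REJECT',
        start_time=datetime(2021, 5, 24),
        end_time=datetime(2021, 5, 25),
    )
    for record in reader:
        print(record.srcaddr, record.dstaddr)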

When using `S3FlowLogsReader` with S3:
9 changes: 9 additions & 0 deletions flowlogs_reader/__main__.py
@@ -100,6 +100,7 @@ def get_reader(args):
    if args.location_type == 'cwl':
        cls = FlowLogsReader
        client_type = 'logs'
        kwargs['fields'] = None
    elif args.location_type == 's3':
        cls = S3FlowLogsReader
        client_type = 's3'
@@ -116,6 +117,9 @@
    if args.end_time:
        kwargs['end_time'] = datetime.strptime(args.end_time, time_format)

    if args.location_type == 'cwl' and args.fields:
        kwargs['fields'] = tuple(x.strip('${}') for x in args.fields.split())

    if args.location_type == 'cwl' and args.filter_pattern:
        kwargs['filter_pattern'] = args.filter_pattern

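For reference, the parsing above turns the `${...}` template given on the command line into bare field names. A quick illustration with an arbitrary sample string:

    >>> fields = '${version} ${account-id} ${srcaddr} ${dstaddr}'
    >>> tuple(x.strip('${}') for x in fields.split())
    ('version', 'account-id', 'srcaddr', 'dstaddr')
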
@@ -198,6 +202,11 @@ def main(argv=None):
        help='format of time to parse',
    )
    # Other filtering parameters
    parser.add_argument(
        '--fields',
        type=str,
        help='Log line format (CWL only)',
    )
    parser.add_argument(
        '--filter-pattern',
        type=str,
101 changes: 58 additions & 43 deletions flowlogs_reader/flowlogs_reader.py
@@ -18,17 +18,29 @@
from datetime import datetime, timedelta
from gzip import open as gz_open
from os.path import basename
from threading import Lock

import boto3
from botocore.exceptions import NoRegionError, PaginationError

from botocore.exceptions import PaginationError
from dateutil.rrule import rrule, DAILY

DEFAULT_FILTER_PATTERN = (
    '[version="2", account_id, interface_id, srcaddr, dstaddr, '
    'srcport, dstport, protocol, packets, bytes, '
    'start, end, action, log_status]'
DEFAULT_FIELDS = (
    'version',
    'account_id',
    'interface_id',
    'srcaddr',
    'dstaddr',
    'srcport',
    'dstport',
    'protocol',
    'packets',
    'bytes',
    'start',
    'end',
    'action',
    'log_status',
)
DEFAULT_REGION_NAME = 'us-east-1'
DUPLICATE_NEXT_TOKEN_MESSAGE = 'The same next token was received twice'

# The lastEventTimestamp may be delayed by up to an hour:
@@ -40,6 +52,8 @@
SKIPDATA = 'SKIPDATA'
NODATA = 'NODATA'

THREAD_LOCK = Lock()


class FlowRecord:
    """
@@ -175,11 +189,11 @@ def to_message(self):
        return ' '.join(ret)

    @classmethod
    def from_cwl_event(cls, cwl_event):
        fields = cwl_event['message'].split()
    def from_cwl_event(cls, cwl_event, fields=DEFAULT_FIELDS):
        data = cwl_event['message'].split()

        event_data = {}
        for key, value in zip(cls.__slots__, fields):
        for key, value in zip(fields, data):
            event_data[key] = value

        return cls(event_data)
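
As a quick illustration of the new classmethod signature, with a made-up CloudWatch Logs event in the default format:

    >>> from flowlogs_reader import FlowRecord
    >>> message = (
    ...     '2 123456789010 eni-0123abcd 198.51.100.1 203.0.113.2 '
    ...     '443 49152 6 10 840 1621900000 1621900060 ACCEPT OK'
    ... )
    >>> record = FlowRecord.from_cwl_event({'message': message})
    >>> record.srcaddr, record.action
    ('198.51.100.1', 'ACCEPT')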
@@ -190,50 +204,25 @@ def __init__(
        self,
        client_type,
        region_name=None,
        profile_name=None,
        start_time=None,
        end_time=None,
        boto_client_kwargs=None,
        boto_client=None,
    ):
        # Get a boto3 client with which to perform queries
        self.region_name = region_name
        if boto_client is not None:
            self.boto_client = boto_client
        else:
            self.boto_client = self._get_client(
                client_type, region_name, profile_name, boto_client_kwargs
            )
            kwargs = {'region_name': region_name} if region_name else {}
            self.boto_client = boto3.client(client_type, **kwargs)

        # If no time filters are given use the last hour
        now = datetime.utcnow()
        self.start_time = start_time or now - timedelta(hours=1)
        self.end_time = end_time or now

        # Initialize the iterator
        self.bytes_processed = 0
        self.iterator = self._reader()

    def _get_client(
        self, client_type, region_name, profile_name, boto_client_kwargs
    ):
        session_kwargs = {}
        if region_name is not None:
            session_kwargs['region_name'] = region_name

        if profile_name is not None:
            session_kwargs['profile_name'] = profile_name

        client_kwargs = boto_client_kwargs or {}

        session = boto3.session.Session(**session_kwargs)
        try:
            boto_client = session.client(client_type, **client_kwargs)
        except NoRegionError:
            boto_client = session.client(
                client_type, region_name=DEFAULT_REGION_NAME, **client_kwargs
            )

        return boto_client

    def __iter__(self):
        return self

@@ -261,23 +250,41 @@ class FlowLogsReader(BaseReader):
    def __init__(
        self,
        log_group_name,
        filter_pattern=DEFAULT_FILTER_PATTERN,
        filter_pattern=None,
        thread_count=0,
        fields=DEFAULT_FIELDS,
        ec2_client=None,
        **kwargs,
    ):
        super().__init__('logs', **kwargs)
        self.log_group_name = log_group_name

        self.paginator_kwargs = {}

        if filter_pattern is not None:
            self.paginator_kwargs['filterPattern'] = filter_pattern

        self.thread_count = thread_count

        if fields is None:
            fields = self._get_fields(
                self.region_name, self.log_group_name, ec2_client=ec2_client
            )
        self.fields = tuple(f.replace('-', '_') for f in fields)

        self.start_ms = timegm(self.start_time.utctimetuple()) * 1000
        self.end_ms = timegm(self.end_time.utctimetuple()) * 1000

    def _get_fields(self, region_name, log_group_name, ec2_client=None):
        if ec2_client is None:
            kwargs = {'region_name': region_name} if region_name else {}
            ec2_client = boto3.client('ec2', **kwargs)

        resp = ec2_client.describe_flow_logs(
            Filters=[{'Name': 'log-group-name', 'Values': [log_group_name]}]
        )
        log_format = resp['FlowLogs'][0]['LogFormat']
        return tuple(x.strip('${}') for x in log_format.split())

    def _get_log_streams(self):
        paginator = self.boto_client.get_paginator('describe_log_streams')
        all_pages = paginator.paginate(
@@ -318,7 +325,13 @@ def _read_streams(self, stream_name=None):

        try:
            for page in response_iterator:
                yield from page['events']
                page_bytes = 0
                for event in page.get('events', []):
                    page_bytes += len(event['message'])
                    yield event

                with THREAD_LOCK:
                    self.bytes_processed += page_bytes
        except PaginationError as e:
            if e.kwargs['message'].startswith(DUPLICATE_NEXT_TOKEN_MESSAGE):
                pass
@@ -332,10 +345,10 @@ def _reader(self):
            func = lambda x: list(self._read_streams(x))
            for events in executor.map(func, all_streams):
                for event in events:
                    yield FlowRecord.from_cwl_event(event)
                    yield FlowRecord.from_cwl_event(event, self.fields)
        else:
            for event in self._read_streams():
                yield FlowRecord.from_cwl_event(event)
                yield FlowRecord.from_cwl_event(event, self.fields)


class S3FlowLogsReader(BaseReader):
@@ -368,6 +381,8 @@ def _read_file(self, key):
                    f.replace('-', '_') for f in reader.fieldnames
                ]
                yield from reader
                with THREAD_LOCK:
                    self.bytes_processed += gz_f.tell()

    def _get_keys(self, prefix):
        # S3 keys have a file name like:
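Taken together, the changes above let `FlowLogsReader` discover the log line format on its own and keep a running byte count. A sketch of both behaviors, assuming a hypothetical log group name:

    from flowlogs_reader import FlowLogsReader

    # fields=None makes the reader call EC2's describe_flow_logs and parse
    # the LogFormat string, e.g. '${version} ${srcaddr} ...', into a tuple
    reader = FlowLogsReader('flowlog_group', fields=None)

    count = sum(1 for _ in reader)

    # bytes_processed is tallied under THREAD_LOCK as pages are consumed
    print(count, 'records,', reader.bytes_processed, 'bytes of messages')
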
2 changes: 1 addition & 1 deletion setup.cfg
@@ -14,7 +14,7 @@

[metadata]
name = flowlogs_reader
version = 2.4.0
version = 3.0.0
license = Apache
url = https://github.com/obsrvbl/flowlogs-reader
description = Reader for AWS VPC Flow Logs
