diff --git a/eventdata/parameter_sources/data/agents.json.gz b/eventdata/parameter_sources/data/agents.json.gz index 2b2e841d49a59..65e0eed129b2f 100755 Binary files a/eventdata/parameter_sources/data/agents.json.gz and b/eventdata/parameter_sources/data/agents.json.gz differ diff --git a/eventdata/parameter_sources/data/agents_name_lookup.json.gz b/eventdata/parameter_sources/data/agents_name_lookup.json.gz index 0df0ecf1b4485..7188881930331 100755 Binary files a/eventdata/parameter_sources/data/agents_name_lookup.json.gz and b/eventdata/parameter_sources/data/agents_name_lookup.json.gz differ diff --git a/eventdata/parameter_sources/data/referrers_url_base_lookup.json.gz b/eventdata/parameter_sources/data/referrers_url_base_lookup.json.gz index a74e87bc3539c..c0be464bad658 100755 Binary files a/eventdata/parameter_sources/data/referrers_url_base_lookup.json.gz and b/eventdata/parameter_sources/data/referrers_url_base_lookup.json.gz differ diff --git a/eventdata/parameter_sources/data/requests.json.gz b/eventdata/parameter_sources/data/requests.json.gz index 208e83be3b4b3..ace24b79f1e6e 100755 Binary files a/eventdata/parameter_sources/data/requests.json.gz and b/eventdata/parameter_sources/data/requests.json.gz differ diff --git a/eventdata/parameter_sources/elasticlogs_bulk_source.py b/eventdata/parameter_sources/elasticlogs_bulk_source.py index 7ec478e74f97b..ee38a3f788548 100755 --- a/eventdata/parameter_sources/elasticlogs_bulk_source.py +++ b/eventdata/parameter_sources/elasticlogs_bulk_source.py @@ -3,6 +3,7 @@ logger = logging.getLogger("track.elasticlogs") + class ElasticlogsBulkSource: """ Generates a bulk indexing request for elasticlogs data. @@ -80,12 +81,12 @@ def params(self): bulk_array = [] for x in range(0, self._bulk_size): evt, idx, typ = self._randomevent.generate_event() - bulk_array.append({'index': {'_index': idx, '_type': typ}}) + bulk_array.append('{"index": {"_index": "%s", "_type": "%s"}}"' % (idx, typ)) bulk_array.append(evt) - response = { "body": bulk_array, "action_metadata_present": True, "bulk-size": self._bulk_size } + response = { "body": "\n".join(bulk_array), "action_metadata_present": True, "bulk-size": self._bulk_size } if "pipeline" in self._params.keys(): - response["pipeline"] = params["pipeline"] + response["pipeline"] = self._params["pipeline"] return response diff --git a/eventdata/parameter_sources/randomevent.py b/eventdata/parameter_sources/randomevent.py index ae6371bbbac87..a58c440b0f00f 100755 --- a/eventdata/parameter_sources/randomevent.py +++ b/eventdata/parameter_sources/randomevent.py @@ -1,7 +1,5 @@ import json import random -import datetime -import calendar import gzip import re import os @@ -10,27 +8,28 @@ cwd = os.path.dirname(__file__) + class Agent: def __init__(self): self._agents = WeightedArray('%s/data/agents.json.gz' % cwd) - with gzip.open('%s/data/agents_name_lookup.json.gz' % cwd, 'rt') as data_file: + with gzip.open('%s/data/agents_name_lookup.json.gz' % cwd, 'rt') as data_file: self._agents_name_lookup = json.load(data_file) - with gzip.open('%s/data/agents_os_lookup.json.gz' % cwd, 'rt') as data_file: + with gzip.open('%s/data/agents_os_lookup.json.gz' % cwd, 'rt') as data_file: self._agents_os_lookup = json.load(data_file) - with gzip.open('%s/data/agents_os_name_lookup.json.gz' % cwd, 'rt') as data_file: + with gzip.open('%s/data/agents_os_name_lookup.json.gz' % cwd, 'rt') as data_file: self._agents_os_name_lookup = json.load(data_file) def add_fields(self, event): agent = self._agents.get_random() - 
event['agent'] = "\"{}\"".format(agent[0]) - event['useragent'] = {} - event['useragent']['os'] = self._agents_os_lookup[agent[1]] - event['useragent']['os_name'] = self._agents_os_name_lookup[agent[2]] - event['useragent']['name'] = self._agents_name_lookup[agent[3]] + event['agent'] = agent[0] + event['useragent_os'] = self._agents_os_lookup[agent[1]] + event['useragent_os_name'] = self._agents_os_name_lookup[agent[2]] + event['useragent_name'] = self._agents_name_lookup[agent[3]] + class ClientIp: def __init__(self): @@ -38,16 +37,14 @@ def __init__(self): self._clientips = WeightedArray('%s/data/clientips.json.gz' % cwd) self._rare_clientips = WeightedArray('%s/data/rare_clientips.json.gz' % cwd) - with gzip.open('%s/data/clientips_country_lookup.json.gz' % cwd, 'rt') as data_file: + with gzip.open('%s/data/clientips_country_lookup.json.gz' % cwd, 'rt') as data_file: self._clientips_country_lookup = json.load(data_file) - with gzip.open('%s/data/clientips_location_lookup.json.gz' % cwd, 'rt') as data_file: + with gzip.open('%s/data/clientips_location_lookup.json.gz' % cwd, 'rt') as data_file: self._clientips_location_lookup = json.load(data_file) def add_fields(self, event): p = random.random() - event['geoip'] = {} - if p < self._rare_clientip_probability: data = self._rare_clientips.get_random() event['clientip'] = self.__fill_out_ip_prefix(data[0]) @@ -55,53 +52,57 @@ def add_fields(self, event): data = self._clientips.get_random() event['clientip'] = data[0] - event['geoip']['country_name'] = self._clientips_country_lookup[data[1]] - event['geoip']['location'] = self._clientips_location_lookup[data[2]] + event['country_name'] = self._clientips_country_lookup[data[1]] + event['location'] = self._clientips_location_lookup[data[2]] def __fill_out_ip_prefix(self, ip_prefix): rnd1 = random.random() - v1 = rnd1 * ( 1 - rnd1) * 255 * 4 + v1 = rnd1 * (1 - rnd1) * 255 * 4 k1 = (int)(v1) rnd2 = random.random() - v2 = rnd2 * ( 1 - rnd2) * 255 * 4 + v2 = rnd2 * (1 - rnd2) * 255 * 4 k2 = (int)(v2) return "{}.{}.{}".format(ip_prefix, k1, k2) + class Referrer: def __init__(self): self._referrers = WeightedArray('%s/data/referrers.json.gz' % cwd) - with gzip.open('%s/data/referrers_url_base_lookup.json.gz' % cwd, 'rt') as data_file: + with gzip.open('%s/data/referrers_url_base_lookup.json.gz' % cwd, 'rt') as data_file: self._referrers_url_base_lookup = json.load(data_file) def add_fields(self, event): data = self._referrers.get_random() + event['referrer'] = "%s%s" % (self._referrers_url_base_lookup[data[0]], data[1]) - event['referrer'] = "\"{}{}\"".format(self._referrers_url_base_lookup[data[0]], data[1]) class Request: def __init__(self): self._requests = WeightedArray('%s/data/requests.json.gz' % cwd) - with gzip.open('%s/data/requests_url_base_lookup.json.gz' % cwd, 'rt') as data_file: + with gzip.open('%s/data/requests_url_base_lookup.json.gz' % cwd, 'rt') as data_file: self._requests_url_base_lookup = json.load(data_file) def add_fields(self, event): data = self._requests.get_random() - event['request'] = "{}{}".format(self._requests_url_base_lookup[data[0]], data[1]) event['bytes'] = data[2] event['verb'] = data[3] event['response'] = data[4] event['httpversion'] = data[5] + class RandomEvent: def __init__(self, params): self._agent = Agent() self._clientip = ClientIp() self._referrer = Referrer() self._request = Request() + # We will reuse the event dictionary. This assumes that each field will be present (and thus overwritten) in each event. 
+ # This reduces object churn and improves peak indexing throughput. + self._event = {} self._index = 'elasticlogs' self._index_pattern = False @@ -116,13 +117,13 @@ def __init__(self, params): self._index_pattern = True self._type = 'logs' - if 'type' in params.keys(): + if 'type' in params.keys(): self._type = params['type'] if 'starting_point' in params.keys(): sp = params['starting_point'] else: - sp ="now" + sp = "now" if 'end_point' in params.keys(): ep = params['end_point'] @@ -140,23 +141,37 @@ def __init__(self, params): self._delete_fields.append(d.split('.')) def generate_event(self): - event = {} + timestruct = self._timestamp_generator.generate_timestamp_struct() + index_name = self.__generate_index_pattern(timestruct) + + event = self._event + event["@timestamp"] = timestruct["iso"] + self._agent.add_fields(event) self._clientip.add_fields(event) self._referrer.add_fields(event) self._request.add_fields(event) - timestruct = self._timestamp_generator.generate_timestamp_struct() - event['@timestamp'] = timestruct['iso'] - if 'message' not in self._delete_fields: - event['message'] = self.__generate_message_field(event) - - index_name = self.__generate_index_pattern(timestruct) - - self.__delete_requested_fields(event) - - return event, index_name, self._type + # note the leading comma! + message = ',"message":"%s"' % self.__generate_message_field(event) + else: + message = '' + + line = '{"@timestamp": "%s", ' \ + '"agent":"%s","useragent":{"os":"%s","os_name":"%s","name":"%s"},' \ + '"clientip":"%s","geoip":{"country_name":"%s","location":%s},' \ + '"referrer":"%s",' \ + '"request": "%s","bytes":%s,"verb":"%s","response":%s,"httpversion":"%s"' \ + '%s}' % \ + (event["@timestamp"], + event["agent"], event["useragent_os"], event["useragent_os_name"], event["useragent_name"], + event["clientip"], event["country_name"], event["location"], + event["referrer"], + event["request"], event["bytes"], event["verb"], event["response"], event["httpversion"], + message) + + return line, index_name, self._type def __generate_index_pattern(self, timestruct): if self._index_pattern: @@ -165,11 +180,6 @@ def __generate_index_pattern(self, timestruct): return self._index def __generate_message_field(self, event): - return '{} - - [{}] "{} {} HTTP/{}" {} {} "-" {} {}'.format(event['clientip'], event['@timestamp'], event['verb'], event['request'], event['httpversion'], event['response'], event['bytes'], event['referrer'], event['agent']) - - def __delete_requested_fields(self, event): - for d in self._delete_fields: - if len(d) == 1 and d[0] in event.keys(): - del event[d[0]] - elif len(d) == 2 and d[0] in event.keys() and d[1] in event[d[0]].keys(): - del event[d[0]][d[1]] + return '{} - - [{}] \\"{} {} HTTP/{}\\" {} {} \\"-\\" {} {}'.format(event['clientip'], event['@timestamp'], event['verb'], + event['request'], event['httpversion'], event['response'], + event['bytes'], event['referrer'], event['agent']) diff --git a/eventdata/parameter_sources/timeutils.py b/eventdata/parameter_sources/timeutils.py index 0a7e80c5f26c8..63e594283c068 100644 --- a/eventdata/parameter_sources/timeutils.py +++ b/eventdata/parameter_sources/timeutils.py @@ -1,12 +1,10 @@ -import random -import math import datetime -import calendar import re import random epoch = datetime.datetime.utcfromtimestamp(0) + class TimeParsingError(Exception): """Exception raised for parameter parsing errors. 
@@ -16,15 +14,18 @@ class TimeParsingError(Exception): def __init__(self, message): self.message = message + class TimestampStructGenerator: def __init__(self, starting_point, end_point=None, acceleration_factor=1.0): self._start_dt = None self._starting_point = self.__parse_point_def(starting_point) - if(end_point == None): + if end_point is None: self._end_point = None else: self._end_point = self.__parse_point_def(end_point) self._acceleration_factor = acceleration_factor + # reuse to reduce object churn + self._ts = {} @classmethod def StartingPoint(cls, starting_point, acceleration_factor=1.0): @@ -35,7 +36,7 @@ def Interval(cls, starting_point, end_point): return cls(starting_point, end_point) def generate_timestamp_struct(self): - if self._end_point == None: + if self._end_point is None: if self._starting_point['type'] == 'relative': dt = datetime.datetime.utcnow() + self._starting_point['offset'] else: @@ -61,18 +62,18 @@ def generate_timestamp_struct(self): return self.__generate_timestamp_struct_from_datetime(dt) def __generate_timestamp_struct_from_datetime(self, dt): - ts = {} - ts['iso'] = "{}Z".format(dt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]) - ts['yyyy'] = ts['iso'][:4] - ts['yy'] = ts['iso'][2:4] - ts['mm'] = ts['iso'][5:7] - ts['dd'] = ts['iso'][8:10] - ts['hh'] = ts['iso'][11:13] - - return ts + # string formatting is about 4 times faster than strftime. + iso = "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ" % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond) + self._ts['iso'] = iso + self._ts['yyyy'] = iso[:4] + self._ts['yy'] = iso[2:4] + self._ts['mm'] = iso[5:7] + self._ts['dd'] = iso[8:10] + self._ts['hh'] = iso[11:13] + return self._ts def __set_start_dt_if_not_set(self): - if self._start_dt == None: + if self._start_dt is None: self._start_dt = datetime.datetime.utcnow() def __parse_point_def(self, point): @@ -82,7 +83,7 @@ def __parse_point_def(self, point): match = re.match("^now([+-]\d+)([hmd])$", point) if match: int_offset = int(match.group(1)) - + if match.group(2) == "m": return { 'type': "relative", 'offset': datetime.timedelta(minutes=int_offset)} diff --git a/eventdata/parameter_sources/weightedarray.py b/eventdata/parameter_sources/weightedarray.py index fd605bbf49bcc..a73ca8baee7d3 100755 --- a/eventdata/parameter_sources/weightedarray.py +++ b/eventdata/parameter_sources/weightedarray.py @@ -1,46 +1,105 @@ import json import random import gzip +import sys +import itertools +import bisect + class WeightedArray: + # These data sets have a very long tail and we apply a staged approach to save memory. + # If we'd just generate one large array, the array length would be in the tens of + # millions for some of the arrays resulting in an unacceptable memory usage per client. + # + # Based on experiments with the current data sets, we settled that one list represents the top 99 + # percent of all items and the other one represents the long tail with the last percent. These + # values provide an acceptable tradeoff for memory usage. + CUTOFF_FREQUENCY = 100 + # defines the percentage of values that represents the "bottom" part. + CUTOFF_PERCENT = 1 / CUTOFF_FREQUENCY + def __init__(self, json_file): - with gzip.open(json_file, 'rt') as data_file: + with gzip.open(json_file, 'rt') as data_file: item_list = json.load(data_file) - self._items = [] - self._totals = [] - self._sum = 0 + random.seed() + # 1. Calculate a histogram of all weights. 
+ h = self.histogram(item_list) - for item in item_list: - self._sum += item[0] - self._items.append(item[1]) - self._totals.append(self._sum) + # 2. Calculate the weight that represents the last percent based on the histogram ... + bottom_percent_weight = self.weight_of_bottom_percent(h, percent=WeightedArray.CUTOFF_PERCENT) - random.seed() + # 3. ... so we can partition the items into the bottom and top parts. + # + # This implementation results in a peak memory usage of one client between 200 and 300 MB. + self._top_choices = self.create_items(item_list, min_weight=bottom_percent_weight) + self._bottom_choices = self.create_items(item_list, max_weight=bottom_percent_weight) - def get_random(self): - return self._items[self.__random_index()] + self._counter = 0 + # we increment before accessing the elements + self._bottom_idx = -1 + self._top_idx = -1 + # Not calculating the length over and over on the hot code path gives us a little bit higher peak throughput + self._bottom_len = len(self._bottom_choices) + self._top_len = len(self._top_choices) - def __random_index(self): - minimumIndex = 0 - maximumIndex = len(self._totals) - 1 + def weight_of_bottom_percent(self, histogram, percent): + """ + Determines the corresponding weight that represents at most the provided number of percent of all elements. + + :param histogram: A histogram of all elements. + :param percent: A float representing the maximum number of elements that should be covered. 1.00 is 100% percent. + """ total = 0 + for weight, frequency in histogram.items(): + total += weight * frequency + + running_total = 0 + for weight, frequency in histogram.items(): + running_total += weight * frequency + if running_total > percent * total: + return weight + + def histogram(self, item_list): + """ + Creates a histogram of the provided items. - rand = random.random() * self._sum + :param item_list: A list of tuples (weight, data). + """ + h = {} + for w, _ in item_list: + if w not in h: + h[w] = 0 + h[w] += 1 + return h - while maximumIndex > minimumIndex: - if self._totals[minimumIndex] > rand: - break + def create_items(self, item_list, min_weight=None, max_weight=None): + choices = [] + weights = [] + low = sys.maxsize - middleIndex = (int)((maximumIndex + minimumIndex) / 2) - total = self._totals[middleIndex] + for w, c in item_list: + if (min_weight and w > min_weight) or (max_weight and w <= max_weight): + low = low if low < w else w + weights.append(w) + choices.append(c) - if total > rand: - maximumIndex = middleIndex - else: - if middleIndex > minimumIndex: - minimumIndex = middleIndex - else: - minimumIndex += 1 - - return minimumIndex + cumdist = list(itertools.accumulate(weights)) + # choose the size of the resulting array so that the item with the lowest frequency still has a chance to appear (once). + total = cumdist[-1] + size = total // low + # pre-generate the randomly distributed weighted choices as we want to avoid any expensive operations + # on the fast-path (i.e. in #get_random()). + # + return [choices[bisect.bisect(cumdist, random.random() * total)] for _ in range(size)] + + def get_random(self): + self._counter += 1 + if self._counter < WeightedArray.CUTOFF_FREQUENCY: + self._top_idx = (self._top_idx + 1) % self._top_len + return self._top_choices[self._top_idx] + else: + # Don't let this counter ever overflow. We're just interested in small counts anyway. + self._counter = 0 + self._bottom_idx = (self._bottom_idx + 1) % self._bottom_len + return self._bottom_choices[self._bottom_idx]
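
Note on the elasticlogs_bulk_source.py change: the parameter source now emits the bulk body as one pre-serialized, newline-delimited string instead of a list of dicts, so no per-request JSON serialization remains on the hot path. Below is a minimal sketch of that assembly; the index name, type and event line are made-up sample values, not output from the track's data files.

```python
# Minimal sketch of the new bulk body format. The values below are
# illustrative only; the real track generates them in randomevent.py.
events = [
    ("elasticlogs-2017-01-01", "logs", '{"@timestamp": "2017-01-01T00:00:00.000Z", "response": 200}'),
    ("elasticlogs-2017-01-01", "logs", '{"@timestamp": "2017-01-01T00:00:01.000Z", "response": 404}'),
]

bulk_array = []
for idx, typ, evt in events:
    # one action-metadata line followed by the already serialized event line
    bulk_array.append('{"index": {"_index": "%s", "_type": "%s"}}' % (idx, typ))
    bulk_array.append(evt)

# the whole bulk request body is a single newline-joined string
body = "\n".join(bulk_array)
print(body)
```

The params() response then carries this string as "body" together with "action_metadata_present": True, signalling that the action metadata lines are already embedded in the body.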
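
Note on the weightedarray.py rewrite: weighted sampling is moved off the hot path by pre-generating an array of choices from the cumulative weight distribution (itertools.accumulate plus bisect) and then serving that array round-robin in get_random(). Below is a minimal, self-contained sketch of that core idea; it uses a made-up item list instead of the track's gzipped data files and leaves out the top/bottom split that the real class uses to bound memory.

```python
import bisect
import itertools
import random


class PreGeneratedChoices:
    """Sketch of pre-generated weighted sampling; not the full WeightedArray."""

    def __init__(self, item_list):
        weights = [w for w, _ in item_list]
        choices = [c for _, c in item_list]
        # Cumulative weights, e.g. [50, 80, 85]; bisect maps a uniform draw
        # in [0, total) onto the item whose cumulative range contains it.
        cumdist = list(itertools.accumulate(weights))
        total = cumdist[-1]
        # Pre-generate enough samples that even the rarest item can appear once,
        # so the hot path is a plain array lookup instead of a binary search.
        size = total // min(weights)
        self._pregenerated = [choices[bisect.bisect(cumdist, random.random() * total)]
                              for _ in range(size)]
        self._idx = -1

    def get_random(self):
        self._idx = (self._idx + 1) % len(self._pregenerated)
        return self._pregenerated[self._idx]


# made-up (weight, value) pairs standing in for the track's data files
sampler = PreGeneratedChoices([(50, "GET /index.html"), (30, "GET /search"), (5, "POST /login")])
print([sampler.get_random() for _ in range(5)])
```

In the actual class, two such pre-generated arrays are kept: one for the items covering the top 99 percent of the total weight and one for the long tail, and get_random() serves the top array 99 times out of every 100 calls and the tail array once, which approximates the original distribution while keeping both arrays small.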