Skip to content

Commit

Permalink
Merge pull request pandas-dev#146 from manahl/tickstore-querying
Browse files Browse the repository at this point in the history
Make sure we don't fetch chunks that don't span the start point
  • Loading branch information
mildbyte committed Jun 7, 2016
2 parents a600275 + c00a533 commit cb0bee6
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 11 deletions.
22 changes: 16 additions & 6 deletions arctic/tickstore/tickstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ def _mongo_date_range_query(self, symbol, date_range):
if date_range.start:
assert date_range.start.tzinfo
start = date_range.start

# If all chunks start inside of the range, we default to capping to our
# range so that we don't fetch any chunks from the beginning of time
start_range['$gte'] = start

match = self._symbol_query(symbol)
match.update({'s': {'$lte': start}})

Expand All @@ -175,17 +180,22 @@ def _mongo_date_range_query(self, symbol, date_range):
# Throw away everything but the start of every chunk and the symbol
{'$project': {'_id': 0, 's': 1, 'sy': 1}},
# For every symbol, get the latest chunk start (that is still before
# our sought start, so all of these chunks span the start point)
# our sought start)
{'$group': {'_id': '$sy', 'start': {'$max': '$s'}}},
# Take the earliest one of these chunk starts
{'$sort': {'start': 1}},
{'$limit': 1}])
])
# Now we need to get the earliest start of the chunk that still spans the start point.
# Since we got them sorted by start, we just need to fetch their ends as well and stop
# when we've seen the first such chunk
try:
first_dt = next(result)['start']
for candidate in result:
chunk = self._collection.find_one({'s': candidate['start'], 'sy': candidate['_id']}, {'e': 1})
if chunk['e'].replace(tzinfo=mktz('UTC')) >= start:
start_range['$gte'] = candidate['start'].replace(tzinfo=mktz('UTC'))
break
except StopIteration:
pass
if first_dt:
start_range['$gte'] = first_dt


# Find the end bound
if date_range.end:
Expand Down
87 changes: 87 additions & 0 deletions tests/integration/tickstore/test_ts_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,93 @@ def test_read_chunk_boundaries(tickstore_lib):
assert len(tickstore_lib.read(['SYM1', 'SYM2'], columns=None, date_range=DateRange(dt(2013, 6, 1, 12, 45, tzinfo=mktz('UTC')), dt(2013, 6, 1, 15, 00, tzinfo=mktz('UTC'))))) == 4


def test_read_spanning_chunks(tickstore_lib):
    """Check the start bound chosen when a symbol's nearest chunk ends too early.

    Three ticks are written per symbol with ``_chunk_size`` forced to 2, and
    the query is built for the range 12:45--15:00 UTC.  The expected lower
    bound on ``s`` is 12:30 (SYM2's first tick), not 11:00 (SYM1's first tick).
    """
    utc = mktz('UTC')

    sym1_ticks = [
        {'a': 1., 'b': 2., 'index': dt(2013, 6, 1, 11, 0, tzinfo=utc)},
        {'a': 3., 'b': 4., 'index': dt(2013, 6, 1, 12, 0, tzinfo=utc)},
        # Chunk boundary here
        {'a': 5., 'b': 6., 'index': dt(2013, 6, 1, 14, 0, tzinfo=utc)},
    ]
    sym2_ticks = [
        {'a': 7., 'b': 8., 'index': dt(2013, 6, 1, 12, 30, tzinfo=utc)},
        {'a': 9., 'b': 10., 'index': dt(2013, 6, 1, 13, 30, tzinfo=utc)},
        # Chunk boundary here
        {'a': 11., 'b': 12., 'index': dt(2013, 6, 1, 14, 30, tzinfo=utc)},
    ]

    tickstore_lib._chunk_size = 2
    tickstore_lib.write('SYM1', sym1_ticks)
    tickstore_lib.write('SYM2', sym2_ticks)

    range_start = dt(2013, 6, 1, 12, 45, tzinfo=utc)
    range_end = dt(2013, 6, 1, 15, 0, tzinfo=utc)
    query = tickstore_lib._mongo_date_range_query(
        ['SYM1', 'SYM2'],
        date_range=DateRange(range_start, range_end))

    # Even though the latest chunk that's the closest to the start point for
    # SYM1 starts at 11:00, it ends before the start point, so we want to
    # ignore it and start from SYM2 (12:30) instead.
    expected = {'s': {'$gte': dt(2013, 6, 1, 12, 30, tzinfo=utc),
                      '$lte': dt(2013, 6, 1, 15, 0, tzinfo=utc)}}
    assert query == expected


def test_read_inside_range(tickstore_lib):
    """Check the start bound is capped to the requested start.

    Three ticks are written per symbol with ``_chunk_size`` forced to 2, and
    the query is built for the range 10:00--15:00 UTC.  The expected lower
    bound on ``s`` is the requested 10:00 start itself.
    """
    utc = mktz('UTC')

    sym1_ticks = [
        {'a': 1., 'b': 2., 'index': dt(2013, 6, 1, 0, 0, tzinfo=utc)},
        {'a': 3., 'b': 4., 'index': dt(2013, 6, 1, 1, 0, tzinfo=utc)},
        # Chunk boundary here
        {'a': 5., 'b': 6., 'index': dt(2013, 6, 1, 14, 0, tzinfo=utc)},
    ]
    sym2_ticks = [
        {'a': 7., 'b': 8., 'index': dt(2013, 6, 1, 12, 30, tzinfo=utc)},
        {'a': 9., 'b': 10., 'index': dt(2013, 6, 1, 13, 30, tzinfo=utc)},
        # Chunk boundary here
        {'a': 11., 'b': 12., 'index': dt(2013, 6, 1, 14, 30, tzinfo=utc)},
    ]

    tickstore_lib._chunk_size = 2
    tickstore_lib.write('SYM1', sym1_ticks)
    tickstore_lib.write('SYM2', sym2_ticks)

    range_start = dt(2013, 6, 1, 10, 0, tzinfo=utc)
    range_end = dt(2013, 6, 1, 15, 0, tzinfo=utc)
    query = tickstore_lib._mongo_date_range_query(
        ['SYM1', 'SYM2'],
        date_range=DateRange(range_start, range_end))

    # If there are no chunks spanning the range, we still cap the start range
    # so that we don't fetch SYM1's 0am--1am chunk
    expected = {'s': {'$gte': dt(2013, 6, 1, 10, 0, tzinfo=utc),
                      '$lte': dt(2013, 6, 1, 15, 0, tzinfo=utc)}}
    assert query == expected

def test_read_longs(tickstore_lib):
DUMMY_DATA = [
{'a': 1,
Expand Down
19 changes: 14 additions & 5 deletions tests/unit/tickstore/test_tickstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@ def test_mongo_date_range_query():
self = create_autospec(TickStore)
self._collection = create_autospec(Collection)
self._symbol_query.return_value = {"sy": { "$in" : [ "s1" , "s2"]}}
self._collection.aggregate.return_value = iter([{"_id": "s1", "start": dt(2014, 1, 1, 0, 0, tzinfo=mktz())}])
self._collection.aggregate.return_value = iter([{"_id": "s1", "start": dt(2014, 1, 1, 0, 0, tzinfo=mktz())},
{"_id": "s2", "start": dt(2014, 1, 1, 12, 0, tzinfo=mktz())}])

self._collection.find_one.side_effect = [
{'e': dt(2014, 1, 1, 15, 0, tzinfo=mktz())},
{'e': dt(2014, 1, 2, 12, 0, tzinfo=mktz())}]

query = TickStore._mongo_date_range_query(self, 'sym', DateRange(dt(2014, 1, 2, 0, 0, tzinfo=mktz()),
dt(2014, 1, 3, 0, 0, tzinfo=mktz())))

assert self._collection.aggregate.call_args_list == [call([
{"$match": {"s": {"$lte": dt(2014, 1, 2, 0, 0, tzinfo=mktz())}, "sy": { "$in" : [ "s1" , "s2"]}}},
{"$project": {"_id": 0, "s": 1, "sy": 1}},
{"$group": {"_id": "$sy", "start": {"$max": "$s"}}},
{"$sort": {"start": 1}},
{"$limit": 1}])]
assert query == {'s': {'$gte': dt(2014, 1, 1, 0, 0, tzinfo=mktz()), '$lte': dt(2014, 1, 3, 0, 0, tzinfo=mktz())}}
{"$sort": {"start": 1}}])]

assert self._collection.find_one.call_args_list == [
call({'sy': 's1', 's': dt(2014, 1, 1, 0, 0, tzinfo=mktz())}, {'e': 1}),
call({'sy': 's2', 's': dt(2014, 1, 1, 12, 0, tzinfo=mktz())}, {'e': 1})]

assert query == {'s': {'$gte': dt(2014, 1, 1, 12, 0, tzinfo=mktz()), '$lte': dt(2014, 1, 3, 0, 0, tzinfo=mktz())}}


def test_mongo_date_range_query_asserts():
Expand Down

0 comments on commit cb0bee6

Please sign in to comment.