Skip to content

Commit

Permalink
Merge pull request pandas-dev#376 from manahl/concat-flag
Browse files Browse the repository at this point in the history
Re-adding concat flag back
  • Loading branch information
AdrianTeng authored Jun 20, 2017
2 parents 0d945f6 + 926cedd commit e0ab146
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 11 deletions.
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## Changelog

### 1.47 (2017-06-19)
* Feature: Re-introduce #363 `concat` flag, essentially undoing 1.45

### 1.46 (2017-06-13)
* Feature: #374 Shard BSONStore on `_id` rather than `symbol`

Expand All @@ -18,7 +21,7 @@
* Feature: #365 add generic BSON store

### 1.42 (2017-05-12)
* Bugfix: #346 fixed daterange subsetting error on very large dateframes in version store
* Bugfix: #346 fixed daterange subsetting error on very large dataframes in version store
* Bugfix: #351 $size queries can't use indexes, use alternative queries

### 1.41 (2017-04-20)
Expand Down
30 changes: 25 additions & 5 deletions arctic/store/_pandas_ndarray_store.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import ast
import logging

import numpy as np
from bson.binary import Binary
from pandas import DataFrame, Series, Panel
import numpy as np

from arctic.exceptions import UnorderedDataException
from arctic.serialization.numpy_records import SeriesSerializer, DataFrameSerializer
from ._ndarray_store import NdarrayStore
from .._compression import compress, decompress
from ..date._util import to_pandas_closed_closed
from ..exceptions import ArcticException
from ._ndarray_store import NdarrayStore


log = logging.getLogger(__name__)

Expand Down Expand Up @@ -116,6 +116,24 @@ def get_info(self, version):
ret['dtype'] = ast.literal_eval(version['dtype'])
return ret

def read_segment_last_dt(self, version):
    """Return the last datetime stored in a version's segment index.

    Parameters
    ----------
    version : dict-like
        Version document. Only its optional ``'segment_index'`` entry is
        read; it is decompressed and decoded as ``INDEX_DTYPE`` records.

    Returns
    -------
    The last value of the field named by ``self._datetime64_index(index)``,
    or ``None`` when the version has no ``'segment_index'``, no such field
    is reported, or the decoded index is empty.
    """
    if 'segment_index' not in version:
        return None
    # np.frombuffer is the supported way to decode a binary buffer;
    # np.fromstring is deprecated for this use.
    index = np.frombuffer(decompress(version['segment_index']), dtype=INDEX_DTYPE)
    dt_index = self._datetime64_index(index)
    # An empty index has no last element; report "no last datetime"
    # instead of letting [-1] raise IndexError.
    if dt_index and len(index):
        return index[dt_index][-1]
    return None

def slice_overlap_item_or_raise(self, item, previous_version, concat):
    """Reconcile a new item with the tail of the previously stored version.

    When the first index value of ``item`` is at or before the last datetime
    reported by ``read_segment_last_dt(previous_version)``, the item
    overlaps the stored data.

    Parameters
    ----------
    item : pandas object with an ``index``
        The new data about to be appended.
    previous_version : dict-like
        Version document passed through to ``read_segment_last_dt``.
    concat : bool
        If true, keep only the rows strictly after the stored last
        datetime; if false, an overlap is an error.

    Returns
    -------
    ``item`` unchanged when there is no overlap (or nothing to compare
    against), otherwise the trimmed item.

    Raises
    ------
    UnorderedDataException
        If the item overlaps the previous version and ``concat`` is false.
    """
    prev_version_last_dt = self.read_segment_last_dt(previous_version)
    # Test for None explicitly so the overlap check never depends on the
    # truthiness of the timestamp value itself.
    if prev_version_last_dt is None or len(item) == 0:
        return item
    if item.index[0] <= prev_version_last_dt:
        if not concat:
            raise UnorderedDataException(
                "new data {} before to symbol ending {}".format(item.index[0], prev_version_last_dt))
        item = item[item.index > prev_version_last_dt]
    return item

def _start_end(date_range, dts):
"""
Expand Down Expand Up @@ -152,7 +170,8 @@ def write(self, arctic_lib, version, symbol, item, previous_version):
item, md = self.SERIALIZER.serialize(item)
super(PandasSeriesStore, self).write(arctic_lib, version, symbol, item, previous_version, dtype=md)

def append(self, arctic_lib, version, symbol, item, previous_version, **kwargs):
def append(self, arctic_lib, version, symbol, item, previous_version, concat=False, **kwargs):
item = self.slice_overlap_item_or_raise(item, previous_version, concat)
item, md = self.SERIALIZER.serialize(item)
super(PandasSeriesStore, self).append(arctic_lib, version, symbol, item, previous_version, dtype=md, **kwargs)

Expand All @@ -176,7 +195,8 @@ def write(self, arctic_lib, version, symbol, item, previous_version):
item, md = self.SERIALIZER.serialize(item)
super(PandasDataFrameStore, self).write(arctic_lib, version, symbol, item, previous_version, dtype=md)

def append(self, arctic_lib, version, symbol, item, previous_version, **kwargs):
def append(self, arctic_lib, version, symbol, item, previous_version, concat=False, **kwargs):
item = self.slice_overlap_item_or_raise(item, previous_version, concat)
item, md = self.SERIALIZER.serialize(item)
super(PandasDataFrameStore, self).append(arctic_lib, version, symbol, item, previous_version, dtype=md, **kwargs)

Expand Down
46 changes: 46 additions & 0 deletions tests/integration/store/test_pandas_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,52 @@ def test_dataframe_append_should_add_new_columns_and_reorder(library):
assert_frame_equal(expected, actual)


def test_series_append_concat(library):
    """Append a series overlapping the stored first row with concat=True; the read-back must equal the appended series."""
    symbol = 'TEST_1'
    first_day, second_day = dt(2012, 1, 1), dt(2012, 1, 2)

    initial = Series(data=[1.0], index=[first_day])
    appended = Series(data=[1.0, 2.0], index=[first_day, second_day])
    appended.index.name = 'index'
    appended.name = 'values'

    library.write(symbol, initial)
    library.append(symbol, appended, concat=True)

    assert_series_equal(appended, library.read(symbol).data)


def test_series_append_concat_only_appends_end(library):
    """With concat=True the overlapping first row keeps its stored value; only the later row is appended."""
    symbol = 'TEST_1'
    first_day, second_day = dt(2012, 1, 1), dt(2012, 1, 2)

    library.write(symbol, Series(data=[1.0], index=[first_day]))
    library.append(symbol, Series(data=[2.0, 2.0], index=[first_day, second_day]), concat=True)

    expected = Series(data=[1.0, 2.0], index=[first_day, second_day])
    expected.index.name = 'index'
    expected.name = 'values'

    actual = library.read(symbol).data
    assert_series_equal(expected, actual)


def test_frame_append_concat(library):
    """Append a frame overlapping the stored first row with concat=True; the read-back must equal the appended frame."""
    symbol = 'TEST_1'
    first_day, second_day = dt(2012, 1, 1), dt(2012, 1, 2)

    initial = DataFrame(data=[1.0], index=[first_day], columns=['a'])
    appended = DataFrame(data=[1.0, 2.0], index=[first_day, second_day], columns=['a'])
    appended.index.name = 'index'

    library.write(symbol, initial)
    library.append(symbol, appended, concat=True)

    assert_frame_equal(appended, library.read(symbol).data)


def test_frame_append_concat_only_appends_end(library):
    """With concat=True the overlapping first row keeps its stored value; only the later row is appended."""
    symbol = 'TEST_1'
    first_day, second_day = dt(2012, 1, 1), dt(2012, 1, 2)

    library.write(symbol, DataFrame(data=[1.0], index=[first_day], columns=['a']))
    library.append(
        symbol,
        DataFrame(data=[2.0, 2.0], index=[first_day, second_day], columns=['a']),
        concat=True,
    )

    expected = DataFrame(data=[1.0, 2.0], index=[first_day, second_day], columns=['a'])
    expected.index.name = 'index'

    actual = library.read(symbol).data
    assert_frame_equal(expected, actual)


# -- auto generated tests --- #
def dataframe(columns, length, index):
df = DataFrame(np.ones((length, columns)), columns=list(string.ascii_lowercase[:columns]))
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/store/test_version_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
2012-11-08 17:06:11.040 | 3.0""")

ts1_append = read_str_as_pandas(""" times | near
2012-09-08 17:06:11.040 | 1.0
2012-10-08 17:06:11.040 | 2.0
2012-10-09 17:06:11.040 | 2.5
2012-11-08 17:06:11.040 | 3.0
2012-11-09 17:06:11.040 | 3.0""")
2012-11-09 17:06:11.040 | 1.0
2012-11-10 17:06:11.040 | 2.0
2012-11-11 17:06:11.040 | 2.5
2012-11-12 17:06:11.040 | 3.0
2012-11-13 17:06:11.040 | 3.0""")


symbol = 'TS1'
Expand Down

0 comments on commit e0ab146

Please sign in to comment.