Forward pointers implementation (#651)
* initial commit for fw pointers implementation

* Finalized implementation of forward pointers

* incorporated PR comments for fw pointers implementation

* fw pointers: chunks_ids to chunk_ids

* fix for forward pointers implementation to be robust over mongo_retry when only a subset of segments has been written

* minor fix for initialization of ARCTIC_FORWARD_POINTERS

* added implementation of cleanup/pruning supporting forward pointers while remaining backwards compatible

* updated FW pointers implementation to allow any transition from/to ENABLED/DISABLED/HYBRID per write/append/read (see the configuration sketch after the changed-files summary below)

* added comments explaining forward pointers variables and methods

* added a check for the number of gathered segments when updating fw pointers, raising operation errors upon failure

* forward pointers implementation with SHAs instead of IDs

* completed fully functional implementation of SHA based forward pointers

* pruning compatible with forward pointers enabled/disabled/hybrid

* don't strip() twice in version str

* remove unnecessary import

* set back the pruning timeout to 120

* set back the pruning timeout to 120

* added numerical value of the arctic version used to create a version, in the version metadata

* fixed index check integration test

* fixed multiple bugs with concat_and_write, corrected the pruning, and fixed multiple tests after enabling all VersionStore tests to run in all three forward pointers modes

* fixed all integration tests for VersionStore and fixed a bug where pruning with FW pointers ran too early, as the version is not yet inserted when pruning happens

* fixed the last two broken integration tests

* fixed the cleanup logic for forward pointers to retain chunks created within the past 24h. Added fw pointers to multiple other tests to verify functionality

* moved the publish_changes and prune calls back to their original order

* fixed Python 3 compatibility issue with pruning

* updated changelog

* fixed Binary(b'aaa') != b'aaa' in Python 3

* fixed the last remaining failing tests for Python 3 related to bson.binary.Binary comparison with binary
dimosped authored Nov 27, 2018
1 parent 883455c commit 6735c04
Showing 15 changed files with 2,367 additions and 1,613 deletions.
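
The ENABLED/DISABLED/HYBRID modes mentioned in the commit message above are driven by an environment variable read in arctic/_util.py (see the diff below). A minimal sketch, assuming a MongoDB instance on localhost and an already-initialised library named 'library_name' (both hypothetical), of selecting HYBRID mode for a process:

import os

# The mode must be set before arctic is imported, because arctic._util reads
# ARCTIC_FORWARD_POINTERS_CFG at import time (see the _util.py diff below).
os.environ['ARCTIC_FORWARD_POINTERS_CFG'] = 'HYBRID'  # or 'ENABLED' / 'DISABLED'

from arctic import Arctic

lib = Arctic('localhost')['library_name']   # hypothetical host and library
lib.write('symbol', list(range(10)))        # new versions record the FW pointers mode in the version document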
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -3,6 +3,8 @@
### 1.73
* Bugfix: #658 Write/append errors for Panel objects from older pandas versions
* Feature: #653 Add version meta-info in arctic module
* Feature: #663 Include arctic numerical version in the metadata of the version document
* Feature: #650 Implemented forward pointers for chunks in VersionStore (modes: enabled/disabled/hybrid)

### 1.72 (2018-11-06)
* Feature: #577 Added implementation for incremental serializer for numpy records
6 changes: 5 additions & 1 deletion arctic/__init__.py
@@ -2,20 +2,24 @@

from .arctic import Arctic, register_library_type
from .arctic import VERSION_STORE, TICK_STORE, CHUNK_STORE
from .store.version_store import register_versioned_storage
from .store.version_store import register_versioned_storage, register_version
from .store._pandas_ndarray_store import PandasDataFrameStore, PandasSeriesStore, PandasPanelStore
from .store._ndarray_store import NdarrayStore

try:
    from pkg_resources import get_distribution
    str_version = get_distribution(__name__).version.strip()
    int_parts = tuple(int(x) for x in str_version.split('.'))
    num_version = sum([1000 ** i * v for i, v in enumerate(reversed(int_parts))])
    register_version(str_version, num_version)
except Exception:
    __version__ = None
    __version_parts__ = tuple()
    __version_numerical__ = 0
else:
    __version__ = str_version
    __version_parts__ = int_parts
    __version_numerical__ = num_version


register_versioned_storage(PandasDataFrameStore)
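The try/except/else block above collapses the distribution version string into a single integer for storage in version metadata. A small worked sketch of that arithmetic, using a hypothetical version string:

str_version = '1.73.0'                                      # hypothetical version string
int_parts = tuple(int(x) for x in str_version.split('.'))   # (1, 73, 0)
num_version = sum(1000 ** i * v for i, v in enumerate(reversed(int_parts)))
# 0 * 1000**0 + 73 * 1000**1 + 1 * 1000**2 == 1073000
print(num_version)  # 1073000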
29 changes: 29 additions & 0 deletions arctic/_util.py
@@ -1,7 +1,9 @@
import logging
import os

import numpy as np
import pymongo
from enum import Enum
from pandas import DataFrame
from pandas.util.testing import assert_frame_equal

@@ -15,6 +17,33 @@
_use_new_count_api = None


# This enum provides all the available modes of operation for forward pointers
class FwPointersCfg(Enum):
    ENABLED = 0   # use only forward pointers, don't update segment parent references
    DISABLED = 1  # operate in legacy mode, update segment parent references, don't add forward pointers
    HYBRID = 2    # maintain both forward pointers and parent references in segments; for reads prefer fw pointers


# The version document key used to store the SHAs of segments
FW_POINTERS_REFS_KEY = 'SEGMENT_SHAS'
# The version document key for storing the FW pointers configuration used to create this version
FW_POINTERS_CONFIG_KEY = 'FW_POINTERS_CONFIG'
# This variable has effect only in HYBRID mode, and controls whether forward and legacy pointers are cross-verified
ARCTIC_FORWARD_POINTERS_RECONCILE = bool(os.environ.get('ARCTIC_FORWARD_POINTERS_RECONCILE'))
try:
    # Controls the mode of operation for FW pointers, has effect on any new versions created
    ARCTIC_FORWARD_POINTERS_CFG = FwPointersCfg[(os.environ.get('ARCTIC_FORWARD_POINTERS_CFG',
                                                                FwPointersCfg.DISABLED.name).upper())]
except Exception:
    logger.exception("Failed to configure forward pointers with configuration {}".format(
        os.environ.get('ARCTIC_FORWARD_POINTERS_CFG')))
    ARCTIC_FORWARD_POINTERS_CFG = FwPointersCfg.DISABLED


def get_fwptr_config(version):
    return FwPointersCfg[version.get(FW_POINTERS_CONFIG_KEY, FwPointersCfg.DISABLED.name)]


def _detect_new_count_api():
    try:
        mongo_v = [int(v) for v in pymongo.version.split('.')]
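get_fwptr_config above recovers the forward-pointers mode a version was written with, falling back to DISABLED for documents that predate the feature. A minimal sketch with hypothetical version documents:

from arctic._util import FW_POINTERS_CONFIG_KEY, FwPointersCfg, get_fwptr_config

legacy_version = {'symbol': 'EURUSD', 'version': 7}     # hypothetical pre-feature document, no config key
hybrid_version = {'symbol': 'EURUSD', 'version': 8,
                  FW_POINTERS_CONFIG_KEY: FwPointersCfg.HYBRID.name}  # hypothetical document written in HYBRID mode

assert get_fwptr_config(legacy_version) is FwPointersCfg.DISABLED  # missing key means legacy behaviour
assert get_fwptr_config(hybrid_version) is FwPointersCfg.HYBRID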
2 changes: 2 additions & 0 deletions arctic/exceptions.py
@@ -40,9 +40,11 @@ class DataIntegrityException(ArcticException):
"""
pass


class ArcticSerializationException(ArcticException):
    pass


class ConcurrentModificationException(DataIntegrityException):
    pass

282 changes: 215 additions & 67 deletions arctic/store/_ndarray_store.py

Large diffs are not rendered by default.
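The bulk of the write/read path changes live in _ndarray_store.py, which is not rendered above. As a hedged sketch based only on the keys defined in _util.py and the cleanup helpers below (field values invented for illustration): a version written with forward pointers carries the SHAs of its segments in the version document itself, while legacy/HYBRID writes keep the per-segment parent back-references that the cleanup code below reconciles.

# Hypothetical document shapes -- values are illustrative only
version_doc_fw = {
    'symbol': 'EURUSD',
    'version': 8,
    'FW_POINTERS_CONFIG': 'ENABLED',           # FW_POINTERS_CONFIG_KEY
    'SEGMENT_SHAS': [b'<sha of segment 0>',    # FW_POINTERS_REFS_KEY: forward pointers to segments
                     b'<sha of segment 1>'],
}

segment_doc_legacy = {
    'symbol': 'EURUSD',
    'sha': b'<sha of segment 0>',
    'parent': ['<base version ObjectId>'],     # legacy back-pointer, maintained in DISABLED/HYBRID modes
}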

81 changes: 72 additions & 9 deletions arctic/store/_version_store_utils.py
@@ -11,7 +11,7 @@
from pandas.compat import pickle_compat
from pymongo.errors import OperationFailure

from arctic._util import mongo_count
from arctic._util import mongo_count, FW_POINTERS_REFS_KEY, FW_POINTERS_CONFIG_KEY, FwPointersCfg, get_fwptr_config


def _split_arrs(array_2d, slices):
@@ -46,15 +46,28 @@ def checksum(symbol, doc):
    return Binary(sha.digest())


def cleanup(arctic_lib, symbol, version_ids):
    """
    Helper method for cleaning up chunks from a version store
    """
    collection = arctic_lib.get_top_level_collection()


def get_symbol_alive_shas(symbol, versions_coll):
    return set(Binary(x) for x in versions_coll.distinct(FW_POINTERS_REFS_KEY, {'symbol': symbol}))


def _cleanup_fw_pointers(collection, symbol, version_ids, versions_coll, shas_to_delete, do_clean=True):
    shas_to_delete = set(shas_to_delete) if shas_to_delete else set()

    if not version_ids or not shas_to_delete:
        return shas_to_delete

    symbol_alive_shas = get_symbol_alive_shas(symbol, versions_coll)

    # This is the set of shas which are not referenced by any FW pointers
    shas_safe_to_delete = shas_to_delete - symbol_alive_shas

    if do_clean and shas_safe_to_delete:
        collection.delete_many({'symbol': symbol, 'sha': {'$in': list(shas_safe_to_delete)}})

    return shas_safe_to_delete


# Remove any chunks which contain just the parents, at the outset
# We do this here, because $pullAll will make an empty array: []
# and the index which contains the parents field will fail the unique constraint.
def _cleanup_parent_pointers(collection, symbol, version_ids):
    for v in version_ids:
        # Remove all documents which only contain the parent
        collection.delete_many({'symbol': symbol,
@@ -69,6 +82,56 @@ def cleanup(arctic_lib, symbol, version_ids):
    collection.delete_one({'symbol': symbol, 'parent': []})


def _cleanup_mixed(symbol, collection, version_ids, versions_coll):
    # Pull the deleted version IDs from the parents field
    collection.update_many({'symbol': symbol, 'parent': {'$in': version_ids}}, {'$pullAll': {'parent': version_ids}})

    # All-inclusive set of segments which are pointed to by at least one version (SHA fw pointers)
    symbol_alive_shas = get_symbol_alive_shas(symbol, versions_coll)

    spec = {'symbol': symbol, 'parent': []}
    if symbol_alive_shas:
        # Unfortunately, while this query hits the (symbol, sha) index to find the documents, the filtering
        # by "parent: []" requires fetching the documents server-side, which pollutes the WiredTiger cache
        # TODO: add a new index for segments collection: (symbol, sha, parent)
        spec['sha'] = {'$nin': list(symbol_alive_shas)}
    collection.delete_many(spec)


def _get_symbol_pointer_cfgs(symbol, versions_coll):
    return set(get_fwptr_config(v)
               for v in versions_coll.find({'symbol': symbol}, projection={FW_POINTERS_CONFIG_KEY: 1}))


def cleanup(arctic_lib, symbol, version_ids, versions_coll, shas_to_delete=None, pointers_cfgs=None):
    """
    Helper method for cleaning up chunks from a version store
    """
    pointers_cfgs = set(pointers_cfgs) if pointers_cfgs else set()
    collection = arctic_lib.get_top_level_collection()
    version_ids = list(version_ids)

    # Iterate versions to check if they were created only with fw pointers, only with parent pointers (old), or mixed.
    # Keep in mind that the version is not yet inserted.
    all_symbol_pointers_cfgs = _get_symbol_pointer_cfgs(symbol, versions_coll)
    all_symbol_pointers_cfgs.update(pointers_cfgs)

    # All the versions of the symbol have been created with old arctic or with forward pointers disabled.
    # Preserves backwards compatibility and avoids regressions for the old pointers implementation.
    if all_symbol_pointers_cfgs == {FwPointersCfg.DISABLED} or not all_symbol_pointers_cfgs:
        _cleanup_parent_pointers(collection, symbol, version_ids)
        return

    # All the versions of the symbol we wish to delete have been created with forward pointers
    if FwPointersCfg.DISABLED not in all_symbol_pointers_cfgs:
        _cleanup_fw_pointers(collection, symbol, version_ids, versions_coll,
                             shas_to_delete=shas_to_delete, do_clean=True)
        return

    # Reaching here means the symbol has versions with mixed forward pointers and legacy/parent pointer configurations
    _cleanup_mixed(symbol, collection, version_ids, versions_coll)


def version_base_or_id(version):
    return version.get('base_version_id', version['_id'])

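The SHA-based branch of the cleanup above reduces to set arithmetic: a segment may only be deleted if no surviving version still forward-points to its SHA. A small worked sketch with hypothetical SHA values:

# Hypothetical SHAs standing in for the Binary digests stored on segment documents
shas_to_delete = {b'sha_a', b'sha_b', b'sha_c'}      # referenced by the versions being deleted
symbol_alive_shas = {b'sha_b'}                       # still referenced by surviving versions (fw pointers)

shas_safe_to_delete = shas_to_delete - symbol_alive_shas
print(sorted(shas_safe_to_delete))                   # [b'sha_a', b'sha_c'] -- only these may be deleted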