Skip to content

Commit

Permalink
GH 1357: Project caches should not accept arbitrary kwargs
Browse files Browse the repository at this point in the history
Accepting arbitrary kwargs led to confusion for scientists
accidentally passing incorrect keywords. This PR removes
the ability to pass arbitrary kwargs to public Project cache
constructors, to reduce the possibility of silent errors.
  • Loading branch information
kschelonka committed Mar 13, 2020
1 parent dadee8b commit f50680d
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 64 deletions.
102 changes: 79 additions & 23 deletions allensdk/brain_observatory/behavior/behavior_project_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import os.path
import csv
from functools import partial
from typing import Type, Callable, Optional, List, Any, Dict
from typing import Type, Optional, List, Any, Dict, Union
from pathlib import Path
import pandas as pd
import time
import logging
Expand All @@ -15,8 +16,7 @@
import BehaviorProjectBase
from allensdk.api.caching_utilities import one_file_call_caching, call_caching
from allensdk.core.exceptions import MissingDataError
from allensdk.core.auth_config import LIMS_DB_CREDENTIAL_MAP
from allensdk.core.authentication import credential_injector, DbCredentials
from allensdk.core.authentication import DbCredentials

BehaviorProjectApi = Type[BehaviorProjectBase]

Expand Down Expand Up @@ -64,11 +64,17 @@ def __init__(
self,
fetch_api: Optional[BehaviorProjectApi] = None,
fetch_tries: int = 2,
**kwargs):
manifest: Optional[Union[str, Path]] = None,
version: Optional[str] = None,
cache: bool = True):
""" Entrypoint for accessing visual behavior data. Supports
access to summaries of session data and provides tools for
downloading detailed session data (such as dff traces).
Likely you will want to use a class constructor, such as `from_lims`,
to initialize a BehaviorProjectCache, rather than calling this
directly.
--- NOTE ---
Because NWB files are not currently supported for this project (as of
11/2019), this cache will not actually save any files of session data
Expand All @@ -87,38 +93,88 @@ def __init__(
Used to pull data from remote sources, after which it is locally
cached. Any object inheriting from BehaviorProjectBase is
suitable. Current options are:
EcephysProjectLimsApi :: Fetches bleeding-edge data from the
BehaviorProjectLimsApi :: Fetches bleeding-edge data from the
Allen Institute"s internal database. Only works if you are
on our internal network.
fetch_tries :
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
**kwargs :
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
other kwargs are passed to allensdk.api.cache.Cache
raising an exception. Note that this is total tries, not retries.
Default=2.
manifest : str or Path
full path at which manifest json will be stored. Defaults
to "behavior_project_manifest.json" in the local directory.
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
Defaults to the manifest version in the class.
cache : bool
Whether to write to the cache. Default=True.
"""
kwargs["manifest"] = kwargs.get("manifest",
"behavior_project_manifest.json")
kwargs["version"] = kwargs.get("version", self.MANIFEST_VERSION)
manifest_ = manifest or "behavior_project_manifest.json"
version_ = version or self.MANIFEST_VERSION

super().__init__(**kwargs)
self.fetch_api = fetch_api or BehaviorProjectLimsApi.default()
super().__init__(manifest=manifest_, version=version_, cache=cache)
self.fetch_api = fetch_api
self.fetch_tries = fetch_tries
self.logger = logging.getLogger(self.__class__.__name__)

@classmethod
def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
def from_lims(cls, manifest: Optional[Union[str, Path]] = None,
version: Optional[str] = None,
cache: bool = True,
fetch_tries: int = 2,
lims_credentials: Optional[DbCredentials] = None,
mtrain_credentials: Optional[DbCredentials] = None,
app_kwargs: Dict[str, Any] = None, **kwargs):
return cls(fetch_api=BehaviorProjectLimsApi.default(
host: Optional[str] = None,
scheme: Optional[str] = None,
asynchronous: Optional[str] = None) -> "BehaviorProjectCache":
"""
Construct a BehaviorProjectCache with a lims api. Use this method
to create a BehaviorProjectCache instance rather than calling
BehaviorProjectCache directly.
Parameters
==========
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
cache : bool
Whether to write to the cache
fetch_tries : int
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
lims_credentials : DbCredentials
Optional credentials to access LIMS database.
If not set, will look for credentials in environment variables.
mtrain_credentials: DbCredentials
Optional credentials to access mtrain database.
If not set, will look for credentials in environment variables.
host : str
Web host for the app_engine. Currently unused. This argument is
included for consistency with EcephysProjectCache.from_lims.
scheme : str
URI scheme, such as "http". Currently unused. This argument is
included for consistency with EcephysProjectCache.from_lims.
asynchronous : bool
Whether to fetch from web asynchronously. Currently unused.
Returns
=======
BehaviorProjectCache
BehaviorProjectCache instance with a LIMS fetch API
"""
if host and scheme:
app_kwargs = {"host": host, "scheme": scheme,
"asynchronous": asynchronous}
else:
app_kwargs = None
fetch_api = BehaviorProjectLimsApi.default(
lims_credentials=lims_credentials,
mtrain_credentials=mtrain_credentials,
app_kwargs=app_kwargs),
**kwargs)
app_kwargs=app_kwargs)
return cls(fetch_api=fetch_api, manifest=manifest, version=version,
cache=cache, fetch_tries=fetch_tries)

def get_session_table(
self,
Expand Down
183 changes: 144 additions & 39 deletions allensdk/brain_observatory/ecephys/ecephys_project_cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import partial
from pathlib import Path
from typing import Any, List, Optional
from typing import Any, List, Optional, Union, Callable
import ast

import pandas as pd
Expand All @@ -9,13 +9,13 @@
import pynwb

from allensdk.api.cache import Cache

from allensdk.core.authentication import DbCredentials
from allensdk.brain_observatory.ecephys.ecephys_project_api import (
EcephysProjectApi, EcephysProjectLimsApi, EcephysProjectWarehouseApi,
EcephysProjectFixedApi
)
from allensdk.brain_observatory.ecephys.ecephys_project_api.rma_engine import (
AsyncRmaEngine,
AsyncRmaEngine, RmaEngine
)
from allensdk.brain_observatory.ecephys.ecephys_project_api.http_engine import (
write_bytes_from_coroutine, write_from_stream
Expand All @@ -27,7 +27,6 @@
from allensdk.brain_observatory.ecephys import get_unit_filter_value
from allensdk.api.caching_utilities import one_file_call_caching


class EcephysProjectCache(Cache):

SESSIONS_KEY = 'sessions'
Expand Down Expand Up @@ -74,11 +73,13 @@ class EcephysProjectCache(Cache):
)

def __init__(
self,
fetch_api: EcephysProjectApi = EcephysProjectWarehouseApi.default(),
fetch_tries: int = 2,
stream_writer = write_from_stream,
**kwargs):
self,
fetch_api: EcephysProjectApi = EcephysProjectWarehouseApi.default(),
fetch_tries: int = 2,
stream_writer: Callable = write_from_stream,
manifest: Optional[Union[str, Path]] = None,
version: Optional[str] = None,
cache: bool = True):
""" Entrypoint for accessing ecephys (neuropixels) data. Supports
access to cross-session data (like stimulus templates) and high-level
summaries of sessionwise data and provides tools for downloading detailed
Expand All @@ -88,33 +89,34 @@ def __init__(
==========
fetch_api :
Used to pull data from remote sources, after which it is locally
cached. Any object exposing the EcephysProjectApi interface is
cached. Any object exposing the EcephysProjectApi interface is
suitable. Standard options are:
EcephysProjectWarehouseApi :: The default. Fetches publically
EcephysProjectWarehouseApi :: The default. Fetches publically
available Allen Institute data
EcephysProjectFixedApi :: Refuses to fetch any data - only the
existing local cache is accessible. Useful if you want to
settle on a fixed dataset for analysis.
EcephysProjectLimsApi :: Fetches bleeding-edge data from the
Allen Institute's internal database. Only works if you are
EcephysProjectFixedApi :: Refuses to fetch any data - only the
existing local cache is accessible. Useful if you want to
settle on a fixed dataset for analysis
EcephysProjectLimsApi :: Fetches bleeding-edge data from the
Allen Institute's internal database. Only works if you are
on our internal network.
fetch_tries :
Maximum number of times to attempt a download before giving up and
fetch_tries : int
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
**kwargs :
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
other kwargs are passed to allensdk.api.cache.Cache
manifest : str or Path
full path at which manifest json will be stored (default =
"ecephys_project_manifest.json" in the local directory.)
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
cache: bool
Whether to write to the cache (default=True)
"""
manifest_ = manifest or "ecephys_project_manifest.json"
version_ = version or self.MANIFEST_VERSION

kwargs['manifest'] = kwargs.get('manifest', 'ecephys_project_manifest.json')
kwargs['version'] = kwargs.get('version', self.MANIFEST_VERSION)

super(EcephysProjectCache, self).__init__(**kwargs)
super(EcephysProjectCache, self).__init__(manifest=manifest_,
version=version_,
cache=cache)
self.fetch_api = fetch_api
self.fetch_tries = fetch_tries
self.stream_writer = stream_writer
Expand Down Expand Up @@ -516,7 +518,7 @@ def _from_http_source_default(cls, fetch_api_cls, fetch_api_kwargs, **kwargs):
"asynchronous": True
} if fetch_api_kwargs is None else fetch_api_kwargs

if "stream_writer" not in kwargs:
if kwargs.get("stream_writer") is None:
if fetch_api_kwargs.get("asynchronous", True):
kwargs["stream_writer"] = write_bytes_from_coroutine
else:
Expand All @@ -528,21 +530,124 @@ def _from_http_source_default(cls, fetch_api_cls, fetch_api_kwargs, **kwargs):
)

@classmethod
def from_lims(cls, lims_kwargs=None, **kwargs):
def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
scheme: Optional[str] = None,
host: Optional[str] = None,
asynchronous: bool = True,
manifest: Optional[str] = None,
version: Optional[str] = None,
cache: bool = True,
fetch_tries: int = 2):
"""
Create an instance of EcephysProjectCache with an
EcephysProjectLimsApi. Retrieves bleeding-edge data stored
locally on Allen Institute servers. Only available for use
on-site at the Allen Institute or through a vpn. Requires Allen
Institute database credentials.
Parameters
==========
lims_credentials : DbCredentials
Credentials to access LIMS database. If not provided will
attempt to find credentials in environment variables.
scheme : str
URI scheme, such as "http". Defaults to
EcephysProjectLimsApi.default value if unspecified.
Will not be used unless `host` is also specified.
host : str
Web host. Defaults to EcephysProjectLimsApi.default
value if unspecified. Will not be used unless `scheme` is
also specified.
asynchronous : bool
Whether to fetch file asynchronously. Defaults to True.
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
cache: bool
Whether to write to the cache (default=True)
fetch_tries : int
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
"""
if scheme and host:
app_kwargs = {"scheme": scheme, "host": host}
else:
app_kwargs = None
return cls._from_http_source_default(
EcephysProjectLimsApi, lims_kwargs, **kwargs
)
EcephysProjectLimsApi,
{"lims_credentials": lims_credentials,
"app_kwargs": app_kwargs,
"asynchronous": asynchronous,
}, # expects dictionary of kwargs
manifest=manifest, version=version, cache=cache,
fetch_tries=fetch_tries)

@classmethod
def from_warehouse(cls, warehouse_kwargs=None, **kwargs):
def from_warehouse(cls,
scheme: Optional[str] = None,
host: Optional[str] = None,
asynchronous: bool = True,
manifest: Optional[Union[str, Path]] = None,
version: Optional[str] = None,
cache: bool = True,
fetch_tries: int = 2):
"""
Create an instance of EcephysProjectCache with an
EcephysProjectWarehouseApi. Retrieves released data stored in
the warehouse.
Parameters
==========
scheme : str
URI scheme, such as "http". Defaults to
EcephysProjectWarehouseAPI.default value if unspecified.
Will not be used unless `host` is also specified.
host : str
Web host. Defaults to EcephysProjectWarehouseApi.default
value if unspecified. Will not be used unless `scheme` is also
specified.
asynchronous : bool
Whether to fetch file asynchronously. Defaults to True.
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
cache: bool
Whether to write to the cache (default=True)
fetch_tries : int
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
"""
if scheme and host:
app_kwargs = {"scheme": scheme, "host": host,
"asynchronous": asynchronous}
else:
app_kwargs = None
return cls._from_http_source_default(
EcephysProjectWarehouseApi, warehouse_kwargs, **kwargs
EcephysProjectWarehouseApi, app_kwargs, manifest=manifest,
version=version, cache=cache, fetch_tries=fetch_tries
)


@classmethod
def fixed(cls, **kwargs):
return cls(fetch_api=EcephysProjectFixedApi(), **kwargs)
def fixed(cls, manifest=None, version=None):
"""
Creates a EcephysProjectCache that refuses to fetch any data
- only the existing local cache is accessible. Useful if you
want to settle on a fixed dataset for analysis.
Parameters
==========
manifest : str or Path
full path to existing manifest json
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
"""
return cls(fetch_api=EcephysProjectFixedApi(), manifest=manifest,
version=version)


def count_owned(this, other, foreign_key, count_key, inplace=False):
Expand Down
Loading

0 comments on commit f50680d

Please sign in to comment.