Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH 1357: Project caches should not accept arbitrary kwargs #1373

Merged
merged 2 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 79 additions & 23 deletions allensdk/brain_observatory/behavior/behavior_project_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import os.path
import csv
from functools import partial
from typing import Type, Callable, Optional, List, Any, Dict
from typing import Type, Optional, List, Any, Dict, Union
from pathlib import Path
import pandas as pd
import time
import logging
Expand All @@ -15,8 +16,7 @@
import BehaviorProjectBase
from allensdk.api.caching_utilities import one_file_call_caching, call_caching
from allensdk.core.exceptions import MissingDataError
from allensdk.core.auth_config import LIMS_DB_CREDENTIAL_MAP
from allensdk.core.authentication import credential_injector, DbCredentials
from allensdk.core.authentication import DbCredentials

BehaviorProjectApi = Type[BehaviorProjectBase]

Expand Down Expand Up @@ -64,11 +64,17 @@ def __init__(
self,
fetch_api: Optional[BehaviorProjectApi] = None,
fetch_tries: int = 2,
**kwargs):
manifest: Optional[Union[str, Path]] = None,
version: Optional[str] = None,
cache: bool = True):
""" Entrypoint for accessing visual behavior data. Supports
access to summaries of session data and provides tools for
downloading detailed session data (such as dff traces).

Likely you will want to use a class constructor, such as `from_lims`,
to initialize a BehaviorProjectCache, rather than calling this
directly.

--- NOTE ---
Because NWB files are not currently supported for this project (as of
11/2019), this cache will not actually save any files of session data
Expand All @@ -87,38 +93,88 @@ def __init__(
Used to pull data from remote sources, after which it is locally
cached. Any object inheriting from BehaviorProjectBase is
suitable. Current options are:
EcephysProjectLimsApi :: Fetches bleeding-edge data from the
BehaviorProjectLimsApi :: Fetches bleeding-edge data from the
Allen Institute"s internal database. Only works if you are
on our internal network.
fetch_tries :
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
**kwargs :
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
other kwargs are passed to allensdk.api.cache.Cache
raising an exception. Note that this is total tries, not retries.
Default=2.
manifest : str or Path
full path at which manifest json will be stored. Defaults
to "behavior_project_manifest.json" in the local directory.
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
Defaults to the manifest version in the class.
cache : bool
Whether to write to the cache. Default=True.
"""
kwargs["manifest"] = kwargs.get("manifest",
"behavior_project_manifest.json")
kwargs["version"] = kwargs.get("version", self.MANIFEST_VERSION)
manifest_ = manifest or "behavior_project_manifest.json"
version_ = version or self.MANIFEST_VERSION

super().__init__(**kwargs)
self.fetch_api = fetch_api or BehaviorProjectLimsApi.default()
super().__init__(manifest=manifest_, version=version_, cache=cache)
self.fetch_api = fetch_api
self.fetch_tries = fetch_tries
self.logger = logging.getLogger(self.__class__.__name__)

@classmethod
def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
def from_lims(cls, manifest: Optional[Union[str, Path]] = None,
version: Optional[str] = None,
cache: bool = True,
fetch_tries: int = 2,
lims_credentials: Optional[DbCredentials] = None,
mtrain_credentials: Optional[DbCredentials] = None,
app_kwargs: Dict[str, Any] = None, **kwargs):
return cls(fetch_api=BehaviorProjectLimsApi.default(
host: Optional[str] = None,
scheme: Optional[str] = None,
asynchronous: bool = True) -> "BehaviorProjectCache":
"""
Construct a BehaviorProjectCache with a lims api. Use this method
to create a BehaviorProjectCache instance rather than calling
BehaviorProjectCache directly.

Parameters
==========
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
cache : bool
Whether to write to the cache
fetch_tries : int
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
lims_credentials : DbCredentials
Optional credentials to access LIMS database.
If not set, will look for credentials in environment variables.
mtrain_credentials: DbCredentials
Optional credentials to access mtrain database.
If not set, will look for credentials in environment variables.
host : str
Web host for the app_engine. Currently unused. This argument is
included for consistency with EcephysProjectCache.from_lims.
scheme : str
URI scheme, such as "http". Currently unused. This argument is
included for consistency with EcephysProjectCache.from_lims.
asynchronous : bool
Whether to fetch from web asynchronously. Currently unused.
Returns
=======
BehaviorProjectCache
BehaviorProjectCache instance with a LIMS fetch API
"""
if host and scheme:
kschelonka marked this conversation as resolved.
Show resolved Hide resolved
app_kwargs = {"host": host, "scheme": scheme,
"asynchronous": asynchronous}
else:
app_kwargs = None
fetch_api = BehaviorProjectLimsApi.default(
lims_credentials=lims_credentials,
mtrain_credentials=mtrain_credentials,
app_kwargs=app_kwargs),
**kwargs)
app_kwargs=app_kwargs)
return cls(fetch_api=fetch_api, manifest=manifest, version=version,
cache=cache, fetch_tries=fetch_tries)

def get_session_table(
self,
Expand Down
185 changes: 144 additions & 41 deletions allensdk/brain_observatory/ecephys/ecephys_project_cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import partial
from pathlib import Path
from typing import Any, List, Optional
from typing import Any, List, Optional, Union, Callable
import ast

import pandas as pd
Expand All @@ -9,14 +9,11 @@
import pynwb

from allensdk.api.cache import Cache

from allensdk.core.authentication import DbCredentials
from allensdk.brain_observatory.ecephys.ecephys_project_api import (
EcephysProjectApi, EcephysProjectLimsApi, EcephysProjectWarehouseApi,
EcephysProjectApi, EcephysProjectLimsApi, EcephysProjectWarehouseApi,
EcephysProjectFixedApi
)
from allensdk.brain_observatory.ecephys.ecephys_project_api.rma_engine import (
AsyncRmaEngine,
)
from allensdk.brain_observatory.ecephys.ecephys_project_api.http_engine import (
write_bytes_from_coroutine, write_from_stream
)
Expand Down Expand Up @@ -74,11 +71,13 @@ class EcephysProjectCache(Cache):
)

def __init__(
self,
fetch_api: EcephysProjectApi = EcephysProjectWarehouseApi.default(),
fetch_tries: int = 2,
stream_writer = write_from_stream,
**kwargs):
self,
fetch_api: EcephysProjectApi = EcephysProjectWarehouseApi.default(),
fetch_tries: int = 2,
stream_writer: Callable = write_from_stream,
manifest: Optional[Union[str, Path]] = None,
version: Optional[str] = None,
cache: bool = True):
""" Entrypoint for accessing ecephys (neuropixels) data. Supports
access to cross-session data (like stimulus templates) and high-level
summaries of sessionwise data and provides tools for downloading detailed
Expand All @@ -88,33 +87,34 @@ def __init__(
==========
fetch_api :
Used to pull data from remote sources, after which it is locally
cached. Any object exposing the EcephysProjectApi interface is
cached. Any object exposing the EcephysProjectApi interface is
suitable. Standard options are:
EcephysProjectWarehouseApi :: The default. Fetches publically
EcephysProjectWarehouseApi :: The default. Fetches publically
available Allen Institute data
EcephysProjectFixedApi :: Refuses to fetch any data - only the
existing local cache is accessible. Useful if you want to
settle on a fixed dataset for analysis.
EcephysProjectLimsApi :: Fetches bleeding-edge data from the
Allen Institute's internal database. Only works if you are
EcephysProjectFixedApi :: Refuses to fetch any data - only the
existing local cache is accessible. Useful if you want to
settle on a fixed dataset for analysis
EcephysProjectLimsApi :: Fetches bleeding-edge data from the
Allen Institute's internal database. Only works if you are
on our internal network.
fetch_tries :
Maximum number of times to attempt a download before giving up and
fetch_tries : int
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
**kwargs :
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
other kwargs are passed to allensdk.api.cache.Cache

manifest : str or Path
full path at which manifest json will be stored (default =
"ecephys_project_manifest.json" in the local directory.)
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
cache: bool
Whether to write to the cache (default=True)
"""
manifest_ = manifest or "ecephys_project_manifest.json"
version_ = version or self.MANIFEST_VERSION

kwargs['manifest'] = kwargs.get('manifest', 'ecephys_project_manifest.json')
kwargs['version'] = kwargs.get('version', self.MANIFEST_VERSION)

super(EcephysProjectCache, self).__init__(**kwargs)
super(EcephysProjectCache, self).__init__(manifest=manifest_,
version=version_,
cache=cache)
self.fetch_api = fetch_api
self.fetch_tries = fetch_tries
self.stream_writer = stream_writer
Expand Down Expand Up @@ -516,7 +516,7 @@ def _from_http_source_default(cls, fetch_api_cls, fetch_api_kwargs, **kwargs):
"asynchronous": True
} if fetch_api_kwargs is None else fetch_api_kwargs

if "stream_writer" not in kwargs:
if kwargs.get("stream_writer") is None:
if fetch_api_kwargs.get("asynchronous", True):
kwargs["stream_writer"] = write_bytes_from_coroutine
else:
Expand All @@ -528,21 +528,124 @@ def _from_http_source_default(cls, fetch_api_cls, fetch_api_kwargs, **kwargs):
)

@classmethod
def from_lims(cls, lims_kwargs=None, **kwargs):
def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
scheme: Optional[str] = None,
host: Optional[str] = None,
asynchronous: bool = True,
manifest: Optional[str] = None,
version: Optional[str] = None,
cache: bool = True,
fetch_tries: int = 2):
"""
Create an instance of EcephysProjectCache with an
EcephysProjectLimsApi. Retrieves bleeding-edge data stored
locally on Allen Institute servers. Only available for use
on-site at the Allen Institute or through a vpn. Requires Allen
Institute database credentials.

Parameters
==========
lims_credentials : DbCredentials
Credentials to access LIMS database. If not provided will
attempt to find credentials in environment variables.
scheme : str
URI scheme, such as "http". Defaults to
EcephysProjectLimsApi.default value if unspecified.
Will not be used unless `host` is also specified.
host : str
Web host. Defaults to EcephysProjectLimsApi.default
value if unspecified. Will not be used unless `scheme` is
also specified.
asynchronous : bool
Whether to fetch file asynchronously. Defaults to True.
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
cache: bool
Whether to write to the cache (default=True)
fetch_tries : int
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
"""
if scheme and host:
app_kwargs = {"scheme": scheme, "host": host}
else:
app_kwargs = None
return cls._from_http_source_default(
EcephysProjectLimsApi, lims_kwargs, **kwargs
)
EcephysProjectLimsApi,
{"lims_credentials": lims_credentials,
"app_kwargs": app_kwargs,
"asynchronous": asynchronous,
}, # expects dictionary of kwargs
manifest=manifest, version=version, cache=cache,
fetch_tries=fetch_tries)

@classmethod
def from_warehouse(cls, warehouse_kwargs=None, **kwargs):
def from_warehouse(cls,
scheme: Optional[str] = None,
host: Optional[str] = None,
asynchronous: bool = True,
manifest: Optional[Union[str, Path]] = None,
version: Optional[str] = None,
cache: bool = True,
fetch_tries: int = 2):
"""
Create an instance of EcephysProjectCache with an
EcephysProjectWarehouseApi. Retrieves released data stored in
the warehouse.

Parameters
==========
scheme : str
URI scheme, such as "http". Defaults to
EcephysProjectWarehouseAPI.default value if unspecified.
Will not be used unless `host` is also specified.
host : str
Web host. Defaults to EcephysProjectWarehouseApi.default
value if unspecified. Will not be used unless `scheme` is also
specified.
asynchronous : bool
Whether to fetch file asynchronously. Defaults to True.
manifest : str or Path
full path at which manifest json will be stored
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
cache: bool
Whether to write to the cache (default=True)
fetch_tries : int
Maximum number of times to attempt a download before giving up and
raising an exception. Note that this is total tries, not retries
"""
if scheme and host:
app_kwargs = {"scheme": scheme, "host": host,
"asynchronous": asynchronous}
else:
app_kwargs = None
return cls._from_http_source_default(
EcephysProjectWarehouseApi, warehouse_kwargs, **kwargs
EcephysProjectWarehouseApi, app_kwargs, manifest=manifest,
version=version, cache=cache, fetch_tries=fetch_tries
)


@classmethod
def fixed(cls, **kwargs):
return cls(fetch_api=EcephysProjectFixedApi(), **kwargs)
def fixed(cls, manifest=None, version=None):
"""
Creates a EcephysProjectCache that refuses to fetch any data
- only the existing local cache is accessible. Useful if you
want to settle on a fixed dataset for analysis.

Parameters
==========
manifest : str or Path
full path to existing manifest json
version : str
version of manifest file. If this mismatches the version
recorded in the file at manifest, an error will be raised.
"""
return cls(fetch_api=EcephysProjectFixedApi(), manifest=manifest,
version=version)


def count_owned(this, other, foreign_key, count_key, inplace=False):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def test_from_lims_default(tmpdir_factory):
tmpdir = str(tmpdir_factory.mktemp("test_from_lims_default"))

cache = epc.EcephysProjectCache.from_lims(
manifest_path=os.path.join(tmpdir, "manifest.json")
manifest=os.path.join(tmpdir, "manifest.json")
)
assert isinstance(cache.fetch_api.app_engine, AsyncHttpEngine)
assert cache.stream_writer is epc.write_bytes_from_coroutine
Loading