Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow users to pass neptune data directory as env var #1409

Merged
merged 12 commits into from
Aug 8, 2023
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
## [UNRELEASED] neptune 1.5.0
## [UNRELEASED] neptune 1.5.0

### Features
- Users can pass neptune data directory path by env variable ([#1409](https://github.com/neptune-ai/neptune-client/pull/1409))

### Fixes
- Load CLI plug-ins in try..except block to avoid a failure in loading a plug-in to crash entire CLI ([#1392](https://github.com/neptune-ai/neptune-client/pull/1392))
- Fixed cleaning operation storage when using sync mode and forking ([#1413](https://github.com/neptune-ai/neptune-client/pull/1413))



## neptune 1.4.1

### Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
Optional,
)

from neptune.constants import (
ASYNC_DIRECTORY,
NEPTUNE_DATA_DIRECTORY,
)
from neptune.constants import ASYNC_DIRECTORY
from neptune.envs import NEPTUNE_SYNC_AFTER_STOP_TIMEOUT
from neptune.exceptions import NeptuneSynchronizationAlreadyStoppedException
from neptune.internal.backends.neptune_backend import NeptuneBackend
Expand All @@ -41,7 +38,10 @@
from neptune.internal.id_formats import UniqueId
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.operation_processor import OperationProcessor
from neptune.internal.operation_processors.operation_storage import OperationStorage
from neptune.internal.operation_processors.operation_storage import (
OperationStorage,
get_container_dir,
)
from neptune.internal.threading.daemon import Daemon
from neptune.internal.utils.logger import logger

Expand Down Expand Up @@ -84,10 +84,8 @@ def __init__(
@staticmethod
def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Path:
now = datetime.now()
container_dir = f"{NEPTUNE_DATA_DIRECTORY}/{ASYNC_DIRECTORY}/{container_type.create_dir_name(container_id)}"
data_path = f"{container_dir}/exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}"
data_path = data_path.replace(" ", "_").replace(":", ".")
return Path(data_path)
process_path = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}"
return get_container_dir(ASYNC_DIRECTORY, container_id, container_type, process_path)

def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
self._last_version = self._queue.put(op)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
from pathlib import Path
from typing import Optional

from neptune.constants import (
NEPTUNE_DATA_DIRECTORY,
OFFLINE_DIRECTORY,
)
from neptune.constants import OFFLINE_DIRECTORY
from neptune.internal.container_type import ContainerType
from neptune.internal.disk_queue import DiskQueue
from neptune.internal.id_formats import UniqueId
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.operation_processor import OperationProcessor
from neptune.internal.operation_processors.operation_storage import OperationStorage
from neptune.internal.operation_processors.operation_storage import (
OperationStorage,
get_container_dir,
)


class OfflineOperationProcessor(OperationProcessor):
Expand All @@ -44,7 +44,7 @@ def __init__(self, container_id: UniqueId, container_type: ContainerType, lock:

@staticmethod
def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Path:
return Path(f"{NEPTUNE_DATA_DIRECTORY}/{OFFLINE_DIRECTORY}/{container_type.create_dir_name(container_id)}")
return get_container_dir(OFFLINE_DIRECTORY, container_id, container_type)

def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
self._queue.put(op)
Expand Down
20 changes: 13 additions & 7 deletions src/neptune/internal/operation_processors/operation_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = [
"OperationStorage",
]
__all__ = ["OperationStorage", "get_container_dir"]

import os
import shutil
from pathlib import Path
from typing import Optional

from neptune.constants import NEPTUNE_DATA_DIRECTORY
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.utils.logger import logger


def get_container_dir(
type_dir: str, container_id: UniqueId, container_type: ContainerType, process_path: Optional[str] = None
) -> Path:
neptune_data_dir = os.getenv("NEPTUNE_DATA_DIRECTORY", NEPTUNE_DATA_DIRECTORY)
container_dir = Path(f"{neptune_data_dir}/{type_dir}/{container_type.create_dir_name(container_id)}")
if process_path:
container_dir /= Path(process_path)

return container_dir


class OperationStorage:
UPLOAD_PATH: str = "upload_path"

Expand All @@ -44,10 +54,6 @@ def data_path(self) -> Path:
def upload_path(self) -> Path:
return self.data_path / "upload_path"

@staticmethod
def _get_container_dir(type_dir: str, container_id: UniqueId, container_type: ContainerType):
return f"{NEPTUNE_DATA_DIRECTORY}/{type_dir}/{container_type.create_dir_name(container_id)}"

def close(self):
shutil.rmtree(self.data_path, ignore_errors=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@
from pathlib import Path
from typing import Optional

from neptune.constants import (
NEPTUNE_DATA_DIRECTORY,
SYNC_DIRECTORY,
)
from neptune.constants import SYNC_DIRECTORY
from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.operation_processor import OperationProcessor
from neptune.internal.operation_processors.operation_storage import OperationStorage
from neptune.internal.operation_processors.operation_storage import (
OperationStorage,
get_container_dir,
)


class SyncOperationProcessor(OperationProcessor):
Expand All @@ -42,9 +42,8 @@ def __init__(self, container_id: UniqueId, container_type: ContainerType, backen
@staticmethod
def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Path:
now = datetime.now()
container_dir = f"{NEPTUNE_DATA_DIRECTORY}/{SYNC_DIRECTORY}/{container_type.create_dir_name(container_id)}"
data_path = f"{container_dir}/exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}"
return Path(data_path)
process_path = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}"
return get_container_dir(SYNC_DIRECTORY, container_id, container_type, process_path)

def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
_, errors = self._backend.execute_operations(
Expand Down