diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index c19ca01f6..29a61600a 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -42,6 +42,7 @@ class Snapshot(ABC): """ Interface for creating and managing snapshots. """ + def __init__(self, config: Dict, source_cluster: Cluster) -> None: self.config = config self.source_cluster = source_cluster @@ -52,17 +53,14 @@ def __init__(self, config: Dict, source_cluster: Cluster) -> None: @abstractmethod def create(self, *args, **kwargs) -> CommandResult: """Create a snapshot.""" - pass @abstractmethod def status(self, *args, **kwargs) -> CommandResult: """Get the status of the snapshot.""" - pass @abstractmethod def delete(self, *args, **kwargs) -> CommandResult: """Delete a snapshot.""" - pass def _collect_universal_command_args(self) -> Dict: command_args = { @@ -249,9 +247,8 @@ def get_snapshot_status_message(snapshot_info: Dict) -> str: total_size_bytes = stats.get("total", {}).get("size_in_bytes", 0) processed_size_bytes = stats.get("processed", {}).get("size_in_bytes", - # Processed size may not be returned for SUCCESS snapshots, in which case we set it to the full size - total_size_bytes if snapshot_state == "SUCCESS" else 0 - ) + total_size_bytes if snapshot_state == "SUCCESS" else 0 + ) total_shards = snapshot_info.get('shards_stats', {}).get('total', 0) successful_shards = snapshot_info.get('shards_stats', {}).get('done', 0) @@ -271,14 +268,14 @@ def get_snapshot_status_message(snapshot_info: Dict) -> str: duration_formatted = format_duration(duration_millis) # Safely calculate remaining duration to avoid division by zero - remaining_duration = ((duration_millis / percent_completed) * (100 - percent_completed)) if percent_completed > 0 else None + remaining_duration = ((duration_millis / percent_completed) * + (100 - percent_completed)) if percent_completed > 0 else None # Calculate throughput - throughput_mib_per_sec = ( - (processed_size_bytes / (1024 ** 2)) / (duration_millis / 1000) - if duration_millis > 0 - else None - ) + if duration_millis > 0: + throughput_mib_per_sec = (processed_size_bytes / (1024 * 1024)) / (duration_millis / 1000) + else: + throughput_mib_per_sec = 0 # Determine anticipated duration remaining and throughput format if duration_millis > 1000 and percent_completed > 0 and remaining_duration is not None: @@ -289,16 +286,18 @@ def get_snapshot_status_message(snapshot_info: Dict) -> str: throughput_mib_per_sec = "N/A (not enough data to compute)" # Compile status message - return (f"Snapshot is {snapshot_state}.\n" - f"Percent completed: {percent_completed:.2f}%\n" - f"Data GiB done: {processed_size_gib:.3f}/{total_size_gib:.3f}\n" - f"Total shards: {total_shards}\n" - f"Successful shards: {successful_shards}\n" - f"Failed shards: {failed_shards}\n" - f"Start time: {start_time_formatted}\n" - f"Duration: {duration_formatted}\n" - f"Anticipated duration remaining: {anticipated_remaining_formatted}\n" - f"Throughput: {throughput_mib_per_sec}") + return ( + f"Snapshot is {snapshot_state}.\n" + f"Percent completed: {percent_completed:.2f}%\n" + f"Data GiB done: {processed_size_gib:.3f}/{total_size_gib:.3f}\n" + f"Total shards: {total_shards}\n" + f"Successful shards: {successful_shards}\n" + f"Failed shards: {failed_shards}\n" + f"Start time: {start_time_formatted}\n" + f"Duration: {duration_formatted}\n" + f"Anticipated duration remaining: {anticipated_remaining_formatted}\n" + f"Throughput: {throughput_mib_per_sec}" + ) def get_snapshot_status_full(cluster: Cluster, snapshot: str, diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py index dd681da4f..a164b70d2 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py @@ -9,7 +9,8 @@ from console_link.models.factories import (UnsupportedSnapshotError, get_snapshot) from console_link.models.snapshot import (FileSystemSnapshot, S3Snapshot, - Snapshot, delete_snapshot) + Snapshot, delete_snapshot, + get_snapshot_status_message) from tests.utils import create_valid_cluster @@ -409,7 +410,7 @@ def test_handling_extra_args(mocker, request, snapshot_fixture): snapshot = request.getfixturevalue(snapshot_fixture) mock = mocker.patch('subprocess.run', autospec=True) extra_args = ['--extra-flag', '--extra-arg', 'extra-arg-value', 'this-is-an-option'] - + result = snapshot.create(extra_args=extra_args) assert result.success @@ -419,7 +420,7 @@ def test_handling_extra_args(mocker, request, snapshot_fixture): import unittest from unittest.mock import MagicMock -from console_link.models.snapshot import get_snapshot_status_full, get_snapshot_status_message + class TestSnapshot(unittest.TestCase): def setUp(self): @@ -443,7 +444,8 @@ def test_throughput_calculation(self): } # Test with normal duration - snapshot_info["stats"]["processed"] = {"size_in_bytes": 1 * 1024 * 1024 * 1024} # Ensure processed size is set correctly + snapshot_info["stats"]["processed"] = {"size_in_bytes": 1 * + 1024 * 1024 * 1024} # Ensure processed size is set correctly message = get_snapshot_status_message(snapshot_info) self.assertIn("Throughput: 512.00 MiB/s", message)