Skip to content

Commit

Permalink
Fix linting
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <akurait@amazon.com>
  • Loading branch information
AndreKurait committed Dec 13, 2024
1 parent 33e8aff commit cdf6303
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Snapshot(ABC):
"""
Interface for creating and managing snapshots.
"""

def __init__(self, config: Dict, source_cluster: Cluster) -> None:
self.config = config
self.source_cluster = source_cluster
Expand All @@ -52,17 +53,14 @@ def __init__(self, config: Dict, source_cluster: Cluster) -> None:
@abstractmethod
def create(self, *args, **kwargs) -> CommandResult:
"""Create a snapshot."""
pass

@abstractmethod
def status(self, *args, **kwargs) -> CommandResult:
"""Get the status of the snapshot."""
pass

@abstractmethod
def delete(self, *args, **kwargs) -> CommandResult:
"""Delete a snapshot."""
pass

def _collect_universal_command_args(self) -> Dict:
command_args = {
Expand Down Expand Up @@ -249,9 +247,8 @@ def get_snapshot_status_message(snapshot_info: Dict) -> str:

total_size_bytes = stats.get("total", {}).get("size_in_bytes", 0)
processed_size_bytes = stats.get("processed", {}).get("size_in_bytes",
# Processed size may not be returned for SUCCESS snapshots, in which case we set it to the full size
total_size_bytes if snapshot_state == "SUCCESS" else 0
)
total_size_bytes if snapshot_state == "SUCCESS" else 0
)

total_shards = snapshot_info.get('shards_stats', {}).get('total', 0)
successful_shards = snapshot_info.get('shards_stats', {}).get('done', 0)
Expand All @@ -271,14 +268,14 @@ def get_snapshot_status_message(snapshot_info: Dict) -> str:
duration_formatted = format_duration(duration_millis)

# Safely calculate remaining duration to avoid division by zero
remaining_duration = ((duration_millis / percent_completed) * (100 - percent_completed)) if percent_completed > 0 else None
remaining_duration = ((duration_millis / percent_completed) *
(100 - percent_completed)) if percent_completed > 0 else None

# Calculate throughput
throughput_mib_per_sec = (
(processed_size_bytes / (1024 ** 2)) / (duration_millis / 1000)
if duration_millis > 0
else None
)
if duration_millis > 0:
throughput_mib_per_sec = (processed_size_bytes / (1024 * 1024)) / (duration_millis / 1000)
else:
throughput_mib_per_sec = 0

# Determine anticipated duration remaining and throughput format
if duration_millis > 1000 and percent_completed > 0 and remaining_duration is not None:
Expand All @@ -289,16 +286,18 @@ def get_snapshot_status_message(snapshot_info: Dict) -> str:
throughput_mib_per_sec = "N/A (not enough data to compute)"

# Compile status message
return (f"Snapshot is {snapshot_state}.\n"
f"Percent completed: {percent_completed:.2f}%\n"
f"Data GiB done: {processed_size_gib:.3f}/{total_size_gib:.3f}\n"
f"Total shards: {total_shards}\n"
f"Successful shards: {successful_shards}\n"
f"Failed shards: {failed_shards}\n"
f"Start time: {start_time_formatted}\n"
f"Duration: {duration_formatted}\n"
f"Anticipated duration remaining: {anticipated_remaining_formatted}\n"
f"Throughput: {throughput_mib_per_sec}")
return (
f"Snapshot is {snapshot_state}.\n"
f"Percent completed: {percent_completed:.2f}%\n"
f"Data GiB done: {processed_size_gib:.3f}/{total_size_gib:.3f}\n"
f"Total shards: {total_shards}\n"
f"Successful shards: {successful_shards}\n"
f"Failed shards: {failed_shards}\n"
f"Start time: {start_time_formatted}\n"
f"Duration: {duration_formatted}\n"
f"Anticipated duration remaining: {anticipated_remaining_formatted}\n"
f"Throughput: {throughput_mib_per_sec}"
)


def get_snapshot_status_full(cluster: Cluster, snapshot: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from console_link.models.factories import (UnsupportedSnapshotError,
get_snapshot)
from console_link.models.snapshot import (FileSystemSnapshot, S3Snapshot,
Snapshot, delete_snapshot)
Snapshot, delete_snapshot,
get_snapshot_status_message)
from tests.utils import create_valid_cluster


Expand Down Expand Up @@ -409,7 +410,7 @@ def test_handling_extra_args(mocker, request, snapshot_fixture):
snapshot = request.getfixturevalue(snapshot_fixture)
mock = mocker.patch('subprocess.run', autospec=True)
extra_args = ['--extra-flag', '--extra-arg', 'extra-arg-value', 'this-is-an-option']

result = snapshot.create(extra_args=extra_args)

assert result.success
Expand All @@ -419,7 +420,7 @@ def test_handling_extra_args(mocker, request, snapshot_fixture):

import unittest
from unittest.mock import MagicMock
from console_link.models.snapshot import get_snapshot_status_full, get_snapshot_status_message


class TestSnapshot(unittest.TestCase):
def setUp(self):
Expand All @@ -443,7 +444,8 @@ def test_throughput_calculation(self):
}

# Test with normal duration
snapshot_info["stats"]["processed"] = {"size_in_bytes": 1 * 1024 * 1024 * 1024} # Ensure processed size is set correctly
snapshot_info["stats"]["processed"] = {"size_in_bytes": 1 *
1024 * 1024 * 1024} # Ensure processed size is set correctly
message = get_snapshot_status_message(snapshot_info)
self.assertIn("Throughput: 512.00 MiB/s", message)

Expand Down

0 comments on commit cdf6303

Please sign in to comment.