Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix snapshot status based on demo #1200

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Snapshot(ABC):
"""
Interface for creating and managing snapshots.
"""

def __init__(self, config: Dict, source_cluster: Cluster) -> None:
self.config = config
self.source_cluster = source_cluster
Expand All @@ -52,17 +53,14 @@ def __init__(self, config: Dict, source_cluster: Cluster) -> None:
@abstractmethod
def create(self, *args, **kwargs) -> CommandResult:
"""Create a snapshot."""
pass

@abstractmethod
def status(self, *args, **kwargs) -> CommandResult:
"""Get the status of the snapshot."""
pass

@abstractmethod
def delete(self, *args, **kwargs) -> CommandResult:
"""Delete a snapshot."""
pass

def _collect_universal_command_args(self) -> Dict:
command_args = {
Expand Down Expand Up @@ -232,9 +230,7 @@ def get_repository_for_snapshot(cluster: Cluster, snapshot: str) -> Optional[str


def format_date(millis: int) -> str:
if millis == 0:
return "N/A"
return datetime.datetime.fromtimestamp(millis / 1000).strftime('%Y-%m-%d %H:%M:%S')
return datetime.datetime.fromtimestamp(millis / 1000, datetime.timezone.utc).isoformat()


def format_duration(millis: int) -> str:
Expand All @@ -245,45 +241,62 @@ def format_duration(millis: int) -> str:


def get_snapshot_status_message(snapshot_info: Dict) -> str:
snapshot_state = snapshot_info.get("state")
# Extract basic snapshot information
snapshot_state = snapshot_info.get("state", "UNKNOWN")
stats = snapshot_info.get("stats", {})
total_size_in_bytes = stats.get("total", {}).get("size_in_bytes", 0)
processed_size_in_bytes = stats.get("processed", stats.get("incremental", {})).get("size_in_bytes", 0)
percent_completed = (processed_size_in_bytes / total_size_in_bytes) * 100 if total_size_in_bytes > 0 else 0
total_size_gibibytes = total_size_in_bytes / (1024 ** 3)
processed_size_gibibytes = processed_size_in_bytes / (1024 ** 3)

total_size_bytes = stats.get("total", {}).get("size_in_bytes", 0)
processed_size_bytes = stats.get("processed", {}).get("size_in_bytes",
total_size_bytes if snapshot_state == "SUCCESS" else 0
)

total_shards = snapshot_info.get('shards_stats', {}).get('total', 0)
successful_shards = snapshot_info.get('shards_stats', {}).get('done', 0)
failed_shards = snapshot_info.get('shards_stats', {}).get('failed', 0)
start_time = stats.get('start_time_in_millis', 0)
duration_millis = stats.get('time_in_millis', 0)

start_time = snapshot_info.get('stats', {}).get('start_time_in_millis', 0)
duration_in_millis = snapshot_info.get('stats', {}).get('time_in_millis', 0)

start_time_formatted = format_date(start_time)
duration_formatted = format_duration(duration_in_millis)

anticipated_duration_remaining_formatted = (
format_duration((duration_in_millis / percent_completed) * (100 - percent_completed))
if percent_completed > 0 else "N/A (not enough data to compute)"
)
# Calculate percentage completed
percent_completed = (processed_size_bytes / total_size_bytes * 100) if total_size_bytes > 0 else 0

throughput_mib_per_sec = (
(processed_size_in_bytes / (1024 ** 2)) / (duration_in_millis / 1000)
if duration_in_millis > 0 else 0
)
# Convert sizes to GiB
total_size_gib = total_size_bytes / (1024 ** 3)
processed_size_gib = processed_size_bytes / (1024 ** 3)

# Format time and duration
start_time_formatted = format_date(start_time)
duration_formatted = format_duration(duration_millis)

# Safely calculate remaining duration to avoid division by zero
remaining_duration = ((duration_millis / percent_completed) *
(100 - percent_completed)) if percent_completed > 0 else None

# Calculate throughput
if duration_millis > 0:
throughput_mib_per_sec = (processed_size_bytes / (1024 * 1024)) / (duration_millis / 1000)
else:
throughput_mib_per_sec = 0

# Determine anticipated duration remaining and throughput format
if duration_millis > 1000 and percent_completed > 0 and remaining_duration is not None:
anticipated_remaining_formatted = format_duration(remaining_duration)
throughput_mib_per_sec = f"{throughput_mib_per_sec:.2f} MiB/s"
else:
anticipated_remaining_formatted = "N/A (not enough data to compute)"
throughput_mib_per_sec = "N/A (not enough data to compute)"

# Compile status message
return (
f"Snapshot is {snapshot_state}.\n"
f"Percent completed: {percent_completed:.2f}%\n"
f"Data GiB done: {processed_size_gibibytes:.3f}/{total_size_gibibytes:.3f}\n"
f"Data GiB done: {processed_size_gib:.3f}/{total_size_gib:.3f}\n"
f"Total shards: {total_shards}\n"
f"Successful shards: {successful_shards}\n"
f"Failed shards: {failed_shards}\n"
f"Start time: {start_time_formatted}\n"
f"Duration: {duration_formatted}\n"
f"Anticipated duration remaining: {anticipated_duration_remaining_formatted}\n"
f"Throughput: {throughput_mib_per_sec:.2f} MiB/sec"
f"Anticipated duration remaining: {anticipated_remaining_formatted}\n"
f"Throughput: {throughput_mib_per_sec}"
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from console_link.models.factories import (UnsupportedSnapshotError,
get_snapshot)
from console_link.models.snapshot import (FileSystemSnapshot, S3Snapshot,
Snapshot, delete_snapshot)
Snapshot, delete_snapshot,
get_snapshot_status_message)
from tests.utils import create_valid_cluster


Expand Down Expand Up @@ -409,9 +410,69 @@ def test_handling_extra_args(mocker, request, snapshot_fixture):
snapshot = request.getfixturevalue(snapshot_fixture)
mock = mocker.patch('subprocess.run', autospec=True)
extra_args = ['--extra-flag', '--extra-arg', 'extra-arg-value', 'this-is-an-option']

result = snapshot.create(extra_args=extra_args)

assert result.success
mock.assert_called_once()
assert all([arg in mock.call_args.args[0] for arg in extra_args])


import unittest
from unittest.mock import MagicMock


class TestSnapshot(unittest.TestCase):
def setUp(self):
self.cluster = MagicMock()

def test_throughput_calculation(self):
# Mock response for a fully completed snapshot
snapshot_info = {
"state": "IN_PROGRESS",
"stats": {
"total": {"size_in_bytes": 2 * 1024 * 1024 * 1024}, # 2 GiB
"processed": {"size_in_bytes": 1 * 1024 * 1024 * 1024}, # 1 GiB
"start_time_in_millis": 0,
"time_in_millis": 2000, # 2 seconds
},
"shards_stats": {
"total": 5,
"done": 2,
"failed": 0
}
}

# Test with normal duration
snapshot_info["stats"]["processed"] = {"size_in_bytes": 1 *
1024 * 1024 * 1024} # Ensure processed size is set correctly
message = get_snapshot_status_message(snapshot_info)
self.assertIn("Throughput: 512.00 MiB/s", message)

# Expected output assertions
self.assertIn("Snapshot is IN_PROGRESS.", message)
self.assertIn("Percent completed: 50.00%", message)
self.assertIn("Data GiB done: 1.000/2.000", message)
self.assertIn("Total shards: 5", message)
self.assertIn("Successful shards: 2", message)
self.assertIn("Failed shards: 0", message)
self.assertIn("Start time: 1970-01-01T00:00:00", message) # Assuming epoch start time
self.assertIn("Duration: 0h 0m 2s", message)
self.assertIn("Anticipated duration remaining: 0h 0m 2s", message)
self.assertIn("Throughput: 512.00 MiB/s", message)

# Test with very short duration
snapshot_info["stats"]["time_in_millis"] = 500 # 0.5 seconds
message = get_snapshot_status_message(snapshot_info)

# Assertions for short duration
self.assertIn("Snapshot is IN_PROGRESS.", message)
self.assertIn("Percent completed: 50.00%", message)
self.assertIn("Data GiB done: 1.000/2.000", message)
self.assertIn("Total shards: 5", message)
self.assertIn("Successful shards: 2", message)
self.assertIn("Failed shards: 0", message)
self.assertIn("Start time: 1970-01-01T00:00:00", message) # Assuming epoch start time
self.assertIn("Duration: 0h 0m 0s", message)
self.assertIn("Anticipated duration remaining: N/A (not enough data to compute)", message)
self.assertIn("Throughput: N/A (not enough data to compute)", message)
Loading