Skip to content

Commit

Permalink
[Console] Update format of services.yaml (opensearch-project#718)
Browse files Browse the repository at this point in the history
* Update schema for clusters

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

* Update README

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

* Update metrics sources

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

* more readme updates

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

* add a test for calling the api

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

* add a metrics source test for coverage

Signed-off-by: Mikayla Thompson <thomika@amazon.com>

---------

Signed-off-by: Mikayla Thompson <thomika@amazon.com>
  • Loading branch information
mikaylathompson authored Jun 15, 2024
1 parent 5a5b7e6 commit 924de8e
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,32 @@ For example:

```yaml
source_cluster:
endpoint: "https://capture-proxy-es:9200"
allow_insecure: true
endpoint: "https://capture-proxy-es:9200"
allow_insecure: true
no_auth:
target_cluster:
endpoint: "https://opensearchtarget:9200"
allow_insecure: true
authorization:
type: "basic_auth"
details:
username: "admin"
password: "myStrongPassword123!"
endpoint: "https://opensearchtarget:9200"
allow_insecure: true
basic_auth:
username: "admin"
password: "myStrongPassword123!"
metrics_source:
type: "prometheus"
endpoint: "http://prometheus:9090"
prometheus:
endpoint: "http://prometheus:9090"
backfill:
opensearch_ingestion:
pipeline_role_arn: "arn:aws:iam::123456789012:role/OSMigrations-aws-integ-us--pipelineRole"
vpc_subnet_ids:
- "subnet-123456789"
security_group_ids:
- "sg-123456789"
aws_region: "us-west-2"
pipeline_name: "test-cli-pipeline"
index_regex_selection:
- "test-index*"
log_group_name: "/aws/vendedlogs/osi-aws-integ-default"
tags:
- "migration_deployment=1.0.6"
opensearch_ingestion:
pipeline_role_arn: "arn:aws:iam::123456789012:role/OSMigrations-aws-integ-us--pipelineRole"
vpc_subnet_ids:
- "subnet-123456789"
security_group_ids:
- "sg-123456789"
aws_region: "us-west-2"
pipeline_name: "test-cli-pipeline"
index_regex_selection:
- "test-index*"
log_group_name: "/aws/vendedlogs/osi-aws-integ-default"
tags:
- "migration_deployment=1.0.6"
```
### Services.yaml spec
Expand All @@ -52,19 +51,26 @@ backfill:
Source and target clusters have the following options:
- `endpoint`: required, the endpoint to reach the cluster.
- `authorization`: required, the auth method to use, if no auth the no_auth type must be specified.
- `type`: required, the only currently implemented options are "no_auth" and "basic_auth", but "sigv4" should be available soon
- `details`: for basic auth, the details should be a `username` and `password`

Exactly one of the following blocks must be present:
- `no_auth`: may be empty, no authorization to use.
- `basic_auth`:
- `username`
- `password`
- `sigv4`: may be empty, not yet implemented

Having a `source_cluster` and `target_cluster` is required.

#### Metrics Source

Currently, the two supported metrics source types are `prometheus` and `cloudwatch`.
Exactly one of the following blocks must be present:
- `prometheus`:
- `endpoint`: required

- `cloudwatch`: may be empty if region is not specified
- `aws_region`: optional. if not provided, the usual rules are followed for determining aws region. (`AWS_DEFAULT_REGION`, `~/.aws/config`, etc.)

- `type`: required, `prometheus` or `cloudwatch`
- `endpoint`: required for `prometheus` (ignored for `cloudwatch`)
- `aws_region`: optional for `cloudwatch` (ignored for `prometheus`). if not provided, the usual rules are followed for determining aws region (`AWS_DEFAULT_REGION`, `~/.aws/config`)

#### Backfill

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from requests.auth import HTTPBasicAuth
from cerberus import Validator
import logging
from console_link.models.schema_tools import contains_one_of

requests.packages.urllib3.disable_warnings() # ignore: type

Expand All @@ -12,38 +13,47 @@
AuthMethod = Enum("AuthMethod", ["NO_AUTH", "BASIC_AUTH", "SIGV4"])
HttpMethod = Enum("HttpMethod", ["GET", "POST", "PUT", "DELETE"])


NO_AUTH_SCHEMA = {
"nullable": True,
}

BASIC_AUTH_SCHEMA = {
"type": "dict",
"schema": {
"username": {
"type": "string",
"required": False,
},
"password": {
"type": "string",
"required": False,
"excludes": ["aws_secret_arn"]
},
"aws_secret_arn": {
"type": "string",
"required": False,
"excludes": ["password"]
}
},
}

SIGV4_SCHEMA = {
"nullable": True,
}

SCHEMA = {
"endpoint": {"type": "string", "required": True},
"allow_insecure": {"type": "boolean", "required": False},
"authorization": {
"cluster": {
"type": "dict",
"required": True,
"schema": {
"type": {
"type": "string",
"required": True,
"allowed": [e.name.lower() for e in AuthMethod]
},
"details": {
"type": "dict",
"required": False,
"schema": {
"username": {
"type": "string",
"required": False
},
"password": {
"type": "string",
"required": False
},
"aws_secret_arn": {
"type": "string",
"required": False
},
}
}
}
},
"endpoint": {"type": "string", "required": True},
"allow_insecure": {"type": "boolean", "required": False},
"no_auth": NO_AUTH_SCHEMA,
"basic_auth": BASIC_AUTH_SCHEMA,
"sigv4": SIGV4_SCHEMA
},
"check_with": contains_one_of({auth.name.lower() for auth in AuthMethod})
}
}


Expand All @@ -60,15 +70,19 @@ class Cluster:
def __init__(self, config: Dict) -> None:
logger.info(f"Initializing cluster with config: {config}")
v = Validator(SCHEMA)
if not v.validate(config):
if not v.validate({'cluster': config}):
raise ValueError("Invalid config file for cluster", v.errors)

self.endpoint = config["endpoint"]
if self.endpoint.startswith("https"):
self.allow_insecure = config.get("allow_insecure", False)
self.auth_type = AuthMethod[config["authorization"]["type"].upper()]
self.auth_details = config["authorization"].get("details", None)
self.aws_secret_arn = None if self.auth_details is None else self.auth_details.get("aws_secret_arn", None)
if 'no_auth' in config:
self.auth_type = AuthMethod.NO_AUTH
elif 'basic_auth' in config:
self.auth_type = AuthMethod.BASIC_AUTH
self.auth_details = config["basic_auth"]
elif 'sigv4' in config:
self.auth_type = AuthMethod.SIGV4

def call_api(self, path, method: HttpMethod = HttpMethod.GET) -> requests.Response:
"""
Expand All @@ -80,7 +94,7 @@ def call_api(self, path, method: HttpMethod = HttpMethod.GET) -> requests.Respon
self.auth_details.get("username", None),
self.auth_details.get("password", None)
)
elif self.auth_type is None:
elif self.auth_type is AuthMethod.NO_AUTH:
auth = None
else:
raise NotImplementedError(f"Auth type {self.auth_type} not implemented")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import requests
import logging

from console_link.models.schema_tools import contains_one_of

logger = logging.getLogger(__name__)


Expand All @@ -28,45 +30,55 @@ class Component(Enum):
REPLAYER = "replayer"


SCHEMA = {
"type": {
"type": "string",
"allowed": [m.name.lower() for m in MetricsSourceType],
"required": True,
},
"endpoint": {
"type": "string",
"required": False,
CLOUDWATCH_SCHEMA = {
"type": "dict",
"schema": {
"aws_region": {
"type": "string",
"required": False,
},
},
"aws_region": {
"type": "string",
"required": False
"nullable": True
}

PROMETHEUS_SCHEMA = {
"type": "dict",
"schema": {
"endpoint": {
"type": "string",
"required": True,
},
},
}

PROMETHEUS_SCHEMA = {k: v.copy() for k, v in SCHEMA.items()}
PROMETHEUS_SCHEMA["endpoint"]["required"] = True
SCHEMA = {
"metrics": {
"type": "dict",
"check_with": contains_one_of({ms.name.lower() for ms in MetricsSourceType}),
"schema": {
"prometheus": PROMETHEUS_SCHEMA,
"cloudwatch": CLOUDWATCH_SCHEMA
},
}
}


def get_metrics_source(config):
try:
metric_source_type = MetricsSourceType[config["type"].upper()]
except KeyError:
raise UnsupportedMetricsSourceError(config["type"])

if metric_source_type == MetricsSourceType.CLOUDWATCH:
return CloudwatchMetricsSource(config)
elif metric_source_type == MetricsSourceType.PROMETHEUS:
if 'prometheus' in config:
return PrometheusMetricsSource(config)
elif 'cloudwatch' in config:
return CloudwatchMetricsSource(config)
else:
logger.error(f"An unsupported metrics source type was provided: {config['type']}")
raise UnsupportedMetricsSourceError(config["type"])
logger.error(f"An unsupported metrics source type was provided: {config.keys()}")
if len(config.keys()) > 1:
raise UnsupportedMetricsSourceError(', '.join(config.keys()))
raise UnsupportedMetricsSourceError(next(iter(config.keys())))


class MetricsSource:
def __init__(self, config: Dict) -> None:
v = Validator(SCHEMA)
if not v.validate(config):
if not v.validate({"metrics": config}):
raise ValueError("Invalid config file for MetricsSource", v.errors)

def get_metrics(self) -> Dict[str, List[str]]:
Expand Down Expand Up @@ -118,8 +130,8 @@ class CloudwatchMetricsSource(MetricsSource):
def __init__(self, config: Dict) -> None:
super().__init__(config)
logger.info(f"Initializing CloudwatchMetricsSource from config {config}")
if "aws_region" in config:
self.aws_region = config["aws_region"]
if "aws_region" in config["cloudwatch"]:
self.aws_region = config["cloudwatch"]["aws_region"]
self.boto_config = botocore.config.Config(region_name=self.aws_region)
else:
self.aws_region = None
Expand Down Expand Up @@ -211,10 +223,7 @@ def __init__(self, config: Dict) -> None:
super().__init__(config)
logger.info(f"Initializing PrometheusMetricsSource from config {config}")

v = Validator(PROMETHEUS_SCHEMA)
if not v.validate(config):
raise ValueError("Invalid config file for PrometheusMetricsSource", v.errors)
self.endpoint = config["endpoint"]
self.endpoint = config["prometheus"]["endpoint"]

def get_metrics(self, recent=False) -> Dict[str, List[str]]:
logger.info(f"{self.__class__.__name__}.get_metrics called with {recent=}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from typing import Set


def contains_one_of(values_to_restrict: Set):
"""
Generates a validator that checks if a value contains exactly one of the specified keys.
"""
def one_of(field, value, error):
found_objects = values_to_restrict.intersection(value.keys())
if len(found_objects) > 1:
error(field, f"More than one value is present: {sorted(found_objects)}")
elif len(found_objects) < 1:
error(field, f"No values are present from set: {sorted(values_to_restrict)}")
return one_of
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
source_cluster:
endpoint: "https://capture-proxy-es:9200"
allow_insecure: true
authorization:
type: "basic_auth"
details:
basic_auth:
username: "admin"
password: "admin"
target_cluster:
endpoint: "https://opensearchtarget:9200"
allow_insecure: true
authorization:
type: "basic_auth"
details:
basic_auth:
username: "admin"
password: "myStrongPassword123!"
metrics_source:
type: "prometheus"
endpoint: "http://prometheus:9090"
prometheus:
endpoint: "http://prometheus:9090"
replayer:
deployment_type: "docker"
docker:
Loading

0 comments on commit 924de8e

Please sign in to comment.