Package improvements #65

Merged
merged 10 commits on Jul 23, 2022
263 changes: 150 additions & 113 deletions components/clp-py-utils/clp_py_utils/clp_config.py
@@ -1,47 +1,59 @@
import pathlib
import typing

from pydantic import BaseModel, validator

from clp_py_utils.pretty_size import pretty_size
from .core import get_config_value, make_config_path_absolute, read_yaml_config_file, validate_path_could_be_dir

# Constants
CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path('etc') / 'credentials.yml'


class Database(BaseModel):
type: str
host: str
port: int
username: str
password: str
name: str
type: str = 'mariadb'
host: str = 'localhost'
port: int = 3306
name: str = 'clp-db'
ssl_cert: typing.Optional[str] = None
auto_commit: bool = False
compress: bool = True

username: typing.Optional[str] = None
password: typing.Optional[str] = None

@validator('type')
def validate_database_type(cls, field):
supported_database_type = ['mysql', 'mariadb', 'bundled']
if field not in supported_database_type:
raise ValueError(f'must be one of the following {"|".join(supported_database_type)}')
supported_database_types = ['mysql', 'mariadb']
if field not in supported_database_types:
raise ValueError(f"database.type must be one of the following {'|'.join(supported_database_types)}")
return field

def get_mysql_connection_params(self):
# Currently, mysql's connector parameter is the same as mariadb
connection_params = {
'host': self.host,
'port': self.port,
'user': self.username,
'password': self.password,
'database': self.name,
'compress': self.compress,
'autocommit': self.auto_commit
}
if self.ssl_cert:
connection_params['ssl_cert'] = self.ssl_cert
return connection_params
@validator('name')
def validate_database_name(cls, field):
if '' == field:
raise ValueError("database.name cannot be empty.")
return field

def get_mariadb_connection_params(self):
# Currently, mariadb's connection parameters are the same as mysql's
@validator('host')
def validate_database_host(cls, field):
if '' == field:
raise ValueError("database.host cannot be empty.")
return field

def ensure_credentials_loaded(self):
if self.username is None or self.password is None:
raise ValueError("Credentials not loaded.")

def get_mysql_connection_params(self, disable_localhost_socket_connection: bool = False):
self.ensure_credentials_loaded()

host = self.host
if disable_localhost_socket_connection and 'localhost' == self.host:
host = '127.0.0.1'

# Currently, mysql's connection parameters are the same as mariadb
connection_params = {
'host': self.host,
'host': host,
'port': self.port,
'user': self.username,
'password': self.password,
@@ -53,10 +65,17 @@ def get_mariadb_connection_params(self):
connection_params['ssl_cert'] = self.ssl_cert
return connection_params

def get_clp_connection_params_and_type(self):
def get_clp_connection_params_and_type(self, disable_localhost_socket_connection: bool = False):
self.ensure_credentials_loaded()

host = self.host
if disable_localhost_socket_connection and 'localhost' == self.host:
host = '127.0.0.1'

connection_params_and_type = {
'type': 'mysql', # hard code this as mysql as CLP only support "mysql" for global database
'host': self.host,
# NOTE: clp-core does not distinguish between mysql and mariadb
'type': 'mysql',
'host': host,
'port': self.port,
'username': self.username,
'password': self.password,
Expand All @@ -71,114 +90,132 @@ def get_clp_connection_params_and_type(self):


class Scheduler(BaseModel):
host: str
jobs_poll_delay: int
jobs_poll_delay: int = 1 # seconds


class SchedulerQueue(BaseModel):
host: str
port: int
username: str
password: str
class Queue(BaseModel):
host: str = 'localhost'
port: int = 5672

username: typing.Optional[str]
password: typing.Optional[str]

class ArchiveOutput(BaseModel):
type: str # Support only 'fs' type for now
directory: str
target_archive_size: int
target_dictionaries_size: int
target_encoded_file_size: int
target_segment_size: int

@validator('type')
def validate_type(cls, field):
if 'fs' != field:
raise ValueError('only fs type is supported in the opensource distribution')
return field
class ArchiveOutput(BaseModel):
directory: pathlib.Path = pathlib.Path('var') / 'data' / 'archives'
target_archive_size: int = 256 * 1024 * 1024 # 256 MB
target_dictionaries_size: int = 32 * 1024 * 1024 # 32 MB
target_encoded_file_size: int = 256 * 1024 * 1024 # 256 MB
target_segment_size: int = 256 * 1024 * 1024 # 256 MB

@validator('target_archive_size')
def validate_target_archive_size(cls, field):
if field <= 0:
raise ValueError('target_archive_size parameter must be greater than 0')
raise ValueError('target_archive_size must be greater than 0')
return field

@validator('target_dictionaries_size')
def validate_target_dictionaries_size(cls, field):
if field <= 0:
raise ValueError('target_dictionaries_size parameter must be greater than 0')
raise ValueError('target_dictionaries_size must be greater than 0')
return field

@validator('target_encoded_file_size')
def validate_target_encoded_file_size(cls, field):
if field <= 0:
raise ValueError('target_encoded_file_size parameter must be greater than 0')
raise ValueError('target_encoded_file_size must be greater than 0')
return field

@validator('target_segment_size')
def validate_target_segment_size(cls, field):
if field <= 0:
raise ValueError('target_segment_size parameter must be greater than 0')
raise ValueError('target_segment_size must be greater than 0')
return field

def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.directory = make_config_path_absolute(clp_home, self.directory)

def dump_to_primitive_dict(self):
d = self.dict()
# Turn directory (pathlib.Path) into a primitive string
d['directory'] = str(d['directory'])
return d


class CLPConfig(BaseModel):
input_logs_dfs_path: str
database: Database
scheduler: Scheduler
scheduler_queue: SchedulerQueue
archive_output: ArchiveOutput
data_directory: str
logs_directory: str

def generate_config_file_content_with_comments(self):
file_content = [
f'# A path containing any logs you wish to compress. Must be reachable by all workers.',
f'# - This path will be exposed inside the docker container.',
f'# - This path should not be any path that exists in the container image (an Ubuntu image) (e.g., /var/log).',
f'# - Limitation: Docker cannot follow symlinks that point outside the build context, so we recommend avoiding symbolic links',
f'input_logs_dfs_path: {self.input_logs_dfs_path}',
f'',
f'database:',
f' type: {self.database.type}',
f' host: {self.database.host}',
f' port: {self.database.port}',
f' username: {self.database.username}',
f' password: {self.database.password}',
f' name: {self.database.name}',
f'',
f'scheduler:',
f' host: {self.scheduler.host}',
f' jobs_poll_delay: {self.scheduler.jobs_poll_delay} # Seconds',
f'',
f'scheduler_queue:',
f' host: {self.scheduler_queue.host}',
f' port: {self.scheduler_queue.port}',
f' username: {self.scheduler_queue.username}',
f' password: {self.scheduler_queue.password}',
f'',
f'# Where archives should be output to',
f'# Note: Only one output type may be specified',
f'archive_output:',
f' type: {self.archive_output.type}',
f' directory: "{self.archive_output.directory}"',
f'',
f' # How much data CLP should try to compress into each archive',
f' target_archive_size: {self.archive_output.target_archive_size} # {pretty_size(self.archive_output.target_archive_size)}',
f'',
f' # How large the dictionaries should be allowed to get before the archive is closed and a new one is created',
f' target_dictionaries_size: {self.archive_output.target_dictionaries_size} # {pretty_size(self.archive_output.target_dictionaries_size)}',
f'',
f' # How large each encoded file should be before being split into a new encoded file',
f' target_encoded_file_size: {self.archive_output.target_encoded_file_size} # {pretty_size(self.archive_output.target_encoded_file_size)}',
f'',
f' # How much data CLP should try to fit into each segment within an archive',
f' target_segment_size: {self.archive_output.target_segment_size} # {pretty_size(self.archive_output.target_segment_size)}',
f'',
f'# Location where other data is stored',
f'data_directory: "{self.data_directory}"',
f'',
f'# Location where logs are stored',
f'logs_directory: "{self.logs_directory}"',
f'',
]
return '\n'.join(file_content)
execution_container: str = 'ghcr.io/y-scope/clp/clp-execution-x86-ubuntu-focal:main'

input_logs_directory: pathlib.Path = pathlib.Path('/')

database: Database = Database()
scheduler: Scheduler = Scheduler()
queue: Queue = Queue()
credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH

archive_output: ArchiveOutput = ArchiveOutput()
data_directory: pathlib.Path = pathlib.Path('var') / 'data'
logs_directory: pathlib.Path = pathlib.Path('var') / 'log'

def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory)
self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path)
self.archive_output.make_config_paths_absolute(clp_home)
self.data_directory = make_config_path_absolute(clp_home, self.data_directory)
self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory)

def validate_input_logs_dir(self):
# NOTE: This can't be a pydantic validator since input_logs_dir might be a package-relative
# path that will only be resolved after pydantic validation
input_logs_dir = self.input_logs_directory
if not input_logs_dir.exists():
raise ValueError(f"input_logs_directory '{input_logs_dir}' doesn't exist.")
if not input_logs_dir.is_dir():
raise ValueError(f"input_logs_directory '{input_logs_dir}' is not a directory.")

def validate_archive_output_dir(self):
try:
validate_path_could_be_dir(self.archive_output.directory)
except ValueError as ex:
raise ValueError(f"archive_output.directory is invalid: {ex}")

def validate_data_dir(self):
try:
validate_path_could_be_dir(self.data_directory)
except ValueError as ex:
raise ValueError(f"data_directory is invalid: {ex}")

def validate_logs_dir(self):
try:
validate_path_could_be_dir(self.logs_directory)
except ValueError as ex:
raise ValueError(f"logs_directory is invalid: {ex}")

def load_database_credentials_from_file(self):
config = read_yaml_config_file(self.credentials_file_path)
if config is None:
raise ValueError(f"Credentials file '{self.credentials_file_path}' is empty.")
try:
self.database.username = get_config_value(config, 'db.user')
self.database.password = get_config_value(config, 'db.password')
except KeyError as ex:
raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.")

def load_queue_credentials_from_file(self):
config = read_yaml_config_file(self.credentials_file_path)
if config is None:
raise ValueError(f"Credentials file '{self.credentials_file_path}' is empty.")
try:
self.queue.username = get_config_value(config, "queue.user")
self.queue.password = get_config_value(config, "queue.password")
except KeyError as ex:
raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.")

def dump_to_primitive_dict(self):
d = self.dict()
d['archive_output'] = self.archive_output.dump_to_primitive_dict()
# Turn paths into primitive strings
d['input_logs_directory'] = str(self.input_logs_directory)
d['credentials_file_path'] = str(self.credentials_file_path)
d['data_directory'] = str(self.data_directory)
d['logs_directory'] = str(self.logs_directory)
return d
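
Editor's note: for context, here is a minimal sketch of how the reworked config classes above might be exercised. It assumes the package is importable as clp_py_utils and that a CLP package root containing etc/clp-config.yml and etc/credentials.yml exists; the /opt/clp path and file names are illustrative, not taken from this PR.

import pathlib

from clp_py_utils.clp_config import CLPConfig
from clp_py_utils.core import read_yaml_config_file

# Hypothetical package root; any directory laid out like the CLP package would do.
clp_home = pathlib.Path('/opt/clp')

# Parse the user's YAML config into the pydantic model; keys left unspecified
# fall back to the new defaults (mariadb on localhost:3306, 256 MB archives, etc.).
raw_config = read_yaml_config_file(clp_home / 'etc' / 'clp-config.yml')
config = CLPConfig.parse_obj(raw_config) if raw_config is not None else CLPConfig()

# Resolve package-relative paths (var/data, var/log, etc/credentials.yml, the
# archive output directory) against the package root, then run the validations
# that cannot be pydantic validators.
config.make_config_paths_absolute(clp_home)
config.validate_input_logs_dir()
config.validate_archive_output_dir()
config.validate_data_dir()
config.validate_logs_dir()

# Credentials now live in a separate file rather than in clp-config.yml.
config.load_database_credentials_from_file()
config.load_queue_credentials_from_file()

# Connection parameters for clp-core; forcing TCP instead of the local socket.
params = config.database.get_clp_connection_params_and_type(
    disable_localhost_socket_connection=True)

# Round-trip back to plain dicts/strings, e.g. for writing out a container config.
primitive = config.dump_to_primitive_dict()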
2 changes: 0 additions & 2 deletions components/clp-py-utils/clp_py_utils/clp_io_config.py
@@ -11,13 +11,11 @@ class PathsToCompress(BaseModel):


class InputConfig(BaseModel):
type: str
list_path: str
path_prefix_to_remove: str = None


class OutputConfig(BaseModel):
type: str
target_archive_size: int
target_dictionaries_size: int
target_segment_size: int
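
Editor's note: with the 'type' discriminators removed from these models, callers construct them from the remaining fields alone. A minimal sketch, assuming the package is importable and using an illustrative list path; OutputConfig is omitted because this hunk may not show all of its required fields.

from clp_py_utils.clp_io_config import InputConfig

# The filesystem input type is now implicit; the model only needs the path of
# the file that lists what to compress (illustrative path).
input_config = InputConfig(list_path='/tmp/paths-to-compress.txt')

# pydantic v1 serialization, e.g. for handing the config to a worker.
print(input_config.json())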
60 changes: 0 additions & 60 deletions components/clp-py-utils/clp_py_utils/clp_package_config.py

This file was deleted.
