Initial commit to the altinity-datasets project
Added a prototype implementation to describe and load datasets into
ClickHouse. Also added three simple datasets.
granadata committed Mar 5, 2019
1 parent 750ff82 commit ede9653
Showing 21 changed files with 739 additions and 0 deletions.
70 changes: 70 additions & 0 deletions .gitignore
@@ -0,0 +1,70 @@
.venv

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover

# Translations
*.mo
*.pot

# Django stuff:
*.log

# Sphinx documentation
docs/_build/

# PyBuilder
target/

.idea/

docs/.bundle
docs/_site
docs/vendor
docs/Gemfile.lock

# Automatically generated system test config
**/auto.*
7 changes: 7 additions & 0 deletions MANIFEST.in
@@ -0,0 +1,7 @@
# Manifest including built-in datasets.

# Include all Python files.
recursive-include altinity_datasets *.py

# Include contents of built-in datasets directory.
graft built-ins
Binary file added altinity_datasets/.ad_cli.py.swp
Empty file added altinity_datasets/__init__.py
87 changes: 87 additions & 0 deletions altinity_datasets/ad_cli.py
@@ -0,0 +1,87 @@
# Copyright (c) 2019 Altinity LTD
#
# This product is licensed to you under the
# Apache License, Version 2.0 (the "License").
# You may not use this product except in compliance with the License.
#
# This product may include a number of subcomponents with
# separate copyright notices and license terms. Your use of the source
# code for these subcomponents is subject to the terms and
# conditions of the subcomponent's license, as noted in the LICENSE file.
#
import click
import pkg_resources
import platform

import altinity_datasets.api as api

CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.group(context_settings=CONTEXT_SETTINGS, invoke_without_command=True)
@click.pass_context
@click.option(
'-V', '--verbose', is_flag=True, default=False, help='Print verbose output')
def ad_cli(ctx, verbose):
"""Altinity Dataset CLI"""
if ctx.invoked_subcommand is None:
click.secho(ctx.get_help())
return

@ad_cli.command(short_help='Show version')
@click.pass_context
def version(ctx):
"""Show version"""
try:
version = pkg_resources.require("ds-cli")[0].version
    except Exception:
version = '0.0'
version_string = 'ds-cli {0}, Python {1}'.format(version, platform.python_version())
print(version_string)

@ad_cli.command(short_help='List dataset repositories')
@click.pass_context
def repos(ctx):
"""Show available dataset repositories"""
repos = api.repos()
_print_dict_vertical(repos, ['name', 'description'])

@ad_cli.command(short_help='Describe dataset(s)')
@click.pass_context
@click.argument('name', metavar='<name>', required=False)
@click.option(
'-r', '--repo', default='built-ins', help='Datasets repository')
def describe(ctx, name, repo):
datasets = api.describe(name, repo=repo)
_print_dict_vertical(datasets,
['repo', 'name', 'title', 'description', 'size', 'sources'])

@ad_cli.command(short_help='Load dataset')
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.option(
'-r', '--repo', default='built-ins', help='Datasets repository')
@click.option(
'-h', '--host', default='localhost', help='Server host')
@click.option(
'-p', '--parallel', default=5, help='Number of threads to run in parallel')
@click.option(
'-C', '--clean', is_flag=True, default=False, help='Clean existing database')
@click.option(
'-D', '--dry_run', is_flag=True, default=False, help='Print commands only')
def load(ctx, name, repo, host, parallel, clean, dry_run):
print("{0} {1} {2} {3} {4}".format(name, host, parallel, clean, dry_run))
api.load(name, repo=repo, host=host, parallel=parallel, clean=clean, dry_run=dry_run)

def _print_dict_vertical(dictionaries, columns):
"""Print dictionary contents vertically"""
max_width = 0
for col in columns:
max_width = max(len(col), max_width)
format_string = "{{0:<{0}}}: {{1}}".format(max_width)
# Print data.
for d in dictionaries:
print("--------------------------------------------------------")
for col in columns:
print(format_string.format(col, d.get(col)))

if __name__ == '__main__':
ad_cli()
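
For reference, the command group above can be exercised in-process with click's test runner. This is a minimal sketch, not part of the commit: it assumes the altinity_datasets package is importable, and it only runs the commands that touch neither the filesystem nor a server; describe and load follow the same pattern.

# A hedged sketch only (not part of this commit): drive the CLI group
# defined above through click's test runner.
from click.testing import CliRunner

from altinity_datasets.ad_cli import ad_cli

runner = CliRunner()
print(runner.invoke(ad_cli, []).output)           # no subcommand: prints help
print(runner.invoke(ad_cli, ['version']).output)  # e.g. "ds-cli 0.0, Python 3.7.2"
print(runner.invoke(ad_cli, ['repos']).output)    # lists the known repositories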
128 changes: 128 additions & 0 deletions altinity_datasets/api.py
@@ -0,0 +1,128 @@
#!/usr/bin/env python3

import glob
import os
import pkg_resources
import re
import sys
import time
import yaml

from clickhouse_driver import Client

from altinity_datasets.proc_pool import ProcessPool

# A list of known repo locations.
REPOS = [
{'name': 'built-ins', 'description': 'Baked into package'},
]
# Base directory of the installation.
BASE = os.path.join(os.path.dirname(__file__), '..')

def _sql(conn, sql, verbose=False, dry_run=False):
"""Execute a SQL command"""
if verbose:
print("DEBUG: {0}".format(sql))
if not dry_run:
conn.execute(sql)

def repos():
"""List known repos"""
return REPOS

def describe(name, repo=None):
"""Describe one or more data sets
:param name: (str): If specified show only datasets that match this name
    :param repo: (str): If specified search only this repo
"""
datasets = []
if repo is None:
search_list = ['built-ins']
else:
search_list = [repo]

for repo in search_list:
dir = os.path.join(BASE, repo)
children = [os.path.join(dir, child) for child in os.listdir(dir)]
        for child in children:
            if name is not None and os.path.basename(child) != name:
                continue
manifest_yaml = os.path.join(child, 'manifest.yaml')
if not os.path.exists(manifest_yaml):
print("Not found " + manifest_yaml)
continue
with open(manifest_yaml, "r") as f:
manifest = yaml.safe_load(f)
manifest['repo'] = repo
manifest['name'] = os.path.basename(child)
datasets.append(manifest)

return datasets

def load(name, repo=None, host='localhost', parallel=5, clean=False,
verbose=False, dry_run=False):
"""Load a sample data set
:param name: (str): Name of dataset
:param repo: (str): Repo name or None to search all repos
:param host: (str): ClickHouse server host name
    :param parallel: (int): Number of processes to run in parallel when loading
:param clean: (boolean): If True wipe out existing data
:param dry_run: (boolean): If True print commands instead of executing them
"""
if not os.path.exists(name) or not os.path.isdir(name):
raise("Invalid load path: {0}".format(name))

# Database is the name of the directory.
database = os.path.basename(name)
print("Loading to host: {0} database: {1}".format(host, database))

# Clear database if requested.
client_0 = Client(host)
if clean:
print("Dropping database if it exists: {0}".format(database))
_sql(client_0,
"DROP DATABASE IF EXISTS {0}".format(database),
dry_run=dry_run)

# Create database.
print("Creating database if it does not exist: {0}".format(database))
_sql(client_0,
"CREATE DATABASE IF NOT EXISTS {0}".format(database),
         dry_run=dry_run)

# We can now safely reference the database.
client = Client(host, database=database)

# Load table definitions in sequence.
ddl_path = os.path.join(name, "ddl")
for sql_file in glob.glob(ddl_path + "/*"):
print("Executing SQL script: {0}".format(sql_file))
with open(sql_file, 'r') as f:
script = f.read()
_sql(client, script, dry_run=dry_run)

# Define load scripts for each CSV load file.
data_path = os.path.join(name, "data")
load_commands = []
for table_dir in glob.glob(data_path + "/*"):
print("Processing table data: {0}".format(table_dir))
table = os.path.basename(table_dir)
load_sql = "INSERT INTO {0} FORMAT CSVWithNames".format(table)
csv_files = glob.glob(table_dir + "/*csv*")
for csv_file in sorted(csv_files):
if re.match(r'^.*csv\.gz', csv_file):
cat_cmd = "gzip -d -c {0}".format(csv_file)
elif re.match(r'^.*csv', csv_file):
cat_cmd = "cat {0}".format(csv_file)
else:
print("Unable figure out how to open file: {0}".format(csv_file))
sys.exit(1)

client_cmd = "clickhouse-client --database={0} --host={1} --query='{2}'".format(database, host, load_sql)
load_command = cat_cmd + " | " + client_cmd
load_commands.append(load_command)

# Execute the load commands.
pool = ProcessPool(size=parallel, dry_run=dry_run)
for cmd in load_commands:
pool.exec(cmd)
pool.drain()
print(pool.outputs)
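
The same functionality is available by importing the module directly. A hedged usage sketch, assuming it is run from a source checkout (so that the built-ins directory and the built-ins/airline dataset path exist) with a ClickHouse server on localhost:

# A usage sketch only; paths and host are assumptions, not part of the commit.
import altinity_datasets.api as api

for ds in api.describe(None, repo='built-ins'):
    print('{0}/{1}: {2}'.format(ds['repo'], ds['name'], ds.get('title')))

# dry_run=True skips the SQL and only prints the generated load commands.
api.load('built-ins/airline', host='localhost', parallel=2,
         clean=True, dry_run=True)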
61 changes: 61 additions & 0 deletions altinity_datasets/proc_pool.py
@@ -0,0 +1,61 @@
#!/usr/bin/env python3

import argparse
import glob
import os
import re
import subprocess
import sys
import time

class ProcessPool:
"""Service for executing processes in parallel"""
def __init__(self, size=5, dry_run=None):
"""Instantiate a new pool
:param size: (int): Number of concurrent processes to run
:param dry_run: (boolean): If true just show what we would run
"""
self.size = size
self.slots = []
self.outputs = []
if dry_run is None:
self.dry_run = False
else:
self.dry_run = dry_run

def exec(self, command):
"""Submit a command for execution, blocking if pool is full
:param command: (str): Shell command to execute
"""
if len(self.slots) >= self.size:
self._wait()
if self.dry_run:
print("Dry run: " + command)
else:
print("Starting a new process: " + command)
process = subprocess.Popen(command, shell=True)
self.slots.append(process)

def drain(self):
"""Wait for all pending commands to finish"""
while len(self.slots) > 0:
self._wait()

def _wait(self):
print("Waiting for command to finish")
cur_len = len(self.slots)
while cur_len > 0 and cur_len == len(self.slots):
for p in self.slots:
status = p.poll()
if status is None:
time.sleep(1)
elif status == 0:
print("Process completed: {}".format(p.args))
self.outputs.append(status)
self.slots.remove(p)
break
else:
print("Process failed: {}".format(p.args))
self.outputs.append(status)
self.slots.remove(p)
break
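
ProcessPool is independent of ClickHouse and can be tried on its own. A small sketch with stand-in shell commands (the echo commands are illustrative, not part of the commit):

# A hedged sketch of the pool on its own; the echo commands are stand-ins.
from altinity_datasets.proc_pool import ProcessPool

pool = ProcessPool(size=2)
for n in range(4):
    pool.exec('echo task {0}'.format(n))  # blocks once both slots are busy
pool.drain()         # wait for all pending commands to finish
print(pool.outputs)  # collected exit codes, e.g. [0, 0, 0, 0]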
4 changes: 4 additions & 0 deletions build.sh
@@ -0,0 +1,4 @@
#!/bin/bash
# Build a new release.
rm -r dist
python3 setup.py sdist
Binary file added built-ins/airline/data/airports/Airports.csv.gz
19 changes: 19 additions & 0 deletions built-ins/airline/ddl/airports.sql
@@ -0,0 +1,19 @@
CREATE TABLE IF NOT EXISTS airports (
AirportID String,
Name String,
City String,
Country String,
IATA String,
ICAO String,
Latitude Float32,
Longitude Float32,
Altitude Int32,
Timezone Float32,
DST String,
Tz String,
Type String,
Source String)
Engine=MergeTree()
PRIMARY KEY AirportID
PARTITION BY Country
ORDER BY AirportID
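
Once the airline dataset has been loaded, the table can be queried with the same clickhouse_driver package that api.py uses. A sketch under assumptions: the server runs on localhost and the database is named airline after the dataset directory.

# A hedged sketch only; host and database name are assumptions.
from clickhouse_driver import Client

client = Client('localhost', database='airline')
rows = client.execute(
    "SELECT Country, count() AS airports FROM airports "
    "GROUP BY Country ORDER BY airports DESC LIMIT 5")
for country, airport_count in rows:
    print(country, airport_count)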