Site map env var (#44)
add sitemap env var and separate docker compose files
C-Loftus authored Oct 3, 2024
1 parent c35f47f commit fa45957
Showing 10 changed files with 210 additions and 33 deletions.
.env.example — 4 changes: 2 additions & 2 deletions
@@ -1,7 +1,7 @@
 # Configs
 GLEANERIO_HEADLESS_ENDPOINT=http://headless:9222
-## this shouldn't be changed
-GLEANERIO_HEADLESS_NETWORK=headless_gleanerio
+GLEANERIO_HEADLESS_NETWORK=headless_gleanerio ## this shouldn't be changed
+REMOTE_GLEANER_SITEMAP=https://geoconnex.us/sitemap.xml # remote sitemap tells us which sources we use to create the gleaner config

 # Docker
 GLEANERIO_GLEANER_IMAGE=internetofwater/gleaner:latest
README.md — 2 changes: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 # How to use this repository

-- `python3 main.py up`
+- `python3 main.py local`
 - Navigate to `localhost:3000` to view the dagster UI

 Tear down the docker swarm stack with `python3 main.py down`
code/lib/env.py — 3 changes: 2 additions & 1 deletion
@@ -42,7 +42,7 @@ def strict_env(key: str):


 def strict_get_tag(context: OpExecutionContext, key: str) -> str:
-    """Gets a tag and make sure it exists before running further jobs"""
+    """Gets a tag from a dagster run and ensures it exists before running further jobs"""
     src = context.run_tags[key]
     if src is None:
         raise Exception(f"Missing run tag {key}")
@@ -76,3 +76,4 @@ def strict_get_tag(context: OpExecutionContext, key: str) -> str:
 GLEANERIO_NABU_IMAGE = strict_env("GLEANERIO_NABU_IMAGE")
 GLEANERIO_DATAGRAPH_ENDPOINT = strict_env("GLEANERIO_DATAGRAPH_ENDPOINT")
 GLEANERIO_PROVGRAPH_ENDPOINT = strict_env("GLEANERIO_PROVGRAPH_ENDPOINT")
+REMOTE_GLEANER_SITEMAP = strict_env("REMOTE_GLEANER_SITEMAP")
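For reference, strict_env itself is not shown in this diff. A minimal sketch of what it presumably does, given how it is used above (an assumption, not the repository's actual implementation):

import os

def strict_env(key: str) -> str:
    # Read an env var and fail fast if it is unset, so a missing
    # variable surfaces at import time rather than mid-crawl
    value = os.environ.get(key)
    if value is None:
        raise Exception(f"Missing environment variable {key}")
    return value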
code/lib/utils.py — 18 changes: 8 additions & 10 deletions
@@ -97,12 +97,12 @@ def run_scheduler_docker_image(
     image_name: str,  # the name of the docker image to pull and validate
     args: List[str],  # the list of arguments to pass to the gleaner/nabu command
     action_name: str,  # the name of the action to run inside gleaner/nabu
-) -> int:
-    """Run a docker image inside the context of dagster"""
-    container_name = f"sch_{source}_{action_name}"
+):
+    """Run a docker image inside the dagster docker runtime"""
+    container_name = f"{source}_{action_name}"

     get_dagster_logger().info(f"Datagraph value: {GLEANERIO_DATAGRAPH_ENDPOINT}")
-    get_dagster_logger().info(f"PROVgraph value: {GLEANERIO_PROVGRAPH_ENDPOINT}")
+    get_dagster_logger().info(f"Provgraph value: {GLEANERIO_PROVGRAPH_ENDPOINT}")

     run_container_context = DockerContainerContext.create_for_run(
         context.dagster_run,
@@ -114,7 +114,7 @@ def run_scheduler_docker_image(

     # Create a service var at the beginning of the function so we can check against
     # it during cleanup to see if the service was created.
-    service = None
+    service: Optional[docker.models.services.Service] = None
     try:
         op_container_context = DockerContainerContext(
             networks=[GLEANER_HEADLESS_NETWORK],
@@ -164,11 +164,8 @@ def run_scheduler_docker_image(
get_dagster_logger().info("Sent container Logs to s3: ")

if exit_status != 0:
get_dagster_logger().error(
f"Gleaner/Nabu container returned exit code {exit_status}. See logs in S3"
)
raise Exception(
f"Gleaner/Nabu container returned exit code {exit_status}. See logs in S3 "
f"{container_name} returned exit code '{exit_status}'. See logs in S3"
)
finally:
if service:
@@ -177,7 +174,7 @@


 def slack_error_fn(context: RunFailureSensorContext) -> str:
-    get_dagster_logger().info("Sending notification to slack")
+    get_dagster_logger().info("Sending notification to Slack")
     # The make_slack_on_run_failure_sensor automatically sends the job
     # id and name so you can just send the error. We don't need other data in the string
     source_being_crawled = context.partition_key
@@ -188,6 +185,7 @@ def slack_error_fn(context: RunFailureSensorContext) -> str:


 def template_config(input_template_file_path: str) -> str:
+    """Fill in a template with shared env vars and return the templated data"""
     vars_in_both_nabu_and_gleaner_configs = {
         var: strict_env(var)
         for var in [
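The service variable above is initialized before the try block so the finally clause can tell whether a swarm service was actually created before removing it. A rough sketch of that pattern with the docker SDK (run_service_with_cleanup is a hypothetical helper, not code from this repo):

from typing import Optional

import docker

def run_service_with_cleanup(client: docker.DockerClient, image: str, name: str, args: list[str]) -> None:
    service: Optional[docker.models.services.Service] = None
    try:
        service = client.services.create(image, args=args, name=name)
        # ... wait on the task, stream logs to s3, raise on a nonzero exit status ...
    finally:
        if service:
            # Runs even when the body raises, so no orphaned swarm services linger
            service.remove()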
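slack_error_fn is the text_fn for dagster-slack's run-failure sensor, which supplies the job id and name on its own. A sketch of how it is presumably wired up (the channel name is hypothetical; the token is assumed to come from the env, as in dagster.yaml below):

from dagster_slack import make_slack_on_run_failure_sensor

slack_sensor = make_slack_on_run_failure_sensor(
    channel="#crawler-alerts",        # hypothetical channel
    slack_token=DAGSTER_SLACK_TOKEN,  # assumed env-provided token
    text_fn=slack_error_fn,
)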
code/main.py — 12 changes: 7 additions & 5 deletions
@@ -26,9 +26,11 @@
     slack_error_fn,
     template_config,
 )
+from urllib.parse import urlparse

 from lib.env import (
     GLEANER_GRAPH_URL,
+    REMOTE_GLEANER_SITEMAP,
     GLEANERIO_DATAGRAPH_ENDPOINT,
     GLEANERIO_GLEANER_IMAGE,
     GLEANERIO_NABU_IMAGE,
@@ -63,17 +65,15 @@ def gleaner_config(context: AssetExecutionContext):
     # Fill in the config with the common minio configuration
     templated_base = yaml.safe_load(template_config(input_file))

-    sitemap_url = "https://geoconnex.us/sitemap.xml"
     # Parse the sitemap index for the referenced sitemaps for a config file
-    r = requests.get(sitemap_url)
+    r = requests.get(REMOTE_GLEANER_SITEMAP)
     xml = r.text
     sitemapTags = BeautifulSoup(xml, features="xml").find_all("sitemap")
     Lines: list[str] = [sitemap.findNext("loc").text for sitemap in sitemapTags]

     sources = []
     names = set()
     for line in Lines:
-        basename = sitemap_url.removesuffix(".xml")
+        basename = REMOTE_GLEANER_SITEMAP.removesuffix(".xml")
         name = (
             line.removeprefix(basename)
             .removesuffix(".xml")
@@ -86,14 +86,16 @@
print(f"Warning! Skipping duplicate name {name}")
continue

parsed_url = urlparse(REMOTE_GLEANER_SITEMAP)
protocol, hostname = parsed_url.scheme, parsed_url.netloc
data = {
"sourcetype": "sitemap",
"name": name,
"url": line.strip(),
"headless": "false",
"pid": "https://gleaner.io/genid/geoconnex",
"propername": name,
"domain": "https://geoconnex.us",
"domain": f"{protocol}://{hostname}",
"active": "true",
}
names.add(name)
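To illustrate what gleaner_config derives from the sitemap index, here is a self-contained toy run (hypothetical URLs; the elided middle of the real name-normalization chain is approximated with a simple strip/replace):

from urllib.parse import urlparse

from bs4 import BeautifulSoup

# Toy sitemap index standing in for the document fetched from REMOTE_GLEANER_SITEMAP
REMOTE_GLEANER_SITEMAP = "https://example.org/sitemap.xml"
xml = """
<sitemapindex>
  <sitemap><loc>https://example.org/sitemap/ref_dams__0.xml</loc></sitemap>
  <sitemap><loc>https://example.org/sitemap/ref_gages__0.xml</loc></sitemap>
</sitemapindex>
"""

tags = BeautifulSoup(xml, features="xml").find_all("sitemap")
lines = [tag.findNext("loc").text for tag in tags]

basename = REMOTE_GLEANER_SITEMAP.removesuffix(".xml")  # https://example.org/sitemap
parsed_url = urlparse(REMOTE_GLEANER_SITEMAP)
domain = f"{parsed_url.scheme}://{parsed_url.netloc}"   # https://example.org

for line in lines:
    # Approximation of the name derivation: strip the shared prefix and suffix, flatten slashes
    name = line.removeprefix(basename).removesuffix(".xml").strip("/").replace("/", "_")
    print(name, "->", {"sourcetype": "sitemap", "name": name, "url": line, "domain": domain})
    # prints e.g. ref_dams__0 -> {... "domain": "https://example.org"}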
dagster.yaml — 1 change: 1 addition
@@ -42,6 +42,7 @@ run_launcher:
       - GLEANERIO_NABU_IMAGE
       - DAGSTER_SLACK_TOKEN
       - DAGSTER_HOME
+      - REMOTE_GLEANER_SITEMAP

     network: dagster_network
     container_kwargs:
docker-compose-core.yaml — 90 additions (new file)
@@ -0,0 +1,90 @@
# Core services needed to run the scheduler,
# both locally for dev and in a cloud environment for prod

services:
  dagster_postgres:
    # store the runtime dagster state
    image: postgres:11
    container_name: dagster_postgres
    networks:
      - dagster_network
    env_file: ".env"
    volumes:
      - dagster_postgres_data:/var/lib/postgresql/data

  dagster_user_code:
    image: dagster_user_code_image
    container_name: dagster_user_code
    restart: always
    environment:
      DAGSTER_CURRENT_IMAGE: "dagster_user_code_image"
    volumes:
      - ./code:/opt/dagster/app/code
    networks:
      - dagster_network
    env_file: ".env"

  dagster_webserver:
    image: dagster_webserver_image
    entrypoint:
      - dagster-webserver
      - -h
      - "0.0.0.0"
      - -p
      - "3000"
      - -w
      - workspace.yaml
    container_name: dagster_webserver
    ports:
      - "3000:3000"
    volumes: # Make docker client accessible so we can terminate containers from the webserver
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - dagster_network
    depends_on:
      - dagster_postgres
      - dagster_user_code
    env_file: ".env"

  dagster_daemon:
    image: dagster_daemon_image
    entrypoint:
      - dagster-daemon
      - run
    container_name: dagster_daemon
    restart: on-failure
    volumes: # Make docker client accessible so we can launch containers using host docker
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - dagster_network
    depends_on:
      - dagster_postgres
      - dagster_user_code
    env_file: ".env"

  headless:
    # image: chromedp/headless-shell:stable
    # Stable releases after 105 fail with "devtool: CreateURL: Using unsafe HTTP verb GET to invoke /json/new. This action supports only PUT verb."
    # Previous image: chromedp/headless-shell:105.0.5195.127; unclear if we can upgrade, since neither seems to work.
    # On macOS you may need to pull the image first, since we can't specify the platform in docker compose:
    #   docker run -it --platform linux/amd64 -p 9222:9222 --rm --entrypoint /bin/bash chromedp/headless-shell:latest
    image: chromedp/headless-shell:latest
    ports:
      - 9222:9222
    environment:
      - SERVICE_PORTS=9222
    networks:
      - headless_gleanerio
      - dagster_network

networks:
  dagster_network:
    external: true
  headless_gleanerio:
    name: headless_gleanerio
    external: true

volumes:
  dagster_postgres_data:
docker-compose-local.yaml — 78 additions (new file)
@@ -0,0 +1,78 @@
# A compose file which spins up local versions of services for the scheduler;
# useful for local dev or a less optimized deployment to a simplified cloud environment

services:
  minio:
    image: minio/minio
    container_name: minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    networks:
      - headless_gleanerio
      - dagster_network
    env_file: ".env"

  createbuckets:
    # Creates the buckets on minio for the harvester to move assets into
    image: minio/mc
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      sleep 10;
      /usr/bin/mc alias set myminio http://minio:9000 minio_access_key minio_secret_key;
      /usr/bin/mc mb myminio/gleanerbucket;
      /usr/bin/mc anonymous set public myminio/gleanerbucket;
      sleep infinity;
      "
    networks:
      - dagster_network

  createrepositories:
    # Applies migrations aka "repositories" after the database is created
    image: alpine/curl
    depends_on:
      - graphdb
    # Apply the local config files into the graphdb. We sleep 5 since even after the service is up, the graphdb is not fully ready
    entrypoint: >
      /bin/sh -c "
      sleep 5;
      curl -X POST http://graphdb:7200/rest/repositories -H 'Content-Type: multipart/form-data' -F 'config=@templates/iow-config.ttl';
      curl -X POST http://graphdb:7200/rest/repositories -H 'Content-Type: multipart/form-data' -F 'config=@templates/iowprov-config.ttl';
      sleep infinity"
    volumes:
      - ./templates:/templates
    networks:
      - dagster_network

  graphdb:
    image: khaller/graphdb-free
    container_name: graphdb
    ports:
      - 7200:7200
    environment:
      - JAVA_XMX=4g
      - JAVA_XMS=2048m
    volumes:
      - graphdb_data:/opt/graphdb/data
    networks:
      - headless_gleanerio
      - dagster_network

networks:
  dagster_network:
    external: true
  headless_gleanerio:
    name: headless_gleanerio
    external: true

# Used for persistence
volumes:
  minio_data:
  graphdb_data:
docs/docs.md — 2 changes: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ If the **job** for an asset generation fails, a slack notification will be sent.
 ## Running Dagster and the Geoconnex crawler

-The functionality of this repository is in rapid development. That being said, the general pattern is to generate the necessary Docker stack and configurations using the Python `main.py up` CLI at the root of this repo, run the docker swarm via the same CLI, and then open the Dagster UI at localhost:3000.
+The functionality of this repository is in rapid development. That being said, the general pattern is to generate the necessary Docker stack and configurations using the Python `main.py local` CLI at the root of this repo, run the docker swarm via the same CLI, and then open the Dagster UI at localhost:3000.

 > NOTE: You will need to install the proper Python dependencies from `requirements.txt` before running the CLI. Other dependencies for code that is run inside Dagster are handled by Docker.
main.py — 33 changes: 20 additions & 13 deletions
@@ -9,6 +9,7 @@


 def run_subprocess(command: str):
+    """Run a shell command and stream the output in realtime"""
     process = subprocess.Popen(
         command, shell=True, stdout=sys.stdout, stderr=sys.stderr
     )
@@ -21,10 +22,10 @@ def down():
run_subprocess("docker swarm leave --force || true")


def up(env: str = ".env"):
"""Generate all config files and run the docker swarm stack"""
def up(local: bool):
"""Run the docker swarm stack"""

if not os.path.exists(env):
if not os.path.exists(".env"):
print("Missing .env file. Do you want to copy .env.example to .env ? (y/n)")
answer = input().lower()
if answer == "y" or answer == "yes":
@@ -42,7 +43,7 @@ def up(env: str = ".env"):
"docker network create --driver overlay --attachable dagster_network"
)

# hard code this so we don't need to use dotenv to load it
# hard code this so we don't need to use dotenv to load just one env var
# network_name = strict_env("GLEANERIO_HEADLESS_NETWORK")
network_name = "headless_gleanerio"

@@ -101,28 +102,34 @@ def up(env: str = ".env"):
     run_subprocess(
         "docker build -t dagster_daemon_image -f ./Docker/Dockerfile_dagster ."
     )

+    if local:
+        compose_files = "-c docker-compose-core.yaml -c docker-compose-local.yaml"
+    else:
+        compose_files = "-c docker-compose-core.yaml"
+
     run_subprocess(
-        "docker stack deploy -c docker-compose-swarm.yaml geoconnex_crawler --detach=false"
+        f"docker stack deploy {compose_files} geoconnex_crawler --detach=false"
     )


 def main():
     parser = argparse.ArgumentParser(description="Docker Swarm Stack Management")
     subparsers = parser.add_subparsers(dest="command")
     subparsers.add_parser("down", help="Stop the docker swarm stack")

-    up_parser = subparsers.add_parser(
-        "up", help="Generate all config files and run the docker swarm stack"
-    )
-    up_parser.add_argument(
-        "--env", type=str, default=".env", help="File containing your env vars"
-    )
+    subparsers.add_parser("local", help="Spin up the docker swarm stack with local s3")
+    subparsers.add_parser(
+        "prod",
+        help="Spin up the docker swarm stack without local s3; requires remote s3 service in .env",
+    )

     args = parser.parse_args()
     if args.command == "down":
         down()
-    elif args.command == "up":
-        up(args.env)
+    elif args.command == "local":
+        up(local=True)
+    elif args.command == "prod":
+        up(local=False)
     else:
         parser.print_help()

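run_subprocess is truncated in the hunk above; a plausible completion, as a sketch assuming the CLI should fail fast on a nonzero exit code:

import subprocess
import sys

def run_subprocess(command: str):
    """Run a shell command and stream the output in realtime"""
    process = subprocess.Popen(
        command, shell=True, stdout=sys.stdout, stderr=sys.stderr
    )
    returncode = process.wait()  # block until the command finishes; output streams as it runs
    if returncode != 0:
        # Assumed behavior: surface failures instead of silently continuing
        raise RuntimeError(f"'{command}' exited with code {returncode}")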

