Skip to content

Commit

Permalink
sketch out orchestratord side of the external orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
doy-materialize committed Dec 23, 2024
1 parent 1be6313 commit 9e96ed8
Showing 1 changed file with 184 additions and 3 deletions.
187 changes: 184 additions & 3 deletions src/orchestratord/src/controller/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@
// by the Apache License, Version 2.0.

use std::{
collections::BTreeSet,
collections::{BTreeMap, BTreeSet},
fmt::Display,
str::FromStr,
sync::{Arc, Mutex},
time::Duration,
};

use http::HeaderValue;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
use kube::{api::PostParams, runtime::controller::Action, Api, Client, Resource, ResourceExt};
use serde::Deserialize;
use tracing::{debug, trace};
use tokio_postgres::NoTls;
use tracing::{debug, trace, warn};

use crate::metrics::Metrics;
use mz_cloud_provider::CloudProvider;
Expand All @@ -27,7 +29,12 @@ use mz_cloud_resources::crd::materialize::v1alpha1::{
};
use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
use mz_orchestrator_tracing::TracingCliArgs;
use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
use mz_ore::{
cast::CastFrom,
cli::KeyValueArg,
instrument,
task::{spawn, AbortOnDropHandle},
};

pub mod balancer;
pub mod console;
Expand All @@ -54,6 +61,8 @@ pub struct Args {
enable_prometheus_scrape_annotations: bool,
#[clap(long)]
disable_authentication: bool,
#[clap(long)]
use_external_orchestrator: bool,

#[clap(long)]
segment_api_key: Option<String>,
Expand Down Expand Up @@ -158,6 +167,14 @@ impl Args {
self.environmentd_internal_http_port
)
}

fn environmentd_internal_sql_address(&self, namespace: &str, service_name: &str) -> String {
format!(
"{}:{}",
self.environmentd_internal_hostname(namespace, service_name),
self.environmentd_internal_sql_port
)
}
}

#[derive(Deserialize, Default)]
Expand Down Expand Up @@ -223,12 +240,126 @@ impl Display for Error {
}
}

pub enum SubscribeMessage {
EnsureService(String),
DropService(String),
}

struct EnvironmentWorker {
client: Client,
namespace: String,
name: String,
internal_pgwire_url: String,
}

impl EnvironmentWorker {
fn new(client: Client, namespace: String, name: String, internal_pgwire_url: String) -> Self {
Self {
client,
namespace,
name,
internal_pgwire_url,
}
}

async fn run(&self, initial_generation: u64) {
// this is required to break the bootstrapping loop, since we can't
// run subscribe queries until there is a system cluster available to
// run them on
self.ensure_replica(
&format!("cluster-s1-replica-s1-gen-{}", initial_generation),
false,
)
.await;

let mut active_client = None;
loop {
if let Some(client) = active_client.as_mut() {
if let Err(e) = self.subscribe(client).await {
warn!("lost subscribe connection: {e}");
active_client = None;
}
} else {
if let Ok(client) = self.reconnect().await {
active_client = Some(client);
}
}
}
}

async fn ensure_replica(&self, replica_name: &str, write_status: bool) {
// ensure_service logic from orchestrator-kubernetes goes here
todo!()
}

async fn drop_replica(&self, replica_name: &str) {
// drop_service logic from orchestrator-kubernetes goes here
todo!()
}

async fn subscribe(&self, client: &mut tokio_postgres::Client) -> Result<(), anyhow::Error> {
let transaction = client.transaction().await?;
transaction
.execute(
"DECLARE c CURSOR FOR SUBSCRIBE (SELECT id, state FROM mz_internal.mz_external_orchestrator_services) ENVELOPE UPSERT (KEY (id));",
&[],
)
.await?;
loop {
let results = transaction.query("FETCH ALL c;", &[]).await?;
for row in results {
let id = row.get("id");
match row.get("mz_state") {
"upsert" => {
self.ensure_replica(id, true).await;
}
"delete" => {
self.drop_replica(id).await;
}
_ => {}
}
}
}
}

async fn reconnect(&self) -> Result<tokio_postgres::Client, anyhow::Error> {
let (client, connection) = match tokio::time::timeout(
Duration::from_secs(5),
tokio_postgres::connect(&self.internal_pgwire_url, NoTls),
)
.await
{
Ok(Ok((client, connection))) => (client, connection),
Ok(Err(err)) => {
warn!("failed to connect to environmentd: {err}");
return Err(err.into());
}
Err(err) => {
warn!("timed out connecting to environmentd");
return Err(err.into());
}
};

mz_ore::task::spawn(
|| format!("postgres connection for {}/{}", self.namespace, self.name),
async move {
if let Err(e) = connection.await {
panic!("connection error: {}", e);
}
},
);

Ok(client)
}
}

pub struct Context {
config: Args,
tracing: TracingCliArgs,
orchestratord_namespace: String,
metrics: Arc<Metrics>,
needs_update: Arc<Mutex<BTreeSet<String>>>,
environment_workers: Mutex<BTreeMap<String, AbortOnDropHandle<()>>>,
}

impl Context {
Expand All @@ -255,6 +386,7 @@ impl Context {
orchestratord_namespace,
metrics,
needs_update: Default::default(),
environment_workers: Mutex::new(BTreeMap::new()),
}
}

Expand Down Expand Up @@ -297,6 +429,51 @@ impl Context {
)
.await
}

fn start_environment_worker(&self, mz: &Materialize, client: Client) {
let namespace = mz.namespace();
let name = mz.name_unchecked();
let generation = mz
.status
.as_ref()
.map(|status| status.active_generation)
.unwrap_or(1);
let internal_pgwire_url = format!(
"postgres://mz_system@{}/materialize",
self.config.environmentd_internal_sql_address(
&mz.namespace(),
&mz.environmentd_service_name()
)
);
self.environment_workers
.lock()
.unwrap()
.entry(mz.metadata.uid.clone().unwrap())
.or_insert_with(|| {
spawn(
|| {
format!(
"environment worker for {}/{}",
mz.namespace(),
mz.name_unchecked()
)
},
async move {
EnvironmentWorker::new(client, namespace, name, internal_pgwire_url)
.run(generation)
.await
},
)
.abort_on_drop()
});
}

fn stop_environment_worker(&self, mz: &Materialize) {
self.environment_workers
.lock()
.unwrap()
.remove(mz.metadata.uid.as_ref().unwrap());
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -439,6 +616,9 @@ impl k8s_controller::Context for Context {
false,
)
.await?;
if self.config.use_external_orchestrator {
self.start_environment_worker(mz, client.clone());
}
Ok(None)
}
Err(e) => {
Expand Down Expand Up @@ -609,6 +789,7 @@ impl k8s_controller::Context for Context {
mz: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
self.set_needs_update(mz, false);
self.stop_environment_worker(mz);

Ok(None)
}
Expand Down

0 comments on commit 9e96ed8

Please sign in to comment.