Skip to content

Commit

Permalink
feat: get logs from runtime (#459)
Browse files Browse the repository at this point in the history
* feat: get logs from runtime

* refactor: trim down deployer

* refactor: start runtime earlier

* refactor: connect to client earlier

* refactor: hook runtime logs to persistence

* bug: associate deployment id with logs

* refactor: cleanup

* feat: make sure grpc connection stays open
  • Loading branch information
chesedo authored Nov 14, 2022
1 parent 0e8ce8b commit ee342e4
Show file tree
Hide file tree
Showing 18 changed files with 347 additions and 339 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

129 changes: 63 additions & 66 deletions deployer/src/deployment/deploy_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
use chrono::{DateTime, Utc};
use serde_json::json;
use shuttle_common::STATE_MESSAGE;
use std::{net::SocketAddr, str::FromStr};
use shuttle_proto::runtime;
use std::{net::SocketAddr, str::FromStr, time::SystemTime};
use tracing::{error, field::Visit, span, warn, Metadata, Subscriber};
use tracing_subscriber::Layer;
use uuid::Uuid;
Expand Down Expand Up @@ -126,6 +127,51 @@ impl From<Log> for DeploymentState {
}
}

impl From<runtime::LogItem> for Log {
fn from(log: runtime::LogItem) -> Self {
Self {
id: Uuid::from_slice(&log.id).unwrap(),
state: runtime::LogState::from_i32(log.state).unwrap().into(),
level: runtime::LogLevel::from_i32(log.level).unwrap().into(),
timestamp: DateTime::from(SystemTime::try_from(log.timestamp.unwrap()).unwrap()),
file: log.file,
line: log.line,
target: log.target,
fields: serde_json::from_slice(&log.fields).unwrap(),
r#type: LogType::Event,
address: None,
}
}
}

impl From<runtime::LogState> for State {
fn from(state: runtime::LogState) -> Self {
match state {
runtime::LogState::Queued => Self::Queued,
runtime::LogState::Building => Self::Building,
runtime::LogState::Built => Self::Built,
runtime::LogState::Loading => Self::Loading,
runtime::LogState::Running => Self::Running,
runtime::LogState::Completed => Self::Completed,
runtime::LogState::Stopped => Self::Stopped,
runtime::LogState::Crashed => Self::Crashed,
runtime::LogState::Unknown => Self::Unknown,
}
}
}

impl From<runtime::LogLevel> for LogLevel {
fn from(level: runtime::LogLevel) -> Self {
match level {
runtime::LogLevel::Trace => Self::Trace,
runtime::LogLevel::Debug => Self::Debug,
runtime::LogLevel::Info => Self::Info,
runtime::LogLevel::Warn => Self::Warn,
runtime::LogLevel::Error => Self::Error,
}
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LogType {
Event,
Expand Down Expand Up @@ -345,7 +391,6 @@ impl Visit for JsonVisitor {
#[cfg(test)]
mod tests {
use std::{
collections::BTreeMap,
fs::read_dir,
path::PathBuf,
sync::{Arc, Mutex},
Expand All @@ -356,15 +401,15 @@ mod tests {
use ctor::ctor;
use flate2::{write::GzEncoder, Compression};
use futures::FutureExt;
use shuttle_service::Logger;
use tokio::{select, sync::mpsc, time::sleep};
use shuttle_proto::runtime::runtime_client::RuntimeClient;
use tokio::{select, time::sleep};
use tonic::transport::Channel;
use tracing_subscriber::prelude::*;
use uuid::Uuid;

use crate::{
deployment::{
deploy_layer::LogType, provisioner_factory, runtime_logger, ActiveDeploymentsGetter,
Built, DeploymentManager, Queued,
deploy_layer::LogType, ActiveDeploymentsGetter, Built, DeploymentManager, Queued,
},
persistence::{SecretRecorder, State},
};
Expand Down Expand Up @@ -430,6 +475,12 @@ mod tests {
}
}

async fn get_runtime_client() -> RuntimeClient<Channel> {
RuntimeClient::connect("http://127.0.0.1:6001")
.await
.unwrap()
}

#[async_trait::async_trait]
impl SecretRecorder for Arc<Mutex<RecorderMock>> {
type Err = std::io::Error;
Expand All @@ -450,54 +501,6 @@ mod tests {
}
}

struct StubAbstractProvisionerFactory;

impl provisioner_factory::AbstractFactory for StubAbstractProvisionerFactory {
type Output = StubProvisionerFactory;

fn get_factory(&self) -> Self::Output {
StubProvisionerFactory
}
}

struct StubProvisionerFactory;

#[async_trait::async_trait]
impl shuttle_service::Factory for StubProvisionerFactory {
async fn get_db_connection_string(
&mut self,
_db_type: shuttle_common::database::Type,
) -> Result<String, shuttle_service::Error> {
panic!("did not expect any deploy_layer test to connect to the database")
}

async fn get_secrets(
&mut self,
) -> Result<BTreeMap<String, String>, shuttle_service::Error> {
panic!("did not expect any deploy_layer test to get secrets")
}

fn get_service_name(&self) -> shuttle_service::ServiceName {
panic!("did not expect any deploy_layer test to get the service name")
}
}

struct StubRuntimeLoggerFactory;

impl runtime_logger::Factory for StubRuntimeLoggerFactory {
fn get_logger(&self, id: Uuid) -> Logger {
let (tx, mut rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
while let Some(log) = rx.recv().await {
println!("{log}")
}
});

Logger::new(tx, id)
}
}

#[derive(Clone)]
struct StubActiveDeploymentGetter;

Expand All @@ -516,8 +519,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn deployment_to_be_queued() {
let deployment_manager = DeploymentManager::new(
StubAbstractProvisionerFactory,
StubRuntimeLoggerFactory,
get_runtime_client().await,
RECORDER.clone(),
RECORDER.clone(),
StubActiveDeploymentGetter,
Expand Down Expand Up @@ -635,8 +637,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn deployment_self_stop() {
let deployment_manager = DeploymentManager::new(
StubAbstractProvisionerFactory,
StubRuntimeLoggerFactory,
get_runtime_client().await,
RECORDER.clone(),
RECORDER.clone(),
StubActiveDeploymentGetter,
Expand Down Expand Up @@ -715,8 +716,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn deployment_bind_panic() {
let deployment_manager = DeploymentManager::new(
StubAbstractProvisionerFactory,
StubRuntimeLoggerFactory,
get_runtime_client().await,
RECORDER.clone(),
RECORDER.clone(),
StubActiveDeploymentGetter,
Expand Down Expand Up @@ -795,8 +795,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn deployment_main_panic() {
let deployment_manager = DeploymentManager::new(
StubAbstractProvisionerFactory,
StubRuntimeLoggerFactory,
get_runtime_client().await,
RECORDER.clone(),
RECORDER.clone(),
StubActiveDeploymentGetter,
Expand Down Expand Up @@ -870,8 +869,7 @@ mod tests {
#[tokio::test]
async fn deployment_from_run() {
let deployment_manager = DeploymentManager::new(
StubAbstractProvisionerFactory,
StubRuntimeLoggerFactory,
get_runtime_client().await,
RECORDER.clone(),
RECORDER.clone(),
StubActiveDeploymentGetter,
Expand Down Expand Up @@ -924,8 +922,7 @@ mod tests {
#[tokio::test]
async fn scope_with_nil_id() {
let deployment_manager = DeploymentManager::new(
StubAbstractProvisionerFactory,
StubRuntimeLoggerFactory,
get_runtime_client().await,
RECORDER.clone(),
RECORDER.clone(),
StubActiveDeploymentGetter,
Expand Down
16 changes: 6 additions & 10 deletions deployer/src/deployment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
pub mod deploy_layer;
pub mod provisioner_factory;
mod queue;
mod run;
pub mod runtime_logger;

use std::path::PathBuf;

pub use queue::Queued;
pub use run::{ActiveDeploymentsGetter, Built};
use shuttle_proto::runtime::runtime_client::RuntimeClient;
use tonic::transport::Channel;
use tracing::instrument;

use crate::persistence::{SecretRecorder, State};
Expand All @@ -30,8 +30,7 @@ impl DeploymentManager {
/// Create a new deployment manager. Manages one or more 'pipelines' for
/// processing service building, loading, and deployment.
pub fn new(
abstract_dummy_factory: impl provisioner_factory::AbstractFactory,
runtime_logger_factory: impl runtime_logger::Factory,
runtime_client: RuntimeClient<Channel>,
build_log_recorder: impl LogRecorder,
secret_recorder: impl SecretRecorder,
active_deployment_getter: impl ActiveDeploymentsGetter,
Expand All @@ -42,8 +41,7 @@ impl DeploymentManager {
DeploymentManager {
pipeline: Pipeline::new(
kill_send.clone(),
abstract_dummy_factory,
runtime_logger_factory,
runtime_client,
build_log_recorder,
secret_recorder,
active_deployment_getter,
Expand Down Expand Up @@ -97,8 +95,7 @@ impl Pipeline {
/// deployments between the aforementioned tasks.
fn new(
kill_send: KillSender,
abstract_factory: impl provisioner_factory::AbstractFactory,
runtime_logger_factory: impl runtime_logger::Factory,
runtime_client: RuntimeClient<Channel>,
build_log_recorder: impl LogRecorder,
secret_recorder: impl SecretRecorder,
active_deployment_getter: impl ActiveDeploymentsGetter,
Expand All @@ -118,9 +115,8 @@ impl Pipeline {
));
tokio::spawn(run::task(
run_recv,
runtime_client,
kill_send,
abstract_factory,
runtime_logger_factory,
active_deployment_getter,
artifacts_path,
));
Expand Down
62 changes: 0 additions & 62 deletions deployer/src/deployment/provisioner_factory.rs

This file was deleted.

Loading

0 comments on commit ee342e4

Please sign in to comment.