Skip to content

Commit

Permalink
Merge pull request #3898 from kinvolk/asymmetric/user-toml-watcher
Browse files Browse the repository at this point in the history
Approved by: @nobody from Nowhere
Merged by: The Sentinels
  • Loading branch information
thesentinels committed Jan 17, 2018
2 parents 1730271 + fcdec28 commit e7aa623
Show file tree
Hide file tree
Showing 11 changed files with 687 additions and 60 deletions.
2 changes: 2 additions & 0 deletions components/core/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub const PKG_PATH: &'static str = "hab/pkgs";
/// be used with extreme caution.
pub const FS_ROOT_ENVVAR: &'static str = "FS_ROOT";
pub const SYSTEMDRIVE_ENVVAR: &'static str = "SYSTEMDRIVE";
/// The file where user-defined configuration for each service is found.
pub const USER_CONFIG_FILE: &'static str = "user.toml";

lazy_static! {
/// The default filesystem root path.
Expand Down
48 changes: 45 additions & 3 deletions components/sup/src/manager/file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};

use manager::debug::{IndentedStructFormatter, IndentedToString};

const WATCHER_DELAY_MS: u64 = 2_000;
pub const WATCHER_DELAY_MS: u64 = 2_000;
static LOGKEY: &'static str = "FW";

/// A set of callbacks for the watched file events.
Expand Down Expand Up @@ -1223,16 +1223,54 @@ where
return FileWatcher::<C, RecommendedWatcher>::create(path, callbacks);
}

pub fn default_file_watcher_with_no_initial_event<P, C>(
path: P,
callbacks: C,
) -> Result<FileWatcher<C, RecommendedWatcher>>
where
P: Into<PathBuf>,
C: Callbacks,
{
return FileWatcher::<C, RecommendedWatcher>::create_with_no_initial_event(path, callbacks);
}

impl<C: Callbacks, W: Watcher> FileWatcher<C, W> {
/// Creates a new `FileWatcher`.
///
/// This will create an instance of `W` and start watching the
/// paths.
/// paths. When looping the file watcher, it will emit an initial
/// "file appeared" event if the watched file existed when the
/// file watcher was created.
///
/// Will return `Error::NotifyCreateError` if creating the watcher
/// fails. In case of watching errors, it returns
/// `Error::NotifyError`.
pub fn create<P>(path: P, callbacks: C) -> Result<Self>
where
P: Into<PathBuf>,
{
Self::create_instance(path, callbacks, true)
}

/// Creates a new `FileWatcher`.
///
/// This will create an instance of `W` and start watching the
/// paths. When looping the file watcher, it will not emit any
/// initial "file appeared" event even if the watched file existed
/// when the file watcher was created.
///
/// Will return `Error::NotifyCreateError` if creating the watcher
/// fails. In case of watching errors, it returns
/// `Error::NotifyError`.
pub fn create_with_no_initial_event<P>(path: P, callbacks: C) -> Result<Self>
where
P: Into<PathBuf>,
{
Self::create_instance(path, callbacks, false)
}

// Creates an instance of the FileWatcher.
fn create_instance<P>(path: P, callbacks: C, send_initial_event: bool) -> Result<Self>
where
P: Into<PathBuf>,
{
Expand All @@ -1253,7 +1291,11 @@ impl<C: Callbacks, W: Watcher> FileWatcher<C, W> {
.watch(&directory, RecursiveMode::NonRecursive)
.map_err(|err| sup_error!(Error::NotifyError(err)))?;
}
let initial_real_file = paths.real_file.clone();
let initial_real_file = if send_initial_event {
paths.real_file.clone()
} else {
None
};

Ok(Self {
callbacks: callbacks,
Expand Down
110 changes: 81 additions & 29 deletions components/sup/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod service_updater;
mod spec_watcher;
mod file_watcher;
mod peer_watcher;
mod user_config_watcher;
mod sys;

use std::collections::HashMap;
Expand All @@ -35,6 +36,8 @@ use std::thread;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::mem;
use std::ops::DerefMut;

use butterfly;
use butterfly::member::Member;
Expand All @@ -59,6 +62,7 @@ use self::service::{DesiredState, Pkg, ProcessState, StartStyle};
use self::service_updater::ServiceUpdater;
use self::spec_watcher::{SpecWatcher, SpecWatcherEvent};
use self::peer_watcher::PeerWatcher;
use self::user_config_watcher::UserConfigWatcher;
use VERSION;
use error::{Error, Result, SupError};
use config::GossipListenAddr;
Expand Down Expand Up @@ -146,12 +150,13 @@ pub struct Manager {
launcher: LauncherCli,
services: Arc<RwLock<Vec<Service>>>,
updater: ServiceUpdater,
watcher: SpecWatcher,
peer_watcher: Option<PeerWatcher>,
spec_watcher: SpecWatcher,
user_config_watcher: UserConfigWatcher,
organization: Option<String>,
self_updater: Option<SelfUpdater>,
service_states: HashMap<PackageIdent, Timespec>,
sys: Arc<Sys>,
peer_watcher: Option<PeerWatcher>,
}

impl Manager {
Expand Down Expand Up @@ -318,12 +323,13 @@ impl Manager {
events_group: cfg.eventsrv_group,
launcher: launcher,
services: services,
watcher: SpecWatcher::run(&fs_cfg.specs_path)?,
peer_watcher: peer_watcher,
spec_watcher: SpecWatcher::run(&fs_cfg.specs_path)?,
user_config_watcher: UserConfigWatcher::new(),
fs_cfg: Arc::new(fs_cfg),
organization: cfg.organization,
service_states: HashMap::new(),
sys: Arc::new(sys),
peer_watcher: peer_watcher,
})
}

Expand Down Expand Up @@ -520,6 +526,16 @@ impl Manager {
0,
);
}

if let Err(e) = self.user_config_watcher.add(&service) {
outputln!(
"Unable to start UserConfigWatcher for {}: {}",
service.spec_ident,
e
);
return;
}

self.updater.add(&service);
self.services
.write()
Expand All @@ -528,7 +544,7 @@ impl Manager {
}

pub fn run(&mut self) -> Result<()> {
self.start_initial_services_from_watcher()?;
self.start_initial_services_from_spec_watcher()?;

outputln!(
"Starting gossip-listener on {}",
Expand Down Expand Up @@ -564,8 +580,9 @@ impl Manager {
self.shutdown();
return Ok(());
}
self.update_running_services_from_watcher()?;
self.update_running_services_from_spec_watcher()?;
self.update_peers_from_watch_file()?;
self.update_running_services_from_user_config_watcher();
self.check_for_updated_packages();
self.restart_elections();
self.census_ring.update_from_rumors(
Expand Down Expand Up @@ -650,6 +667,7 @@ impl Manager {
}
}

// Creates a rumor for the specified service.
fn gossip_latest_service_rumor(&self, service: &Service) {
let mut incarnation = 1;
{
Expand Down Expand Up @@ -682,7 +700,7 @@ impl Manager {
active_services.push(service.spec_ident.clone());
}

for loaded in self.watcher
for loaded in self.spec_watcher
.specs_from_watch_path()
.unwrap()
.values()
Expand Down Expand Up @@ -790,7 +808,7 @@ impl Manager {
// add services that are not active but are being watched for changes
// These would include stopped persistent services or other
// persistent services that failed to load
for down in self.watcher
for down in self.spec_watcher
.specs_from_watch_path()
.unwrap()
.values()
Expand Down Expand Up @@ -831,7 +849,7 @@ impl Manager {
/// service. Passing a value of `false` will let the Launcher keep the service running. This
/// useful if you want the Supervisor to shutdown temporarily and then come back and re-attach
/// to all running processes.
fn remove_service(&self, service: &mut Service, term: bool) {
fn remove_service(&mut self, service: &mut Service, term: bool) {
// JW TODO: Update service rumor to remove service from cluster
if term {
service.stop(&self.launcher);
Expand All @@ -855,6 +873,13 @@ impl Manager {
err
);
}

if let Err(_) = self.user_config_watcher.remove(service) {
debug!(
"Error stopping user-config watcher thread for service {}",
service
);
}
}

fn write_service<W: ?Sized>(
Expand All @@ -879,20 +904,30 @@ impl Manager {
self.butterfly.restart_elections();
}

fn shutdown(&self) {
fn shutdown(&mut self) {
outputln!("Gracefully departing from butterfly network.");
self.butterfly.set_departed();

let mut services = self.services.write().expect("Services lock is poisend!");
let mut svcs = Vec::new();

for mut service in services.drain(..) {
// The problem we're trying to work around here by adding this block is that `write`
// creates an immutable borrow on `self`, and `self.remove_service` needs `&mut self`.
// The solution is to introduce the block to drop the immutable borrow before the call to
// `self.remove_service`, and use `mem::swap` to move the services to a variable defined
// outside the block while we have the lock.
{
let mut services = self.services.write().expect("Services lock is poisoned!");
mem::swap(services.deref_mut(), &mut svcs);
}

for mut service in svcs.drain(..) {
self.remove_service(&mut service, false);
}
release_process_lock(&self.fs_cfg);
}

fn start_initial_services_from_watcher(&mut self) -> Result<()> {
for service_event in self.watcher.initial_events()? {
fn start_initial_services_from_spec_watcher(&mut self) -> Result<()> {
for service_event in self.spec_watcher.initial_events()? {
match service_event {
SpecWatcherEvent::AddService(spec) => {
if spec.desired_state == DesiredState::Up {
Expand All @@ -906,7 +941,7 @@ impl Manager {
Ok(())
}

fn update_running_services_from_watcher(&mut self) -> Result<()> {
fn update_running_services_from_spec_watcher(&mut self) -> Result<()> {
let mut active_specs = HashMap::new();
for service in self.services
.read()
Expand All @@ -917,7 +952,7 @@ impl Manager {
active_specs.insert(spec.ident.name.clone(), spec);
}

for service_event in self.watcher.new_events(active_specs)? {
for service_event in self.spec_watcher.new_events(active_specs)? {
match service_event {
SpecWatcherEvent::AddService(spec) => {
if spec.desired_state == DesiredState::Up {
Expand Down Expand Up @@ -947,21 +982,38 @@ impl Manager {
}
}

fn remove_service_for_spec(&mut self, spec: &ServiceSpec) -> Result<()> {
fn update_running_services_from_user_config_watcher(&mut self) {
let mut services = self.services.write().expect("Services lock is poisoned");
// TODO fn: storing services as a `Vec` is a bit crazy when you have to do these
// shenanigans--maybe we want to consider changing the data structure in the future?
let services_idx = match services.iter().position(|ref s| s.spec_ident == spec.ident) {
Some(i) => i,
None => {
outputln!(
"Tried to remove service for {} but could not find it running, skipping",
&spec.ident
);
return Ok(());

for service in services.iter_mut() {
if self.user_config_watcher.have_events_for(service) {
outputln!("Reloading service {}", &service.spec_ident);
service.user_config_updated = true;
}
};
let mut service = services.remove(services_idx);
}
}

fn remove_service_for_spec(&mut self, spec: &ServiceSpec) -> Result<()> {
let mut service: Service;

{
let mut services = self.services.write().expect("Services lock is poisoned");
// TODO fn: storing services as a `Vec` is a bit crazy when you have to do these
// shenanigans--maybe we want to consider changing the data structure in the future?
let services_idx = match services.iter().position(|ref s| s.spec_ident == spec.ident) {
Some(i) => i,
None => {
outputln!(
"Tried to remove service for {} but could not find it running, skipping",
&spec.ident
);
return Ok(());
}
};

service = services.remove(services_idx);
}

self.remove_service(&mut service, true);
Ok(())
}
Expand Down
Loading

0 comments on commit e7aa623

Please sign in to comment.