diff --git a/cli/internal/daemonclient/daemonclient.go b/cli/internal/daemonclient/daemonclient.go index c415cd3991f83..7e1afd0088be6 100644 --- a/cli/internal/daemonclient/daemonclient.go +++ b/cli/internal/daemonclient/daemonclient.go @@ -32,24 +32,24 @@ func New(client *connector.Client) *DaemonClient { } // GetChangedOutputs implements runcache.OutputWatcher.GetChangedOutputs -func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) { +func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, int, error) { resp, err := d.client.GetChangedOutputs(ctx, &turbodprotocol.GetChangedOutputsRequest{ Hash: hash, OutputGlobs: repoRelativeOutputGlobs, }) if err != nil { - return nil, err + return nil, 0, err } - - return resp.ChangedOutputGlobs, nil + return resp.ChangedOutputGlobs, int(resp.TimeSaved), nil } // NotifyOutputsWritten implements runcache.OutputWatcher.NotifyOutputsWritten -func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error { +func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs, timeSaved int) error { _, err := d.client.NotifyOutputsWritten(ctx, &turbodprotocol.NotifyOutputsWrittenRequest{ Hash: hash, OutputGlobs: repoRelativeOutputGlobs.Inclusions, OutputExclusionGlobs: repoRelativeOutputGlobs.Exclusions, + TimeSaved: uint64(timeSaved), }) return err } diff --git a/cli/internal/runcache/output_watcher.go b/cli/internal/runcache/output_watcher.go index 5f90f0ee1ddc9..d9f744b523c66 100644 --- a/cli/internal/runcache/output_watcher.go +++ b/cli/internal/runcache/output_watcher.go @@ -9,9 +9,9 @@ import ( // OutputWatcher instances are responsible for tracking changes to task outputs type OutputWatcher interface { // GetChangedOutputs returns which of the given globs have changed since the specified hash was last run - GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) + GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, int, error) // NotifyOutputsWritten tells the watcher that the given globs have been cached with the specified hash - NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error + NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs, timeSaved int) error } // NoOpOutputWatcher implements OutputWatcher, but always considers every glob to have changed @@ -21,12 +21,12 @@ var _ OutputWatcher = (*NoOpOutputWatcher)(nil) // GetChangedOutputs implements OutputWatcher.GetChangedOutputs. // Since this is a no-op watcher, no tracking is done. -func (NoOpOutputWatcher) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) { - return repoRelativeOutputGlobs, nil +func (NoOpOutputWatcher) GetChangedOutputs(_ context.Context, _ string, repoRelativeOutputGlobs []string) ([]string, int, error) { + return repoRelativeOutputGlobs, 0, nil } // NotifyOutputsWritten implements OutputWatcher.NotifyOutputsWritten. // Since this is a no-op watcher, consider all globs to have changed -func (NoOpOutputWatcher) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error { +func (NoOpOutputWatcher) NotifyOutputsWritten(_ context.Context, _ string, _ fs.TaskOutputs, _ int) error { return nil } diff --git a/cli/internal/runcache/runcache.go b/cli/internal/runcache/runcache.go index f977245bfc3bd..d362f0bb80db9 100644 --- a/cli/internal/runcache/runcache.go +++ b/cli/internal/runcache/runcache.go @@ -118,7 +118,8 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe return cache.ItemStatus{Local: false, Remote: false}, 0, nil } - changedOutputGlobs, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs.Inclusions) + changedOutputGlobs, timeSavedFromDaemon, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs.Inclusions) + if err != nil { progressLogger.Warn(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err)) prefixedUI.Warn(ui.Dim(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err))) @@ -149,13 +150,14 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe return cache.ItemStatus{Local: false, Remote: false}, 0, nil } - if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs); err != nil { + if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs, timeSavedFromDaemon); err != nil { // Don't fail the whole operation just because we failed to watch the outputs prefixedUI.Warn(ui.Dim(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err))) } } else { // If no outputs have changed, that means we have a local cache hit. cacheStatus.Local = true + timeSaved = timeSavedFromDaemon prefixedUI.Warn(fmt.Sprintf("Skipping cache check for %v, outputs have not changed since previous run.", tc.pt.TaskID)) } @@ -279,7 +281,7 @@ func (tc *TaskCache) SaveOutputs(ctx context.Context, logger hclog.Logger, termi if err = tc.rc.cache.Put(tc.rc.repoRoot, tc.hash, duration, relativePaths); err != nil { return err } - err = tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs) + err = tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs, duration) if err != nil { // Don't fail the cache write because we also failed to record it, we will just do // extra I/O in the future restoring files that haven't changed from cache diff --git a/cli/internal/turbodprotocol/turbod.proto b/cli/internal/turbodprotocol/turbod.proto index cf7c55447197e..1de93158614cf 100644 --- a/cli/internal/turbodprotocol/turbod.proto +++ b/cli/internal/turbodprotocol/turbod.proto @@ -34,6 +34,7 @@ message NotifyOutputsWrittenRequest { repeated string output_globs = 1; string hash = 2; repeated string output_exclusion_globs = 3; + uint64 time_saved = 4; } message NotifyOutputsWrittenResponse {} @@ -45,6 +46,7 @@ message GetChangedOutputsRequest { message GetChangedOutputsResponse { repeated string changed_output_globs = 1; + uint64 time_saved = 2; } message DaemonStatus { diff --git a/crates/turborepo-lib/src/daemon/client.rs b/crates/turborepo-lib/src/daemon/client.rs index 1d44f7b4d182d..16ee2ca46a788 100644 --- a/crates/turborepo-lib/src/daemon/client.rs +++ b/crates/turborepo-lib/src/daemon/client.rs @@ -90,12 +90,14 @@ impl DaemonClient { hash: String, output_globs: Vec, output_exclusion_globs: Vec, + time_saved: u64, ) -> Result<(), DaemonError> { self.client .notify_outputs_written(proto::NotifyOutputsWrittenRequest { hash, output_globs, output_exclusion_globs, + time_saved, }) .await?; diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs index 7d1183dbbe877..16e67b50ca4e0 100644 --- a/crates/turborepo-lib/src/daemon/server.rs +++ b/crates/turborepo-lib/src/daemon/server.rs @@ -13,10 +13,10 @@ //! globs, and to query for changes for those globs. use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex as StdMutux, }, time::{Duration, Instant}, }; @@ -58,6 +58,8 @@ pub struct DaemonServer { shutdown_rx: Option>, running: Arc, + + times_saved: Arc>>, } #[derive(Debug)] @@ -98,6 +100,7 @@ impl DaemonServer { shutdown_rx: Some(recv_shutdown), running: Arc::new(AtomicBool::new(true)), + times_saved: Arc::new(StdMutux::new(HashMap::new())), }) } } @@ -255,6 +258,10 @@ impl proto::turbod_server::Turbod for DaemonServer< ) -> Result, tonic::Status> { let inner = request.into_inner(); + { + let mut times_saved = self.times_saved.lock().expect("times saved lock poisoned"); + times_saved.insert(inner.hash.clone(), inner.time_saved); + } match self .watcher .watch_globs( @@ -277,17 +284,21 @@ impl proto::turbod_server::Turbod for DaemonServer< request: tonic::Request, ) -> Result, tonic::Status> { let inner = request.into_inner(); + let hash = Arc::new(inner.hash); let changed = self .watcher - .changed_globs( - &Arc::new(inner.hash), - HashSet::from_iter(inner.output_globs), - ) + .changed_globs(&hash, HashSet::from_iter(inner.output_globs)) .await; + let time_saved = { + let times_saved = self.times_saved.lock().expect("times saved lock poisoned"); + times_saved.get(hash.as_str()).copied().unwrap_or_default() + }; + match changed { Ok(changed) => Ok(tonic::Response::new(proto::GetChangedOutputsResponse { changed_output_globs: changed.into_iter().collect(), + time_saved: time_saved, })), Err(e) => { error!("flush directory operation failed: {:?}", e);