Skip to content

Commit

Permalink
feat: Harden collector shutdown while updating (#597)
Browse files Browse the repository at this point in the history
* change service timeouts

* update non-windows with new timeout

* fix windows test

* stop the service before rollback

* fix install tests
  • Loading branch information
BinaryFissionGames committed Aug 4, 2022
1 parent 133ed52 commit 0aa145e
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 84 deletions.
2 changes: 2 additions & 0 deletions opamp/observiq/updater_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"io"
"os"
"path/filepath"
"time"

"go.uber.org/zap"
)

const updaterDir = "latest"
const defaultShutdownWaitTimeout = 30 * time.Second

// updaterManager handles working with the Updater binary
type updaterManager interface {
Expand Down
22 changes: 12 additions & 10 deletions opamp/observiq/updater_manager_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ const defaultOthersUpdaterName = "updater"

// othersUpdaterManager handles starting an Updater binary and watching it for failure with a timeout
type othersUpdaterManager struct {
tmpPath string
cwd string
updaterName string
logger *zap.Logger
tmpPath string
cwd string
updaterName string
logger *zap.Logger
shutdownWaitTimeout time.Duration
}

// newUpdaterManager creates a new updaterManager
Expand All @@ -49,10 +50,11 @@ func newUpdaterManager(defaultLogger *zap.Logger, tmpPath string) (updaterManage
}

return &othersUpdaterManager{
tmpPath: filepath.Clean(tmpPath),
logger: defaultLogger.Named("updater manager"),
updaterName: defaultOthersUpdaterName,
cwd: cwd,
tmpPath: filepath.Clean(tmpPath),
logger: defaultLogger.Named("updater manager"),
updaterName: defaultOthersUpdaterName,
cwd: cwd,
shutdownWaitTimeout: defaultShutdownWaitTimeout,
}, nil
}

Expand All @@ -79,8 +81,8 @@ func (m othersUpdaterManager) StartAndMonitorUpdater() error {
return fmt.Errorf("updater had an issue while starting: %w", err)
}

// See if we're still alive after 5 seconds
time.Sleep(5 * time.Second)
// See if we're still alive after waiting for the timeout to pass
time.Sleep(m.shutdownWaitTimeout)

// Updater might already be killed
if err := cmd.Process.Kill(); err != nil {
Expand Down
17 changes: 11 additions & 6 deletions opamp/observiq/updater_manager_others_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package observiq
import (
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -39,10 +40,11 @@ func TestNewOthersUpdaterManager(t *testing.T) {
require.NoError(t, err)

expected := &othersUpdaterManager{
tmpPath: tmpPath,
logger: logger.Named("updater manager"),
updaterName: "updater",
cwd: cwd,
tmpPath: tmpPath,
logger: logger.Named("updater manager"),
updaterName: "updater",
cwd: cwd,
shutdownWaitTimeout: 30 * time.Second,
}

actual, err := newUpdaterManager(logger, tmpPath)
Expand Down Expand Up @@ -73,8 +75,8 @@ func TestStartAndMonitorUpdater(t *testing.T) {
updateManager, err := newUpdaterManager(zap.NewNop(), tmpDir)
require.NoError(t, err)

oum := updateManager.(*othersUpdaterManager)
oum.cwd = tmpDir
updateManager.(*othersUpdaterManager).cwd = tmpDir
updateManager.(*othersUpdaterManager).shutdownWaitTimeout = 5 * time.Second

err = updateManager.StartAndMonitorUpdater()

Expand All @@ -93,6 +95,7 @@ func TestStartAndMonitorUpdater(t *testing.T) {

updateManager.(*othersUpdaterManager).cwd = tmpDir
updateManager.(*othersUpdaterManager).updaterName = "badupdater"
updateManager.(*othersUpdaterManager).shutdownWaitTimeout = 5 * time.Second

err = updateManager.StartAndMonitorUpdater()

Expand All @@ -111,6 +114,7 @@ func TestStartAndMonitorUpdater(t *testing.T) {

updateManager.(*othersUpdaterManager).cwd = tmpDir
updateManager.(*othersUpdaterManager).updaterName = "quickupdater"
updateManager.(*othersUpdaterManager).shutdownWaitTimeout = 5 * time.Second

err = updateManager.StartAndMonitorUpdater()

Expand All @@ -129,6 +133,7 @@ func TestStartAndMonitorUpdater(t *testing.T) {

updateManager.(*othersUpdaterManager).cwd = tmpDir
updateManager.(*othersUpdaterManager).updaterName = "slowupdater"
updateManager.(*othersUpdaterManager).shutdownWaitTimeout = 5 * time.Second

err = updateManager.StartAndMonitorUpdater()

Expand Down
22 changes: 12 additions & 10 deletions opamp/observiq/updater_manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ var _ updaterManager = (*windowsUpdaterManager)(nil)

// windowsUpdaterManager handles starting an Updater binary and watching it for failure with a timeout
type windowsUpdaterManager struct {
tmpPath string
updaterName string
cwd string
logger *zap.Logger
tmpPath string
updaterName string
cwd string
logger *zap.Logger
shutdownWaitTimeout time.Duration
}

// newUpdaterManager creates a new updaterManager
Expand All @@ -48,10 +49,11 @@ func newUpdaterManager(defaultLogger *zap.Logger, tmpPath string) (updaterManage
}

return &windowsUpdaterManager{
tmpPath: filepath.Clean(tmpPath),
logger: defaultLogger.Named("updater manager"),
updaterName: defaultWindowsUpdaterName,
cwd: cwd,
tmpPath: filepath.Clean(tmpPath),
logger: defaultLogger.Named("updater manager"),
updaterName: defaultWindowsUpdaterName,
cwd: cwd,
shutdownWaitTimeout: defaultShutdownWaitTimeout,
}, nil
}

Expand All @@ -72,8 +74,8 @@ func (m windowsUpdaterManager) StartAndMonitorUpdater() error {
return fmt.Errorf("updater had an issue while starting: %w", err)
}

// See if we're still alive after 5 seconds
time.Sleep(5 * time.Second)
// See if we're still alive after waiting for the timeout to pass
time.Sleep(m.shutdownWaitTimeout)

// Updater might already be killed
if err := cmd.Process.Kill(); err != nil {
Expand Down
17 changes: 13 additions & 4 deletions opamp/observiq/updater_manager_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ func TestNewWindowsUpdaterManager(t *testing.T) {
require.NoError(t, err)

expected := &windowsUpdaterManager{
tmpPath: tmpPath,
logger: logger.Named("updater manager"),
updaterName: "updater.exe",
cwd: cwd,
tmpPath: tmpPath,
logger: logger.Named("updater manager"),
updaterName: "updater.exe",
cwd: cwd,
shutdownWaitTimeout: 30 * time.Second,
}

actual, err := newUpdaterManager(logger, tmpPath)
Expand Down Expand Up @@ -75,6 +76,8 @@ func TestStartAndMonitorUpdater(t *testing.T) {
require.NoError(t, err)

updateManager.(*windowsUpdaterManager).cwd = tmpDir
updateManager.(*windowsUpdaterManager).shutdownWaitTimeout = 5 * time.Second

err = updateManager.StartAndMonitorUpdater()

assert.ErrorContains(t, err, "failed to copy updater to cwd")
Expand All @@ -91,6 +94,8 @@ func TestStartAndMonitorUpdater(t *testing.T) {

updateManager.(*windowsUpdaterManager).cwd = tmpDir
updateManager.(*windowsUpdaterManager).updaterName = "badupdater"
updateManager.(*windowsUpdaterManager).shutdownWaitTimeout = 5 * time.Second

err = updateManager.StartAndMonitorUpdater()

assert.ErrorContains(t, err, "updater had an issue while starting:")
Expand All @@ -107,6 +112,8 @@ func TestStartAndMonitorUpdater(t *testing.T) {

updateManager.(*windowsUpdaterManager).cwd = tmpDir
updateManager.(*windowsUpdaterManager).updaterName = "quickupdater.exe"
updateManager.(*windowsUpdaterManager).shutdownWaitTimeout = 5 * time.Second

err = updateManager.StartAndMonitorUpdater()

assert.EqualError(t, err, "updater failed to update collector")
Expand All @@ -123,6 +130,8 @@ func TestStartAndMonitorUpdater(t *testing.T) {

updateManager.(*windowsUpdaterManager).cwd = tmpDir
updateManager.(*windowsUpdaterManager).updaterName = "slowupdater.exe"
updateManager.(*windowsUpdaterManager).shutdownWaitTimeout = 5 * time.Second

err = updateManager.StartAndMonitorUpdater()

assert.ErrorContains(t, err, "updater failed to update collector")
Expand Down
2 changes: 2 additions & 0 deletions service/com.observiq.collector.plist
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@
<true/>
<key>WorkingDirectory</key>
<string>[INSTALLDIR]</string>
<key>ExitTimeOut</key>
<integer>20</integer>
</dict>
</plist>
2 changes: 1 addition & 1 deletion service/observiq-otel-collector.service
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Environment=OIQ_OTEL_COLLECTOR_STORAGE=/opt/observiq-otel-collector/storage
WorkingDirectory=/opt/observiq-otel-collector
ExecStart=/opt/observiq-otel-collector/observiq-otel-collector --config config.yaml
SuccessExitStatus=0
TimeoutSec=120
TimeoutSec=20
StandardOutput=journal
Restart=on-failure
RestartSec=5s
Expand Down
26 changes: 25 additions & 1 deletion updater/cmd/updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"time"

"github.com/observiq/observiq-otel-collector/packagestate"
"github.com/observiq/observiq-otel-collector/updater/internal/action"
"github.com/observiq/observiq-otel-collector/updater/internal/install"
"github.com/observiq/observiq-otel-collector/updater/internal/logging"
"github.com/observiq/observiq-otel-collector/updater/internal/path"
"github.com/observiq/observiq-otel-collector/updater/internal/rollback"
"github.com/observiq/observiq-otel-collector/updater/internal/service"
"github.com/observiq/observiq-otel-collector/updater/internal/state"
"github.com/observiq/observiq-otel-collector/updater/internal/version"
"github.com/open-telemetry/opamp-go/protobufs"
Expand Down Expand Up @@ -68,12 +70,34 @@ func main() {
}

rb := rollback.NewRollbacker(logger, installDir)
// Stop the service before backing up the install directory.
// We want to stop as early as possible so that we don't hit the collector's timeout
// while it waits to be shut down.
service := service.NewService(logger, installDir)
if err := service.Stop(); err != nil {
logger.Error("Failed to stop service", zap.Error(err))
fail(logger, installDir)
}
// Record that we stopped the service
rb.AppendAction(action.NewServiceStopAction(service))

logger.Debug("Stopped the service")

if err := rb.Backup(); err != nil {
logger.Error("Failed to backup", zap.Error(err))

// Set the state to failed before rollback so collector knows it failed
if setErr := monitor.SetState(packagestate.CollectorPackageName, protobufs.PackageStatus_InstallFailed, err); setErr != nil {
logger.Error("Failed to set state on backup failure", zap.Error(setErr))
}

rb.Rollback()

logger.Error("Rollback complete")
fail(logger, installDir)
}

installer := install.NewInstaller(logger, installDir)
installer := install.NewInstaller(logger, installDir, service)
if err := installer.Install(rb); err != nil {
logger.Error("Failed to install", zap.Error(err))

Expand Down
17 changes: 5 additions & 12 deletions updater/internal/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,20 @@ type Installer struct {
}

// NewInstaller returns a new instance of an Installer.
func NewInstaller(logger *zap.Logger, installDir string) *Installer {
namedLogger := logger.Named("installer")
func NewInstaller(logger *zap.Logger, installDir string, service service.Service) *Installer {
return &Installer{
latestDir: path.LatestDir(installDir),
svc: service.NewService(namedLogger, installDir),
svc: service,
installDir: installDir,
backupDir: path.BackupDir(installDir),
logger: namedLogger,
logger: logger.Named("installer"),
}
}

// Install installs the unpacked artifacts in latestDir to installDir,
// as well as installing the new service file using the installer's Service interface
// as well as installing the new service file using the installer's Service interface.
// It then starts the service.
func (i Installer) Install(rb rollback.ActionAppender) error {
// Stop service
if err := i.svc.Stop(); err != nil {
return fmt.Errorf("failed to stop service: %w", err)
}
rb.AppendAction(action.NewServiceStopAction(i.svc))
i.logger.Debug("Service stopped")

// If JMX jar exists outside of install directory, make sure that gets backed up
if err := i.attemptSpecialJMXJarInstall(rb); err != nil {
return fmt.Errorf("failed to process special JMX jar: %w", err)
Expand Down
Loading

0 comments on commit 0aa145e

Please sign in to comment.