Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add update-now functionality to TUF autoupdater #1579

Merged
merged 12 commits into from
Feb 8, 2024
Merged
6 changes: 5 additions & 1 deletion cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl

// Create the control service and services that depend on it
var runner *desktopRunner.DesktopUsersProcessesRunner
var actionsQueue *actionqueue.ActionQueue
directionless marked this conversation as resolved.
Show resolved Hide resolved
if k.ControlServerURL() == "" {
slogger.Log(ctx, slog.LevelDebug,
"control server URL not set, will not create control service",
Expand Down Expand Up @@ -374,7 +375,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
controlService.RegisterConsumer(desktopMenuSubsystemName, runner)

// create an action queue for all other action style commands
actionsQueue := actionqueue.New(
actionsQueue = actionqueue.New(
k,
actionqueue.WithContext(ctx),
actionqueue.WithStore(k.ControlServerActionsStore()),
Expand Down Expand Up @@ -475,6 +476,9 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
}

runGroup.Add("tufAutoupdater", tufAutoupdater.Execute, tufAutoupdater.Interrupt)
if actionsQueue != nil {
actionsQueue.RegisterActor(tuf.AutoupdateSubsystemName, tufAutoupdater)
}
}

// Run the legacy autoupdater only if autoupdating is enabled and the new autoupdater
Expand Down
36 changes: 18 additions & 18 deletions ee/control/actionqueue/actionqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (a action) String() string {
return fmt.Sprintf("ID: %s; type: %s; valid until: %d", a.ID, a.Type, a.ValidUntil)
}

type actionqueue struct {
type ActionQueue struct {
ctx context.Context // nolint:containedctx
actors map[string]actor
store types.KVStore
Expand All @@ -47,34 +47,34 @@ type actionqueue struct {
cancel context.CancelFunc
}

type actionqueueOption func(*actionqueue)
type actionqueueOption func(*ActionQueue)

func WithStore(store types.KVStore) actionqueueOption {
return func(aq *actionqueue) {
return func(aq *ActionQueue) {
aq.store = store
}
}

func WithOldNotificationsStore(store types.KVStore) actionqueueOption {
return func(aq *actionqueue) {
return func(aq *ActionQueue) {
aq.oldNotificationsStore = store
}
}

func WithCleanupInterval(cleanupInterval time.Duration) actionqueueOption {
return func(aq *actionqueue) {
return func(aq *ActionQueue) {
aq.actionCleanupInterval = cleanupInterval
}
}

func WithContext(ctx context.Context) actionqueueOption {
return func(aq *actionqueue) {
return func(aq *ActionQueue) {
aq.ctx = ctx
}
}

func New(k types.Knapsack, opts ...actionqueueOption) *actionqueue {
aq := &actionqueue{
func New(k types.Knapsack, opts ...actionqueueOption) *ActionQueue {
aq := &ActionQueue{
ctx: context.Background(),
actors: make(map[string]actor, 0),
actionCleanupInterval: defaultCleanupInterval,
Expand All @@ -92,7 +92,7 @@ func New(k types.Knapsack, opts ...actionqueueOption) *actionqueue {
return aq
}

func (aq *actionqueue) Update(data io.Reader) error {
func (aq *ActionQueue) Update(data io.Reader) error {
// We want to unmarshal each action separately, so that we don't fail to send all actions
// if only some are malformed.
var rawActionsToProcess []json.RawMessage
Expand Down Expand Up @@ -139,16 +139,16 @@ func (aq *actionqueue) Update(data io.Reader) error {
return nil
}

func (aq *actionqueue) RegisterActor(actorType string, actorToRegister actor) {
func (aq *ActionQueue) RegisterActor(actorType string, actorToRegister actor) {
aq.actors[actorType] = actorToRegister
}

func (aq *actionqueue) StartCleanup() error {
func (aq *ActionQueue) StartCleanup() error {
aq.runCleanup()
return nil
}

func (aq *actionqueue) runCleanup() {
func (aq *ActionQueue) runCleanup() {
ctx, cancel := context.WithCancel(aq.ctx)
aq.cancel = cancel

Expand All @@ -168,11 +168,11 @@ func (aq *actionqueue) runCleanup() {
}
}

func (aq *actionqueue) StopCleanup(err error) {
func (aq *ActionQueue) StopCleanup(err error) {
aq.cancel()
}

func (aq *actionqueue) storeActionRecord(actionToStore action) {
func (aq *ActionQueue) storeActionRecord(actionToStore action) {
rawAction, err := json.Marshal(actionToStore)
if err != nil {
aq.slogger.Log(context.TODO(), slog.LevelError,
Expand All @@ -190,7 +190,7 @@ func (aq *actionqueue) storeActionRecord(actionToStore action) {
}
}

func (aq *actionqueue) isActionNew(id string) bool {
func (aq *ActionQueue) isActionNew(id string) bool {
completedActionRaw, err := aq.store.Get([]byte(id))
if err != nil {
aq.slogger.Log(context.TODO(), slog.LevelError,
Expand Down Expand Up @@ -232,7 +232,7 @@ func (aq *actionqueue) isActionNew(id string) bool {
return completedActionRaw == nil
}

func (aq *actionqueue) isActionValid(a action) bool {
func (aq *ActionQueue) isActionValid(a action) bool {
if a.ID == "" {
aq.slogger.Log(context.TODO(), slog.LevelWarn,
"action ID is empty",
Expand All @@ -252,7 +252,7 @@ func (aq *actionqueue) isActionValid(a action) bool {
return a.ValidUntil > time.Now().Unix()
}

func (aq *actionqueue) actorForAction(a action) (actor, error) {
func (aq *ActionQueue) actorForAction(a action) (actor, error) {
if len(aq.actors) == 0 {
return nil, errors.New("no actor registered")
}
Expand All @@ -269,7 +269,7 @@ func (aq *actionqueue) actorForAction(a action) (actor, error) {
return actor, nil
}

func (aq *actionqueue) cleanupActions() {
func (aq *ActionQueue) cleanupActions() {
// Read through all keys in bucket to determine which ones are old enough to be deleted
keysToDelete := make([][]byte, 0)

Expand Down
88 changes: 85 additions & 3 deletions ee/tuf/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"sync"
"time"

"github.com/kolide/kit/version"
Expand Down Expand Up @@ -44,10 +46,29 @@ const (

var binaries = []autoupdatableBinary{binaryLauncher, binaryOsqueryd}
RebeccaMahany marked this conversation as resolved.
Show resolved Hide resolved

var autoupdatableBinaryMap = map[string]autoupdatableBinary{
"launcher": binaryLauncher,
"osqueryd": binaryOsqueryd,
}

type ReleaseFileCustomMetadata struct {
Target string `json:"target"`
}

// Control server subsystem (used to send "update now" commands)
const AutoupdateSubsystemName = "autoupdate"

type (
controlServerAutoupdateRequest struct {
BinariesToUpdate []binaryToUpdate `json:"binaries_to_update"`
}

// In the future, we may allow for setting a particular version here as well
binaryToUpdate struct {
Name string `json:"name"`
}
)

type librarian interface {
Available(binary autoupdatableBinary, targetFilename string) bool
AddToLibrary(binary autoupdatableBinary, currentVersion string, targetFilename string, targetMetadata data.TargetFileMeta) error
Expand All @@ -65,6 +86,7 @@ type TufAutoupdater struct {
osquerierRetryInterval time.Duration
knapsack types.Knapsack
store types.KVStore // stores autoupdater errors for kolide_tuf_autoupdater_errors table
updateLock *sync.Mutex
interrupt chan struct{}
interrupted bool
signalRestart chan error
Expand Down Expand Up @@ -93,6 +115,7 @@ func NewTufAutoupdater(ctx context.Context, k types.Knapsack, metadataHttpClient
interrupt: make(chan struct{}, 1),
signalRestart: make(chan error, 1),
store: k.AutoupdateErrorsStore(),
updateLock: &sync.Mutex{},
osquerier: osquerier,
osquerierRetryInterval: 30 * time.Second,
slogger: k.Slogger().With("component", "tuf_autoupdater"),
Expand Down Expand Up @@ -196,7 +219,7 @@ func (ta *TufAutoupdater) Execute() (err error) {
defer cleanupTicker.Stop()

for {
if err := ta.checkForUpdate(); err != nil {
if err := ta.checkForUpdate(binaries); err != nil {
ta.storeError(err)
ta.slogger.Log(context.TODO(), slog.LevelError,
"error checking for update",
Expand Down Expand Up @@ -233,6 +256,62 @@ func (ta *TufAutoupdater) Interrupt(_ error) {
ta.interrupt <- struct{}{}
}

// Do satisfies the actionqueue.actor interface; it allows the control server to send
// requests down to autoupdate immediately.
func (ta *TufAutoupdater) Do(data io.Reader) error {
RebeccaMahany marked this conversation as resolved.
Show resolved Hide resolved
var updateRequest controlServerAutoupdateRequest
if err := json.NewDecoder(data).Decode(&updateRequest); err != nil {
ta.slogger.Log(context.TODO(), slog.LevelWarn,
"received update request in unexpected format from control server, discarding",
"err", err,
)
// We don't return an error because we don't want the actionqueue to retry this request
return nil
}

binariesToUpdate := make([]autoupdatableBinary, 0)
for _, b := range updateRequest.BinariesToUpdate {
if val, ok := autoupdatableBinaryMap[b.Name]; ok {
Comment on lines +272 to +274
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could invert this, and use https://pkg.go.dev/slices#DeleteFunc to remove non matches from updateRequest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got to build a new slice from updateRequest anyway, might as well leave the request unmodified

binariesToUpdate = append(binariesToUpdate, val)
continue
}
ta.slogger.Log(context.TODO(), slog.LevelWarn,
"received request from control server autoupdate unknown binary, ignoring",
"unknown_binary", b.Name,
)
}

if len(binariesToUpdate) == 0 {
ta.slogger.Log(context.TODO(), slog.LevelDebug,
"received request from control server to check for update now, but no valid binaries specified in request",
)
return nil
}

ta.slogger.Log(context.TODO(), slog.LevelInfo,
"received request from control server to check for update now",
"binaries_to_update", fmt.Sprintf("%+v", binariesToUpdate),
)

if err := ta.checkForUpdate(binariesToUpdate); err != nil {
ta.storeError(err)
ta.slogger.Log(context.TODO(), slog.LevelError,
"error checking for update per control server request",
"binaries_to_update", fmt.Sprintf("%+v", binariesToUpdate),
"err", err,
)

return fmt.Errorf("could not check for update: %w", err)
}

ta.slogger.Log(context.TODO(), slog.LevelInfo,
"successfully checked for update per control server request",
"binaries_to_update", fmt.Sprintf("%+v", binariesToUpdate),
)

return nil
}

// tidyLibrary gets the current running version for each binary (so that the current version is not removed)
// and then asks the update library manager to tidy the update library.
func (ta *TufAutoupdater) tidyLibrary() {
Expand Down Expand Up @@ -287,7 +366,10 @@ func (ta *TufAutoupdater) currentRunningVersion(binary autoupdatableBinary) (str

// checkForUpdate fetches latest metadata from the TUF server, then checks to see if there's
// a new release that we should download. If so, it will add the release to our updates library.
func (ta *TufAutoupdater) checkForUpdate() error {
func (ta *TufAutoupdater) checkForUpdate(binariesToCheck []autoupdatableBinary) error {
ta.updateLock.Lock()
defer ta.updateLock.Unlock()

// Attempt an update a couple times before returning an error -- sometimes we just hit caching issues.
errs := make([]error, 0)
successfulUpdate := false
Expand All @@ -314,7 +396,7 @@ func (ta *TufAutoupdater) checkForUpdate() error {
// Check for and download any new releases that are available
updatesDownloaded := make(map[autoupdatableBinary]string)
updateErrors := make([]error, 0)
for _, binary := range binaries {
for _, binary := range binariesToCheck {
directionless marked this conversation as resolved.
Show resolved Hide resolved
downloadedUpdateVersion, err := ta.downloadUpdate(binary, targets)
if err != nil {
updateErrors = append(updateErrors, fmt.Errorf("could not download update for %s: %w", binary, err))
Expand Down
Loading
Loading