Skip to content

Commit

Permalink
Parallelize allmulti
Browse files Browse the repository at this point in the history
  • Loading branch information
jessepeterson committed Jun 30, 2021
1 parent 968a174 commit 970c5b3
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 75 deletions.
49 changes: 35 additions & 14 deletions storage/allmulti/allmulti.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,57 @@ func New(logger log.Logger, stores ...storage.AllStorage) *MultiAllStorage {
return &MultiAllStorage{logger: logger, stores: stores}
}

type storageErrorer func(storage.AllStorage) error
type returnCollector struct {
storeNumber int
returnValue interface{}
err error
}

type errRunner func(storage.AllStorage) (interface{}, error)

func (ms *MultiAllStorage) runAndLogOthers(storageCallback storageErrorer) {
for n, storage := range ms.stores[1:] {
if err := storageCallback(storage); err != nil {
ms.logger.Info("msg", n+1, "err", err)
func (ms *MultiAllStorage) execStores(r errRunner) (interface{}, error) {
retChan := make(chan *returnCollector)
for i, store := range ms.stores {
go func(n int, s storage.AllStorage) {
val, err := r(s)
retChan <- &returnCollector{
storeNumber: n,
returnValue: val,
err: err,
}
}(i, store)
}
var finalErr error
var finalValue interface{}
for range ms.stores {
sErr := <-retChan
if sErr.storeNumber == 0 {
finalErr = sErr.err
finalValue = sErr.returnValue
} else if sErr.err != nil {
ms.logger.Info("n", sErr.storeNumber, "err", sErr.err)
}
}
return finalValue, finalErr
}

func (ms *MultiAllStorage) StoreAuthenticate(r *mdm.Request, msg *mdm.Authenticate) error {
err := ms.stores[0].StoreAuthenticate(r, msg)
ms.runAndLogOthers(func(s storage.AllStorage) error {
return s.StoreAuthenticate(r, msg)
_, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return nil, s.StoreAuthenticate(r, msg)
})
return err
}

func (ms *MultiAllStorage) StoreTokenUpdate(r *mdm.Request, msg *mdm.TokenUpdate) error {
err := ms.stores[0].StoreTokenUpdate(r, msg)
ms.runAndLogOthers(func(s storage.AllStorage) error {
return s.StoreTokenUpdate(r, msg)
_, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return nil, s.StoreTokenUpdate(r, msg)
})
return err
}

func (ms *MultiAllStorage) Disable(r *mdm.Request) error {
err := ms.stores[0].Disable(r)
ms.runAndLogOthers(func(s storage.AllStorage) error {
return s.Disable(r)
_, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return nil, s.Disable(r)
})
return err
}
13 changes: 5 additions & 8 deletions storage/allmulti/bstoken.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@ import (
)

func (ms *MultiAllStorage) StoreBootstrapToken(r *mdm.Request, msg *mdm.SetBootstrapToken) error {
err := ms.stores[0].StoreBootstrapToken(r, msg)
ms.runAndLogOthers(func(s storage.AllStorage) error {
return s.StoreBootstrapToken(r, msg)
_, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return nil, s.StoreBootstrapToken(r, msg)
})
return err
}

func (ms *MultiAllStorage) RetrieveBootstrapToken(r *mdm.Request, msg *mdm.GetBootstrapToken) (*mdm.BootstrapToken, error) {
finalToken, finalErr := ms.stores[0].RetrieveBootstrapToken(r, msg)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, err := s.RetrieveBootstrapToken(r, msg)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return s.RetrieveBootstrapToken(r, msg)
})
return finalToken, finalErr
return val.(*mdm.BootstrapToken), err
}
29 changes: 11 additions & 18 deletions storage/allmulti/certauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,29 @@ import (
)

func (ms *MultiAllStorage) HasCertHash(r *mdm.Request, hash string) (bool, error) {
hasFinal, finalErr := ms.stores[0].HasCertHash(r, hash)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, err := s.HasCertHash(r, hash)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return s.HasCertHash(r, hash)
})
return hasFinal, finalErr
return val.(bool), err
}

func (ms *MultiAllStorage) EnrollmentHasCertHash(r *mdm.Request, hash string) (bool, error) {
hasFinal, finalErr := ms.stores[0].EnrollmentHasCertHash(r, hash)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, err := s.EnrollmentHasCertHash(r, hash)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return s.EnrollmentHasCertHash(r, hash)
})
return hasFinal, finalErr
return val.(bool), err
}

func (ms *MultiAllStorage) IsCertHashAssociated(r *mdm.Request, hash string) (bool, error) {
isAssocFinal, finalErr := ms.stores[0].IsCertHashAssociated(r, hash)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, err := s.IsCertHashAssociated(r, hash)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return s.IsCertHashAssociated(r, hash)
})
return isAssocFinal, finalErr
return val.(bool), err
}

func (ms *MultiAllStorage) AssociateCertHash(r *mdm.Request, hash string) error {
err := ms.stores[0].AssociateCertHash(r, hash)
ms.runAndLogOthers(func(s storage.AllStorage) error {
return s.AssociateCertHash(r, hash)
_, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return nil, s.AssociateCertHash(r, hash)
})
return err
}
8 changes: 3 additions & 5 deletions storage/allmulti/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
)

func (ms *MultiAllStorage) RetrievePushInfo(ctx context.Context, ids []string) (map[string]*mdm.Push, error) {
finalMap, finalErr := ms.stores[0].RetrievePushInfo(ctx, ids)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, err := s.RetrievePushInfo(ctx, ids)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return s.RetrievePushInfo(ctx, ids)
})
return finalMap, finalErr
return val.(map[string]*mdm.Push), err
}
31 changes: 17 additions & 14 deletions storage/allmulti/pushcert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,31 @@ import (
)

func (ms *MultiAllStorage) IsPushCertStale(ctx context.Context, topic string, staleToken string) (bool, error) {
finalStale, finalErr := ms.stores[0].IsPushCertStale(ctx, topic, staleToken)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, err := s.IsPushCertStale(ctx, topic, staleToken)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return s.IsPushCertStale(ctx, topic, staleToken)
})
return finalStale, finalErr
return val.(bool), err
}

type retrievePushCertReturns struct {
cert *tls.Certificate
staleToken string
}

func (ms *MultiAllStorage) RetrievePushCert(ctx context.Context, topic string) (cert *tls.Certificate, staleToken string, err error) {
finalCert, finalToken, finalErr := ms.stores[0].RetrievePushCert(ctx, topic)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, _, err := s.RetrievePushCert(ctx, topic)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
rets := new(retrievePushCertReturns)
var err error
rets.cert, rets.staleToken, err = s.RetrievePushCert(ctx, topic)
return rets, err
})

return finalCert, finalToken, finalErr
rets := val.(*retrievePushCertReturns)
return rets.cert, rets.staleToken, err
}

func (ms *MultiAllStorage) StorePushCert(ctx context.Context, pemCert, pemKey []byte) error {
err := ms.stores[0].StorePushCert(ctx, pemCert, pemKey)
ms.runAndLogOthers(func(s storage.AllStorage) error {
return s.StorePushCert(ctx, pemCert, pemKey)
_, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return nil, s.StorePushCert(ctx, pemCert, pemKey)
})
return err
}
26 changes: 10 additions & 16 deletions storage/allmulti/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,29 @@ import (
)

func (ms *MultiAllStorage) StoreCommandReport(r *mdm.Request, report *mdm.CommandResults) error {
err := ms.stores[0].StoreCommandReport(r, report)
ms.runAndLogOthers(func(s storage.AllStorage) error {
return s.StoreCommandReport(r, report)
_, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return nil, s.StoreCommandReport(r, report)
})
return err
}

func (ms *MultiAllStorage) RetrieveNextCommand(r *mdm.Request, skipNotNow bool) (*mdm.Command, error) {
skipFinal, finalErr := ms.stores[0].RetrieveNextCommand(r, skipNotNow)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, err := s.RetrieveNextCommand(r, skipNotNow)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return s.RetrieveNextCommand(r, skipNotNow)
})
return skipFinal, finalErr
return val.(*mdm.Command), err
}

func (ms *MultiAllStorage) ClearQueue(r *mdm.Request) error {
err := ms.stores[0].ClearQueue(r)
ms.runAndLogOthers(func(s storage.AllStorage) error {
return s.ClearQueue(r)
_, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return nil, s.ClearQueue(r)
})
return err
}

func (ms *MultiAllStorage) EnqueueCommand(ctx context.Context, id []string, cmd *mdm.Command) (map[string]error, error) {
finalMap, finalErr := ms.stores[0].EnqueueCommand(ctx, id, cmd)
ms.runAndLogOthers(func(s storage.AllStorage) error {
_, err := s.EnqueueCommand(ctx, id, cmd)
return err
val, err := ms.execStores(func(s storage.AllStorage) (interface{}, error) {
return s.EnqueueCommand(ctx, id, cmd)
})
return finalMap, finalErr
return val.(map[string]error), err
}

0 comments on commit 970c5b3

Please sign in to comment.