Skip to content

Commit

Permalink
Cherry-pick #19117 to 7.x: Init package libbeat/statestore (#19655)
Browse files Browse the repository at this point in the history
Initialize support for the statestore package. The addition of the
statestore package is split up into multiple changeset to ease review.
The final version of the package can be found [here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore).

Once finalized, the libbeat/statestore package contains:
- The statestore frontend and interface for use within Beats
- Interfaces for the store backend
- A common set of tests store backends need to support
- a storetest package for testing new features that require a store. The
  testing helpers use map[string]interface{} that can be initialized or
  queried after the test run for validation purposes.
- The default memlog backend + tests

This change includes the frontend and backend interfaces only. Once merged we will add the tests and finally the memlog store.

(cherry picked from commit 1fc18c2)
  • Loading branch information
Steffen Siering authored Jul 6, 2020
1 parent 6525a91 commit adefe09
Show file tree
Hide file tree
Showing 8 changed files with 1,131 additions and 1,489 deletions.
726 changes: 695 additions & 31 deletions NOTICE.txt

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.5.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07
github.com/elastic/go-concert v0.0.2
github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8
github.com/elastic/go-licenser v0.3.1
github.com/elastic/go-lookslike v0.3.0
Expand Down Expand Up @@ -153,7 +154,7 @@ require (
go.uber.org/multierr v1.3.0
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f
golang.org/x/lint v0.0.0-20200130185559-910be7a94367
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07 h1
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng=
github.com/elastic/go-concert v0.0.2 h1:hJb9h99LS/lyjf7pE1wQ+eiNw+0CXVLCJR42yx+AvOQ=
github.com/elastic/go-concert v0.0.2/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM=
github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8 h1:Jcnojiuok7Ea5hitJK9VWmBigganE2MMETOH0VZasEA=
github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8/go.mod h1:j2CZcVcluWDGhQTnq1SOPy1NKEIa74FtQ39Nnz87Jxk=
github.com/elastic/go-licenser v0.3.1 h1:RmRukU/JUmts+rpexAw0Fvt2ly7VVu6mw8z4HrEzObU=
Expand Down Expand Up @@ -652,12 +654,16 @@ github.com/tsg/go-daemon v0.0.0-20200207173439-e704b93fd89b/go.mod h1:jAqhj/JBVC
github.com/tsg/gopacket v0.0.0-20200626092518-2ab8e397a786 h1:B/IVHYiI0d04dudYw+CvCAGqSMq8d0yWy56eD6p85BQ=
github.com/tsg/gopacket v0.0.0-20200626092518-2ab8e397a786/go.mod h1:RIkfovP3Y7my19aXEjjbNd9E5TlHozzAyt7B8AaEcwg=
github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urso/diag v0.0.0-20200210123136-21b3cc8eb797 h1:OHNw/6pXODJAB32NujjdQO/KIYQ3KAbHQfCzH81XdCs=
github.com/urso/diag v0.0.0-20200210123136-21b3cc8eb797/go.mod h1:pNWFTeQ+V1OYT/TzWpnWb6eQBdoXpdx+H+lrH97/Oyo=
github.com/urso/go-bin v0.0.0-20180220135811-781c575c9f0e h1:NiofbjIUI5gR+ybDsGSVH1fWyjSeDYiYVJHT1+kcsak=
github.com/urso/go-bin v0.0.0-20180220135811-781c575c9f0e/go.mod h1:6GfHrdWBQYjFRIznu7XuQH4lYB2w8nO4bnImVKkzPOM=
github.com/urso/magetools v0.0.0-20190919040553-290c89e0c230 h1:Ft1EJ6JL0F/RV6o2qJ1Be+wYxjYUSfRA3srfHgSgojc=
github.com/urso/magetools v0.0.0-20190919040553-290c89e0c230/go.mod h1:DFxTNgS/ExCGmmjVjSOgS2WjtfjKXgCyDzAFgbtovSA=
github.com/urso/qcgen v0.0.0-20180131103024-0b059e7db4f4 h1:hhA8EBThzz9PztawVTycKvfETVuBqxAQ5keFlAVtbAw=
github.com/urso/qcgen v0.0.0-20180131103024-0b059e7db4f4/go.mod h1:RspW+E2Yb7Fs7HclB2tiDaiu6Rp41BiIG4Wo1YaoXGc=
github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec h1:HkZIDJrMKZHPsYhmH2XjTTSk1pbMCFfpxSnyzZUFm+k=
github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec/go.mod h1:Wp40HwmjM59FkDIVFfcCb9LzBbnc0XAMp8++hJuWvSU=
github.com/vbatts/tar-split v0.11.1/go.mod h1:LEuURwDEiWjRjwu46yU3KVGuUdVv/dcnpcEPSzR8z6g=
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41 h1:NeNpIvfvaFOh0BH7nMEljE5Rk/VJlxhm58M41SeOD20=
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
Expand Down Expand Up @@ -697,6 +703,7 @@ go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
Expand Down Expand Up @@ -733,6 +740,8 @@ golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f h1:J5lckAjkw6qYlOZNj90mLYNTEKDvWeuc1yieZ8qUzUE=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367 h1:0IiAsCRByjO2QjX7ZPkw5oU9x+n1YqRL802rjC0c3Aw=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down
71 changes: 71 additions & 0 deletions libbeat/statestore/backend/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backend

// Registry provides access to stores managed by the backend storage.
type Registry interface {
// Access opens a store. The store will be closed by the frontend, once all
// accessed stores have been closed.
//
// The Store instance returned must be threadsafe.
Access(name string) (Store, error)

// Close is called on shutdown after all stores have been closed.
// An implementation of Registry is not required to check for the stores to be closed.
Close() error
}

// ValueDecoder is used to decode values into go structs or maps within a transaction.
// A ValueDecoder is supposed to be invalidated by beats after the loop operations has returned.
type ValueDecoder interface {
Decode(to interface{}) error
}

// Store provides access to key value pairs.
type Store interface {
// Close should close the store and release all used resources.
Close() error

// Has checks if the key exists. No error must be returned if the key does
// not exists, but the bool return must be false.
// An error return value must indicate internal errors only. The store is
// assumed to be in a 'bad' but recoverable state if 'Has' fails.
Has(key string) (bool, error)

// Get decodes the value for the given key into value.
// Besides internal implementation specific errors an error is assumed
// to be returned if the key does not exist or the type of the value
// passed is incompatible to the actual value in the store (decoding error).
Get(key string, value interface{}) error

// Set inserts or overwrites a key pair in the store.
// The `value` parameters can be assumed to be a struct or a map. Besides
// internal implementation specific errors, an error should be returned if
// the value given can not be encoded.
Set(key string, value interface{}) error

// Remove removes and entry from the store.
Remove(string) error

// Each loops over all key value pairs in the store calling fn for each pair.
// The ValueDecoder is used by fn to optionally decode the value into a
// custom struct or map. The decoder must be executable multiple times, but
// is assumed to be invalidated once fn returns
// The loop shall return if fn returns an error or false.
Each(fn func(string, ValueDecoder) (bool, error)) error
}
91 changes: 91 additions & 0 deletions libbeat/statestore/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package statestore

import (
"errors"
"fmt"
)

// ErrorAccess indicates that an error occured when trying to open a Store.
type ErrorAccess struct {
name string
cause error
}

// Store reports the name of the store that could not been accessed.
func (e *ErrorAccess) Store() string { return e.name }

// Unwrap returns the cause for the error or nil if the cause is unknown or has
// not been reported by the backend
func (e *ErrorAccess) Unwrap() error { return e.cause }

// Error creates a descriptive error string.
func (e *ErrorAccess) Error() string {
if e.cause == nil {
return fmt.Sprintf("failed to open store '%v'", e.name)
}
return fmt.Sprintf("failed to open store '%v': %v", e.name, e.cause)
}

// ErrorClosed indicates that the operation failed because the store has already been closed.
type ErrorClosed struct {
name string
operation string
}

// Store reports the name of the store that has been closed.
func (e *ErrorClosed) Store() string { return e.name }

// Operation returns a 'readable' name for the operation that failed to access the closed store.
func (e *ErrorClosed) Operation() string { return e.operation }

// Error creates a descriptive error string.
func (e *ErrorClosed) Error() string {
return fmt.Sprintf("can not executed %v operation on closed store '%v'", e.operation, e.name)
}

// ErrorOperation is returned when a generic store operation failed.
type ErrorOperation struct {
name string
operation string
cause error
}

// Store reports the name of the store.
func (e *ErrorOperation) Store() string { return e.name }

// Operation returns a 'readable' name for the operation that failed.
func (e *ErrorOperation) Operation() string { return e.operation }

// Unwrap returns the cause of the failure.
func (e *ErrorOperation) Unwrap() error { return e.cause }

// Error creates a descriptive error string.
func (e *ErrorOperation) Error() string {
return fmt.Sprintf("failed in %v operation on store '%v': %v", e.operation, e.name, e.cause)
}

// IsClosed returns true if the cause for an Error is ErrorClosed.
func IsClosed(err error) bool {
var tmp *ErrorClosed
if errors.As(err, &tmp) {
return true
}
return false
}
87 changes: 87 additions & 0 deletions libbeat/statestore/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package statestore

import (
"sync"

"github.com/elastic/beats/v7/libbeat/statestore/backend"
)

// Registry manages multiple key-value stores.
// When working with a registry, one must access a store. Depending on backend
// a store can be an index, a table, or a directory. All access to a store is
// handled by transaction.
type Registry struct {
backend backend.Registry

mu sync.Mutex
active map[string]*sharedStore // active/open stores
wg sync.WaitGroup
}

// ValueDecoder is used to decode retrieved from an actual store. A
// ValueDecoder instance is valid for the lifetime of the transaction only.
type ValueDecoder = backend.ValueDecoder

// NewRegistry creates a new Registry with a configured backend.
func NewRegistry(backend backend.Registry) *Registry {
return &Registry{
backend: backend,
active: map[string]*sharedStore{},
}
}

// Close closes the backend storage. Close blocks until all stores in use are closed.
func (r *Registry) Close() error {
r.wg.Wait() // wait for all stores being closed
return r.backend.Close()
}

// Get opens a shared store. A store is closed and released only after all it's
// users have closed the store.
func (r *Registry) Get(name string) (*Store, error) {
r.mu.Lock()
defer r.mu.Unlock()

shared := r.active[name]
if shared == nil {
backend, err := r.backend.Access(name)
if err != nil {
return nil, &ErrorAccess{name: name, cause: err}
}

shared = newSharedStore(r, name, backend)
defer shared.Release()

r.active[name] = shared
r.wg.Add(1)
}

return newStore(shared), nil
}

func (r *Registry) unregisterStore(s *sharedStore) {
_, exists := r.active[s.name]
if !exists {
panic("removing an unknown store")
}

delete(r.active, s.name)
r.wg.Done()
}
Loading

0 comments on commit adefe09

Please sign in to comment.