Skip to content

Commit

Permalink
Init package libbeat/statestore (elastic#19117)
Browse files Browse the repository at this point in the history
Initialize support for the statestore package. The addition of the
statestore package is split up into multiple changeset to ease review.
The final version of the package can be found [here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore).

Once finalized, the libbeat/statestore package contains:
- The statestore frontend and interface for use within Beats
- Interfaces for the store backend
- A common set of tests store backends need to support
- a storetest package for testing new features that require a store. The
  testing helpers use map[string]interface{} that can be initialized or
  queried after the test run for validation purposes.
- The default memlog backend + tests

This change includes the frontend and backend interfaces only. Once merged we will add the tests and finally the memlog store.
  • Loading branch information
Steffen Siering committed Jun 12, 2020
1 parent eaf5e2f commit 1fc18c2
Show file tree
Hide file tree
Showing 50 changed files with 3,778 additions and 1,073 deletions.
48 changes: 46 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,15 @@ SOFTWARE
6.11 "Subscription" means the right to receive Support Services and a License
to the Commercial Software.

--------------------------------------------------------------------
Dependency: github.com/elastic/go-concert
Version: v0.0.2
License type (autodetected): Apache-2.0
./vendor/github.com/elastic/go-concert/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/elastic/go-libaudit/v2
Version: v2.0.0
Expand Down Expand Up @@ -8135,7 +8144,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/lint
Revision: fdd1cda4f05f
Revision: 910be7a94367
License type (autodetected): BSD-3-Clause
./vendor/golang.org/x/lint/LICENSE:
--------------------------------------------------------------------
Expand Down Expand Up @@ -8167,6 +8176,41 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/mod
Version: v0.1.1
Revision: c90efee705ee
License type (autodetected): BSD-3-Clause
./vendor/golang.org/x/mod/LICENSE:
--------------------------------------------------------------------
Copyright (c) 2009 The Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/net
Revision: 16171245cfb2
Expand Down Expand Up @@ -8373,7 +8417,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/tools
Revision: 7b8e75db28f4
Revision: b320d3a0f5a2
License type (autodetected): BSD-3-Clause
./vendor/golang.org/x/tools/LICENSE:
--------------------------------------------------------------------
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.5.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07
github.com/elastic/go-concert v0.0.2
github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8
github.com/elastic/go-licenser v0.2.1
github.com/elastic/go-lookslike v0.3.0
Expand Down Expand Up @@ -150,14 +151,14 @@ require (
go.uber.org/multierr v1.3.0
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f
golang.org/x/lint v0.0.0-20200130185559-910be7a94367
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.29.1
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07 h1
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng=
github.com/elastic/go-concert v0.0.2 h1:hJb9h99LS/lyjf7pE1wQ+eiNw+0CXVLCJR42yx+AvOQ=
github.com/elastic/go-concert v0.0.2/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM=
github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8 h1:Jcnojiuok7Ea5hitJK9VWmBigganE2MMETOH0VZasEA=
github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8/go.mod h1:j2CZcVcluWDGhQTnq1SOPy1NKEIa74FtQ39Nnz87Jxk=
github.com/elastic/go-licenser v0.2.1 h1:K76YI6XR2LRpewLGwhrTqasXZcNJG2yHY4/jit/IXGY=
Expand Down Expand Up @@ -697,6 +699,7 @@ go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
Expand Down Expand Up @@ -735,10 +738,13 @@ golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f h1:J5lckAjkw6qYlOZNj90mLYNTEKDvWeuc1yieZ8qUzUE=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367 h1:0IiAsCRByjO2QjX7ZPkw5oU9x+n1YqRL802rjC0c3Aw=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee h1:WG0RUwxtNT4qqaXX3DPA8zHFNm/D9xaBpxzHt1WcA/E=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -843,9 +849,13 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 h1:Toz2IK7k8rbltAXwNAxKcn9OzqyNfMUhUNjz3sL0NMk=
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2 h1:0sfSpGSa544Fwnbot3Oxq/U6SXqjty6Jy/3wRhVS7ig=
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
71 changes: 71 additions & 0 deletions libbeat/statestore/backend/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backend

// Registry provides access to stores managed by the backend storage.
type Registry interface {
// Access opens a store. The store will be closed by the frontend, once all
// accessed stores have been closed.
//
// The Store instance returned must be threadsafe.
Access(name string) (Store, error)

// Close is called on shutdown after all stores have been closed.
// An implementation of Registry is not required to check for the stores to be closed.
Close() error
}

// ValueDecoder is used to decode values into go structs or maps within a transaction.
// A ValueDecoder is supposed to be invalidated by beats after the loop operations has returned.
type ValueDecoder interface {
Decode(to interface{}) error
}

// Store provides access to key value pairs.
type Store interface {
// Close should close the store and release all used resources.
Close() error

// Has checks if the key exists. No error must be returned if the key does
// not exists, but the bool return must be false.
// An error return value must indicate internal errors only. The store is
// assumed to be in a 'bad' but recoverable state if 'Has' fails.
Has(key string) (bool, error)

// Get decodes the value for the given key into value.
// Besides internal implementation specific errors an error is assumed
// to be returned if the key does not exist or the type of the value
// passed is incompatible to the actual value in the store (decoding error).
Get(key string, value interface{}) error

// Set inserts or overwrites a key pair in the store.
// The `value` parameters can be assumed to be a struct or a map. Besides
// internal implementation specific errors, an error should be returned if
// the value given can not be encoded.
Set(key string, value interface{}) error

// Remove removes and entry from the store.
Remove(string) error

// Each loops over all key value pairs in the store calling fn for each pair.
// The ValueDecoder is used by fn to optionally decode the value into a
// custom struct or map. The decoder must be executable multiple times, but
// is assumed to be invalidated once fn returns
// The loop shall return if fn returns an error or false.
Each(fn func(string, ValueDecoder) (bool, error)) error
}
91 changes: 91 additions & 0 deletions libbeat/statestore/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package statestore

import (
"errors"
"fmt"
)

// ErrorAccess indicates that an error occured when trying to open a Store.
type ErrorAccess struct {
name string
cause error
}

// Store reports the name of the store that could not been accessed.
func (e *ErrorAccess) Store() string { return e.name }

// Unwrap returns the cause for the error or nil if the cause is unknown or has
// not been reported by the backend
func (e *ErrorAccess) Unwrap() error { return e.cause }

// Error creates a descriptive error string.
func (e *ErrorAccess) Error() string {
if e.cause == nil {
return fmt.Sprintf("failed to open store '%v'", e.name)
}
return fmt.Sprintf("failed to open store '%v': %v", e.name, e.cause)
}

// ErrorClosed indicates that the operation failed because the store has already been closed.
type ErrorClosed struct {
name string
operation string
}

// Store reports the name of the store that has been closed.
func (e *ErrorClosed) Store() string { return e.name }

// Operation returns a 'readable' name for the operation that failed to access the closed store.
func (e *ErrorClosed) Operation() string { return e.operation }

// Error creates a descriptive error string.
func (e *ErrorClosed) Error() string {
return fmt.Sprintf("can not executed %v operation on closed store '%v'", e.operation, e.name)
}

// ErrorOperation is returned when a generic store operation failed.
type ErrorOperation struct {
name string
operation string
cause error
}

// Store reports the name of the store.
func (e *ErrorOperation) Store() string { return e.name }

// Operation returns a 'readable' name for the operation that failed.
func (e *ErrorOperation) Operation() string { return e.operation }

// Unwrap returns the cause of the failure.
func (e *ErrorOperation) Unwrap() error { return e.cause }

// Error creates a descriptive error string.
func (e *ErrorOperation) Error() string {
return fmt.Sprintf("failed in %v operation on store '%v': %v", e.operation, e.name, e.cause)
}

// IsClosed returns true if the cause for an Error is ErrorClosed.
func IsClosed(err error) bool {
var tmp *ErrorClosed
if errors.As(err, &tmp) {
return true
}
return false
}
87 changes: 87 additions & 0 deletions libbeat/statestore/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package statestore

import (
"sync"

"github.com/elastic/beats/v7/libbeat/statestore/backend"
)

// Registry manages multiple key-value stores.
// When working with a registry, one must access a store. Depending on backend
// a store can be an index, a table, or a directory. All access to a store is
// handled by transaction.
type Registry struct {
backend backend.Registry

mu sync.Mutex
active map[string]*sharedStore // active/open stores
wg sync.WaitGroup
}

// ValueDecoder is used to decode retrieved from an actual store. A
// ValueDecoder instance is valid for the lifetime of the transaction only.
type ValueDecoder = backend.ValueDecoder

// NewRegistry creates a new Registry with a configured backend.
func NewRegistry(backend backend.Registry) *Registry {
return &Registry{
backend: backend,
active: map[string]*sharedStore{},
}
}

// Close closes the backend storage. Close blocks until all stores in use are closed.
func (r *Registry) Close() error {
r.wg.Wait() // wait for all stores being closed
return r.backend.Close()
}

// Get opens a shared store. A store is closed and released only after all it's
// users have closed the store.
func (r *Registry) Get(name string) (*Store, error) {
r.mu.Lock()
defer r.mu.Unlock()

shared := r.active[name]
if shared == nil {
backend, err := r.backend.Access(name)
if err != nil {
return nil, &ErrorAccess{name: name, cause: err}
}

shared = newSharedStore(r, name, backend)
defer shared.Release()

r.active[name] = shared
r.wg.Add(1)
}

return newStore(shared), nil
}

func (r *Registry) unregisterStore(s *sharedStore) {
_, exists := r.active[s.name]
if !exists {
panic("removing an unknown store")
}

delete(r.active, s.name)
r.wg.Done()
}
Loading

0 comments on commit 1fc18c2

Please sign in to comment.