Skip to content

Commit

Permalink
feat: remote write function for replications (#22942)
Browse files Browse the repository at this point in the history
* feat: remote write function for replications

* chore: implement UpdateResponseInfo store method

* chore: only set gzip heading for non-empty requests

* fix: address review feedback
  • Loading branch information
williamhbaker authored Nov 30, 2021
1 parent 5ce164f commit 9873ccd
Show file tree
Hide file tree
Showing 17 changed files with 917 additions and 146 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ require (
labix.org/v2/mgo v0.0.0-20140701140051-000000000287 // indirect
)

require github.com/influxdata/influx-cli/v2 v2.1.1-0.20211007122339-c4a5a13c8ee3
require github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d

require (
cloud.google.com/go v0.82.0 // indirect
Expand Down Expand Up @@ -137,6 +137,7 @@ require (
github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect
github.com/c-bata/go-prompt v0.2.2 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/daixiang0/gci v0.2.8 // indirect
github.com/deepmap/oapi-codegen v1.6.0 // indirect
github.com/denisenkom/go-mssqldb v0.10.0 // indirect
github.com/dimchansky/utfbom v1.1.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/daixiang0/gci v0.2.8 h1:1mrIGMBQsBu0P7j7m1M8Lb+ZeZxsZL+jyGX4YoMJJpg=
github.com/daixiang0/gci v0.2.8/go.mod h1:+4dZ7TISfSmqfAGv59ePaHfNzgGtIkHAhhdKggP1JAc=
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -432,6 +433,10 @@ github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0f
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
github.com/influxdata/influx-cli/v2 v2.1.1-0.20211007122339-c4a5a13c8ee3 h1:DJFtOP/Gji5K6iut794K1pTKPd9SqM9J+Cb7vXgsnq0=
github.com/influxdata/influx-cli/v2 v2.1.1-0.20211007122339-c4a5a13c8ee3/go.mod h1:piIN/dAOSRqdZZc2sHO7CORuWUQ0UXdNrjugF3cEr8k=
github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d h1:An2Su6JpQwYTmONvndYkkjxtfAE5w04rUyH1kf/tWcg=
github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d/go.mod h1:p1X8Ga67SzLC35qmwvTCmWXdpZOTHSWWMXJ0zwRTW50=
github.com/influxdata/influx-cli/v2 v2.2.1 h1:K4kzXqPwfe0Qv3eY0TSiI9LEplwFGWiAKi4VfKy8KFs=
github.com/influxdata/influx-cli/v2 v2.2.1/go.mod h1:p1X8Ga67SzLC35qmwvTCmWXdpZOTHSWWMXJ0zwRTW50=
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 h1:MBLCfcSsUyFPDJp6T7EoHp/Ph3Jkrm4EuUKLD2rUWHg=
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256 h1:8io3jjCJ0j9NFvq3/m/rMrDiEILpsfOqWDPItUt/078=
Expand Down
11 changes: 11 additions & 0 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,14 @@ func (r *UpdateReplicationRequest) OK() error {

return nil
}

// ReplicationHTTPConfig contains all info needed by a client to make HTTP requests against the
// remote bucket targeted by a replication.
type ReplicationHTTPConfig struct {
RemoteURL string `db:"remote_url"`
RemoteToken string `db:"remote_api_token"`
RemoteOrgID platform.ID `db:"remote_org_id"`
AllowInsecureTLS bool `db:"allow_insecure_tls"`
RemoteBucketID platform.ID `db:"remote_bucket_id"`
DropNonRetryableData bool `db:"drop_non_retryable_data"`
}
13 changes: 0 additions & 13 deletions replications/internal/http_config.go

This file was deleted.

58 changes: 30 additions & 28 deletions replications/internal/queue_management.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -11,18 +12,23 @@ import (
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
"github.com/influxdata/influxdb/v2/replications/metrics"
"github.com/influxdata/influxdb/v2/replications/remotewrite"
"go.uber.org/zap"
)

type remoteWriter interface {
Write(context.Context, []byte) error
}

type replicationQueue struct {
id platform.ID
queue *durablequeue.Queue
wg sync.WaitGroup
done chan struct{}
receive chan struct{}
logger *zap.Logger
metrics *metrics.ReplicationsMetrics
writeFunc func([]byte) error
id platform.ID
queue *durablequeue.Queue
wg sync.WaitGroup
done chan struct{}
receive chan struct{}
logger *zap.Logger
metrics *metrics.ReplicationsMetrics
remoteWriter remoteWriter
}

type durableQueueManager struct {
Expand All @@ -31,15 +37,15 @@ type durableQueueManager struct {
queuePath string
mutex sync.RWMutex
metrics *metrics.ReplicationsMetrics
writeFunc func([]byte) error
configStore remotewrite.HttpConfigStore
}

var errStartup = errors.New("startup tasks for replications durable queue management failed, see server logs for details")
var errShutdown = errors.New("shutdown tasks for replications durable queues failed, see server logs for details")

// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
// replication streams.
func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.ReplicationsMetrics, writeFunc func([]byte) error) *durableQueueManager {
func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.ReplicationsMetrics, configStore remotewrite.HttpConfigStore) *durableQueueManager {
replicationQueues := make(map[platform.ID]*replicationQueue)

os.MkdirAll(queuePath, 0777)
Expand All @@ -49,7 +55,7 @@ func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.
logger: log,
queuePath: queuePath,
metrics: metrics,
writeFunc: writeFunc,
configStore: configStore,
}
}

Expand Down Expand Up @@ -116,13 +122,6 @@ func (rq *replicationQueue) Close() error {
return rq.queue.Close()
}

// WriteFunc is currently a placeholder for the "default" behavior
// of the queue scanner sending data from the durable queue to a remote host.
func WriteFunc(b []byte) error {
// TODO: Add metrics updates for BytesSent, BytesDropped, and ErrorCodes
return nil
}

func (rq *replicationQueue) run() {
defer rq.wg.Done()

Expand All @@ -131,7 +130,7 @@ func (rq *replicationQueue) run() {
case <-rq.done: // end the goroutine when done is messaged
return
case <-rq.receive: // run the scanner on data append
for rq.SendWrite(rq.writeFunc) {
for rq.SendWrite() {
}
}
}
Expand All @@ -141,7 +140,7 @@ func (rq *replicationQueue) run() {
// SendWrite is responsible for processing all data in the queue at the time of calling.
// Retryable errors should be handled and retried in the dp function.
// Unprocessable data should be dropped in the dp function.
func (rq *replicationQueue) SendWrite(dp func([]byte) error) bool {
func (rq *replicationQueue) SendWrite() bool {
// Any error in creating the scanner should exit the loop in run()
// Either it is io.EOF indicating no data, or some other failure in making
// the Scanner object that we don't know how to handle.
Expand Down Expand Up @@ -170,7 +169,8 @@ func (rq *replicationQueue) SendWrite(dp func([]byte) error) bool {
// An error here indicates an unhandlable error. Data is not corrupt, and
// the remote write is not retryable. A potential example of an error here
// is an authentication error with the remote host.
if err = dp(scan.Bytes()); err != nil {
// TODO: Propagate context if needed to allow for graceful shutdowns, see https://github.com/influxdata/influxdb/issues/22944
if err = rq.remoteWriter.Write(context.Background(), scan.Bytes()); err != nil {
rq.logger.Error("Error in replication stream", zap.Error(err))
return false
}
Expand Down Expand Up @@ -369,13 +369,15 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt
}

func (qm *durableQueueManager) newReplicationQueue(id platform.ID, queue *durablequeue.Queue) *replicationQueue {
logger := qm.logger.With(zap.String("replication_id", id.String()))

return &replicationQueue{
id: id,
queue: queue,
done: make(chan struct{}),
receive: make(chan struct{}),
logger: qm.logger.With(zap.String("replication_id", id.String())),
metrics: qm.metrics,
writeFunc: qm.writeFunc,
id: id,
queue: queue,
done: make(chan struct{}),
receive: make(chan struct{}),
logger: logger,
metrics: qm.metrics,
remoteWriter: remotewrite.NewWriter(id, qm.configStore, qm.metrics, logger),
}
}
126 changes: 87 additions & 39 deletions replications/internal/queue_management_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package internal

import (
"context"
"errors"
"io"
"os"
"path/filepath"
"testing"
Expand All @@ -11,6 +14,7 @@ import (
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
"github.com/influxdata/influxdb/v2/replications/metrics"
replicationsMock "github.com/influxdata/influxdb/v2/replications/mock"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
Expand All @@ -37,38 +41,69 @@ func TestCreateNewQueueDirExists(t *testing.T) {
func TestEnqueueScan(t *testing.T) {
t.Parallel()

queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))

// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)

// Enqueue some data
testData := "weather,location=us-midwest temperature=82 1465839830100400200"
qm.writeFunc = getTestWriteFunc(t, testData)
err = qm.EnqueueData(id1, []byte(testData), 1)
require.NoError(t, err)
}

func TestEnqueueScanMultiple(t *testing.T) {
t.Parallel()

queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))

// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)

// Enqueue some data
testData := "weather,location=us-midwest temperature=82 1465839830100400200"
qm.writeFunc = getTestWriteFunc(t, testData)
err = qm.EnqueueData(id1, []byte(testData), 1)
require.NoError(t, err)
data := "weather,location=us-midwest temperature=82 1465839830100400200"

tests := []struct {
name string
testData []string
writeFuncReturn error
}{
{
name: "single point with successful write",
testData: []string{data},
writeFuncReturn: nil,
},
{
name: "multiple points with successful write",
testData: []string{data, data, data},
writeFuncReturn: nil,
},
{
name: "single point with unsuccessful write",
testData: []string{data},
writeFuncReturn: errors.New("some error"),
},
{
name: "multiple points with unsuccessful write",
testData: []string{data, data, data},
writeFuncReturn: errors.New("some error"),
},
}

err = qm.EnqueueData(id1, []byte(testData), 1)
require.NoError(t, err)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))

// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
rq := qm.replicationQueues[id1]
rq.remoteWriter = getTestRemoteWriter(t, data, tt.writeFuncReturn)

// Enqueue the data
for _, dat := range tt.testData {
err = qm.EnqueueData(id1, []byte(dat), 1)
require.NoError(t, err)
}

// Check queue position
close(rq.done)
rq.wg.Wait()
scan, err := rq.queue.NewScanner()

if tt.writeFuncReturn == nil {
require.ErrorIs(t, io.EOF, err)
} else {
// Queue should not have advanced at all
for range tt.testData {
require.True(t, scan.Next())
}
// Should now be at the end of the queue
require.False(t, scan.Next())
}
})
}
}

func TestCreateNewQueueDuplicateID(t *testing.T) {
Expand Down Expand Up @@ -274,7 +309,7 @@ func initQueueManager(t *testing.T) (string, *durableQueueManager) {
queuePath := filepath.Join(enginePath, "replicationq")

logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), WriteFunc)
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), replicationsMock.NewMockHttpConfigStore(nil))

return queuePath, qm
}
Expand All @@ -291,12 +326,27 @@ func shutdown(t *testing.T, qm *durableQueueManager) {
qm.replicationQueues = emptyMap
}

func getTestWriteFunc(t *testing.T, expected string) func([]byte) error {
type testRemoteWriter struct {
writeFn func(context.Context, []byte) error
}

func (tw *testRemoteWriter) Write(ctx context.Context, data []byte) error {
return tw.writeFn(ctx, data)
}

func getTestRemoteWriter(t *testing.T, expected string, returning error) remoteWriter {
t.Helper()
return func(b []byte) error {

writeFn := func(ctx context.Context, b []byte) error {
require.Equal(t, expected, string(b))
return nil
return returning
}

writer := &testRemoteWriter{}

writer.writeFn = writeFn

return writer
}

func TestEnqueueData(t *testing.T) {
Expand All @@ -307,7 +357,7 @@ func TestEnqueueData(t *testing.T) {
defer os.RemoveAll(queuePath)

logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), WriteFunc)
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), replicationsMock.NewMockHttpConfigStore(nil))

require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
require.DirExists(t, filepath.Join(queuePath, id1.String()))
Expand Down Expand Up @@ -376,9 +426,7 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
require.NoError(t, rq.queue.SetMaxSegmentSize(8))

queueSizeBefore := rq.queue.DiskUsage()
rq.SendWrite(func(bytes []byte) error {
return nil
})
rq.SendWrite()

// Ensure that the smaller queue disk size was reflected in the metrics.
currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg)
Expand Down
Loading

0 comments on commit 9873ccd

Please sign in to comment.