Skip to content

Commit

Permalink
feat: enable remotes and replication streams feature (#22990)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamhbaker authored Dec 13, 2021
1 parent 0e5b14f commit 5a919b6
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 109 deletions.
26 changes: 12 additions & 14 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,23 +359,21 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
ts.BucketService = replications.NewBucketService(
m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)

if feature.ReplicationStreamBackend().Enabled(ctx, m.flagger) {
m.reg.MustRegister(replicationsMetrics.PrometheusCollectors()...)
m.reg.MustRegister(replicationsMetrics.PrometheusCollectors()...)

if err = replicationSvc.Open(ctx); err != nil {
m.log.Error("Failed to open replications service", zap.Error(err))
return err
}
if err = replicationSvc.Open(ctx); err != nil {
m.log.Error("Failed to open replications service", zap.Error(err))
return err
}

m.closers = append(m.closers, labeledCloser{
label: "replications",
closer: func(context.Context) error {
return replicationSvc.Close()
},
})
m.closers = append(m.closers, labeledCloser{
label: "replications",
closer: func(context.Context) error {
return replicationSvc.Close()
},
})

pointsWriter = replicationSvc
}
pointsWriter = replicationSvc

deps, err := influxdb.NewDependencies(
storageflux.NewReader(storage2.NewStore(m.engine.TSDBStore(), m.engine.MetaClient())),
Expand Down
17 changes: 4 additions & 13 deletions cmd/influxd/launcher/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@ import (
"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/stretchr/testify/require"
)

func TestValidateReplication_Valid(t *testing.T) {
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"}
})
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l.ShutdownOrFail(t, ctx)
client := l.APIClient(t)

Expand Down Expand Up @@ -84,9 +81,7 @@ func TestValidateReplication_Valid(t *testing.T) {
}

func TestValidateReplication_Invalid(t *testing.T) {
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"}
})
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l.ShutdownOrFail(t, ctx)
client := l.APIClient(t)

Expand Down Expand Up @@ -200,9 +195,7 @@ func TestReplicationStreamEndToEnd(t *testing.T) {
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,300,f,m,v3` + "\r\n" +
`,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,400,f,m,v4` + "\r\n\r\n"

l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"}
})
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l.ShutdownOrFail(t, ctx)
client := l.APIClient(t)

Expand Down Expand Up @@ -312,9 +305,7 @@ func TestReplicationStreamEndToEnd(t *testing.T) {
}

func TestReplicationsLocalWriteAndShutdownBlocking(t *testing.T) {
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"}
})
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
client := l.APIClient(t)

// Server that only returns an error will cause the remote write to retry on loop.
Expand Down
8 changes: 0 additions & 8 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,6 @@
expose: true
lifetime: temporary

- name: Replication Stream Backend
description: Enable replication-stream APIs and underlying synchronization queues
key: replicationStreamBackend
default: false
contact: Edge Team
expose: true
lifetime: temporary

- name: New Dashboard Autorefresh
description: Enables the new dashboard autorefresh controls in the UI
key: newAutoRefresh
Expand Down
16 changes: 0 additions & 16 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 0 additions & 15 deletions remotes/transport/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
Expand Down Expand Up @@ -80,7 +79,6 @@ func newRemoteConnectionHandler(log *zap.Logger, svc RemoteConnectionService) *R
middleware.Recoverer,
middleware.RequestID,
middleware.RealIP,
h.mwRemotesFlag, // Temporary, remove when feature flag for remote connections is perma-enabled.
)

r.Route("/", func(r chi.Router) {
Expand All @@ -102,19 +100,6 @@ func (h *RemoteConnectionHandler) Prefix() string {
return prefixRemotes
}

func (h *RemoteConnectionHandler) mwRemotesFlag(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flags := feature.FlagsFromContext(r.Context())

if flagVal, ok := flags[feature.ReplicationStreamBackend().Key()]; !ok || !flagVal.(bool) {
h.api.Respond(w, r, http.StatusNotFound, nil)
return
}

next.ServeHTTP(w, r)
})
}

func (h *RemoteConnectionHandler) handleGetRemotes(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()

Expand Down
15 changes: 1 addition & 14 deletions remotes/transport/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import (

"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/remotes/mock"
"github.com/stretchr/testify/assert"
tmock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

Expand Down Expand Up @@ -181,21 +179,10 @@ func TestRemoteConnectionHandler(t *testing.T) {
func newTestServer(t *testing.T) (*httptest.Server, *mock.MockRemoteConnectionService) {
ctrlr := gomock.NewController(t)
svc := mock.NewMockRemoteConnectionService(ctrlr)
server := annotatedTestServer(newRemoteConnectionHandler(zaptest.NewLogger(t), svc))
server := newRemoteConnectionHandler(zaptest.NewLogger(t), svc)
return httptest.NewServer(server), svc
}

func annotatedTestServer(serv http.Handler) http.Handler {
replicationFlag := feature.MakeFlag("", feature.ReplicationStreamBackend().Key(), "", true, 0, true)

return feature.NewHandler(
zap.NewNop(),
feature.DefaultFlagger(),
[]feature.Flag{replicationFlag},
serv,
)
}

func newTestRequest(t *testing.T, method, path string, body interface{}) *http.Request {
dat, err := json.Marshal(body)
require.NoError(t, err)
Expand Down
15 changes: 0 additions & 15 deletions replications/transport/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
Expand Down Expand Up @@ -102,7 +101,6 @@ func newReplicationHandler(log *zap.Logger, svc ReplicationService) *Replication
middleware.Recoverer,
middleware.RequestID,
middleware.RealIP,
h.mwReplicationsFlag,
)

r.Route("/", func(r chi.Router) {
Expand All @@ -125,19 +123,6 @@ func (h *ReplicationHandler) Prefix() string {
return prefixReplications
}

func (h *ReplicationHandler) mwReplicationsFlag(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flags := feature.FlagsFromContext(r.Context())

if flagVal, ok := flags[feature.ReplicationStreamBackend().Key()]; !ok || !flagVal.(bool) {
h.api.Respond(w, r, http.StatusNotFound, nil)
return
}

next.ServeHTTP(w, r)
})
}

func (h *ReplicationHandler) handleGetReplications(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()

Expand Down
15 changes: 1 addition & 14 deletions replications/transport/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import (

"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/replications/mock"
"github.com/stretchr/testify/assert"
tmock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

Expand Down Expand Up @@ -316,21 +314,10 @@ func TestReplicationHandler(t *testing.T) {
func newTestServer(t *testing.T) (*httptest.Server, *mock.MockReplicationService) {
ctrl := gomock.NewController(t)
svc := mock.NewMockReplicationService(ctrl)
server := annotatedTestServer(newReplicationHandler(zaptest.NewLogger(t), svc))
server := newReplicationHandler(zaptest.NewLogger(t), svc)
return httptest.NewServer(server), svc
}

func annotatedTestServer(serv http.Handler) http.Handler {
replicationFlag := feature.MakeFlag("", feature.ReplicationStreamBackend().Key(), "", true, 0, true)

return feature.NewHandler(
zap.NewNop(),
feature.DefaultFlagger(),
[]feature.Flag{replicationFlag},
serv,
)
}

func newTestRequest(t *testing.T, method, path string, body interface{}) *http.Request {
dat, err := json.Marshal(body)
require.NoError(t, err)
Expand Down

0 comments on commit 5a919b6

Please sign in to comment.