Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dtests] Docker tests integration with docker-compose #3031

Merged
merged 10 commits into from
Jan 27, 2021
64 changes: 59 additions & 5 deletions src/cmd/tools/dtest/docker/harness/resources/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ import (
"net/http"
"time"

"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/generated/proto/prompb"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/generated/proto/prompb"
xhttp "github.com/m3db/m3/src/x/net/http"
"github.com/ory/dockertest/v3"
"github.com/prometheus/common/model"
"go.uber.org/zap"
Expand Down Expand Up @@ -90,6 +90,8 @@ type Admin interface {
GetPlacement() (admin.PlacementGetResponse, error)
// WaitForInstances blocks until the given instance is available.
WaitForInstances(ids []string) error
// WaitForShardsReady waits until all shards gets ready.
WaitForShardsReady() error
// Close closes the wrapper and releases any held resources, including
// deleting docker containers.
Close() error
Expand Down Expand Up @@ -236,6 +238,32 @@ func (c *coordinator) WaitForInstances(
})
}

func (c *coordinator) WaitForShardsReady() error {
if c.resource.closed {
return errClosed
}

logger := c.resource.logger.With(zapMethod("waitForShards"))
return c.resource.pool.Retry(func() error {
placement, err := c.GetPlacement()
if err != nil {
logger.Error("retrying get placement", zap.Error(err))
return err
}

for _, instance := range placement.Placement.Instances {
for _, shard := range instance.Shards {
if shard.State == placementpb.ShardState_INITIALIZING {
err = fmt.Errorf("at least shard %d of dbnode %s still initializing", shard.Id, instance.Id)
logger.Error("shards still are initializing", zap.Error(err))
return err
}
}
}
return nil
})
}

func (c *coordinator) CreateDatabase(
addRequest admin.DatabaseCreateRequest,
) (admin.DatabaseCreateResponse, error) {
Expand All @@ -260,6 +288,14 @@ func (c *coordinator) CreateDatabase(
return admin.DatabaseCreateResponse{}, err
}

if err = c.setNamespaceReady(addRequest.NamespaceName); err != nil {
logger.Error("failed to set namespace to ready state",
zap.Error(err),
zap.String("namespace", addRequest.NamespaceName),
)
return response, err
}

logger.Info("created database")
return response, nil
}
Expand Down Expand Up @@ -287,9 +323,28 @@ func (c *coordinator) AddNamespace(
return admin.NamespaceGetResponse{}, err
}

if err = c.setNamespaceReady(addRequest.Name); err != nil {
logger.Error("failed to set namespace to ready state", zap.Error(err), zap.String("namespace", addRequest.Name))
return response, err
}

return response, nil
}

func (c *coordinator) setNamespaceReady(name string) error {
url := c.resource.getURL(7201, "api/v1/services/m3db/namespace/ready")
logger := c.resource.logger.With(
zapMethod("setNamespaceReady"), zap.String("url", url),
zap.String("namespace", name))

_, err := makePostRequest(logger, url, // nolint: bodyclose
&admin.NamespaceReadyRequest{
Name: name,
Force: true,
})
return err
}

func (c *coordinator) WriteCarbon(
port int, metric string, v float64, t time.Time,
) error {
Expand Down Expand Up @@ -446,7 +501,6 @@ func (c *coordinator) RunQuery(

return err
})

if err != nil {
logger.Error("failed run", zap.Error(err))
}
Expand Down
60 changes: 48 additions & 12 deletions src/cmd/tools/dtest/docker/harness/resources/dbnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package resources

import (
"fmt"
"strings"
"sync"

"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
Expand All @@ -30,6 +31,7 @@ import (
xerrors "github.com/m3db/m3/src/x/errors"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -92,6 +94,11 @@ type Node interface {
WaitForBootstrap() error
// WritePoint writes a datapoint to the node directly.
WritePoint(req *rpc.WriteRequest) error
// WriteTaggedPoint writes a datapoint with tags to the node directly.
WriteTaggedPoint(req *rpc.WriteTaggedRequest) error
// AggregateTiles starts tiles aggregation, waits until it will complete
// and returns the amount of aggregated tiles.
AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error)
// Fetch fetches datapoints.
Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error)
// Exec executes the given commands on the node container, returning
Expand All @@ -108,8 +115,6 @@ type Node interface {
}

type dbNode struct {
opts dockerResourceOptions

tchanClient *integration.TestTChannelClient
resource *dockerResource
}
Expand Down Expand Up @@ -140,26 +145,25 @@ func newDockerHTTPNode(
resource.logger.Info("set up tchanClient", zap.String("node_addr", addr))
completed = true
return &dbNode{
opts: opts,

tchanClient: tchanClient,
resource: resource,
}, nil
}

func (c *dbNode) HostDetails(p int) (*admin.Host, error) {
port, err := c.resource.getPort(p)
if err != nil {
return nil, err
var network docker.ContainerNetwork
for _, n := range c.resource.resource.Container.NetworkSettings.Networks { // nolint: gocritic
network = n
}

host := strings.TrimLeft(c.resource.resource.Container.Name, "/")
return &admin.Host{
Id: "m3db_local",
IsolationGroup: "rack-a",
Id: host,
IsolationGroup: "rack-a-" + c.resource.resource.Container.Name,
Zone: "embedded",
Weight: 1024,
Address: c.opts.containerName,
Port: uint32(port),
Address: network.IPAddress,
Port: uint32(p),
}, nil
}

Expand Down Expand Up @@ -215,6 +219,38 @@ func (c *dbNode) WritePoint(req *rpc.WriteRequest) error {
return nil
}

func (c *dbNode) WriteTaggedPoint(req *rpc.WriteTaggedRequest) error {
if c.resource.closed {
return errClosed
}

logger := c.resource.logger.With(zapMethod("write-tagged"))
err := c.tchanClient.TChannelClientWriteTagged(timeout, req)
if err != nil {
logger.Error("could not write-tagged", zap.Error(err))
return err
}

logger.Info("wrote")
return nil
}

func (c *dbNode) AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error) {
if c.resource.closed {
return 0, errClosed
}

logger := c.resource.logger.With(zapMethod("aggregate-tiles"))
rsp, err := c.tchanClient.TChannelClientAggregateTiles(timeout, req)
if err != nil {
logger.Error("could not aggregate tiles", zap.Error(err))
return 0, err
}

logger.Info("wrote")
return rsp.ProcessedTileCount, nil
}

func (c *dbNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) {
if c.resource.closed {
return nil, errClosed
Expand All @@ -236,7 +272,7 @@ func (c *dbNode) Restart() error {
return errClosed
}

cName := c.opts.containerName
cName := c.resource.resource.Container.Name
logger := c.resource.logger.With(zapMethod("restart"))
logger.Info("restarting container", zap.String("container", cName))
err := c.resource.pool.Client.RestartContainer(cName, 60)
Expand Down
Loading