Skip to content

Commit

Permalink
Merge branch 'master' into linasn/bootstrap-out-of-retention-blocks-fix
Browse files Browse the repository at this point in the history
* master:
  [dtests] Docker tests integration with docker-compose (#3031)
  [dbnode] Comments / remove unused var (#3124)
  [query] Handle context.Canceled and map to 499 http status (#3069)
  • Loading branch information
soundvibe committed Jan 27, 2021
2 parents e2484a6 + 4abe51e commit 9b9102c
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 48 deletions.
64 changes: 59 additions & 5 deletions src/cmd/tools/dtest/docker/harness/resources/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ import (
"net/http"
"time"

"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/generated/proto/prompb"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/generated/proto/prompb"
xhttp "github.com/m3db/m3/src/x/net/http"
"github.com/ory/dockertest/v3"
"github.com/prometheus/common/model"
"go.uber.org/zap"
Expand Down Expand Up @@ -90,6 +90,8 @@ type Admin interface {
GetPlacement() (admin.PlacementGetResponse, error)
// WaitForInstances blocks until the given instance is available.
WaitForInstances(ids []string) error
// WaitForShardsReady waits until all shards gets ready.
WaitForShardsReady() error
// Close closes the wrapper and releases any held resources, including
// deleting docker containers.
Close() error
Expand Down Expand Up @@ -236,6 +238,32 @@ func (c *coordinator) WaitForInstances(
})
}

func (c *coordinator) WaitForShardsReady() error {
if c.resource.closed {
return errClosed
}

logger := c.resource.logger.With(zapMethod("waitForShards"))
return c.resource.pool.Retry(func() error {
placement, err := c.GetPlacement()
if err != nil {
logger.Error("retrying get placement", zap.Error(err))
return err
}

for _, instance := range placement.Placement.Instances {
for _, shard := range instance.Shards {
if shard.State == placementpb.ShardState_INITIALIZING {
err = fmt.Errorf("at least shard %d of dbnode %s still initializing", shard.Id, instance.Id)
logger.Error("shards still are initializing", zap.Error(err))
return err
}
}
}
return nil
})
}

func (c *coordinator) CreateDatabase(
addRequest admin.DatabaseCreateRequest,
) (admin.DatabaseCreateResponse, error) {
Expand All @@ -260,6 +288,14 @@ func (c *coordinator) CreateDatabase(
return admin.DatabaseCreateResponse{}, err
}

if err = c.setNamespaceReady(addRequest.NamespaceName); err != nil {
logger.Error("failed to set namespace to ready state",
zap.Error(err),
zap.String("namespace", addRequest.NamespaceName),
)
return response, err
}

logger.Info("created database")
return response, nil
}
Expand Down Expand Up @@ -287,9 +323,28 @@ func (c *coordinator) AddNamespace(
return admin.NamespaceGetResponse{}, err
}

if err = c.setNamespaceReady(addRequest.Name); err != nil {
logger.Error("failed to set namespace to ready state", zap.Error(err), zap.String("namespace", addRequest.Name))
return response, err
}

return response, nil
}

func (c *coordinator) setNamespaceReady(name string) error {
url := c.resource.getURL(7201, "api/v1/services/m3db/namespace/ready")
logger := c.resource.logger.With(
zapMethod("setNamespaceReady"), zap.String("url", url),
zap.String("namespace", name))

_, err := makePostRequest(logger, url, // nolint: bodyclose
&admin.NamespaceReadyRequest{
Name: name,
Force: true,
})
return err
}

func (c *coordinator) WriteCarbon(
port int, metric string, v float64, t time.Time,
) error {
Expand Down Expand Up @@ -446,7 +501,6 @@ func (c *coordinator) RunQuery(

return err
})

if err != nil {
logger.Error("failed run", zap.Error(err))
}
Expand Down
60 changes: 48 additions & 12 deletions src/cmd/tools/dtest/docker/harness/resources/dbnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package resources

import (
"fmt"
"strings"
"sync"

"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
Expand All @@ -30,6 +31,7 @@ import (
xerrors "github.com/m3db/m3/src/x/errors"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -92,6 +94,11 @@ type Node interface {
WaitForBootstrap() error
// WritePoint writes a datapoint to the node directly.
WritePoint(req *rpc.WriteRequest) error
// WriteTaggedPoint writes a datapoint with tags to the node directly.
WriteTaggedPoint(req *rpc.WriteTaggedRequest) error
// AggregateTiles starts tiles aggregation, waits until it will complete
// and returns the amount of aggregated tiles.
AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error)
// Fetch fetches datapoints.
Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error)
// Exec executes the given commands on the node container, returning
Expand All @@ -108,8 +115,6 @@ type Node interface {
}

type dbNode struct {
opts dockerResourceOptions

tchanClient *integration.TestTChannelClient
resource *dockerResource
}
Expand Down Expand Up @@ -140,26 +145,25 @@ func newDockerHTTPNode(
resource.logger.Info("set up tchanClient", zap.String("node_addr", addr))
completed = true
return &dbNode{
opts: opts,

tchanClient: tchanClient,
resource: resource,
}, nil
}

func (c *dbNode) HostDetails(p int) (*admin.Host, error) {
port, err := c.resource.getPort(p)
if err != nil {
return nil, err
var network docker.ContainerNetwork
for _, n := range c.resource.resource.Container.NetworkSettings.Networks { // nolint: gocritic
network = n
}

host := strings.TrimLeft(c.resource.resource.Container.Name, "/")
return &admin.Host{
Id: "m3db_local",
IsolationGroup: "rack-a",
Id: host,
IsolationGroup: "rack-a-" + c.resource.resource.Container.Name,
Zone: "embedded",
Weight: 1024,
Address: c.opts.containerName,
Port: uint32(port),
Address: network.IPAddress,
Port: uint32(p),
}, nil
}

Expand Down Expand Up @@ -215,6 +219,38 @@ func (c *dbNode) WritePoint(req *rpc.WriteRequest) error {
return nil
}

func (c *dbNode) WriteTaggedPoint(req *rpc.WriteTaggedRequest) error {
if c.resource.closed {
return errClosed
}

logger := c.resource.logger.With(zapMethod("write-tagged"))
err := c.tchanClient.TChannelClientWriteTagged(timeout, req)
if err != nil {
logger.Error("could not write-tagged", zap.Error(err))
return err
}

logger.Info("wrote")
return nil
}

func (c *dbNode) AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error) {
if c.resource.closed {
return 0, errClosed
}

logger := c.resource.logger.With(zapMethod("aggregate-tiles"))
rsp, err := c.tchanClient.TChannelClientAggregateTiles(timeout, req)
if err != nil {
logger.Error("could not aggregate tiles", zap.Error(err))
return 0, err
}

logger.Info("wrote")
return rsp.ProcessedTileCount, nil
}

func (c *dbNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) {
if c.resource.closed {
return nil, errClosed
Expand All @@ -236,7 +272,7 @@ func (c *dbNode) Restart() error {
return errClosed
}

cName := c.opts.containerName
cName := c.resource.resource.Container.Name
logger := c.resource.logger.With(zapMethod("restart"))
logger.Info("restarting container", zap.String("container", cName))
err := c.resource.pool.Client.RestartContainer(cName, 60)
Expand Down
Loading

0 comments on commit 9b9102c

Please sign in to comment.