Skip to content

Commit

Permalink
add single connection client for agent & vald
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed May 25, 2021
1 parent 4e4367c commit d54a653
Show file tree
Hide file tree
Showing 3 changed files with 356 additions and 4 deletions.
112 changes: 108 additions & 4 deletions internal/client/v1/client/agent/core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/vdaas/vald/internal/client/v1/client/vald"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
)

// Client represents agent NGT client interface.
Expand All @@ -41,6 +42,15 @@ type agentClient struct {
c grpc.Client
}

type singleAgentClient struct {
vald.Client
ac agent.AgentClient
}

const (
apiName = "vald/internal/client/v1/client/agent/core"
)

// New returns Client implementation if no error occurs.
func New(opts ...Option) (Client, error) {
c := new(agentClient)
Expand Down Expand Up @@ -76,12 +86,29 @@ func New(opts ...Option) (Client, error) {
return c, nil
}

func NewAgentClient(cc *grpc.ClientConn) interface {
vald.Client
client.ObjectReader
client.Indexer
} {
return &singleAgentClient{
Client: vald.NewValdClient(cc),
ac: agent.NewAgentClient(cc)}
}

func (c *agentClient) CreateIndex(
ctx context.Context,
req *client.ControlCreateIndexRequest,
opts ...grpc.CallOption,
) (*client.Empty, error) {
_, err := c.c.RoundRobin(ctx, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
ctx, span := trace.StartSpan(ctx, apiName+"/agentClient.CreateIndex")
defer func() {
if span != nil {
span.End()
}
}()
_, err := c.c.RoundRobin(ctx, func(ctx context.Context,
conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
return agent.NewAgentClient(conn).CreateIndex(ctx, req, copts...)
})
return nil, err
Expand All @@ -92,7 +119,14 @@ func (c *agentClient) SaveIndex(
req *client.Empty,
opts ...grpc.CallOption,
) (*client.Empty, error) {
_, err := c.c.RoundRobin(ctx, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
ctx, span := trace.StartSpan(ctx, apiName+"/agentClient.SaveIndex")
defer func() {
if span != nil {
span.End()
}
}()
_, err := c.c.RoundRobin(ctx, func(ctx context.Context,
conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
return agent.NewAgentClient(conn).SaveIndex(ctx, new(client.Empty), copts...)
})
return nil, err
Expand All @@ -103,7 +137,14 @@ func (c *agentClient) CreateAndSaveIndex(
req *client.ControlCreateIndexRequest,
opts ...grpc.CallOption,
) (*client.Empty, error) {
_, err := c.c.RoundRobin(ctx, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
ctx, span := trace.StartSpan(ctx, apiName+"/agentClient.CreateAndSaveIndex")
defer func() {
if span != nil {
span.End()
}
}()
_, err := c.c.RoundRobin(ctx, func(ctx context.Context,
conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
return agent.NewAgentClient(conn).CreateAndSaveIndex(ctx, req, copts...)
})
return nil, err
Expand All @@ -114,7 +155,14 @@ func (c *agentClient) IndexInfo(
req *client.Empty,
opts ...grpc.CallOption,
) (res *client.InfoIndexCount, err error) {
_, err = c.c.RoundRobin(ctx, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
ctx, span := trace.StartSpan(ctx, apiName+"/agentClient.IndexInfo")
defer func() {
if span != nil {
span.End()
}
}()
_, err = c.c.RoundRobin(ctx, func(ctx context.Context,
conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
res, err := agent.NewAgentClient(conn).IndexInfo(ctx, new(client.Empty), copts...)
if err != nil {
return nil, err
Expand All @@ -126,3 +174,59 @@ func (c *agentClient) IndexInfo(
}
return res, nil
}

func (c *singleAgentClient) CreateIndex(
ctx context.Context,
req *client.ControlCreateIndexRequest,
opts ...grpc.CallOption,
) (*client.Empty, error) {
ctx, span := trace.StartSpan(ctx, apiName+"/agentClient.CreateIndex")
defer func() {
if span != nil {
span.End()
}
}()
return c.ac.CreateIndex(ctx, req, opts...)
}

func (c *singleAgentClient) SaveIndex(
ctx context.Context,
req *client.Empty,
opts ...grpc.CallOption,
) (*client.Empty, error) {
ctx, span := trace.StartSpan(ctx, apiName+"/agentClient.SaveIndex")
defer func() {
if span != nil {
span.End()
}
}()
return c.ac.SaveIndex(ctx, new(client.Empty), opts...)
}

func (c *singleAgentClient) CreateAndSaveIndex(
ctx context.Context,
req *client.ControlCreateIndexRequest,
opts ...grpc.CallOption,
) (*client.Empty, error) {
ctx, span := trace.StartSpan(ctx, apiName+"/agentClient.CreateAndSaveIndex")
defer func() {
if span != nil {
span.End()
}
}()
return c.ac.CreateAndSaveIndex(ctx, req, opts...)
}

func (c *singleAgentClient) IndexInfo(
ctx context.Context,
req *client.Empty,
opts ...grpc.CallOption,
) (res *client.InfoIndexCount, err error) {
ctx, span := trace.StartSpan(ctx, apiName+"/agentClient.IndexInfo")
defer func() {
if span != nil {
span.End()
}
}()
return c.ac.IndexInfo(ctx, new(client.Empty), opts...)
}
Loading

0 comments on commit d54a653

Please sign in to comment.