Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix grpc ip direct connection status check #2316

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@
}

func (p *pool) refreshConn(ctx context.Context, idx int, pc *poolConn, addr string) (err error) {
if pc != nil && pc.addr == addr && isHealthy(pc.conn) {
if pc != nil && pc.addr == addr && isHealthy(ctx, pc.conn) {

Check warning on line 287 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L287

Added line #L287 was not covered by tests
return nil
}
if pc != nil {
Expand All @@ -295,7 +295,7 @@
conn, err := p.dial(ctx, addr)
if err != nil {
if pc != nil {
if isHealthy(pc.conn) {
if isHealthy(ctx, pc.conn) {

Check warning on line 298 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L298

Added line #L298 was not covered by tests
log.Debugf("dialing new connection to %s failed,\terror: %v,\tbut existing connection to %s is healthy will keep existing connection", addr, err, pc.addr)
return nil
}
Expand Down Expand Up @@ -478,7 +478,7 @@
log.Debugf("failed to dial gRPC connection to %s,\terror: %v", addr, err)
return nil, err
}
if !isHealthy(conn) {
if !isHealthy(ctx, conn) {

Check warning on line 481 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L481

Added line #L481 was not covered by tests
if conn != nil {
err = conn.Close()
if err != nil && !errors.Is(err, grpc.ErrClientConnClosing) {
Expand Down Expand Up @@ -514,7 +514,7 @@
var cnt, unhealthy int
pl := p.len()
err := p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool {
if pc == nil || !isHealthy(pc.conn) {
if pc == nil || !isHealthy(ctx, pc.conn) {

Check warning on line 517 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L517

Added line #L517 was not covered by tests
if p.isIP {
if pc != nil && pc.addr != "" {
err := p.refreshConn(ctx, idx, pc, pc.addr)
Expand Down Expand Up @@ -603,11 +603,11 @@
if pl > 0 {
idx = int(p.current.Add(1) % pl)
}
if pc := p.load(idx); pc != nil && isHealthy(pc.conn) {
if pc := p.load(idx); pc != nil && isHealthy(ctx, pc.conn) {

Check warning on line 606 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L606

Added line #L606 was not covered by tests
return pc.conn, true
}
conn, err := p.dial(ctx, p.addr)
if err == nil && conn != nil && isHealthy(conn) {
if err == nil && conn != nil && isHealthy(ctx, conn) {

Check warning on line 610 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L610

Added line #L610 was not covered by tests
p.store(idx, &poolConn{
conn: conn,
addr: p.addr,
Expand All @@ -619,7 +619,7 @@
}

if pl > 0 {
if pc := p.load(int(p.current.Add(1) % pl)); pc != nil && isHealthy(pc.conn) {
if pc := p.load(int(p.current.Add(1) % pl)); pc != nil && isHealthy(ctx, pc.conn) {

Check warning on line 622 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L622

Added line #L622 was not covered by tests
return pc.conn, true
}
}
Expand Down Expand Up @@ -699,7 +699,7 @@
net.JoinHostPort(p.host, port),
append(p.dopts, grpc.WithBlock())...)

if err == nil && isHealthy(conn) && conn.Close() == nil {
if err == nil && isHealthy(ctx, conn) && conn.Close() == nil {

Check warning on line 702 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L702

Added line #L702 was not covered by tests
// if no error and healthy the port is ready for gRPC
return port, nil
}
Expand Down Expand Up @@ -784,7 +784,7 @@
}
}

func isHealthy(conn *ClientConn) bool {
func isHealthy(ctx context.Context, conn *ClientConn) bool {

Check warning on line 787 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L787

Added line #L787 was not covered by tests
if conn == nil {
log.Warn("gRPC target connection is nil")
return false
Expand All @@ -797,7 +797,15 @@
log.Debugf("gRPC target %s's connection status will be Ready soon\tstatus: %s", conn.Target(), state.String())
return true
case connectivity.Idle:
log.Warnf("gRPC target %s's connection status is waiting for target\tstatus: %s", conn.Target(), state.String())
log.Debugf("gRPC target %s's connection status is waiting for target\tstatus: %s", conn.Target(), state.String())
conn.Connect()
if conn.WaitForStateChange(ctx, state) {
state = conn.GetState()
if state == connectivity.Ready || state == connectivity.Connecting {
log.Debugf("gRPC target %s's connection status enabled for target\tstatus: %s", conn.Target(), state.String())
return true
}

Check warning on line 807 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L800-L807

Added lines #L800 - L807 were not covered by tests
}
return false
case connectivity.Shutdown, connectivity.TransientFailure:
log.Errorf("gRPC target %s's connection status is unhealthy\tstatus: %s", conn.Target(), state.String())
Expand Down
Loading