Skip to content

Commit

Permalink
use new dns client to fix data race and reuse connect
Browse files Browse the repository at this point in the history
Signed-off-by: ii2day <ji.li@daocloud.io>
  • Loading branch information
ii2day committed Nov 22, 2023
1 parent d2f0879 commit 1f2b5e1
Show file tree
Hide file tree
Showing 46 changed files with 1,421 additions and 617 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,5 @@ require (
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

replace github.com/miekg/dns v1.1.50 => github.com/kdoctor-io/dns v0.0.0-20231117104519-7085dea69b40
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.4.0 h1:V
github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.4.0/go.mod h1:nqCI7aelBJU61wiBeeZWJ6oi4bJy5nrjkM6lWIMA4j0=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kdoctor-io/dns v0.0.0-20231117104519-7085dea69b40 h1:AlmnjvSvhzwJR9YpV3r0k0b0g6de4infGN0fOImcaO4=
github.com/kdoctor-io/dns v0.0.0-20231117104519-7085dea69b40/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand Down Expand Up @@ -380,8 +382,6 @@ github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
Expand Down Expand Up @@ -669,7 +669,6 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
Expand Down Expand Up @@ -835,7 +834,6 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8=
golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk=
Expand Down
99 changes: 69 additions & 30 deletions pkg/loadRequest/loadDns/dns_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
package loadDns

import (
"context"
"crypto/tls"
"fmt"
"github.com/kdoctor-io/kdoctor/pkg/k8s/apis/system/v1beta1"
"github.com/kdoctor-io/kdoctor/pkg/utils/stats"
"github.com/miekg/dns"
Expand Down Expand Up @@ -144,55 +144,82 @@ func (b *Work) Finish() {
b.report.finalize(total)
}

func (b *Work) makeRequest(conn *dns.Conn, wg *sync.WaitGroup) {
func (b *Work) makeRequest(client *dns.Client, msg *dns.Msg, conn *dns.Conn, wg *sync.WaitGroup) {
defer wg.Done()
var msg *dns.Msg
var rtt time.Duration
var err error

// Due to the time limitation on long-lived TCP connections by CoreDNS, connection reuse is not adopted for the TCP protocol.
if RequestProtocol(b.Protocol) == RequestMethodUdp {
err := client.ExchangeWithReuseConn(msg, conn)
if err != nil {
b.results <- &result{
duration: 0,
err: err,
msg: nil,
}
}

Check warning on line 159 in pkg/loadRequest/loadDns/dns_requester.go

View check run for this annotation

Codecov / codecov/patch

pkg/loadRequest/loadDns/dns_requester.go#L154-L159

Added lines #L154 - L159 were not covered by tests
} else {
r, rtt, err := client.Exchange(msg, b.ServerAddr)
b.results <- &result{
duration: rtt,
err: err,
msg: r,
}
}

}

func (b *Work) runWorker() {
var conn *dns.Conn
var err error
client := new(dns.Client)
client.Net = b.Protocol
client.Timeout = time.Duration(b.Timeout) * time.Millisecond
if b.Protocol == "tcp-tls" {
if RequestProtocol(b.Protocol) == RequestMethodTcpTls {
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
client.TLSConfig = tlsConfig
}

if b.Protocol == "tcp" || b.Protocol == "tcp-tls" {
msg, rtt, err = client.Exchange(b.Msg, b.ServerAddr)

} else {
if conn == nil {
conn, _ = client.Dial(b.ServerAddr)
if RequestProtocol(b.Protocol) == RequestMethodUdp {
conn, err = b.makeConn(client)
if err != nil {
b.Logger.Sugar().Errorf("failed create dns conn,err=%v", err)
return

Check warning on line 188 in pkg/loadRequest/loadDns/dns_requester.go

View check run for this annotation

Codecov / codecov/patch

pkg/loadRequest/loadDns/dns_requester.go#L187-L188

Added lines #L187 - L188 were not covered by tests
}
msg, rtt, err = client.ExchangeWithConn(b.Msg, conn)
}

b.results <- &result{
duration: rtt,
err: err,
msg: msg,
go conn.Receiver()
} else {
conn = new(dns.Conn)
}
}

func (b *Work) runWorker() {
conn, err := b.makeConn()
if err != nil {
b.Logger.Sugar().Errorf("failed create dns conn,err=%v", err)
return
}
wg := &sync.WaitGroup{}
for {
// Check if application is stopped. Do not send into a closed channel.
select {
case <-b.stopCh:
wg.Wait()
if RequestProtocol(b.Protocol) == RequestMethodUdp {
conn.ShutDownReceiver()
conn.Close()
}
return
case <-b.qosTokenBucket:
wg.Add(1)
go b.makeRequest(conn, wg)
msg := new(dns.Msg)
*msg = *b.Msg
msg.Id = dns.Id()
go b.makeRequest(client, msg, conn, wg)

case resp := <-conn.ResponseReceiver:
e := resp.Err
if resp.Rtt > time.Duration(b.Timeout)*time.Millisecond {
e = fmt.Errorf("timeout for request, %d more than %d", resp.Rtt.Milliseconds(), b.Timeout)
}

Check warning on line 217 in pkg/loadRequest/loadDns/dns_requester.go

View check run for this annotation

Codecov / codecov/patch

pkg/loadRequest/loadDns/dns_requester.go#L216-L217

Added lines #L216 - L217 were not covered by tests
b.results <- &result{
duration: resp.Rtt,
err: e,
msg: resp.Msg,
}
}
}
}
Expand Down Expand Up @@ -258,11 +285,23 @@ func (b *Work) AggregateMetric() *v1beta1.DNSMetrics {
return metric
}

func (b *Work) makeConn() (*dns.Conn, error) {
func (b *Work) makeConn(c *dns.Client) (*dns.Conn, error) {
var err error
d := net.Dialer{Timeout: time.Duration(b.Timeout) * time.Millisecond}
d := net.Dialer{
Timeout: time.Duration(b.RequestTimeSecond) * time.Second,
}
conn := new(dns.Conn)
conn.Conn, err = d.DialContext(context.Background(), "udp", b.ServerAddr)
conn.ResponseReceiver = make(chan dns.Response, b.QPS)
conn.ShutDown = make(chan struct{})
conn.Conn, err = d.Dial(b.Protocol, b.ServerAddr)
// write with the appropriate write timeout
t := time.Now()
writeDeadline := t.Add(time.Duration(b.RequestTimeSecond) * time.Second)
readDeadline := t.Add(time.Duration(b.RequestTimeSecond) * time.Second)
_ = conn.SetWriteDeadline(writeDeadline)
_ = conn.SetReadDeadline(readDeadline)

conn.TsigSecret, conn.TsigProvider = c.TsigSecret, c.TsigProvider

return conn, err
}
10 changes: 5 additions & 5 deletions pkg/loadRequest/loadDns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "www.baidu.com",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
}

Expand Down Expand Up @@ -55,7 +55,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "www.baidu.com",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
EnableLatencyMetric: true,
}
Expand Down Expand Up @@ -86,7 +86,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "www.baidu.com",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
}

Expand Down Expand Up @@ -115,7 +115,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "www.no-existed.com",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
}

Expand Down Expand Up @@ -144,7 +144,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "wikipedia.org",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/loadRequest/loadHttp/http_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (b *Work) makeRequest(c *http.Client, wg *sync.WaitGroup) {
}
}
}

b.results <- &result{
duration: finish,
statusCode: statusCode,
Expand Down Expand Up @@ -310,7 +311,7 @@ func (b *Work) runWorker() {
tr.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
}
// Each goroutine uses the same HTTP Client instance
client := &http.Client{Transport: tr, Timeout: time.Duration(b.Timeout) * time.Millisecond}
client := &http.Client{Transport: tr}
if b.DisableRedirects {
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
Expand Down
2 changes: 1 addition & 1 deletion pkg/pluginManager/netdns/agentExecuteTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *PluginNetDns) AgentExecuteTask(logger *zap.Logger, ctx context.Context,
}
wg.Wait()

logger.Sugar().Infof("plugin finished all http request tests")
logger.Sugar().Infof("plugin finished all dns request tests")

// ----------------------- aggregate report
task := &v1beta1.NetDNSTask{}
Expand Down
12 changes: 8 additions & 4 deletions pkg/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,15 @@ func (r *UsedResource) RunResourceCollector() {

func (r *UsedResource) Stats() v1beta1.SystemResource {
r.l.Lock()
defer r.l.Unlock()
useCPU := r.cpu
totalCPU := r.totalCPU
roundCount := r.roundCount
mem := r.mem
r.l.Unlock()
resource := v1beta1.SystemResource{
MaxCPU: fmt.Sprintf("%.3f%%", r.cpu),
MeanCPU: fmt.Sprintf("%.3f%%", r.totalCPU/float64(r.roundCount)),
MaxMemory: fmt.Sprintf("%.2fMB", float64(r.mem/(1024*1024))),
MaxCPU: fmt.Sprintf("%.3f%%", useCPU),
MeanCPU: fmt.Sprintf("%.3f%%", totalCPU/float64(roundCount)),
MaxMemory: fmt.Sprintf("%.2fMB", float64(mem/(1024*1024))),
}

return resource
Expand Down
7 changes: 4 additions & 3 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ deploy_project:
HELM_OPTION+=" --set kdoctorAgent.ingress.enable=false " ; \
fi ; \
HELM_OPTION+=" --set feature.aggregateReport.enabled=true " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.cpu=200m " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.memory=256Mi " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.cpu=400m " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.limits.cpu=1600m " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.memory=512Mi " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.limits.memory=2048Mi " ; \
HELM_OPTION+=" --set feature.aggregateReport.controller.reportHostPath=/var/run/kdoctor/controller " ; \
HELM_OPTION+=" --set kdoctorAgent.debug.logLevel=debug --set kdoctorController.debug.logLevel=debug " ; \
HELM_OPTION+=" --set kdoctorAgent.prometheus.enabled=true --set kdoctorController.prometheus.enabled=true " ; \
Expand All @@ -172,7 +174,6 @@ deploy_project:
|| { KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) ./scripts/debugCluster.sh $(KIND_KUBECONFIG) "detail" $(E2E_INSTALL_NAMESPACE) ; exit 1 ; } ; \
exit 0


#=========================

.PHONY: deploy_multus
Expand Down
Loading

0 comments on commit 1f2b5e1

Please sign in to comment.