Skip to content

Commit

Permalink
use new dns client to fix data race and reuse connect
Browse files Browse the repository at this point in the history
Signed-off-by: ii2day <ji.li@daocloud.io>
  • Loading branch information
ii2day committed Nov 21, 2023
1 parent d2f0879 commit f515cf9
Show file tree
Hide file tree
Showing 44 changed files with 1,363 additions and 594 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,5 @@ require (
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

replace github.com/miekg/dns v1.1.50 => github.com/kdoctor-io/dns v0.0.0-20231117104519-7085dea69b40
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.4.0 h1:V
github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.4.0/go.mod h1:nqCI7aelBJU61wiBeeZWJ6oi4bJy5nrjkM6lWIMA4j0=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kdoctor-io/dns v0.0.0-20231117104519-7085dea69b40 h1:AlmnjvSvhzwJR9YpV3r0k0b0g6de4infGN0fOImcaO4=
github.com/kdoctor-io/dns v0.0.0-20231117104519-7085dea69b40/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand Down Expand Up @@ -380,8 +382,6 @@ github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
Expand Down Expand Up @@ -669,7 +669,6 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
Expand Down Expand Up @@ -835,7 +834,6 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8=
golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk=
Expand Down
99 changes: 69 additions & 30 deletions pkg/loadRequest/loadDns/dns_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
package loadDns

import (
"context"
"crypto/tls"
"fmt"
"github.com/kdoctor-io/kdoctor/pkg/k8s/apis/system/v1beta1"
"github.com/kdoctor-io/kdoctor/pkg/utils/stats"
"github.com/miekg/dns"
Expand Down Expand Up @@ -144,55 +144,82 @@ func (b *Work) Finish() {
b.report.finalize(total)
}

func (b *Work) makeRequest(conn *dns.Conn, wg *sync.WaitGroup) {
func (b *Work) makeRequest(client *dns.Client, msg *dns.Msg, conn *dns.Conn, wg *sync.WaitGroup) {
defer wg.Done()
var msg *dns.Msg
var rtt time.Duration
var err error

// Due to the time limitation on long-lived TCP connections by CoreDNS, connection reuse is not adopted for the TCP protocol.
if RequestProtocol(b.Protocol) == RequestMethodUdp {
err := client.ExchangeWithReuseConn(msg, conn)
if err != nil {
b.results <- &result{
duration: 0,
err: err,
msg: nil,
}
}

Check warning on line 159 in pkg/loadRequest/loadDns/dns_requester.go

View check run for this annotation

Codecov / codecov/patch

pkg/loadRequest/loadDns/dns_requester.go#L154-L159

Added lines #L154 - L159 were not covered by tests
} else {
r, rtt, err := client.Exchange(msg, b.ServerAddr)
b.results <- &result{
duration: rtt,
err: err,
msg: r,
}
}

}

func (b *Work) runWorker() {
var conn *dns.Conn
var err error
client := new(dns.Client)
client.Net = b.Protocol
client.Timeout = time.Duration(b.Timeout) * time.Millisecond
if b.Protocol == "tcp-tls" {
if b.Protocol == string(RequestMethodTcpTls) {
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
client.TLSConfig = tlsConfig
}

if b.Protocol == "tcp" || b.Protocol == "tcp-tls" {
msg, rtt, err = client.Exchange(b.Msg, b.ServerAddr)

} else {
if conn == nil {
conn, _ = client.Dial(b.ServerAddr)
if RequestProtocol(b.Protocol) == RequestMethodUdp {
conn, err = b.makeConn(client)
if err != nil {
b.Logger.Sugar().Errorf("failed create dns conn,err=%v", err)
return

Check warning on line 188 in pkg/loadRequest/loadDns/dns_requester.go

View check run for this annotation

Codecov / codecov/patch

pkg/loadRequest/loadDns/dns_requester.go#L187-L188

Added lines #L187 - L188 were not covered by tests
}
msg, rtt, err = client.ExchangeWithConn(b.Msg, conn)
}

b.results <- &result{
duration: rtt,
err: err,
msg: msg,
go conn.Receiver()
} else {
conn = new(dns.Conn)
}
}

func (b *Work) runWorker() {
conn, err := b.makeConn()
if err != nil {
b.Logger.Sugar().Errorf("failed create dns conn,err=%v", err)
return
}
wg := &sync.WaitGroup{}
for {
// Check if application is stopped. Do not send into a closed channel.
select {
case <-b.stopCh:
wg.Wait()
if RequestProtocol(b.Protocol) == RequestMethodUdp {
conn.ShutDownReceiver()
conn.Close()
}
return
case <-b.qosTokenBucket:
wg.Add(1)
go b.makeRequest(conn, wg)
msg := new(dns.Msg)
*msg = *b.Msg
msg.Id = dns.Id()
go b.makeRequest(client, msg, conn, wg)

case resp := <-conn.ResponseReceiver:
e := resp.Err
if resp.Rtt > time.Duration(b.Timeout)*time.Millisecond {
e = fmt.Errorf("timeout for request, %d more than %d", resp.Rtt.Milliseconds(), b.Timeout)
}

Check warning on line 217 in pkg/loadRequest/loadDns/dns_requester.go

View check run for this annotation

Codecov / codecov/patch

pkg/loadRequest/loadDns/dns_requester.go#L216-L217

Added lines #L216 - L217 were not covered by tests
b.results <- &result{
duration: resp.Rtt,
err: e,
msg: resp.Msg,
}
}
}
}
Expand Down Expand Up @@ -258,11 +285,23 @@ func (b *Work) AggregateMetric() *v1beta1.DNSMetrics {
return metric
}

func (b *Work) makeConn() (*dns.Conn, error) {
func (b *Work) makeConn(c *dns.Client) (*dns.Conn, error) {
var err error
d := net.Dialer{Timeout: time.Duration(b.Timeout) * time.Millisecond}
d := net.Dialer{
Timeout: time.Duration(b.RequestTimeSecond) * time.Second,
}
conn := new(dns.Conn)
conn.Conn, err = d.DialContext(context.Background(), "udp", b.ServerAddr)
conn.ResponseReceiver = make(chan dns.Response, b.QPS)
conn.ShutDown = make(chan struct{}, 1)
conn.Conn, err = d.Dial(b.Protocol, b.ServerAddr)
// write with the appropriate write timeout
t := time.Now()
writeDeadline := t.Add(time.Duration(b.RequestTimeSecond) * time.Second)
readDeadline := t.Add(time.Duration(b.RequestTimeSecond) * time.Second)
_ = conn.SetWriteDeadline(writeDeadline)
_ = conn.SetReadDeadline(readDeadline)

conn.TsigSecret, conn.TsigProvider = c.TsigSecret, c.TsigProvider

return conn, err
}
10 changes: 5 additions & 5 deletions pkg/loadRequest/loadDns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "www.baidu.com",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
}

Expand Down Expand Up @@ -55,7 +55,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "www.baidu.com",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
EnableLatencyMetric: true,
}
Expand Down Expand Up @@ -86,7 +86,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "www.baidu.com",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
}

Expand Down Expand Up @@ -115,7 +115,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "www.no-existed.com",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
}

Expand Down Expand Up @@ -144,7 +144,7 @@ var _ = Describe("test dns ", Label("dns"), func() {
TargetDomain: "wikipedia.org",
DnsServerAddr: dnsServer,
PerRequestTimeoutInMs: 5000,
DurationInSecond: 1,
DurationInSecond: 10,
Qps: 10,
}

Expand Down
4 changes: 0 additions & 4 deletions pkg/loadRequest/loadHttp/http_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package loadHttp

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
Expand Down Expand Up @@ -251,9 +250,6 @@ func (b *Work) makeRequest(c *http.Client, wg *sync.WaitGroup) {
},
}
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
ctx, cancel := context.WithTimeout(req.Context(), time.Duration(b.Timeout)*time.Millisecond)
defer cancel()
req = req.WithContext(ctx)
resp, err := c.Do(req)
t := b.now()
resDuration = t - resStart
Expand Down
2 changes: 1 addition & 1 deletion pkg/pluginManager/netdns/agentExecuteTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *PluginNetDns) AgentExecuteTask(logger *zap.Logger, ctx context.Context,
}
wg.Wait()

logger.Sugar().Infof("plugin finished all http request tests")
logger.Sugar().Infof("plugin finished all dns request tests")

// ----------------------- aggregate report
task := &v1beta1.NetDNSTask{}
Expand Down
12 changes: 8 additions & 4 deletions pkg/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,15 @@ func (r *UsedResource) RunResourceCollector() {

func (r *UsedResource) Stats() v1beta1.SystemResource {
r.l.Lock()
defer r.l.Unlock()
useCPU := r.cpu
totalCPU := r.totalCPU
roundCount := r.roundCount
mem := r.mem
r.l.Unlock()
resource := v1beta1.SystemResource{
MaxCPU: fmt.Sprintf("%.3f%%", r.cpu),
MeanCPU: fmt.Sprintf("%.3f%%", r.totalCPU/float64(r.roundCount)),
MaxMemory: fmt.Sprintf("%.2fMB", float64(r.mem/(1024*1024))),
MaxCPU: fmt.Sprintf("%.3f%%", useCPU),
MeanCPU: fmt.Sprintf("%.3f%%", totalCPU/float64(roundCount)),
MaxMemory: fmt.Sprintf("%.2fMB", float64(mem/(1024*1024))),
}

return resource
Expand Down
5 changes: 2 additions & 3 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ deploy_project:
HELM_OPTION+=" --set kdoctorAgent.ingress.enable=false " ; \
fi ; \
HELM_OPTION+=" --set feature.aggregateReport.enabled=true " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.cpu=200m " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.memory=256Mi " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.cpu=300m " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.memory=512Mi " ; \
HELM_OPTION+=" --set feature.aggregateReport.controller.reportHostPath=/var/run/kdoctor/controller " ; \
HELM_OPTION+=" --set kdoctorAgent.debug.logLevel=debug --set kdoctorController.debug.logLevel=debug " ; \
HELM_OPTION+=" --set kdoctorAgent.prometheus.enabled=true --set kdoctorController.prometheus.enabled=true " ; \
Expand All @@ -172,7 +172,6 @@ deploy_project:
|| { KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) ./scripts/debugCluster.sh $(KIND_KUBECONFIG) "detail" $(E2E_INSTALL_NAMESPACE) ; exit 1 ; } ; \
exit 0


#=========================

.PHONY: deploy_multus
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/apphttphealth/apphttphealth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ var _ = Describe("testing appHttpHealth test ", Label("appHttpHealth"), func() {

})

It("Successfully testing using default daemonSet as workload with Task AppHttpHealthy ", Label("E00017"), func() {
It("Successfully testing using default daemonSet as workload with more Task AppHttpHealthy", Label("E00017"), func() {
var e error
successRate := float64(1)
successMean := int64(1500)
Expand Down
1 change: 1 addition & 0 deletions test/e2e/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
PluginReportPath = "/apis/system.kdoctor.io/v1beta1/namespaces/default/kdoctorreports/"
KDoctorCaName = "kdoctor-ca"
KdoctorTestTokenSecretName = "apiserver-token"
RequestFaultRate = 0.1
)

var (
Expand Down
13 changes: 6 additions & 7 deletions test/e2e/common/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func WaitKdoctorTaskDone(f *frame.Framework, task client.Object, taskKind string
default:
return fmt.Errorf("unknown task type: %s", task.GetObjectKind().GroupVersionKind().Kind)
}

}

func GetKdoctorToken(f *frame.Framework) (string, error) {
Expand Down Expand Up @@ -249,8 +248,8 @@ func CompareResult(f *frame.Framework, name, taskKind string, podIPs []string, n
// qps
expectRequestCount := float64(rs.Spec.Request.QPS * rs.Spec.Request.DurationInSecond)
realRequestCount := float64(m.Metrics.RequestCounts)
if math.Abs(realRequestCount-expectRequestCount)/expectRequestCount > 0.05 {
return GetResultFromReport(r), fmt.Errorf("The error in the number of requests is greater than 0.05 ,real request count: %d,expect request count:%d", int(realRequestCount), int(expectRequestCount))
if math.Abs(realRequestCount-expectRequestCount)/expectRequestCount > RequestFaultRate {
return GetResultFromReport(r), fmt.Errorf("the error in the number of requests is greater than %.2f ,real request count: %d,expect request count:%d", RequestFaultRate, int(realRequestCount), int(expectRequestCount))
}
if float64(m.Metrics.SuccessCounts)/float64(m.Metrics.RequestCounts) != m.SucceedRate {
return GetResultFromReport(r), fmt.Errorf("succeedRate not equal")
Expand Down Expand Up @@ -301,8 +300,8 @@ func CompareResult(f *frame.Framework, name, taskKind string, podIPs []string, n
realCount := float64(m.Metrics.RequestCounts)
// report request count
reportRequestCount += m.Metrics.RequestCounts
if math.Abs(realCount-expectCount)/expectCount > 0.05 {
return GetResultFromReport(r), fmt.Errorf("The error in the number of requests is greater than 0.05 ,real request count: %d,expect request count:%d", int(realCount), int(expectCount))
if math.Abs(realCount-expectCount)/expectCount > RequestFaultRate {
return GetResultFromReport(r), fmt.Errorf("The error in the number of requests is greater than %.2f ,real request count: %d,expect request count:%d", RequestFaultRate, int(realCount), int(expectCount))
}
if float64(m.Metrics.SuccessCounts)/float64(m.Metrics.RequestCounts) != m.SucceedRate {
return GetResultFromReport(r), fmt.Errorf("succeedRate not equal")
Expand Down Expand Up @@ -366,8 +365,8 @@ func CompareResult(f *frame.Framework, name, taskKind string, podIPs []string, n
realCount := float64(m.Metrics.RequestCounts)
// report request count
reportRequestCount += m.Metrics.RequestCounts
if math.Abs(realCount-expectCount)/expectCount > 0.05 {
return GetResultFromReport(r), fmt.Errorf("The error in the number of requests is greater than 0.05, real request count: %d,expect request count:%d ", int(realCount), int(expectCount))
if math.Abs(realCount-expectCount)/expectCount > RequestFaultRate {
return GetResultFromReport(r), fmt.Errorf("The error in the number of requests is greater than %.2f, real request count: %d,expect request count:%d ", RequestFaultRate, int(realCount), int(expectCount))
}
if float64(m.Metrics.SuccessCounts)/float64(m.Metrics.RequestCounts) != m.SucceedRate {
return GetResultFromReport(r), fmt.Errorf("succeedRate not equal")
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/netdns/netdns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ var _ = Describe("testing netDns ", Label("netDns"), func() {

// request
request := new(v1beta1.NetdnsRequest)
request.PerRequestTimeoutInMS = 1000
request.PerRequestTimeoutInMS = 1500
request.QPS = 5
request.DurationInSecond = 5
request.Domain = fmt.Sprintf(targetDomain, netDnsName)
Expand Down Expand Up @@ -234,7 +234,7 @@ var _ = Describe("testing netDns ", Label("netDns"), func() {

// request
request := new(v1beta1.NetdnsRequest)
request.PerRequestTimeoutInMS = 1000
request.PerRequestTimeoutInMS = 1500
request.QPS = 5
request.DurationInSecond = 5
request.Domain = fmt.Sprintf(targetDomain, netDnsName)
Expand Down
Loading

0 comments on commit f515cf9

Please sign in to comment.