Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve ca data for yurthub component #1815

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/util/certmanager/pki.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
// load the root ca from the given kubeconfig file
config, err := clientcmd.LoadFromFile(kubeConfig)
if err != nil || config == nil {
return nil, fmt.Errorf("could not load the kubeconfig file(%s), %w",

Check warning on line 87 in pkg/util/certmanager/pki.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/certmanager/pki.go#L87

Added line #L87 was not covered by tests
kubeConfig, err)
}

Expand Down Expand Up @@ -173,7 +173,7 @@
if os.IsNotExist(err) {
return nil, fmt.Errorf("CA file(%s) doesn't exist", caFile)
}
return nil, fmt.Errorf("could not stat the CA file(%s): %w", caFile, err)

Check warning on line 176 in pkg/util/certmanager/pki.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/certmanager/pki.go#L176

Added line #L176 was not covered by tests
}

caData, err := os.ReadFile(caFile)
Expand All @@ -185,3 +185,10 @@
certPool.AppendCertsFromPEM(caData)
return certPool, nil
}

// GenCertPoolUseCAData generates a x509 CertPool based on the given CA data
func GenCertPoolUseCAData(caData []byte) (*x509.CertPool, error) {
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caData)
return certPool, nil

Check warning on line 193 in pkg/util/certmanager/pki.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/certmanager/pki.go#L190-L193

Added lines #L190 - L193 were not covered by tests
}
1 change: 1 addition & 0 deletions pkg/yurthub/certificate/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type YurtClientCertificateManager interface {
Stop()
UpdateBootstrapConf(joinToken string) error
GetHubConfFile() string
GetCAData() []byte
GetCaFile() string
GetAPIServerClientCert() *tls.Certificate
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"crypto/x509"
"errors"
"fmt"
"os"
"time"

"k8s.io/klog/v2"
Expand All @@ -40,6 +41,7 @@
kubeletCAFile string
kubeletPemFile string
cert *tls.Certificate
caData []byte
}

func NewKubeletCertManager(kubeConfFile, kubeletCAFile, kubeletPemFile string) (certificate.YurtClientCertificateManager, error) {
Expand All @@ -50,6 +52,10 @@
if exist, _ := util.FileExists(kubeletCAFile); !exist {
return nil, KubeletCANotExistErr
}
caData, err := os.ReadFile(kubeletCAFile)
if err != nil {
return nil, err
}

Check warning on line 58 in pkg/yurthub/certificate/kubeletcertificate/kubelet_certificate.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/kubeletcertificate/kubelet_certificate.go#L57-L58

Added lines #L57 - L58 were not covered by tests

if exist, _ := util.FileExists(kubeletPemFile); !exist {
return nil, KubeletPemNotExistErr
Expand All @@ -65,6 +71,7 @@
kubeletCAFile: kubeletCAFile,
kubeletPemFile: kubeletPemFile,
cert: cert,
caData: caData,
}, nil
}

Expand All @@ -84,6 +91,10 @@
return kcm.kubeConfFile
}

func (kcm *kubeletCertManager) GetCAData() []byte {
return kcm.caData

Check warning on line 95 in pkg/yurthub/certificate/kubeletcertificate/kubelet_certificate.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/kubeletcertificate/kubelet_certificate.go#L94-L95

Added lines #L94 - L95 were not covered by tests
}

func (kcm *kubeletCertManager) GetCaFile() string {
return kcm.kubeletCAFile
}
Expand All @@ -96,7 +107,7 @@
klog.Warningf("current certificate: %s is expired, reload it", kcm.kubeletPemFile)
cert, err := loadFile(kcm.kubeletPemFile)
if err != nil {
klog.Errorf("could not load client certificate(%s), %v", kcm.kubeletPemFile, err)

Check warning on line 110 in pkg/yurthub/certificate/kubeletcertificate/kubelet_certificate.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/kubeletcertificate/kubelet_certificate.go#L110

Added line #L110 was not covered by tests
return nil
}
kcm.cert = cert
Expand Down
9 changes: 2 additions & 7 deletions pkg/yurthub/certificate/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/kubeletcertificate"
hubServerCert "github.com/openyurtio/openyurt/pkg/yurthub/certificate/server"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/token"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

const (
Expand Down Expand Up @@ -123,12 +122,8 @@
errs = append(errs, apiServerClientCertNotReadyError)
}

if exist, err := util.FileExists(hcm.YurtClientCertificateManager.GetCaFile()); !exist {
if err == nil {
errs = append(errs, caCertIsNotReadyError)
} else {
errs = append(errs, err)
}
if len(hcm.YurtClientCertificateManager.GetCAData()) == 0 {
errs = append(errs, caCertIsNotReadyError)

Check warning on line 126 in pkg/yurthub/certificate/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/manager/manager.go#L126

Added line #L126 was not covered by tests
}

if hcm.GetHubServerCert() == nil {
Expand Down
42 changes: 8 additions & 34 deletions pkg/yurthub/certificate/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,11 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
certutil "k8s.io/client-go/util/cert"

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
"github.com/openyurtio/openyurt/pkg/projectinfo"
kubeconfigutil "github.com/openyurtio/openyurt/pkg/util/kubeconfig"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/testdata"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

func TestGetHubServerCertFile(t *testing.T) {
Expand Down Expand Up @@ -108,33 +103,6 @@ func TestReady(t *testing.T) {
if mgr.Ready() {
return true, nil
}

if exist, err := util.FileExists(mgr.GetCaFile()); !exist {
if err != nil {
t.Logf("could not get ca file(%s), %v", mgr.GetCaFile(), err)
return false, err
}

if exist, err := util.FileExists(mgr.GetHubConfFile()); err != nil {
t.Logf("could not get hub conf file(%s), %v", mgr.GetHubConfFile(), err)
return false, nil
} else if exist {
t.Logf("%s file already exists, so use it to create ca file", mgr.GetHubConfFile())
hubKubeConfig, err := clientcmd.LoadFromFile(mgr.GetHubConfFile())
if err != nil {
return false, err
}

cluster := kubeconfigutil.GetClusterFromKubeConfig(hubKubeConfig)
if cluster != nil {
if err := certutil.WriteCert(mgr.GetCaFile(), cluster.CertificateAuthorityData); err != nil {
return false, errors.Wrap(err, "couldn't save the CA certificate to disk")
}
} else {
return false, errors.Errorf("couldn't prepare ca.crt(%s) file", mgr.GetCaFile())
}
}
}
return false, nil
})

Expand All @@ -161,8 +129,14 @@ func TestReady(t *testing.T) {
return
}
newMgr.Start()
if !newMgr.Ready() {
t.Errorf("certificates can not be reused")
err = wait.PollImmediate(2*time.Second, 1*time.Minute, func() (done bool, err error) {
if mgr.Ready() {
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("certificates are not reused, %v", err)
}
newMgr.Stop()

Expand Down
17 changes: 17 additions & 0 deletions pkg/yurthub/certificate/token/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
joinToken string
bootstrapFile string
dialer *util.Dialer
caData []byte
}

// NewYurtHubClientCertManager new a YurtCertificateManager instance
Expand Down Expand Up @@ -149,7 +150,7 @@
func (ycm *yurtHubClientCertManager) Start() {
err := ycm.prepareConfigAndCaFile()
if err != nil {
klog.Errorf("could not prepare config and ca file, %v", err)

Check warning on line 153 in pkg/yurthub/certificate/token/token.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/token/token.go#L153

Added line #L153 was not covered by tests
return
}

Expand All @@ -171,7 +172,7 @@
if len(ycm.bootstrapFile) != 0 {
// 1. load bootstrap config
if tlsBootstrapCfg, err = clientcmd.LoadFromFile(ycm.getBootstrapConfFile()); err != nil {
klog.Errorf("maybe hub agent restarted, could not load bootstrap config file(%s), %v.", ycm.getBootstrapConfFile(), err)

Check warning on line 175 in pkg/yurthub/certificate/token/token.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/token/token.go#L175

Added line #L175 was not covered by tests
} else {
klog.V(2).Infof("%s file is configured, just use it", ycm.getBootstrapConfFile())
}
Expand Down Expand Up @@ -203,11 +204,17 @@
if err := certutil.WriteCert(ycm.GetCaFile(), cluster.CertificateAuthorityData); err != nil {
return errors.Wrap(err, "couldn't save the CA certificate to disk")
}
ycm.caData = cluster.CertificateAuthorityData

Check warning on line 207 in pkg/yurthub/certificate/token/token.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/token/token.go#L207

Added line #L207 was not covered by tests
} else {
return errors.Errorf("couldn't prepare ca.crt(%s) file", ycm.GetCaFile())
}
} else {
klog.V(2).Infof("%s file already exists, so reuse it", ycm.GetCaFile())
caData, err := os.ReadFile(ycm.GetCaFile())
if err != nil {
return err
}
ycm.caData = caData

Check warning on line 217 in pkg/yurthub/certificate/token/token.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/token/token.go#L213-L217

Added lines #L213 - L217 were not covered by tests
}
return nil
}
Expand All @@ -220,7 +227,7 @@
return errors.Wrap(err, "couldn't stat bootstrap config file")
} else if !exist {
if tlsBootstrapCfg, err = ycm.retrieveHubBootstrapConfig(ycm.joinToken); err != nil {
return errors.Wrap(err, "could not retrieve bootstrap config")

Check warning on line 230 in pkg/yurthub/certificate/token/token.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/token/token.go#L230

Added line #L230 was not covered by tests
}
} else {
klog.V(2).Infof("%s file already exists, so reuse it", ycm.getBootstrapConfFile())
Expand Down Expand Up @@ -250,11 +257,17 @@
if err := certutil.WriteCert(ycm.GetCaFile(), cluster.CertificateAuthorityData); err != nil {
return errors.Wrap(err, "couldn't save the CA certificate to disk")
}
ycm.caData = cluster.CertificateAuthorityData

Check warning on line 260 in pkg/yurthub/certificate/token/token.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/token/token.go#L260

Added line #L260 was not covered by tests
} else {
return errors.Errorf("couldn't prepare ca.crt(%s) file", ycm.GetCaFile())
}
} else {
klog.V(2).Infof("%s file already exists, so reuse it", ycm.GetCaFile())
caData, err := os.ReadFile(ycm.GetCaFile())
if err != nil {
return err
}
ycm.caData = caData

Check warning on line 270 in pkg/yurthub/certificate/token/token.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/token/token.go#L266-L270

Added lines #L266 - L270 were not covered by tests
}

return nil
Expand Down Expand Up @@ -284,6 +297,10 @@
return filepath.Join(ycm.hubRunDir, bootstrapConfigFileName)
}

func (ycm *yurtHubClientCertManager) GetCAData() []byte {
return ycm.caData

Check warning on line 301 in pkg/yurthub/certificate/token/token.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/certificate/token/token.go#L300-L301

Added lines #L300 - L301 were not covered by tests
}

// GetCaFile returns the path of ca file
func (ycm *yurtHubClientCertManager) GetCaFile() string {
return filepath.Join(ycm.getPkiDir(), hubCaFileName)
Expand Down
19 changes: 9 additions & 10 deletions pkg/yurthub/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type CertGetter interface {
// GetAPIServerClientCert returns the currently selected certificate, as well as
// the associated certificate and key data in PEM format.
GetAPIServerClientCert() *tls.Certificate
// Return CA file path.
GetCaFile() string
// GetCAData returns CA file data.
GetCAData() []byte
}

// Interface is an transport interface for managing clients that used to connecting kube-apiserver
Expand All @@ -60,13 +60,12 @@ type transportManager struct {

// NewTransportManager create a transport interface object.
func NewTransportManager(certGetter CertGetter, stopCh <-chan struct{}) (Interface, error) {
caFile := certGetter.GetCaFile()
if len(caFile) == 0 {
return nil, fmt.Errorf("ca cert file was not prepared when new transport")
caData := certGetter.GetCAData()
if len(caData) == 0 {
return nil, fmt.Errorf("ca cert data was not prepared when new transport")
}
klog.V(2).Infof("use %s ca cert file to access remote server", caFile)

cfg, err := tlsConfig(certGetter.GetAPIServerClientCert, caFile)
cfg, err := tlsConfig(certGetter.GetAPIServerClientCert, caData)
if err != nil {
klog.Errorf("could not get tls config when new transport, %v", err)
return nil, err
Expand All @@ -81,7 +80,7 @@ func NewTransportManager(certGetter CertGetter, stopCh <-chan struct{}) (Interfa
DialContext: d.DialContext,
})

bearerTLSCfg, err := tlsConfig(nil, caFile)
bearerTLSCfg, err := tlsConfig(nil, caData)
if err != nil {
klog.Errorf("could not get tls config when new bearer transport, %v", err)
return nil, err
Expand Down Expand Up @@ -151,9 +150,9 @@ func (tm *transportManager) start() {
}, 10*time.Second, tm.stopCh)
}

func tlsConfig(current func() *tls.Certificate, caFile string) (*tls.Config, error) {
func tlsConfig(current func() *tls.Certificate, caData []byte) (*tls.Config, error) {
// generate the TLS configuration based on the latest certificate
rootCert, err := certmanager.GenCertPoolUseCA(caFile)
rootCert, err := certmanager.GenCertPoolUseCAData(caData)
if err != nil {
klog.Errorf("could not generate a x509 CertPool based on the given CA file, %v", err)
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions pkg/yurthub/yurtcoordinator/certmanager/certmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
func NewCertManager(pkiDir, yurtHubNs string, yurtClient kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*CertManager, error) {
store := fs.FileSystemOperator{}
if err := store.CreateDir(pkiDir); err != nil && err != fs.ErrExists {
return nil, fmt.Errorf("could not create dir %s, %v", pkiDir, err)

Check warning on line 60 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L60

Added line #L60 was not covered by tests
}

certMgr := &CertManager{
Expand Down Expand Up @@ -98,6 +98,7 @@
coordinatorCert *tls.Certificate
nodeLeaseProxyCert *tls.Certificate
store fs.FileSystemOperator
caData []byte

// Used for unit test.
secret *corev1.Secret
Expand All @@ -115,6 +116,10 @@
return c.nodeLeaseProxyCert
}

func (c *CertManager) GetCAData() []byte {
return c.caData

Check warning on line 120 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L119-L120

Added lines #L119 - L120 were not covered by tests
}

func (c *CertManager) GetCaFile() string {
return c.GetFilePath(RootCA)
}
Expand All @@ -139,7 +144,7 @@
var coordinatorCert, nodeLeaseProxyCert *tls.Certificate
if cook {
if cert, err := tls.X509KeyPair(coordinatorClientCrt, coordinatorClientKey); err != nil {
klog.Errorf("could not create tls certificate for coordinator, %v", err)

Check warning on line 147 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L147

Added line #L147 was not covered by tests
} else {
coordinatorCert = &cert
}
Expand All @@ -147,7 +152,7 @@

if nook {
if cert, err := tls.X509KeyPair(nodeLeaseProxyClientCrt, nodeLeaseProxyClientKey); err != nil {
klog.Errorf("could not create tls certificate for node lease proxy, %v", err)

Check warning on line 155 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L155

Added line #L155 was not covered by tests
} else {
nodeLeaseProxyCert = &cert
}
Expand All @@ -160,27 +165,28 @@
if caok {
klog.Infof("updating coordinator ca cert")
if err := c.createOrUpdateFile(c.GetFilePath(RootCA), ca); err != nil {
klog.Errorf("could not update ca, %v", err)

Check warning on line 168 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L168

Added line #L168 was not covered by tests
}
c.caData = ca
}

if cook {
klog.Infof("updating yurt-coordinator-yurthub client cert and key")
if err := c.createOrUpdateFile(c.GetFilePath(YurthubClientKey), coordinatorClientKey); err != nil {
klog.Errorf("could not update coordinator client key, %v", err)

Check warning on line 176 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L176

Added line #L176 was not covered by tests
}
if err := c.createOrUpdateFile(c.GetFilePath(YurthubClientCert), coordinatorClientCrt); err != nil {
klog.Errorf("could not update coordinator client cert, %v", err)

Check warning on line 179 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L179

Added line #L179 was not covered by tests
}
}

if nook {
klog.Infof("updating node-lease-proxy-client cert and key")
if err := c.createOrUpdateFile(c.GetFilePath(NodeLeaseProxyClientKey), nodeLeaseProxyClientKey); err != nil {
klog.Errorf("could not update node lease proxy client key, %v", err)

Check warning on line 186 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L186

Added line #L186 was not covered by tests
}
if err := c.createOrUpdateFile(c.GetFilePath(NodeLeaseProxyClientCert), nodeLeaseProxyClientCrt); err != nil {
klog.Errorf("could not update node lease proxy client cert, %v", err)

Check warning on line 189 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L189

Added line #L189 was not covered by tests
}
}

Expand All @@ -199,7 +205,7 @@
func (c *CertManager) createOrUpdateFile(path string, data []byte) error {
if err := c.store.Write(path, data); err == fs.ErrNotExists {
if err := c.store.CreateFile(path, data); err != nil {
return fmt.Errorf("could not create file at %s, %v", path, err)

Check warning on line 208 in pkg/yurthub/yurtcoordinator/certmanager/certmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/certmanager/certmanager.go#L208

Added line #L208 was not covered by tests
}
} else if err != nil {
return fmt.Errorf("could not update file at %s, %v", path, err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/yurthub/yurtcoordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@
statusInfoChan chan statusInfo
isPoolCacheSynced bool
certMgr *certmanager.CertManager
// cloudCAFilePath is the file path of cloud kubernetes cluster CA cert.
cloudCAFilePath string
// cloudCAFileData is the file data of cloud kubernetes cluster CA cert.
cloudCAFileData []byte
// cloudHealthChecker is health checker of cloud APIServers. It is used to
// pick a healthy cloud APIServer to proxy heartbeats.
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker
Expand Down Expand Up @@ -148,12 +148,12 @@
}
coordinatorClient, err := kubernetes.NewForConfig(coordinatorRESTCfg)
if err != nil {
return nil, fmt.Errorf("could not create client for yurt coordinator, %v", err)

Check warning on line 151 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L151

Added line #L151 was not covered by tests
}

coordinator := &coordinator{
ctx: ctx,
cloudCAFilePath: cfg.CertManager.GetCaFile(),
cloudCAFileData: cfg.CertManager.GetCAData(),

Check warning on line 156 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L156

Added line #L156 was not covered by tests
cloudHealthChecker: cloudHealthChecker,
etcdStorageCfg: etcdStorageCfg,
restConfigMgr: restMgr,
Expand Down Expand Up @@ -188,7 +188,7 @@

proxiedClient, err := buildProxiedClientWithUserAgent(fmt.Sprintf("http://%s", cfg.YurtHubProxyServerAddr), constants.DefaultPoolScopedUserAgent)
if err != nil {
return nil, fmt.Errorf("could not create proxied client, %v", err)

Check warning on line 191 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L191

Added line #L191 was not covered by tests
}

// init pool scope resources
Expand All @@ -196,7 +196,7 @@

dynamicClient, err := buildDynamicClientWithUserAgent(fmt.Sprintf("http://%s", cfg.YurtHubProxyServerAddr), constants.DefaultPoolScopedUserAgent)
if err != nil {
return nil, fmt.Errorf("could not create dynamic client, %v", err)

Check warning on line 199 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L199

Added line #L199 was not covered by tests
}

poolScopedCacheSyncManager := &poolScopedCacheSyncManager{
Expand Down Expand Up @@ -283,13 +283,13 @@
case LeaderHub:
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
if err != nil {
klog.Errorf("could not create pool scoped cache store and manager, %v", err)

Check warning on line 286 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L286

Added line #L286 was not covered by tests
coordinator.statusInfoChan <- electorStatusInfo
continue
}

if err := coordinator.poolCacheSyncManager.EnsureStart(); err != nil {
klog.Errorf("could not sync pool-scoped resource, %v", err)

Check warning on line 292 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L292

Added line #L292 was not covered by tests
cancelEtcdStorage()
coordinator.statusInfoChan <- electorStatusInfo
continue
Expand Down Expand Up @@ -320,7 +320,7 @@

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
klog.Errorf("could not upload local cache when yurthub becomes leader, %v", err)

Check warning on line 323 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L323

Added line #L323 was not covered by tests
} else {
needUploadLocalCache = false
}
Expand All @@ -328,7 +328,7 @@
case FollowerHub:
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
if err != nil {
klog.Errorf("could not create pool scoped cache store and manager, %v", err)

Check warning on line 331 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L331

Added line #L331 was not covered by tests
coordinator.statusInfoChan <- electorStatusInfo
continue
}
Expand All @@ -339,7 +339,7 @@

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
klog.Errorf("could not upload local cache when yurthub becomes follower, %v", err)

Check warning on line 342 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L342

Added line #L342 was not covered by tests
} else {
needUploadLocalCache = false
}
Expand Down Expand Up @@ -400,7 +400,7 @@
etcdStore, err := etcd.NewStorage(ctx, coordinator.etcdStorageCfg)
if err != nil {
cancel()
return nil, nil, nil, fmt.Errorf("could not create etcd storage, %v", err)

Check warning on line 403 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L403

Added line #L403 was not covered by tests
}
poolCacheManager := cachemanager.NewCacheManager(
cachemanager.NewStorageWrapper(etcdStore),
Expand All @@ -418,14 +418,14 @@
func (coordinator *coordinator) newNodeLeaseProxyClient() (coordclientset.LeaseInterface, error) {
healthyCloudServer, err := coordinator.cloudHealthChecker.PickHealthyServer()
if err != nil {
return nil, fmt.Errorf("could not get a healthy cloud APIServer, %v", err)

Check warning on line 421 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L421

Added line #L421 was not covered by tests
} else if healthyCloudServer == nil {
return nil, fmt.Errorf("could not get a healthy cloud APIServer, all server are unhealthy")

Check warning on line 423 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L423

Added line #L423 was not covered by tests
}
restCfg := &rest.Config{
Host: healthyCloudServer.String(),
TLSClientConfig: rest.TLSClientConfig{
CAFile: coordinator.cloudCAFilePath,
CAData: coordinator.cloudCAFileData,

Check warning on line 428 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L428

Added line #L428 was not covered by tests
CertFile: coordinator.certMgr.GetFilePath(certmanager.NodeLeaseProxyClientCert),
KeyFile: coordinator.certMgr.GetFilePath(certmanager.NodeLeaseProxyClientKey),
},
Expand All @@ -433,7 +433,7 @@
}
cloudClient, err := kubernetes.NewForConfig(restCfg)
if err != nil {
return nil, fmt.Errorf("could not create cloud client, %v", err)

Check warning on line 436 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L436

Added line #L436 was not covered by tests
}

return cloudClient.CoordinationV1().Leases(corev1.NamespaceNodeLease), nil
Expand All @@ -458,7 +458,7 @@
cloudLease, err := cloudLeaseClient.Get(coordinator.ctx, newLease.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
if _, err := cloudLeaseClient.Create(coordinator.ctx, cloudLease, metav1.CreateOptions{}); err != nil {
klog.Errorf("could not create lease %s at cloud, %v", newLease.Name, err)

Check warning on line 461 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L461

Added line #L461 was not covered by tests
continue
}
}
Expand All @@ -466,7 +466,7 @@
cloudLease.Annotations = newLease.Annotations
cloudLease.Spec.RenewTime = newLease.Spec.RenewTime
if updatedLease, err := cloudLeaseClient.Update(coordinator.ctx, cloudLease, metav1.UpdateOptions{}); err != nil {
klog.Errorf("could not update lease %s at cloud, %v", newLease.Name, err)

Check warning on line 469 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L469

Added line #L469 was not covered by tests
continue
} else {
klog.V(2).Infof("delegate node lease for %s", updatedLease.Name)
Expand Down Expand Up @@ -498,7 +498,7 @@
if !p.isRunning {
err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("could not delete informer sync lease, %v", err)

Check warning on line 501 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L501

Added line #L501 was not covered by tests
}

etcdStore := p.getEtcdStore()
Expand All @@ -506,7 +506,7 @@
return fmt.Errorf("got empty etcd storage")
}
if err := etcdStore.DeleteComponentResources(constants.DefaultPoolScopedUserAgent); err != nil {
return fmt.Errorf("could not clean old pool-scoped cache, %v", err)

Check warning on line 509 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L509

Added line #L509 was not covered by tests
}

ctx, cancel := context.WithCancel(p.ctx)
Expand Down Expand Up @@ -546,7 +546,7 @@
p.renewInformerLease(ctx, informerLease)
return
}
klog.Error("could not wait for cache synced, it was canceled")

Check warning on line 549 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L549

Added line #L549 was not covered by tests
}

func (p *poolScopedCacheSyncManager) renewInformerLease(ctx context.Context, lease informerLease) {
Expand All @@ -559,7 +559,7 @@
case <-t.C:
newLease, err := lease.Update(p.informerSyncedLease)
if err != nil {
klog.Errorf("could not update informer lease, %v", err)

Check warning on line 562 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L562

Added line #L562 was not covered by tests
continue
}
p.informerSyncedLease = newLease
Expand Down Expand Up @@ -613,12 +613,12 @@
for k, b := range objBytes {
rv, err := getRv(b)
if err != nil {
klog.Errorf("could not get name from bytes %s, %v", string(b), err)

Check warning on line 616 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L616

Added line #L616 was not covered by tests
continue
}

if err := l.createOrUpdate(k, b, rv); err != nil {
klog.Errorf("could not upload %s, %v", k.Key(), err)

Check warning on line 621 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L621

Added line #L621 was not covered by tests
}
}
}
Expand Down Expand Up @@ -648,25 +648,25 @@
}
localKeys, err := l.diskStorage.ListResourceKeysOfComponent(info.Component, gvr)
if err != nil {
klog.Errorf("could not get object keys from disk for %s, %v", gvr.String(), err)

Check warning on line 651 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L651

Added line #L651 was not covered by tests
continue
}

for _, k := range localKeys {
buf, err := l.diskStorage.Get(k)
if err != nil {
klog.Errorf("could not read local cache of key %s, %v", k.Key(), err)

Check warning on line 658 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L658

Added line #L658 was not covered by tests
continue
}
buildInfo, err := disk.ExtractKeyBuildInfo(k)
if err != nil {
klog.Errorf("could not extract key build info from local cache of key %s, %v", k.Key(), err)

Check warning on line 663 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L663

Added line #L663 was not covered by tests
continue
}

poolCacheKey, err := l.etcdStorage.KeyFunc(*buildInfo)
if err != nil {
klog.Errorf("could not generate pool cache key from local cache key %s, %v", k.Key(), err)

Check warning on line 669 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L669

Added line #L669 was not covered by tests
continue
}
objBytes[poolCacheKey] = buf
Expand Down Expand Up @@ -749,12 +749,12 @@
func getRv(objBytes []byte) (uint64, error) {
obj := &unstructured.Unstructured{}
if err := json.Unmarshal(objBytes, obj); err != nil {
return 0, fmt.Errorf("could not unmarshal json: %v", err)

Check warning on line 752 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L752

Added line #L752 was not covered by tests
}

rv, err := strconv.ParseUint(obj.GetResourceVersion(), 10, 64)
if err != nil {
return 0, fmt.Errorf("could not parse rv %s of pod %s, %v", obj.GetName(), obj.GetResourceVersion(), err)

Check warning on line 757 in pkg/yurthub/yurtcoordinator/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/yurtcoordinator/coordinator.go#L757

Added line #L757 was not covered by tests
}

return rv, nil
Expand Down
Loading