diff --git a/controller/rancher/metadata_util.go b/controller/rancher/metadata_util.go index 56c5e77..8916ad4 100644 --- a/controller/rancher/metadata_util.go +++ b/controller/rancher/metadata_util.go @@ -14,8 +14,7 @@ type LBMetadata struct { StickinessPolicy config.StickinessPolicy `json:"stickiness_policy"` } -// converts an interface to LBMetadata -func getLBMetadata(data interface{}) (*LBMetadata, error) { +func GetLBMetadata(data interface{}) (*LBMetadata, error) { lbMeta := &LBMetadata{} if err := convert(data, lbMeta); err != nil { return nil, err diff --git a/controller/rancher/rancher.go b/controller/rancher/rancher.go index 90a4df0..478b2ad 100644 --- a/controller/rancher/rancher.go +++ b/controller/rancher/rancher.go @@ -22,7 +22,7 @@ const ( ) func init() { - lbc, err := newLoadBalancerController() + lbc, err := NewLoadBalancerController() if err != nil { logrus.Fatalf("%v", err) } @@ -30,7 +30,7 @@ func init() { controller.RegisterController(lbc.GetName(), lbc) } -func (lbc *loadBalancerController) Init() { +func (lbc *LoadBalancerController) Init() { cattleURL := os.Getenv("CATTLE_URL") if len(cattleURL) == 0 { logrus.Fatalf("CATTLE_URL is not set, fail to init Rancher LB provider") @@ -57,54 +57,55 @@ func (lbc *loadBalancerController) Init() { logrus.Fatalf("Failed to create Rancher client %v", err) } - certFetcher := &rCertificateFetcher{ - client: client, + certFetcher := &RCertificateFetcher{ + Client: client, } - lbc.certFetcher = certFetcher + lbc.CertFetcher = certFetcher } -type loadBalancerController struct { +type LoadBalancerController struct { shutdown bool stopCh chan struct{} - lbProvider provider.LBProvider + LBProvider provider.LBProvider syncQueue *utils.TaskQueue opts *client.ClientOpts incrementalBackoff int64 incrementalBackoffInterval int64 - certFetcher CertificateFetcher - metaFetcher MetadataFetcher + CertFetcher CertificateFetcher + MetaFetcher MetadataFetcher } type MetadataFetcher interface { GetSelfService() 
(metadata.Service, error) - GetService(svcName string, stackName string) (*metadata.Service, error) + GetService(envUUID string, svcName string, stackName string) (*metadata.Service, error) OnChange(intervalSeconds int, do func(string)) GetServices() ([]metadata.Service, error) } -type rMetaFetcher struct { - metadataClient metadata.Client +type RMetaFetcher struct { + MetadataClient metadata.Client } type CertificateFetcher interface { - fetchCertificate(certName string) (*config.Certificate, error) + FetchCertificate(certName string) (*config.Certificate, error) + UpdateEndpoints(lbSvc *metadata.Service, eps []client.PublicEndpoint) error } -type rCertificateFetcher struct { - client *client.RancherClient +type RCertificateFetcher struct { + Client *client.RancherClient } -func (lbc *loadBalancerController) GetName() string { +func (lbc *LoadBalancerController) GetName() string { return "rancher" } -func (lbc *loadBalancerController) Run(provider provider.LBProvider) { +func (lbc *LoadBalancerController) Run(provider provider.LBProvider) { logrus.Infof("starting %s controller", lbc.GetName()) - lbc.lbProvider = provider + lbc.LBProvider = provider go lbc.syncQueue.Run(time.Second, lbc.stopCh) - go lbc.lbProvider.Run(nil) + go lbc.LBProvider.Run(nil) metadataClient, err := metadata.NewClientAndWait(metadataURL) if err != nil { @@ -112,29 +113,29 @@ func (lbc *loadBalancerController) Run(provider provider.LBProvider) { lbc.Stop() } - lbc.metaFetcher = rMetaFetcher{ - metadataClient: metadataClient, + lbc.MetaFetcher = RMetaFetcher{ + MetadataClient: metadataClient, } - lbc.metaFetcher.OnChange(5, lbc.ScheduleApplyConfig) + lbc.MetaFetcher.OnChange(5, lbc.ScheduleApplyConfig) <-lbc.stopCh } -func (mf rMetaFetcher) OnChange(intervalSeconds int, do func(string)) { - mf.metadataClient.OnChange(intervalSeconds, do) +func (mf RMetaFetcher) OnChange(intervalSeconds int, do func(string)) { + mf.MetadataClient.OnChange(intervalSeconds, do) } -func (lbc 
*loadBalancerController) ScheduleApplyConfig(string) { +func (lbc *LoadBalancerController) ScheduleApplyConfig(string) { logrus.Debug("Scheduling apply config") lbc.syncQueue.Enqueue(lbc.GetName()) } -func (lbc *loadBalancerController) Stop() error { +func (lbc *LoadBalancerController) Stop() error { if !lbc.shutdown { logrus.Infof("Shutting down %s controller", lbc.GetName()) //stop the provider - if err := lbc.lbProvider.Stop(); err != nil { + if err := lbc.LBProvider.Stop(); err != nil { return err } close(lbc.stopCh) @@ -144,7 +145,7 @@ func (lbc *loadBalancerController) Stop() error { return fmt.Errorf("shutdown already in progress") } -func (lbc *loadBalancerController) BuildConfigFromMetadata(lbName string, lbMeta *LBMetadata) ([]*config.LoadBalancerConfig, error) { +func (lbc *LoadBalancerController) BuildConfigFromMetadata(lbName string, envUUID string, lbMeta *LBMetadata) ([]*config.LoadBalancerConfig, error) { lbConfigs := []*config.LoadBalancerConfig{} if lbMeta == nil { lbMeta = &LBMetadata{ @@ -158,13 +159,13 @@ func (lbc *loadBalancerController) BuildConfigFromMetadata(lbName string, lbMeta // fetch certificates certs := []*config.Certificate{} for _, certName := range lbMeta.Certs { - cert, err := lbc.certFetcher.fetchCertificate(certName) + cert, err := lbc.CertFetcher.FetchCertificate(certName) if err != nil { return nil, err } certs = append(certs, cert) } - defaultCert, err := lbc.certFetcher.fetchCertificate(lbMeta.DefaultCert) + defaultCert, err := lbc.CertFetcher.FetchCertificate(lbMeta.DefaultCert) if err != nil { return nil, err } @@ -198,7 +199,7 @@ func (lbc *loadBalancerController) BuildConfigFromMetadata(lbName string, lbMeta // service comes in a format of stackName/serviceName, // replace "/"" with "_" svcName := strings.SplitN(rule.Service, "/", 2) - service, err := lbc.metaFetcher.GetService(svcName[1], svcName[0]) + service, err := lbc.MetaFetcher.GetService(envUUID, svcName[1], svcName[0]) if err != nil { return nil, err } @@ 
-287,7 +288,7 @@ func (lbc *loadBalancerController) BuildConfigFromMetadata(lbName string, lbMeta StickinessPolicy: &lbMeta.StickinessPolicy, } - if err = lbc.lbProvider.ProcessCustomConfig(lbConfig, lbMeta.Config); err != nil { + if err = lbc.LBProvider.ProcessCustomConfig(lbConfig, lbMeta.Config); err != nil { return nil, err } @@ -296,32 +297,28 @@ func (lbc *loadBalancerController) BuildConfigFromMetadata(lbName string, lbMeta return lbConfigs, nil } -func (mf rMetaFetcher) GetSelfService() (metadata.Service, error) { - return mf.metadataClient.GetSelfService() +func (mf RMetaFetcher) GetSelfService() (metadata.Service, error) { + return mf.MetadataClient.GetSelfService() } -func (lbc *loadBalancerController) GetLBConfigs() ([]*config.LoadBalancerConfig, error) { - lbSvc, err := lbc.metaFetcher.GetSelfService() +func (lbc *LoadBalancerController) GetLBConfigs() ([]*config.LoadBalancerConfig, error) { + lbSvc, err := lbc.MetaFetcher.GetSelfService() if err != nil { return nil, err } - lbMeta, err := lbc.collectLBMetadata(lbSvc) + lbMeta, err := lbc.CollectLBMetadata(lbSvc) if err != nil { return nil, err } - return lbc.BuildConfigFromMetadata(lbSvc.Name, lbMeta) + return lbc.BuildConfigFromMetadata(lbSvc.Name, lbSvc.EnvironmentUUID, lbMeta) } -func (lbc *loadBalancerController) collectLBMetadata(lbSvc metadata.Service) (*LBMetadata, error) { +func (lbc *LoadBalancerController) CollectLBMetadata(lbSvc metadata.Service) (*LBMetadata, error) { lbConfig := lbSvc.LBConfig - if len(lbConfig.PortRules) == 0 { - logrus.Debugf("Metadata is empty for the service %v", lbSvc.Name) - return nil, nil - } - lbMeta, err := getLBMetadata(lbConfig) + lbMeta, err := GetLBMetadata(lbConfig) if err != nil { return nil, err } @@ -333,10 +330,10 @@ func (lbc *loadBalancerController) collectLBMetadata(lbSvc metadata.Service) (*L return lbMeta, nil } -func (lbc *loadBalancerController) processSelector(lbMeta *LBMetadata) error { +func (lbc *LoadBalancerController) processSelector(lbMeta 
*LBMetadata) error { //collect selector based services var rules []metadata.PortRule - svcs, err := lbc.metaFetcher.GetServices() + svcs, err := lbc.MetaFetcher.GetServices() if err != nil { return err } @@ -354,7 +351,7 @@ func (lbc *loadBalancerController) processSelector(lbMeta *LBMetadata) error { continue } - meta, err := getLBMetadata(lbConfig) + meta, err := GetLBMetadata(lbConfig) if err != nil { return err } @@ -377,7 +374,7 @@ func (lbc *loadBalancerController) processSelector(lbMeta *LBMetadata) error { return nil } -func (fetcher *rCertificateFetcher) fetchCertificate(certName string) (*config.Certificate, error) { +func (fetcher *RCertificateFetcher) FetchCertificate(certName string) (*config.Certificate, error) { if certName == "" { return nil, nil } @@ -385,7 +382,7 @@ func (fetcher *rCertificateFetcher) fetchCertificate(certName string) (*config.C opts.Filters["name"] = certName opts.Filters["removed_null"] = "1" - certs, err := fetcher.client.Certificate.List(opts) + certs, err := fetcher.Client.Certificate.List(opts) if err != nil { return nil, fmt.Errorf("Coudln't get certificate by name [%s]. Error: %#v", certName, err) } @@ -402,6 +399,29 @@ func (fetcher *rCertificateFetcher) fetchCertificate(certName string) (*config.C }, nil } +func (fetcher *RCertificateFetcher) UpdateEndpoints(lbSvc *metadata.Service, eps []client.PublicEndpoint) error { + opts := client.NewListOpts() + opts.Filters["uuid"] = lbSvc.UUID + opts.Filters["removed_null"] = "1" + lbs, err := fetcher.Client.LoadBalancerService.List(opts) + if err != nil { + return fmt.Errorf("Couldn't get LB service by uuid [%s]. 
Error: %#v", lbSvc.UUID, err) + } + if len(lbs.Data) == 0 { + logrus.Infof("Failed to find lb by uuid %s", lbSvc.UUID) + return nil + } + lb := lbs.Data[0] + + toUpdate := make(map[string]interface{}) + toUpdate["publicEndpoints"] = eps + logrus.Infof("Updating Rancher LB [%s] in stack [%s] with the new public endpoints [%v] ", lbSvc.Name, lbSvc.StackName, eps) + if _, err := fetcher.Client.LoadBalancerService.Update(&lb, toUpdate); err != nil { + return fmt.Errorf("Failed to update Rancher LB [%s] in stack [%s]. Error: %#v", lbSvc.Name, lbSvc.StackName, err) + } + return nil +} + func getServiceHealthCheck(svc *metadata.Service) (*config.HealthCheck, error) { if &svc.HealthCheck == nil { return nil, nil @@ -409,17 +429,21 @@ func getServiceHealthCheck(svc *metadata.Service) (*config.HealthCheck, error) { return getConfigServiceHealthCheck(svc.HealthCheck) } -func (mf rMetaFetcher) GetServices() ([]metadata.Service, error) { - return mf.metadataClient.GetServices() +func (mf RMetaFetcher) GetServices() ([]metadata.Service, error) { + return mf.MetadataClient.GetServices() } -func (mf rMetaFetcher) GetService(svcName string, stackName string) (*metadata.Service, error) { - svcs, err := mf.metadataClient.GetServices() +func (mf RMetaFetcher) GetService(envUUID string, svcName string, stackName string) (*metadata.Service, error) { + svcs, err := mf.MetadataClient.GetServices() if err != nil { return nil, err } var service metadata.Service for _, svc := range svcs { + //only consider services from the same environment + if !strings.EqualFold(svc.EnvironmentUUID, envUUID) { + continue + } if strings.EqualFold(svc.Name, svcName) && strings.EqualFold(svc.StackName, stackName) { service = svc break @@ -428,7 +452,7 @@ func (mf rMetaFetcher) GetService(svcName string, stackName string) (*metadata.S return &service, nil } -func (lbc *loadBalancerController) getServiceEndpoints(svc *metadata.Service, targetPort int, activeOnly bool) (config.Endpoints, error) { +func (lbc 
*LoadBalancerController) getServiceEndpoints(svc *metadata.Service, targetPort int, activeOnly bool) (config.Endpoints, error) { var eps config.Endpoints var err error if strings.EqualFold(svc.Kind, "externalService") { @@ -447,11 +471,11 @@ func (lbc *loadBalancerController) getServiceEndpoints(svc *metadata.Service, ta return eps, nil } -func (lbc *loadBalancerController) getAliasServiceEndpoints(svc *metadata.Service, targetPort int, activeOnly bool) (config.Endpoints, error) { +func (lbc *LoadBalancerController) getAliasServiceEndpoints(svc *metadata.Service, targetPort int, activeOnly bool) (config.Endpoints, error) { var eps config.Endpoints for link := range svc.Links { svcName := strings.SplitN(link, "/", 2) - service, err := lbc.metaFetcher.GetService(svcName[1], svcName[0]) + service, err := lbc.MetaFetcher.GetService(svc.EnvironmentUUID, svcName[1], svcName[0]) if err != nil { return nil, err } @@ -467,7 +491,7 @@ func (lbc *loadBalancerController) getAliasServiceEndpoints(svc *metadata.Servic return eps, nil } -func (lbc *loadBalancerController) getExternalServiceEndpoints(svc *metadata.Service, targetPort int) config.Endpoints { +func (lbc *LoadBalancerController) getExternalServiceEndpoints(svc *metadata.Service, targetPort int) config.Endpoints { var eps config.Endpoints for _, e := range svc.ExternalIps { ep := &config.Endpoint{ @@ -480,7 +504,7 @@ func (lbc *loadBalancerController) getExternalServiceEndpoints(svc *metadata.Ser return eps } -func (lbc *loadBalancerController) getRegularServiceEndpoints(svc *metadata.Service, targetPort int, activeOnly bool) config.Endpoints { +func (lbc *LoadBalancerController) getRegularServiceEndpoints(svc *metadata.Service, targetPort int, activeOnly bool) config.Endpoints { var eps config.Endpoints for _, c := range svc.Containers { if strings.EqualFold(c.State, "running") || strings.EqualFold(c.State, "starting") { @@ -495,12 +519,12 @@ func (lbc *loadBalancerController) getRegularServiceEndpoints(svc 
*metadata.Serv return eps } -func (lbc *loadBalancerController) IsHealthy() bool { +func (lbc *LoadBalancerController) IsHealthy() bool { return true } -func newLoadBalancerController() (*loadBalancerController, error) { - lbc := &loadBalancerController{ +func NewLoadBalancerController() (*LoadBalancerController, error) { + lbc := &LoadBalancerController{ stopCh: make(chan struct{}), incrementalBackoff: 0, incrementalBackoffInterval: 5, @@ -510,7 +534,7 @@ func newLoadBalancerController() (*loadBalancerController, error) { return lbc, nil } -func (lbc *loadBalancerController) sync(key string) { +func (lbc *LoadBalancerController) sync(key string) { if lbc.shutdown { //skip syncing if controller is being shut down return @@ -520,7 +544,7 @@ func (lbc *loadBalancerController) sync(key string) { cfgs, err := lbc.GetLBConfigs() if err == nil { for _, cfg := range cfgs { - if err := lbc.lbProvider.ApplyConfig(cfg); err != nil { + if err := lbc.LBProvider.ApplyConfig(cfg); err != nil { logrus.Errorf("Failed to apply lb config on provider: %v", err) requeue = true } @@ -538,7 +562,7 @@ func (lbc *loadBalancerController) sync(key string) { } } -func (lbc *loadBalancerController) requeue(key string) { +func (lbc *LoadBalancerController) requeue(key string) { // requeue only when after incremental backoff time lbc.incrementalBackoff = lbc.incrementalBackoff + lbc.incrementalBackoffInterval time.Sleep(time.Duration(lbc.incrementalBackoff) * time.Second) diff --git a/controller/rancher/rancher_test.go b/controller/rancher/rancher_test.go index 393d065..d6e9458 100644 --- a/controller/rancher/rancher_test.go +++ b/controller/rancher/rancher_test.go @@ -2,22 +2,23 @@ package rancher import ( "github.com/rancher/go-rancher-metadata/metadata" + "github.com/rancher/go-rancher/v2" "github.com/rancher/lb-controller/config" utils "github.com/rancher/lb-controller/utils" "strings" "testing" ) -var lbc *loadBalancerController +var lbc *LoadBalancerController func init() { - lbc = 
&loadBalancerController{ + lbc = &LoadBalancerController{ stopCh: make(chan struct{}), incrementalBackoff: 0, incrementalBackoffInterval: 5, - metaFetcher: tMetaFetcher{}, - certFetcher: tCertFetcher{}, - lbProvider: &tProvider{}, + MetaFetcher: tMetaFetcher{}, + CertFetcher: tCertFetcher{}, + LBProvider: &tProvider{}, } } @@ -45,7 +46,7 @@ func TestTCPRuleFields(t *testing.T) { PortRules: portRules, } - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) be := configs[0].FrontendServices[0].BackendServices[0] if be.Host != "" { @@ -77,7 +78,7 @@ func TestTwoRunningServices(t *testing.T) { PortRules: portRules, } - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) eps := configs[0].FrontendServices[0].BackendServices[0].Endpoints if len(eps) != 3 { @@ -107,7 +108,7 @@ func TestTwoSourcePorts(t *testing.T) { PortRules: portRules, } - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) fes := configs[0].FrontendServices if len(fes) != 2 { @@ -143,7 +144,7 @@ func TestOneSourcePortTwoRules(t *testing.T) { PortRules: portRules, } - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) fes := configs[0].FrontendServices if len(fes) != 1 { @@ -177,7 +178,7 @@ func TestStoppedAndRunningInstance(t *testing.T) { PortRules: portRules, } - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) eps := configs[0].FrontendServices[0].BackendServices[0].Endpoints if len(eps) != 1 { @@ -198,7 +199,7 @@ func TestStoppedInstance(t *testing.T) { PortRules: portRules, } - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) eps := configs[0].FrontendServices[0].BackendServices[0].Endpoints if 
len(eps) != 0 { @@ -259,7 +260,7 @@ func TestPriority(t *testing.T) { PortRules: portRules, } - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) bes := configs[0].FrontendServices[0].BackendServices if len(bes) != 5 { @@ -309,7 +310,7 @@ func TestPriorityExtra(t *testing.T) { PortRules: portRules, } - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) bes := configs[0].FrontendServices[0].BackendServices if len(bes) != 2 { @@ -332,7 +333,7 @@ func TestPriorityExtra(t *testing.T) { PortRules: portRules, } - configs, _ = lbc.BuildConfigFromMetadata("test", meta) + configs, _ = lbc.BuildConfigFromMetadata("test", "", meta) bes = configs[0].FrontendServices[0].BackendServices if len(bes) != 2 { t.Fatalf("Invalid backend length [%v]", len(bes)) @@ -363,7 +364,7 @@ func TestRuleFields(t *testing.T) { PortRules: portRules, } - configs, err := lbc.BuildConfigFromMetadata("test", meta) + configs, err := lbc.BuildConfigFromMetadata("test", "", meta) if err != nil { t.Fatalf("Failed to build the config from metadata %v", err) } @@ -457,7 +458,7 @@ func (mf tMetaFetcher) GetServices() ([]metadata.Service, error) { return svcs, nil } -func (mf tMetaFetcher) GetService(svcName string, stackName string) (*metadata.Service, error) { +func (mf tMetaFetcher) GetService(envUUID string, svcName string, stackName string) (*metadata.Service, error) { var svc *metadata.Service if strings.EqualFold(svcName, "foo") { svc = &metadata.Service{ @@ -549,10 +550,14 @@ func (mf tMetaFetcher) GetSelfService() (metadata.Service, error) { func (mf tMetaFetcher) OnChange(intervalSeconds int, do func(string)) { } -func (cf tCertFetcher) fetchCertificate(certName string) (*config.Certificate, error) { +func (cf tCertFetcher) FetchCertificate(certName string) (*config.Certificate, error) { return nil, nil } +func (cf tCertFetcher) UpdateEndpoints(lbSvc 
*metadata.Service, eps []client.PublicEndpoint) error { + return nil +} + func (p *tProvider) ApplyConfig(lbConfig *config.LoadBalancerConfig) error { return nil } @@ -597,7 +602,7 @@ func TestSelectorNoMatch(t *testing.T) { lbc.processSelector(meta) - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) if len(configs[0].FrontendServices) != 0 { t.Fatalf("Incorrect number of frontend services %v", len(configs[0].FrontendServices)) @@ -618,7 +623,7 @@ func TestSelectorMatch(t *testing.T) { lbc.processSelector(meta) - configs, _ := lbc.BuildConfigFromMetadata("test", meta) + configs, _ := lbc.BuildConfigFromMetadata("test", "", meta) fe := configs[0].FrontendServices[0] if len(fe.BackendServices) == 0 { diff --git a/controller/rancherglb/rancherglb.go b/controller/rancherglb/rancherglb.go new file mode 100644 index 0000000..17001b2 --- /dev/null +++ b/controller/rancherglb/rancherglb.go @@ -0,0 +1,382 @@ +package rancherglb + +import ( + "fmt" + "github.com/Sirupsen/logrus" + "github.com/patrickmn/go-cache" + "github.com/rancher/go-rancher-metadata/metadata" + "github.com/rancher/go-rancher/v2" + "github.com/rancher/lb-controller/config" + "github.com/rancher/lb-controller/controller" + "github.com/rancher/lb-controller/controller/rancher" + "github.com/rancher/lb-controller/provider" + utils "github.com/rancher/lb-controller/utils" + "reflect" + "sort" + "strconv" + "strings" + "time" +) + +const ( + metadataURL = "http://rancher-metadata/2015-12-19" +) + +func init() { + lbc, err := newGLBController() + if err != nil { + logrus.Fatalf("%v", err) + } + + controller.RegisterController(lbc.GetName(), lbc) +} + +func (lbc *glbController) Init() { + lbc.rancherController.Init() +} + +type glbController struct { + shutdown bool + stopCh chan struct{} + lbProvider provider.LBProvider + syncQueue *utils.TaskQueue + opts *client.ClientOpts + incrementalBackoff int64 + incrementalBackoffInterval int64 + 
metaFetcher MetadataFetcher + rancherController *rancher.LoadBalancerController + endpointsCache *cache.Cache +} + +type MetadataFetcher interface { + GetSelfService() (metadata.Service, error) + GetService(envUUID string, svcName string, stackName string) (*metadata.Service, error) + OnChange(intervalSeconds int, do func(string)) + GetServices() ([]metadata.Service, error) +} + +func (lbc *glbController) GetName() string { + return "rancherglb" +} + +func newGLBController() (*glbController, error) { + lbc, err := rancher.NewLoadBalancerController() + if err != nil { + return nil, err + } + + c := cache.New(1*time.Hour, 1*time.Minute) + + glb := &glbController{ + stopCh: make(chan struct{}), + incrementalBackoff: 0, + incrementalBackoffInterval: 5, + rancherController: lbc, + endpointsCache: c, + } + glb.syncQueue = utils.NewTaskQueue(glb.sync) + + return glb, nil +} + +func (lbc *glbController) sync(key string) { + if lbc.shutdown { + //skip syncing if controller is being shut down + return + } + logrus.Debugf("Syncing up LB") + requeue := false + cfgs, err := lbc.GetLBConfigs() + if err == nil { + for _, cfg := range cfgs { + if err := lbc.lbProvider.ApplyConfig(cfg); err != nil { + logrus.Errorf("Failed to apply lb config on provider: %v", err) + requeue = true + } + } + } else { + logrus.Errorf("Failed to get lb config: %v", err) + requeue = true + } + + if requeue { + go lbc.requeue(key) + } else { + //clear up the backoff + lbc.incrementalBackoff = 0 + } +} + +func (lbc *glbController) GetLBConfigs() ([]*config.LoadBalancerConfig, error) { + glbSvc, err := lbc.metaFetcher.GetSelfService() + if err != nil { + return nil, err + } + return lbc.GetGLBConfigs(glbSvc) +} + +func (lbc *glbController) GetGLBConfigs(glbSvc metadata.Service) ([]*config.LoadBalancerConfig, error) { + glbMeta, err := rancher.GetLBMetadata(glbSvc.LBConfig) + if err != nil { + return nil, err + } + + var lbSvcs []metadata.Service + svcs, err := lbc.metaFetcher.GetServices() + if err != nil 
{ + return nil, err + } + + for _, svc := range svcs { + if !strings.EqualFold(svc.Kind, "loadBalancerService") { + continue + } + lbSvcs = append(lbSvcs, svc) + } + + //for every lb service, get metadata and build the config + var configs []*config.LoadBalancerConfig + for _, glbRule := range glbMeta.PortRules { + for _, lbSvc := range lbSvcs { + if !rancher.IsSelectorMatch(glbRule.Selector, lbSvc.Labels) { + continue + } + lbConfig := lbSvc.LBConfig + if len(lbConfig.PortRules) == 0 { + logrus.Info("port rules is 0") + continue + } + sourcePort := glbRule.SourcePort + proto := glbRule.Protocol + lbMeta, err := lbc.rancherController.CollectLBMetadata(lbSvc) + if err != nil { + return nil, err + } + cs, err := lbc.rancherController.BuildConfigFromMetadata(lbSvc.Name, lbSvc.EnvironmentUUID, lbMeta) + if err != nil { + return nil, err + } + + //source port and proto from the glb + for _, cs := range cs { + for _, fe := range cs.FrontendServices { + fe.Port = sourcePort + fe.Protocol = proto + } + } + + configs = append(configs, cs...) 
+ + //update endpoints + eps, err := getEndpoints(&glbSvc, sourcePort) + if err != nil { + return nil, err + } + if err := lbc.updateEndpoints(&lbSvc, eps); err != nil { + return nil, err + } + } + } + + merged, err := lbc.mergeConfigs(glbSvc, configs) + if err != nil { + return nil, err + } + for _, config := range merged { + if err = lbc.lbProvider.ProcessCustomConfig(config, glbMeta.Config); err != nil { + return nil, err + } + } + return merged, nil +} + +func (lbc *glbController) needEndpointsUpdate(lbSvc *metadata.Service, eps []client.PublicEndpoint) bool { + previousEps, _ := lbc.endpointsCache.Get(lbSvc.UUID) + logrus.Debugf("previous endpoints are %v", previousEps) + return !reflect.DeepEqual(previousEps, eps) +} + +func (lbc *glbController) updateEndpoints(lbSvc *metadata.Service, eps []client.PublicEndpoint) error { + logrus.Debugf("endpoints are %v", eps) + if !lbc.needEndpointsUpdate(lbSvc, eps) { + logrus.Debug("no need to update endpoints") + return nil + } + if err := lbc.rancherController.CertFetcher.UpdateEndpoints(lbSvc, eps); err != nil { + return err + } + lbc.endpointsCache.Set(lbSvc.UUID, eps, cache.DefaultExpiration) + return nil +} + +func getEndpoints(glbSvc *metadata.Service, lbPort int) ([]client.PublicEndpoint, error) { + var publicEndpoints []client.PublicEndpoint + for _, c := range glbSvc.Containers { + for _, port := range c.Ports { + splitted := strings.Split(port, ":") + port, err := strconv.Atoi(splitted[1]) + if err != nil { + return nil, err + } + if port != lbPort { + logrus.Infof("ports are diff, %v and %v", port, lbPort) + continue + } + pE := client.PublicEndpoint{ + IpAddress: splitted[0], + Port: int64(port), + } + publicEndpoints = append(publicEndpoints, pE) + } + } + return publicEndpoints, nil +} + +/* +merge host name routing rules from all the configs +*/ +func (lbc *glbController) mergeConfigs(glbSvc metadata.Service, configs []*config.LoadBalancerConfig) ([]*config.LoadBalancerConfig, error) { + var merged 
[]*config.LoadBalancerConfig + frontendsMap := map[string]*config.FrontendService{} + // 1. merge frontends and their backends + for _, lbConfig := range configs { + for _, fe := range lbConfig.FrontendServices { + name := strconv.Itoa(fe.Port) + if val, ok := frontendsMap[name]; ok { + existing := val + existing.BackendServices = append(existing.BackendServices, fe.BackendServices...) + } else { + frontendsMap[name] = fe + } + } + } + + // 2. merge backend services + for _, fe := range frontendsMap { + bes := map[string]*config.BackendService{} + for _, be := range fe.BackendServices { + pathUUID := fmt.Sprintf("%v_%s_%s_%s", fe.Port, be.Host, be.Path, be.RuleComparator) + existing := bes[pathUUID] + if existing == nil { + existing = be + } else { + existing.Endpoints = append(existing.Endpoints, be.Endpoints...) + } + bes[pathUUID] = existing + } + backends := []*config.BackendService{} + for _, be := range bes { + backends = append(backends, be) + } + fe.BackendServices = backends + } + + // 3. 
sort frontends and backends + var frontends config.FrontendServices + for _, v := range frontendsMap { + // sort endpoints + for _, b := range v.BackendServices { + sort.Sort(b.Endpoints) + } + // sort backends + sort.Sort(v.BackendServices) + frontends = append(frontends, v) + } + //sort frontends + sort.Sort(frontends) + + //get glb info + glbMeta, err := lbc.rancherController.CollectLBMetadata(glbSvc) + if err != nil { + return nil, err + } + certs := []*config.Certificate{} + for _, certName := range glbMeta.Certs { + cert, err := lbc.rancherController.CertFetcher.FetchCertificate(certName) + if err != nil { + return nil, err + } + certs = append(certs, cert) + } + + var defaultCert *config.Certificate + if glbMeta.DefaultCert != "" { + defaultCert, err = lbc.rancherController.CertFetcher.FetchCertificate(glbMeta.DefaultCert) + if err != nil { + return nil, err + } + + if defaultCert != nil { + certs = append(certs, defaultCert) + } + } + + lbConfig := &config.LoadBalancerConfig{ + Name: glbSvc.Name, + FrontendServices: frontends, + Certs: certs, + DefaultCert: defaultCert, + StickinessPolicy: &glbMeta.StickinessPolicy, + } + + merged = append(merged, lbConfig) + + return merged, nil +} + +func (lbc *glbController) Run(provider provider.LBProvider) { + logrus.Infof("starting %s controller", lbc.GetName()) + lbc.lbProvider = provider + lbc.rancherController.LBProvider = provider + go lbc.syncQueue.Run(time.Second, lbc.stopCh) + + go lbc.lbProvider.Run(nil) + + metadataClient, err := metadata.NewClientAndWait(metadataURL) + if err != nil { + logrus.Errorf("Error initiating metadata client: %v", err) + lbc.Stop() + } + + lbc.metaFetcher = rancher.RMetaFetcher{ + MetadataClient: metadataClient, + } + + lbc.rancherController.MetaFetcher = rancher.RMetaFetcher{ + MetadataClient: metadataClient, + } + + lbc.metaFetcher.OnChange(5, lbc.ScheduleApplyConfig) + + <-lbc.stopCh +} + +func (lbc *glbController) Stop() error { + if !lbc.shutdown { + logrus.Infof("Shutting down 
%s controller", lbc.GetName()) + //stop the provider + if err := lbc.lbProvider.Stop(); err != nil { + return err + } + close(lbc.stopCh) + lbc.shutdown = true + } + + return fmt.Errorf("shutdown already in progress") +} + +func (lbc *glbController) IsHealthy() bool { + return true +} + +func (lbc *glbController) ScheduleApplyConfig(string) { + logrus.Debug("Scheduling apply config") + lbc.syncQueue.Enqueue(lbc.GetName()) +} + +func (lbc *glbController) requeue(key string) { + // requeue only when after incremental backoff time + lbc.incrementalBackoff = lbc.incrementalBackoff + lbc.incrementalBackoffInterval + time.Sleep(time.Duration(lbc.incrementalBackoff) * time.Second) + lbc.syncQueue.Requeue(key, fmt.Errorf("retrying sync as one of the configs failed to apply on a backend")) +} diff --git a/controller/rancherglb/rancherglb_test.go b/controller/rancherglb/rancherglb_test.go new file mode 100644 index 0000000..62dafd7 --- /dev/null +++ b/controller/rancherglb/rancherglb_test.go @@ -0,0 +1,403 @@ +package rancherglb + +import ( + // "github.com/Sirupsen/logrus" + "github.com/patrickmn/go-cache" + "github.com/rancher/go-rancher-metadata/metadata" + "github.com/rancher/go-rancher/v2" + "github.com/rancher/lb-controller/config" + "github.com/rancher/lb-controller/controller/rancher" + utils "github.com/rancher/lb-controller/utils" + "strings" + "testing" + "time" +) + +var glb *glbController + +func init() { + lbc := &rancher.LoadBalancerController{ + MetaFetcher: tMetaFetcher{}, + CertFetcher: tCertFetcher{}, + LBProvider: &tProvider{}, + } + + glb = &glbController{ + stopCh: make(chan struct{}), + incrementalBackoff: 0, + incrementalBackoffInterval: 5, + rancherController: lbc, + metaFetcher: tMetaFetcher{}, + lbProvider: &tProvider{}, + endpointsCache: cache.New(1*time.Hour, 1*time.Minute), + } +} + +type tProvider struct { +} + +type tCertFetcher struct { +} + +type tMetaFetcher struct { +} + +func TestBasicCaseTwoServices(t *testing.T) { + configs, err := 
glb.GetLBConfigs() + if err != nil { + t.Fatalf("Error getting configs: %s", err) + } + + if len(configs) != 1 { + t.Fatalf("Incorrect number of configs, expected 1, actual: %v", len(configs)) + } + + fes := configs[0].FrontendServices + if len(fes) != 1 { + t.Fatalf("Incorrect number of frontends, expected 1, actual: %v", len(fes)) + } + + bes := fes[0].BackendServices + if len(bes) != 2 { + t.Fatalf("Incorrect number of backends, expected 2, actual: %v", len(bes)) + } + + for _, be := range bes { + eps := be.Endpoints + if be.Host == "foo.com" { + if len(eps) != 1 { + t.Fatalf("Incorrect number of endpoints for foo service, expected 1, actual: %v", len(eps)) + } + } else if be.Host == "bar.com" { + if len(eps) != 2 { + t.Fatalf("Incorrect number of endpoints for bar service, expected 2, actual: %v", len(eps)) + } + } + } + + config := configs[0] + if config.DefaultCert == nil { + t.Fatal("Default certificate is not set") + } + if len(config.Certs) != 1 { + t.Fatalf("Incorrect number of certs, expected 1, actual: %v", len(config.Certs)) + } +} + +func TestTwoServicesMerge(t *testing.T) { + var portRules []metadata.PortRule + portRule := metadata.PortRule{ + SourcePort: 80, + Protocol: "http", + Selector: "http=true", + } + portRules = append(portRules, portRule) + lbConfig := metadata.LBConfig{ + PortRules: portRules, + } + + glbSvc := metadata.Service{ + Kind: "loadBalancerService", + LBConfig: lbConfig, + Name: "glb", + StackName: "glb", + UUID: "glb", + } + configs, err := glb.GetGLBConfigs(glbSvc) + if err != nil { + t.Fatalf("Error getting configs: %s", err) + } + + if len(configs) != 1 { + t.Fatalf("Incorrect number of configs, expected 1, actual: %v", len(configs)) + } + + fes := configs[0].FrontendServices + if len(fes) != 1 { + t.Fatalf("Incorrect number of frontends, expected 1, actual: %v", len(fes)) + } + + fe := fes[0] + + bes := fe.BackendServices + if len(bes) != 1 { + t.Fatalf("Incorrect number of backends, expected 1, actual: %v", len(bes)) + } + 
be := bes[0] + if be.Host != "foo.com" { + t.Fatalf("Incorrect hostname, expected foo.com, actual: %v", be.Host) + } + + if be.Path != "/foo" { + t.Fatalf("Incorrect path, expected /foo, actual: %v", be.Path) + } + eps := be.Endpoints + if len(eps) != 2 { + t.Fatalf("Incorrect number of endpoints, expected 2, actual: %v", len(eps)) + } + + for _, ep := range eps { + if ep.IP != "10.1.1.1" && ep.IP != "10.1.1.3" { + t.Fatalf("Incorrect ip, expected either 10.1.1.1/3, actual: %v", ep.IP) + } + if ep.IP == "10.1.1.1" { + if ep.Port != 101 { + t.Fatalf("Incorrect port for foo's container ip, expected 101, actual: %v", ep.Port) + } + } else if ep.IP == "10.1.1.3" { + if ep.Port != 103 { + t.Fatalf("Incorrect port for foodup's container ip, expected 103, actual: %v", ep.Port) + } + } + } + +} + +func (mf tMetaFetcher) GetServices() ([]metadata.Service, error) { + var svcs []metadata.Service + + foo, err := mf.GetService("", "foo", "foo") + if err != nil { + return nil, err + } + svcs = append(svcs, *foo) + foodup, err := mf.GetService("", "foodup", "foo") + if err != nil { + return nil, err + } + svcs = append(svcs, *foodup) + bar, err := mf.GetService("", "bar", "bar") + if err != nil { + return nil, err + } + svcs = append(svcs, *bar) + lbBar, err := mf.GetService("", "lbbar", "bar") + if err != nil { + return nil, err + } + svcs = append(svcs, *lbBar) + + lbFoo, err := mf.GetService("", "lbfoo", "foo") + if err != nil { + return nil, err + } + svcs = append(svcs, *lbFoo) + + lbFooDup, err := mf.GetService("", "lbfoodup", "foo") + if err != nil { + return nil, err + } + svcs = append(svcs, *lbFooDup) + + return svcs, nil +} + +func (mf tMetaFetcher) GetService(envUUID string, svcName string, stackName string) (*metadata.Service, error) { + var svc *metadata.Service + + if strings.EqualFold(svcName, "foo") { + foo := metadata.Service{ + Kind: "service", + Containers: getContainers("foo"), + Name: "foo", + StackName: "foo", + } + svc = &foo + } else if 
strings.EqualFold(svcName, "bar") { + bar := metadata.Service{ + Kind: "service", + Containers: getContainers("bar"), + Name: "bar", + StackName: "bar", + } + svc = &bar + } else if strings.EqualFold(svcName, "lbbar") { + + port := metadata.PortRule{ + SourcePort: 80, + Path: "/bar", + Hostname: "bar.com", + Service: "bar/bar", + Protocol: "http", + TargetPort: 102, + } + var portRules []metadata.PortRule + portRules = append(portRules, port) + lbConfig := metadata.LBConfig{ + PortRules: portRules, + } + labels := make(map[string]string) + labels["glbself"] = "true" + lbbar := metadata.Service{ + Kind: "loadBalancerService", + LBConfig: lbConfig, + Name: "lbbar", + UUID: "lbbar", + StackName: "bar", + Labels: labels, + } + svc = &lbbar + } else if strings.EqualFold(svcName, "lbfoo") { + port := metadata.PortRule{ + SourcePort: 80, + Path: "/foo", + Hostname: "foo.com", + Service: "foo/foo", + Protocol: "http", + TargetPort: 101, + } + var portRules []metadata.PortRule + portRules = append(portRules, port) + lbConfig := metadata.LBConfig{ + PortRules: portRules, + } + labels := make(map[string]string) + labels["http"] = "true" + labels["glbself"] = "true" + lbfoo := metadata.Service{ + Kind: "loadBalancerService", + LBConfig: lbConfig, + Name: "lbfoo", + UUID: "lbfoo", + StackName: "foo", + Labels: labels, + } + svc = &lbfoo + } else if strings.EqualFold(svcName, "glb") { + self, err := glb.metaFetcher.GetSelfService() + if err != nil { + return nil, err + } + svc = &self + } else if strings.EqualFold(svcName, "lbfoodup") { + port := metadata.PortRule{ + SourcePort: 80, + Path: "/foo", + Hostname: "foo.com", + Service: "foo/foodup", + Protocol: "http", + TargetPort: 103, + } + var portRules []metadata.PortRule + portRules = append(portRules, port) + lbConfig := metadata.LBConfig{ + PortRules: portRules, + } + labels := make(map[string]string) + labels["http"] = "true" + lbfoodup := metadata.Service{ + Kind: "loadBalancerService", + LBConfig: lbConfig, + Name: 
"lbfoodup", + UUID: "lbfooddup", + StackName: "foo", + Labels: labels, + } + svc = &lbfoodup + } else if strings.EqualFold(svcName, "foodup") { + foo := metadata.Service{ + Kind: "service", + Containers: getContainers("foodup"), + Name: "foodup", + StackName: "foo", + } + svc = &foo + } + + return svc, nil +} + +func getContainers(svcName string) []metadata.Container { + containers := []metadata.Container{} + if strings.EqualFold(svcName, "foo") { + c := metadata.Container{ + PrimaryIp: "10.1.1.1", + State: "running", + } + containers = append(containers, c) + } else if strings.EqualFold(svcName, "bar") { + c1 := metadata.Container{ + PrimaryIp: "10.1.1.2", + State: "running", + } + c2 := metadata.Container{ + PrimaryIp: "10.1.1.22", + State: "running", + } + containers = append(containers, c1, c2) + } else if strings.EqualFold(svcName, "foodup") { + c := metadata.Container{ + PrimaryIp: "10.1.1.3", + State: "running", + } + containers = append(containers, c) + } + return containers +} + +func (mf tMetaFetcher) GetSelfService() (metadata.Service, error) { + defaultCert := "glbcert" + portRule := metadata.PortRule{ + SourcePort: 80, + Protocol: "http", + Selector: "glbself=true", + } + var portRules []metadata.PortRule + portRules = append(portRules, portRule) + lbConfig := metadata.LBConfig{ + DefaultCert: defaultCert, + PortRules: portRules, + } + lbfoo := metadata.Service{ + Kind: "loadBalancerService", + LBConfig: lbConfig, + Name: "glb", + UUID: "glb", + StackName: "glb", + } + return lbfoo, nil +} + +func (mf tMetaFetcher) OnChange(intervalSeconds int, do func(string)) { +} + +func (cf tCertFetcher) FetchCertificate(certName string) (*config.Certificate, error) { + if certName == "" { + return nil, nil + } + return &config.Certificate{}, nil +} + +func (cf tCertFetcher) UpdateEndpoints(lbSvc *metadata.Service, eps []client.PublicEndpoint) error { + return nil +} + +func (p *tProvider) ApplyConfig(lbConfig *config.LoadBalancerConfig) error { + return nil +} 
+func (p *tProvider) GetName() string { + return "" +} + +func (p *tProvider) GetPublicEndpoints(configName string) []string { + return []string{} +} + +func (p *tProvider) CleanupConfig(configName string) error { + return nil +} + +func (p *tProvider) Run(syncEndpointsQueue *utils.TaskQueue) { +} + +func (p *tProvider) Stop() error { + return nil +} + +func (p *tProvider) IsHealthy() bool { + return true +} + +func (p *tProvider) ProcessCustomConfig(lbConfig *config.LoadBalancerConfig, customConfig string) error { + return nil +} diff --git a/plugins.go b/plugins.go index b5fefd9..e979efe 100644 --- a/plugins.go +++ b/plugins.go @@ -4,6 +4,7 @@ import ( // controllers _ "github.com/rancher/lb-controller/controller/kubernetes" _ "github.com/rancher/lb-controller/controller/rancher" + _ "github.com/rancher/lb-controller/controller/rancherglb" //providers _ "github.com/rancher/lb-controller/provider/haproxy" diff --git a/trash.conf b/trash.conf index fecf4e1..b33e745 100644 --- a/trash.conf +++ b/trash.conf @@ -41,8 +41,9 @@ golang.org/x/sys a646d33 google.golang.org/appengine 267c27e google.golang.org/cloud 4f1a5ca gopkg.in/inf.v0 v0.9.0 -github.com/rancher/go-rancher-metadata c48cdb6a5f2b96797f4163f47ceb5fa387ab71c4 +github.com/rancher/go-rancher-metadata 4c6aeec78cf34419855c88e47066f816b9361569 k8s.io/kubernetes v1.4.4 github.com/pkg/errors 1d2e60385a13aaa66134984235061c2f9302520e k8s.io/client-go/1.4 93fcd402979cfad8a7151f96e016416947c6a3cb k8s.io/kubernetes/pkg/controller/framework dfce7e639b341a13a1b6c8c1c52517949772b650 +github.com/patrickmn/go-cache 1881a9bccb818787f68c52bfba648c6cf34c34fa diff --git a/vendor/github.com/patrickmn/go-cache/CONTRIBUTORS b/vendor/github.com/patrickmn/go-cache/CONTRIBUTORS new file mode 100644 index 0000000..8a4da4e --- /dev/null +++ b/vendor/github.com/patrickmn/go-cache/CONTRIBUTORS @@ -0,0 +1,8 @@ +This is a list of people who have contributed code to go-cache. 
They, or their +employers, are the copyright holders of the contributed code. Contributed code +is subject to the license restrictions listed in LICENSE (as they were when the +code was contributed.) + +Dustin Sallings +Jason Mooberry +Sergey Shepelev diff --git a/vendor/github.com/patrickmn/go-cache/LICENSE b/vendor/github.com/patrickmn/go-cache/LICENSE new file mode 100644 index 0000000..159e1e7 --- /dev/null +++ b/vendor/github.com/patrickmn/go-cache/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2012-2015 Patrick Mylund Nielsen and the go-cache contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/patrickmn/go-cache/README.md b/vendor/github.com/patrickmn/go-cache/README.md new file mode 100644 index 0000000..168ff7b --- /dev/null +++ b/vendor/github.com/patrickmn/go-cache/README.md @@ -0,0 +1,107 @@ +# go-cache + +go-cache is an in-memory key:value store/cache similar to memcached that is +suitable for applications running on a single machine. 
Its major advantage is +that, being essentially a thread-safe `map[string]interface{}` with expiration +times, it doesn't need to serialize or transmit its contents over the network. + +Any object can be stored, for a given duration or forever, and the cache can be +safely used by multiple goroutines. + +Although go-cache isn't meant to be used as a persistent datastore, the entire +cache can be saved to and loaded from a file (using `c.Items()` to retrieve the +items map to serialize, and `NewFrom()` to create a cache from a deserialized +one) to recover from downtime quickly. (See the docs for `NewFrom()` for caveats.) + +### Installation + +`go get github.com/patrickmn/go-cache` + +### Usage + +```go + import ( + "fmt" + "github.com/patrickmn/go-cache" + "time" + ) + + func main() { + + // Create a cache with a default expiration time of 5 minutes, and which + // purges expired items every 30 seconds + c := cache.New(5*time.Minute, 30*time.Second) + + // Set the value of the key "foo" to "bar", with the default expiration time + c.Set("foo", "bar", cache.DefaultExpiration) + + // Set the value of the key "baz" to 42, with no expiration time + // (the item won't be removed until it is re-set, or removed using + // c.Delete("baz") + c.Set("baz", 42, cache.NoExpiration) + + // Get the string associated with the key "foo" from the cache + foo, found := c.Get("foo") + if found { + fmt.Println(foo) + } + + // Since Go is statically typed, and cache values can be anything, type + // assertion is needed when values are being passed to functions that don't + // take arbitrary types, (i.e. interface{}). The simplest way to do this for + // values which will only be used once--e.g. for passing to another + // function--is: + foo, found := c.Get("foo") + if found { + MyFunction(foo.(string)) + } + + // This gets tedious if the value is used several times in the same function. 
+ // You might do either of the following instead: + if x, found := c.Get("foo"); found { + foo := x.(string) + // ... + } + // or + var foo string + if x, found := c.Get("foo"); found { + foo = x.(string) + } + // ... + // foo can then be passed around freely as a string + + // Want performance? Store pointers! + c.Set("foo", &MyStruct, cache.DefaultExpiration) + if x, found := c.Get("foo"); found { + foo := x.(*MyStruct) + // ... + } + + // If you store a reference type like a pointer, slice, map or channel, you + // do not need to run Set if you modify the underlying data. The cached + // reference points to the same memory, so if you modify a struct whose + // pointer you've stored in the cache, retrieving that pointer with Get will + // point you to the same data: + foo := &MyStruct{Num: 1} + c.Set("foo", foo, cache.DefaultExpiration) + // ... + x, _ := c.Get("foo") + foo := x.(*MyStruct) + fmt.Println(foo.Num) + // ... + foo.Num++ + // ... + x, _ := c.Get("foo") + foo := x.(*MyStruct) + foo.Println(foo.Num) + + // will print: + // 1 + // 2 + + } +``` + +### Reference + +`godoc` or [http://godoc.org/github.com/patrickmn/go-cache](http://godoc.org/github.com/patrickmn/go-cache) diff --git a/vendor/github.com/patrickmn/go-cache/cache.go b/vendor/github.com/patrickmn/go-cache/cache.go new file mode 100644 index 0000000..3562543 --- /dev/null +++ b/vendor/github.com/patrickmn/go-cache/cache.go @@ -0,0 +1,1118 @@ +package cache + +import ( + "encoding/gob" + "fmt" + "io" + "os" + "runtime" + "sync" + "time" +) + +type Item struct { + Object interface{} + Expiration int64 +} + +// Returns true if the item has expired. +func (item Item) Expired() bool { + if item.Expiration == 0 { + return false + } + return time.Now().UnixNano() > item.Expiration +} + +const ( + // For use with functions that take an expiration time. + NoExpiration time.Duration = -1 + // For use with functions that take an expiration time. 
Equivalent to + // passing in the same expiration duration as was given to New() or + // NewFrom() when the cache was created (e.g. 5 minutes.) + DefaultExpiration time.Duration = 0 +) + +type Cache struct { + *cache + // If this is confusing, see the comment at the bottom of New() +} + +type cache struct { + defaultExpiration time.Duration + items map[string]Item + mu sync.RWMutex + onEvicted func(string, interface{}) + janitor *janitor +} + +// Add an item to the cache, replacing any existing item. If the duration is 0 +// (DefaultExpiration), the cache's default expiration time is used. If it is -1 +// (NoExpiration), the item never expires. +func (c *cache) Set(k string, x interface{}, d time.Duration) { + // "Inlining" of set + var e int64 + if d == DefaultExpiration { + d = c.defaultExpiration + } + if d > 0 { + e = time.Now().Add(d).UnixNano() + } + c.mu.Lock() + c.items[k] = Item{ + Object: x, + Expiration: e, + } + // TODO: Calls to mu.Unlock are currently not deferred because defer + // adds ~200 ns (as of go1.) + c.mu.Unlock() +} + +func (c *cache) set(k string, x interface{}, d time.Duration) { + var e int64 + if d == DefaultExpiration { + d = c.defaultExpiration + } + if d > 0 { + e = time.Now().Add(d).UnixNano() + } + c.items[k] = Item{ + Object: x, + Expiration: e, + } +} + +// Add an item to the cache only if an item doesn't already exist for the given +// key, or if the existing item has expired. Returns an error otherwise. +func (c *cache) Add(k string, x interface{}, d time.Duration) error { + c.mu.Lock() + _, found := c.get(k) + if found { + c.mu.Unlock() + return fmt.Errorf("Item %s already exists", k) + } + c.set(k, x, d) + c.mu.Unlock() + return nil +} + +// Set a new value for the cache key only if it already exists, and the existing +// item hasn't expired. Returns an error otherwise. 
+func (c *cache) Replace(k string, x interface{}, d time.Duration) error { + c.mu.Lock() + _, found := c.get(k) + if !found { + c.mu.Unlock() + return fmt.Errorf("Item %s doesn't exist", k) + } + c.set(k, x, d) + c.mu.Unlock() + return nil +} + +// Get an item from the cache. Returns the item or nil, and a bool indicating +// whether the key was found. +func (c *cache) Get(k string) (interface{}, bool) { + c.mu.RLock() + // "Inlining" of get and Expired + item, found := c.items[k] + if !found { + c.mu.RUnlock() + return nil, false + } + if item.Expiration > 0 { + if time.Now().UnixNano() > item.Expiration { + c.mu.RUnlock() + return nil, false + } + } + c.mu.RUnlock() + return item.Object, true +} + +func (c *cache) get(k string) (interface{}, bool) { + item, found := c.items[k] + if !found { + return nil, false + } + // "Inlining" of Expired + if item.Expiration > 0 { + if time.Now().UnixNano() > item.Expiration { + return nil, false + } + } + return item.Object, true +} + +// Increment an item of type int, int8, int16, int32, int64, uintptr, uint, +// uint8, uint32, or uint64, float32 or float64 by n. Returns an error if the +// item's value is not an integer, if it was not found, or if it is not +// possible to increment it by n. To retrieve the incremented value, use one +// of the specialized methods, e.g. IncrementInt64. 
+func (c *cache) Increment(k string, n int64) error { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return fmt.Errorf("Item %s not found", k) + } + switch v.Object.(type) { + case int: + v.Object = v.Object.(int) + int(n) + case int8: + v.Object = v.Object.(int8) + int8(n) + case int16: + v.Object = v.Object.(int16) + int16(n) + case int32: + v.Object = v.Object.(int32) + int32(n) + case int64: + v.Object = v.Object.(int64) + n + case uint: + v.Object = v.Object.(uint) + uint(n) + case uintptr: + v.Object = v.Object.(uintptr) + uintptr(n) + case uint8: + v.Object = v.Object.(uint8) + uint8(n) + case uint16: + v.Object = v.Object.(uint16) + uint16(n) + case uint32: + v.Object = v.Object.(uint32) + uint32(n) + case uint64: + v.Object = v.Object.(uint64) + uint64(n) + case float32: + v.Object = v.Object.(float32) + float32(n) + case float64: + v.Object = v.Object.(float64) + float64(n) + default: + c.mu.Unlock() + return fmt.Errorf("The value for %s is not an integer", k) + } + c.items[k] = v + c.mu.Unlock() + return nil +} + +// Increment an item of type float32 or float64 by n. Returns an error if the +// item's value is not floating point, if it was not found, or if it is not +// possible to increment it by n. Pass a negative number to decrement the +// value. To retrieve the incremented value, use one of the specialized methods, +// e.g. IncrementFloat64. +func (c *cache) IncrementFloat(k string, n float64) error { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return fmt.Errorf("Item %s not found", k) + } + switch v.Object.(type) { + case float32: + v.Object = v.Object.(float32) + float32(n) + case float64: + v.Object = v.Object.(float64) + n + default: + c.mu.Unlock() + return fmt.Errorf("The value for %s does not have type float32 or float64", k) + } + c.items[k] = v + c.mu.Unlock() + return nil +} + +// Increment an item of type int by n. 
Returns an error if the item's value is +// not an int, or if it was not found. If there is no error, the incremented +// value is returned. +func (c *cache) IncrementInt(k string, n int) (int, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type int8 by n. Returns an error if the item's value is +// not an int8, or if it was not found. If there is no error, the incremented +// value is returned. +func (c *cache) IncrementInt8(k string, n int8) (int8, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int8) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int8", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type int16 by n. Returns an error if the item's value is +// not an int16, or if it was not found. If there is no error, the incremented +// value is returned. +func (c *cache) IncrementInt16(k string, n int16) (int16, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int16) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int16", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type int32 by n. Returns an error if the item's value is +// not an int32, or if it was not found. If there is no error, the incremented +// value is returned. 
+func (c *cache) IncrementInt32(k string, n int32) (int32, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int32) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int32", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type int64 by n. Returns an error if the item's value is +// not an int64, or if it was not found. If there is no error, the incremented +// value is returned. +func (c *cache) IncrementInt64(k string, n int64) (int64, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int64) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int64", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type uint by n. Returns an error if the item's value is +// not an uint, or if it was not found. If there is no error, the incremented +// value is returned. +func (c *cache) IncrementUint(k string, n uint) (uint, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type uintptr by n. Returns an error if the item's value +// is not an uintptr, or if it was not found. If there is no error, the +// incremented value is returned. 
+func (c *cache) IncrementUintptr(k string, n uintptr) (uintptr, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uintptr) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uintptr", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type uint8 by n. Returns an error if the item's value +// is not an uint8, or if it was not found. If there is no error, the +// incremented value is returned. +func (c *cache) IncrementUint8(k string, n uint8) (uint8, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint8) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint8", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type uint16 by n. Returns an error if the item's value +// is not an uint16, or if it was not found. If there is no error, the +// incremented value is returned. +func (c *cache) IncrementUint16(k string, n uint16) (uint16, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint16) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint16", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type uint32 by n. Returns an error if the item's value +// is not an uint32, or if it was not found. If there is no error, the +// incremented value is returned. 
+func (c *cache) IncrementUint32(k string, n uint32) (uint32, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint32) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint32", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type uint64 by n. Returns an error if the item's value +// is not an uint64, or if it was not found. If there is no error, the +// incremented value is returned. +func (c *cache) IncrementUint64(k string, n uint64) (uint64, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint64) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint64", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type float32 by n. Returns an error if the item's value +// is not an float32, or if it was not found. If there is no error, the +// incremented value is returned. +func (c *cache) IncrementFloat32(k string, n float32) (float32, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(float32) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an float32", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Increment an item of type float64 by n. Returns an error if the item's value +// is not an float64, or if it was not found. If there is no error, the +// incremented value is returned. 
+func (c *cache) IncrementFloat64(k string, n float64) (float64, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(float64) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an float64", k) + } + nv := rv + n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type int, int8, int16, int32, int64, uintptr, uint, +// uint8, uint32, or uint64, float32 or float64 by n. Returns an error if the +// item's value is not an integer, if it was not found, or if it is not +// possible to decrement it by n. To retrieve the decremented value, use one +// of the specialized methods, e.g. DecrementInt64. +func (c *cache) Decrement(k string, n int64) error { + // TODO: Implement Increment and Decrement more cleanly. + // (Cannot do Increment(k, n*-1) for uints.) + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return fmt.Errorf("Item not found") + } + switch v.Object.(type) { + case int: + v.Object = v.Object.(int) - int(n) + case int8: + v.Object = v.Object.(int8) - int8(n) + case int16: + v.Object = v.Object.(int16) - int16(n) + case int32: + v.Object = v.Object.(int32) - int32(n) + case int64: + v.Object = v.Object.(int64) - n + case uint: + v.Object = v.Object.(uint) - uint(n) + case uintptr: + v.Object = v.Object.(uintptr) - uintptr(n) + case uint8: + v.Object = v.Object.(uint8) - uint8(n) + case uint16: + v.Object = v.Object.(uint16) - uint16(n) + case uint32: + v.Object = v.Object.(uint32) - uint32(n) + case uint64: + v.Object = v.Object.(uint64) - uint64(n) + case float32: + v.Object = v.Object.(float32) - float32(n) + case float64: + v.Object = v.Object.(float64) - float64(n) + default: + c.mu.Unlock() + return fmt.Errorf("The value for %s is not an integer", k) + } + c.items[k] = v + c.mu.Unlock() + return nil +} + +// Decrement an item of type float32 
or float64 by n. Returns an error if the +// item's value is not floating point, if it was not found, or if it is not +// possible to decrement it by n. Pass a negative number to decrement the +// value. To retrieve the decremented value, use one of the specialized methods, +// e.g. DecrementFloat64. +func (c *cache) DecrementFloat(k string, n float64) error { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return fmt.Errorf("Item %s not found", k) + } + switch v.Object.(type) { + case float32: + v.Object = v.Object.(float32) - float32(n) + case float64: + v.Object = v.Object.(float64) - n + default: + c.mu.Unlock() + return fmt.Errorf("The value for %s does not have type float32 or float64", k) + } + c.items[k] = v + c.mu.Unlock() + return nil +} + +// Decrement an item of type int by n. Returns an error if the item's value is +// not an int, or if it was not found. If there is no error, the decremented +// value is returned. +func (c *cache) DecrementInt(k string, n int) (int, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type int8 by n. Returns an error if the item's value is +// not an int8, or if it was not found. If there is no error, the decremented +// value is returned. 
+func (c *cache) DecrementInt8(k string, n int8) (int8, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int8) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int8", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type int16 by n. Returns an error if the item's value is +// not an int16, or if it was not found. If there is no error, the decremented +// value is returned. +func (c *cache) DecrementInt16(k string, n int16) (int16, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int16) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int16", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type int32 by n. Returns an error if the item's value is +// not an int32, or if it was not found. If there is no error, the decremented +// value is returned. +func (c *cache) DecrementInt32(k string, n int32) (int32, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int32) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int32", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type int64 by n. Returns an error if the item's value is +// not an int64, or if it was not found. If there is no error, the decremented +// value is returned. 
+func (c *cache) DecrementInt64(k string, n int64) (int64, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(int64) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an int64", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type uint by n. Returns an error if the item's value is +// not an uint, or if it was not found. If there is no error, the decremented +// value is returned. +func (c *cache) DecrementUint(k string, n uint) (uint, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type uintptr by n. Returns an error if the item's value +// is not an uintptr, or if it was not found. If there is no error, the +// decremented value is returned. +func (c *cache) DecrementUintptr(k string, n uintptr) (uintptr, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uintptr) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uintptr", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type uint8 by n. Returns an error if the item's value is +// not an uint8, or if it was not found. If there is no error, the decremented +// value is returned. 
+func (c *cache) DecrementUint8(k string, n uint8) (uint8, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint8) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint8", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type uint16 by n. Returns an error if the item's value +// is not an uint16, or if it was not found. If there is no error, the +// decremented value is returned. +func (c *cache) DecrementUint16(k string, n uint16) (uint16, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint16) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint16", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type uint32 by n. Returns an error if the item's value +// is not an uint32, or if it was not found. If there is no error, the +// decremented value is returned. +func (c *cache) DecrementUint32(k string, n uint32) (uint32, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint32) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint32", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type uint64 by n. Returns an error if the item's value +// is not an uint64, or if it was not found. If there is no error, the +// decremented value is returned. 
+func (c *cache) DecrementUint64(k string, n uint64) (uint64, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(uint64) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an uint64", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type float32 by n. Returns an error if the item's value +// is not an float32, or if it was not found. If there is no error, the +// decremented value is returned. +func (c *cache) DecrementFloat32(k string, n float32) (float32, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(float32) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an float32", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Decrement an item of type float64 by n. Returns an error if the item's value +// is not an float64, or if it was not found. If there is no error, the +// decremented value is returned. +func (c *cache) DecrementFloat64(k string, n float64) (float64, error) { + c.mu.Lock() + v, found := c.items[k] + if !found || v.Expired() { + c.mu.Unlock() + return 0, fmt.Errorf("Item %s not found", k) + } + rv, ok := v.Object.(float64) + if !ok { + c.mu.Unlock() + return 0, fmt.Errorf("The value for %s is not an float64", k) + } + nv := rv - n + v.Object = nv + c.items[k] = v + c.mu.Unlock() + return nv, nil +} + +// Delete an item from the cache. Does nothing if the key is not in the cache. 
func (c *cache) Delete(k string) {
	c.mu.Lock()
	v, evicted := c.delete(k)
	c.mu.Unlock()
	// The eviction handler runs outside the lock so it may safely call back
	// into the cache.
	if evicted {
		c.onEvicted(k, v)
	}
}

// delete removes k from the items map. The returned bool is true only when an
// onEvicted handler is installed AND the key was present, which tells the
// caller to invoke the handler with the returned value. Callers must hold
// c.mu.
func (c *cache) delete(k string) (interface{}, bool) {
	if c.onEvicted != nil {
		if v, found := c.items[k]; found {
			delete(c.items, k)
			return v.Object, true
		}
	}
	delete(c.items, k)
	return nil, false
}

// keyAndValue pairs an evicted key with its value so eviction handlers can be
// invoked after the cache lock has been released.
type keyAndValue struct {
	key   string
	value interface{}
}

// Delete all expired items from the cache.
func (c *cache) DeleteExpired() {
	var evictedItems []keyAndValue
	now := time.Now().UnixNano()
	c.mu.Lock()
	for k, v := range c.items {
		// "Inlining" of expired
		if v.Expiration > 0 && now > v.Expiration {
			ov, evicted := c.delete(k)
			if evicted {
				evictedItems = append(evictedItems, keyAndValue{k, ov})
			}
		}
	}
	c.mu.Unlock()
	// Handlers run after the lock is dropped, for the same reason as Delete.
	for _, v := range evictedItems {
		c.onEvicted(v.key, v.value)
	}
}

// Sets an (optional) function that is called with the key and value when an
// item is evicted from the cache. (Including when it is deleted manually, but
// not when it is overwritten.) Set to nil to disable.
func (c *cache) OnEvicted(f func(string, interface{})) {
	c.mu.Lock()
	c.onEvicted = f
	c.mu.Unlock()
}

// Write the cache's items (using Gob) to an io.Writer.
//
// NOTE: This method is deprecated in favor of c.Items() and NewFrom() (see the
// documentation for NewFrom().)
func (c *cache) Save(w io.Writer) (err error) {
	enc := gob.NewEncoder(w)
	// A gob panic (e.g. from an unsupported concrete type) is converted into
	// an error return instead of crashing the caller.
	defer func() {
		if x := recover(); x != nil {
			err = fmt.Errorf("Error registering item types with Gob library")
		}
	}()
	c.mu.RLock()
	defer c.mu.RUnlock()
	for _, v := range c.items {
		gob.Register(v.Object)
	}
	err = enc.Encode(&c.items)
	return
}

// Save the cache's items to the given filename, creating the file if it
// doesn't exist, and overwriting it if it does.
//
// NOTE: This method is deprecated in favor of c.Items() and NewFrom() (see the
// documentation for NewFrom().)
func (c *cache) SaveFile(fname string) error {
	fp, err := os.Create(fname)
	if err != nil {
		return err
	}
	err = c.Save(fp)
	if err != nil {
		fp.Close()
		return err
	}
	// The Close error is returned deliberately: it can report a failed flush.
	return fp.Close()
}

// Add (Gob-serialized) cache items from an io.Reader, excluding any items with
// keys that already exist (and haven't expired) in the current cache.
//
// NOTE: This method is deprecated in favor of c.Items() and NewFrom() (see the
// documentation for NewFrom().)
func (c *cache) Load(r io.Reader) error {
	dec := gob.NewDecoder(r)
	items := map[string]Item{}
	err := dec.Decode(&items)
	if err == nil {
		c.mu.Lock()
		defer c.mu.Unlock()
		for k, v := range items {
			// Existing, unexpired entries win over loaded ones.
			ov, found := c.items[k]
			if !found || ov.Expired() {
				c.items[k] = v
			}
		}
	}
	return err
}

// Load and add cache items from the given filename, excluding any items with
// keys that already exist in the current cache.
//
// NOTE: This method is deprecated in favor of c.Items() and NewFrom() (see the
// documentation for NewFrom().)
func (c *cache) LoadFile(fname string) error {
	fp, err := os.Open(fname)
	if err != nil {
		return err
	}
	err = c.Load(fp)
	if err != nil {
		fp.Close()
		return err
	}
	return fp.Close()
}

// Returns the items in the cache. This may include items that have expired,
// but have not yet been cleaned up. If this is significant, the Expiration
// fields of the items should be checked. Note that explicit synchronization
// is needed to use a cache and its corresponding Items() return value at
// the same time, as the map is shared.
func (c *cache) Items() map[string]Item {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.items
}

// Returns the number of items in the cache. This may include items that have
// expired, but have not yet been cleaned up. Equivalent to len(c.Items()).
func (c *cache) ItemCount() int {
	c.mu.RLock()
	n := len(c.items)
	c.mu.RUnlock()
	return n
}

// Delete all items from the cache.
func (c *cache) Flush() {
	c.mu.Lock()
	c.items = map[string]Item{}
	c.mu.Unlock()
}

// janitor drives the periodic removal of expired items from a cache.
type janitor struct {
	Interval time.Duration
	stop     chan bool
}

// Run calls c.DeleteExpired every Interval until a value is received on stop.
func (j *janitor) Run(c *cache) {
	j.stop = make(chan bool)
	ticker := time.NewTicker(j.Interval)
	for {
		select {
		case <-ticker.C:
			c.DeleteExpired()
		case <-j.stop:
			ticker.Stop()
			return
		}
	}
}

// stopJanitor is the finalizer installed on Cache by newCacheWithJanitor; it
// signals the janitor goroutine to exit so the inner cache can be collected.
func stopJanitor(c *Cache) {
	c.janitor.stop <- true
}

// runJanitor attaches a janitor with interval ci to c and starts it in its
// own goroutine.
func runJanitor(c *cache, ci time.Duration) {
	j := &janitor{
		Interval: ci,
	}
	c.janitor = j
	go j.Run(c)
}

// newCache builds the inner cache value. A zero default expiration is
// normalized to -1 so it is treated as "never expires" downstream.
func newCache(de time.Duration, m map[string]Item) *cache {
	if de == 0 {
		de = -1
	}
	c := &cache{
		defaultExpiration: de,
		items:             m,
	}
	return c
}

func newCacheWithJanitor(de time.Duration, ci time.Duration, m map[string]Item) *Cache {
	c := newCache(de, m)
	// This trick ensures that the janitor goroutine (which--granted it
	// was enabled--is running DeleteExpired on c forever) does not keep
	// the returned C object from being garbage collected. When it is
	// garbage collected, the finalizer stops the janitor goroutine, after
	// which c can be collected.
	C := &Cache{c}
	if ci > 0 {
		runJanitor(c, ci)
		runtime.SetFinalizer(C, stopJanitor)
	}
	return C
}

// Return a new cache with a given default expiration duration and cleanup
// interval. If the expiration duration is less than one (or NoExpiration),
// the items in the cache never expire (by default), and must be deleted
// manually. If the cleanup interval is less than one, expired items are not
// deleted from the cache before calling c.DeleteExpired().
func New(defaultExpiration, cleanupInterval time.Duration) *Cache {
	items := make(map[string]Item)
	return newCacheWithJanitor(defaultExpiration, cleanupInterval, items)
}

// Return a new cache with a given default expiration duration and cleanup
// interval.
If the expiration duration is less than one (or NoExpiration),
// the items in the cache never expire (by default), and must be deleted
// manually. If the cleanup interval is less than one, expired items are not
// deleted from the cache before calling c.DeleteExpired().
//
// NewFrom() also accepts an items map which will serve as the underlying map
// for the cache. This is useful for starting from a deserialized cache
// (serialized using e.g. gob.Encode() on c.Items()), or passing in e.g.
// make(map[string]Item, 500) to improve startup performance when the cache
// is expected to reach a certain minimum size.
//
// Only the cache's methods synchronize access to this map, so it is not
// recommended to keep any references to the map around after creating a cache.
// If need be, the map can be accessed at a later point using c.Items() (subject
// to the same caveat.)
//
// Note regarding serialization: When using e.g. gob, make sure to
// gob.Register() the individual types stored in the cache before encoding a
// map retrieved with c.Items(), and to register those same types before
// decoding a blob containing an items map.
func NewFrom(defaultExpiration, cleanupInterval time.Duration, items map[string]Item) *Cache {
	return newCacheWithJanitor(defaultExpiration, cleanupInterval, items)
}
diff --git a/vendor/github.com/patrickmn/go-cache/sharded.go b/vendor/github.com/patrickmn/go-cache/sharded.go
new file mode 100644
index 0000000..bcc0538
--- /dev/null
+++ b/vendor/github.com/patrickmn/go-cache/sharded.go
@@ -0,0 +1,192 @@
package cache

import (
	"crypto/rand"
	"math"
	"math/big"
	insecurerand "math/rand" // fallback seed source when crypto/rand fails
	"os"
	"runtime"
	"time"
)

// This is an experimental and unexported (for now) attempt at making a cache
// with better algorithmic complexity than the standard one, namely by
// preventing write locks of the entire cache when an item is added.
As of the +// time of writing, the overhead of selecting buckets results in cache +// operations being about twice as slow as for the standard cache with small +// total cache sizes, and faster for larger ones. +// +// See cache_test.go for a few benchmarks. + +type unexportedShardedCache struct { + *shardedCache +} + +type shardedCache struct { + seed uint32 + m uint32 + cs []*cache + janitor *shardedJanitor +} + +// djb2 with better shuffling. 5x faster than FNV with the hash.Hash overhead. +func djb33(seed uint32, k string) uint32 { + var ( + l = uint32(len(k)) + d = 5381 + seed + l + i = uint32(0) + ) + // Why is all this 5x faster than a for loop? + if l >= 4 { + for i < l-4 { + d = (d * 33) ^ uint32(k[i]) + d = (d * 33) ^ uint32(k[i+1]) + d = (d * 33) ^ uint32(k[i+2]) + d = (d * 33) ^ uint32(k[i+3]) + i += 4 + } + } + switch l - i { + case 1: + case 2: + d = (d * 33) ^ uint32(k[i]) + case 3: + d = (d * 33) ^ uint32(k[i]) + d = (d * 33) ^ uint32(k[i+1]) + case 4: + d = (d * 33) ^ uint32(k[i]) + d = (d * 33) ^ uint32(k[i+1]) + d = (d * 33) ^ uint32(k[i+2]) + } + return d ^ (d >> 16) +} + +func (sc *shardedCache) bucket(k string) *cache { + return sc.cs[djb33(sc.seed, k)%sc.m] +} + +func (sc *shardedCache) Set(k string, x interface{}, d time.Duration) { + sc.bucket(k).Set(k, x, d) +} + +func (sc *shardedCache) Add(k string, x interface{}, d time.Duration) error { + return sc.bucket(k).Add(k, x, d) +} + +func (sc *shardedCache) Replace(k string, x interface{}, d time.Duration) error { + return sc.bucket(k).Replace(k, x, d) +} + +func (sc *shardedCache) Get(k string) (interface{}, bool) { + return sc.bucket(k).Get(k) +} + +func (sc *shardedCache) Increment(k string, n int64) error { + return sc.bucket(k).Increment(k, n) +} + +func (sc *shardedCache) IncrementFloat(k string, n float64) error { + return sc.bucket(k).IncrementFloat(k, n) +} + +func (sc *shardedCache) Decrement(k string, n int64) error { + return sc.bucket(k).Decrement(k, n) +} + +func (sc 
*shardedCache) Delete(k string) { + sc.bucket(k).Delete(k) +} + +func (sc *shardedCache) DeleteExpired() { + for _, v := range sc.cs { + v.DeleteExpired() + } +} + +// Returns the items in the cache. This may include items that have expired, +// but have not yet been cleaned up. If this is significant, the Expiration +// fields of the items should be checked. Note that explicit synchronization +// is needed to use a cache and its corresponding Items() return values at +// the same time, as the maps are shared. +func (sc *shardedCache) Items() []map[string]Item { + res := make([]map[string]Item, len(sc.cs)) + for i, v := range sc.cs { + res[i] = v.Items() + } + return res +} + +func (sc *shardedCache) Flush() { + for _, v := range sc.cs { + v.Flush() + } +} + +type shardedJanitor struct { + Interval time.Duration + stop chan bool +} + +func (j *shardedJanitor) Run(sc *shardedCache) { + j.stop = make(chan bool) + tick := time.Tick(j.Interval) + for { + select { + case <-tick: + sc.DeleteExpired() + case <-j.stop: + return + } + } +} + +func stopShardedJanitor(sc *unexportedShardedCache) { + sc.janitor.stop <- true +} + +func runShardedJanitor(sc *shardedCache, ci time.Duration) { + j := &shardedJanitor{ + Interval: ci, + } + sc.janitor = j + go j.Run(sc) +} + +func newShardedCache(n int, de time.Duration) *shardedCache { + max := big.NewInt(0).SetUint64(uint64(math.MaxUint32)) + rnd, err := rand.Int(rand.Reader, max) + var seed uint32 + if err != nil { + os.Stderr.Write([]byte("WARNING: go-cache's newShardedCache failed to read from the system CSPRNG (/dev/urandom or equivalent.) Your system's security may be compromised. 
Continuing with an insecure seed.\n"))
		seed = insecurerand.Uint32()
	} else {
		seed = uint32(rnd.Uint64())
	}
	sc := &shardedCache{
		seed: seed,
		m:    uint32(n),
		cs:   make([]*cache, n),
	}
	for i := 0; i < n; i++ {
		c := &cache{
			defaultExpiration: de,
			items:             map[string]Item{},
		}
		sc.cs[i] = c
	}
	return sc
}

// unexportedNewSharded mirrors newCacheWithJanitor for the sharded cache: the
// returned wrapper carries a finalizer that stops the janitor goroutine, so
// the janitor does not keep the shards from being garbage collected.
func unexportedNewSharded(defaultExpiration, cleanupInterval time.Duration, shards int) *unexportedShardedCache {
	// A zero default expiration is normalized to -1 ("never expires").
	if defaultExpiration == 0 {
		defaultExpiration = -1
	}
	sc := newShardedCache(shards, defaultExpiration)
	SC := &unexportedShardedCache{sc}
	if cleanupInterval > 0 {
		runShardedJanitor(sc, cleanupInterval)
		runtime.SetFinalizer(SC, stopShardedJanitor)
	}
	return SC
}
diff --git a/vendor/github.com/rancher/go-rancher-metadata/metadata/types.go b/vendor/github.com/rancher/go-rancher-metadata/metadata/types.go
index 9d02d6b..628d2a9 100644
--- a/vendor/github.com/rancher/go-rancher-metadata/metadata/types.go
+++ b/vendor/github.com/rancher/go-rancher-metadata/metadata/types.go
@@ -38,6 +38,7 @@ type Service struct {
 	HealthCheck HealthCheck `json:"health_check"`
 	PrimaryServiceName string `json:"primary_service_name"`
 	LBConfig LBConfig `json:"lb_config"`
+	EnvironmentUUID string `json:"environment_uuid"`
 }
 
 type Container struct {
@@ -47,6 +48,7 @@ type Container struct {
 	Ips []string `json:"ips"`
 	Ports []string `json:"ports"`
 	ServiceName string `json:"service_name"`
+	ServiceIndex string `json:"service_index"`
 	StackName string `json:"stack_name"`
 	Labels map[string]string `json:"labels"`
 	CreateIndex int `json:"create_index"`
@@ -63,6 +65,9 @@ type Container struct {
 	HealthCheckHosts []string `json:"health_check_hosts"`
 	NetworkFromContainerUUID string `json:"network_from_container_uuid"`
 	NetworkUUID string `json:"network_uuid"`
+	Links map[string]string `json:"links"`
+	System bool `json:"system"`
+	EnvironmentUUID string `json:"environment_uuid"`
 }
 
 type Network struct {
@@ -70,18 +75,20 @@ type Network struct {
 	UUID
string `json:"uuid"` Metadata map[string]interface{} `json:"metadata"` HostPorts bool `json:"host_ports"` + Default bool `json:"is_default"` } type Host struct { - Name string `json:"name"` - AgentIP string `json:"agent_ip"` - HostId int `json:"host_id"` - Labels map[string]string `json:"labels"` - UUID string `json:"uuid"` - Hostname string `json:"hostname"` - Memory int64 `json:"memory"` - MilliCPU int64 `json:"milli_cpu"` - LocalStorageMb int64 `json:"local_storage_mb"` + Name string `json:"name"` + AgentIP string `json:"agent_ip"` + HostId int `json:"host_id"` + Labels map[string]string `json:"labels"` + UUID string `json:"uuid"` + Hostname string `json:"hostname"` + Memory int64 `json:"memory"` + MilliCPU int64 `json:"milli_cpu"` + LocalStorageMb int64 `json:"local_storage_mb"` + EnvironmentUUID string `json:"environment_uuid"` } type PortRule struct {