Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Revert "Cache results in ReadDestination and ReadConsumerGroup metadata calls (#337)"
Browse files Browse the repository at this point in the history

This reverts commit 371b784.
  • Loading branch information
Xu Ning committed Dec 6, 2017
1 parent 8390176 commit 7a760c9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 142 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ cherami-server
cherami-replicator-server
cdb
cherami-store-tool
cmq
124 changes: 16 additions & 108 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"time"

"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/cache"
"github.com/uber/cherami-server/common/configure"
m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"
Expand Down Expand Up @@ -178,10 +177,6 @@ const cassandraProtoVersion = 4
const deleteExtentTTLSeconds = int64(time.Hour*24) / int64(time.Second)
const defaultDeleteTTLSeconds = int64(time.Hour*24*30) / int64(time.Second)

const destinationCacheSize = 1048576
const consumerGroupCacheSize = 1048576
const cacheTTL = time.Second

// CassandraMetadataService Implements TChanMetadataServiceClient interface
// TODO: Convert all errors to the ones defined in the thrift API.
type CassandraMetadataService struct {
Expand All @@ -191,9 +186,6 @@ type CassandraMetadataService struct {
highConsLevel gocql.Consistency // Strongest cons level that can be used for this session
clusterName string
log bark.Logger

destinationCache cache.Cache
consumerGroupCache cache.Cache
}

// interface implementation check
Expand Down Expand Up @@ -286,9 +278,6 @@ func NewCassandraMetadataService(cfg configure.CommonMetadataConfig, log bark.Lo
highConsLevel: highCons,
clusterName: clusterName,
log: log.WithField(common.TagModule, `metadata`),

destinationCache: cache.New(destinationCacheSize, &cache.Options{TTL: cacheTTL}),
consumerGroupCache: cache.New(consumerGroupCacheSize, &cache.Options{TTL: cacheTTL}),
}, nil
}

Expand Down Expand Up @@ -660,25 +649,6 @@ func unmarshalDstZoneConfigs(configsData []map[string]interface{}) []*shared.Des
return configs
}

// readDestinationResponse is the value memoized in destinationCache for a
// ReadDestination lookup: the description and the error are cached together,
// so negative results (e.g. not-found) are cached as well.
type readDestinationResponse struct {
	result *shared.DestinationDescription // may be nil when err is non-nil
	err    error
}

// cacheReadDestinationResponse memoizes the (result, err) pair for a
// ReadDestination call under the given cache key and passes the pair straight
// back to the caller, so call sites can `return s.cacheReadDestinationResponse(...)`.
// An empty key skips caching (nothing usable to index by).
func (s *CassandraMetadataService) cacheReadDestinationResponse(key string, result *shared.DestinationDescription,
	err error) (*shared.DestinationDescription, error) {
	if key != "" {
		s.destinationCache.Put(key, &readDestinationResponse{
			result: result,
			err:    err,
		})
	}
	return result, err
}

// ReadDestination implements the corresponding TChanMetadataServiceClient API
// Either path or destinationUUID can be specified.
// Deleted destinations are returned with DELETED status only when destinationUUID is used.
Expand All @@ -699,18 +669,6 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
}
}

var key string
if getRequest.Path != nil {
key = getRequest.GetPath()
} else {
key = getRequest.GetDestinationUUID()
}

cached := s.destinationCache.Get(key)
if cached != nil {
return cached.(*readDestinationResponse).result, cached.(*readDestinationResponse).err
}

result = getUtilDestinationDescription()
var zoneConfigsData []map[string]interface{}
if getRequest.Path != nil {
Expand Down Expand Up @@ -764,20 +722,20 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
dest = getRequest.GetDestinationUUID()
}

return s.cacheReadDestinationResponse(key, nil, &shared.EntityNotExistsError{
return nil, &shared.EntityNotExistsError{
Message: fmt.Sprintf("Destination %s does not exist", dest),
})
}
}

return s.cacheReadDestinationResponse(key, nil, &shared.InternalServiceError{
return nil, &shared.InternalServiceError{
Message: err.Error(),
})
}
}

*result.DLQPurgeBefore = int64(cqlTimestampToUnixNano(*result.DLQPurgeBefore))
*result.DLQMergeBefore = int64(cqlTimestampToUnixNano(*result.DLQMergeBefore))

return s.cacheReadDestinationResponse(key, result, nil)
return result, nil
}

// UpdateDestination implements the corresponding TChanMetadataServiceClient API
Expand Down Expand Up @@ -898,9 +856,6 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR
existing.ZoneConfigs = updateRequest.GetZoneConfigs()
}
existing.IsMultiZone = common.BoolPtr(isMultiZone)

s.destinationCache.Delete(existing.GetPath())
s.destinationCache.Delete(updateRequest.GetDestinationUUID())
return existing, nil
}

Expand Down Expand Up @@ -954,9 +909,6 @@ func (s *CassandraMetadataService) DeleteDestination(ctx thrift.Context, deleteR
opsDelete,
time.Now(),
marshalRequest(deleteRequest))

s.destinationCache.Delete(existing.GetPath())
s.destinationCache.Delete(existing.GetDestinationUUID())
return nil
}

Expand Down Expand Up @@ -1022,8 +974,6 @@ func (s *CassandraMetadataService) DeleteDestinationUUID(ctx thrift.Context, del
time.Now(),
marshalRequest(deleteRequest))

s.destinationCache.Delete(existing.GetPath())
s.destinationCache.Delete(existing.GetDestinationUUID())
return nil
}

Expand Down Expand Up @@ -1535,34 +1485,8 @@ func (s *CassandraMetadataService) createDlqDestination(cgUUID string, cgName st
return dlqDestDesc, err
}

// readConsumerGroupResponse is the value memoized in consumerGroupCache for a
// consumer-group lookup: the description and the error are cached together,
// so negative results (e.g. not-found) are cached as well.
type readConsumerGroupResponse struct {
	result *shared.ConsumerGroupDescription // may be nil when err is non-nil
	err    error
}

// cacheReadConsumerGroupResponse memoizes the (result, err) pair for a
// consumer-group read under the given cache key and passes the pair straight
// back to the caller, so call sites can `return s.cacheReadConsumerGroupResponse(...)`.
// An empty key skips caching (nothing usable to index by).
func (s *CassandraMetadataService) cacheReadConsumerGroupResponse(key string, result *shared.ConsumerGroupDescription,
	err error) (*shared.ConsumerGroupDescription, error) {
	if key != "" {
		s.consumerGroupCache.Put(key, &readConsumerGroupResponse{
			result: result,
			err:    err,
		})
	}
	return result, err
}

func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cgName string) (*shared.ConsumerGroupDescription, error) {
result := getUtilConsumerGroupDescription()

key := dstUUID + cgName
cached := s.consumerGroupCache.Get(key)
if cached != nil {
return cached.(*readConsumerGroupResponse).result, cached.(*readConsumerGroupResponse).err
}

var zoneConfigsData []map[string]interface{}
query := s.session.Query(sqlGetCGByName, dstUUID, cgName).Consistency(s.lowConsLevel)
if err := query.Scan(
Expand All @@ -1582,18 +1506,17 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
&zoneConfigsData,
&result.Options); err != nil {
if err == gocql.ErrNotFound {
return s.cacheReadConsumerGroupResponse(key, nil, &shared.EntityNotExistsError{
return nil, &shared.EntityNotExistsError{
Message: fmt.Sprintf("ConsumerGroup %s of destinationUUID %s does not exist", cgName, dstUUID),
})
}
}

return s.cacheReadConsumerGroupResponse(key, nil, &shared.InternalServiceError{
return nil, &shared.InternalServiceError{
Message: err.Error(),
})
}
}

result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
return s.cacheReadConsumerGroupResponse(key, result, nil)
return result, nil
}

// ReadConsumerGroup returns the ConsumerGroupDescription for the [destinationPath, groupName].
Expand Down Expand Up @@ -1635,12 +1558,6 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
return nil, &shared.BadRequestError{Message: "ConsumerGroupUUID cannot be nil"}
}

key := request.GetConsumerGroupUUID()
cached := s.consumerGroupCache.Get(key)
if cached != nil {
return cached.(*readConsumerGroupResponse).result, cached.(*readConsumerGroupResponse).err
}

result := getUtilConsumerGroupDescription()
var zoneConfigsData []map[string]interface{}
query := s.session.Query(sqlGetCGByUUID, request.GetConsumerGroupUUID()).Consistency(s.lowConsLevel)
Expand All @@ -1661,18 +1578,18 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
&zoneConfigsData,
&result.Options); err != nil {
if err == gocql.ErrNotFound {
return s.cacheReadConsumerGroupResponse(key, nil, &shared.EntityNotExistsError{
return nil, &shared.EntityNotExistsError{
Message: fmt.Sprintf("ConsumerGroup %s does not exist", *request.ConsumerGroupUUID),
})
}
}

return s.cacheReadConsumerGroupResponse(key, nil, &shared.InternalServiceError{
return nil, &shared.InternalServiceError{
Message: err.Error(),
})
}
}

result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
return s.cacheReadConsumerGroupResponse(key, result, nil)

return result, nil
}

func updateCGDescIfChanged(req *shared.UpdateConsumerGroupRequest, cgDesc *shared.ConsumerGroupDescription) bool {
Expand Down Expand Up @@ -1826,9 +1743,6 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
time.Now(),
marshalRequest(request))

s.consumerGroupCache.Delete(newCG.GetDestinationUUID() + newCG.GetConsumerGroupName())
s.consumerGroupCache.Delete(newCG.GetConsumerGroupUUID())

return newCG, nil
}

Expand Down Expand Up @@ -1957,10 +1871,6 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
time.Now(),
marshalRequest(request))

s.consumerGroupCache.Delete(existingCG.GetDestinationUUID() + existingCG.GetConsumerGroupName())
s.consumerGroupCache.Delete(existingCG.GetConsumerGroupUUID())
s.destinationCache.Delete(dlqDstID)

return nil
}

Expand Down Expand Up @@ -2024,8 +1934,6 @@ func (s *CassandraMetadataService) DeleteConsumerGroupUUID(ctx thrift.Context, r
time.Now(),
marshalRequest(request))

s.consumerGroupCache.Delete(existing.GetDestinationUUID() + existing.GetConsumerGroupName())
s.consumerGroupCache.Delete(existing.GetConsumerGroupUUID())
return nil
}

Expand Down
56 changes: 23 additions & 33 deletions test/integration/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,15 @@ import (

type (
testBase struct {
Frontends map[string]*frontendhost.Frontend
InputHosts map[string]*inputhost.InputHost
OutputHosts map[string]*outputhost.OutputHost
StoreHosts map[string]*storehost.StoreHost
Controllers map[string]*controllerhost.Mcp
mClient *metadata.CassandraMetadataService
UUIDResolver common.UUIDResolver
keyspace string
storageBaseDir string
auth configure.Authentication

Frontends map[string]*frontendhost.Frontend
InputHosts map[string]*inputhost.InputHost
OutputHosts map[string]*outputhost.OutputHost
StoreHosts map[string]*storehost.StoreHost
Controllers map[string]*controllerhost.Mcp
mClient *metadata.CassandraMetadataService
UUIDResolver common.UUIDResolver
keyspace string
storageBaseDir string
*require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
suite.Suite
}
Expand Down Expand Up @@ -161,17 +159,23 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
tb.keyspace = "integration_test"
tb.Assertions = require.New(tb.T())

tb.auth = configure.Authentication{
auth := configure.Authentication{
Enabled: true,
Username: "cassandra",
Password: "cassandra",
}

// create the keyspace first
err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, tb.auth)
err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, auth)
tb.NoError(err)

tb.mClient = tb.GetNewMetadataClient()
tb.mClient, _ = metadata.NewCassandraMetadataService(&configure.MetadataConfig{
CassandraHosts: "127.0.0.1",
Port: 9042,
Keyspace: tb.keyspace,
Consistency: "One",
Authentication: auth,
}, nil)
tb.NotNil(tb.mClient)

// Drop the keyspace, if it exists. This preserves the keyspace for inspection if the test fails, and simplifies cleanup
Expand All @@ -194,18 +198,6 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
cassConfig.SetRefreshInterval(10 * time.Millisecond)
}

// GetNewMetadataClient builds a fresh CassandraMetadataService pointed at the
// local test Cassandra (127.0.0.1:9042) using the suite's keyspace and auth.
// The constructor error is deliberately dropped: on failure the returned
// client is nil and callers assert non-nil (tb.NotNil) before use.
func (tb *testBase) GetNewMetadataClient() *metadata.CassandraMetadataService {
	cfg := &configure.MetadataConfig{
		CassandraHosts: "127.0.0.1",
		Port:           9042,
		Keyspace:       tb.keyspace,
		Consistency:    "One",
		Authentication: tb.auth,
	}
	client, _ := metadata.NewCassandraMetadataService(cfg, nil)
	return client
}

func (tb *testBase) TearDownSuite() {
}

Expand All @@ -216,8 +208,6 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
tb.storageBaseDir, err = ioutil.TempDir("", "cherami_integration_test_")
tb.NoError(err)

tb.mClient = tb.GetNewMetadataClient()
tb.NotNil(tb.mClient)
tb.UUIDResolver = common.NewUUIDResolver(tb.mClient)
hwInfoReader := common.NewHostHardwareInfoReader(tb.mClient)

Expand All @@ -242,7 +232,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.StoreServiceName)
sCommon := common.NewService(common.StoreServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("store ringHosts: %v", cfg.GetRingHosts())
sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.GetNewMetadataClient(), storehostOpts)
sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.mClient, storehostOpts)
sh.Start(tc)

// start websocket server
Expand All @@ -266,7 +256,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.InputServiceName)
sCommon := common.NewService(common.InputServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("input ringHosts: %v", cfg.GetRingHosts())
ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.GetNewMetadataClient(), nil)
ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.mClient, nil)
ih.Start(tc)
// start websocket server
common.WSStart(cfg.GetListenAddress().String(), cfg.GetWebsocketPort(), ih)
Expand All @@ -282,7 +272,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {

sCommon := common.NewService(common.FrontendServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("front ringHosts: %v", cfg.GetRingHosts())
fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.GetNewMetadataClient(), cfgMap[common.FrontendServiceName][i])
fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.mClient, cfgMap[common.FrontendServiceName][i])
fh.Start(tc)
tb.Frontends[hostID] = fh
frontendForOut = fh
Expand All @@ -298,7 +288,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
oh, tc := outputhost.NewOutputHost(
common.OutputServiceName,
sCommon,
tb.GetNewMetadataClient(),
tb.mClient,
frontendForOut,
nil,
cfgMap[common.OutputServiceName][i].GetKafkaConfig(),
Expand All @@ -318,7 +308,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
reporter := common.NewTestMetricsReporter()
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.ControllerServiceName)
sVice := common.NewService(serviceName, uuid.New(), cfg.ServiceConfig[serviceName], tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
ch, tc := controllerhost.NewController(cfg, sVice, tb.GetNewMetadataClient(), common.NewDummyZoneFailoverManager())
ch, tc := controllerhost.NewController(cfg, sVice, tb.mClient, common.NewDummyZoneFailoverManager())
ch.Start(tc)
tb.Controllers[hostID] = ch
}
Expand Down

0 comments on commit 7a760c9

Please sign in to comment.