Skip to content

Commit

Permalink
Merge #69431
Browse files Browse the repository at this point in the history
69431: roachtest: move kafka logs into collected log dir r=stevendanna a=stevendanna

Fixes #69155

Release justification: Non-production code change
Release note: None

Co-authored-by: Steven Danna <danna@cockroachlabs.com>
  • Loading branch information
craig[bot] and stevendanna committed Aug 27, 2021
2 parents 51b4726 + 494ee1b commit 0fa29ad
Showing 1 changed file with 40 additions and 13 deletions.
53 changes: 40 additions & 13 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,24 +1346,51 @@ func (k kafkaManager) start(ctx context.Context, services ...string) {
k.restart(ctx, services...)
}

func (k kafkaManager) restart(ctx context.Context, services ...string) {
var startArgs string
if len(services) == 0 {
startArgs = "schema-registry"
var kafkaServices = map[string][]string{
"zookeeper": {"zookeeper"},
"kafka": {"zookeeper", "kafka"},
"schema-registry": {"zookeeper", "kafka", "schema-registry"},
}

func (k kafkaManager) kafkaServicesForTargets(targets []string) []string {
var services []string
for _, tgt := range targets {
if s, ok := kafkaServices[tgt]; ok {
services = append(services, s...)
} else {
k.t.Fatalf("unknown kafka start target %q", tgt)
}
}
return services
}

func (k kafkaManager) restart(ctx context.Context, targetServices ...string) {
var services []string
if len(targetServices) == 0 {
services = kafkaServices["schema-registry"]
} else {
startArgs = strings.Join(services, " ")
services = k.kafkaServicesForTargets(targetServices)
}

k.c.Run(ctx, k.nodes, "touch", k.serverJAASConfig())
for _, svcName := range services {
// The confluent tool applies the KAFKA_OPTS to all
// services. Also, the kafka.logs.dir is used by each
// service, despite the name.
opts := fmt.Sprintf("-Djava.security.auth.login.config=%s -Dkafka.logs.dir=%s",
k.serverJAASConfig(),
fmt.Sprintf("logs/%s", svcName),
)
startCmd := fmt.Sprintf(
"CONFLUENT_CURRENT=%s CONFLUENT_HOME=%s KAFKA_OPTS='%s' %s local services %s start",
k.basePath(),
k.confluentHome(),
opts,
k.confluentBin(),
svcName)
k.c.Run(ctx, k.nodes, startCmd)
}

startCmd := fmt.Sprintf(
"CONFLUENT_CURRENT=%s CONFLUENT_HOME=%s KAFKA_OPTS=-Djava.security.auth.login.config=%s %s local services %s start",
k.basePath(),
k.confluentHome(),
k.serverJAASConfig(),
k.confluentBin(),
startArgs)
k.c.Run(ctx, k.nodes, startCmd)
}

func (k kafkaManager) makeCommand(exe string, args ...string) string {
Expand Down

0 comments on commit 0fa29ad

Please sign in to comment.