Skip to content

Commit

Permalink
Support index cleaner for rollover indices and add integration tests (#…
Browse files Browse the repository at this point in the history
…1689)

* Support index cleaning for rollover

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Simplify regexes

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Jul 26, 2019
1 parent 7f1daf2 commit 5cd5752
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 12 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ storage-integration-test: go-gen
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) $(STORAGE_PKGS) | $(COLORIZE)"

.PHONY: index-cleaner-integration-test
index-cleaner-integration-test: docker-images-elastic
# Expire tests results for storage integration tests since the environment might change
# even though the code remains the same.
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) -tags index_cleaner $(STORAGE_PKGS) | $(COLORIZE)"

all-pkgs:
@echo $(ALL_PKGS) | tr ' ' '\n' | sort

Expand Down
28 changes: 22 additions & 6 deletions plugin/storage/es/esCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def main():
print('HOSTNAME ... specifies which Elasticsearch hosts URL to search and delete indices from.')
print('TIMEOUT ... number of seconds to wait for master node response.'.format(TIMEOUT))
print('INDEX_PREFIX ... specifies index prefix.')
print('ARCHIVE ... specifies whether to remove archive indices (default false).')
print('ARCHIVE ... specifies whether to remove archive indices (only works for rollover) (default false).')
print('ROLLOVER ... specifies whether to remove indices created by rollover (default false).')
print('ES_USERNAME ... The username required by Elasticsearch.')
print('ES_PASSWORD ... The password required by Elasticsearch.')
print('ES_TLS ... enable TLS (default false).')
Expand Down Expand Up @@ -44,9 +45,12 @@ def main():
prefix += '-'

if str2bool(os.getenv("ARCHIVE", 'false')):
filter_archive_indices(ilo, prefix)
filter_archive_indices_rollover(ilo, prefix)
else:
filter_main_indices(ilo, prefix)
if str2bool(os.getenv("ROLLOVER", 'false')):
filter_main_indices_rollover(ilo, prefix)
else:
filter_main_indices(ilo, prefix)

empty_list(ilo, 'No indices to delete')

Expand All @@ -58,17 +62,29 @@ def main():


def filter_main_indices(ilo, prefix):
ilo.filter_by_regex(kind='prefix', value=prefix + "jaeger")
ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service|dependencies)-\d{4}-\d{2}-\d{2}")
empty_list(ilo, "No indices to delete")
# This excludes archive index as we use source='name'
# source `creation_date` would include archive index
ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1]))


def filter_archive_indices(ilo, prefix):
# Remove only archive indices when aliases are used
def filter_main_indices_rollover(ilo, prefix):
ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service)-\d{6}")
empty_list(ilo, "No indices to delete")
# do not remove active write indices
ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-write'], exclude=True)
empty_list(ilo, "No indices to delete")
ilo.filter_by_alias(aliases=[prefix + 'jaeger-service-write'], exclude=True)
empty_list(ilo, "No indices to delete")
ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=int(sys.argv[1]))


def filter_archive_indices_rollover(ilo, prefix):
# Remove only rollover archive indices
# Do not remove active write archive index
ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-write'], exclude=True)
empty_list(ilo, "No indices to delete")
ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read'])
ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=int(sys.argv[1]))

Expand Down
3 changes: 0 additions & 3 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ const (
queryPort = "9200"
queryHostPort = host + ":" + queryPort
queryURL = "http://" + queryHostPort
username = "elastic" // the elasticsearch default username
password = "changeme" // the elasticsearch default password
indexPrefix = "integration-test"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
Expand All @@ -59,7 +57,6 @@ type ESStorageIntegration struct {
func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error {
rawClient, err := elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetBasicAuth(username, password),
elastic.SetSniff(false))
if err != nil {
return err
Expand Down
218 changes: 218 additions & 0 deletions plugin/storage/integration/es_index_cleaner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build index_cleaner

package integration

import (
"context"
"fmt"
"os/exec"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/olivere/elastic.v5"
)

const (
archiveIndexName = "jaeger-span-archive"
dependenciesIndexName = "jaeger-dependencies-2019-01-01"
spanIndexName = "jaeger-span-2019-01-01"
serviceIndexName = "jaeger-service-2019-01-01"
indexCleanerImage = "jaegertracing/jaeger-es-index-cleaner:latest"
rolloverImage = "jaegertracing/jaeger-es-rollover:latest"
rolloverNowEnvVar = "CONDITIONS='{\"max_age\":\"0s\"}'"
)

func TestIndexCleaner_doNotFailOnEmptyStorage(t *testing.T) {
client, err := createESClient()
require.NoError(t, err)
_, err = client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)

tests := []struct{
envs []string
}{
{envs:[]string{"ROLLOVER=false"}},
{envs:[]string{"ROLLOVER=true"}},
{envs:[]string{"ARCHIVE=true"}},
}
for _, test := range tests {
err := runEsCleaner(7, test.envs)
require.NoError(t, err)
}
}

func TestIndexCleaner_doNotFailOnFullStorage(t *testing.T) {
client, err := createESClient()
require.NoError(t, err)
tests := []struct{
envs []string
}{
{envs:[]string{"ROLLOVER=false"}},
{envs:[]string{"ROLLOVER=true"}},
{envs:[]string{"ARCHIVE=true"}},
}
for _, test := range tests {
_, err = client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)
err := createAllIndices(client, "")
require.NoError(t, err)
err = runEsCleaner(1500, test.envs)
require.NoError(t, err)
}
}

func TestIndexCleaner(t *testing.T) {
client, err := createESClient()
require.NoError(t, err)

tests := []struct{
name string
envVars []string
expectedIndices []string
}{
{
name: "RemoveDailyIndices",
envVars: []string{},
expectedIndices: []string{
archiveIndexName,
"jaeger-span-000001", "jaeger-service-000001", "jaeger-span-000002", "jaeger-service-000002",
"jaeger-span-archive-000001", "jaeger-span-archive-000002",
},
},
{
name: "RemoveRolloverIndices",
envVars: []string{"ROLLOVER=true"},
expectedIndices: []string{
archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName,
"jaeger-span-000002", "jaeger-service-000002",
"jaeger-span-archive-000001", "jaeger-span-archive-000002",
},
},
{
name: "RemoveArchiveIndices",
envVars: []string{"ARCHIVE=true"},
expectedIndices: []string{
archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName,
"jaeger-span-000001", "jaeger-service-000001", "jaeger-span-000002", "jaeger-service-000002",
"jaeger-span-archive-000002",
},
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("%s_no_prefix", test.name), func(t *testing.T) {
runIndexCleanerTest(t, client, "", test.expectedIndices, test.envVars)
})
t.Run(fmt.Sprintf("%s_prefix", test.name), func(t *testing.T) {
runIndexCleanerTest(t, client, indexPrefix, test.expectedIndices, append(test.envVars, "INDEX_PREFIX="+indexPrefix))
})
}
}

func runIndexCleanerTest(t *testing.T, client *elastic.Client, prefix string, expectedIndices, envVars []string) {
// make sure ES is clean
_, err := client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)

err = createAllIndices(client, prefix)
require.NoError(t, err)
err = runEsCleaner(0, envVars)
require.NoError(t, err)

indices, err := client.IndexNames()
require.NoError(t, err)
if prefix != "" {
prefix = prefix + "-"
}
var expected []string
for _, index := range expectedIndices {
expected = append(expected, prefix + index)
}
assert.ElementsMatch(t, indices, expected, fmt.Sprintf("indices found: %v, expected: %v", indices, expected))
}

func createAllIndices(client *elastic.Client, prefix string) error {
prefixWithSeparator := prefix
if prefix != "" {
prefixWithSeparator = prefixWithSeparator + "-"
}
// create daily indices and archive index
err := createEsIndices(client, []string{
prefixWithSeparator + spanIndexName, prefixWithSeparator + serviceIndexName,
prefixWithSeparator + dependenciesIndexName, prefixWithSeparator + archiveIndexName})
if err != nil {
return err
}
// create rollover archive index and roll alias to the new index
err = runEsRollover("init", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix})
if err != nil {
return err
}
err = runEsRollover("rollover", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar})
if err != nil {
return err
}
// create rollover main indices and roll over to the new index
err = runEsRollover("init", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix})
if err != nil {
return err
}
err = runEsRollover("rollover", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar})
if err != nil {
return err
}
return nil
}

func createEsIndices(client *elastic.Client, indices []string) error {
for _, index := range indices {
if _, err := client.CreateIndex(index).Do(context.Background()); err != nil {
return err
}
}
return nil
}

func runEsCleaner(days int, envs []string) error {
var dockerEnv string
for _, e := range envs {
dockerEnv += fmt.Sprintf(" -e %s", e)
}
args := fmt.Sprintf("sudo docker run %s --net=host %s %d http://%s", dockerEnv, indexCleanerImage, days, queryHostPort)
cmd := exec.Command("/bin/sh", "-c", args)
out, err := cmd.CombinedOutput()
fmt.Println(string(out))
return err
}

func runEsRollover(action string, envs []string) error {
var dockerEnv string
for _, e := range envs {
dockerEnv += fmt.Sprintf(" -e %s", e)
}
args := fmt.Sprintf("sudo docker run %s --rm --net=host %s %s http://%s", dockerEnv, rolloverImage, action, queryHostPort)
cmd := exec.Command("/bin/sh", "-c", args)
out, err := cmd.CombinedOutput()
fmt.Println(string(out))
return err
}

func createESClient() (*elastic.Client, error) {
return elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetSniff(false))
}
8 changes: 5 additions & 3 deletions scripts/travis/es-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
set -e

docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.12
CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" docker.elastic.co/elasticsearch/elasticsearch:5.6.12)
export STORAGE=elasticsearch
make storage-integration-test
CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "xpack.security.enabled=false" -e "xpack.monitoring.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:5.6.12)

STORAGE=elasticsearch make storage-integration-test
make index-cleaner-integration-test

docker kill $CID

0 comments on commit 5cd5752

Please sign in to comment.