Skip to content

Commit

Permalink
fix: handle task failures (#156)
Browse files Browse the repository at this point in the history
* fix: handle task failures

* fix: remove debug logs
  • Loading branch information
lakhansamani authored Nov 30, 2020
1 parent 4b8fd83 commit 1ba2cf1
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 11 deletions.
60 changes: 49 additions & 11 deletions plugins/reindexer/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strconv"
Expand Down Expand Up @@ -52,6 +54,15 @@ func postReIndex(ctx context.Context, sourceIndex, newIndexName string, operatio
return nil
}

func postReIndexFailure(ctx context.Context, newIndexName string) error {
_, err := util.GetClient7().DeleteIndex(newIndexName).Do(ctx)
if err != nil {
log.Errorln(logTag, "error deleting index", err)
return err;
}
return nil
}

// Reindex Inplace: https://www.elastic.co/guide/en/elasticsearch/reference/current/reindex-upgrade-inplace.html
//
// 1. Create a new index and copy the mappings and settings from the old index.
Expand Down Expand Up @@ -523,16 +534,33 @@ func getIndexSize(ctx context.Context, indexName string) (int64, error) {
}

func isTaskCompleted(ctx context.Context, taskID string) (bool, error) {
res := false
isCompleted := false
url := util.GetESURL() + "/_tasks/" + taskID

status, err := util.GetClient7().TasksGetTask().TaskId(taskID).Do(ctx)

response, err := http.Get(url)
if err != nil {
log.Errorln(logTag, " Get task status error", err)
return res, err
return isCompleted, err
}

res = status.Completed
return res, nil
defer response.Body.Close()

body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Errorln(logTag, " error reading json data", err)
return isCompleted, err
}

var data TaskResponseStruct
json.Unmarshal(body, &data)
isCompleted = data.Completed

if (isCompleted && len(data.Response.Failures) > 0) {
log.Errorln(logTag, "error re indexing data", data.Response.Failures[0])
return isCompleted, errors.New(data.Response.Failures[0].Cause.Reason)
}
return isCompleted, nil
}

// go routine to track async re-indexing process for a given source and destination index.
Expand All @@ -542,22 +570,32 @@ func asyncReIndex(taskID, source, destination string, operation ReIndexOperation
isCompleted := make(chan bool, 1)
ticker := time.Tick(30 * time.Second)
ctx := context.Background()
hasError := false;

for {
select {
case <-ticker:
ok, _ := isTaskCompleted(ctx, taskID)
log.Println(logTag, " "+taskID+" task is still re-indexing data...")
ok, err := isTaskCompleted(ctx, taskID)
if err != nil {
hasError = true
}

if ok {
isCompleted <- true
} else {
log.Println(logTag, " "+taskID+" task is still re-indexing data...")
}
case <-isCompleted:
log.Println(logTag, taskID+" task completed successfully")
// remove process from current cache
RemoveCurrentProcess(taskID)
err := postReIndex(ctx, source, destination, operation, replicas)
if err != nil {
log.Errorln(logTag, " post re-indexing error: ", err)
if (!hasError) {
log.Println(logTag, taskID+" task completed successfully")
err := postReIndex(ctx, source, destination, operation, replicas)
if err != nil {
log.Errorln(logTag, " post re-indexing error: ", err)
}
} else {
postReIndexFailure(ctx, destination)
}
return
}
Expand Down
23 changes: 23 additions & 0 deletions plugins/reindexer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,29 @@ type AliasedIndices struct {
PriStoreSize string `json:"pri.store.size"`
}

type TaskResponseFailure struct {
Index string `json:"index"`
Type string `json:"type"`
ID string `json:"id"`
Status int32 `json:"status"`
Cause struct {
Type string `json:"type"`
Reason string `json:"reason"`
CausedBy struct {
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"caused_by"`
} `json:"cause"`
}


type TaskResponseStruct struct {
Completed bool `json:"completed"`
Response struct {
Failures []TaskResponseFailure `json:"failures"`
} `json:"response"`
}

// CurrentlyReIndexingProcess map of taskID [source, destinations] indexes for which indexing process is going on
var CurrentlyReIndexingProcess = make(map[string][]string)

Expand Down

0 comments on commit 1ba2cf1

Please sign in to comment.