Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Better mt index migrate #451

Merged
merged 5 commits into from
Jan 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions cmd/mt-index-migrate-050-to-054/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package main

import (
"flag"
"fmt"
"log"
"os"
"sync"
"time"

"github.com/gocql/gocql"
"github.com/mattbaird/elastigo/lib"
schemaV0 "gopkg.in/raintank/schema.v0"
"gopkg.in/raintank/schema.v1"
)

var (
dryRun = flag.Bool("dry-run", true, "run in dry-run mode. No changes will be made.")
cassAddr = flag.String("cass-addr", "localhost", "Address of cassandra host.")
cassKeyspace = flag.String("keyspace", "raintank", "Cassandra keyspace to use.")
esAddr = flag.String("es-addr", "localhost", "address of elasticsearch host.")
esIndex = flag.String("index", "metric", "elasticsearch index that contains current metric index values.")

wg sync.WaitGroup
)

func main() {
flag.Usage = func() {
fmt.Fprintln(os.Stderr, "mt-index-migrate-050-to054")
fmt.Fprintln(os.Stderr, "This tool converts a metrictank index to the format, using cassandra instead of elasticsearch")
fmt.Fprintln(os.Stderr, "differences:")
fmt.Fprintln(os.Stderr, " * schema 0 to schema 1 - proper metrics2.0 - (for 0.5.1, 576d8fcb47888b8a334e9a125d6aadf8e0e4d4d7)")
fmt.Fprintln(os.Stderr, " * store data in cassandra in the metric_def_ix table, as a messagepack encoded blob (cassandra idx new in 0.5.3 - 26be821bd8bead43db120e96d14d0ee88d6b6880)")
fmt.Fprintln(os.Stderr, " * use lastUpdate field (for 0.5.4, a07007ab8d4a22b122bbc5f9fadb51480e1c5b0c)")
fmt.Fprintf(os.Stderr, "\n\nFlags:\n\n")
flag.PrintDefaults()
}

flag.Parse()

defsChan := make(chan *schema.MetricDefinition, 100)

cluster := gocql.NewCluster(*cassAddr)
cluster.Consistency = gocql.ParseConsistency("one")
cluster.Timeout = time.Second
cluster.NumConns = 10
cluster.ProtoVersion = 4
cluster.Keyspace = *cassKeyspace
session, err := cluster.CreateSession()
if err != nil {
log.Fatalf("failed to create cql session. %s", err)
}
wg.Add(1)
go writeDefs(session, defsChan)

conn := elastigo.NewConn()
conn.SetHosts([]string{*esAddr})
wg.Add(1)
go getDefs(conn, defsChan)

wg.Wait()

}

func writeDefs(session *gocql.Session, defsChan chan *schema.MetricDefinition) {
defer wg.Done()
data := make([]byte, 0)
for def := range defsChan {
data = data[:0]
data, err := def.MarshalMsg(data)
if err != nil {
log.Printf("Failed to marshal metricDef. %s", err)
continue
}
if *dryRun {
fmt.Printf("INSERT INTO metric_def_idx (id, def) VALUES ('%s', '%s')\n", def.Id, data)
continue
}
success := false
attempts := 0
for !success {
if err := session.Query(`INSERT INTO metric_def_idx (id, def) VALUES (?, ?)`, def.Id, data).Exec(); err != nil {
if (attempts % 20) == 0 {
log.Printf("cassandra-idx Failed to write def to cassandra. it will be retried. %s", err)
}
sleepTime := 100 * attempts
if sleepTime > 2000 {
sleepTime = 2000
}
time.Sleep(time.Duration(sleepTime) * time.Millisecond)
attempts++
} else {
success = true
}
}
}
log.Printf("defsWriter exiting.")
}

func getDefs(conn *elastigo.Conn, defsChan chan *schema.MetricDefinition) {
defer wg.Done()
defer close(defsChan)
var err error
var out elastigo.SearchResult
loading := true
scroll_id := ""
for loading {
if scroll_id == "" {
out, err = conn.Search(*esIndex, "metric_index", map[string]interface{}{"scroll": "1m", "size": 1000}, nil)
} else {
out, err = conn.Scroll(map[string]interface{}{"scroll": "1m"}, scroll_id)
}
if err != nil {
log.Fatalf("Failed to load metric definitions from ES. %s", err)
}
for _, h := range out.Hits.Hits {
mdef, err := schemaV0.MetricDefinitionFromJSON(*h.Source)
if err != nil {
log.Printf("Error: Bad definition in index. %v - %s", h.Source, err)
continue
}
newDef := &schema.MetricDefinition{
Id: mdef.Id,
OrgId: mdef.OrgId,
Name: mdef.Name,
Metric: mdef.Metric,
Interval: mdef.Interval,
Unit: mdef.Unit,
Mtype: mdef.TargetType,
Tags: mdef.Tags,
LastUpdate: mdef.LastUpdate,
}
defsChan <- newDef
}

scroll_id = out.ScrollId
if out.Hits.Len() == 0 {
loading = false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"os"
"sync"
"time"

Expand All @@ -14,17 +15,27 @@ import (
)

var (
dryRun = flag.Bool("dry-run", true, "run in dry-run mode. No changes will be made.")
logLevel = flag.Int("log-level", 2, "log level. 0=TRACE|1=DEBUG|2=INFO|3=WARN|4=ERROR|5=CRITICAL|6=FATAL")
cassAddr = flag.String("cass-addr", "localhost", "Address of cassandra host.")
keyspace = flag.String("keyspace", "raintank", "Cassandra keyspace to use.")
partitionBy = flag.String("partition-by", "byOrg", "method used for paritioning metrics. (byOrg|bySeries)")
numPartitions = flag.Int("num-partitions", 1, "number of partitions in cluster")
dryRun = flag.Bool("dry-run", true, "run in dry-run mode. No changes will be made.")
logLevel = flag.Int("log-level", 2, "log level. 0=TRACE|1=DEBUG|2=INFO|3=WARN|4=ERROR|5=CRITICAL|6=FATAL")
cassAddr = flag.String("cass-addr", "localhost", "Address of cassandra host.")
keyspace = flag.String("keyspace", "raintank", "Cassandra keyspace to use.")
partitionScheme = flag.String("partition-scheme", "byOrg", "method used for paritioning metrics. (byOrg|bySeries)")
numPartitions = flag.Int("num-partitions", 1, "number of partitions in cluster")

wg sync.WaitGroup
)

func main() {
flag.Usage = func() {
fmt.Fprintln(os.Stderr, "mt-index-migrate-06-to-07")
fmt.Fprintln(os.Stderr, "This tool converts a metrictank v0.6 index to the v0.7 format")
fmt.Fprintln(os.Stderr, "differences:")
fmt.Fprintln(os.Stderr, " * cassandra table: metric_def_idx -> metric_idx")
fmt.Fprintln(os.Stderr, " * data is stored as individual fields, not as messagepack encoded blob")
fmt.Fprintln(os.Stderr, " * support for partitioning. the partition field in cassandra will be set based on num-partitions and partition-schema")
fmt.Fprintf(os.Stderr, "\n\nFlags:\n\n")
flag.PrintDefaults()
}
flag.Parse()
log.NewLogger(0, "console", fmt.Sprintf(`{"level": %d, "formatting":false}`, *logLevel))

Expand Down Expand Up @@ -117,7 +128,7 @@ func getDefs(session *gocql.Session, defsChan chan *schema.MetricDefinition) {
log.Info("starting read thread")
defer wg.Done()
defer close(defsChan)
partitioner, err := cluster.NewKafkaPartitioner(*partitionBy)
partitioner, err := cluster.NewKafkaPartitioner(*partitionScheme)
if err != nil {
log.Fatal(4, "failed to initialize partitioner. %s", err)
}
Expand Down
36 changes: 36 additions & 0 deletions vendor/gopkg.in/raintank/schema.v0/event.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading