diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index 79765e77f4..3f5687fe25 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -356,6 +356,7 @@ func (a *AggMetric) persist(pos int) { chunk: chunk, ttl: a.ttl, timestamp: time.Now(), + len: a.ChunkSpan, } // if we recently became the primary, there may be older chunks @@ -375,6 +376,7 @@ func (a *AggMetric) persist(pos int) { chunk: previousChunk, ttl: a.ttl, timestamp: time.Now(), + len: a.ChunkSpan, }) previousPos-- if previousPos < 0 { diff --git a/mdata/chunk/format.go b/mdata/chunk/format.go index 90bfe928ea..674f9a971a 100644 --- a/mdata/chunk/format.go +++ b/mdata/chunk/format.go @@ -6,8 +6,10 @@ package chunk //go:generate stringer -type=Format type Format uint8 +type Length uint8 // identifier of message format const ( FormatStandardGoTsz Format = iota + FormatWithLen ) diff --git a/mdata/cwr.go b/mdata/cwr.go index ffd6e8d655..9dafb7aa57 100644 --- a/mdata/cwr.go +++ b/mdata/cwr.go @@ -20,4 +20,5 @@ type ChunkWriteRequest struct { chunk *chunk.Chunk ttl uint32 timestamp time.Time + len uint32 } diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index 67365847f0..88effb2656 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -22,6 +22,7 @@ import ( // write aggregated data to cassandra. const Month_sec = 60 * 60 * 24 * 28 +const LengthPrecision = 10 * 60 // 10min const keyspace_schema = `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1} AND durable_writes = true` const table_schema = `CREATE TABLE IF NOT EXISTS %s.metric ( @@ -204,9 +205,10 @@ func (c *cassandraStore) processWriteQueue(queue chan *ChunkWriteRequest, meter data := cwr.chunk.Series.Bytes() chunkSizeAtSave.Value(int64(len(data))) - version := chunk.FormatStandardGoTsz + version := chunk.FormatWithLen buf := new(bytes.Buffer) - binary.Write(buf, binary.LittleEndian, uint8(version)) + binary.Write(buf, binary.LittleEndian, chunk.Format(version)) + binary.Write(buf, binary.LittleEndian, chunk.Length(cwr.len/LengthPrecision)) buf.Write(data) success := false attempts := 0 @@ -359,11 +361,17 @@ func (c *cassandraStore) Search(key string, start, end uint32) ([]iter.Iter, err log.Error(3, errChunkTooSmall.Error()) return iters, errChunkTooSmall } - if chunk.Format(b[0]) != chunk.FormatStandardGoTsz { + switch chunk.Format(b[0]) { + case chunk.FormatStandardGoTsz: + b = b[1:] + case chunk.FormatWithLen: + _ = chunk.Length(b[1]) // this is the chunk length in 10min intervals + b = b[2:] + default: log.Error(3, errUnknownChunkFormat.Error()) return iters, errUnknownChunkFormat } - it, err := tsz.NewIterator(b[1:]) + it, err := tsz.NewIterator(b) if err != nil { log.Error(3, "failed to unpack cassandra payload. %s", err) return iters, err diff --git a/metrictank.go b/metrictank.go index cabb9c45d0..dca87dad62 100644 --- a/metrictank.go +++ b/metrictank.go @@ -250,6 +250,14 @@ func main() { if (mdata.Month_sec % chunkSpan) != 0 { log.Fatal(4, "chunkSpan must fit without remainders into month_sec (28*24*60*60)") } + // 153600 is the max because we use 8bit per chunk to store the length with precision of 10min + // 2 ^ 8 * 10 * 60 = 153600 + if chunkSpan > (2^8)*mdata.LengthPrecision { + log.Fatal(4, "chunkSpan can not be more than 153600 (2^8*10*60)") + } + if (chunkSpan % mdata.LengthPrecision) != 0 { + log.Fatal(4, "chunkSpan must be a multiple of 10min (600)") + } set := strings.Split(*aggSettings, ",") finalSettings := make([]mdata.AggSetting, 0) @@ -269,6 +277,12 @@ func main() { if (mdata.Month_sec % aggChunkSpan) != 0 { log.Fatal(4, "aggChunkSpan must fit without remainders into month_sec (28*24*60*60)") } + if aggChunkSpan > (2^8)*mdata.LengthPrecision { + log.Fatal(4, "aggChunkSpan can not be more than 153600 (2^8*10*60)") + } + if (aggChunkSpan % mdata.LengthPrecision) != 0 { + log.Fatal(4, "aggChunkSpan must be a multiple of 10min (600)") + } highestChunkSpan = util.Max(highestChunkSpan, aggChunkSpan) ready := true if len(fields) == 5 {