Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Introduces a second chunk format to include length
Browse files Browse the repository at this point in the history
By default all new chunks will be written in the new format. This means
the second byte of the binary data is a uint8 that specifies how many
10min intervals this chunk is long.

We only specify with a precision of 10min to require less space to
define the length.

If a configured chunk span (or aggmetric chunk span) is not dividable
by 10min we error at startup, same if it exceeds the maximum of
2^8*10min.
  • Loading branch information
replay committed Dec 12, 2016
1 parent 2ea5313 commit e9d48f5
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 4 deletions.
2 changes: 2 additions & 0 deletions mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func (a *AggMetric) persist(pos int) {
chunk: chunk,
ttl: a.ttl,
timestamp: time.Now(),
len: a.ChunkSpan,
}

// if we recently became the primary, there may be older chunks
Expand All @@ -375,6 +376,7 @@ func (a *AggMetric) persist(pos int) {
chunk: previousChunk,
ttl: a.ttl,
timestamp: time.Now(),
len: a.ChunkSpan,
})
previousPos--
if previousPos < 0 {
Expand Down
2 changes: 2 additions & 0 deletions mdata/chunk/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package chunk
//go:generate stringer -type=Format

type Format uint8
type Length uint8

// identifier of message format
const (
FormatStandardGoTsz Format = iota
FormatWithLen
)
1 change: 1 addition & 0 deletions mdata/cwr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type ChunkWriteRequest struct {
chunk *chunk.Chunk
ttl uint32
timestamp time.Time
len uint32
}
16 changes: 12 additions & 4 deletions mdata/store_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// write aggregated data to cassandra.

const Month_sec = 60 * 60 * 24 * 28
const LengthPrecision = 10 * 60 // 10min

const keyspace_schema = `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1} AND durable_writes = true`
const table_schema = `CREATE TABLE IF NOT EXISTS %s.metric (
Expand Down Expand Up @@ -204,9 +205,10 @@ func (c *cassandraStore) processWriteQueue(queue chan *ChunkWriteRequest, meter

data := cwr.chunk.Series.Bytes()
chunkSizeAtSave.Value(int64(len(data)))
version := chunk.FormatStandardGoTsz
version := chunk.FormatWithLen
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, uint8(version))
binary.Write(buf, binary.LittleEndian, chunk.Format(version))
binary.Write(buf, binary.LittleEndian, chunk.Length(cwr.len/LengthPrecision))
buf.Write(data)
success := false
attempts := 0
Expand Down Expand Up @@ -359,11 +361,17 @@ func (c *cassandraStore) Search(key string, start, end uint32) ([]iter.Iter, err
log.Error(3, errChunkTooSmall.Error())
return iters, errChunkTooSmall
}
if chunk.Format(b[0]) != chunk.FormatStandardGoTsz {
switch chunk.Format(b[0]) {
case chunk.FormatStandardGoTsz:
b = b[1:]
case chunk.FormatWithLen:
_ = chunk.Length(b[1]) // this is the chunk length in 10min intervals
b = b[2:]
default:
log.Error(3, errUnknownChunkFormat.Error())
return iters, errUnknownChunkFormat
}
it, err := tsz.NewIterator(b[1:])
it, err := tsz.NewIterator(b)
if err != nil {
log.Error(3, "failed to unpack cassandra payload. %s", err)
return iters, err
Expand Down
14 changes: 14 additions & 0 deletions metrictank.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ func main() {
if (mdata.Month_sec % chunkSpan) != 0 {
log.Fatal(4, "chunkSpan must fit without remainders into month_sec (28*24*60*60)")
}
// 153600 is the max because we use 8bit per chunk to store the length with precision of 10min
// 2 ^ 8 * 10 * 60 = 153600
if chunkSpan > (2^8)*mdata.LengthPrecision {
log.Fatal(4, "chunkSpan can not be more than 153600 (2^8*10*60)")
}
if (chunkSpan % mdata.LengthPrecision) != 0 {
log.Fatal(4, "chunkSpan must be a multiple of 10min (600)")
}

set := strings.Split(*aggSettings, ",")
finalSettings := make([]mdata.AggSetting, 0)
Expand All @@ -269,6 +277,12 @@ func main() {
if (mdata.Month_sec % aggChunkSpan) != 0 {
log.Fatal(4, "aggChunkSpan must fit without remainders into month_sec (28*24*60*60)")
}
if aggChunkSpan > (2^8)*mdata.LengthPrecision {
log.Fatal(4, "aggChunkSpan can not be more than 153600 (2^8*10*60)")
}
if (aggChunkSpan % mdata.LengthPrecision) != 0 {
log.Fatal(4, "aggChunkSpan must be a multiple of 10min (600)")
}
highestChunkSpan = util.Max(highestChunkSpan, aggChunkSpan)
ready := true
if len(fields) == 5 {
Expand Down

0 comments on commit e9d48f5

Please sign in to comment.