Skip to content

Commit

Permalink
Merge pull request #106 from cockroachdb/spencerkimball/enforce-key-l…
Browse files Browse the repository at this point in the history
…ength

Enforce key length constraint of 4096 bytes.
  • Loading branch information
spencerkimball committed Oct 5, 2014
2 parents 380ba8a + b786ccb commit a949f99
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 17 deletions.
4 changes: 2 additions & 2 deletions server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func TestBootstrapCluster(t *testing.T) {
keys = append(keys, kv.Key)
}
var expectedKeys = []engine.Key{
engine.Key("\x00\x00meta1\xff"),
engine.Key("\x00\x00meta2\xff"),
engine.MakeKey(engine.Key("\x00\x00meta1"), engine.KeyMax),
engine.MakeKey(engine.Key("\x00\x00meta2"), engine.KeyMax),
engine.Key("\x00acct"),
engine.Key("\x00node-idgen"),
engine.Key("\x00perm"),
Expand Down
7 changes: 4 additions & 3 deletions storage/engine/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ import (
// InvalidRangeMetaKeyError indicates that a Range Metadata key is somehow
// invalid.
type InvalidRangeMetaKeyError struct {
Msg string
Key Key
}

// NewInvalidRangeMetaKeyError returns a new InvalidRangeMetaKeyError
func NewInvalidRangeMetaKeyError(k Key) *InvalidRangeMetaKeyError {
return &InvalidRangeMetaKeyError{k}
func NewInvalidRangeMetaKeyError(msg string, k Key) *InvalidRangeMetaKeyError {
return &InvalidRangeMetaKeyError{Msg: msg, Key: k}
}

// Error formats error string.
func (i *InvalidRangeMetaKeyError) Error() string {
return fmt.Sprintf("%q is not valid range metadata key.", string(i.Key))
return fmt.Sprintf("%q is not valid range metadata key: %s", string(i.Key), i.Msg)
}

// Init registers engine error types with Gob.
Expand Down
21 changes: 12 additions & 9 deletions storage/engine/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package engine

import (
"bytes"
"fmt"
"strings"

"code.google.com/p/biogo.store/interval"
"github.com/cockroachdb/cockroach/proto"
Expand Down Expand Up @@ -176,7 +178,7 @@ func RangeMetadataLookupKey(r *proto.RangeDescriptor) Key {

// ValidateRangeMetaKey validates that the given key is a valid Range Metadata
// key. It must have an appropriate metadata range prefix, and the original key
// value must be less thas KeyMax. As a special case, KeyMin is considered a
// value must be less than KeyMax. As a special case, KeyMin is considered a
// valid Range Metadata Key.
func ValidateRangeMetaKey(key Key) error {
// KeyMin is a valid key.
Expand All @@ -185,21 +187,21 @@ func ValidateRangeMetaKey(key Key) error {
}
// Key must be at least as long as KeyMeta1Prefix.
if len(key) < len(KeyMeta1Prefix) {
return NewInvalidRangeMetaKeyError(key)
return NewInvalidRangeMetaKeyError("too short", key)
}

prefix, body := key[:len(KeyMeta1Prefix)], key[len(KeyMeta1Prefix):]

// The prefix must be equal to KeyMeta1Prefix or KeyMeta2Prefix
if !bytes.HasPrefix(key, KeyMetaPrefix) {
return NewInvalidRangeMetaKeyError(key)
return NewInvalidRangeMetaKeyError(fmt.Sprintf("does not have %q prefix", KeyMetaPrefix), key)
}
if lvl := string(prefix[len(KeyMetaPrefix)]); lvl != "1" && lvl != "2" {
return NewInvalidRangeMetaKeyError(key)
return NewInvalidRangeMetaKeyError("meta level is not 1 or 2", key)
}
// Body of the key must sort before KeyMax
if !body.Less(KeyMax) {
return NewInvalidRangeMetaKeyError(key)
return NewInvalidRangeMetaKeyError("body of range lookup is >= KeyMax", key)
}

return nil
Expand All @@ -213,12 +215,13 @@ func init() {

// Constants for system-reserved keys in the KV map.
var (
// KeyMaxLength is the maximum key length.
KeyMaxLength = 4096

// KeyMin is a minimum key value which sorts before all other keys.
KeyMin = Key("")
// KeyMax is a maximum key value which sorts after all other
// keys. Because keys are stored using an ordered encoding (see
// storage/encoding.go), they will never start with \xff.
KeyMax = Key("\xff")
// KeyMax is a maximum key value which sorts after all other keys.
KeyMax = Key(strings.Repeat("\xff", KeyMaxLength))

// KeyLocalPrefix is the prefix for keys which hold data local to a
// RocksDB instance, such as range accounting information
Expand Down
5 changes: 3 additions & 2 deletions storage/engine/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ func TestKeyPrefixEnd(t *testing.T) {
key Key
end Key
}{
{Key{}, Key{0xff}},
{Key{}, KeyMax},
{Key{0}, Key{0x01}},
{Key{0xff}, Key{0xff}},
{Key{0xff, 0xff}, Key{0xff, 0xff}},
{KeyMax, KeyMax},
{Key{0xff, 0xfe}, Key{0xff, 0xff}},
{Key{0x00, 0x00}, Key{0x00, 0x01}},
{Key{0x00, 0xff}, Key{0x01, 0x00}},
Expand Down Expand Up @@ -174,7 +175,7 @@ func TestNextKey(t *testing.T) {
{nil, Key("\x00")},
{Key(""), Key("\x00")},
{Key("test key"), Key("test key\x00")},
{Key(KeyMax), Key("\xff\x00")},
{Key(KeyMax), MakeKey(KeyMax, []byte("\x00"))},
{Key("xoxo\x00"), Key("xoxo\x00\x00")},
}
for i, c := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ func (r *Range) InternalRangeLookup(args *proto.InternalRangeLookupRequest, repl
// key, but no matching results were returned from the scan. This could
// indicate a very bad system error, but for now we will just treat it
// as a retryable Key Mismatch error.
err := proto.NewRangeKeyMismatchError(args.Key, args.Key, r.Meta)
err := proto.NewRangeKeyMismatchError(args.Key, args.EndKey, r.Meta)
reply.SetGoError(err)
log.Errorf("InternalRangeLookup dispatched to correct range, but no matching RangeDescriptor was found. %s", err)
return
Expand Down
12 changes: 12 additions & 0 deletions storage/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,18 @@ func incrementArgs(key []byte, inc int64, rangeID int64) (*proto.IncrementReques
return args, reply
}

func scanArgs(start, end []byte, rangeID int64) (*proto.ScanRequest, *proto.ScanResponse) {
args := &proto.ScanRequest{
RequestHeader: proto.RequestHeader{
Key: start,
EndKey: end,
Replica: proto.Replica{RangeID: rangeID},
},
}
reply := &proto.ScanResponse{}
return args, reply
}

// endTxnArgs returns request/response pair for EndTransaction RPC
// addressed to the default replica for the specified key.
func endTxnArgs(txn *proto.Transaction, commit bool, rangeID int64) (
Expand Down
33 changes: 33 additions & 0 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,36 @@ func makeRangeKey(rangeID int64) engine.Key {
return engine.MakeLocalKey(engine.KeyLocalRangeMetadataPrefix, engine.Key(strconv.FormatInt(rangeID, 10)))
}

// verifyKeyLength verifies key length. Extra key length is allowed for
// keys prefixed with the meta1 or meta2 addressing prefixes.
func verifyKeyLength(key engine.Key) error {
maxLength := engine.KeyMaxLength
if bytes.HasPrefix(key, engine.KeyMeta1Prefix) || bytes.HasPrefix(key, engine.KeyMeta2Prefix) {
maxLength += len(engine.KeyMeta1Prefix)
}
if len(key) > maxLength {
return util.Errorf("maximum key length exceeded for %q: %d > %d", key, len(key), maxLength)
}
return nil
}

// verifyKeys verifies key length for start and end. If end is
// non-empty, it must be >= start.
func verifyKeys(start, end engine.Key) error {
if err := verifyKeyLength(start); err != nil {
return err
}
if len(end) > 0 {
if err := verifyKeyLength(end); err != nil {
return err
}
if end.Less(start) {
return util.Errorf("end key cannot sort before start: %q < %q", end, start)
}
}
return nil
}

// A RangeSlice is a slice of Range pointers used for replica lookups
// by key.
type RangeSlice []*Range
Expand Down Expand Up @@ -412,6 +442,9 @@ func (s *Store) Descriptor(nodeDesc *NodeDescriptor) (*StoreDescriptor, error) {
func (s *Store) ExecuteCmd(method string, args proto.Request, reply proto.Response) error {
// If the request has a zero timestamp, initialize to this node's clock.
header := args.Header()
if err := verifyKeys(header.Key, header.EndKey); err != nil {
return err
}
if header.Timestamp.WallTime == 0 && header.Timestamp.Logical == 0 {
// Update the incoming timestamp.
now := s.clock.Now()
Expand Down
30 changes: 30 additions & 0 deletions storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,36 @@ func TestStoreExecuteCmd(t *testing.T) {

}

// TestStoreVerifyKeys checks that key length is enforced and
// that end keys must sort >= start.
func TestStoreVerifyKeys(t *testing.T) {
store, _ := createTestStore(true, t)
defer store.Close()
tooLongKey := engine.MakeKey(engine.KeyMax, []byte{0})

// Start with a too-long key on a get.
gArgs, gReply := getArgs(tooLongKey, 2)
if err := store.ExecuteCmd(Get, gArgs, gReply); err == nil {
t.Fatal("expected error for key too long")
}
// Try a scan with too-long EndKey.
sArgs, sReply := scanArgs(engine.KeyMin, tooLongKey, 2)
if err := store.ExecuteCmd(Scan, sArgs, sReply); err == nil {
t.Fatal("expected error for end key too long")
}
// Try a scan with end key < start key.
sArgs.Key = []byte("b")
sArgs.EndKey = []byte("a")
if err := store.ExecuteCmd(Scan, sArgs, sReply); err == nil {
t.Fatal("expected error for end key < start")
}
// Finally, try a range lookup with adjusted maximum key length.
pArgs, pReply := putArgs(engine.MakeKey(engine.KeyMeta2Prefix, engine.KeyMax), []byte("value"), 1)
if err := store.ExecuteCmd(Put, pArgs, pReply); err != nil {
t.Fatalf("unexpected error on put to meta2 value: %s", err)
}
}

// TestStoreExecuteCmdUpdateTime verifies that the node clock is updated.
func TestStoreExecuteCmdUpdateTime(t *testing.T) {
store, _ := createTestStore(true, t)
Expand Down

0 comments on commit a949f99

Please sign in to comment.