From b12abb679bc1bed2246aab826ad6e1b738645ca6 Mon Sep 17 00:00:00 2001
From: woodsaj
Date: Thu, 7 Dec 2017 14:46:18 +0800
Subject: [PATCH 1/4] update gocql

---
 vendor/github.com/gocql/gocql/AUTHORS        |   6 +
 vendor/github.com/gocql/gocql/README.md      |   4 +-
 vendor/github.com/gocql/gocql/cluster.go     |   4 +
 vendor/github.com/gocql/gocql/conn.go        | 116 +++++---
 vendor/github.com/gocql/gocql/control.go     | 166 ++++++-----
 vendor/github.com/gocql/gocql/events.go      |  77 ++---
 vendor/github.com/gocql/gocql/frame.go       |  96 +++++--
 vendor/github.com/gocql/gocql/helpers.go     |  33 ++-
 vendor/github.com/gocql/gocql/host_source.go | 265 +++++++++---------
 vendor/github.com/gocql/gocql/marshal.go     |  78 +++---
 vendor/github.com/gocql/gocql/policies.go    |  83 +++---
 .../github.com/gocql/gocql/query_executor.go |  14 +-
 vendor/github.com/gocql/gocql/ring.go        |   6 +-
 vendor/github.com/gocql/gocql/session.go     |  58 ++--
 14 files changed, 574 insertions(+), 432 deletions(-)

diff --git a/vendor/github.com/gocql/gocql/AUTHORS b/vendor/github.com/gocql/gocql/AUTHORS
index 3557f60ca6..31636869b6 100644
--- a/vendor/github.com/gocql/gocql/AUTHORS
+++ b/vendor/github.com/gocql/gocql/AUTHORS
@@ -93,3 +93,9 @@ Yasser Abdolmaleki
 Krishnanand Thommandra
 Blake Atkinson
 Dharmendra Parsaila
+Nayef Ghattas
+Michał Matczuk
+Ben Krebsbach
+Vivian Mathews
+Sascha Steinbiss
+Seth Rosenblum
diff --git a/vendor/github.com/gocql/gocql/README.md b/vendor/github.com/gocql/gocql/README.md
index 2c91575142..bc07eeca1a 100644
--- a/vendor/github.com/gocql/gocql/README.md
+++ b/vendor/github.com/gocql/gocql/README.md
@@ -19,8 +19,8 @@ The following matrix shows the versions of Go and Cassandra that are tested with
 
 Go/Cassandra | 2.1.x | 2.2.x | 3.0.x
 -------------| -------| ------| ---------
-1.6 | yes | yes | yes
-1.7 | yes | yes | yes
+1.8 | yes | yes | yes
+1.9 | yes | yes | yes
 
 Gocql has been tested in production against many different versions of Cassandra. Due to limits in our CI setup we only test against the latest 3 major releases, which coincide with the official support from the Apache project.
diff --git a/vendor/github.com/gocql/gocql/cluster.go b/vendor/github.com/gocql/gocql/cluster.go
index 86406f2bff..f011cefcb6 100644
--- a/vendor/github.com/gocql/gocql/cluster.go
+++ b/vendor/github.com/gocql/gocql/cluster.go
@@ -168,6 +168,10 @@ func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, i
 	return newAddr, newPort
 }
 
+func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
+	return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
+}
+
 var (
 	ErrNoHosts              = errors.New("no hosts provided")
 	ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")
diff --git a/vendor/github.com/gocql/gocql/conn.go b/vendor/github.com/gocql/gocql/conn.go
index 4819d9b34d..74b179d541 100644
--- a/vendor/github.com/gocql/gocql/conn.go
+++ b/vendor/github.com/gocql/gocql/conn.go
@@ -141,8 +141,6 @@ type Conn struct {
 	version         uint8
 	currentKeyspace string
 
-	host *HostInfo
-
 	session *Session
 
 	closed int32
@@ -152,14 +150,12 @@
 }
 
 // Connect establishes a connection to a Cassandra node.
-func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, session *Session) (*Conn, error) { +func (s *Session) dial(ip net.IP, port int, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) { // TODO(zariel): remove these - if host == nil { - panic("host is nil") - } else if len(host.ConnectAddress()) == 0 { - panic(fmt.Sprintf("host missing connect ip address: %v", host)) - } else if host.Port() == 0 { - panic(fmt.Sprintf("host missing port: %v", host)) + if len(ip) == 0 || ip.IsUnspecified() { + panic(fmt.Sprintf("host missing connect ip address: %v", ip)) + } else if port == 0 { + panic(fmt.Sprintf("host missing port: %v", port)) } var ( @@ -172,9 +168,7 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses } // TODO(zariel): handle ipv6 zone - translatedPeer, translatedPort := session.cfg.translateAddressPort(host.ConnectAddress(), host.Port()) - addr := (&net.TCPAddr{IP: translatedPeer, Port: translatedPort}).String() - //addr := (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String() + addr := (&net.TCPAddr{IP: ip, Port: port}).String() if cfg.tlsConfig != nil { // the TLS config is safe to be reused by connections but it must not @@ -200,9 +194,8 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses compressor: cfg.Compressor, auth: cfg.Authenticator, quit: make(chan struct{}), - session: session, + session: s, streams: streams.New(cfg.ProtoVersion), - host: host, } if cfg.Keepalive > 0 { @@ -405,13 +398,20 @@ func (c *Conn) closeWithError(err error) { // if error was nil then unblock the quit channel close(c.quit) - c.conn.Close() + cerr := c.close() if err != nil { c.errorHandler.HandleError(c, err, true) + } else if cerr != nil { + // TODO(zariel): is it a good idea to do this? + c.errorHandler.HandleError(c, cerr, true) } } +func (c *Conn) close() error { + return c.conn.Close() +} + func (c *Conn) Close() { c.closeWithError(nil) } @@ -420,15 +420,9 @@ func (c *Conn) Close() { // to execute any queries. This method runs as long as the connection is // open and is therefore usually called in a separate goroutine. 
func (c *Conn) serve() { - var ( - err error - ) - - for { + var err error + for err == nil { err = c.recv() - if err != nil { - break - } } c.closeWithError(err) @@ -756,6 +750,26 @@ func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer) return flight.preparedStatment, flight.err } +func marshalQueryValue(typ TypeInfo, value interface{}, dst *queryValues) error { + if named, ok := value.(*namedValue); ok { + dst.name = named.name + value = named.value + } + + if _, ok := value.(unsetColumn); !ok { + val, err := Marshal(typ, value) + if err != nil { + return err + } + + dst.value = val + } else { + dst.isUnset = true + } + + return nil +} + func (c *Conn) executeQuery(qry *Query) *Iter { params := queryParams{ consistency: qry.cons, @@ -809,17 +823,12 @@ func (c *Conn) executeQuery(qry *Query) *Iter { params.values = make([]queryValues, len(values)) for i := 0; i < len(values); i++ { - val, err := Marshal(info.request.columns[i].TypeInfo, values[i]) - if err != nil { - return &Iter{err: err} - } - v := ¶ms.values[i] - v.value = val - if _, ok := values[i].(unsetColumn); ok { - v.isUnset = true + value := values[i] + typ := info.request.columns[i].TypeInfo + if err := marshalQueryValue(typ, value, v); err != nil { + return &Iter{err: err} } - // TODO: handle query binding names } params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) @@ -872,8 +881,9 @@ func (c *Conn) executeQuery(qry *Query) *Iter { if len(x.meta.pagingState) > 0 && !qry.disableAutoPage { iter.next = &nextIter{ - qry: *qry, - pos: int((1 - qry.prefetch) * float64(x.numRows)), + qry: *qry, + pos: int((1 - qry.prefetch) * float64(x.numRows)), + conn: c, } iter.next.qry.pageState = copyBytes(x.meta.pagingState) @@ -1009,16 +1019,12 @@ func (c *Conn) executeBatch(batch *Batch) *Iter { b.values = make([]queryValues, info.request.actualColCount) for j := 0; j < info.request.actualColCount; j++ { - val, err := Marshal(info.request.columns[j].TypeInfo, values[j]) - if err != nil { + v := &b.values[j] + value := values[j] + typ := info.request.columns[j].TypeInfo + if err := marshalQueryValue(typ, value, v); err != nil { return &Iter{err: err} } - - b.values[j].value = val - if _, ok := values[j].(unsetColumn); ok { - b.values[j].isUnset = true - } - // TODO: add names } } else { b.statement = entry.Stmt @@ -1089,7 +1095,7 @@ func (c *Conn) query(statement string, values ...interface{}) (iter *Iter) { func (c *Conn) awaitSchemaAgreement() (err error) { const ( - peerSchemas = "SELECT schema_version FROM system.peers" + peerSchemas = "SELECT schema_version, peer FROM system.peers" localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" ) @@ -1102,9 +1108,10 @@ func (c *Conn) awaitSchemaAgreement() (err error) { versions = make(map[string]struct{}) var schemaVersion string - for iter.Scan(&schemaVersion) { + var peer string + for iter.Scan(&schemaVersion, &peer) { if schemaVersion == "" { - Logger.Println("skipping peer entry with empty schema_version") + Logger.Printf("skipping peer entry with empty schema_version: peer=%q", peer) continue } @@ -1147,6 +1154,25 @@ func (c *Conn) awaitSchemaAgreement() (err error) { return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", schemas) } +const localHostInfo = "SELECT * FROM system.local WHERE key='local'" + +func (c *Conn) localHostInfo() (*HostInfo, error) { + row, err := c.query(localHostInfo).rowMap() + if err != nil { + return nil, err + } + + port := c.conn.RemoteAddr().(*net.TCPAddr).Port + + // 
TODO(zariel): avoid doing this here + host, err := c.session.hostInfoFromMap(row, port) + if err != nil { + return nil, err + } + + return c.session.ring.addOrUpdate(host), nil +} + var ( ErrQueryArgLength = errors.New("gocql: query argument length mismatch") ErrTimeoutNoResponse = errors.New("gocql: no response received from cassandra within timeout period") diff --git a/vendor/github.com/gocql/gocql/control.go b/vendor/github.com/gocql/gocql/control.go index 1f7424137d..482782393d 100644 --- a/vendor/github.com/gocql/gocql/control.go +++ b/vendor/github.com/gocql/gocql/control.go @@ -7,6 +7,7 @@ import ( "fmt" "math/rand" "net" + "os" "regexp" "strconv" "sync" @@ -31,13 +32,15 @@ func init() { // Ensure that the atomic variable is aligned to a 64bit boundary // so that atomic operations can be applied on 32bit architectures. type controlConn struct { + started int32 + reconnecting int32 + session *Session conn atomic.Value retry RetryPolicy - started int32 - quit chan struct{} + quit chan struct{} } func createControlConn(session *Session) *controlConn { @@ -47,7 +50,7 @@ func createControlConn(session *Session) *controlConn { retry: &SimpleRetryPolicy{NumRetries: 3}, } - control.conn.Store((*Conn)(nil)) + control.conn.Store((*connHost)(nil)) return control } @@ -58,12 +61,16 @@ func (c *controlConn) heartBeat() { } sleepTime := 1 * time.Second + timer := time.NewTimer(sleepTime) + defer timer.Stop() for { + timer.Reset(sleepTime) + select { case <-c.quit: return - case <-time.After(sleepTime): + case <-timer.C: } resp, err := c.writeFrame(&writeOptionsFrame{}) @@ -86,14 +93,13 @@ func (c *controlConn) heartBeat() { // try to connect a bit faster sleepTime = 1 * time.Second c.reconnect(true) - // time.Sleep(5 * time.Second) continue } } -var hostLookupPreferV4 = false +var hostLookupPreferV4 = os.Getenv("GOCQL_HOST_LOOKUP_PREFER_V4") == "true" -func hostInfo(addr string, defaultPort int) (*HostInfo, error) { +func hostInfo(addr string, defaultPort int) ([]*HostInfo, error) { var port int host, portStr, err := net.SplitHostPort(addr) if err != nil { @@ -106,33 +112,40 @@ func hostInfo(addr string, defaultPort int) (*HostInfo, error) { } } - ip := net.ParseIP(host) - if ip == nil { - ips, err := net.LookupIP(host) - if err != nil { - return nil, err - } else if len(ips) == 0 { - return nil, fmt.Errorf("No IP's returned from DNS lookup for %q", addr) - } + var hosts []*HostInfo - if hostLookupPreferV4 { - for _, v := range ips { - if v4 := v.To4(); v4 != nil { - ip = v4 - break - } - } - if ip == nil { - ip = ips[0] + // Check if host is a literal IP address + if ip := net.ParseIP(host); ip != nil { + hosts = append(hosts, &HostInfo{connectAddress: ip, port: port}) + return hosts, nil + } + + // Look up host in DNS + ips, err := net.LookupIP(host) + if err != nil { + return nil, err + } else if len(ips) == 0 { + return nil, fmt.Errorf("No IP's returned from DNS lookup for %q", addr) + } + + // Filter to v4 addresses if any present + if hostLookupPreferV4 { + var preferredIPs []net.IP + for _, v := range ips { + if v4 := v.To4(); v4 != nil { + preferredIPs = append(preferredIPs, v4) } - } else { - // TODO(zariel): should we check that we can connect to any of the ips? 
- ip = ips[0] } + if len(preferredIPs) != 0 { + ips = preferredIPs + } + } + for _, ip := range ips { + hosts = append(hosts, &HostInfo{connectAddress: ip, port: port}) } - return &HostInfo{connectAddress: ip, port: port}, nil + return hosts, nil } func shuffleHosts(hosts []*HostInfo) []*HostInfo { @@ -197,14 +210,20 @@ func (c *controlConn) discoverProtocol(hosts []*HostInfo) (int, error) { handler := connErrorHandlerFn(func(c *Conn, err error, closed bool) { // we should never get here, but if we do it means we connected to a // host successfully which means our attempted protocol version worked + if !closed { + c.Close() + } }) var err error for _, host := range hosts { var conn *Conn - conn, err = Connect(host, &connCfg, handler, c.session) - if err == nil { + conn, err = c.session.dial(host.ConnectAddress(), host.Port(), &connCfg, handler) + if conn != nil { conn.Close() + } + + if err == nil { return connCfg.ProtoVersion, nil } @@ -239,35 +258,31 @@ func (c *controlConn) connect(hosts []*HostInfo) error { return nil } +type connHost struct { + conn *Conn + host *HostInfo +} + func (c *controlConn) setupConn(conn *Conn) error { if err := c.registerEvents(conn); err != nil { conn.Close() return err } - c.conn.Store(conn) - - if v, ok := conn.conn.RemoteAddr().(*net.TCPAddr); ok { - c.session.handleNodeUp(copyBytes(v.IP), v.Port, false) - return nil - } - - host, portstr, err := net.SplitHostPort(conn.conn.RemoteAddr().String()) + // TODO(zariel): do we need to fetch host info everytime + // the control conn connects? Surely we have it cached? + host, err := conn.localHostInfo() if err != nil { return err } - port, err := strconv.Atoi(portstr) - if err != nil { - return err + ch := &connHost{ + conn: conn, + host: host, } - ip := net.ParseIP(host) - if ip == nil { - return fmt.Errorf("invalid remote addr: addr=%v host=%q", conn.conn.RemoteAddr(), host) - } - - c.session.handleNodeUp(ip, port, false) + c.conn.Store(ch) + c.session.handleNodeUp(host.ConnectAddress(), host.Port(), false) return nil } @@ -308,14 +323,18 @@ func (c *controlConn) registerEvents(conn *Conn) error { } func (c *controlConn) reconnect(refreshring bool) { + if !atomic.CompareAndSwapInt32(&c.reconnecting, 0, 1) { + return + } + defer atomic.StoreInt32(&c.reconnecting, 0) // TODO: simplify this function, use session.ring to get hosts instead of the // connection pool var host *HostInfo - oldConn := c.conn.Load().(*Conn) - if oldConn != nil { - host = oldConn.host - oldConn.Close() + ch := c.getConn() + if ch != nil { + host = ch.host + ch.conn.Close() } var newConn *Conn @@ -364,21 +383,25 @@ func (c *controlConn) HandleError(conn *Conn, err error, closed bool) { return } - oldConn := c.conn.Load().(*Conn) - if oldConn != conn { + oldConn := c.getConn() + if oldConn.conn != conn { return } - c.reconnect(true) + c.reconnect(false) +} + +func (c *controlConn) getConn() *connHost { + return c.conn.Load().(*connHost) } func (c *controlConn) writeFrame(w frameWriter) (frame, error) { - conn := c.conn.Load().(*Conn) - if conn == nil { + ch := c.getConn() + if ch == nil { return nil, errNoControl } - framer, err := conn.exec(context.Background(), w, nil) + framer, err := ch.conn.exec(context.Background(), w, nil) if err != nil { return nil, err } @@ -386,13 +409,13 @@ func (c *controlConn) writeFrame(w frameWriter) (frame, error) { return framer.parseFrame() } -func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter { +func (c *controlConn) withConnHost(fn func(*connHost) *Iter) *Iter { const maxConnectAttempts = 5 
connectAttempts := 0 for i := 0; i < maxConnectAttempts; i++ { - conn := c.conn.Load().(*Conn) - if conn == nil { + ch := c.getConn() + if ch == nil { if connectAttempts > maxConnectAttempts { break } @@ -403,12 +426,18 @@ func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter { continue } - return fn(conn) + return fn(ch) } return &Iter{err: errNoControl} } +func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter { + return c.withConnHost(func(ch *connHost) *Iter { + return fn(ch.conn) + }) +} + // query will return nil if the connection is closed or nil func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) { q := c.session.Query(statement, values...).Consistency(One).RoutingKey([]byte{}).Trace(nil) @@ -437,21 +466,14 @@ func (c *controlConn) awaitSchemaAgreement() error { }).err } -func (c *controlConn) GetHostInfo() *HostInfo { - conn := c.conn.Load().(*Conn) - if conn == nil { - return nil - } - return conn.host -} - func (c *controlConn) close() { if atomic.CompareAndSwapInt32(&c.started, 1, -1) { c.quit <- struct{}{} } - conn := c.conn.Load().(*Conn) - if conn != nil { - conn.Close() + + ch := c.getConn() + if ch != nil { + ch.conn.Close() } } diff --git a/vendor/github.com/gocql/gocql/events.go b/vendor/github.com/gocql/gocql/events.go index 78daa76b41..e6d28a19b7 100644 --- a/vendor/github.com/gocql/gocql/events.go +++ b/vendor/github.com/gocql/gocql/events.go @@ -173,16 +173,30 @@ func (s *Session) handleNodeEvent(frames []frame) { } } +func (s *Session) addNewNode(host *HostInfo) { + if s.cfg.filterHost(host) { + return + } + + host.setState(NodeUp) + s.pool.addHost(host) + s.policy.AddHost(host) +} + func (s *Session) handleNewNode(ip net.IP, port int, waitForBinary bool) { + if gocqlDebug { + Logger.Printf("gocql: Session.handleNewNode: %s:%d\n", ip.String(), port) + } + + ip, port = s.cfg.translateAddressPort(ip, port) + // Get host info and apply any filters to the host - hostInfo, err := s.hostSource.GetHostInfo(ip, port) + hostInfo, err := s.hostSource.getHostInfo(ip, port) if err != nil { Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err) return - } - - // If hostInfo is nil, this host was filtered out by cfg.HostFilter - if hostInfo == nil { + } else if hostInfo == nil { + // If hostInfo is nil, this host was filtered out by cfg.HostFilter return } @@ -191,20 +205,23 @@ func (s *Session) handleNewNode(ip net.IP, port int, waitForBinary bool) { } // should this handle token moving? 
- if existing, ok := s.ring.addHostIfMissing(hostInfo); ok { - existing.update(hostInfo) - hostInfo = existing - } + hostInfo = s.ring.addOrUpdate(hostInfo) + + s.addNewNode(hostInfo) - s.pool.addHost(hostInfo) - s.policy.AddHost(hostInfo) - hostInfo.setState(NodeUp) if s.control != nil && !s.cfg.IgnorePeerAddr { + // TODO(zariel): debounce ring refresh s.hostSource.refreshRing() } } func (s *Session) handleRemovedNode(ip net.IP, port int) { + if gocqlDebug { + Logger.Printf("gocql: Session.handleRemovedNode: %s:%d\n", ip.String(), port) + } + + ip, port = s.cfg.translateAddressPort(ip, port) + // we remove all nodes but only add ones which pass the filter host := s.ring.getHost(ip) if host == nil { @@ -225,34 +242,30 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) { } } -func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) { +func (s *Session) handleNodeUp(eventIp net.IP, eventPort int, waitForBinary bool) { if gocqlDebug { - Logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", ip.String(), port) + Logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort) } - host := s.ring.getHost(ip) - if host != nil { - // If we receive a node up event and user has asked us to ignore the peer address use - // the address provide by the event instead the address provide by peer the table. - if s.cfg.IgnorePeerAddr && !host.ConnectAddress().Equal(ip) { - host.SetConnectAddress(ip) - } - - if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) { - return - } + ip, _ := s.cfg.translateAddressPort(eventIp, eventPort) - if t := host.Version().nodeUpDelay(); t > 0 && waitForBinary { - time.Sleep(t) - } + host := s.ring.getHost(ip) + if host == nil { + // TODO(zariel): avoid the need to translate twice in this + // case + s.handleNewNode(eventIp, eventPort, waitForBinary) + return + } - s.pool.hostUp(host) - s.policy.HostUp(host) - host.setState(NodeUp) + if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) { return } - s.handleNewNode(ip, port, waitForBinary) + if t := host.Version().nodeUpDelay(); t > 0 && waitForBinary { + time.Sleep(t) + } + + s.addNewNode(host) } func (s *Session) handleNodeDown(ip net.IP, port int) { diff --git a/vendor/github.com/gocql/gocql/frame.go b/vendor/github.com/gocql/gocql/frame.go index 98b47eb459..ae94c0f9f6 100644 --- a/vendor/github.com/gocql/gocql/frame.go +++ b/vendor/github.com/gocql/gocql/frame.go @@ -20,6 +20,19 @@ type unsetColumn struct{} var UnsetValue = unsetColumn{} +type namedValue struct { + name string + value interface{} +} + +// NamedValue produce a value which will bind to the named parameter in a query +func NamedValue(name string, value interface{}) interface{} { + return &namedValue{ + name: name, + value: value, + } +} + const ( protoDirectionMask = 0x80 protoVersionMask = 0x7F @@ -183,45 +196,61 @@ func (c Consistency) String() string { } } -func ParseConsistency(s string) Consistency { - switch strings.ToUpper(s) { +func (c Consistency) MarshalText() (text []byte, err error) { + return []byte(c.String()), nil +} + +func (c *Consistency) UnmarshalText(text []byte) error { + switch string(text) { case "ANY": - return Any + *c = Any case "ONE": - return One + *c = One case "TWO": - return Two + *c = Two case "THREE": - return Three + *c = Three case "QUORUM": - return Quorum + *c = Quorum case "ALL": - return All + *c = All case "LOCAL_QUORUM": - return LocalQuorum + *c = LocalQuorum case "EACH_QUORUM": - return EachQuorum + *c = EachQuorum case "LOCAL_ONE": - return LocalOne + *c = 
LocalOne default: - panic("invalid consistency: " + s) + return fmt.Errorf("invalid consistency %q", string(text)) } + + return nil +} + +func ParseConsistency(s string) Consistency { + var c Consistency + if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil { + panic(err) + } + return c } // ParseConsistencyWrapper wraps gocql.ParseConsistency to provide an err // return instead of a panic func ParseConsistencyWrapper(s string) (consistency Consistency, err error) { - defer func() { - if r := recover(); r != nil { - var ok bool - err, ok = r.(error) - if !ok { - err = fmt.Errorf("ParseConsistencyWrapper: %v", r) - } - } - }() - consistency = ParseConsistency(s) - return consistency, nil + err = consistency.UnmarshalText([]byte(strings.ToUpper(s))) + return +} + +// MustParseConsistency is the same as ParseConsistency except it returns +// an error (never). It is kept here since breaking changes are not good. +// DEPRECATED: use ParseConsistency if you want a panic on parse error. +func MustParseConsistency(s string) (Consistency, error) { + c, err := ParseConsistencyWrapper(s) + if err != nil { + panic(err) + } + return c, nil } type SerialConsistency uint16 @@ -242,6 +271,23 @@ func (s SerialConsistency) String() string { } } +func (s SerialConsistency) MarshalText() (text []byte, err error) { + return []byte(s.String()), nil +} + +func (s *SerialConsistency) UnmarshalText(text []byte) error { + switch string(text) { + case "SERIAL": + *s = Serial + case "LOCAL_SERIAL": + *s = LocalSerial + default: + return fmt.Errorf("invalid consistency %q", string(text)) + } + + return nil +} + const ( apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal." ) @@ -830,7 +876,7 @@ func (f *framer) parsePreparedMetadata() preparedMetadata { } if meta.flags&flagHasMorePages == flagHasMorePages { - meta.pagingState = f.readBytes() + meta.pagingState = copyBytes(f.readBytes()) } if meta.flags&flagNoMetaData == flagNoMetaData { @@ -915,7 +961,7 @@ func (f *framer) parseResultMetadata() resultMetadata { meta.actualColCount = meta.colCount if meta.flags&flagHasMorePages == flagHasMorePages { - meta.pagingState = f.readBytes() + meta.pagingState = copyBytes(f.readBytes()) } if meta.flags&flagNoMetaData == flagNoMetaData { diff --git a/vendor/github.com/gocql/gocql/helpers.go b/vendor/github.com/gocql/gocql/helpers.go index cb23dad9d5..944517e289 100644 --- a/vendor/github.com/gocql/gocql/helpers.go +++ b/vendor/github.com/gocql/gocql/helpers.go @@ -205,30 +205,43 @@ func (iter *Iter) RowData() (RowData, error) { return RowData{}, iter.err } - columns := make([]string, 0) - values := make([]interface{}, 0) + columns := make([]string, 0, len(iter.Columns())) + values := make([]interface{}, 0, len(iter.Columns())) for _, column := range iter.Columns() { - - switch c := column.TypeInfo.(type) { - case TupleTypeInfo: + if c, ok := column.TypeInfo.(TupleTypeInfo); !ok { + val := column.TypeInfo.New() + columns = append(columns, column.Name) + values = append(values, val) + } else { for i, elem := range c.Elems { columns = append(columns, TupleColumnName(column.Name, i)) values = append(values, elem.New()) } - default: - val := column.TypeInfo.New() - columns = append(columns, column.Name) - values = append(values, val) } } + rowData := RowData{ Columns: columns, Values: values, } + return rowData, nil } +// TODO(zariel): is it worth exporting this? 
+func (iter *Iter) rowMap() (map[string]interface{}, error) { + if iter.err != nil { + return nil, iter.err + } + + rowData, _ := iter.RowData() + iter.Scan(rowData.Values...) + m := make(map[string]interface{}, len(rowData.Columns)) + rowData.rowMap(m) + return m, nil +} + // SliceMap is a helper function to make the API easier to use // returns the data from the query in the form of []map[string]interface{} func (iter *Iter) SliceMap() ([]map[string]interface{}, error) { @@ -240,7 +253,7 @@ func (iter *Iter) SliceMap() ([]map[string]interface{}, error) { rowData, _ := iter.RowData() dataToReturn := make([]map[string]interface{}, 0) for iter.Scan(rowData.Values...) { - m := make(map[string]interface{}) + m := make(map[string]interface{}, len(rowData.Columns)) rowData.rowMap(m) dataToReturn = append(dataToReturn, m) } diff --git a/vendor/github.com/gocql/gocql/host_source.go b/vendor/github.com/gocql/gocql/host_source.go index 677d80f6ac..988324c2e9 100644 --- a/vendor/github.com/gocql/gocql/host_source.go +++ b/vendor/github.com/gocql/gocql/host_source.go @@ -121,15 +121,9 @@ type HostInfo struct { } func (h *HostInfo) Equal(host *HostInfo) bool { - h.mu.RLock() - defer h.mu.RUnlock() - //If both hosts pointers are same then lock is required only once because of below reasons: - //Reason 1: There is no point taking lock twice on same mutex variable. - //Reason 2: It may lead to deadlock e.g. if WLock is requested by other routine in between 1st & 2nd RLock - //So WLock will be blocked on 1st RLock and 2nd RLock will be blocked on requested WLock. - if h != host { - host.mu.RLock() - defer host.mu.RUnlock() + if h == host { + // prevent rlock reentry + return true } return h.ConnectAddress().Equal(host.ConnectAddress()) @@ -189,6 +183,7 @@ func (h *HostInfo) ConnectAddress() net.IP { } func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo { + // TODO(zariel): should this not be exported? 
h.mu.Lock() defer h.mu.Unlock() h.connectAddress = address @@ -341,13 +336,65 @@ func (h *HostInfo) setPort(port int) *HostInfo { } func (h *HostInfo) update(from *HostInfo) { + if h == from { + return + } + h.mu.Lock() defer h.mu.Unlock() - h.tokens = from.tokens - h.version = from.version - h.hostId = from.hostId - h.dataCenter = from.dataCenter + from.mu.RLock() + defer from.mu.RUnlock() + + // autogenerated do not update + if h.peer == nil { + h.peer = from.peer + } + if h.broadcastAddress == nil { + h.broadcastAddress = from.broadcastAddress + } + if h.listenAddress == nil { + h.listenAddress = from.listenAddress + } + if h.rpcAddress == nil { + h.rpcAddress = from.rpcAddress + } + if h.preferredIP == nil { + h.preferredIP = from.preferredIP + } + if h.connectAddress == nil { + h.connectAddress = from.connectAddress + } + if h.port == 0 { + h.port = from.port + } + if h.dataCenter == "" { + h.dataCenter = from.dataCenter + } + if h.rack == "" { + h.rack = from.rack + } + if h.hostId == "" { + h.hostId = from.hostId + } + if h.workload == "" { + h.workload = from.workload + } + if h.dseVersion == "" { + h.dseVersion = from.dseVersion + } + if h.partitioner == "" { + h.partitioner = from.partitioner + } + if h.clusterName == "" { + h.clusterName = from.clusterName + } + if h.version == (cassVersion{}) { + h.version = from.version + } + if h.tokens == nil { + h.tokens = from.tokens + } } func (h *HostInfo) IsUp() bool { @@ -372,7 +419,6 @@ type ringDescriber struct { session *Session mu sync.Mutex prevHosts []*HostInfo - localHost *HostInfo prevPartitioner string } @@ -394,13 +440,13 @@ func checkSystemSchema(control *controlConn) (bool, error) { // Given a map that represents a row from either system.local or system.peers // return as much information as we can in *HostInfo -func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (*HostInfo, error) { +func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostInfo, error) { const assertErrorMsg = "Assertion failed for %s" var ok bool // Default to our connected port if the cluster doesn't have port information host := HostInfo{ - port: r.session.cfg.Port, + port: port, } for key, value := range row { @@ -492,86 +538,51 @@ func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (*HostInfo, // Not sure what the port field will be called until the JIRA issue is complete } - return &host, nil -} - -// Ask the control node for it's local host information -func (r *ringDescriber) GetLocalHostInfo() (*HostInfo, error) { - it := r.session.control.query("SELECT * FROM system.local WHERE key='local'") - if it == nil { - return nil, errors.New("Attempted to query 'system.local' on a closed control connection") - } - host, err := r.extractHostInfo(it) - if err != nil { - return nil, err - } - - if host.invalidConnectAddr() { - host.SetConnectAddress(r.session.control.GetHostInfo().ConnectAddress()) - } - - return host, nil -} - -// Given an ip address and port, return a peer that matched the ip address -func (r *ringDescriber) GetPeerHostInfo(ip net.IP, port int) (*HostInfo, error) { - it := r.session.control.query("SELECT * FROM system.peers WHERE peer=?", ip) - if it == nil { - return nil, errors.New("Attempted to query 'system.peers' on a closed control connection") - } - return r.extractHostInfo(it) -} - -func (r *ringDescriber) extractHostInfo(it *Iter) (*HostInfo, error) { - row := make(map[string]interface{}) + ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port) + 
host.connectAddress = ip + host.port = port - // expect only 1 row - it.MapScan(row) - if err := it.Close(); err != nil { - return nil, err - } - - // extract all available info about the host - return r.hostInfoFromMap(row) + return &host, nil } // Ask the control node for host info on all it's known peers -func (r *ringDescriber) GetClusterPeerInfo() ([]*HostInfo, error) { +func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) { var hosts []*HostInfo + iter := r.session.control.withConnHost(func(ch *connHost) *Iter { + hosts = append(hosts, ch.host) + return ch.conn.query("SELECT * FROM system.peers") + }) - // Ask the node for a list of it's peers - it := r.session.control.query("SELECT * FROM system.peers") - if it == nil { - return nil, errors.New("Attempted to query 'system.peers' on a closed connection") + if iter == nil { + return nil, errNoControl } - for { - row := make(map[string]interface{}) - if !it.MapScan(row) { - break - } + rows, err := iter.SliceMap() + if err != nil { + // TODO(zariel): make typed error + return nil, fmt.Errorf("unable to fetch peer host info: %s", err) + } + + for _, row := range rows { // extract all available info about the peer - host, err := r.hostInfoFromMap(row) + host, err := r.session.hostInfoFromMap(row, r.session.cfg.Port) if err != nil { return nil, err - } - - // If it's not a valid peer - if !r.IsValidPeer(host) { - Logger.Printf("Found invalid peer '%+v' "+ + } else if !isValidPeer(host) { + // If it's not a valid peer + Logger.Printf("Found invalid peer '%s' "+ "Likely due to a gossip or snitch issue, this host will be ignored", host) continue } + hosts = append(hosts, host) } - if it.err != nil { - return nil, fmt.Errorf("while scanning 'system.peers' table: %s", it.err) - } + return hosts, nil } // Return true if the host is a valid peer -func (r *ringDescriber) IsValidPeer(host *HostInfo) bool { +func isValidPeer(host *HostInfo) bool { return !(len(host.RPCAddress()) == 0 || host.hostId == "" || host.dataCenter == "" || @@ -584,84 +595,58 @@ func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { r.mu.Lock() defer r.mu.Unlock() - // Update the localHost info with data from the connected host - localHost, err := r.GetLocalHostInfo() + hosts, err := r.getClusterPeerInfo() if err != nil { return r.prevHosts, r.prevPartitioner, err - } else if localHost.invalidConnectAddr() { - panic(fmt.Sprintf("unable to get localhost connect address: %v", localHost)) } - // Update our list of hosts by querying the cluster - hosts, err := r.GetClusterPeerInfo() - if err != nil { - return r.prevHosts, r.prevPartitioner, err - } - - hosts = append(hosts, localHost) - - // Filter the hosts if filter is provided - filteredHosts := hosts - if r.session.cfg.HostFilter != nil { - filteredHosts = filteredHosts[:0] - for _, host := range hosts { - if r.session.cfg.HostFilter.Accept(host) { - filteredHosts = append(filteredHosts, host) - } - } + var partitioner string + if len(hosts) > 0 { + partitioner = hosts[0].Partitioner() } - r.prevHosts = filteredHosts - r.prevPartitioner = localHost.partitioner - r.localHost = localHost - - return filteredHosts, localHost.partitioner, nil + return hosts, partitioner, nil } // Given an ip/port return HostInfo for the specified ip/port -func (r *ringDescriber) GetHostInfo(ip net.IP, port int) (*HostInfo, error) { - // TODO(thrawn01): Is IgnorePeerAddr still useful now that we have DisableInitialHostLookup? - // TODO(thrawn01): should we also check for DisableInitialHostLookup and return if true? 
- - // Ignore the port and connect address and use the address/port we already have - if r.session.control == nil || r.session.cfg.IgnorePeerAddr { - return &HostInfo{connectAddress: ip, port: port}, nil - } +func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) { + var host *HostInfo + iter := r.session.control.withConnHost(func(ch *connHost) *Iter { + if ch.host.ConnectAddress().Equal(ip) { + host = ch.host + return nil + } - // Attempt to get the host info for our control connection - controlHost := r.session.control.GetHostInfo() - if controlHost == nil { - return nil, errors.New("invalid control connection") - } + return ch.conn.query("SELECT * FROM system.peers") + }) - var ( - host *HostInfo - err error - ) + if iter != nil { + rows, err := iter.SliceMap() + if err != nil { + return nil, err + } - // If we are asking about the same node our control connection has a connection too - if controlHost.ConnectAddress().Equal(ip) { - host, err = r.GetLocalHostInfo() - } else { - host, err = r.GetPeerHostInfo(ip, port) - } + for _, row := range rows { + h, err := r.session.hostInfoFromMap(row, port) + if err != nil { + return nil, err + } - // No host was found matching this ip/port - if err != nil { - return nil, err - } + if h.ConnectAddress().Equal(ip) { + host = h + break + } + } - if controlHost.ConnectAddress().Equal(ip) { - // Always respect the provided control node address and disregard the ip address - // the cassandra node provides. We do this as we are already connected and have a - // known valid ip address. This insulates gocql from client connection issues stemming - // from node misconfiguration. For instance when a node is run from a container, by - // default the node will report its ip address as 127.0.0.1 which is typically invalid. - host.SetConnectAddress(ip) + if host == nil { + return nil, errors.New("host not found in peers table") + } } - if host.invalidConnectAddr() { - return nil, fmt.Errorf("host ConnectAddress invalid: %v", host) + if host == nil { + return nil, errors.New("unable to fetch host info: invalid control connection") + } else if host.invalidConnectAddr() { + return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", ip, host) } return host, nil @@ -681,6 +666,10 @@ func (r *ringDescriber) refreshRing() error { // TODO: move this to session for _, h := range hosts { + if filter := r.session.cfg.HostFilter; filter != nil && !filter.Accept(h) { + continue + } + if host, ok := r.session.ring.addHostIfMissing(h); !ok { r.session.pool.addHost(h) r.session.policy.AddHost(h) diff --git a/vendor/github.com/gocql/gocql/marshal.go b/vendor/github.com/gocql/gocql/marshal.go index 8d6a55cfc2..0383fed7cb 100644 --- a/vendor/github.com/gocql/gocql/marshal.go +++ b/vendor/github.com/gocql/gocql/marshal.go @@ -1667,6 +1667,16 @@ func marshalTuple(info TypeInfo, value interface{}) ([]byte, error) { return nil, marshalErrorf("cannot marshal %T into %s", value, tuple) } +func readBytes(p []byte) ([]byte, []byte) { + // TODO: really should use a framer + size := readInt(p) + p = p[4:] + if size < 0 { + return nil, p + } + return p[:size], p[size:] +} + // currently only support unmarshal into a list of values, this makes it possible // to support tuples without changing the query API. In the future this can be extend // to allow unmarshalling into custom tuple types. 
@@ -1680,14 +1690,13 @@ func unmarshalTuple(info TypeInfo, data []byte, value interface{}) error { case []interface{}: for i, elem := range tuple.Elems { // each element inside data is a [bytes] - size := readInt(data) - data = data[4:] + var p []byte + p, data = readBytes(data) - err := Unmarshal(elem, data[:size], v[i]) + err := Unmarshal(elem, p, v[i]) if err != nil { return err } - data = data[size:] } return nil @@ -1864,18 +1873,11 @@ func unmarshalUDT(info TypeInfo, data []byte, value interface{}) error { if len(data) == 0 { return nil } - size := readInt(data[:4]) - data = data[4:] - var err error - if size < 0 { - err = v.UnmarshalUDT(e.Name, e.Type, nil) - } else { - err = v.UnmarshalUDT(e.Name, e.Type, data[:size]) - data = data[size:] - } + var p []byte + p, data = readBytes(data) - if err != nil { + if err := v.UnmarshalUDT(e.Name, e.Type, p); err != nil { return err } } @@ -1905,20 +1907,13 @@ func unmarshalUDT(info TypeInfo, data []byte, value interface{}) error { if len(data) == 0 { return nil } - size := readInt(data[:4]) - data = data[4:] val := reflect.New(goType(e.Type)) - var err error - if size < 0 { - err = Unmarshal(e.Type, nil, val.Interface()) - } else { - err = Unmarshal(e.Type, data[:size], val.Interface()) - data = data[size:] - } + var p []byte + p, data = readBytes(data) - if err != nil { + if err := Unmarshal(e.Type, p, val.Interface()); err != nil { return err } @@ -1958,29 +1953,26 @@ func unmarshalUDT(info TypeInfo, data []byte, value interface{}) error { return nil } - size := readInt(data[:4]) - data = data[4:] + var p []byte + p, data = readBytes(data) - if size >= 0 { - f, ok := fields[e.Name] - if !ok { - f = k.FieldByName(e.Name) - if f == emptyValue { - // skip fields which exist in the UDT but not in - // the struct passed in - continue - } + f, ok := fields[e.Name] + if !ok { + f = k.FieldByName(e.Name) + if f == emptyValue { + // skip fields which exist in the UDT but not in + // the struct passed in + continue } + } - if !f.IsValid() || !f.CanAddr() { - return unmarshalErrorf("cannot unmarshal %s into %T: field %v is not valid", info, value, e.Name) - } + if !f.IsValid() || !f.CanAddr() { + return unmarshalErrorf("cannot unmarshal %s into %T: field %v is not valid", info, value, e.Name) + } - fk := f.Addr().Interface() - if err := Unmarshal(e.Type, data[:size], fk); err != nil { - return err - } - data = data[size:] + fk := f.Addr().Interface() + if err := Unmarshal(e.Type, p, fk); err != nil { + return err } } diff --git a/vendor/github.com/gocql/gocql/policies.go b/vendor/github.com/gocql/gocql/policies.go index b53bd45fbe..0001db1181 100644 --- a/vendor/github.com/gocql/gocql/policies.go +++ b/vendor/github.com/gocql/gocql/policies.go @@ -187,6 +187,9 @@ func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration { napDuration := minFloat * math.Pow(2, float64(attempts-1)) // add some jitter napDuration += rand.Float64()*minFloat - (minFloat / 2) + if napDuration > float64(e.Max) { + return time.Duration(e.Max) + } return time.Duration(napDuration) } @@ -294,6 +297,9 @@ type tokenAwareHostPolicy struct { } func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { + t.mu.Lock() + defer t.mu.Unlock() + if t.partitioner != partitioner { t.fallback.SetPartitioner(partitioner) t.partitioner = partitioner @@ -303,6 +309,9 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { } func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) { + t.mu.Lock() + defer t.mu.Unlock() + t.hosts.add(host) 
t.fallback.AddHost(host) @@ -310,6 +319,9 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) { } func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) { + t.mu.Lock() + defer t.mu.Unlock() + t.hosts.remove(host.ConnectAddress()) t.fallback.RemoveHost(host) @@ -325,9 +337,6 @@ func (t *tokenAwareHostPolicy) HostDown(host *HostInfo) { } func (t *tokenAwareHostPolicy) resetTokenRing() { - t.mu.Lock() - defer t.mu.Unlock() - if t.partitioner == "" { // partitioner not yet set return @@ -537,43 +546,36 @@ func (host selectedHostPoolHost) Mark(err error) { } type dcAwareRR struct { - local string - + local string + pos uint32 mu sync.RWMutex - localHosts map[string]*HostInfo - remoteHosts map[string]*HostInfo + localHosts cowHostList + remoteHosts cowHostList } -// DCAwareRoundRobinPolicy is a host selection policies which will priorities and +// DCAwareRoundRobinPolicy is a host selection policies which will prioritize and // return hosts which are in the local datacentre before returning hosts in all // other datercentres func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy { return &dcAwareRR{ - local: localDC, - localHosts: make(map[string]*HostInfo), - remoteHosts: make(map[string]*HostInfo), + local: localDC, } } func (d *dcAwareRR) AddHost(host *HostInfo) { - d.mu.Lock() - if host.DataCenter() == d.local { - d.localHosts[host.HostID()] = host + d.localHosts.add(host) } else { - d.remoteHosts[host.HostID()] = host + d.remoteHosts.add(host) } - - d.mu.Unlock() } func (d *dcAwareRR) RemoveHost(host *HostInfo) { - d.mu.Lock() - - delete(d.localHosts, host.HostID()) - delete(d.remoteHosts, host.HostID()) - - d.mu.Unlock() + if host.DataCenter() == d.local { + d.localHosts.remove(host.ConnectAddress()) + } else { + d.remoteHosts.remove(host.ConnectAddress()) + } } func (d *dcAwareRR) HostUp(host *HostInfo) { @@ -587,29 +589,28 @@ func (d *dcAwareRR) HostDown(host *HostInfo) { func (d *dcAwareRR) SetPartitioner(p string) {} func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost { - d.mu.RLock() - - // TODO: this is O(len(hosts)) and requires calculating a full query plan for - // every query. On the other hand it is stupidly simply and provides random host - // order prefering local dcs over remote ones. 
- hosts := make([]*HostInfo, 0, len(d.localHosts)+len(d.remoteHosts)) - for _, host := range d.localHosts { - hosts = append(hosts, host) - } - for _, host := range d.remoteHosts { - hosts = append(hosts, host) - } - - d.mu.RUnlock() - + var i int return func() SelectedHost { + var hosts []*HostInfo + localHosts := d.localHosts.get() + remoteHosts := d.remoteHosts.get() + if len(localHosts) != 0 { + hosts = localHosts + } else { + hosts = remoteHosts + } if len(hosts) == 0 { return nil } - host := hosts[0] - hosts = hosts[1:] - + // always increment pos to evenly distribute traffic in case of + // failures + pos := atomic.AddUint32(&d.pos, 1) - 1 + if i >= len(localHosts)+len(remoteHosts) { + return nil + } + host := hosts[(pos)%uint32(len(hosts))] + i++ return (*selectedHost)(host) } } diff --git a/vendor/github.com/gocql/gocql/query_executor.go b/vendor/github.com/gocql/gocql/query_executor.go index 019beff280..4f98730161 100644 --- a/vendor/github.com/gocql/gocql/query_executor.go +++ b/vendor/github.com/gocql/gocql/query_executor.go @@ -17,6 +17,15 @@ type queryExecutor struct { policy HostSelectionPolicy } +func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter { + start := time.Now() + iter := qry.execute(conn) + + qry.attempt(time.Since(start)) + + return iter +} + func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) { rt := qry.retryPolicy() hostIter := q.policy.Pick(qry) @@ -38,10 +47,7 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) { continue } - start := time.Now() - iter = qry.execute(conn) - - qry.attempt(time.Since(start)) + iter = q.attemptQuery(qry, conn) // Update host hostResponse.Mark(iter.err) diff --git a/vendor/github.com/gocql/gocql/ring.go b/vendor/github.com/gocql/gocql/ring.go index d5a8ad0052..856afae376 100644 --- a/vendor/github.com/gocql/gocql/ring.go +++ b/vendor/github.com/gocql/gocql/ring.go @@ -64,6 +64,8 @@ func (r *ring) currentHosts() map[string]*HostInfo { } func (r *ring) addHost(host *HostInfo) bool { + // TODO(zariel): key all host info by HostID instead of + // ip addresses if host.invalidConnectAddr() { panic(fmt.Sprintf("invalid host: %v", host)) } @@ -140,8 +142,8 @@ type clusterMetadata struct { } func (c *clusterMetadata) setPartitioner(partitioner string) { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() if c.partitioner != partitioner { // TODO: update other things now diff --git a/vendor/github.com/gocql/gocql/session.go b/vendor/github.com/gocql/gocql/session.go index 74a0a2e746..7fafff233f 100644 --- a/vendor/github.com/gocql/gocql/session.go +++ b/vendor/github.com/gocql/gocql/session.go @@ -78,7 +78,7 @@ var queryPool = &sync.Pool{ func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) { var hosts []*HostInfo for _, hostport := range addrs { - host, err := hostInfo(hostport, defaultPort) + resolvedHosts, err := hostInfo(hostport, defaultPort) if err != nil { // Try other hosts if unable to resolve DNS name if _, ok := err.(*net.DNSError); ok { @@ -88,7 +88,7 @@ func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) { return nil, err } - hosts = append(hosts, host) + hosts = append(hosts, resolvedHosts...) 
} if len(hosts) == 0 { return nil, errors.New("failed to resolve any of the provided hostnames") @@ -160,6 +160,7 @@ func (s *Session) init() error { if err != nil { return err } + s.ring.endpoints = hosts if !s.cfg.disableControlConn { s.control = createControlConn(s) @@ -182,17 +183,29 @@ func (s *Session) init() error { if !s.cfg.DisableInitialHostLookup { var partitioner string - hosts, partitioner, err = s.hostSource.GetHosts() + newHosts, partitioner, err := s.hostSource.GetHosts() if err != nil { return err } s.policy.SetPartitioner(partitioner) + filteredHosts := make([]*HostInfo, 0, len(newHosts)) + for _, host := range newHosts { + if !s.cfg.filterHost(host) { + filteredHosts = append(filteredHosts, host) + } + } + hosts = filteredHosts } } + hostMap := make(map[string]*HostInfo, len(hosts)) for _, host := range hosts { + hostMap[host.ConnectAddress().String()] = host + } + + for _, host := range hostMap { host = s.ring.addOrUpdate(host) - s.handleNodeUp(host.ConnectAddress(), host.Port(), false) + s.addNewNode(host) } // TODO(zariel): we probably dont need this any more as we verify that we @@ -210,7 +223,8 @@ func (s *Session) init() error { newer, _ := checkSystemSchema(s.control) s.useSystemSchema = newer } else { - s.useSystemSchema = hosts[0].Version().Major >= 3 + host := s.ring.rrHost() + s.useSystemSchema = host.Version().Major >= 3 } if s.pool.Size() == 0 { @@ -639,7 +653,7 @@ func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{}) } func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) { - return Connect(host, s.connCfg, errorHandler, s) + return s.dial(host.ConnectAddress(), host.Port(), s.connCfg, errorHandler) } // Query represents a CQL statement that can be executed. @@ -1052,8 +1066,21 @@ func (iter *Iter) Columns() []ColumnInfo { } type Scanner interface { + // Next advances the row pointer to point at the next row, the row is valid until + // the next call of Next. It returns true if there is a row which is available to be + // scanned into with Scan. + // Next must be called before every call to Scan. Next() bool + + // Scan copies the current row's columns into dest. If the length of dest does not equal + // the number of columns returned in the row an error is returned. If an error is encountered + // when unmarshalling a column into the value in dest an error is returned and the row is invalidated + // until the next call to Next. + // Next must be called before calling Scan, if it is not an error is returned. Scan(...interface{}) error + + // Err returns the if there was one during iteration that resulted in iteration being unable to complete. + // Err will also release resources held by the iterator, the Scanner should not used after being called. Err() error } @@ -1062,10 +1089,6 @@ type iterScanner struct { cols [][]byte } -// Next advances the row pointer to point at the next row, the row is valid until -// the next call of Next. It returns true if there is a row which is available to be -// scanned into with Scan. -// Next must be called before every call to Scan. func (is *iterScanner) Next() bool { iter := is.iter if iter.err != nil { @@ -1119,11 +1142,6 @@ func scanColumn(p []byte, col ColumnInfo, dest []interface{}) (int, error) { } } -// Scan copies the current row's columns into dest. If the length of dest does not equal -// the number of columns returned in the row an error is returned. 
If an error is encountered
-// when unmarshalling a column into the value in dest an error is returned and the row is invalidated
-// until the next call to Next.
-// Next must be called before calling Scan, if it is not an error is returned.
 func (is *iterScanner) Scan(dest ...interface{}) error {
 	if is.cols == nil {
 		return errors.New("gocql: Scan called without calling Next")
@@ -1154,8 +1172,6 @@ func (is *iterScanner) Scan(dest ...interface{}) error {
 	return err
 }
 
-// Err returns the if there was one during iteration that resulted in iteration being unable to complete.
-// Err will also release resources held by the iterator and should not used after being called.
 func (is *iterScanner) Err() error {
 	iter := is.iter
 	is.iter = nil
@@ -1299,11 +1315,17 @@ type nextIter struct {
 	pos  int
 	once sync.Once
 	next *Iter
+	conn *Conn
 }
 
 func (n *nextIter) fetch() *Iter {
 	n.once.Do(func() {
-		n.next = n.qry.session.executeQuery(&n.qry)
+		iter := n.qry.session.executor.attemptQuery(&n.qry, n.conn)
+		if iter != nil && iter.err == nil {
+			n.next = iter
+		} else {
+			n.next = n.qry.session.executeQuery(&n.qry)
+		}
 	})
 	return n.next
 }

From d59b58cdf17a05c3d0ddb04b7a9c7a4b370511be Mon Sep 17 00:00:00 2001
From: woodsaj
Date: Thu, 7 Dec 2017 14:48:25 +0800
Subject: [PATCH 2/4] use a context.WithTimeout when writing data to cassandra

---
 mdata/store_cassandra.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go
index 9c15773182..e009f9b3ff 100644
--- a/mdata/store_cassandra.go
+++ b/mdata/store_cassandra.go
@@ -101,6 +101,7 @@ type CassandraStore struct {
 	ttlTables       TTLTables
 	omitReadTimeout time.Duration
 	tracer          opentracing.Tracer
+	timeout         time.Duration
 }
 
 func ttlUnits(ttl uint32) float64 {
@@ -290,6 +291,7 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password,
 		omitReadTimeout: time.Duration(omitReadTimeout) * time.Second,
 		ttlTables:       ttlTables,
 		tracer:          opentracing.NoopTracer{},
+		timeout:         cluster.Timeout,
 	}
 
 	for i := 0; i < writers; i++ {
@@ -398,7 +400,9 @@ func (c *CassandraStore) insertChunk(key string, t0, ttl uint32, data []byte) er
 	query := fmt.Sprintf("INSERT INTO %s (key, ts, data) values(?,?,?) USING TTL %d", table, ttl)
 	row_key := fmt.Sprintf("%s_%d", key, t0/Month_sec) // "month number" based on unix timestamp (rounded down)
 	pre := time.Now()
-	ret := c.Session.Query(query, row_key, t0, data).Exec()
+	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+	ret := c.Session.Query(query, row_key, t0, data).WithContext(ctx).Exec()
+	cancel()
 	cassPutExecDuration.Value(time.Now().Sub(pre))
 	return ret
 }

From 572140a65902ca5072cf9c961ec52936a2509475 Mon Sep 17 00:00:00 2001
From: woodsaj
Date: Thu, 7 Dec 2017 14:49:15 +0800
Subject: [PATCH 3/4] pass the request context to the gocql query. Also return an error when a query fails rather than just silently ignoring it.
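
The read path now threads the caller's context through to gocql, and the
error from closing the iterator is inspected instead of dropped. A minimal
sketch of the resulting control flow (the req variable and the 5s timeout
are illustrative only, not part of this patch):

	ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second)
	defer cancel()

	iter := session.Query(q, params...).WithContext(ctx).Iter()
	// ... scan rows from iter ...
	if err := iter.Close(); err != nil {
		if err == context.Canceled || err == context.DeadlineExceeded {
			return nil, nil // the caller aborted; not a storage error
		}
		return nil, err // real query failure, surfaced instead of ignored
	}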
--- mdata/store_cassandra.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index e009f9b3ff..45ff23a9e2 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -441,7 +441,7 @@ func (c *CassandraStore) processReadQueue() { iter := outcome{ month: crr.month, sortKey: crr.sortKey, - i: c.Session.Query(crr.q, crr.p...).Iter(), + i: c.Session.Query(crr.q, crr.p...).WithContext(crr.ctx).Iter(), err: nil, } cassGetExecDuration.Value(time.Since(pre)) @@ -588,9 +588,14 @@ LOOP: } err := outcome.i.Close() if err != nil { + if err == context.Canceled || err == context.DeadlineExceeded { + // query was aborted. + return nil, nil + } tracing.Failure(span) tracing.Error(span, err) errmetrics.Inc(err) + return nil, err } else { cassChunksPerRow.Value(int(chunks)) } From be9f75beedbb399a00ad71ee0cb7b3675781faa8 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Mon, 1 Jan 2018 08:47:13 +0800 Subject: [PATCH 4/4] update gocql --- .../github.com/gocql/gocql/host_source_gen.go | 45 +++++++++++++++++++ .../gocql/internal/murmur/murmur_unsafe.go | 15 +++++++ 2 files changed, 60 insertions(+) create mode 100644 vendor/github.com/gocql/gocql/host_source_gen.go create mode 100644 vendor/github.com/gocql/gocql/internal/murmur/murmur_unsafe.go diff --git a/vendor/github.com/gocql/gocql/host_source_gen.go b/vendor/github.com/gocql/gocql/host_source_gen.go new file mode 100644 index 0000000000..c82193cbd4 --- /dev/null +++ b/vendor/github.com/gocql/gocql/host_source_gen.go @@ -0,0 +1,45 @@ +// +build genhostinfo + +package main + +import ( + "fmt" + "reflect" + "sync" + + "github.com/gocql/gocql" +) + +func gen(clause, field string) { + fmt.Printf("if h.%s == %s {\n", field, clause) + fmt.Printf("\th.%s = from.%s\n", field, field) + fmt.Println("}") +} + +func main() { + t := reflect.ValueOf(&gocql.HostInfo{}).Elem().Type() + mu := reflect.TypeOf(sync.RWMutex{}) + + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + if f.Type == mu { + continue + } + + switch f.Type.Kind() { + case reflect.Slice: + gen("nil", f.Name) + case reflect.String: + gen(`""`, f.Name) + case reflect.Int: + gen("0", f.Name) + case reflect.Struct: + gen("("+f.Type.Name()+"{})", f.Name) + case reflect.Bool, reflect.Int32: + continue + default: + panic(fmt.Sprintf("unknown field: %s", f)) + } + } + +} diff --git a/vendor/github.com/gocql/gocql/internal/murmur/murmur_unsafe.go b/vendor/github.com/gocql/gocql/internal/murmur/murmur_unsafe.go new file mode 100644 index 0000000000..501537c77e --- /dev/null +++ b/vendor/github.com/gocql/gocql/internal/murmur/murmur_unsafe.go @@ -0,0 +1,15 @@ +// +build !appengine + +package murmur + +import ( + "unsafe" +) + +func getBlock(data []byte, n int) (int64, int64) { + block := (*[2]int64)(unsafe.Pointer(&data[n*16])) + + k1 := block[0] + k2 := block[1] + return k1, k2 +}
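
For context on the final patch: the unsafe getBlock above reinterprets each
16-byte block of input as two native-endian int64 values, which is why it is
excluded on appengine. A portable equivalent (a sketch for illustration only;
upstream gocql ships its own build-tagged safe variant, and this is not that
code verbatim) would read each block explicitly as little-endian:

	// +build appengine

	package murmur

	import "encoding/binary"

	// getBlock reads the n-th 16-byte block of data as two
	// little-endian int64 values, using bounds-checked slicing
	// instead of an unsafe pointer cast.
	func getBlock(data []byte, n int) (int64, int64) {
		k1 := int64(binary.LittleEndian.Uint64(data[n*16:]))
		k2 := int64(binary.LittleEndian.Uint64(data[n*16+8:]))
		return k1, k2
	}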