This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

fix deadlock in processWriteQueue #778

Merged · 4 commits · Jan 2, 2018
13 changes: 11 additions & 2 deletions mdata/store_cassandra.go
@@ -101,6 +101,7 @@ type CassandraStore struct {
ttlTables TTLTables
omitReadTimeout time.Duration
tracer opentracing.Tracer
timeout time.Duration
}

func ttlUnits(ttl uint32) float64 {
@@ -290,6 +291,7 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password,
omitReadTimeout: time.Duration(omitReadTimeout) * time.Second,
ttlTables: ttlTables,
tracer: opentracing.NoopTracer{},
timeout: cluster.Timeout,

Contributor

Note that we default to 1000 ms, which seems too aggressive. Looking at some customer dashboards, some customers are seeing multi-second query durations (cassGetExecDuration).

Member Author

  1. this timeout is only used for writes. Reads just use the existing requestContext, which we don't currently set a timeout on; instead we just rely on the context being canceled, e.g. when the client disconnects.

  2. queries will fail if they run longer than cluster.Timeout.

var timeoutCh <-chan time.Time
if c.timeout > 0 {
	if call.timer == nil {
		call.timer = time.NewTimer(0)
		<-call.timer.C
	} else {
		if !call.timer.Stop() {
			select {
			case <-call.timer.C:
			default:
			}
		}
	}
	call.timer.Reset(c.timeout)
	timeoutCh = call.timer.C
}

var ctxDone <-chan struct{}
if ctx != nil {
	ctxDone = ctx.Done()
}

select {
case err := <-call.resp:
	close(call.timeout)
	if err != nil {
		if !c.Closed() {
			// if the connection is closed then we cant release the stream,
			// this is because the request is still outstanding and we have
			// been handed another error from another stream which caused the
			// connection to close.
			c.releaseStream(stream)
		}
		return nil, err
	}
case <-timeoutCh:
	close(call.timeout)
	c.handleTimeout()
	return nil, ErrTimeoutNoResponse
case <-ctxDone:
	close(call.timeout)
	return nil, ctx.Err()
case <-c.quit:
	return nil, ErrConnectionClosed
}

So if we have queries that are running longer than this, it will be because they are blocked waiting for a stream to become available.

Contributor

  1. ah yes..
  2. in insertChunk we call context.WithTimeout(context.Background(), c.timeout)
    doesn't that mean that in the select either of the 2 operations (between lines 648 and 655 in the code shown above) can succeed at practically the same time?

one comes from the timeout set from c.timeout, which is the same timeout value we specified in the config, whereas ctxDone fires when the context's deadline triggers, which is a deadline we also set based on that same timeout?

IOW I don't see the use for the context on the insert queries, since it seems to achieve the same thing as the already-set-up timeout mechanism. Am I missing something?

Member Author

c.timeout is only measured from after the request is sent to cassandra.
The context's deadline is set when the request is constructed, so it includes the time the request sits waiting to be sent.
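
To make that distinction concrete, here is a minimal, self-contained sketch (the function names and durations are invented for illustration; this is not code from this repo or from gocql): a deadline created when the request is constructed counts time spent queued, while a timer that only starts once the request is sent does not.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errSendTimer = errors.New("timed out after send")

// withDeadline models the insertChunk approach: the deadline is created when
// the request is constructed, so time spent waiting in the queue counts
// against the budget.
func withDeadline(queueWait, execTime, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	time.Sleep(queueWait) // waiting for a stream / sitting in the write queue

	select {
	case <-time.After(execTime): // the query itself
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// withSendTimer models a driver-level timeout like c.timeout: the timer only
// starts after the request has been sent, so queue wait is not counted.
func withSendTimer(queueWait, execTime, timeout time.Duration) error {
	time.Sleep(queueWait) // not covered by the timer

	select {
	case <-time.After(execTime):
		return nil
	case <-time.After(timeout):
		return errSendTimer
	}
}

func main() {
	// 80ms queue wait + 50ms execution against a 100ms budget:
	// only the context-deadline variant fails.
	fmt.Println(withDeadline(80*time.Millisecond, 50*time.Millisecond, 100*time.Millisecond))  // context deadline exceeded
	fmt.Println(withSendTimer(80*time.Millisecond, 50*time.Millisecond, 100*time.Millisecond)) // <nil>
}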

Contributor
@Dieterbe Dieterbe Dec 27, 2017

let's rename this field to writeTimeout to avoid confusion, and a note explaining what you said in your above comment would also help a lot to anyone reading the code.

Member Author
@woodsaj woodsaj Jan 1, 2018

> let's rename this field to writeTimeout to avoid confusion

No. CassandraStore.timeout is assigned the same value as the gocql cluster.Timeout, which is for reads and writes.

Variables should be named based on the information they contain, not on how they are used.

}

for i := 0; i < writers; i++ {
@@ -398,7 +400,9 @@ func (c *CassandraStore) insertChunk(key string, t0, ttl uint32, data []byte) er
query := fmt.Sprintf("INSERT INTO %s (key, ts, data) values(?,?,?) USING TTL %d", table, ttl)
row_key := fmt.Sprintf("%s_%d", key, t0/Month_sec) // "month number" based on unix timestamp (rounded down)
pre := time.Now()
ret := c.Session.Query(query, row_key, t0, data).Exec()
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
ret := c.Session.Query(query, row_key, t0, data).WithContext(ctx).Exec()
cancel()
cassPutExecDuration.Value(time.Now().Sub(pre))
return ret
}
@@ -437,7 +441,7 @@ func (c *CassandraStore) processReadQueue() {
iter := outcome{
month: crr.month,
sortKey: crr.sortKey,
i: c.Session.Query(crr.q, crr.p...).Iter(),
i: c.Session.Query(crr.q, crr.p...).WithContext(crr.ctx).Iter(),
err: nil,
}
cassGetExecDuration.Value(time.Since(pre))
@@ -584,9 +588,14 @@ LOOP:
}
err := outcome.i.Close()
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
// query was aborted.
return nil, nil
}
tracing.Failure(span)
tracing.Error(span, err)
errmetrics.Inc(err)

Contributor

this line triggers this code:

func (m *ErrMetrics) Inc(err error) {
	if err == gocql.ErrTimeoutNoResponse {
		m.cassErrTimeout.Inc()
	} else if err == gocql.ErrTooManyTimeouts {
		m.cassErrTooManyTimeouts.Inc()
	} else if err == gocql.ErrConnectionClosed {
		m.cassErrConnClosed.Inc()
	} else if err == gocql.ErrNoConnections {
		m.cassErrNoConns.Inc()
	} else if err == gocql.ErrUnavailable {
		m.cassErrUnavailable.Inc()
	} else if strings.HasPrefix(err.Error(), "Cannot achieve consistency level") {
		m.cassErrCannotAchieveConsistency.Inc()
	} else {
		m.cassErrOther.Inc()
	}
}

seems weird that various gocql timeouts result in err metrics being incremented, whereas context.DeadlineExceeded does not. I think we should also increment a timeout metric when context.DeadlineExceeded triggers
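
For concreteness, a minimal self-contained sketch of the counting being suggested (the counter and errMetrics types below are simplified stand-ins for the real ErrMetrics, and errTimeoutNoResponse stands in for gocql.ErrTimeoutNoResponse; note the author pushes back on this idea below):

package main

import (
	"context"
	"errors"
	"fmt"
)

// counter is a stand-in for the real stats counters used by ErrMetrics.
type counter struct{ n int }

func (c *counter) Inc() { c.n++ }

// errTimeoutNoResponse stands in for gocql.ErrTimeoutNoResponse so the sketch
// compiles without the driver.
var errTimeoutNoResponse = errors.New("gocql: no response received from cassandra within timeout period")

// errMetrics mirrors the shape of ErrMetrics, with the suggested change:
// context.DeadlineExceeded is counted as a timeout too.
type errMetrics struct {
	cassErrTimeout counter
	cassErrOther   counter
}

func (m *errMetrics) Inc(err error) {
	if err == errTimeoutNoResponse || err == context.DeadlineExceeded {
		m.cassErrTimeout.Inc()
		return
	}
	m.cassErrOther.Inc()
}

func main() {
	m := &errMetrics{}
	m.Inc(context.DeadlineExceeded)
	fmt.Println(m.cassErrTimeout.n, m.cassErrOther.n) // 1 0
}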

Member Author

store_cassandra.go does not set a context Timeout on the read path.

So if context.Canceled or context.DeadlineExceeded is returned, it is because the caller wanted us to give up on the request. There was no issue preventing us from executing the request.

Currently, we don't set a deadline on the request context anywhere, so context.DeadlineExceeded will never be returned. It is just here for completeness in case we one day add a deadline to the context.

Contributor

wouldn't we want context.DeadlineExceeded to mark the spans as failed and the ErrMetrics to be incremented? it seems to me any such deadline being hit is a failure that should be reported

Member Author

No. If DeadlineExceeded is reached it is because the caller set a timeout and they should emit their own error metrics and set the span to failed.

The error would have nothing to do with cassandra, so you should not increment the cassandraErr counters

Contributor

fair enough

return nil, err

Contributor

for better or worse, it used to be by design that we would continue here and just use as much data as we could get back. Remember the days before @replay started, when we would see charts with missing data here and there because certain chunks timed out? We would see the timeout errors on our MT dashboard, and MT always tried to make the most of it.
Anyway, the importance of "correct or error" (as opposed to "best effort") is becoming more and more clear, as we see with some customers and their alerting queries. So 👍

} else {
cassChunksPerRow.Value(int(chunks))
}
6 changes: 6 additions & 0 deletions vendor/github.com/gocql/gocql/AUTHORS

4 changes: 2 additions & 2 deletions vendor/github.com/gocql/gocql/README.md

4 changes: 4 additions & 0 deletions vendor/github.com/gocql/gocql/cluster.go

116 changes: 71 additions & 45 deletions vendor/github.com/gocql/gocql/conn.go
