Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Storehost: fail RPC calls until ready (#242)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiran RG authored Jun 30, 2017
1 parent 235afc9 commit 7290aed
Showing 1 changed file with 64 additions and 3 deletions.
67 changes: 64 additions & 3 deletions services/storehost/storehost.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ type (

// status of the node
nodeStatus atomic.Value

// started?
started int32
}
)

Expand Down Expand Up @@ -299,12 +302,17 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) {
t.extStatsReporter = NewExtStatsReporter(hostID, t.xMgr, t.mClient, t.logger)
t.extStatsReporter.Start()

atomic.StoreInt32(&t.started, 1) // started

t.logger.WithField("options", fmt.Sprintf("Store=%v BaseDir=%v", t.opts.Store, t.opts.BaseDir)).
Info("StoreHost: started")
}

// Stop stops the service
func (t *StoreHost) Stop() {

atomic.StoreInt32(&t.started, 0) // stopped

t.loadReporter.Stop()
t.hostIDHeartbeater.Stop()
t.storageMonitor.Stop()
Expand Down Expand Up @@ -349,6 +357,7 @@ func getInConnArgs(ctx thrift.Context) (args *inConnArgs, err error) {

// OpenAppendStreamHandler is websocket handler for opening write stream
func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Request) {

req, err := common.GetOpenAppendStreamRequestHTTP(r.Header)
if err != nil {
t.logger.WithField(`error`, err).Error("unable to parse all needed headers")
Expand Down Expand Up @@ -382,11 +391,17 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore

t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageRequests)

if atomic.LoadInt32(&t.started) == 0 {
call.Done()
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
return newInternalServiceError("StoreHost not started")
}

// If the disk available space is low, we should fail any request to write extent
if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly {
call.Done()
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
return newInternalServiceError("Storage avaialble space is extremely low, enters read only mode")
return newInternalServiceError("StoreHost in read-only mode")
}

// read in args passed in via the Thrift context headers
Expand Down Expand Up @@ -518,6 +533,12 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp

t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageRequests)

if atomic.LoadInt32(&t.started) == 0 {
call.Done()
t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures)
return newInternalServiceError("StoreHost not started")
}

// read in args passed in via the Thrift context headers
args, e := getOutConnArgs(ctx)

Expand Down Expand Up @@ -580,8 +601,14 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp

// GetAddressFromTimestamp is the implementation of the thrift handler for store host
func (t *StoreHost) GetAddressFromTimestamp(ctx thrift.Context, req *store.GetAddressFromTimestampRequest) (res *store.GetAddressFromTimestampResult_, err error) {

t.m3Client.IncCounter(metrics.GetAddressFromTimestampScope, metrics.StorageRequests)

if atomic.LoadInt32(&t.started) == 0 {
t.m3Client.IncCounter(metrics.GetAddressFromTimestampScope, metrics.StorageFailures)
return nil, newInternalServiceError("StoreHost not started")
}

sw := t.m3Client.StartTimer(metrics.GetAddressFromTimestampScope, metrics.StorageLatencyTimer)
defer sw.Stop()

Expand Down Expand Up @@ -668,6 +695,11 @@ func (t *StoreHost) SealExtent(ctx thrift.Context, req *store.SealExtentRequest)

t.m3Client.IncCounter(metrics.SealExtentScope, metrics.StorageRequests)

if atomic.LoadInt32(&t.started) == 0 {
t.m3Client.IncCounter(metrics.SealExtentScope, metrics.StorageFailures)
return newInternalServiceError("StoreHost not started")
}

var sealSeqNum, unspecifiedSealSeqNum = seqNumUnspecifiedSeal, true

if req.IsSetSequenceNumber() {
Expand Down Expand Up @@ -847,6 +879,10 @@ func (t *StoreHost) SealExtent(ctx thrift.Context, req *store.SealExtentRequest)
// GetExtentInfo is the implementation of the thrift handler for the store host
func (t *StoreHost) GetExtentInfo(ctx thrift.Context, extReq *store.GetExtentInfoRequest) (*store.ExtentInfo, error) {

if atomic.LoadInt32(&t.started) == 0 {
return nil, newInternalServiceError("StoreHost not started")
}

sw := t.m3Client.StartTimer(metrics.GetExtentInfoScope, metrics.StorageLatencyTimer)
defer sw.Stop()
t.m3Client.IncCounter(metrics.GetExtentInfoScope, metrics.StorageRequests)
Expand Down Expand Up @@ -915,6 +951,11 @@ func (t *StoreHost) PurgeMessages(ctx thrift.Context, req *store.PurgeMessagesRe

t.m3Client.IncCounter(metrics.PurgeMessagesScope, metrics.StorageRequests)

if atomic.LoadInt32(&t.started) == 0 {
t.m3Client.IncCounter(metrics.PurgeMessagesScope, metrics.StorageFailures)
return nil, newInternalServiceError("StoreHost not started")
}

sw := t.m3Client.StartTimer(metrics.PurgeMessagesScope, metrics.StorageLatencyTimer)
defer sw.Stop()

Expand Down Expand Up @@ -1055,8 +1096,12 @@ func (t *StoreHost) PurgeMessages(ctx thrift.Context, req *store.PurgeMessagesRe
}

// ReadMessages reads a set of messages start from the set StartAddress
func (t *StoreHost) ReadMessages(
ctx thrift.Context, req *store.ReadMessagesRequest) (result *store.ReadMessagesResult_, err error) {
func (t *StoreHost) ReadMessages(ctx thrift.Context, req *store.ReadMessagesRequest) (result *store.ReadMessagesResult_, err error) {

if atomic.LoadInt32(&t.started) == 0 {
t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures)
return nil, newInternalServiceError("StoreHost not started")
}

startAddr := req.GetStartAddress()
numRequested := int64(req.GetNumMessages())
Expand Down Expand Up @@ -1248,6 +1293,8 @@ readMsgsLoop:

// Shutdown storehost
func (t *StoreHost) Shutdown() {

atomic.StoreInt32(&t.started, 0) // shutdown
t.logger.Info("Storehost: shutting down")
close(t.shutdownC) // 'broadcast' shutdown, by closing the shutdownC
// wait until all connections have been closed
Expand Down Expand Up @@ -1422,6 +1469,10 @@ func getReplicationArgsFromRemoteReplicateRequest(req *store.RemoteReplicateExte
// ReplicateExtent creates a replica of the extent in the local store
func (t *StoreHost) ReplicateExtent(tCtx thrift.Context, req *store.ReplicateExtentRequest) (err error) {

if atomic.LoadInt32(&t.started) == 0 {
return newInternalServiceError("StoreHost not started")
}

log := t.logger.WithFields(bark.Fields{
common.TagDst: common.FmtExt(req.GetDestinationUUID()),
common.TagExt: common.FmtExt(req.GetExtentUUID()),
Expand Down Expand Up @@ -1503,6 +1554,11 @@ func (t *StoreHost) ReplicateExtent(tCtx thrift.Context, req *store.ReplicateExt

// RemoteReplicateExtent replicates a remote extent from replicator
func (t *StoreHost) RemoteReplicateExtent(tCtx thrift.Context, req *store.RemoteReplicateExtentRequest) (err error) {

if atomic.LoadInt32(&t.started) == 0 {
return newInternalServiceError("StoreHost not started")
}

log := t.logger.WithFields(bark.Fields{
common.TagDst: common.FmtExt(req.GetDestinationUUID()),
common.TagExt: common.FmtExt(req.GetExtentUUID()),
Expand Down Expand Up @@ -1568,6 +1624,11 @@ func (t *StoreHost) ListExtents(tCtx thrift.Context) (res *store.ListExtentsResu

t.m3Client.IncCounter(metrics.ListExtentsScope, metrics.StorageRequests)

if atomic.LoadInt32(&t.started) == 0 {
t.m3Client.IncCounter(metrics.ListExtentsScope, metrics.StorageFailures)
return nil, newInternalServiceError("StoreHost not started")
}

sw := t.m3Client.StartTimer(metrics.ListExtentsScope, metrics.StorageLatencyTimer)
defer sw.Stop()

Expand Down

0 comments on commit 7290aed

Please sign in to comment.