From 7290aedd573d02946ea8f063e7dd46935e7b3c4b Mon Sep 17 00:00:00 2001 From: Kiran RG Date: Fri, 30 Jun 2017 11:33:22 -0700 Subject: [PATCH] Storehost: fail RPC calls until ready (#242) --- services/storehost/storehost.go | 67 +++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 3 deletions(-) diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index bc368d39..930d5f05 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -206,6 +206,9 @@ type ( // status of the node nodeStatus atomic.Value + + // started? + started int32 } ) @@ -299,12 +302,17 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { t.extStatsReporter = NewExtStatsReporter(hostID, t.xMgr, t.mClient, t.logger) t.extStatsReporter.Start() + atomic.StoreInt32(&t.started, 1) // started + t.logger.WithField("options", fmt.Sprintf("Store=%v BaseDir=%v", t.opts.Store, t.opts.BaseDir)). Info("StoreHost: started") } // Stop stops the service func (t *StoreHost) Stop() { + + atomic.StoreInt32(&t.started, 0) // stopped + t.loadReporter.Stop() t.hostIDHeartbeater.Stop() t.storageMonitor.Stop() @@ -349,6 +357,7 @@ func getInConnArgs(ctx thrift.Context) (args *inConnArgs, err error) { // OpenAppendStreamHandler is websocket handler for opening write stream func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Request) { + req, err := common.GetOpenAppendStreamRequestHTTP(r.Header) if err != nil { t.logger.WithField(`error`, err).Error("unable to parse all needed headers") @@ -382,11 +391,17 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageRequests) + if atomic.LoadInt32(&t.started) == 0 { + call.Done() + t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + return newInternalServiceError("StoreHost not started") + } + // If the disk available space is low, we should fail any request to write extent if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly { call.Done() t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) - return newInternalServiceError("Storage avaialble space is extremely low, enters read only mode") + return newInternalServiceError("StoreHost in read-only mode") } // read in args passed in via the Thrift context headers @@ -518,6 +533,12 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageRequests) + if atomic.LoadInt32(&t.started) == 0 { + call.Done() + t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures) + return newInternalServiceError("StoreHost not started") + } + // read in args passed in via the Thrift context headers args, e := getOutConnArgs(ctx) @@ -580,8 +601,14 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp // GetAddressFromTimestamp is the implementation of the thrift handler for store host func (t *StoreHost) GetAddressFromTimestamp(ctx thrift.Context, req *store.GetAddressFromTimestampRequest) (res *store.GetAddressFromTimestampResult_, err error) { + t.m3Client.IncCounter(metrics.GetAddressFromTimestampScope, metrics.StorageRequests) + if atomic.LoadInt32(&t.started) == 0 { + t.m3Client.IncCounter(metrics.GetAddressFromTimestampScope, metrics.StorageFailures) + return nil, newInternalServiceError("StoreHost not started") + } + sw := t.m3Client.StartTimer(metrics.GetAddressFromTimestampScope, metrics.StorageLatencyTimer) defer sw.Stop() @@ -668,6 +695,11 @@ func (t *StoreHost) SealExtent(ctx thrift.Context, req *store.SealExtentRequest) t.m3Client.IncCounter(metrics.SealExtentScope, metrics.StorageRequests) + if atomic.LoadInt32(&t.started) == 0 { + t.m3Client.IncCounter(metrics.SealExtentScope, metrics.StorageFailures) + return newInternalServiceError("StoreHost not started") + } + var sealSeqNum, unspecifiedSealSeqNum = seqNumUnspecifiedSeal, true if req.IsSetSequenceNumber() { @@ -847,6 +879,10 @@ func (t *StoreHost) SealExtent(ctx thrift.Context, req *store.SealExtentRequest) // GetExtentInfo is the implementation of the thrift handler for the store host func (t *StoreHost) GetExtentInfo(ctx thrift.Context, extReq *store.GetExtentInfoRequest) (*store.ExtentInfo, error) { + if atomic.LoadInt32(&t.started) == 0 { + return nil, newInternalServiceError("StoreHost not started") + } + sw := t.m3Client.StartTimer(metrics.GetExtentInfoScope, metrics.StorageLatencyTimer) defer sw.Stop() t.m3Client.IncCounter(metrics.GetExtentInfoScope, metrics.StorageRequests) @@ -915,6 +951,11 @@ func (t *StoreHost) PurgeMessages(ctx thrift.Context, req *store.PurgeMessagesRe t.m3Client.IncCounter(metrics.PurgeMessagesScope, metrics.StorageRequests) + if atomic.LoadInt32(&t.started) == 0 { + t.m3Client.IncCounter(metrics.PurgeMessagesScope, metrics.StorageFailures) + return nil, newInternalServiceError("StoreHost not started") + } + sw := t.m3Client.StartTimer(metrics.PurgeMessagesScope, metrics.StorageLatencyTimer) defer sw.Stop() @@ -1055,8 +1096,12 @@ func (t *StoreHost) PurgeMessages(ctx thrift.Context, req *store.PurgeMessagesRe } // ReadMessages reads a set of messages start from the set StartAddress -func (t *StoreHost) ReadMessages( - ctx thrift.Context, req *store.ReadMessagesRequest) (result *store.ReadMessagesResult_, err error) { +func (t *StoreHost) ReadMessages(ctx thrift.Context, req *store.ReadMessagesRequest) (result *store.ReadMessagesResult_, err error) { + + if atomic.LoadInt32(&t.started) == 0 { + t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures) + return nil, newInternalServiceError("StoreHost not started") + } startAddr := req.GetStartAddress() numRequested := int64(req.GetNumMessages()) @@ -1248,6 +1293,8 @@ readMsgsLoop: // Shutdown storehost func (t *StoreHost) Shutdown() { + + atomic.StoreInt32(&t.started, 0) // shutdown t.logger.Info("Storehost: shutting down") close(t.shutdownC) // 'broadcast' shutdown, by closing the shutdownC // wait until all connections have been closed @@ -1422,6 +1469,10 @@ func getReplicationArgsFromRemoteReplicateRequest(req *store.RemoteReplicateExte // ReplicateExtent creates a replica of the extent in the local store func (t *StoreHost) ReplicateExtent(tCtx thrift.Context, req *store.ReplicateExtentRequest) (err error) { + if atomic.LoadInt32(&t.started) == 0 { + return newInternalServiceError("StoreHost not started") + } + log := t.logger.WithFields(bark.Fields{ common.TagDst: common.FmtExt(req.GetDestinationUUID()), common.TagExt: common.FmtExt(req.GetExtentUUID()), @@ -1503,6 +1554,11 @@ func (t *StoreHost) ReplicateExtent(tCtx thrift.Context, req *store.ReplicateExt // RemoteReplicateExtent replicates a remote extent from replicator func (t *StoreHost) RemoteReplicateExtent(tCtx thrift.Context, req *store.RemoteReplicateExtentRequest) (err error) { + + if atomic.LoadInt32(&t.started) == 0 { + return newInternalServiceError("StoreHost not started") + } + log := t.logger.WithFields(bark.Fields{ common.TagDst: common.FmtExt(req.GetDestinationUUID()), common.TagExt: common.FmtExt(req.GetExtentUUID()), @@ -1568,6 +1624,11 @@ func (t *StoreHost) ListExtents(tCtx thrift.Context) (res *store.ListExtentsResu t.m3Client.IncCounter(metrics.ListExtentsScope, metrics.StorageRequests) + if atomic.LoadInt32(&t.started) == 0 { + t.m3Client.IncCounter(metrics.ListExtentsScope, metrics.StorageFailures) + return nil, newInternalServiceError("StoreHost not started") + } + sw := t.m3Client.StartTimer(metrics.ListExtentsScope, metrics.StorageLatencyTimer) defer sw.Stop()