diff --git a/api/v2/api.go b/api/v2/api.go index 8ce9b5920..fcb0f7f02 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -37,6 +37,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { v2.Use(middleware.ErrorHandleMiddleware()) v2.GET("status", api.serverStatus) + v2.POST("/log", api.setLogLevel) // For compatibility with the old API. // TiDB Operator relies on this API to determine whether the TiCDC node is healthy. router.GET("/status", api.serverStatus) diff --git a/api/v2/log.go b/api/v2/log.go new file mode 100644 index 000000000..7e1fb047a --- /dev/null +++ b/api/v2/log.go @@ -0,0 +1,52 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/pingcap/log" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/logutil" + "go.uber.org/zap" +) + +// SetLogLevel changes TiCDC log level dynamically. +// @Summary Change TiCDC log level +// @Description change TiCDC log level dynamically +// @Tags common,v2 +// @Accept json +// @Produce json +// @Param log_level body LogLevelReq true "log level" +// @Success 200 {object} EmptyResponse +// @Failure 400 {object} model.HTTPError +// @Router /api/v2/log [post] +func (h *OpenAPIV2) setLogLevel(c *gin.Context) { + req := &LogLevelReq{Level: "info"} + err := c.BindJSON(&req) + if err != nil { + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid log level: %s", err.Error())) + return + } + + err = logutil.SetLogLevel(req.Level) + if err != nil { + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack( + "fail to change log level: %s", req.Level)) + return + } + log.Warn("log level changed", zap.String("level", req.Level)) + c.JSON(http.StatusOK, &EmptyResponse{}) +} diff --git a/maintainer/barrier.go b/maintainer/barrier.go index 807ed3806..49cb91248 100644 --- a/maintainer/barrier.go +++ b/maintainer/barrier.go @@ -60,7 +60,7 @@ func (b *Barrier) HandleStatus(from node.ID, request *heartbeatpb.BlockStatusRequest) *messaging.TargetMessage { log.Debug("handle block status", zap.String("from", from.String()), zap.String("changefeed", request.ChangefeedID.GetName()), - zap.String("detail", request.String())) + zap.Any("detail", request)) eventMap := make(map[*BarrierEvent][]*heartbeatpb.DispatcherID) var dispatcherStatus []*heartbeatpb.DispatcherStatus for _, status := range request.BlockStatuses { diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index 979eac718..0f2b214b6 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -109,7 +109,8 @@ func NewBlockEvent(cfID common.ChangeFeedID, controller *Controller, log.Info("new block event is created", zap.String("changefeedID", cfID.Name()), zap.Uint64("block-ts", event.commitTs), - zap.Bool("sync-point", event.isSyncPoint)) + zap.Bool("sync-point", event.isSyncPoint), + zap.Any("detail", status)) return event } @@ -276,17 +277,22 @@ func (be *BarrierEvent) resend() []*messaging.TargetMessage { if time.Since(be.lastResendTime) < time.Second { return nil } - if time.Since(be.lastWarningLogTime) > time.Second*10 { - log.Warn("barrier event is not resolved", - zap.String("changefeed", be.cfID.Name()), - zap.Uint64("commitTs", be.commitTs), - zap.Bool("isSyncPoint", be.isSyncPoint), - zap.Bool("selected", be.selected), - zap.Bool("writerDispatcherAdvanced", be.writerDispatcherAdvanced), - zap.String("coverage", be.rangeChecker.Detail()), - ) - be.lastWarningLogTime = time.Now() - } + var msgs []*messaging.TargetMessage + defer func() { + if time.Since(be.lastWarningLogTime) > time.Second*10 { + log.Warn("barrier event is not resolved", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Bool("isSyncPoint", be.isSyncPoint), + zap.Bool("selected", be.selected), + zap.Bool("writerDispatcherAdvanced", be.writerDispatcherAdvanced), + zap.String("coverage", be.rangeChecker.Detail()), + zap.Any("resend", msgs), + ) + be.lastWarningLogTime = time.Now() + } + }() + // still waiting for all dispatcher to reach the block commit ts if !be.selected { return nil @@ -304,10 +310,12 @@ func (be *BarrierEvent) resend() []*messaging.TargetMessage { // todo: select a new writer return nil } - return []*messaging.TargetMessage{be.newWriterActionMessage(stm.GetNodeID())} + msgs = []*messaging.TargetMessage{be.newWriterActionMessage(stm.GetNodeID())} + } else { + // the writer dispatcher is advanced, resend pass action + msgs = be.sendPassAction() } - // the writer dispatcher is advanced, resend pass action - return be.sendPassAction() + return msgs } func (be *BarrierEvent) newWriterActionMessage(capture node.ID) *messaging.TargetMessage { diff --git a/maintainer/replica/replication_span.go b/maintainer/replica/replication_span.go index 1013fab81..7fb546820 100644 --- a/maintainer/replica/replication_span.go +++ b/maintainer/replica/replication_span.go @@ -143,6 +143,7 @@ func (r *SpanReplication) NewAddDispatcherMessage(server node.ID) (*messaging.Ta ChangefeedID: r.ChangefeedID.ToPB(), Config: &heartbeatpb.DispatcherConfig{ DispatcherID: r.ID.ToPB(), + SchemaID: r.schemaID, Span: r.Span, StartTs: r.status.Load().CheckpointTs, CurrentPdTs: ts, diff --git a/maintainer/replica/replication_span_test.go b/maintainer/replica/replication_span_test.go index 9c0f892ab..981459a32 100644 --- a/maintainer/replica/replication_span_test.go +++ b/maintainer/replica/replication_span_test.go @@ -55,6 +55,7 @@ func TestSpanReplication_NewAddDispatcherMessage(t *testing.T) { require.Equal(t, heartbeatpb.ScheduleAction_Create, req.ScheduleAction) require.Equal(t, oracle.ComposeTS(10, 1), req.Config.CurrentPdTs) require.Equal(t, replicaSet.ID.ToPB(), req.Config.DispatcherID) + require.Equal(t, replicaSet.schemaID, req.Config.SchemaID) tsoClient.EXPECT().GetTS(gomock.Any()).Return(int64(1), int64(1), errors.New("error")).AnyTimes() _, err = replicaSet.NewAddDispatcherMessage("node1")