diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 9671b92ad4..0692dfdad7 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -393,8 +393,7 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu } // GetFlushableSegments get segment ids with Sealed State and flushable (meets flushPolicy) -func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel string, - t Timestamp) ([]UniqueID, error) { +func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel string, t Timestamp) ([]UniqueID, error) { s.mu.Lock() defer s.mu.Unlock() sp, _ := trace.StartSpanFromContext(ctx) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index c3ad5c5902..a48aad24a0 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -20,12 +20,13 @@ import ( "sync/atomic" "time" - "github.com/milvus-io/milvus/internal/rootcoord" - "github.com/milvus-io/milvus/internal/util/metricsinfo" - datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" "github.com/milvus-io/milvus/internal/logutil" + "github.com/milvus-io/milvus/internal/rootcoord" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/mqclient" + "github.com/milvus-io/milvus/internal/util/tsoutil" "go.uber.org/zap" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -360,8 +361,8 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { log.Error("new msg stream failed", zap.Error(err)) return } - ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, - Params.DataCoordSubscriptionName) + ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName}, + Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest) log.Debug("dataCoord create time tick channel 
consumer", zap.String("timeTickChannelName", Params.TimeTickChannelName), zap.String("subscriptionName", Params.DataCoordSubscriptionName)) @@ -403,6 +404,11 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { log.Warn("failed to expire allocations", zap.Error(err)) continue } + physical, _ := tsoutil.ParseTS(ts) + if time.Since(physical).Minutes() > 1 { + // if the time tick lags behind, warn about it, rate-limited to roughly once per minute + log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical)) + } segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts) if err != nil { log.Warn("get flushable segments failed", zap.Error(err)) diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index fa0ebc789e..d673d64e4f 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -18,6 +18,7 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/mqclient" "github.com/stretchr/testify/assert" ) @@ -60,8 +61,10 @@ func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack { return make(chan *msgstream.MsgPack, 100) } -func (mtm *mockTtMsgStream) AsProducer(channels []string) {} -func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {} +func (mtm *mockTtMsgStream) AsProducer(channels []string) {} +func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {} +func (mtm *mockTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) { +} func (mtm *mockTtMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {} func (mtm *mockTtMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { return make([][]int32, 0) diff --git a/internal/log/global.go 
b/internal/log/global.go index a33b478e6d..34a5cdcf29 100644 --- a/internal/log/global.go +++ b/internal/log/global.go @@ -59,6 +59,39 @@ func Fatal(msg string, fields ...zap.Field) { L().WithOptions(zap.AddCallerSkip(1)).Fatal(msg, fields...) } +// RatedDebug prints logs at debug level. +// It limits log printing to avoid too many logs. +// It returns true if the rate limit allowed the log call. +func RatedDebug(cost float64, msg string, fields ...zap.Field) bool { + if R().CheckCredit(cost) { + L().WithOptions(zap.AddCallerSkip(1)).Debug(msg, fields...) + return true + } + return false +} + +// RatedInfo prints logs at info level. +// It limits log printing to avoid too many logs. +// It returns true if the rate limit allowed the log call. +func RatedInfo(cost float64, msg string, fields ...zap.Field) bool { + if R().CheckCredit(cost) { + L().WithOptions(zap.AddCallerSkip(1)).Info(msg, fields...) + return true + } + return false +} + +// RatedWarn prints logs at warn level. +// It limits log printing to avoid too many logs. +// It returns true if the rate limit allowed the log call. +func RatedWarn(cost float64, msg string, fields ...zap.Field) bool { + if R().CheckCredit(cost) { + L().WithOptions(zap.AddCallerSkip(1)).Warn(msg, fields...) + return true + } + return false +} + // With creates a child logger and adds structured context to it. // Fields added to the child don't affect the parent, and vice versa. 
func With(fields ...zap.Field) *zap.Logger { diff --git a/internal/log/log.go b/internal/log/log.go index face2ac80e..48fea383de 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -31,21 +31,25 @@ import ( "errors" + "github.com/uber/jaeger-client-go/utils" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" lumberjack "gopkg.in/natefinch/lumberjack.v2" ) -var _globalL, _globalP, _globalS atomic.Value +var _globalL, _globalP, _globalS, _globalR atomic.Value +var rateLimiter *utils.ReconfigurableRateLimiter func init() { l, p := newStdLogger() _globalL.Store(l) _globalP.Store(p) - s := _globalL.Load().(*zap.Logger).Sugar() _globalS.Store(s) + + r := utils.NewRateLimiter(1.0, 60.0) + _globalR.Store(r) } // InitLogger initializes a zap logger. @@ -136,6 +140,10 @@ func S() *zap.SugaredLogger { return _globalS.Load().(*zap.SugaredLogger) } +func R() *utils.ReconfigurableRateLimiter { + return _globalR.Load().(*utils.ReconfigurableRateLimiter) +} + // ReplaceGlobals replaces the global Logger and SugaredLogger. // It's safe for concurrent use. 
func ReplaceGlobals(logger *zap.Logger, props *ZapProperties) { diff --git a/internal/log/log_test.go b/internal/log/log_test.go index 3225a67c54..fb655da900 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -31,6 +31,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -128,3 +129,42 @@ func TestSampling(t *testing.T) { } } } + +func TestRatedLog(t *testing.T) { + ts := newTestLogSpy(t) + conf := &Config{Level: "debug", DisableTimestamp: true} + logger, p, _ := InitTestLogger(ts, conf) + ReplaceGlobals(logger, p) + + time.Sleep(time.Duration(1) * time.Second) + success := RatedDebug(1.0, "test") + assert.True(t, success) + + time.Sleep(time.Duration(1) * time.Second) + success = RatedInfo(1.0, "test") + assert.True(t, success) + + time.Sleep(time.Duration(1) * time.Second) + success = RatedWarn(1.0, "test") + assert.True(t, success) + + time.Sleep(time.Duration(1) * time.Second) + success = RatedInfo(100.0, "test") + assert.False(t, success) + + successNum := 0 + for i := 0; i < 1000; i++ { + if RatedInfo(1.0, "test") { + successNum++ + } + time.Sleep(time.Duration(1) * time.Millisecond) + } + // due to the rate limit, not all of the 1000 attempts succeed + assert.True(t, successNum < 1000) + assert.True(t, successNum > 10) + + time.Sleep(time.Duration(3) * time.Second) + success = RatedInfo(3.0, "test") + assert.True(t, success) + Sync() +} diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 2834afaf05..573c4a3870 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -116,6 +116,11 @@ func (ms *mqMsgStream) AsProducer(channels []string) { // Create consumer to receive message from channels func (ms *mqMsgStream) AsConsumer(channels []string, subName string) { + ms.AsConsumerWithPosition(channels, subName, mqclient.SubscriptionPositionEarliest) +} + +// AsConsumerWithPosition creates consumers to receive messages from channels, starting from the given initial position +func (ms 
*mqMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) { for _, channel := range channels { if _, ok := ms.consumers[channel]; ok { continue @@ -126,7 +131,7 @@ func (ms *mqMsgStream) AsConsumer(channels []string, subName string) { Topic: channel, SubscriptionName: subName, Type: mqclient.KeyShared, - SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest, + SubscriptionInitialPosition: position, MessageChannel: receiveChannel, }) if err != nil { @@ -597,6 +602,10 @@ func (ms *MqTtMsgStream) addConsumer(consumer mqclient.Consumer, channel string) // AsConsumer subscribes channels as consumer for a MsgStream func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) { + ms.AsConsumerWithPosition(channels, subName, mqclient.SubscriptionPositionEarliest) +} + +func (ms *MqTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) { for _, channel := range channels { if _, ok := ms.consumers[channel]; ok { continue @@ -607,7 +616,7 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) { Topic: channel, SubscriptionName: subName, Type: mqclient.KeyShared, - SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest, + SubscriptionInitialPosition: position, MessageChannel: receiveChannel, }) if err != nil { diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index 114b76d49c..3c1fb15422 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -53,6 +53,7 @@ type MsgStream interface { Chan() <-chan *MsgPack AsProducer(channels []string) AsConsumer(channels []string, subName string) + AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) SetRepackFunc(repackFunc RepackFunc) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 GetProduceChannels() []string diff --git 
a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index 80cbbc6c2b..1e4fd5c766 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -17,17 +17,13 @@ import ( "sync" "time" - "github.com/milvus-io/milvus/internal/proto/schemapb" - "github.com/milvus-io/milvus/internal/msgstream" - - "github.com/milvus-io/milvus/internal/util/funcutil" - - "github.com/milvus-io/milvus/internal/util/uniquegenerator" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/mqclient" + "github.com/milvus-io/milvus/internal/util/uniquegenerator" ) type mockTimestampAllocatorInterface struct { @@ -276,6 +272,9 @@ func (ms *simpleMockMsgStream) AsProducer(channels []string) { func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) { } +func (ms *simpleMockMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) { +} + func (ms *simpleMockMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { if len(tsMsgs) <= 0 { return nil