diff --git a/internal/core/src/plan/PlanNode.h b/internal/core/src/plan/PlanNode.h
index 6dd6754799..38138e1382 100644
--- a/internal/core/src/plan/PlanNode.h
+++ b/internal/core/src/plan/PlanNode.h
@@ -167,7 +167,7 @@ class FilterBitsNode : public PlanNode {
 
     std::string
     ToString() const override {
-        return fmt::format("FilterBitsNode:\n\t[filter_expr:{}]",
+        return fmt::format("FilterBitsNode:[filter_expr:{}]",
                            filter_->ToString());
     }
 
@@ -228,7 +228,7 @@ class ElementFilterNode : public PlanNode {
     std::string
     ToString() const override {
         return fmt::format(
-            "ElementFilterNode:\n\t[struct_name:{}, element_filter:{}]",
+            "ElementFilterNode:[struct_name:{}, element_filter:{}]",
             struct_name_,
             element_filter_->ToString());
     }
@@ -285,7 +285,7 @@ class ElementFilterBitsNode : public PlanNode {
     std::string
     ToString() const override {
         return fmt::format(
-            "ElementFilterBitsNode:\n\t[struct_name:{}, element_filter:{}]",
+            "ElementFilterBitsNode:[struct_name:{}, element_filter:{}]",
             struct_name_,
             element_filter_->ToString());
     }
@@ -327,7 +327,7 @@ class MvccNode : public PlanNode {
 
     std::string
     ToString() const override {
-        return fmt::format("MvccNode:\n\t[source node:{}]", SourceToString());
+        return fmt::format("MvccNode:[source_node:{}]", SourceToString());
     }
 
 private:
@@ -360,7 +360,7 @@ class RandomSampleNode : public PlanNode {
 
     std::string
     ToString() const override {
-        return fmt::format("RandomSampleNode:\n\t[factor:{}]", factor_);
+        return fmt::format("RandomSampleNode:[factor:{}]", factor_);
     }
 
     float
@@ -398,7 +398,7 @@ class VectorSearchNode : public PlanNode {
 
     std::string
     ToString() const override {
-        return fmt::format("VectorSearchNode:\n\t[source node:{}]",
+        return fmt::format("VectorSearchNode:[source_node:{}]",
                            SourceToString());
     }
 
@@ -430,8 +430,7 @@ class GroupByNode : public PlanNode {
 
     std::string
     ToString() const override {
-        return fmt::format("GroupByNode:\n\t[source node:{}]",
-                           SourceToString());
+        return fmt::format("GroupByNode:[source_node:{}]", SourceToString());
     }
 
 private:
@@ -463,7 +462,7 @@ class CountNode : public PlanNode {
 
     std::string
     ToString() const override {
-        return fmt::format("CountNode:\n\t[source node:{}]", SourceToString());
+        return fmt::format("CountNode:[source_node:{}]", SourceToString());
     }
 
 private:
@@ -510,8 +509,7 @@ class RescoresNode : public PlanNode {
 
     std::string
    ToString() const override {
-        return fmt::format("RescoresNode:\n\t[source node:{}]",
-                           SourceToString());
+        return fmt::format("RescoresNode:[source_node:{}]", SourceToString());
     }
 
 private:
diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go
index 09a07824fb..d8abbc8283 100644
--- a/internal/datacoord/index_service.go
+++ b/internal/datacoord/index_service.go
@@ -1118,7 +1118,6 @@ func (s *Server) ListIndexes(ctx context.Context, req *indexpb.ListIndexesReques
             UserIndexParams: index.UserIndexParams,
         }
     })
-    log.Debug("List index success")
     return &indexpb.ListIndexesResponse{
         Status:     merr.Success(),
         IndexInfos: indexInfos,
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index 9cd8b87f3b..f40b7fe567 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -453,7 +453,6 @@ func GetSegmentsChanPart(m *meta, collectionID int64, filters ...SegmentFilter)
     for _, entry := range mDimEntry {
         result = append(result, entry)
     }
-    log.Ctx(context.TODO()).Debug("GetSegmentsChanPart", zap.Int("length", len(result)))
     return result
 }
 
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 8a45aba160..f13b59533f 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -836,7 +836,6 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
         zap.Int64("collectionID", collectionID),
         zap.Int64("partitionID", partitionID),
     )
-    log.Info("get recovery info request received")
     resp := &datapb.GetRecoveryInfoResponse{
         Status: merr.Success(),
     }
@@ -973,7 +972,6 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
         zap.Int64("collectionID", collectionID),
         zap.Int64s("partitionIDs", partitionIDs),
     )
-    log.Info("get recovery info request received")
     resp := &datapb.GetRecoveryInfoResponseV2{
         Status: merr.Success(),
     }
diff --git a/internal/datacoord/task_update_external_collection.go b/internal/datacoord/task_update_external_collection.go
index 67c2e67061..d792f5ec59 100644
--- a/internal/datacoord/task_update_external_collection.go
+++ b/internal/datacoord/task_update_external_collection.go
@@ -21,9 +21,9 @@ import (
     "fmt"
     "time"
 
-    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "go.uber.org/zap"
 
+    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus/internal/datacoord/allocator"
     "github.com/milvus-io/milvus/internal/datacoord/session"
     globalTask "github.com/milvus-io/milvus/internal/datacoord/task"
diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go
index 00e58ee886..67fd5df080 100644
--- a/internal/querycoordv2/meta/channel_dist_manager.go
+++ b/internal/querycoordv2/meta/channel_dist_manager.go
@@ -347,16 +347,11 @@ func (m *ChannelDistManager) updateCollectionIndex() {
 func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica) *DmChannel {
     m.rwmutex.RLock()
     defer m.rwmutex.RUnlock()
 
-    logger := log.With(zap.String("Scope", "ChannelDistManager"), zap.String("channelName", channelName),
-        zap.Int64("replicaID", replica.GetID()))
+    var setReason string
     channels := m.collectionIndex[replica.GetCollectionID()]
     var candidates *DmChannel
-    for chIdx, channel := range channels {
-        logger := logger.With(zap.Int("channelIdx", chIdx))
-        logger.Debug("process", zap.Int64("channelID", channel.Node), zap.Int64("channelVersion", channel.Version),
-            zap.String("channel name", channel.GetChannelName()),
-            zap.Bool("replicaContains", replica.Contains(channel.Node)))
+    for _, channel := range channels {
         if channel.GetChannelName() == channelName && replica.Contains(channel.Node) {
             if candidates == nil {
                 candidates = channel
@@ -370,26 +365,34 @@
                 case !candidatesServiceable && channelServiceable:
                     // Current candidate is not serviceable but new channel is
                     updateNeeded = true
-                    logger.Debug("set serviceable delegator to candidate shard leader", zap.Int64("node", channel.Node),
-                        zap.Int64("channel version", channel.Version))
+                    setReason = "serviceable"
                 case candidatesServiceable == channelServiceable && channel.Version > candidates.Version:
                     // Same service status but higher version
                     updateNeeded = true
-                    logger.Debug("set serviceable delegator with larger version to candidate shard leader", zap.Int64("node", channel.Node),
-                        zap.Int64("channel version", channel.Version), zap.Int64("candidate version", candidates.Version))
+                    setReason = "version_updated"
                 }
                 if updateNeeded {
                     candidates = channel
-                } else {
-                    logger.Debug("not set any channel to candidates in this round")
                 }
             }
         }
     }
 
-    if candidates != nil {
-        logger.Debug("final",
zap.Any("candidates", candidates), - zap.Int64("candidates version", candidates.Version), - zap.Int64("candidates node", candidates.Node)) + if log.Level().Enabled(zap.DebugLevel) { + logger := log.With( + zap.String("Scope", "ChannelDistManager"), + zap.String("channelName", channelName), + zap.Int64("replicaID", replica.GetID()), + ).WithRateGroup("ChannelDistManager", 1.0, 60.0) + if candidates != nil { + logger.RatedDebug(1.0, "final", + zap.String("candidates", candidates.GetChannelName()), + zap.Int64("candidates version", candidates.Version), + zap.Int64("candidates node", candidates.Node), + zap.String("reason", setReason), + ) + } else { + logger.RatedDebug(1.0, "no candidates found") + } } return candidates } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 9cc0ffac85..524e1fa20c 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -102,8 +102,6 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager { func (mgr *TargetManager) UpdateCollectionCurrentTarget(ctx context.Context, collectionID int64) bool { log := log.With(zap.Int64("collectionID", collectionID)) - log.Debug("start to update current target for collection") - newTarget := mgr.next.getCollectionTarget(collectionID) if newTarget == nil || newTarget.IsEmpty() { log.Info("next target does not exist, skip it") diff --git a/internal/querynodev2/delegator/exclude_info.go b/internal/querynodev2/delegator/exclude_info.go index dcafdc86fd..2bf12f4054 100644 --- a/internal/querynodev2/delegator/exclude_info.go +++ b/internal/querynodev2/delegator/exclude_info.go @@ -17,7 +17,6 @@ package delegator import ( - "context" "sync" "time" @@ -42,18 +41,34 @@ func NewExcludedSegments(cleanInterval time.Duration) *ExcludedSegments { } func (s *ExcludedSegments) Insert(excludeInfo map[int64]uint64) { + if log.Level().Enabled(zap.DebugLevel) { + defer func() { + s.logExcludeInfo(excludeInfo) + }() + } + s.mu.Lock() defer s.mu.Unlock() for segmentID, ts := range excludeInfo { - log.Ctx(context.TODO()).Debug("add exclude info", - zap.Int64("segmentID", segmentID), - zap.Uint64("ts", ts), - ) s.segments[segmentID] = ts } } +func (s *ExcludedSegments) logExcludeInfo(excludeInfo map[int64]uint64) { + segmentIDs := make([]int64, 0, len(excludeInfo)) + timeTicks := make([]uint64, 0, len(excludeInfo)) + for segmentID, ts := range excludeInfo { + segmentIDs = append(segmentIDs, segmentID) + timeTicks = append(timeTicks, ts) + } + log.Debug("add exclude info", + zap.Int("count", len(excludeInfo)), + zap.Int64s("segmentIDs", segmentIDs), + zap.Uint64s("timeTicks", timeTicks), + ) +} + // return false if segment has been excluded func (s *ExcludedSegments) Verify(segmentID int64, ts uint64) bool { s.mu.RLock() @@ -65,6 +80,17 @@ func (s *ExcludedSegments) Verify(segmentID int64, ts uint64) bool { } func (s *ExcludedSegments) CleanInvalid(ts uint64) { + removedSegmentIDs := make([]int64, 0, 32) + if log.Level().Enabled(zap.DebugLevel) { + defer func() { + log.Debug("remove segment from exclude info", + zap.Int("count", len(removedSegmentIDs)), + zap.Uint64("ts", ts), + zap.Int64s("segmentIDs", removedSegmentIDs), + ) + }() + } + s.mu.Lock() defer s.mu.Unlock() @@ -77,7 +103,7 @@ func (s *ExcludedSegments) CleanInvalid(ts uint64) { for _, segmentID := range invalidExcludedInfos { delete(s.segments, segmentID) - log.Ctx(context.TODO()).Debug("remove segment from exclude info", zap.Int64("segmentID", segmentID)) + 
+        removedSegmentIDs = append(removedSegmentIDs, segmentID)
     }
     s.lastClean.Store(time.Now())
 }
diff --git a/internal/querynodev2/segments/collection.go b/internal/querynodev2/segments/collection.go
index 306b514cb2..0fcb2adfe8 100644
--- a/internal/querynodev2/segments/collection.go
+++ b/internal/querynodev2/segments/collection.go
@@ -261,22 +261,12 @@ func (c *Collection) GetLoadType() querypb.LoadType {
 
 func (c *Collection) Ref(count uint32) uint32 {
     refCount := c.refCount.Add(count)
-    log.Debug("collection ref increment",
-        zap.Int64("nodeID", paramtable.GetNodeID()),
-        zap.Int64("collectionID", c.ID()),
-        zap.Uint32("refCount", refCount),
-    )
     putOrUpdateStorageContext(c.Schema().GetProperties(), c.ID())
     return refCount
 }
 
 func (c *Collection) Unref(count uint32) uint32 {
     refCount := c.refCount.Sub(count)
-    log.Debug("collection ref decrement",
-        zap.Int64("nodeID", paramtable.GetNodeID()),
-        zap.Int64("collectionID", c.ID()),
-        zap.Uint32("refCount", refCount),
-    )
     return refCount
 }
 
diff --git a/internal/querynodev2/segments/validate.go b/internal/querynodev2/segments/validate.go
index 6bcfd46c7d..1f425c527b 100644
--- a/internal/querynodev2/segments/validate.go
+++ b/internal/querynodev2/segments/validate.go
@@ -19,9 +19,6 @@ package segments
 
 import (
     "context"
 
-    "go.uber.org/zap"
-
-    "github.com/milvus-io/milvus/pkg/v2/log"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
 )
@@ -31,8 +28,6 @@ func validate(ctx context.Context, manager *Manager, collectionID int64, partiti
         return nil, merr.WrapErrCollectionNotFound(collectionID)
     }
 
-    log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
-
     // validate segment
     segments := make([]Segment, 0, len(segmentIDs))
     var err error
diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go
index d8a70ed7f2..53eb52cae9 100644
--- a/internal/rootcoord/broker.go
+++ b/internal/rootcoord/broker.go
@@ -285,8 +285,6 @@ func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, collectio
 func (b *ServerBroker) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
     log := log.Ctx(ctx).With(zap.Int64("collection", collectionID), zap.Int64("partition", partitionID))
 
-    log.Info("confirming if gc is finished")
-
     req := &datapb.GcConfirmRequest{CollectionId: collectionID, PartitionId: partitionID}
     resp, err := b.s.mixCoord.GcConfirm(ctx, req)
     if err != nil {
@@ -300,6 +298,5 @@ func (b *ServerBroker) GcConfirm(ctx context.Context, collectionID, partitionID
         return false
     }
 
-    log.Info("received gc_confirm response", zap.Bool("finished", resp.GetGcFinished()))
     return resp.GetGcFinished()
 }
diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go
index 423588b272..19956a6534 100644
--- a/internal/rootcoord/meta_table.go
+++ b/internal/rootcoord/meta_table.go
@@ -702,7 +702,6 @@ func (mt *MetaTable) getLatestCollectionByIDInternal(ctx context.Context, collec
         return coll.Clone(), nil
     }
     if !coll.Available() {
-        log.Warn("collection not available", zap.Int64("collectionID", collectionID), zap.Any("state", coll.State))
         return nil, merr.WrapErrCollectionNotFound(collectionID)
     }
     return filterUnavailable(coll), nil
diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go
index ae72f58b87..5c0a8a570d 100644
--- a/internal/rootcoord/root_coord.go
+++ b/internal/rootcoord/root_coord.go
@@ -1135,7 +1135,9 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe
     }
!= nil { - log.Warn("failed to describe collection", zap.Error(err)) + if !errors.Is(err, merr.ErrCollectionNotFound) { + log.Warn("failed to describe collection", zap.Error(err)) + } metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc() return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index 6cd3c4aebb..de3454171c 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -291,7 +291,11 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { // Push the confirmed messages into pending queue for consuming. if s.logger.Level().Enabled(zap.DebugLevel) { for _, m := range msgs { - s.logger.Debug("push committed message into pending queue", zap.Uint64("committedTimeTick", msg.TimeTick()), log.FieldMessage(m)) + s.logger.Debug( + "push message into pending queue", + zap.Uint64("committedTimeTick", msg.TimeTick()), + log.FieldMessage(m), + ) } } s.pendingQueue.Add(msgs) @@ -328,10 +332,4 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { // Observe the filtered message. s.metrics.UpdateTimeTickBufSize(s.reorderBuffer.Bytes()) s.metrics.ObservePassedMessage(isTailing, msg.MessageType(), msg.EstimateSize()) - if s.logger.Level().Enabled(zap.DebugLevel) { - // Log the message if the log level is debug. - s.logger.Debug("push message into reorder buffer", - log.FieldMessage(msg), - zap.Bool("tailing", isTailing)) - } } diff --git a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go index 3b9fe9de7a..212e7301ab 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go @@ -172,9 +172,6 @@ func (impl *shardInterceptor) handleInsertMessage(ctx context.Context, msg messa // 2. partition is fenced. // 3. segment is not ready. // we just redo it to refresh a new latest timetick. 
-        if impl.shardManager.Logger().Level().Enabled(zap.DebugLevel) {
-            impl.shardManager.Logger().Debug("segment assign interceptor redo insert message", zap.Object("message", msg), zap.Error(err))
-        }
         return nil, redo.ErrRedo
     }
     if errors.IsAny(err, shards.ErrTooLargeInsert, shards.ErrPartitionNotFound, shards.ErrCollectionNotFound) {
diff --git a/internal/streamingnode/server/wal/metricsutil/append.go b/internal/streamingnode/server/wal/metricsutil/append.go
index 24b647e770..96801b885c 100644
--- a/internal/streamingnode/server/wal/metricsutil/append.go
+++ b/internal/streamingnode/server/wal/metricsutil/append.go
@@ -11,7 +11,10 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
 )
 
-const maxRedoLogged = 3
+const (
+    maxLogged    = 3
+    logThreshold = 10 * time.Millisecond
+)
 
 type InterceptorMetrics struct {
     Before time.Duration
@@ -19,8 +22,12 @@ type InterceptorMetrics struct {
     After time.Duration
 }
 
+func (im *InterceptorMetrics) ShouldBeLogged() bool {
+    return im.Before > logThreshold || im.After > logThreshold || im.BeforeErr != nil
+}
+
 func (im *InterceptorMetrics) String() string {
-    return fmt.Sprintf("before: %s, after: %s, before_err: %v", im.Before, im.After, im.BeforeErr)
+    return fmt.Sprintf("b:%s,a:%s,err:%v", im.Before, im.After, im.BeforeErr)
 }
 
 // AppendMetrics is the metrics for append operation.
@@ -67,16 +74,10 @@ func (m *AppendMetrics) StartAppendGuard() *AppendMetricsGuard {
 func (m *AppendMetrics) IntoLogFields() []zap.Field {
     fields := []zap.Field{
         log.FieldMessage(m.msg),
-        zap.Duration("appendDuration", m.appendDuration),
-        zap.Duration("implAppendDuration", m.implAppendDuration),
-    }
-    for name, ims := range m.interceptors {
-        for i, im := range ims {
-            if i <= maxRedoLogged {
-                fields = append(fields, zap.Any(fmt.Sprintf("%s_%d", name, i), im))
-            }
-        }
+        zap.Duration("duration", m.appendDuration),
+        zap.Duration("implDuration", m.implAppendDuration),
     }
+
     if m.err != nil {
         fields = append(fields, zap.Error(m.err))
     } else {
@@ -87,6 +88,22 @@ func (m *AppendMetrics) IntoLogFields() []zap.Field {
             fields = append(fields, zap.Int64("txnID", int64(m.result.TxnCtx.TxnID)))
         }
     }
+    loggedInterceptorCount := 0
+L:
+    for name, ims := range m.interceptors {
+        for i, im := range ims {
+            if !im.ShouldBeLogged() {
+                continue
+            }
+            if loggedInterceptorCount < maxLogged {
+                fields = append(fields, zap.Stringer(fmt.Sprintf("%s_%d", name, i), im))
+                loggedInterceptorCount++
+            }
+            if loggedInterceptorCount >= maxLogged {
+                break L
+            }
+        }
+    }
 
     return fields
 }
diff --git a/internal/streamingnode/server/wal/metricsutil/wal_write.go b/internal/streamingnode/server/wal/metricsutil/wal_write.go
index a4d447ec6b..f82355bc97 100644
--- a/internal/streamingnode/server/wal/metricsutil/wal_write.go
+++ b/internal/streamingnode/server/wal/metricsutil/wal_write.go
@@ -5,7 +5,6 @@ import (
     "time"
 
     "github.com/prometheus/client_golang/prometheus"
-    "go.uber.org/zap/zapcore"
 
     "github.com/milvus-io/milvus/internal/util/streamingutil/status"
     "github.com/milvus-io/milvus/pkg/v2/log"
@@ -104,8 +103,9 @@ func (m *WriteMetrics) done(appendMetrics *AppendMetrics) {
         m.Logger().Warn("append message into wal too slow", appendMetrics.IntoLogFields()...)
         return
     }
-    if m.Logger().Level().Enabled(zapcore.DebugLevel) {
-        m.Logger().Debug("append message into wal", appendMetrics.IntoLogFields()...)
+    logLV := appendMetrics.msg.MessageType().LogLevel()
+    if m.Logger().Level().Enabled(logLV) {
+        m.Logger().Log(logLV, "append message into wal", appendMetrics.IntoLogFields()...)
     }
 }
 
diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go
index 45cc936ae2..370e69c54f 100644
--- a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go
+++ b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go
@@ -351,9 +351,6 @@ func (r *recoveryStorageImpl) handleInsert(msg message.ImmutableInsertMessageV1)
     for _, partition := range msg.Header().GetPartitions() {
         if segment, ok := r.segments[partition.SegmentAssignment.SegmentId]; ok && segment.IsGrowing() {
             segment.ObserveInsert(msg.TimeTick(), partition)
-            if r.Logger().Level().Enabled(zap.DebugLevel) {
-                r.Logger().Debug("insert entity", log.FieldMessage(msg), zap.Uint64("segmentRows", segment.Rows()), zap.Uint64("segmentBinary", segment.BinarySize()))
-            }
         } else {
             r.detectInconsistency(msg, "segment not found")
         }
@@ -362,10 +359,6 @@ func (r *recoveryStorageImpl) handleInsert(msg message.ImmutableInsertMessageV1)
 
 // handleDelete handles the delete message.
 func (r *recoveryStorageImpl) handleDelete(msg message.ImmutableDeleteMessageV1) {
-    // nothing, current delete operation is managed by flowgraph, not recovery storage.
-    if r.Logger().Level().Enabled(zap.DebugLevel) {
-        r.Logger().Debug("delete entity", log.FieldMessage(msg))
-    }
 }
 
 // handleCreateSegment handles the create segment message.
diff --git a/pkg/streaming/util/message/marshal_log_object.go b/pkg/streaming/util/message/marshal_log_object.go
index 8940ef2d47..bdeb8e0535 100644
--- a/pkg/streaming/util/message/marshal_log_object.go
+++ b/pkg/streaming/util/message/marshal_log_object.go
@@ -79,6 +79,7 @@ func (m *immutableTxnMessageImpl) MarshalLogObject(enc zapcore.ObjectEncoder) er
     if m == nil {
         return nil
     }
+    enc.AddInt("messageCount", m.Size())
     enc.AddArray("txn", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
         txnMessage := AsImmutableTxnMessage(m)
         txnMessage.RangeOver(func(im ImmutableMessage) error {
diff --git a/pkg/streaming/util/message/message_type.go b/pkg/streaming/util/message/message_type.go
index 33660b3df2..7f95ace733 100644
--- a/pkg/streaming/util/message/message_type.go
+++ b/pkg/streaming/util/message/message_type.go
@@ -3,11 +3,118 @@ package message
 import (
     "strconv"
 
+    "go.uber.org/zap/zapcore"
+
     "github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
 )
 
 type MessageType messagespb.MessageType
 
+type MessageTypeProperties struct {
+    // The log level of the message, only used for logging, info by default.
+    LogLevel zapcore.Level
+    // A system message type is used to determine the commit behavior, used to achieve the consistency (timetick confirmed, txn) of the message.
+    IsSystem bool
+    // A self-controlled message type is generated by the wal-system itself, not by the client.
+    SelfControlled bool
+    // An exclusive-required message type should be appended exclusively at the related vchannel (pchannel).
+    ExclusiveRequired bool
+    // A cipher-enabled message type will be encrypted before appending to the wal if cipher is enabled.
+    CipherEnabled bool
+}
+
+var messageTypePropertiesMap = map[MessageType]MessageTypeProperties{
+    MessageTypeTimeTick: {
+        LogLevel:       zapcore.DebugLevel,
+        IsSystem:       true,
+        SelfControlled: true,
+    },
+    MessageTypeInsert: {
+        LogLevel:      zapcore.DebugLevel,
+        CipherEnabled: true,
+    },
+    MessageTypeDelete: {
+        LogLevel:      zapcore.DebugLevel,
+        CipherEnabled: true,
+    },
+    MessageTypeCreateCollection: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeDropCollection: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeTruncateCollection: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeCreatePartition: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeDropPartition: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeImport: {},
+    MessageTypeCreateSegment: {
+        SelfControlled: true,
+    },
+    MessageTypeFlush: {
+        SelfControlled: true,
+    },
+    MessageTypeManualFlush: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeAlterReplicateConfig: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeBeginTxn: {
+        LogLevel: zapcore.DebugLevel,
+        IsSystem: true,
+    },
+    MessageTypeCommitTxn: {
+        LogLevel: zapcore.DebugLevel,
+        IsSystem: true,
+    },
+    MessageTypeRollbackTxn: {
+        LogLevel: zapcore.DebugLevel,
+        IsSystem: true,
+    },
+    MessageTypeTxn: {
+        LogLevel: zapcore.DebugLevel,
+        IsSystem: true,
+    },
+    MessageTypeSchemaChange: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeAlterCollection: {
+        ExclusiveRequired: true,
+    },
+    MessageTypeAlterLoadConfig:     {},
+    MessageTypeDropLoadConfig:      {},
+    MessageTypeCreateDatabase:      {},
+    MessageTypeAlterDatabase:       {},
+    MessageTypeDropDatabase:        {},
+    MessageTypeAlterAlias:          {},
+    MessageTypeDropAlias:           {},
+    MessageTypeAlterUser:           {},
+    MessageTypeDropUser:            {},
+    MessageTypeAlterRole:           {},
+    MessageTypeDropRole:            {},
+    MessageTypeAlterUserRole:       {},
+    MessageTypeDropUserRole:        {},
+    MessageTypeAlterPrivilege:      {},
+    MessageTypeDropPrivilege:       {},
+    MessageTypeAlterPrivilegeGroup: {},
+    MessageTypeDropPrivilegeGroup:  {},
+    MessageTypeRestoreRBAC:         {},
+    MessageTypeAlterResourceGroup:  {},
+    MessageTypeDropResourceGroup:   {},
+    MessageTypeCreateIndex:         {},
+    MessageTypeAlterIndex:          {},
+    MessageTypeDropIndex:           {},
+    MessageTypeFlushAll: {
+        ExclusiveRequired: true,
+    },
+}
+
 // String implements fmt.Stringer interface.
 func (t MessageType) String() string {
     return messagespb.MessageType_name[int32(t)]
@@ -29,26 +136,27 @@ func (t MessageType) Valid() bool {
 // An exclusive required message type is that the message's timetick should keep same order with message id.
 // And when the message is appending, other messages with the same vchannel cannot append concurrently.
 func (t MessageType) IsExclusiveRequired() bool {
-    _, ok := exclusiveRequiredMessageType[t]
-    return ok
+    return messageTypePropertiesMap[t].ExclusiveRequired
 }
 
 // CanEnableCipher checks if the MessageType can enable cipher.
 func (t MessageType) CanEnableCipher() bool {
-    _, ok := cipherMessageType[t]
-    return ok
+    return messageTypePropertiesMap[t].CipherEnabled
 }
 
 // IsSysmtem checks if the MessageType is a system type.
 func (t MessageType) IsSystem() bool {
-    _, ok := systemMessageType[t]
-    return ok
+    return messageTypePropertiesMap[t].IsSystem
 }
 
 // IsSelfControlled checks if the MessageType is self controlled.
 func (t MessageType) IsSelfControlled() bool {
-    _, ok := selfControlledMessageType[t]
-    return ok
+    return messageTypePropertiesMap[t].SelfControlled
+}
+
+// LogLevel returns the log level of the MessageType.
+func (t MessageType) LogLevel() zapcore.Level {
+    return messageTypePropertiesMap[t].LogLevel
 }
 
 // unmarshalMessageType unmarshal MessageType from string.
diff --git a/pkg/streaming/util/message/reflect_info.go b/pkg/streaming/util/message/reflect_info.go
index 7885cff937..201c79e40f 100644
--- a/pkg/streaming/util/message/reflect_info.go
+++ b/pkg/streaming/util/message/reflect_info.go
@@ -1921,6 +1921,7 @@ type (
     ImmutableFlushAllMessageV2       = SpecializedImmutableMessage[*FlushAllMessageHeader, *FlushAllMessageBody]
     BroadcastFlushAllMessageV2       = SpecializedBroadcastMessage[*FlushAllMessageHeader, *FlushAllMessageBody]
     BroadcastResultFlushAllMessageV2 = BroadcastResult[*FlushAllMessageHeader, *FlushAllMessageBody]
+    AckResultFlushAllMessageV2       = AckResult[*FlushAllMessageHeader, *FlushAllMessageBody]
 )
 
 // MessageTypeWithVersion for FlushAllMessageV2
diff --git a/pkg/streaming/util/message/specialized_message.go b/pkg/streaming/util/message/specialized_message.go
index aef7c7ad5f..4f5458eae9 100644
--- a/pkg/streaming/util/message/specialized_message.go
+++ b/pkg/streaming/util/message/specialized_message.go
@@ -8,39 +8,6 @@ import (
     "google.golang.org/protobuf/proto"
 )
 
-// A system preserved message, should not allowed to provide outside of the streaming system.
-var systemMessageType = map[MessageType]struct{}{
-    MessageTypeTimeTick:    {},
-    MessageTypeBeginTxn:    {},
-    MessageTypeCommitTxn:   {},
-    MessageTypeRollbackTxn: {},
-    MessageTypeTxn:         {},
-}
-
-var selfControlledMessageType = map[MessageType]struct{}{
-    MessageTypeTimeTick:      {},
-    MessageTypeCreateSegment: {},
-    MessageTypeFlush:         {},
-}
-
-var cipherMessageType = map[MessageType]struct{}{
-    MessageTypeInsert: {},
-    MessageTypeDelete: {},
-}
-
-var exclusiveRequiredMessageType = map[MessageType]struct{}{
-    MessageTypeCreateCollection:     {},
-    MessageTypeDropCollection:       {},
-    MessageTypeCreatePartition:      {},
-    MessageTypeDropPartition:        {},
-    MessageTypeManualFlush:          {},
-    MessageTypeFlushAll:             {},
-    MessageTypeSchemaChange:         {},
-    MessageTypeAlterReplicateConfig: {},
-    MessageTypeAlterCollection:      {},
-    MessageTypeTruncateCollection:   {},
-}
-
 // mustAsSpecializedMutableMessage converts a MutableMessage to a specialized MutableMessage.
 // It will panic if the message is not the target specialized message or failed to decode the specialized header.
 func mustAsSpecializedMutableMessage[H proto.Message, B proto.Message](msg BasicMessage) specializedMutableMessage[H, B] {
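Note (not part of the patch): a minimal usage sketch of the consolidated `MessageTypeProperties` table that replaces the four standalone sets removed from `specialized_message.go`. Every identifier comes from the diff above; the only assumption is that the package is importable as `github.com/milvus-io/milvus/pkg/v2/streaming/util/message`, matching the `pkg/v2` paths in the file headers.

```go
package main

import (
	"fmt"

	"go.uber.org/zap/zapcore"

	"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
)

func main() {
	// Every per-type predicate now reads from the single
	// messageTypePropertiesMap, so the answers below stay mutually consistent.
	for _, mt := range []message.MessageType{
		message.MessageTypeTimeTick,         // system + self-controlled, debug level
		message.MessageTypeInsert,           // cipher-enabled, debug level
		message.MessageTypeCreateCollection, // exclusive-required, info (zero-value) level
	} {
		fmt.Printf("%s: system=%v selfControlled=%v exclusive=%v cipher=%v level=%s\n",
			mt, mt.IsSystem(), mt.IsSelfControlled(),
			mt.IsExclusiveRequired(), mt.CanEnableCipher(), mt.LogLevel())
	}

	// wal_write.go uses the per-type level to decide whether the
	// "append message into wal" line is emitted at all:
	if lvl := message.MessageTypeInsert.LogLevel(); lvl == zapcore.DebugLevel {
		// insert appends are only logged when debug logging is enabled
	}
}
```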