diff --git a/internal/querynodev2/pipeline/filter_node.go b/internal/querynodev2/pipeline/filter_node.go index 42e02c95be..5fb6f18b30 100644 --- a/internal/querynodev2/pipeline/filter_node.go +++ b/internal/querynodev2/pipeline/filter_node.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) // filterNode filter the invalid message of pipeline @@ -38,7 +37,7 @@ type filterNode struct { *BaseNode collectionID UniqueID manager *DataManager - excludedSegments *typeutil.ConcurrentMap[int64, uint64] + excludedSegments *ExcludedSegments channel string InsertMsgPolicys []InsertMsgFilter DeleteMsgPolicys []DeleteMsgFilter @@ -96,7 +95,9 @@ func (fNode *filterNode) Operate(in Msg) Msg { out.append(msg) } } - + if fNode.excludedSegments.ShouldClean() { + fNode.excludedSegments.CleanInvalid(streamMsgPack.EndTs) + } metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Inc() return out } @@ -133,7 +134,7 @@ func newFilterNode( collectionID int64, channel string, manager *DataManager, - excludedSegments *typeutil.ConcurrentMap[int64, uint64], + excludedSegments *ExcludedSegments, maxQueueLength int32, ) *filterNode { return &filterNode{ diff --git a/internal/querynodev2/pipeline/filter_node_test.go b/internal/querynodev2/pipeline/filter_node_test.go index 07df419c2d..b25d8d6930 100644 --- a/internal/querynodev2/pipeline/filter_node_test.go +++ b/internal/querynodev2/pipeline/filter_node_test.go @@ -18,6 +18,7 @@ package pipeline import ( "testing" + "time" "github.com/samber/lo" "github.com/stretchr/testify/suite" @@ -26,7 +27,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) // test 
of filter node
@@ -38,7 +38,7 @@ type FilterNodeSuite struct {
 	channel      string
 
 	validSegmentIDs    []int64
-	excludedSegments   *typeutil.ConcurrentMap[int64, uint64]
+	excludedSegments   *ExcludedSegments
 	excludedSegmentIDs []int64
 	insertSegmentIDs   []int64
 	deleteSegmentSum   int
@@ -62,10 +62,12 @@ func (suite *FilterNodeSuite) SetupSuite() {
 	suite.errSegmentID = 7
 
 	// init excludedSegment
-	suite.excludedSegments = typeutil.NewConcurrentMap[int64, uint64]()
+	suite.excludedSegments = NewExcludedSegments(0 * time.Second)
+	excludeInfo := map[int64]uint64{}
 	for _, id := range suite.excludedSegmentIDs {
-		suite.excludedSegments.Insert(id, 1)
+		excludeInfo[id] = 1
 	}
+	suite.excludedSegments.Insert(excludeInfo)
 }
 
 // test filter node with collection load collection
diff --git a/internal/querynodev2/pipeline/filter_policy.go b/internal/querynodev2/pipeline/filter_policy.go
index 3e51bb7577..8e795452a9 100644
--- a/internal/querynodev2/pipeline/filter_policy.go
+++ b/internal/querynodev2/pipeline/filter_policy.go
@@ -18,7 +18,13 @@ package pipeline
 
 import (
 	"fmt"
+	"sync"
+	"time"
 
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 )
 
@@ -54,11 +60,8 @@ func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error {
 }
 
 func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error {
-	ts, ok := n.excludedSegments.Get(msg.SegmentID)
+	ok := n.excludedSegments.Verify(msg.SegmentID, msg.EndTimestamp)
 	if !ok {
-		return nil
-	}
-	if msg.EndTimestamp <= ts {
 		m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID())
 		return merr.WrapErrSegmentLack(msg.GetSegmentID(), m)
 	}
@@ -88,3 +91,72 @@ func DeleteOutOfTarget(n *filterNode, c *Collection, msg *DeleteMsg) error {
 	// all growing will be be in-memory to support dynamic partition load/release
 	return nil
 }
+
+// ExcludedSegments tracks, for each excluded segment, the timestamp up to
+// which insert messages for that segment are rejected. The map is guarded by
+// mu and lastClean is atomic, so the methods may be called concurrently.
+type ExcludedSegments struct {
+	mu            sync.RWMutex
+	segments      map[int64]uint64 // segmentID -> Excluded TS
+	lastClean     atomic.Time      // wall-clock time of the last CleanInvalid call
+	cleanInterval time.Duration    // gap that must elapse before ShouldClean reports true
+}
+
+// NewExcludedSegments returns an empty ExcludedSegments whose ShouldClean
+// reports true once more than cleanInterval has passed since the last clean.
+func NewExcludedSegments(cleanInterval time.Duration) *ExcludedSegments {
+	return &ExcludedSegments{
+		segments:      make(map[int64]uint64),
+		cleanInterval: cleanInterval,
+	}
+}
+
+// Insert records every segmentID -> exclude timestamp pair in excludeInfo,
+// overwriting any timestamp already stored for the same segment.
+func (s *ExcludedSegments) Insert(excludeInfo map[int64]uint64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	for segmentID, ts := range excludeInfo {
+		log.Debug("add exclude info",
+			zap.Int64("segmentID", segmentID),
+			zap.Uint64("ts", ts),
+		)
+		s.segments[segmentID] = ts
+	}
+}
+
+// Verify reports whether a message for segmentID stamped ts passes the
+// filter. It returns false only when the segment has an exclude record and
+// ts is not later than the recorded timestamp.
+func (s *ExcludedSegments) Verify(segmentID int64, ts uint64) bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	excludeTs, excluded := s.segments[segmentID]
+	return !excluded || ts > excludeTs
+}
+
+// CleanInvalid drops every record whose exclude timestamp is strictly earlier
+// than ts, then stamps the current time as the last clean.
+func (s *ExcludedSegments) CleanInvalid(ts uint64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Go permits deleting map entries during a range over the same map, so
+	// no intermediate slice of IDs is needed.
+	for segmentID, excludeTs := range s.segments {
+		if excludeTs < ts {
+			delete(s.segments, segmentID)
+		}
+	}
+	s.lastClean.Store(time.Now())
+}
+
+// ShouldClean reports whether more than cleanInterval has elapsed since the
+// last CleanInvalid call.
+// NOTE(review): before the first clean lastClean holds its zero value, which
+// presumably makes the first call report true - confirm against atomic.Time.
+func (s *ExcludedSegments) ShouldClean() bool {
+	return time.Since(s.lastClean.Load()) > s.cleanInterval
+}
diff --git a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go
index f5332d4d35..995adec414 100644
--- a/internal/querynodev2/pipeline/pipeline.go
+++ b/internal/querynodev2/pipeline/pipeline.go
@@ -17,15 +17,13 @@
 package pipeline
 
 import (
-	"go.uber.org/zap"
+	"time"
 
 	"github.com/milvus-io/milvus/internal/querynodev2/delegator"
 	base "github.com/milvus-io/milvus/internal/util/pipeline"
-	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // pipeline used for querynode
@@ -37,18 +35,12 @@ type Pipeline interface {
 type pipeline struct {
base.StreamPipeline - excludedSegments *typeutil.ConcurrentMap[int64, uint64] + excludedSegments *ExcludedSegments collectionID UniqueID } func (p *pipeline) ExcludedSegments(excludeInfo map[int64]uint64) { //(segInfos ...*datapb.SegmentInfo) { - for segmentID, ts := range excludeInfo { - log.Debug("pipeline add exclude info", - zap.Int64("segmentID", segmentID), - zap.Uint64("ts", ts), - ) - p.excludedSegments.Insert(segmentID, ts) - } + p.excludedSegments.Insert(excludeInfo) } func (p *pipeline) Close() { @@ -65,7 +57,7 @@ func NewPipeLine( delegator delegator.ShardDelegator, ) (Pipeline, error) { pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32() - excludedSegments := typeutil.NewConcurrentMap[int64, uint64]() + excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second)) p := &pipeline{ collectionID: collectionID, diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 63c92abc84..96168e6089 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1909,9 +1909,6 @@ func (p *queryCoordConfig) init(base *BaseTable) { type queryNodeConfig struct { SoPath ParamItem `refreshable:"false"` - FlowGraphMaxQueueLength ParamItem `refreshable:"false"` - FlowGraphMaxParallelism ParamItem `refreshable:"false"` - // stats // Deprecated: Never used StatsPublishInterval ParamItem `refreshable:"true"` @@ -1979,6 +1976,11 @@ type queryNodeConfig struct { EnableWorkerSQCostMetrics ParamItem `refreshable:"true"` ExprEvalBatchSize ParamItem `refreshable:"false"` + + // pipeline + CleanExcludeSegInterval ParamItem `refreshable:"false"` + FlowGraphMaxQueueLength ParamItem `refreshable:"false"` + FlowGraphMaxParallelism ParamItem `refreshable:"false"` } func (p *queryNodeConfig) init(base *BaseTable) { @@ -2426,6 +2428,15 @@ Max read concurrency must greater than or equal to 1, and less than or equal 
to } p.ExprEvalBatchSize.Init(base.mgr) + + p.CleanExcludeSegInterval = ParamItem{ + Key: "queryCoord.cleanExcludeSegmentInterval", + Version: "2.4.0", + DefaultValue: "60", + Doc: "the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds", + Export: true, + } + p.CleanExcludeSegInterval.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////