diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 2bab21c0d0..23fd16d273 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -304,13 +304,6 @@ func (m *CollectionManager) PutCollection(collection *Collection) error { return m.putCollection(collection, true) } -func (m *CollectionManager) PutCollectionWithoutSave(collection *Collection) { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - - m.putCollection(collection, false) -} - func (m *CollectionManager) UpdateCollection(collection *Collection) error { m.rwmutex.Lock() defer m.rwmutex.Unlock() diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index ea3d729401..48fbf5b314 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -17,6 +17,8 @@ package meta import ( + "time" + "github.com/samber/lo" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -26,12 +28,14 @@ import ( type CollectionTarget struct { segments map[int64]*datapb.SegmentInfo dmChannels map[string]*DmChannel + createTime time.Time } func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget { return &CollectionTarget{ segments: segments, dmChannels: dmChannels, + createTime: time.Now(), } } @@ -51,6 +55,10 @@ func (p *CollectionTarget) GetAllDmChannelNames() []string { return lo.Keys(p.dmChannels) } +func (p *CollectionTarget) GetCreateTime() time.Time { + return p.createTime +} + func (p *CollectionTarget) IsEmpty() bool { return len(p.dmChannels)+len(p.segments) == 0 } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 4e153a3dd9..cc0ea1ce09 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -20,6 +20,7 @@ import ( "context" "errors" "sync" + "time" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -402,6 +403,17 @@ func (mgr *TargetManager) GetHistoricalSegment(collectionID int64, id int64, sco return collectionTarget.GetAllSegments()[id] } +func (mgr *TargetManager) GetNextTargetCreateTime(collectionID int64) time.Time { + mgr.rwMutex.RLock() + defer mgr.rwMutex.RUnlock() + targetMap := mgr.getTarget(NextTarget) + collectionTarget := targetMap.getCollectionTarget(collectionID) + if collectionTarget == nil { + return time.Time{} + } + return collectionTarget.GetCreateTime() +} + func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool { newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget) diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 20a17d7ef4..dfc3630daf 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -34,12 +34,13 @@ import ( type CollectionObserver struct { stopCh chan struct{} - dist *meta.DistributionManager - meta *meta.Meta - targetMgr *meta.TargetManager - targetObserver *TargetObserver - collectionLoadedCount map[int64]int - partitionLoadedCount map[int64]int + dist *meta.DistributionManager + meta *meta.Meta + targetMgr *meta.TargetManager + targetObserver *TargetObserver + collectionLoadedCount map[int64]int + partitionLoadedCount map[int64]int + collectionNextTargetTime map[int64]time.Time stopOnce sync.Once } @@ -51,13 +52,14 @@ func NewCollectionObserver( targetObserver *TargetObserver, ) *CollectionObserver { return &CollectionObserver{ - stopCh: make(chan struct{}), - dist: dist, - meta: meta, - targetMgr: targetMgr, - targetObserver: targetObserver, - collectionLoadedCount: make(map[int64]int), - partitionLoadedCount: make(map[int64]int), + stopCh: make(chan struct{}), + dist: dist, + meta: meta, + targetMgr: targetMgr, + targetObserver: targetObserver, + collectionLoadedCount: make(map[int64]int), + partitionLoadedCount: make(map[int64]int), + collectionNextTargetTime: make(map[int64]time.Time), } } @@ -200,9 +202,16 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(collection.GetReplicaNumber()))) } - if loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] && updated.LoadPercentage != 100 { + targetTime := ob.targetMgr.GetNextTargetCreateTime(collection.CollectionID) + lastTime, ok := ob.collectionNextTargetTime[collection.CollectionID] + + if ok && targetTime.Equal(lastTime) && + loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] && + updated.LoadPercentage != 100 { return } + + ob.collectionNextTargetTime[collection.GetCollectionID()] = targetTime ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) { delete(ob.collectionLoadedCount, collection.GetCollectionID()) @@ -260,12 +269,17 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti zap.Int("loadSegmentCount", loadedCount-subChannelCount)) } updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(partition.GetReplicaNumber()))) - } - if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && updated.LoadPercentage != 100 { + targetTime := ob.targetMgr.GetNextTargetCreateTime(partition.GetCollectionID()) + lastTime, ok := ob.collectionNextTargetTime[partition.GetCollectionID()] + + if ok && targetTime.Equal(lastTime) && + loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && + updated.LoadPercentage != 100 { return } + ob.collectionNextTargetTime[partition.GetCollectionID()] = targetTime ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) { delete(ob.partitionLoadedCount, partition.GetPartitionID())