diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 48fbf5b314..ea3d729401 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -17,8 +17,6 @@ package meta import ( - "time" - "github.com/samber/lo" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -28,14 +26,12 @@ import ( type CollectionTarget struct { segments map[int64]*datapb.SegmentInfo dmChannels map[string]*DmChannel - createTime time.Time } func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget { return &CollectionTarget{ segments: segments, dmChannels: dmChannels, - createTime: time.Now(), } } @@ -55,10 +51,6 @@ func (p *CollectionTarget) GetAllDmChannelNames() []string { return lo.Keys(p.dmChannels) } -func (p *CollectionTarget) GetCreateTime() time.Time { - return p.createTime -} - func (p *CollectionTarget) IsEmpty() bool { return len(p.dmChannels)+len(p.segments) == 0 } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 1bc7d6fea6..7cc00de780 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -20,7 +20,6 @@ import ( "context" "errors" "sync" - "time" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -396,17 +395,6 @@ func (mgr *TargetManager) GetHistoricalSegment(collectionID int64, id int64, sco return collectionTarget.GetAllSegments()[id] } -func (mgr *TargetManager) GetNextTargetCreateTime(collectionID int64) time.Time { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(NextTarget) - collectionTarget := targetMap.getCollectionTarget(collectionID) - if collectionTarget == nil { - return time.Time{} - } - return collectionTarget.GetCreateTime() -} - func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool { newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget) diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 601059f59a..52cfc08894 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -33,13 +33,12 @@ import ( type CollectionObserver struct { stopCh chan struct{} - dist *meta.DistributionManager - meta *meta.Meta - targetMgr *meta.TargetManager - targetObserver *TargetObserver - collectionLoadedCount map[int64]int - partitionLoadedCount map[int64]int - collectionNextTargetTime map[int64]time.Time + dist *meta.DistributionManager + meta *meta.Meta + targetMgr *meta.TargetManager + targetObserver *TargetObserver + collectionLoadedCount map[int64]int + partitionLoadedCount map[int64]int stopOnce sync.Once } @@ -51,14 +50,13 @@ func NewCollectionObserver( targetObserver *TargetObserver, ) *CollectionObserver { return &CollectionObserver{ - stopCh: make(chan struct{}), - dist: dist, - meta: meta, - targetMgr: targetMgr, - targetObserver: targetObserver, - collectionLoadedCount: make(map[int64]int), - partitionLoadedCount: make(map[int64]int), - collectionNextTargetTime: make(map[int64]time.Time), + stopCh: make(chan struct{}), + dist: dist, + meta: meta, + targetMgr: targetMgr, + targetObserver: targetObserver, + collectionLoadedCount: make(map[int64]int), + partitionLoadedCount: make(map[int64]int), } } @@ -201,16 +199,12 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(collection.GetReplicaNumber()))) } - targetTime := ob.targetMgr.GetNextTargetCreateTime(collection.CollectionID) - lastTime, ok := ob.collectionNextTargetTime[collection.CollectionID] - - if ok && targetTime.Equal(lastTime) && - loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] && + if loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] && updated.LoadPercentage != 100 { + ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount return } - ob.collectionNextTargetTime[collection.GetCollectionID()] = targetTime ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) { delete(ob.collectionLoadedCount, collection.GetCollectionID()) @@ -270,15 +264,11 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(partition.GetReplicaNumber()))) } - targetTime := ob.targetMgr.GetNextTargetCreateTime(partition.GetCollectionID()) - lastTime, ok := ob.collectionNextTargetTime[partition.GetCollectionID()] - - if ok && targetTime.Equal(lastTime) && - loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && + if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && updated.LoadPercentage != 100 { + ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount return } - ob.collectionNextTargetTime[partition.GetCollectionID()] = targetTime ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) { delete(ob.partitionLoadedCount, partition.GetPartitionID())