From d7ebb25701d65e949f23dc76fb0cc1b5781dd5c3 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Mon, 7 Nov 2022 17:05:06 +0800
Subject: [PATCH] skip handoff event on unloaded partition (#20306)

Signed-off-by: Wei Liu
Signed-off-by: Wei Liu
---
 internal/querycoordv2/job/job_test.go      |   1 +
 .../observers/handoff_observer.go          |  86 +++++++++--
 .../observers/handoff_observer_test.go     | 136 ++++++++++++------
 internal/querycoordv2/server.go            |   1 +
 internal/querycoordv2/server_test.go       |   1 +
 internal/querycoordv2/services_test.go     |   1 +
 6 files changed, 177 insertions(+), 49 deletions(-)

diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go
index 32d744f495..bb12aba8f5 100644
--- a/internal/querycoordv2/job/job_test.go
+++ b/internal/querycoordv2/job/job_test.go
@@ -132,6 +132,7 @@ func (suite *JobSuite) SetupTest() {
 		suite.meta,
 		suite.dist,
 		suite.targetMgr,
+		suite.broker,
 	)
 
 	suite.scheduler = NewScheduler()
diff --git a/internal/querycoordv2/observers/handoff_observer.go b/internal/querycoordv2/observers/handoff_observer.go
index c6791a7ecd..8e94549795 100644
--- a/internal/querycoordv2/observers/handoff_observer.go
+++ b/internal/querycoordv2/observers/handoff_observer.go
@@ -18,6 +18,7 @@ package observers
 
 import (
 	"context"
+	"strings"
 	"sync"
 	"time"
 
@@ -64,33 +65,40 @@ type HandoffObserver struct {
 	meta   *meta.Meta
 	dist   *meta.DistributionManager
 	target *meta.TargetManager
+	broker meta.Broker
 
 	revision         int64
 	collectionStatus map[int64]CollectionHandoffStatus
 	handoffEventLock sync.RWMutex
 	handoffEvents    map[int64]*HandoffEvent
-	// partition id -> queue
+	// collection id -> queue
 	handoffSubmitOrders map[int64]queue
+	// collectionId -> loaded partitionId, only for load collection case
+	loadedPartitions map[int64]typeutil.UniqueSet
 
 	stopOnce sync.Once
 }
 
-func NewHandoffObserver(store meta.Store, meta *meta.Meta, dist *meta.DistributionManager, target *meta.TargetManager) *HandoffObserver {
+func NewHandoffObserver(store meta.Store, meta *meta.Meta, dist *meta.DistributionManager, target *meta.TargetManager, broker meta.Broker) *HandoffObserver {
 	return &HandoffObserver{
 		store:               store,
 		c:                   make(chan struct{}),
 		meta:                meta,
 		dist:                dist,
 		target:              target,
+		broker:              broker,
 		collectionStatus:    map[int64]CollectionHandoffStatus{},
 		handoffEvents:       map[int64]*HandoffEvent{},
 		handoffSubmitOrders: map[int64]queue{},
+		loadedPartitions:    map[int64]typeutil.Set[int64]{},
 	}
 }
 
 func (ob *HandoffObserver) Register(collectionIDs ...int64) {
 	ob.handoffEventLock.Lock()
 	defer ob.handoffEventLock.Unlock()
+	log.Info("Register handoff for collection",
+		zap.Int64s("collectionIDs", collectionIDs))
 
 	for _, collectionID := range collectionIDs {
 		ob.collectionStatus[collectionID] = CollectionHandoffStatusRegistered
@@ -100,9 +108,19 @@ func (ob *HandoffObserver) Unregister(ctx context.Context, collectionIDs ...int64) {
 	ob.handoffEventLock.Lock()
 	defer ob.handoffEventLock.Unlock()
+	log.Info("Unregister handoff for collection",
+		zap.Int64s("collectionIDs", collectionIDs))
 
 	for _, collectionID := range collectionIDs {
 		delete(ob.collectionStatus, collectionID)
+		delete(ob.handoffSubmitOrders, collectionID)
+	}
+
+	collectionSet := typeutil.NewUniqueSet(collectionIDs...)
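+	// also drop any handoff events still queued for the unregistered collections,
+	// so they are not processed after the collection is released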
+	for segmentID, event := range ob.handoffEvents {
+		if collectionSet.Contain(event.Segment.GetCollectionID()) {
+			delete(ob.handoffEvents, segmentID)
+		}
 	}
 }
 
@@ -115,6 +133,13 @@ func (ob *HandoffObserver) StartHandoff(collectionIDs ...int64) {
 	}
 }
 
+func (ob *HandoffObserver) GetEventNum() int {
+	ob.handoffEventLock.Lock()
+	defer ob.handoffEventLock.Unlock()
+
+	return len(ob.handoffEvents)
+}
+
 func (ob *HandoffObserver) consumeOutdatedHandoffEvent(ctx context.Context) error {
 	_, handoffReqValues, revision, err := ob.store.LoadHandoffWithRevision()
 	if err != nil {
@@ -231,18 +256,19 @@ func (ob *HandoffObserver) tryHandoff(ctx context.Context, segment *querypb.Segm
 	)
 	log.Info("try handoff segment...")
 
-	status, ok := ob.collectionStatus[segment.GetCollectionID()]
+	status, collectionRegistered := ob.collectionStatus[segment.GetCollectionID()]
 	if Params.QueryCoordCfg.AutoHandoff &&
-		ok &&
+		collectionRegistered &&
+		ob.checkLoadStatus(segment) &&
 		(segment.GetIsFake() || ob.meta.CollectionManager.ContainAnyIndex(segment.GetCollectionID(), indexIDs...)) {
 		event := ob.handoffEvents[segment.SegmentID]
 		if event == nil {
 			// record submit order
-			_, ok := ob.handoffSubmitOrders[segment.GetPartitionID()]
+			_, ok := ob.handoffSubmitOrders[segment.GetCollectionID()]
 			if !ok {
-				ob.handoffSubmitOrders[segment.GetPartitionID()] = make([]int64, 0)
+				ob.handoffSubmitOrders[segment.GetCollectionID()] = make([]int64, 0)
 			}
-			ob.handoffSubmitOrders[segment.GetPartitionID()] = append(ob.handoffSubmitOrders[segment.GetPartitionID()], segment.GetSegmentID())
+			ob.handoffSubmitOrders[segment.GetCollectionID()] = append(ob.handoffSubmitOrders[segment.GetCollectionID()], segment.GetSegmentID())
 		}
 
 		if status == CollectionHandoffStatusRegistered {
@@ -272,6 +298,48 @@ func (ob *HandoffObserver) tryHandoff(ctx context.Context, segment *querypb.Segm
 	}
 }
 
+func (ob *HandoffObserver) checkLoadStatus(segment *querypb.SegmentInfo) bool {
+	if ob.meta.GetCollection(segment.GetCollectionID()) != nil {
+		// if collection is loaded, should check whether the partition has been dropped!
+		if ob.loadedPartitions[segment.GetCollectionID()] == nil {
+			ob.loadedPartitions[segment.GetCollectionID()] = typeutil.NewUniqueSet()
+		}
+
+		// should update loaded partitions when meeting a new partitionID
+		if !ob.loadedPartitions[segment.GetCollectionID()].Contain(segment.GetPartitionID()) {
+			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+			defer cancel()
+			err := retry.Do(ctx, func() error {
+				partitionIDs, err := ob.broker.GetPartitions(ctx, segment.GetCollectionID())
+				if err == nil {
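+					// cache the partition list returned by the broker; the Contain check
+					// after this retry decides whether the segment's partition is loaded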
+					ob.loadedPartitions[segment.GetCollectionID()].Insert(partitionIDs...)
+					return nil
+				}
+				return err
+			}, retry.Attempts(5))
+
+			if err != nil {
+				// collection has been dropped or released
+				if strings.Contains(err.Error(), "CollectionNotExists") ||
+					ob.meta.GetCollection(segment.GetCollectionID()) == nil {
+					return false
+				}
+				// collection not released, but we can't get the partition list to check handoff
+				log.Warn("handoff check load status failed due to get partitions failed",
+					zap.Int64("collectionID", segment.GetCollectionID()),
+					zap.Int64("partitionID", segment.GetPartitionID()),
+					zap.String("channel", segment.GetDmChannel()),
+					zap.Int64("segmentID", segment.GetSegmentID()))
+				return false
+			}
+		}
+
+		return ob.loadedPartitions[segment.GetCollectionID()].Contain(segment.GetPartitionID())
+	}
+
+	return ob.meta.GetPartition(segment.GetPartitionID()) != nil
+}
+
 func (ob *HandoffObserver) handoff(segment *querypb.SegmentInfo) {
 	targets := ob.target.GetSegmentsByCollection(segment.GetCollectionID(), segment.GetPartitionID())
 	// when handoff event load a Segment, it sobuld remove all recursive handoff compact from
@@ -378,7 +446,7 @@ func (ob *HandoffObserver) tryClean(ctx context.Context) {
 	ob.handoffEventLock.Lock()
 	defer ob.handoffEventLock.Unlock()
 
-	for partitionID, partitionSubmitOrder := range ob.handoffSubmitOrders {
+	for collectionID, partitionSubmitOrder := range ob.handoffSubmitOrders {
 		pos := 0
 		for _, segmentID := range partitionSubmitOrder {
 			event, ok := ob.handoffEvents[segmentID]
@@ -403,7 +471,7 @@
 				break
 			}
 		}
-		ob.handoffSubmitOrders[partitionID] = partitionSubmitOrder[pos:]
+		ob.handoffSubmitOrders[collectionID] = partitionSubmitOrder[pos:]
 	}
 }
diff --git a/internal/querycoordv2/observers/handoff_observer_test.go b/internal/querycoordv2/observers/handoff_observer_test.go
index 4f53c12cd3..e38448a71f 100644
--- a/internal/querycoordv2/observers/handoff_observer_test.go
+++ b/internal/querycoordv2/observers/handoff_observer_test.go
@@ -24,9 +24,6 @@ import (
 	"time"
 
 	"github.com/golang/protobuf/proto"
-	"github.com/stretchr/testify/suite"
-	clientv3 "go.etcd.io/etcd/client/v3"
-
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/log"
@@ -37,6 +34,9 @@ import (
 	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/internal/util/etcd"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+	clientv3 "go.etcd.io/etcd/client/v3"
 )
 
 const (
@@ -65,6 +65,7 @@
 	meta   *meta.Meta
 	dist   *meta.DistributionManager
 	target *meta.TargetManager
+	broker *meta.MockBroker
 
 	// Test Object
 	observer *HandoffObserver
@@ -117,9 +118,10 @@ func (suite *HandoffObserverTestSuit) SetupTest() {
 	suite.meta = meta.NewMeta(suite.idAllocator, suite.store)
 	suite.dist = meta.NewDistributionManager()
 	suite.target = meta.NewTargetManager()
+	suite.broker = meta.NewMockBroker(suite.T())
 
 	// Test Object
-	suite.observer = NewHandoffObserver(suite.store, suite.meta, suite.dist, suite.target)
+	suite.observer = NewHandoffObserver(suite.store, suite.meta, suite.dist, suite.target, suite.broker)
 	suite.observer.Register(suite.collection)
 	suite.observer.StartHandoff(suite.collection)
 	suite.load()
@@ -143,6 +145,7 @@ func (suite *HandoffObserverTestSuit) TestFlushingHandoff() {
 	Params.QueryCoordCfg.CheckHandoffInterval = 1 * time.Second
 	err := suite.observer.Start(context.Background())
 	suite.NoError(err)
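+	// the handoff path now verifies partition load state through the broker,
+	// so GetPartitions must be stubbed on the mock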
+	suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{suite.partition}, nil)
 
 	flushingSegment := &querypb.SegmentInfo{
 		SegmentID:    3,
@@ -197,7 +200,7 @@ func (suite *HandoffObserverTestSuit) TestCompactHandoff() {
 	Params.QueryCoordCfg.CheckHandoffInterval = 1 * time.Second
 	err := suite.observer.Start(context.Background())
 	suite.NoError(err)
-
+	suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{suite.partition}, nil)
 	compactSegment := &querypb.SegmentInfo{
 		SegmentID:    3,
 		CollectionID: suite.collection,
@@ -249,6 +252,7 @@ func (suite *HandoffObserverTestSuit) TestRecursiveHandoff() {
 		Segments:        map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}, 2: {NodeID: 2, Version: 0}},
 		GrowingSegments: typeutil.NewUniqueSet(3),
 	})
+	suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{suite.partition}, nil)
 
 	Params.QueryCoordCfg.CheckHandoffInterval = 1 * time.Second
 	err := suite.observer.Start(context.Background())
@@ -464,57 +468,107 @@ func (suite *HandoffObserverTestSuit) load() {
 	suite.target.AddSegment(suite.sealedSegments...)
 }
 
-func (suite *HandoffObserverTestSuit) TestHandoffOnUnLoadedPartition() {
-	const (
-		collectionID        = 111
-		loadedPartitionID   = 1
-		unloadedPartitionID = 2
-	)
-	err := suite.meta.PutPartition(&meta.Partition{
-		PartitionLoadInfo: &querypb.PartitionLoadInfo{
-			CollectionID:  collectionID,
-			PartitionID:   loadedPartitionID,
-			ReplicaNumber: suite.replicaNumber,
-			Status:        querypb.LoadStatus_Loaded,
-		},
-	})
+func (suite *HandoffObserverTestSuit) TestHandoffOnUnloadedPartition() {
+	Params.QueryCoordCfg.CheckHandoffInterval = 1 * time.Second
+	err := suite.observer.Start(context.Background())
 	suite.NoError(err)
 
-	// init leader view
-	suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{
+	suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
 		ID:           1,
-		CollectionID: collectionID,
+		CollectionID: suite.collection,
 		Channel:      suite.channel.ChannelName,
 		Segments:     map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}, 2: {NodeID: 2, Version: 0}},
 	})
+	suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
+		ID:           2,
+		CollectionID: suite.collection,
+		Channel:      suite.channel.ChannelName,
+		Segments:     map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}, 2: {NodeID: 2, Version: 0}},
+	})
+	suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{2222}, nil)
+
+	suite.observer.Register(suite.collection)
+	suite.observer.StartHandoff(suite.collection)
+	defer suite.observer.Unregister(context.TODO(), suite.collection)
+
+	compactSegment1 := &querypb.SegmentInfo{
+		SegmentID:           111,
+		CollectionID:        suite.collection,
+		PartitionID:         1111,
+		SegmentState:        commonpb.SegmentState_Sealed,
+		CompactionFrom:      []int64{1},
+		CreatedByCompaction: true,
+		IndexInfos:          []*querypb.FieldIndexInfo{{IndexID: defaultIndexID}},
+	}
+
+	compactSegment2 := &querypb.SegmentInfo{
+		SegmentID:           222,
+		CollectionID:        suite.collection,
+		PartitionID:         2222,
+		SegmentState:        commonpb.SegmentState_Sealed,
+		CompactionFrom:      []int64{1},
+		CreatedByCompaction: true,
+		IndexInfos:          []*querypb.FieldIndexInfo{{IndexID: defaultIndexID}},
+	}
+	suite.produceHandOffEvent(compactSegment1)
+	suite.produceHandOffEvent(compactSegment2)
+
+	suite.Eventually(func() bool {
+		return !suite.target.ContainSegment(111) && suite.target.ContainSegment(222)
+	}, 3*time.Second, 1*time.Second)
+}
+
+func (suite *HandoffObserverTestSuit) TestUnRegisterHandoff() {
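+	// Unregister should drop the pending handoff event; after re-registering,
+	// only the newly produced event should remain in the queue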
 	Params.QueryCoordCfg.CheckHandoffInterval = 1 * time.Second
-	err = suite.observer.Start(context.Background())
+	err := suite.observer.Start(context.Background())
 	suite.NoError(err)
 
-	compactSegment := &querypb.SegmentInfo{
-		SegmentID:           3,
-		CollectionID:        collectionID,
-		PartitionID:         unloadedPartitionID,
+	suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
+		ID:           1,
+		CollectionID: suite.collection,
+		Channel:      suite.channel.ChannelName,
+		Segments:     map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}, 2: {NodeID: 2, Version: 0}},
+	})
+
+	suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
+		ID:           2,
+		CollectionID: suite.collection,
+		Channel:      suite.channel.ChannelName,
+		Segments:     map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}, 2: {NodeID: 2, Version: 0}},
+	})
+
+	suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{1111, 2222}, nil)
+	suite.observer.Register(suite.collection)
+	compactSegment1 := &querypb.SegmentInfo{
+		SegmentID:           111,
+		CollectionID:        suite.collection,
+		PartitionID:         1111,
+		SegmentState:        commonpb.SegmentState_Sealed,
+		CompactionFrom:      []int64{1},
+		CreatedByCompaction: true,
+		IndexInfos:          []*querypb.FieldIndexInfo{{IndexID: defaultIndexID}},
+	}
+	suite.produceHandOffEvent(compactSegment1)
+	suite.Eventually(func() bool {
+		return suite.observer.GetEventNum() == 1
+	}, 3*time.Second, 1*time.Second)
+	suite.observer.Unregister(context.TODO(), suite.collection)
+
+	suite.observer.Register(suite.collection)
+	defer suite.observer.Unregister(context.TODO(), suite.collection)
+	compactSegment2 := &querypb.SegmentInfo{
+		SegmentID:           222,
+		CollectionID:        suite.collection,
+		PartitionID:         2222,
 		SegmentState:        commonpb.SegmentState_Sealed,
 		CompactionFrom:      []int64{2},
 		CreatedByCompaction: true,
 		IndexInfos:          []*querypb.FieldIndexInfo{{IndexID: defaultIndexID}},
 	}
-	suite.produceHandOffEvent(compactSegment)
-
+	suite.produceHandOffEvent(compactSegment2)
 	suite.Eventually(func() bool {
-		return suite.target.ContainSegment(1) && suite.target.ContainSegment(2)
-	}, 3*time.Second, 1*time.Second)
-
-	suite.Eventually(func() bool {
-		return !suite.target.ContainSegment(3)
-	}, 3*time.Second, 1*time.Second)
-
-	suite.Eventually(func() bool {
-		key := fmt.Sprintf("%s/%d/%d/%d", util.HandoffSegmentPrefix, suite.collection, suite.partition, 3)
-		value, err := suite.kv.Load(key)
-		return len(value) == 0 && err != nil
+		return suite.observer.GetEventNum() == 1
 	}, 3*time.Second, 1*time.Second)
 }
@@ -526,6 +580,7 @@ func (suite *HandoffObserverTestSuit) TestFilterOutEventByIndexID() {
 		Channel:  suite.channel.ChannelName,
 		Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}, 2: {NodeID: 2, Version: 0}},
 	})
+	suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{suite.partition}, nil)
 
 	Params.QueryCoordCfg.CheckHandoffInterval = 1 * time.Second
 	err := suite.observer.Start(context.Background())
@@ -556,6 +611,7 @@ func (suite *HandoffObserverTestSuit) TestFakedSegmentHandoff() {
 		Channel:  suite.channel.ChannelName,
 		Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}},
 	})
+	suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{suite.partition}, nil)
 
 	Params.QueryCoordCfg.CheckHandoffInterval = 200 * time.Millisecond
 	err := suite.observer.Start(context.Background())
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 77e57c1459..d0bc72f6b1 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -291,6 +291,7 @@ func (s *Server) initObserver() {
 		s.meta,
 		s.dist,
 		s.targetMgr,
+		s.broker,
 	)
 }
 
diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go
index 2a93c15c78..1f6dbcf8a5 100644
--- a/internal/querycoordv2/server_test.go
+++ b/internal/querycoordv2/server_test.go
@@ -360,6 +360,7 @@ func (suite *ServerSuite) hackServer() {
 		suite.server.meta,
 		suite.server.dist,
 		suite.server.targetMgr,
+		suite.server.broker,
 	)
 	suite.server.distController = dist.NewDistController(
 		suite.server.cluster,
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index 67cda0b144..73d971c9f1 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -132,6 +132,7 @@ func (suite *ServiceSuite) SetupTest() {
 		suite.meta,
 		suite.dist,
 		suite.targetMgr,
+		suite.broker,
 	)
 	suite.balancer = balance.NewRowCountBasedBalancer(
 		suite.taskScheduler,