From 1fff8d2c6654ebb11a8c4ead6823015cdbcc40bc Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 3 Aug 2023 10:37:06 +0800 Subject: [PATCH] Fix restore redrop enormous dup collections (#26029) Signed-off-by: yangxuan --- internal/rootcoord/root_coord.go | 29 ++++---- internal/rootcoord/root_coord_test.go | 102 ++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 15 deletions(-) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 0ce66b79c3..0e6a571c7f 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -591,13 +591,12 @@ func (c *Core) restore(ctx context.Context) error { return err } for _, coll := range colls { - for _, part := range coll.Partitions { - ts, err := c.tsoAllocator.GenerateTSO(1) - if err != nil { - return err - } - - if coll.Available() { + ts, err := c.tsoAllocator.GenerateTSO(1) + if err != nil { + return err + } + if coll.Available() { + for _, part := range coll.Partitions { switch part.State { case pb.PartitionState_PartitionDropping: go c.garbageCollector.ReDropPartition(coll.DBID, coll.PhysicalChannelNames, part.Clone(), ts) @@ -605,14 +604,14 @@ func (c *Core) restore(ctx context.Context) error { go c.garbageCollector.RemoveCreatingPartition(coll.DBID, part.Clone(), ts) default: } - } else { - switch coll.State { - case pb.CollectionState_CollectionDropping: - go c.garbageCollector.ReDropCollection(coll.Clone(), ts) - case pb.CollectionState_CollectionCreating: - go c.garbageCollector.RemoveCreatingCollection(coll.Clone()) - default: - } + } + } else { + switch coll.State { + case pb.CollectionState_CollectionDropping: + go c.garbageCollector.ReDropCollection(coll.Clone(), ts) + case pb.CollectionState_CollectionCreating: + go c.garbageCollector.RemoveCreatingCollection(coll.Clone()) + default: } } } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 7d46f83da0..5ba0a6a748 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -29,6 +29,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -40,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -1756,3 +1758,103 @@ func TestCore_Stop(t *testing.T) { assert.Equal(t, commonpb.StateCode_Abnormal, code) }) } + +type RootCoordSuite struct { + suite.Suite +} + +func (s *RootCoordSuite) TestRestore() { + meta := mockrootcoord.NewIMetaTable(s.T()) + gc := mockrootcoord.NewGarbageCollector(s.T()) + + finishCh := make(chan struct{}, 4) + gc.EXPECT().ReDropPartition(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Once(). + Run(func(args mock.Arguments) { + finishCh <- struct{}{} + }) + gc.EXPECT().RemoveCreatingPartition(mock.Anything, mock.Anything, mock.Anything).Once(). + Run(func(args mock.Arguments) { + finishCh <- struct{}{} + }) + gc.EXPECT().ReDropCollection(mock.Anything, mock.Anything).Once(). + Run(func(args mock.Arguments) { + finishCh <- struct{}{} + }) + gc.EXPECT().RemoveCreatingCollection(mock.Anything).Once(). + Run(func(args mock.Arguments) { + finishCh <- struct{}{} + }) + + meta.EXPECT().ListDatabases(mock.Anything, mock.Anything). + Return([]*model.Database{ + {Name: "available_colls_db"}, + {Name: "not_available_colls_db"}}, nil) + + meta.EXPECT().ListCollections(mock.Anything, "available_colls_db", mock.Anything, false). + Return([]*model.Collection{ + { + DBID: 1, + State: etcdpb.CollectionState_CollectionCreated, // available collection + PhysicalChannelNames: []string{"ch1"}, + Partitions: []*model.Partition{ + {State: etcdpb.PartitionState_PartitionDropping}, + {State: etcdpb.PartitionState_PartitionCreating}, + {State: etcdpb.PartitionState_PartitionDropped}, // ignored + }, + }, + }, nil) + meta.EXPECT().ListCollections(mock.Anything, "not_available_colls_db", mock.Anything, false). + Return([]*model.Collection{ + { + DBID: 1, + State: etcdpb.CollectionState_CollectionDropping, // not available collection + PhysicalChannelNames: []string{"ch1"}, + Partitions: []*model.Partition{ + {State: etcdpb.PartitionState_PartitionDropping}, + {State: etcdpb.PartitionState_PartitionCreating}, + {State: etcdpb.PartitionState_PartitionDropped}, + }, + }, + { + DBID: 1, + State: etcdpb.CollectionState_CollectionCreating, // not available collection + PhysicalChannelNames: []string{"ch1"}, + Partitions: []*model.Partition{ + {State: etcdpb.PartitionState_PartitionDropping}, + {State: etcdpb.PartitionState_PartitionCreating}, + {State: etcdpb.PartitionState_PartitionDropped}, + }, + }, + { + DBID: 1, + State: etcdpb.CollectionState_CollectionDropped, // ignored + PhysicalChannelNames: []string{"ch1"}, + Partitions: []*model.Partition{ + {State: etcdpb.PartitionState_PartitionDropping}, + {State: etcdpb.PartitionState_PartitionCreating}, + {State: etcdpb.PartitionState_PartitionDropped}, + }, + }, + }, nil) + + // ticker := newTickerWithMockNormalStream() + tsoAllocator := newMockTsoAllocator() + tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { + return 100, nil + } + core := newTestCore( + withGarbageCollector(gc), + // withTtSynchronizer(ticker), + withTsoAllocator(tsoAllocator), + // withValidProxyManager(), + withMeta(meta)) + core.restore(context.Background()) + + for i := 0; i < 4; i++ { + <-finishCh + } +} + +func TestRootCoordSuite(t *testing.T) { + suite.Run(t, new(RootCoordSuite)) +}