diff --git a/internal/rootcoord/ddl_callbacks_drop_partition.go b/internal/rootcoord/ddl_callbacks_drop_partition.go index 4184235300..8d7b3f74ea 100644 --- a/internal/rootcoord/ddl_callbacks_drop_partition.go +++ b/internal/rootcoord/ddl_callbacks_drop_partition.go @@ -144,5 +144,5 @@ func (t *partitionTombstone) ConfirmCanBeRemoved(ctx context.Context) (bool, err } func (t *partitionTombstone) Remove(ctx context.Context) error { - return t.meta.RemoveCollection(ctx, t.collectionID, 0) + return t.meta.RemovePartition(ctx, t.collectionID, t.partitionID, 0) } diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index e2a7cbe130..aa47872b86 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -104,7 +104,7 @@ type IMetaTable interface { GetPChannelInfo(ctx context.Context, pchannel string) *rootcoordpb.GetPChannelInfoResponse AddPartition(ctx context.Context, partition *model.Partition) error DropPartition(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error - RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error + RemovePartition(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error // Alias AlterAlias(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error @@ -619,6 +619,9 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID log.Ctx(ctx).Warn("not found collection, skip remove", zap.Int64("collectionID", collectionID)) return nil } + if coll.State != pb.CollectionState_CollectionDropping { + return fmt.Errorf("remove collection which state is not dropping, collectionID: %d, state: %s", collectionID, coll.State.String()) + } ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) aliases := mt.listAliasesByID(collectionID) @@ -1150,18 +1153,15 @@ func (mt *MetaTable) DropPartition(ctx context.Context, collectionID UniqueID, p return nil } -func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error { +func (mt *MetaTable) RemovePartition(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() - ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) - if err := mt.catalog.DropPartition(ctx1, dbID, collectionID, partitionID, ts); err != nil { - return err - } coll, ok := mt.collID2Meta[collectionID] if !ok { return nil } + loc := -1 for idx, part := range coll.Partitions { if part.PartitionID == partitionID { @@ -1169,9 +1169,20 @@ func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collection break } } - if loc != -1 { - coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...) + if loc == -1 { + log.Ctx(ctx).Warn("not found partition, skip remove", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID)) + return nil } + partition := coll.Partitions[loc] + if partition.State != pb.PartitionState_PartitionDropping { + return fmt.Errorf("remove partition which state is not dropping, collection: %d, partition: %d, state: %s", collectionID, partitionID, partition.State.String()) + } + + ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) + if err := mt.catalog.DropPartition(ctx1, coll.DBID, collectionID, partitionID, ts); err != nil { + return err + } + coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...) log.Ctx(ctx).Info("remove partition", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Uint64("ts", ts)) return nil } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index a3494799bd..da8d5f27ad 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -1177,6 +1177,10 @@ func TestMetaTable_RemoveCollection(t *testing.T) { ctx := context.Background() err := meta.RemoveCollection(ctx, 100, 9999) assert.Error(t, err) + + meta.collID2Meta[100].State = pb.CollectionState_CollectionDropping + err = meta.RemoveCollection(ctx, 100, 9999) + assert.Error(t, err) }) t.Run("normal case", func(t *testing.T) { @@ -1191,7 +1195,7 @@ func TestMetaTable_RemoveCollection(t *testing.T) { names: newNameDb(), aliases: newNameDb(), collID2Meta: map[typeutil.UniqueID]*model.Collection{ - 100: {Name: "collection"}, + 100: {Name: "collection", State: pb.CollectionState_CollectionDropping}, }, } channel.ResetStaticPChannelStatsManager() @@ -1205,6 +1209,74 @@ func TestMetaTable_RemoveCollection(t *testing.T) { }) } +func TestMetaTable_RemovePartition(t *testing.T) { + t.Run("catalog error", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("DropPartition", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.AnythingOfType("uint64"), + ).Return(errors.New("error mock AlterPartition")) + + meta := &MetaTable{ + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: { + CollectionID: 100, + DBID: int64(100), + Partitions: []*model.Partition{ + {PartitionID: 100, State: pb.PartitionState_PartitionCreated}, + }, + }, + }, + names: newNameDb(), + aliases: newNameDb(), + catalog: catalog, + } + + ctx := context.Background() + err := meta.RemovePartition(ctx, 100, 100, 9999) + assert.Error(t, err) + + meta.collID2Meta[100].Partitions[0].State = pb.PartitionState_PartitionDropping + err = meta.RemovePartition(ctx, 100, 100, 9999) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("DropPartition", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.AnythingOfType("uint64"), + ).Return(nil) + meta := &MetaTable{ + catalog: catalog, + names: newNameDb(), + aliases: newNameDb(), + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: { + Name: "collection", + Partitions: []*model.Partition{ + {PartitionID: 100, State: pb.PartitionState_PartitionDropping}, + }, + }, + }, + } + channel.ResetStaticPChannelStatsManager() + channel.RecoverPChannelStatsManager([]string{}) + meta.names.insert("", "collection", 100) + meta.names.insert("", "alias1", 100) + meta.names.insert("", "alias2", 100) + ctx := context.Background() + err := meta.RemovePartition(ctx, 100, 100, 9999) + assert.NoError(t, err) + }) +} + func TestMetaTable_reload(t *testing.T) { createMetaTableFn := func(catalogOpts ...func(*mocks.RootCoordCatalog)) *MetaTable { catalog := mocks.NewRootCoordCatalog(t) diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index 09ecacec30..51f8ae61b2 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -148,7 +148,7 @@ func (m mockMetaTable) ChangePartitionState(ctx context.Context, collectionID Un return m.ChangePartitionStateFunc(ctx, collectionID, partitionID, state, ts) } -func (m mockMetaTable) RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error { +func (m mockMetaTable) RemovePartition(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error { return m.RemovePartitionFunc(ctx, collectionID, partitionID, ts) } diff --git a/internal/rootcoord/mocks/meta_table.go b/internal/rootcoord/mocks/meta_table.go index a7e0da0313..aa9b4e9543 100644 --- a/internal/rootcoord/mocks/meta_table.go +++ b/internal/rootcoord/mocks/meta_table.go @@ -3285,17 +3285,17 @@ func (_c *IMetaTable_RemoveCollection_Call) RunAndReturn(run func(context.Contex return _c } -// RemovePartition provides a mock function with given fields: ctx, dbID, collectionID, partitionID, ts -func (_m *IMetaTable) RemovePartition(ctx context.Context, dbID int64, collectionID int64, partitionID int64, ts uint64) error { - ret := _m.Called(ctx, dbID, collectionID, partitionID, ts) +// RemovePartition provides a mock function with given fields: ctx, collectionID, partitionID, ts +func (_m *IMetaTable) RemovePartition(ctx context.Context, collectionID int64, partitionID int64, ts uint64) error { + ret := _m.Called(ctx, collectionID, partitionID, ts) if len(ret) == 0 { panic("no return value specified for RemovePartition") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, uint64) error); ok { - r0 = rf(ctx, dbID, collectionID, partitionID, ts) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, uint64) error); ok { + r0 = rf(ctx, collectionID, partitionID, ts) } else { r0 = ret.Error(0) } @@ -3310,17 +3310,16 @@ type IMetaTable_RemovePartition_Call struct { // RemovePartition is a helper method to define mock.On call // - ctx context.Context -// - dbID int64 // - collectionID int64 // - partitionID int64 // - ts uint64 -func (_e *IMetaTable_Expecter) RemovePartition(ctx interface{}, dbID interface{}, collectionID interface{}, partitionID interface{}, ts interface{}) *IMetaTable_RemovePartition_Call { - return &IMetaTable_RemovePartition_Call{Call: _e.mock.On("RemovePartition", ctx, dbID, collectionID, partitionID, ts)} +func (_e *IMetaTable_Expecter) RemovePartition(ctx interface{}, collectionID interface{}, partitionID interface{}, ts interface{}) *IMetaTable_RemovePartition_Call { + return &IMetaTable_RemovePartition_Call{Call: _e.mock.On("RemovePartition", ctx, collectionID, partitionID, ts)} } -func (_c *IMetaTable_RemovePartition_Call) Run(run func(ctx context.Context, dbID int64, collectionID int64, partitionID int64, ts uint64)) *IMetaTable_RemovePartition_Call { +func (_c *IMetaTable_RemovePartition_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, ts uint64)) *IMetaTable_RemovePartition_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(int64), args[4].(uint64)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(uint64)) }) return _c } @@ -3330,7 +3329,7 @@ func (_c *IMetaTable_RemovePartition_Call) Return(_a0 error) *IMetaTable_RemoveP return _c } -func (_c *IMetaTable_RemovePartition_Call) RunAndReturn(run func(context.Context, int64, int64, int64, uint64) error) *IMetaTable_RemovePartition_Call { +func (_c *IMetaTable_RemovePartition_Call) RunAndReturn(run func(context.Context, int64, int64, uint64) error) *IMetaTable_RemovePartition_Call { _c.Call.Return(run) return _c }