fix: Write back dbid modification for nonDB id collection (#33641)

See also #33608

Make `fixDefaultDBIDConsistency` also write the corrected collection dbid
back to the meta store when a collection with the nonDB id is found.

This fix prevents dropped collections of this kind from showing up again
after they are dropped and the service is restarted.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-06-06 14:29:53 +08:00 committed by GitHub
parent b69740c8f3
commit f6e251514f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 74 additions and 59 deletions

View File

@ -224,7 +224,7 @@ func (kc *Catalog) loadCollection(ctx context.Context, dbID int64, collectionID
if err != nil { if err != nil {
return nil, err return nil, err
} }
fixDefaultDBIDConsistency(info) kc.fixDefaultDBIDConsistency(ctx, info, ts)
return info, nil return info, nil
} }
return kc.loadCollectionFromDb(ctx, dbID, collectionID, ts) return kc.loadCollectionFromDb(ctx, dbID, collectionID, ts)
@ -640,7 +640,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
log.Warn("unmarshal collection info failed", zap.Error(err)) log.Warn("unmarshal collection info failed", zap.Error(err))
continue continue
} }
fixDefaultDBIDConsistency(&collMeta) kc.fixDefaultDBIDConsistency(ctx, &collMeta, ts)
collection, err := kc.appendPartitionAndFieldsInfo(ctx, &collMeta, ts) collection, err := kc.appendPartitionAndFieldsInfo(ctx, &collMeta, ts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -651,6 +651,22 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
return colls, nil return colls, nil
} }
// fixDefaultDBIDConsistency fixes dbID consistency for collectionInfo.
// There are two versions of the default database ID (0 on the legacy path, 1 on the new path);
// to keep a consistent view when the default database is used, every collection in it
// should be marked with util.DefaultDBID.
// When collMeta still carries util.NonDBID, this method also writes the corrected dbID
// back to the meta store through alterModifyCollection.
// The write-back is best effort: if it fails, a warning is logged and collMeta is still
// fixed in memory so callers keep seeing a consistent dbID.
// see also: https://github.com/milvus-io/milvus/issues/33608
func (kc *Catalog) fixDefaultDBIDConsistency(_ context.Context, collMeta *pb.CollectionInfo, ts typeutil.Timestamp) {
	if collMeta.DbId != util.NonDBID {
		return
	}

	coll := model.UnmarshalCollectionModel(collMeta)
	cloned := coll.Clone()
	cloned.DBID = util.DefaultDBID
	// Do not silently drop a failed write-back: without it the legacy dbID stays in the meta store.
	if err := kc.alterModifyCollection(coll, cloned, ts); err != nil {
		log.Warn("write back default dbID for nonDB id collection failed",
			zap.Int64("collectionID", collMeta.GetID()),
			zap.Error(err))
	}

	collMeta.DbId = util.DefaultDBID
}
func (kc *Catalog) listAliasesBefore210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) { func (kc *Catalog) listAliasesBefore210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) {
_, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix210, ts) _, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix210, ts)
if err != nil { if err != nil {
@ -1204,12 +1220,3 @@ func isDefaultDB(dbID int64) bool {
} }
return false return false
} }
// fixDefaultDBIDConsistency normalizes the dbID stored in a collectionInfo.
// The default database has two IDs (0 on the legacy path, 1 on the new path); to present
// a consistent view, any collection that isDefaultDB reports as belonging to the default
// database is marked with util.DefaultDBID.
func fixDefaultDBIDConsistency(coll *pb.CollectionInfo) {
	if !isDefaultDB(coll.DbId) {
		return
	}
	coll.DbId = util.DefaultDBID
}

View File

@ -167,6 +167,7 @@ func TestCatalog_ListCollections(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
kv.On("LoadWithPrefix", CollectionMetaPrefix, ts). kv.On("LoadWithPrefix", CollectionMetaPrefix, ts).
Return([]string{"key"}, []string{string(bColl)}, nil) Return([]string{"key"}, []string{string(bColl)}, nil)
kv.On("MultiSaveAndRemove", mock.Anything, mock.Anything, ts).Return(nil)
kc := Catalog{Snapshot: kv} kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, util.NonDBID, ts) ret, err := kc.ListCollections(ctx, util.NonDBID, ts)
@ -245,6 +246,7 @@ func TestCatalog_ListCollections(t *testing.T) {
return strings.HasPrefix(prefix, FieldMetaPrefix) return strings.HasPrefix(prefix, FieldMetaPrefix)
}), ts). }), ts).
Return([]string{"key"}, []string{string(fm)}, nil) Return([]string{"key"}, []string{string(fm)}, nil)
kv.On("MultiSaveAndRemove", mock.Anything, mock.Anything, ts).Return(nil)
kc := Catalog{Snapshot: kv} kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, util.NonDBID, ts) ret, err := kc.ListCollections(ctx, util.NonDBID, ts)
@ -259,37 +261,49 @@ func TestCatalog_ListCollections(t *testing.T) {
func TestCatalog_loadCollection(t *testing.T) { func TestCatalog_loadCollection(t *testing.T) {
t.Run("load failed", func(t *testing.T) { t.Run("load failed", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
snapshot := kv.NewMockSnapshotKV() kv := mocks.NewSnapShotKV(t)
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { kv.EXPECT().Load(mock.Anything, mock.Anything).Return("", errors.New("mock"))
return "", errors.New("mock") kc := Catalog{Snapshot: kv}
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.loadCollection(ctx, testDb, 1, 0) _, err := kc.loadCollection(ctx, testDb, 1, 0)
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("load, not collection info", func(t *testing.T) { t.Run("load, not collection info", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
snapshot := kv.NewMockSnapshotKV() kv := mocks.NewSnapShotKV(t)
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { kv.EXPECT().Load(mock.Anything, mock.Anything).Return("not in pb format", nil)
return "not in pb format", nil kc := Catalog{Snapshot: kv}
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.loadCollection(ctx, testDb, 1, 0) _, err := kc.loadCollection(ctx, testDb, 1, 0)
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("load, normal collection info", func(t *testing.T) { t.Run("load, normal collection info", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
snapshot := kv.NewMockSnapshotKV() coll := &pb.CollectionInfo{ID: 1, DbId: util.DefaultDBID}
coll := &pb.CollectionInfo{ID: 1}
value, err := proto.Marshal(coll) value, err := proto.Marshal(coll)
assert.NoError(t, err) assert.NoError(t, err)
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { kv := mocks.NewSnapShotKV(t)
return string(value), nil kv.EXPECT().Load(mock.Anything, mock.Anything).Return(string(value), nil)
kc := Catalog{Snapshot: kv}
got, err := kc.loadCollection(ctx, util.DefaultDBID, 1, 0)
assert.NoError(t, err)
assert.Equal(t, got.GetID(), coll.GetID())
})
t.Run("load, nonDBID collection info", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{
ID: 1,
DbId: util.NonDBID,
Schema: &schemapb.CollectionSchema{},
} }
kc := Catalog{Snapshot: snapshot} value, err := proto.Marshal(coll)
got, err := kc.loadCollection(ctx, 0, 1, 0) assert.NoError(t, err)
kv := mocks.NewSnapShotKV(t)
kv.EXPECT().Load(mock.Anything, mock.Anything).Return(string(value), nil)
kv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).Return(nil)
kc := Catalog{Snapshot: kv}
got, err := kc.loadCollection(ctx, util.NonDBID, 1, 0)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, got.GetID(), coll.GetID()) assert.Equal(t, got.GetID(), coll.GetID())
}) })
@ -344,9 +358,12 @@ func TestCatalog_GetCollectionByID(t *testing.T) {
ss := mocks.NewSnapShotKV(t) ss := mocks.NewSnapShotKV(t)
c := Catalog{Snapshot: ss} c := Catalog{Snapshot: ss}
ss.EXPECT().Load(mock.Anything, mock.Anything).Call.Return( ss.EXPECT().Load(mock.Anything, mock.Anything).Return("", errors.New("load error")).Twice()
func(key string, ts typeutil.Timestamp) string { coll, err := c.GetCollectionByID(ctx, 0, 1, 1)
if ts > 1000 { assert.Error(t, err)
assert.Nil(t, coll)
ss.EXPECT().Load(mock.Anything, mock.Anything).RunAndReturn(func(key string, ts uint64) (string, error) {
collByte, err := proto.Marshal(&pb.CollectionInfo{ collByte, err := proto.Marshal(&pb.CollectionInfo{
ID: 1, ID: 1,
Schema: &schemapb.CollectionSchema{ Schema: &schemapb.CollectionSchema{
@ -359,22 +376,9 @@ func TestCatalog_GetCollectionByID(t *testing.T) {
PartitionCreatedTimestamps: []uint64{1, 2, 3}, PartitionCreatedTimestamps: []uint64{1, 2, 3},
}) })
require.NoError(t, err) require.NoError(t, err)
return string(collByte) return string(collByte), nil
} }).Once()
return "" ss.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).Return(nil)
},
func(key string, ts typeutil.Timestamp) error {
if ts > 1000 {
return nil
}
return errors.New("load error")
},
)
coll, err := c.GetCollectionByID(ctx, 0, 1, 1)
assert.Error(t, err)
assert.Nil(t, coll)
coll, err = c.GetCollectionByID(ctx, 0, 10000, 1) coll, err = c.GetCollectionByID(ctx, 0, 10000, 1)
assert.NoError(t, err) assert.NoError(t, err)
@ -397,7 +401,9 @@ func TestCatalog_CreatePartitionV2(t *testing.T) {
t.Run("partition version after 210", func(t *testing.T) { t.Run("partition version after 210", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
coll := &pb.CollectionInfo{} coll := &pb.CollectionInfo{
DbId: util.DefaultDBID,
}
value, err := proto.Marshal(coll) value, err := proto.Marshal(coll)
assert.NoError(t, err) assert.NoError(t, err)
@ -425,7 +431,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) {
ctx := context.Background() ctx := context.Background()
partID := typeutil.UniqueID(1) partID := typeutil.UniqueID(1)
coll := &pb.CollectionInfo{PartitionIDs: []int64{partID}} coll := &pb.CollectionInfo{DbId: util.DefaultDBID, PartitionIDs: []int64{partID}}
value, err := proto.Marshal(coll) value, err := proto.Marshal(coll)
assert.NoError(t, err) assert.NoError(t, err)
@ -444,7 +450,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) {
ctx := context.Background() ctx := context.Background()
partition := "partition" partition := "partition"
coll := &pb.CollectionInfo{PartitionNames: []string{partition}} coll := &pb.CollectionInfo{DbId: util.DefaultDBID, PartitionNames: []string{partition}}
value, err := proto.Marshal(coll) value, err := proto.Marshal(coll)
assert.NoError(t, err) assert.NoError(t, err)
@ -463,6 +469,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) {
ctx := context.Background() ctx := context.Background()
coll := &pb.CollectionInfo{ coll := &pb.CollectionInfo{
DbId: util.DefaultDBID,
PartitionNames: []string{"partition"}, PartitionNames: []string{"partition"},
PartitionIDs: []int64{111}, PartitionIDs: []int64{111},
PartitionCreatedTimestamps: []uint64{111111}, PartitionCreatedTimestamps: []uint64{111111},
@ -698,7 +705,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) {
t.Run("partition version after 210", func(t *testing.T) { t.Run("partition version after 210", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
coll := &pb.CollectionInfo{} coll := &pb.CollectionInfo{DbId: util.DefaultDBID}
value, err := proto.Marshal(coll) value, err := proto.Marshal(coll)
assert.NoError(t, err) assert.NoError(t, err)
@ -726,6 +733,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) {
ctx := context.Background() ctx := context.Background()
coll := &pb.CollectionInfo{ coll := &pb.CollectionInfo{
DbId: util.DefaultDBID,
PartitionIDs: []int64{101, 102}, PartitionIDs: []int64{101, 102},
PartitionNames: []string{"partition1", "partition2"}, PartitionNames: []string{"partition1", "partition2"},
PartitionCreatedTimestamps: []uint64{101, 102}, PartitionCreatedTimestamps: []uint64{101, 102},