diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 559fc552ac..3cacf8c199 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -13,6 +13,7 @@ package masterservice import ( "context" + "encoding/json" "fmt" "math/rand" "sync" @@ -24,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/masterpb" @@ -403,6 +405,26 @@ func TestMasterService(t *testing.T) { createMeta, err = core.MetaTable.GetCollectionByName("testColl-again") assert.Nil(t, err) assert.Equal(t, createMsg.CollectionID, createMeta.ID) + + // check DDMsg type and info + msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix) + assert.Nil(t, err) + assert.Equal(t, CreateCollectionMsgType, msgType) + + var meta map[string]string + metaStr, err := core.MetaTable.client.Load(DDMsgPrefix) + assert.Nil(t, err) + err = json.Unmarshal([]byte(metaStr), &meta) + assert.Nil(t, err) + + k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, createMeta.ID) + v1 := meta[k1] + var collInfo etcdpb.CollectionInfo + err = proto.UnmarshalText(v1, &collInfo) + assert.Nil(t, err) + assert.Equal(t, createMeta.ID, collInfo.ID) + assert.Equal(t, createMeta.CreateTime, collInfo.CreateTime) + assert.Equal(t, createMeta.PartitionIDs[0], collInfo.PartitionIDs[0]) }) t.Run("has collection", func(t *testing.T) { @@ -524,6 +546,34 @@ func TestMasterService(t *testing.T) { assert.Equal(t, 1, len(pm.GetCollArray())) assert.Equal(t, "testColl", pm.GetCollArray()[0]) + + // check DDMsg type and info + msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix) + assert.Nil(t, err) + assert.Equal(t, CreatePartitionMsgType, msgType) + + var meta map[string]string + metaStr, err := core.MetaTable.client.Load(DDMsgPrefix) + assert.Nil(t, err) + err = json.Unmarshal([]byte(metaStr), &meta) + assert.Nil(t, err) + + k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collMeta.ID) + v1 := meta[k1] + var collInfo etcdpb.CollectionInfo + err = proto.UnmarshalText(v1, &collInfo) + assert.Nil(t, err) + assert.Equal(t, collMeta.ID, collInfo.ID) + assert.Equal(t, collMeta.CreateTime, collInfo.CreateTime) + assert.Equal(t, collMeta.PartitionIDs[0], collInfo.PartitionIDs[0]) + + k2 := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collMeta.ID, partMeta.PartitionID) + v2 := meta[k2] + var partInfo etcdpb.PartitionInfo + err = proto.UnmarshalText(v2, &partInfo) + assert.Nil(t, err) + assert.Equal(t, partMeta.PartitionName, partInfo.PartitionName) + assert.Equal(t, partMeta.PartitionID, partInfo.PartitionID) }) t.Run("has partition", func(t *testing.T) { @@ -913,6 +963,26 @@ func TestMasterService(t *testing.T) { assert.Equal(t, 2, len(pm.GetCollArray())) assert.Equal(t, "testColl", pm.GetCollArray()[1]) + + // check DDMsg type and info + msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix) + assert.Nil(t, err) + assert.Equal(t, DropPartitionMsgType, msgType) + + var meta map[string]string + metaStr, err := core.MetaTable.client.Load(DDMsgPrefix) + assert.Nil(t, err) + err = json.Unmarshal([]byte(metaStr), &meta) + assert.Nil(t, err) + + k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collMeta.ID) + v1 := meta[k1] + var collInfo etcdpb.CollectionInfo + err = proto.UnmarshalText(v1, &collInfo) + assert.Nil(t, err) + assert.Equal(t, collMeta.ID, collInfo.ID) + assert.Equal(t, collMeta.CreateTime, collInfo.CreateTime) + assert.Equal(t, collMeta.PartitionIDs[0], collInfo.PartitionIDs[0]) }) t.Run("drop collection", func(t *testing.T) { @@ -966,6 +1036,18 @@ func TestMasterService(t *testing.T) { collArray = pm.GetCollArray() assert.Equal(t, len(collArray), 3) assert.Equal(t, collArray[2], "testColl") + + // check DDMsg type and info + msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix) + assert.Nil(t, err) + assert.Equal(t, DropCollectionMsgType, msgType) + + var collID typeutil.UniqueID + collIDByte, err := core.MetaTable.client.Load(DDMsgPrefix) + assert.Nil(t, err) + err = json.Unmarshal([]byte(collIDByte), &collID) + assert.Nil(t, err) + assert.Equal(t, collMeta.ID, collID) }) t.Run("context_cancel", func(t *testing.T) { diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index fc26e8e0ec..336df1b9e5 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -12,6 +12,7 @@ package masterservice import ( + "encoding/json" "fmt" "path" "strconv" @@ -40,6 +41,15 @@ const ( PartitionMetaPrefix = ComponentPrefix + "/partition" SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index" IndexMetaPrefix = ComponentPrefix + "/index" + + DDMsgPrefix = ComponentPrefix + "/dd-msg" + DDMsgTypePrefix = ComponentPrefix + "/dd-msg-type" + DDMsgFlagPrefix = ComponentPrefix + "/dd-msg-flag" + + CreateCollectionMsgType = "CreateCollection" + DropCollectionMsgType = "DropCollection" + CreatePartitionMsgType = "CreatePartition" + DropPartitionMsgType = "DropPartition" ) type metaTable struct { @@ -229,7 +239,6 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn if len(part.SegmentIDs) != 0 { return errors.New("segment should be empty when creating collection") } - if len(coll.PartitionIDs) != 0 { return errors.New("partitions should be empty when creating collection") } @@ -261,11 +270,21 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn meta[k] = v } - err := mt.client.MultiSave(meta) + // record ddmsg info and type + ddmsg, err := json.Marshal(meta) + if err != nil { + return err + } + meta[DDMsgPrefix] = string(ddmsg) + meta[DDMsgTypePrefix] = CreateCollectionMsgType + meta[DDMsgFlagPrefix] = "false" + + err = mt.client.MultiSave(meta) if err != nil { _ = mt.reloadFromKV() return err } + return nil } @@ -307,13 +326,25 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error { } delete(mt.indexID2Meta, idxInfo.IndexID) } - metas := []string{ + delMetakeys := []string{ fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID), fmt.Sprintf("%s/%d", PartitionMetaPrefix, collID), fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID), fmt.Sprintf("%s/%d", IndexMetaPrefix, collID), } - err := mt.client.MultiRemoveWithPrefix(metas) + + // record ddmsg info and type + ddmsg, err := json.Marshal(collID) + if err != nil { + return err + } + saveMeta := map[string]string{ + DDMsgPrefix: string(ddmsg), + DDMsgTypePrefix: DropCollectionMsgType, + DDMsgFlagPrefix: "false", + } + + err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys) if err != nil { _ = mt.reloadFromKV() return err @@ -426,8 +457,16 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string v2 := proto.MarshalTextString(&partMeta) meta := map[string]string{k1: v1, k2: v2} - err := mt.client.MultiSave(meta) + // record ddmsg info and type + ddmsg, err := json.Marshal(meta) + if err != nil { + return err + } + meta[DDMsgPrefix] = string(ddmsg) + meta[DDMsgTypePrefix] = CreatePartitionMsgType + meta[DDMsgFlagPrefix] = "false" + err = mt.client.MultiSave(meta) if err != nil { _ = mt.reloadFromKV() return err @@ -480,9 +519,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str partMeta = pm exist = true } - } - } if !exist { return 0, fmt.Errorf("partition %s does not exist", partitionName) @@ -512,8 +549,16 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str delMetaKeys = append(delMetaKeys, k) } - err := mt.client.MultiSaveAndRemoveWithPrefix(collKV, delMetaKeys) + // record ddmsg info and type + ddmsg, err := json.Marshal(collKV) + if err != nil { + return 0, err + } + collKV[DDMsgPrefix] = string(ddmsg) + collKV[DDMsgTypePrefix] = DropPartitionMsgType + collKV[DDMsgFlagPrefix] = "false" + err = mt.client.MultiSaveAndRemoveWithPrefix(collKV, delMetaKeys) if err != nil { _ = mt.reloadFromKV() return 0, err diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 6437e8fc37..8b5be630bc 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -430,8 +430,8 @@ func TestMetaTable(t *testing.T) { mockKV.multiSave = func(kvs map[string]string) error { return nil } - mockKV.multiRemoveWithPrefix = func(keys []string) error { - return fmt.Errorf("milti remove with prefix error") + mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string) error { + return fmt.Errorf("milti save and remove with prefix error") } collInfo.PartitionIDs = nil err := mt.AddCollection(collInfo, partInfo, idxInfo) @@ -440,7 +440,7 @@ func TestMetaTable(t *testing.T) { mt.indexID2Meta = make(map[int64]pb.IndexInfo) err = mt.DeleteCollection(collInfo.ID) assert.NotNil(t, err) - assert.EqualError(t, err, "milti remove with prefix error") + assert.EqualError(t, err, "milti save and remove with prefix error") }) t.Run("get collection failed", func(t *testing.T) { diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 6b078e376e..ab3254a7f0 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -133,6 +133,8 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { PartitionIDs: make([]typeutil.UniqueID, 0, 16), FieldIndexes: make([]*etcdpb.FieldIndexInfo, 0, 16), } + + // every collection has _default partition partMeta := etcdpb.PartitionInfo{ PartitionName: Params.DefaultPartitionName, PartitionID: partitionID, @@ -166,6 +168,9 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { if err != nil { return err } + + // schema is modified (add RowIDField and TimestampField), + // so need Marshal again schemaBytes, err := proto.Marshal(&schema) if err != nil { return err @@ -205,6 +210,9 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { return err } + // Marking DDMsgFlagPrefix to true means ddMsg has been send successfully + t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true") + return nil } @@ -269,6 +277,9 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { } }() + // Marking DDMsgFlagPrefix to true means ddMsg has been send successfully + t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true") + return nil } @@ -450,6 +461,9 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { // error doesn't matter here _ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + // Marking DDMsgFlagPrefix to true means ddMsg has been send successfully + t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true") + return nil } @@ -504,6 +518,10 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { // error doesn't matter here _ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + + // Marking DDMsgFlagPrefix to true means ddMsg has been send successfully + t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true") + return nil }