diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 5eac69fc00..56cf4203a6 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -99,9 +99,9 @@ func (mt *metaTable) reloadFromKV() error { for _, value := range values { tenantMeta := pb.TenantMeta{} - err := proto.UnmarshalText(value, &tenantMeta) + err := proto.Unmarshal([]byte(value), &tenantMeta) if err != nil { - return fmt.Errorf("RootCoord UnmarshalText pb.TenantMeta err:%w", err) + return fmt.Errorf("RootCoord Unmarshal pb.TenantMeta err:%w", err) } mt.tenantID2Meta[tenantMeta.ID] = tenantMeta } @@ -113,9 +113,9 @@ func (mt *metaTable) reloadFromKV() error { for _, value := range values { proxyMeta := pb.ProxyMeta{} - err = proto.UnmarshalText(value, &proxyMeta) + err = proto.Unmarshal([]byte(value), &proxyMeta) if err != nil { - return fmt.Errorf("RootCoord UnmarshalText pb.ProxyMeta err:%w", err) + return fmt.Errorf("RootCoord Unmarshal pb.ProxyMeta err:%w", err) } mt.proxyID2Meta[proxyMeta.ID] = proxyMeta } @@ -127,9 +127,9 @@ func (mt *metaTable) reloadFromKV() error { for _, value := range values { collInfo := pb.CollectionInfo{} - err = proto.UnmarshalText(value, &collInfo) + err = proto.Unmarshal([]byte(value), &collInfo) if err != nil { - return fmt.Errorf("RootCoord UnmarshalText pb.CollectionInfo err:%w", err) + return fmt.Errorf("RootCoord Unmarshal pb.CollectionInfo err:%w", err) } mt.collID2Meta[collInfo.ID] = collInfo mt.collName2ID[collInfo.Schema.Name] = collInfo.ID @@ -141,9 +141,9 @@ func (mt *metaTable) reloadFromKV() error { } for _, value := range values { segmentIndexInfo := pb.SegmentIndexInfo{} - err = proto.UnmarshalText(value, &segmentIndexInfo) + err = proto.Unmarshal([]byte(value), &segmentIndexInfo) if err != nil { - return fmt.Errorf("RootCoord UnmarshalText pb.SegmentIndexInfo err:%w", err) + return fmt.Errorf("RootCoord Unmarshal pb.SegmentIndexInfo err:%w", err) } // update partID2SegID @@ -173,9 +173,9 @@ func (mt *metaTable) reloadFromKV() error { } for _, value := range values { meta := pb.IndexInfo{} - err = proto.UnmarshalText(value, &meta) + err = proto.Unmarshal([]byte(value), &meta) if err != nil { - return fmt.Errorf("RootCoord UnmarshalText pb.IndexInfo err:%w", err) + return fmt.Errorf("RootCoord Unmarshal pb.IndexInfo err:%w", err) } mt.indexID2Meta[meta.IndexID] = meta } @@ -186,9 +186,9 @@ func (mt *metaTable) reloadFromKV() error { } for _, value := range values { aliasInfo := pb.CollectionInfo{} - err = proto.UnmarshalText(value, &aliasInfo) + err = proto.Unmarshal([]byte(value), &aliasInfo) if err != nil { - return fmt.Errorf("RootCoord UnmarshalText pb.AliasInfo err:%w", err) + return fmt.Errorf("RootCoord Unmarshal pb.AliasInfo err:%w", err) } mt.collAlias2ID[aliasInfo.Schema.Name] = aliasInfo.ID } @@ -216,9 +216,9 @@ func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error { defer mt.tenantLock.Unlock() k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID) - v := proto.MarshalTextString(te) + v, _ := proto.Marshal(te) - err := mt.client.Save(k, v, ts) + err := mt.client.Save(k, string(v), ts) if err != nil { log.Error("SnapShotKV Save fail", zap.Error(err)) panic("SnapShotKV Save fail") @@ -232,9 +232,9 @@ func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error { defer mt.proxyLock.Unlock() k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID) - v := proto.MarshalTextString(po) + v, _ := proto.Marshal(po) - err := mt.client.Save(k, v, ts) + err := mt.client.Save(k, string(v), ts) if err != nil { log.Error("SnapShotKV Save fail", zap.Error(err)) panic("SnapShotKV Save fail") @@ -267,8 +267,8 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam for _, i := range idx { k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID) - v := proto.MarshalTextString(i) - meta[k] = v + v, _ := proto.Marshal(i) + meta[k] = string(v) } // save ddOpStr into etcd @@ -281,9 +281,9 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam mt.collID2Meta[coll.ID] = *coll mt.collName2ID[coll.Schema.Name] = coll.ID k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID) - v1 := proto.MarshalTextString(coll) - meta[k1] = v1 - return k1, v1, nil + v1, _ := proto.Marshal(coll) + meta[k1] = string(v1) + return k1, string(v1), nil } err := mt.client.MultiSave(meta, ts, addition, saveColl) @@ -392,7 +392,7 @@ func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeut return nil, err } colMeta := pb.CollectionInfo{} - err = proto.UnmarshalText(val, &colMeta) + err = proto.Unmarshal([]byte(val), &colMeta) if err != nil { return nil, err } @@ -423,7 +423,7 @@ func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Time } for _, val := range vals { collMeta := pb.CollectionInfo{} - err = proto.UnmarshalText(val, &collMeta) + err = proto.Unmarshal([]byte(val), &collMeta) if err != nil { log.Debug("unmarshal collection info failed", zap.Error(err)) continue @@ -455,7 +455,7 @@ func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.Coll } for _, val := range vals { collMeta := pb.CollectionInfo{} - err := proto.UnmarshalText(val, &collMeta) + err := proto.Unmarshal([]byte(val), &collMeta) if err != nil { log.Debug("unmarshal collection info failed", zap.Error(err)) } @@ -546,10 +546,10 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string mt.collID2Meta[collID] = coll k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID) - v1 := proto.MarshalTextString(&coll) - meta[k1] = v1 + v1, _ := proto.Marshal(&coll) + meta[k1] = string(v1) - return k1, v1, nil + return k1, string(v1), nil } err := mt.client.MultiSave(meta, ts, addition, saveColl) @@ -581,7 +581,7 @@ func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, return "", err } collMeta := pb.CollectionInfo{} - err = proto.UnmarshalText(collVal, &collMeta) + err = proto.Unmarshal([]byte(collVal), &collMeta) if err != nil { return "", err } @@ -612,7 +612,7 @@ func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName return 0, err } collMeta := pb.CollectionInfo{} - err = proto.UnmarshalText(collVal, &collMeta) + err = proto.Unmarshal([]byte(collVal), &collMeta) if err != nil { return 0, err } @@ -683,7 +683,9 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str } delete(mt.partID2SegID, partID) - meta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)} + k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)) + v, _ := proto.Marshal(&collMeta) + meta := map[string]string{k: string(v)} delMetaKeys := []string{} for _, idxInfo := range collMeta.FieldIndexes { k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID) @@ -744,9 +746,9 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Times mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID) - v := proto.MarshalTextString(segIdxInfo) + v, _ := proto.Marshal(segIdxInfo) - err := mt.client.Save(k, v, ts) + err := mt.client.Save(k, string(v), ts) if err != nil { log.Error("SnapShotKV Save fail", zap.Error(err)) panic("SnapShotKV Save fail") @@ -799,7 +801,9 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeuti } collMeta.FieldIndexes = fieldIdxInfo mt.collID2Meta[collID] = collMeta - saveMeta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)} + k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)) + v, _ := proto.Marshal(&collMeta) + saveMeta := map[string]string{k: string(v)} delete(mt.indexID2Meta, dropIdxID) @@ -977,20 +981,20 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx) mt.collID2Meta[collMeta.ID] = collMeta k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10)) - v1 := proto.MarshalTextString(&collMeta) + v1, _ := proto.Marshal(&collMeta) mt.indexID2Meta[idx.IndexID] = *idxInfo k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10)) - v2 := proto.MarshalTextString(idxInfo) - meta := map[string]string{k1: v1, k2: v2} + v2, _ := proto.Marshal(idxInfo) + meta := map[string]string{k1: string(v1), k2: string(v2)} if dupIdx != 0 { dupInfo := mt.indexID2Meta[dupIdx] dupInfo.IndexName = dupInfo.IndexName + "_bak" mt.indexID2Meta[dupIdx] = dupInfo k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10)) - v := proto.MarshalTextString(&dupInfo) - meta[k] = v + v, _ := proto.Marshal(&dupInfo) + meta[k] = string(v) } err = mt.client.MultiSave(meta, ts) if err != nil { @@ -1003,15 +1007,15 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id existInfo.IndexName = idxInfo.IndexName mt.indexID2Meta[existInfo.IndexID] = existInfo k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10)) - v := proto.MarshalTextString(&existInfo) - meta := map[string]string{k: v} + v, _ := proto.Marshal(&existInfo) + meta := map[string]string{k: string(v)} if dupIdx != 0 { dupInfo := mt.indexID2Meta[dupIdx] dupInfo.IndexName = dupInfo.IndexName + "_bak" mt.indexID2Meta[dupIdx] = dupInfo k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10)) - v := proto.MarshalTextString(&dupInfo) - meta[k] = v + v, _ := proto.Marshal(&dupInfo) + meta[k] = string(v) } err = mt.client.MultiSave(meta, ts) @@ -1116,9 +1120,9 @@ func (mt *metaTable) AddAlias(collectionAlias string, collectionName string, addition := mt.getAdditionKV(ddOpStr, meta) saveAlias := func(ts typeutil.Timestamp) (string, string, error) { k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias) - v1 := proto.MarshalTextString(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}}) - meta[k1] = v1 - return k1, v1, nil + v1, _ := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}}) + meta[k1] = string(v1) + return k1, string(v1), nil } err := mt.client.MultiSave(meta, ts, addition, saveAlias) @@ -1166,9 +1170,9 @@ func (mt *metaTable) AlterAlias(collectionAlias string, collectionName string, t addition := mt.getAdditionKV(ddOpStr, meta) alterAlias := func(ts typeutil.Timestamp) (string, string, error) { k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias) - v1 := proto.MarshalTextString(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}}) - meta[k1] = v1 - return k1, v1, nil + v1, _ := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}}) + meta[k1] = string(v1) + return k1, string(v1), nil } err := mt.client.MultiSave(meta, ts, addition, alterAlias) diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 8c3a8b1299..c69f929463 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -68,53 +68,58 @@ func Test_MockKV(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "load prefix error") + // tenant prefix[TenantMetaPrefix] = []string{"tenant-prefix"} _, err = NewMetaTable(k1) assert.NotNil(t, err) - assert.EqualError(t, err, "RootCoord UnmarshalText pb.TenantMeta err:line 1.0: unknown field name \"tenant-prefix\" in milvus.proto.etcd.TenantMeta") - prefix[TenantMetaPrefix] = []string{proto.MarshalTextString(&pb.TenantMeta{})} + value, _ := proto.Marshal(&pb.TenantMeta{}) + prefix[TenantMetaPrefix] = []string{string(value)} _, err = NewMetaTable(k1) assert.NotNil(t, err) + // proxy prefix[ProxyMetaPrefix] = []string{"porxy-meta"} _, err = NewMetaTable(k1) assert.NotNil(t, err) - assert.EqualError(t, err, "RootCoord UnmarshalText pb.ProxyMeta err:line 1.0: unknown field name \"porxy-meta\" in milvus.proto.etcd.ProxyMeta") - prefix[ProxyMetaPrefix] = []string{proto.MarshalTextString(&pb.ProxyMeta{})} + value, _ = proto.Marshal(&pb.ProxyMeta{}) + prefix[ProxyMetaPrefix] = []string{string(value)} _, err = NewMetaTable(k1) assert.NotNil(t, err) + // collection prefix[CollectionMetaPrefix] = []string{"collection-meta"} _, err = NewMetaTable(k1) assert.NotNil(t, err) - assert.EqualError(t, err, "RootCoord UnmarshalText pb.CollectionInfo err:line 1.0: unknown field name \"collection-meta\" in milvus.proto.etcd.CollectionInfo") - prefix[CollectionMetaPrefix] = []string{proto.MarshalTextString(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})} + value, _ = proto.Marshal(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}}) + prefix[CollectionMetaPrefix] = []string{string(value)} _, err = NewMetaTable(k1) assert.NotNil(t, err) + // segment index prefix[SegmentIndexMetaPrefix] = []string{"segment-index-meta"} _, err = NewMetaTable(k1) assert.NotNil(t, err) - assert.EqualError(t, err, "RootCoord UnmarshalText pb.SegmentIndexInfo err:line 1.0: unknown field name \"segment-index-meta\" in milvus.proto.etcd.SegmentIndexInfo") - prefix[SegmentIndexMetaPrefix] = []string{proto.MarshalTextString(&pb.SegmentIndexInfo{})} + value, _ = proto.Marshal(&pb.SegmentIndexInfo{}) + prefix[SegmentIndexMetaPrefix] = []string{string(value)} _, err = NewMetaTable(k1) assert.NotNil(t, err) - prefix[SegmentIndexMetaPrefix] = []string{proto.MarshalTextString(&pb.SegmentIndexInfo{}), proto.MarshalTextString(&pb.SegmentIndexInfo{})} + prefix[SegmentIndexMetaPrefix] = []string{string(value), string(value)} _, err = NewMetaTable(k1) assert.NotNil(t, err) assert.EqualError(t, err, "load prefix error") + // index prefix[IndexMetaPrefix] = []string{"index-meta"} _, err = NewMetaTable(k1) assert.NotNil(t, err) - assert.EqualError(t, err, "RootCoord UnmarshalText pb.IndexInfo err:line 1.0: unknown field name \"index-meta\" in milvus.proto.etcd.IndexInfo") - prefix[IndexMetaPrefix] = []string{proto.MarshalTextString(&pb.IndexInfo{})} + value, _ = proto.Marshal(&pb.IndexInfo{}) + prefix[IndexMetaPrefix] = []string{string(value)} m1, err := NewMetaTable(k1) assert.NotNil(t, err) assert.EqualError(t, err, "load prefix error") diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 7a678c0500..c816885bbd 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -56,7 +56,7 @@ import ( // DdOperation used to save ddMsg into ETCD type DdOperation struct { - Body string `json:"body"` + Body []byte `json:"body"` Type string `json:"type"` } @@ -1006,7 +1006,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { switch ddOp.Type { case CreateCollectionDDType: var ddReq = internalpb.CreateCollectionRequest{} - if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { + if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil { return err } collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0) @@ -1019,7 +1019,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { invalidateCache = false case DropCollectionDDType: var ddReq = internalpb.DropCollectionRequest{} - if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { + if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil { return err } ts = ddReq.Base.Timestamp @@ -1034,7 +1034,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { invalidateCache = true case CreatePartitionDDType: var ddReq = internalpb.CreatePartitionRequest{} - if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { + if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil { return err } ts = ddReq.Base.Timestamp @@ -1052,7 +1052,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { invalidateCache = true case DropPartitionDDType: var ddReq = internalpb.DropPartitionRequest{} - if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { + if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil { return err } ts = ddReq.Base.Timestamp diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 56b2982ad4..ce56892397 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -730,7 +730,7 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, CreateCollectionDDType, ddOp.Type) var ddCollReq = internalpb.CreateCollectionRequest{} - err = proto.UnmarshalText(ddOp.Body, &ddCollReq) + err = proto.Unmarshal(ddOp.Body, &ddCollReq) assert.Nil(t, err) assert.Equal(t, createMeta.ID, ddCollReq.CollectionID) assert.Equal(t, createMeta.PartitionIDs[0], ddCollReq.PartitionID) @@ -900,7 +900,7 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, CreatePartitionDDType, ddOp.Type) var ddReq = internalpb.CreatePartitionRequest{} - err = proto.UnmarshalText(ddOp.Body, &ddReq) + err = proto.Unmarshal(ddOp.Body, &ddReq) assert.Nil(t, err) assert.Equal(t, collMeta.ID, ddReq.CollectionID) assert.Equal(t, collMeta.PartitionIDs[1], ddReq.PartitionID) @@ -1235,7 +1235,7 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, DropPartitionDDType, ddOp.Type) var ddReq = internalpb.DropPartitionRequest{} - err = proto.UnmarshalText(ddOp.Body, &ddReq) + err = proto.Unmarshal(ddOp.Body, &ddReq) assert.Nil(t, err) assert.Equal(t, collMeta.ID, ddReq.CollectionID) assert.Equal(t, dropPartID, ddReq.PartitionID) @@ -1325,7 +1325,7 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, DropCollectionDDType, ddOp.Type) var ddReq = internalpb.DropCollectionRequest{} - err = proto.UnmarshalText(ddOp.Body, &ddReq) + err = proto.Unmarshal(ddOp.Body, &ddReq) assert.Nil(t, err) assert.Equal(t, collMeta.ID, ddReq.CollectionID) diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index ec158a8f34..ebbb84756e 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -72,9 +72,12 @@ func GetFieldSchemaByIndexID(coll *etcdpb.CollectionInfo, idxID typeutil.UniqueI // EncodeDdOperation serialize DdOperation into string func EncodeDdOperation(m proto.Message, ddType string) (string, error) { - mStr := proto.MarshalTextString(m) + mByte, err := proto.Marshal(m) + if err != nil { + return "", err + } ddOp := DdOperation{ - Body: mStr, + Body: mByte, Type: ddType, } ddOpByte, err := json.Marshal(ddOp)