diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 657c0f04be..7d525ead6c 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -194,3 +194,35 @@ func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan { rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix()) return rch } + +func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error { + ops := make([]clientv3.Op, 0, len(keys)) + for _, k := range keys { + op := clientv3.OpDelete(path.Join(kv.rootPath, k), clientv3.WithPrefix()) + ops = append(ops, op) + } + log.Printf("MultiRemoveWithPrefix") + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + return err +} + +func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { + ops := make([]clientv3.Op, 0, len(saves)) + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) + } + + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) + } + + log.Printf("MultiSaveAndRemove") + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + return err +} diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 298881c5ef..9fac519af5 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -219,3 +219,74 @@ func TestEtcdKV_MultiSaveAndRemove(t *testing.T) { assert.Error(t, err) assert.Empty(t, val) } + +func TestEtcdKV_MultiRemoveWithPrefix(t *testing.T) { + etcdAddr, err := Params.Load("_EtcdAddress") + if err != nil { + panic(err) + } + + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + rootPath := "/etcd/test/root" + etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + err = etcdKV.Save("x/abc/1", "1") + assert.Nil(t, err) + err = etcdKV.Save("x/abc/2", "2") + assert.Nil(t, err) + err = etcdKV.Save("x/def/1", "10") + assert.Nil(t, err) + err = etcdKV.Save("x/def/2", "20") + assert.Nil(t, err) + + err = etcdKV.MultiRemoveWithPrefix([]string{"x/abc", "x/def", "not-exist"}) + assert.Nil(t, err) + k, v, err := etcdKV.LoadWithPrefix("x") + assert.Nil(t, err) + assert.Zero(t, len(k)) + assert.Zero(t, len(v)) +} + +func TestEtcdKV_MultiSaveAndRemoveWithPrefix(t *testing.T) { + etcdAddr, err := Params.Load("_EtcdAddress") + if err != nil { + panic(err) + } + + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + rootPath := "/etcd/test/root" + etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + err = etcdKV.Save("x/abc/1", "1") + assert.Nil(t, err) + err = etcdKV.Save("x/abc/2", "2") + assert.Nil(t, err) + err = etcdKV.Save("x/def/1", "10") + assert.Nil(t, err) + err = etcdKV.Save("x/def/2", "20") + assert.Nil(t, err) + + err = etcdKV.MultiSaveAndRemoveWithPrefix(map[string]string{"y/k1": "v1", "y/k2": "v2"}, []string{"x/abc", "x/def", "not-exist"}) + assert.Nil(t, err) + k, v, err := etcdKV.LoadWithPrefix("x") + assert.Nil(t, err) + assert.Zero(t, len(k)) + assert.Zero(t, len(v)) + + k, v, err = etcdKV.LoadWithPrefix("y") + assert.Nil(t, err) + assert.Equal(t, len(k), len(v)) + assert.Equal(t, len(k), 2) + assert.Equal(t, k[0], rootPath+"/y/k1") + assert.Equal(t, k[1], rootPath+"/y/k2") + assert.Equal(t, v[0], "v1") + assert.Equal(t, v[1], "v2") +} diff --git a/internal/kv/kv.go b/internal/kv/kv.go index a685d9c7cf..a3b77cbd23 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -15,4 +15,6 @@ type Base interface { type TxnBase interface { Base MultiSaveAndRemove(saves map[string]string, removals []string) error + MultiRemoveWithPrefix(keys []string) error + MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error } diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 0310ac90e6..858cb23297 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -128,3 +128,10 @@ func (kv *MemoryKV) LoadWithPrefix(key string) ([]string, []string, error) { func (kv *MemoryKV) Close() { } + +func (kv *MemoryKV) MultiRemoveWithPrefix(keys []string) error { + panic("not implement") +} +func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { + panic("not implement") +} diff --git a/internal/kv/rocksdb/rocksdb_kv.go b/internal/kv/rocksdb/rocksdb_kv.go index 80c1403581..61d4f20149 100644 --- a/internal/kv/rocksdb/rocksdb_kv.go +++ b/internal/kv/rocksdb/rocksdb_kv.go @@ -159,3 +159,10 @@ func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []stri err := kv.db.Write(kv.writeOptions, writeBatch) return err } + +func (kv *RocksdbKV) MultiRemoveWithPrefix(keys []string) error { + panic("not implement") +} +func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { + panic("not implement") +} diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index 0e46602063..d14b0b056a 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -34,12 +34,13 @@ type metaTable struct { client kv.TxnBase // client of a reliable kv service, i.e. etcd client tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta - collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection id to collection meta, + collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection_id -> meta collName2ID map[string]typeutil.UniqueID // collection name to collection id - partitionID2Meta map[typeutil.UniqueID]pb.PartitionInfo // partition id -> partition meta - segID2IndexMeta map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo // segment id -> index id -> segment index meta - indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // index id ->index meta + partitionID2Meta map[typeutil.UniqueID]pb.PartitionInfo // collection_id/partition_id -> meta + segID2IndexMeta map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo // collection_id/index_id/partition_id/segment_id -> meta + indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // collection_id/index_id -> meta segID2CollID map[typeutil.UniqueID]typeutil.UniqueID // segment id -> collection id + segID2PartitionID map[typeutil.UniqueID]typeutil.UniqueID // segment id -> partition id partitionID2CollID map[typeutil.UniqueID]typeutil.UniqueID // partition id -> collection id tenantLock sync.RWMutex @@ -72,6 +73,7 @@ func (mt *metaTable) reloadFromKV() error { mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo) mt.partitionID2CollID = make(map[typeutil.UniqueID]typeutil.UniqueID) mt.segID2CollID = make(map[typeutil.UniqueID]typeutil.UniqueID) + mt.segID2PartitionID = make(map[typeutil.UniqueID]typeutil.UniqueID) _, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix) if err != nil { @@ -137,6 +139,7 @@ func (mt *metaTable) reloadFromKV() error { mt.partitionID2Meta[partitionInfo.PartitionID] = partitionInfo for _, segID := range partitionInfo.SegmentIDs { mt.segID2CollID[segID] = collID + mt.segID2PartitionID[segID] = partitionInfo.PartitionID } } @@ -203,14 +206,14 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn mt.indexID2Meta[i.IndexID] = *i } - k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(coll.ID, 10)) + k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID) v1 := proto.MarshalTextString(coll) - k2 := path.Join(PartitionMetaPrefix, strconv.FormatInt(part.PartitionID, 10)) + k2 := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, coll.ID, part.PartitionID) v2 := proto.MarshalTextString(part) meta := map[string]string{k1: v1, k2: v2} for _, i := range idx { - k := path.Join(IndexMetaPrefix, strconv.FormatInt(i.IndexID, 10)) + k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID) v := proto.MarshalTextString(i) meta[k] = v } @@ -232,11 +235,9 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error { return fmt.Errorf("can't find collection. id = %d", collID) } - metaKeys := []string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))} delete(mt.collID2Meta, collID) delete(mt.collName2ID, collMeta.Schema.Name) for _, partID := range collMeta.PartitionIDs { - metaKeys = append(metaKeys, path.Join(PartitionMetaPrefix, strconv.FormatInt(partID, 10))) partMeta, ok := mt.partitionID2Meta[partID] if !ok { log.Warn("partition id not exist", zap.Int64("partition id", partID)) @@ -244,26 +245,29 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error { } delete(mt.partitionID2Meta, partID) for _, segID := range partMeta.SegmentIDs { - segIndexMeta, ok := mt.segID2IndexMeta[segID] + _, ok := mt.segID2IndexMeta[segID] if !ok { log.Warn("segment id not exist", zap.Int64("segment id", segID)) continue } delete(mt.segID2IndexMeta, segID) - for indexID, segIdxMeta := range *segIndexMeta { - metaKeys = append(metaKeys, path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(segID, 10), strconv.FormatInt(indexID, 10))) - indexMeta, ok := mt.indexID2Meta[segIdxMeta.IndexID] - if !ok { - //log.Printf("index id = %d not exist", segIdxMeta.IndexID) // segments may share index id - continue - } - delete(mt.indexID2Meta, segIdxMeta.IndexID) - metaKeys = append(metaKeys, path.Join(IndexMetaPrefix, strconv.FormatInt(indexMeta.IndexID, 10))) - } } } - err := mt.client.MultiRemove(metaKeys) - + for _, idxInfo := range collMeta.FieldIndexes { + _, ok := mt.indexID2Meta[idxInfo.IndexID] + if !ok { + log.Warn("index id not exist", zap.Int64("index id", idxInfo.IndexID)) + continue + } + delete(mt.indexID2Meta, idxInfo.IndexID) + } + metas := []string{ + fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID), + fmt.Sprintf("%s/%d", PartitionMetaPrefix, collID), + fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID), + fmt.Sprintf("%s/%d", IndexMetaPrefix, collID), + } + err := mt.client.MultiRemoveWithPrefix(metas) if err != nil { _ = mt.reloadFromKV() return err @@ -370,9 +374,9 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string mt.collID2Meta[collID] = coll mt.partitionID2CollID[partitionID] = collID - k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(coll.ID, 10)) + k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID) v1 := proto.MarshalTextString(&coll) - k2 := path.Join(PartitionMetaPrefix, strconv.FormatInt(partitionID, 10)) + k2 := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collID, partitionID) v2 := proto.MarshalTextString(&partMeta) meta := map[string]string{k1: v1, k2: v2} @@ -441,27 +445,24 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str collMeta.PartitionIDs = pd mt.collID2Meta[collID] = collMeta - delMetaKeys := []string{path.Join(PartitionMetaPrefix, strconv.FormatInt(partMeta.PartitionID, 10))} for _, segID := range partMeta.SegmentIDs { - segIndexMeta, ok := mt.segID2IndexMeta[segID] + _, ok := mt.segID2IndexMeta[segID] if !ok { log.Warn("segment has no index meta", zap.Int64("segment id", segID)) continue } delete(mt.segID2IndexMeta, segID) - for indexID, segIdxMeta := range *segIndexMeta { - delMetaKeys = append(delMetaKeys, path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(segID, 10), strconv.FormatInt(indexID, 10))) - indexMeta, ok := mt.indexID2Meta[segIdxMeta.IndexID] - if !ok { - log.Warn("index id not exist", zap.Int64("index id", segIdxMeta.IndexID)) - continue - } - delete(mt.indexID2Meta, segIdxMeta.IndexID) - delMetaKeys = append(delMetaKeys, path.Join(IndexMetaPrefix, strconv.FormatInt(indexMeta.IndexID, 10))) - } } collKV := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)} - err := mt.client.MultiSaveAndRemove(collKV, delMetaKeys) + delMetaKeys := []string{ + fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collMeta.ID, partMeta.PartitionID), + } + for _, idxInfo := range collMeta.FieldIndexes { + k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partMeta.PartitionID) + delMetaKeys = append(delMetaKeys, k) + } + + err := mt.client.MultiSaveAndRemoveWithPrefix(collKV, delMetaKeys) if err != nil { _ = mt.reloadFromKV() @@ -513,7 +514,11 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error { partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.SegmentID) mt.partitionID2Meta[seg.PartitionID] = partMeta mt.segID2CollID[seg.SegmentID] = seg.CollectionID - err := mt.client.Save(path.Join(PartitionMetaPrefix, strconv.FormatInt(seg.PartitionID, 10)), proto.MarshalTextString(&partMeta)) + mt.segID2PartitionID[seg.SegmentID] = seg.PartitionID + k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, seg.CollectionID, seg.PartitionID) + v := proto.MarshalTextString(&partMeta) + + err := mt.client.Save(k, v) if err != nil { _ = mt.reloadFromKV() @@ -544,6 +549,10 @@ func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error { if !ok { return fmt.Errorf("collection id = %d not found", collID) } + partID, ok := mt.segID2PartitionID[seg.SegmentID] + if !ok { + return fmt.Errorf("segment id = %d not belong to any partition", seg.SegmentID) + } exist := false for _, i := range collMeta.FieldIndexes { if i.IndexID == seg.IndexID { @@ -556,7 +565,7 @@ func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error { } (*(mt.segID2IndexMeta[seg.SegmentID]))[seg.IndexID] = *seg - k := path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(seg.SegmentID, 10), strconv.FormatInt(seg.IndexID, 10)) + k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, seg.IndexID, partID, seg.SegmentID) v := proto.MarshalTextString(seg) err := mt.client.Save(k, v) @@ -613,7 +622,6 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. saveMeta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)} delete(mt.indexID2Meta, dropIdxID) - delMeta := []string{path.Join(IndexMetaPrefix, strconv.FormatInt(dropIdxID, 10))} for _, partID := range collMeta.PartitionIDs { partMeta, ok := mt.partitionID2Meta[partID] @@ -627,13 +635,16 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. _, ok := (*segInfo)[dropIdxID] if ok { delete(*segInfo, dropIdxID) - delMeta = append(delMeta, path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(segID, 10), strconv.FormatInt(dropIdxID, 10))) } } } } + delMeta := []string{ + fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, dropIdxID), + fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID), + } - err = mt.client.MultiSaveAndRemove(saveMeta, delMeta) + err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta) if err != nil { _ = mt.reloadFromKV() return 0, false, err