Remove with prefix

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2021-03-06 16:00:41 +08:00 committed by yefu.chen
parent ce5d7c7ecc
commit efa7306ef6
6 changed files with 172 additions and 42 deletions

View File

@ -194,3 +194,35 @@ func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan {
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix())
return rch
}
func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error {
ops := make([]clientv3.Op, 0, len(keys))
for _, k := range keys {
op := clientv3.OpDelete(path.Join(kv.rootPath, k), clientv3.WithPrefix())
ops = append(ops, op)
}
log.Printf("MultiRemoveWithPrefix")
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
return err
}
func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
ops := make([]clientv3.Op, 0, len(saves))
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
}
for _, keyDelete := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix()))
}
log.Printf("MultiSaveAndRemove")
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
return err
}

View File

@ -219,3 +219,74 @@ func TestEtcdKV_MultiSaveAndRemove(t *testing.T) {
assert.Error(t, err)
assert.Empty(t, val)
}
func TestEtcdKV_MultiRemoveWithPrefix(t *testing.T) {
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
rootPath := "/etcd/test/root"
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
err = etcdKV.Save("x/abc/1", "1")
assert.Nil(t, err)
err = etcdKV.Save("x/abc/2", "2")
assert.Nil(t, err)
err = etcdKV.Save("x/def/1", "10")
assert.Nil(t, err)
err = etcdKV.Save("x/def/2", "20")
assert.Nil(t, err)
err = etcdKV.MultiRemoveWithPrefix([]string{"x/abc", "x/def", "not-exist"})
assert.Nil(t, err)
k, v, err := etcdKV.LoadWithPrefix("x")
assert.Nil(t, err)
assert.Zero(t, len(k))
assert.Zero(t, len(v))
}
func TestEtcdKV_MultiSaveAndRemoveWithPrefix(t *testing.T) {
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
rootPath := "/etcd/test/root"
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
err = etcdKV.Save("x/abc/1", "1")
assert.Nil(t, err)
err = etcdKV.Save("x/abc/2", "2")
assert.Nil(t, err)
err = etcdKV.Save("x/def/1", "10")
assert.Nil(t, err)
err = etcdKV.Save("x/def/2", "20")
assert.Nil(t, err)
err = etcdKV.MultiSaveAndRemoveWithPrefix(map[string]string{"y/k1": "v1", "y/k2": "v2"}, []string{"x/abc", "x/def", "not-exist"})
assert.Nil(t, err)
k, v, err := etcdKV.LoadWithPrefix("x")
assert.Nil(t, err)
assert.Zero(t, len(k))
assert.Zero(t, len(v))
k, v, err = etcdKV.LoadWithPrefix("y")
assert.Nil(t, err)
assert.Equal(t, len(k), len(v))
assert.Equal(t, len(k), 2)
assert.Equal(t, k[0], rootPath+"/y/k1")
assert.Equal(t, k[1], rootPath+"/y/k2")
assert.Equal(t, v[0], "v1")
assert.Equal(t, v[1], "v2")
}

View File

@ -15,4 +15,6 @@ type Base interface {
type TxnBase interface {
Base
MultiSaveAndRemove(saves map[string]string, removals []string) error
MultiRemoveWithPrefix(keys []string) error
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
}

View File

@ -128,3 +128,10 @@ func (kv *MemoryKV) LoadWithPrefix(key string) ([]string, []string, error) {
func (kv *MemoryKV) Close() {
}
func (kv *MemoryKV) MultiRemoveWithPrefix(keys []string) error {
panic("not implement")
}
func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
panic("not implement")
}

View File

@ -159,3 +159,10 @@ func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []stri
err := kv.db.Write(kv.writeOptions, writeBatch)
return err
}
func (kv *RocksdbKV) MultiRemoveWithPrefix(keys []string) error {
panic("not implement")
}
func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
panic("not implement")
}

View File

@ -34,12 +34,13 @@ type metaTable struct {
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection id to collection meta,
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection_id -> meta
collName2ID map[string]typeutil.UniqueID // collection name to collection id
partitionID2Meta map[typeutil.UniqueID]pb.PartitionInfo // partition id -> partition meta
segID2IndexMeta map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo // segment id -> index id -> segment index meta
indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // index id ->index meta
partitionID2Meta map[typeutil.UniqueID]pb.PartitionInfo // collection_id/partition_id -> meta
segID2IndexMeta map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo // collection_id/index_id/partition_id/segment_id -> meta
indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // collection_id/index_id -> meta
segID2CollID map[typeutil.UniqueID]typeutil.UniqueID // segment id -> collection id
segID2PartitionID map[typeutil.UniqueID]typeutil.UniqueID // segment id -> partition id
partitionID2CollID map[typeutil.UniqueID]typeutil.UniqueID // partition id -> collection id
tenantLock sync.RWMutex
@ -72,6 +73,7 @@ func (mt *metaTable) reloadFromKV() error {
mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
mt.partitionID2CollID = make(map[typeutil.UniqueID]typeutil.UniqueID)
mt.segID2CollID = make(map[typeutil.UniqueID]typeutil.UniqueID)
mt.segID2PartitionID = make(map[typeutil.UniqueID]typeutil.UniqueID)
_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix)
if err != nil {
@ -137,6 +139,7 @@ func (mt *metaTable) reloadFromKV() error {
mt.partitionID2Meta[partitionInfo.PartitionID] = partitionInfo
for _, segID := range partitionInfo.SegmentIDs {
mt.segID2CollID[segID] = collID
mt.segID2PartitionID[segID] = partitionInfo.PartitionID
}
}
@ -203,14 +206,14 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
mt.indexID2Meta[i.IndexID] = *i
}
k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(coll.ID, 10))
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
v1 := proto.MarshalTextString(coll)
k2 := path.Join(PartitionMetaPrefix, strconv.FormatInt(part.PartitionID, 10))
k2 := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, coll.ID, part.PartitionID)
v2 := proto.MarshalTextString(part)
meta := map[string]string{k1: v1, k2: v2}
for _, i := range idx {
k := path.Join(IndexMetaPrefix, strconv.FormatInt(i.IndexID, 10))
k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
v := proto.MarshalTextString(i)
meta[k] = v
}
@ -232,11 +235,9 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error {
return fmt.Errorf("can't find collection. id = %d", collID)
}
metaKeys := []string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))}
delete(mt.collID2Meta, collID)
delete(mt.collName2ID, collMeta.Schema.Name)
for _, partID := range collMeta.PartitionIDs {
metaKeys = append(metaKeys, path.Join(PartitionMetaPrefix, strconv.FormatInt(partID, 10)))
partMeta, ok := mt.partitionID2Meta[partID]
if !ok {
log.Warn("partition id not exist", zap.Int64("partition id", partID))
@ -244,26 +245,29 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error {
}
delete(mt.partitionID2Meta, partID)
for _, segID := range partMeta.SegmentIDs {
segIndexMeta, ok := mt.segID2IndexMeta[segID]
_, ok := mt.segID2IndexMeta[segID]
if !ok {
log.Warn("segment id not exist", zap.Int64("segment id", segID))
continue
}
delete(mt.segID2IndexMeta, segID)
for indexID, segIdxMeta := range *segIndexMeta {
metaKeys = append(metaKeys, path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(segID, 10), strconv.FormatInt(indexID, 10)))
indexMeta, ok := mt.indexID2Meta[segIdxMeta.IndexID]
if !ok {
//log.Printf("index id = %d not exist", segIdxMeta.IndexID) // segments may share index id
continue
}
delete(mt.indexID2Meta, segIdxMeta.IndexID)
metaKeys = append(metaKeys, path.Join(IndexMetaPrefix, strconv.FormatInt(indexMeta.IndexID, 10)))
}
}
}
err := mt.client.MultiRemove(metaKeys)
for _, idxInfo := range collMeta.FieldIndexes {
_, ok := mt.indexID2Meta[idxInfo.IndexID]
if !ok {
log.Warn("index id not exist", zap.Int64("index id", idxInfo.IndexID))
continue
}
delete(mt.indexID2Meta, idxInfo.IndexID)
}
metas := []string{
fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
fmt.Sprintf("%s/%d", PartitionMetaPrefix, collID),
fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
}
err := mt.client.MultiRemoveWithPrefix(metas)
if err != nil {
_ = mt.reloadFromKV()
return err
@ -370,9 +374,9 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
mt.collID2Meta[collID] = coll
mt.partitionID2CollID[partitionID] = collID
k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(coll.ID, 10))
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
v1 := proto.MarshalTextString(&coll)
k2 := path.Join(PartitionMetaPrefix, strconv.FormatInt(partitionID, 10))
k2 := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collID, partitionID)
v2 := proto.MarshalTextString(&partMeta)
meta := map[string]string{k1: v1, k2: v2}
@ -441,27 +445,24 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
collMeta.PartitionIDs = pd
mt.collID2Meta[collID] = collMeta
delMetaKeys := []string{path.Join(PartitionMetaPrefix, strconv.FormatInt(partMeta.PartitionID, 10))}
for _, segID := range partMeta.SegmentIDs {
segIndexMeta, ok := mt.segID2IndexMeta[segID]
_, ok := mt.segID2IndexMeta[segID]
if !ok {
log.Warn("segment has no index meta", zap.Int64("segment id", segID))
continue
}
delete(mt.segID2IndexMeta, segID)
for indexID, segIdxMeta := range *segIndexMeta {
delMetaKeys = append(delMetaKeys, path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(segID, 10), strconv.FormatInt(indexID, 10)))
indexMeta, ok := mt.indexID2Meta[segIdxMeta.IndexID]
if !ok {
log.Warn("index id not exist", zap.Int64("index id", segIdxMeta.IndexID))
continue
}
delete(mt.indexID2Meta, segIdxMeta.IndexID)
delMetaKeys = append(delMetaKeys, path.Join(IndexMetaPrefix, strconv.FormatInt(indexMeta.IndexID, 10)))
}
}
collKV := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
err := mt.client.MultiSaveAndRemove(collKV, delMetaKeys)
delMetaKeys := []string{
fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collMeta.ID, partMeta.PartitionID),
}
for _, idxInfo := range collMeta.FieldIndexes {
k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partMeta.PartitionID)
delMetaKeys = append(delMetaKeys, k)
}
err := mt.client.MultiSaveAndRemoveWithPrefix(collKV, delMetaKeys)
if err != nil {
_ = mt.reloadFromKV()
@ -513,7 +514,11 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.SegmentID)
mt.partitionID2Meta[seg.PartitionID] = partMeta
mt.segID2CollID[seg.SegmentID] = seg.CollectionID
err := mt.client.Save(path.Join(PartitionMetaPrefix, strconv.FormatInt(seg.PartitionID, 10)), proto.MarshalTextString(&partMeta))
mt.segID2PartitionID[seg.SegmentID] = seg.PartitionID
k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, seg.CollectionID, seg.PartitionID)
v := proto.MarshalTextString(&partMeta)
err := mt.client.Save(k, v)
if err != nil {
_ = mt.reloadFromKV()
@ -544,6 +549,10 @@ func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error {
if !ok {
return fmt.Errorf("collection id = %d not found", collID)
}
partID, ok := mt.segID2PartitionID[seg.SegmentID]
if !ok {
return fmt.Errorf("segment id = %d not belong to any partition", seg.SegmentID)
}
exist := false
for _, i := range collMeta.FieldIndexes {
if i.IndexID == seg.IndexID {
@ -556,7 +565,7 @@ func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error {
}
(*(mt.segID2IndexMeta[seg.SegmentID]))[seg.IndexID] = *seg
k := path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(seg.SegmentID, 10), strconv.FormatInt(seg.IndexID, 10))
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, seg.IndexID, partID, seg.SegmentID)
v := proto.MarshalTextString(seg)
err := mt.client.Save(k, v)
@ -613,7 +622,6 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
saveMeta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
delete(mt.indexID2Meta, dropIdxID)
delMeta := []string{path.Join(IndexMetaPrefix, strconv.FormatInt(dropIdxID, 10))}
for _, partID := range collMeta.PartitionIDs {
partMeta, ok := mt.partitionID2Meta[partID]
@ -627,13 +635,16 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
_, ok := (*segInfo)[dropIdxID]
if ok {
delete(*segInfo, dropIdxID)
delMeta = append(delMeta, path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(segID, 10), strconv.FormatInt(dropIdxID, 10)))
}
}
}
}
delMeta := []string{
fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, dropIdxID),
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
}
err = mt.client.MultiSaveAndRemove(saveMeta, delMeta)
err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
if err != nil {
_ = mt.reloadFromKV()
return 0, false, err