diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 5fa1729638..9386ee44db 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -65,7 +65,7 @@ type MetaKv interface { type SnapShotKV interface { Save(key string, value string, ts typeutil.Timestamp) error Load(key string, ts typeutil.Timestamp) (string, error) - MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error + MultiSave(kvs map[string]string, ts typeutil.Timestamp) error LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) - MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error + MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error } diff --git a/internal/rootcoord/meta_snapshot.go b/internal/rootcoord/meta_snapshot.go index fc1e219649..48032d9aa0 100644 --- a/internal/rootcoord/meta_snapshot.go +++ b/internal/rootcoord/meta_snapshot.go @@ -322,26 +322,18 @@ func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) return string(resp.Kvs[0].Value), nil } -func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { +func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error { ms.lock.Lock() defer ms.lock.Unlock() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() - ops := make([]clientv3.Op, 0, len(kvs)+2) + ops := make([]clientv3.Op, 0, len(kvs)+1) for key, value := range kvs { ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) } strTs := strconv.FormatInt(int64(ts), 10) - for _, addition := range additions { - if addition == nil { - continue - } - if k, v, e := addition(ts); e == nil { - ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v)) - } - } ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() if err != nil { @@ -350,6 +342,7 @@ func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp, ms.putTs(resp.Header.Revision, ts) return nil } + func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { ms.lock.RLock() defer ms.lock.RUnlock() @@ -387,26 +380,18 @@ func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]str return keys, values, nil } -func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { +func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error { ms.lock.Lock() defer ms.lock.Unlock() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() - ops := make([]clientv3.Op, 0, len(saves)+len(removals)+2) + ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1) for key, value := range saves { ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) } strTs := strconv.FormatInt(int64(ts), 10) - for _, addition := range additions { - if addition == nil { - continue - } - if k, v, e := addition(ts); e == nil { - ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v)) - } - } for _, key := range removals { ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key), clientv3.WithPrefix())) } diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 40bf4bc29a..6e6246cf3d 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -220,20 +220,6 @@ func (mt *MetaTable) reloadFromKV() error { return nil } -func (mt *MetaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) { - if op == nil { - return nil - } - meta[DDMsgSendPrefix] = "false" - return func(ts typeutil.Timestamp) (string, string, error) { - val, err := op(ts) - if err != nil { - return "", "", err - } - return DDOperationPrefix, val, nil - } -} - // AddTenant add tenant func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error { mt.tenantLock.Lock() @@ -277,7 +263,7 @@ func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error { } // AddCollection add collection -func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { +func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr string) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -293,11 +279,24 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam return fmt.Errorf("incorrect index id when creating collection") } + coll.CreateTime = ts + if len(coll.PartitionCreatedTimestamps) == 1 { + coll.PartitionCreatedTimestamps[0] = ts + } + mt.collID2Meta[coll.ID] = *coll + mt.collName2ID[coll.Schema.Name] = coll.ID for _, i := range idx { mt.indexID2Meta[i.IndexID] = *i } - meta := make(map[string]string) + k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID) + v1, err := proto.Marshal(coll) + if err != nil { + log.Error("MetaTable AddCollection saveColl Marshal fail", + zap.String("key", k1), zap.Error(err)) + return fmt.Errorf("MetaTable AddCollection Marshal fail key:%s, err:%w", k1, err) + } + meta := map[string]string{k1: string(v1)} for _, i := range idx { k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID) @@ -311,26 +310,10 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam } // save ddOpStr into etcd - addition := mt.getAdditionKV(ddOpStr, meta) - saveColl := func(ts typeutil.Timestamp) (string, string, error) { - coll.CreateTime = ts - if len(coll.PartitionCreatedTimestamps) == 1 { - coll.PartitionCreatedTimestamps[0] = ts - } - mt.collID2Meta[coll.ID] = *coll - mt.collName2ID[coll.Schema.Name] = coll.ID - k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID) - v1, err := proto.Marshal(coll) - if err != nil { - log.Error("MetaTable AddCollection saveColl Marshal fail", - zap.String("key", k1), zap.Error(err)) - return "", "", fmt.Errorf("MetaTable AddCollection saveColl Marshal fail key:%s, err:%w", k1, err) - } - meta[k1] = string(v1) - return k1, string(v1), nil - } + meta[DDMsgSendPrefix] = "false" + meta[DDOperationPrefix] = ddOpStr - err := mt.client.MultiSave(meta, ts, addition, saveColl) + err = mt.client.MultiSave(meta, ts) if err != nil { log.Error("SnapShotKV MultiSave fail", zap.Error(err)) panic("SnapShotKV MultiSave fail") @@ -340,7 +323,7 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam } // DeleteCollection delete collection -func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { +func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr string) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -396,9 +379,12 @@ func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Time } // save ddOpStr into etcd - var saveMeta = map[string]string{} - addition := mt.getAdditionKV(ddOpStr, saveMeta) - err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts, addition) + var saveMeta = map[string]string{ + DDMsgSendPrefix: "false", + DDOperationPrefix: ddOpStr, + } + + err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts) if err != nil { log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err)) panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail") @@ -551,7 +537,7 @@ func (mt *MetaTable) ListCollectionPhysicalChannels() []string { } // AddPartition add partition -func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { +func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr string) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() coll, ok := mt.collID2Meta[collID] @@ -585,30 +571,26 @@ func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string } // no necessary to check created timestamp } - meta := make(map[string]string) + + coll.PartitionIDs = append(coll.PartitionIDs, partitionID) + coll.PartitionNames = append(coll.PartitionNames, partitionName) + coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts) + mt.collID2Meta[collID] = coll + + k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID) + v1, err := proto.Marshal(&coll) + if err != nil { + log.Error("MetaTable AddPartition saveColl Marshal fail", + zap.String("key", k1), zap.Error(err)) + return fmt.Errorf("MetaTable AddPartition Marshal fail, k1:%s, err:%w", k1, err) + } + meta := map[string]string{k1: string(v1)} // save ddOpStr into etcd - addition := mt.getAdditionKV(ddOpStr, meta) + meta[DDMsgSendPrefix] = "false" + meta[DDOperationPrefix] = ddOpStr - saveColl := func(ts typeutil.Timestamp) (string, string, error) { - coll.PartitionIDs = append(coll.PartitionIDs, partitionID) - coll.PartitionNames = append(coll.PartitionNames, partitionName) - coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts) - mt.collID2Meta[collID] = coll - - k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID) - v1, err := proto.Marshal(&coll) - if err != nil { - log.Error("MetaTable AddPartition saveColl Marshal fail", - zap.String("key", k1), zap.Error(err)) - return "", "", fmt.Errorf("MetaTable AddPartition saveColl Marshal fail, k1:%s, err:%w", k1, err) - } - meta[k1] = string(v1) - - return k1, string(v1), nil - } - - err := mt.client.MultiSave(meta, ts, addition, saveColl) + err = mt.client.MultiSave(meta, ts) if err != nil { log.Error("SnapShotKV MultiSave fail", zap.Error(err)) panic("SnapShotKV MultiSave fail") @@ -697,7 +679,7 @@ func (mt *MetaTable) HasPartition(collID typeutil.UniqueID, partitionName string } // DeletePartition delete partition -func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) { +func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr string) (typeutil.UniqueID, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -758,9 +740,10 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str } // save ddOpStr into etcd - addition := mt.getAdditionKV(ddOpStr, meta) + meta[DDMsgSendPrefix] = "false" + meta[DDOperationPrefix] = ddOpStr - err = mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition) + err = mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts) if err != nil { log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err)) panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail") diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index f1ab3d1b70..7b66e27628 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -31,8 +31,8 @@ type mockTestKV struct { loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error) save func(key, value string, ts typeutil.Timestamp) error - multiSave func(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error - multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error + multiSave func(kvs map[string]string, ts typeutil.Timestamp) error + multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, ts typeutil.Timestamp) error } func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { @@ -46,12 +46,12 @@ func (m *mockTestKV) Save(key, value string, ts typeutil.Timestamp) error { return m.save(key, value, ts) } -func (m *mockTestKV) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { - return m.multiSave(kvs, ts, additions...) +func (m *mockTestKV) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error { + return m.multiSave(kvs, ts) } -func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { - return m.multiSaveAndRemoveWithPrefix(saves, removals, ts, additions...) +func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return m.multiSaveAndRemoveWithPrefix(saves, removals, ts) } func Test_MockKV(t *testing.T) { @@ -251,16 +251,12 @@ func TestMetaTable(t *testing.T) { }, } - ddOp := func(ts typeutil.Timestamp) (string, error) { - return "", nil - } - t.Run("add collection", func(t *testing.T) { ts := ftso() - err = mt.AddCollection(collInfo, ts, nil, ddOp) + err = mt.AddCollection(collInfo, ts, nil, "") assert.NotNil(t, err) - err = mt.AddCollection(collInfo, ts, idxInfo, ddOp) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) assert.Equal(t, uint64(1), ts) @@ -311,7 +307,7 @@ func TestMetaTable(t *testing.T) { t.Run("add partition", func(t *testing.T) { ts := ftso() - err = mt.AddPartition(collID, partName, partID, ts, ddOp) + err = mt.AddPartition(collID, partName, partID, ts, "") assert.Nil(t, err) //assert.Equal(t, ts, uint64(2)) @@ -467,7 +463,7 @@ func TestMetaTable(t *testing.T) { t.Run("drop partition", func(t *testing.T) { ts := ftso() - id, err := mt.DeletePartition(collID, partName, ts, nil) + id, err := mt.DeletePartition(collID, partName, ts, "") assert.Nil(t, err) assert.Equal(t, partID, id) @@ -479,12 +475,12 @@ func TestMetaTable(t *testing.T) { t.Run("drop collection", func(t *testing.T) { ts := ftso() - err = mt.DeleteCollection(collIDInvalid, ts, nil) + err = mt.DeleteCollection(collIDInvalid, ts, "") assert.NotNil(t, err) ts2 := ftso() err = mt.AddAlias(aliasName2, collName, ts2) assert.Nil(t, err) - err = mt.DeleteCollection(collID, ts, nil) + err = mt.DeleteCollection(collID, ts, "") assert.Nil(t, err) ts3 := ftso() err = mt.DropAlias(aliasName2, ts3) @@ -504,39 +500,27 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return fmt.Errorf("multi save error") } collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - assert.Panics(t, func() { mt.AddCollection(collInfo, 0, idxInfo, nil) }) + assert.Panics(t, func() { mt.AddCollection(collInfo, 0, idxInfo, "") }) //err = mt.AddCollection(collInfo, 0, idxInfo, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save error") }) t.Run("delete collection failed", func(t *testing.T) { - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } - mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, ts typeutil.Timestamp) error { return fmt.Errorf("multi save and remove with prefix error") } - collInfo.PartitionIDs = nil - collInfo.PartitionNames = nil - collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) - assert.Nil(t, err) - mt.indexID2Meta = make(map[int64]pb.IndexInfo) - ts = ftso() - assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, ts, nil) }) + assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, ts, "") }) //err = mt.DeleteCollection(collInfo.ID, ts, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save and remove with prefix error") @@ -551,7 +535,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) @@ -575,18 +559,18 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) ts = ftso() - err = mt.AddPartition(2, "no-part", 22, ts, nil) + err = mt.AddPartition(2, "no-part", 22, ts, "") assert.NotNil(t, err) assert.EqualError(t, err, "can't find collection. id = 2") coll := mt.collID2Meta[collInfo.ID] coll.PartitionIDs = make([]int64, Params.MaxPartitionNum) mt.collID2Meta[coll.ID] = coll - err = mt.AddPartition(coll.ID, "no-part", 22, ts, nil) + err = mt.AddPartition(coll.ID, "no-part", 22, ts, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)) @@ -594,20 +578,15 @@ func TestMetaTable(t *testing.T) { coll.PartitionNames = []string{partName} coll.PartitionCreatedTimestamps = []uint64{ftso()} mt.collID2Meta[coll.ID] = coll - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return fmt.Errorf("multi save error") } - assert.Panics(t, func() { mt.AddPartition(coll.ID, "no-part", 22, ts, nil) }) + assert.Panics(t, func() { mt.AddPartition(coll.ID, "no-part", 22, ts, "") }) //err = mt.AddPartition(coll.ID, "no-part", 22, ts, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save error") - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } collInfo.PartitionIDs = nil @@ -619,10 +598,10 @@ func TestMetaTable(t *testing.T) { //_, err = mt.AddPartition(coll.ID, partName, partID, nil) //assert.Nil(t, err) ts = ftso() - err = mt.AddPartition(coll.ID, partName, 22, ts, nil) + err = mt.AddPartition(coll.ID, partName, 22, ts, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partName)) - err = mt.AddPartition(coll.ID, "no-part", partID, ts, nil) + err = mt.AddPartition(coll.ID, "no-part", partID, ts, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partID)) }) @@ -631,7 +610,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } err := mt.reloadFromKV() @@ -641,7 +620,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) assert.False(t, mt.HasPartition(collInfo.ID, "no-partName", 0)) @@ -654,12 +633,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } err := mt.reloadFromKV() @@ -669,28 +643,28 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = []string{partName} collInfo.PartitionCreatedTimestamps = []uint64{ftso()} ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) ts = ftso() - _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, ts, nil) + _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, ts, "") assert.NotNil(t, err) assert.EqualError(t, err, "default partition cannot be deleted") - _, err = mt.DeletePartition(collInfo.ID, "abc", ts, nil) + _, err = mt.DeletePartition(collInfo.ID, "abc", ts, "") assert.NotNil(t, err) assert.EqualError(t, err, "partition abc does not exist") - mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return fmt.Errorf("multi save and remove with prefix error") } - assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, ts, nil) }) + assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, ts, "") }) //_, err = mt.DeletePartition(collInfo.ID, partName, ts, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save and remove with prefix error") mt.collID2Meta = make(map[int64]pb.CollectionInfo) - _, err = mt.DeletePartition(collInfo.ID, "abc", ts, nil) + _, err = mt.DeletePartition(collInfo.ID, "abc", ts, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID)) }) @@ -699,12 +673,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } mockKV.save = func(key, value string, ts typeutil.Timestamp) error { @@ -717,7 +686,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) segIdxInfo := pb.SegmentIndexInfo{ @@ -745,7 +714,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts = ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) segIdxInfo.IndexID = indexID @@ -763,12 +732,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { @@ -781,7 +745,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) ts = ftso() @@ -823,9 +787,9 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil coll.PartitionCreatedTimestamps = nil ts = ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) - mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return fmt.Errorf("multi save and remove with prefix error") } ts = ftso() @@ -839,12 +803,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } mockKV.save = func(key, value string, ts typeutil.Timestamp) error { @@ -857,7 +816,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) seg, err := mt.GetSegmentIndexInfoByID(segID2, fieldID, "abc") @@ -894,7 +853,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { @@ -907,7 +866,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) @@ -966,12 +925,7 @@ func TestMetaTable(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "collection abc not found") - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { @@ -984,7 +938,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) _, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx, nil, 0) @@ -998,7 +952,7 @@ func TestMetaTable(t *testing.T) { assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID)) mt.indexID2Meta = bakMeta - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return fmt.Errorf("multi save error") } assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil, 0) }) @@ -1006,12 +960,7 @@ func TestMetaTable(t *testing.T) { //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save error") - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } collInfo.PartitionIDs = nil @@ -1037,7 +986,7 @@ func TestMetaTable(t *testing.T) { mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx idx.IndexName = idxInfo[0].IndexName - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return fmt.Errorf("multi save error") } assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil, 0) }) @@ -1058,12 +1007,7 @@ func TestMetaTable(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "collection abc not found") - mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { - for _, a := range addition { - if a != nil { - a(ts) - } - } + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { @@ -1076,7 +1020,7 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil ts := ftso() - err = mt.AddCollection(collInfo, ts, idxInfo, nil) + err = mt.AddCollection(collInfo, ts, idxInfo, "") assert.Nil(t, err) mt.indexID2Meta = make(map[int64]pb.IndexInfo) _, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName) @@ -1133,7 +1077,7 @@ func TestMetaWithTimestamp(t *testing.T) { collInfo.PartitionNames = []string{partName1} collInfo.PartitionCreatedTimestamps = []uint64{ftso()} t1 := ftso() - err = mt.AddCollection(collInfo, t1, nil, nil) + err = mt.AddCollection(collInfo, t1, nil, "") assert.Nil(t, err) collInfo.ID = 2 @@ -1143,7 +1087,7 @@ func TestMetaWithTimestamp(t *testing.T) { collInfo.Schema.Name = collName2 t2 := ftso() - err = mt.AddCollection(collInfo, t2, nil, nil) + err = mt.AddCollection(collInfo, t2, nil, "") assert.Nil(t, err) assert.True(t, mt.HasCollection(collID1, 0)) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 2d89514166..e03980bf34 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -362,19 +362,20 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32 PhysicalChannelNames: chanNames, } - // build DdOperation and save it into etcd, when ddmsg send fail, - // system can restore ddmsg from etcd and re-send - ddOp := func(ts typeutil.Timestamp) (string, error) { - ddCollReq.Base.Timestamp = ts - return EncodeDdOperation(&ddCollReq, CreateCollectionDDType) - } - reason := fmt.Sprintf("create collection %d", collID) ts, err := core.TSOAllocator(1) if err != nil { return fmt.Errorf("TSO alloc fail, error = %w", err) } + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddCollReq.Base.Timestamp = ts + ddOpStr, err := EncodeDdOperation(&ddCollReq, CreateCollectionDDType) + if err != nil { + return fmt.Errorf("EncodeDdOperation fail, error = %w", err) + } + // use lambda function here to guarantee all resources to be released createCollectionFn := func() error { // lock for ddl operation @@ -385,7 +386,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32 // clear ddl timetick in all conditions defer core.chanTimeTick.RemoveDdlTimeTick(ts, reason) - err = core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOp) + err = core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr) if err != nil { return fmt.Errorf("meta table add collection failed,error = %w", err) } diff --git a/internal/rootcoord/suffix_snapshot.go b/internal/rootcoord/suffix_snapshot.go index b3b247ef37..056695e644 100644 --- a/internal/rootcoord/suffix_snapshot.go +++ b/internal/rootcoord/suffix_snapshot.go @@ -345,18 +345,9 @@ func (ss *suffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error // MultiSave save muiltple kvs // if ts == 0, act like TxnKV -// additions is executed before process each kvs -// each key-value (including additions result) will be treat in same logic like Save -func (ss *suffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { - // apply additions - for _, addition := range additions { - k, v, err := addition(ts) - if err != nil { - continue - } - kvs[k] = v - } - // if ts == 0, act like TxnKV, with additions executed +// each key-value will be treat in same logic like Save +func (ss *suffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error { + // if ts == 0, act like TxnKV if ts == 0 { return ss.TxnKV.MultiSave(kvs) } @@ -487,19 +478,9 @@ func (ss *suffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s // MultiSaveAndRemoveWithPrefix save muiltple kvs and remove as well // if ts == 0, act like TxnKV -// additions is executed before process each kvs -// each key-value (including additions result) will be treat in same logic like Save -func (ss *suffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { - // apply additions - for _, addition := range additions { - k, v, err := addition(ts) - if err != nil { - continue - } - saves[k] = v - } - - // if ts == 0, act like TxnKV, with additions executed +// each key-value will be treat in same logic like Save +func (ss *suffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + // if ts == 0, act like TxnKV if ts == 0 { return ss.TxnKV.MultiSaveAndRemoveWithPrefix(saves, removals) } diff --git a/internal/rootcoord/suffix_snapshot_test.go b/internal/rootcoord/suffix_snapshot_test.go index 6cce4d62a2..3667a4550d 100644 --- a/internal/rootcoord/suffix_snapshot_test.go +++ b/internal/rootcoord/suffix_snapshot_test.go @@ -1,7 +1,6 @@ package rootcoord import ( - "errors" "fmt" "math/rand" "testing" @@ -314,11 +313,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)} vtso = typeutil.Timestamp(100 + i*5) ts := ftso() - err = ss.MultiSave(saves, ts, func(ts typeutil.Timestamp) (string, string, error) { - return "extra", "extra-value", nil - }, func(ts typeutil.Timestamp) (string, string, error) { - return "extra", "extra-value", errors.New("out of range") - }) + err = ss.MultiSave(saves, ts) assert.Nil(t, err) assert.Equal(t, vtso, ts) } @@ -400,11 +395,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { dm := []string{fmt.Sprintf("kd-%04d", i-20)} vtso = typeutil.Timestamp(100 + i*5) ts := ftso() - err = ss.MultiSaveAndRemoveWithPrefix(sm, dm, ts, func(ts typeutil.Timestamp) (string, string, error) { - return "extra", "extra-value", nil - }, func(ts typeutil.Timestamp) (string, string, error) { - return "extra", "extra-value", errors.New("out of range") - }) + err = ss.MultiSaveAndRemoveWithPrefix(sm, dm, ts) assert.Nil(t, err) assert.Equal(t, vtso, ts) } diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index 11c2cc76dc..647f6cece9 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -174,19 +174,20 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { PhysicalChannelNames: chanNames, } - // build DdOperation and save it into etcd, when ddmsg send fail, - // system can restore ddmsg from etcd and re-send - ddOp := func(ts typeutil.Timestamp) (string, error) { - ddCollReq.Base.Timestamp = ts - return EncodeDdOperation(&ddCollReq, CreateCollectionDDType) - } - reason := fmt.Sprintf("create collection %d", collID) ts, err := t.core.TSOAllocator(1) if err != nil { return fmt.Errorf("TSO alloc fail, error = %w", err) } + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddCollReq.Base.Timestamp = ts + ddOpStr, err := EncodeDdOperation(&ddCollReq, CreateCollectionDDType) + if err != nil { + return fmt.Errorf("EncodeDdOperation fail, error = %w", err) + } + // use lambda function here to guarantee all resources to be released createCollectionFn := func() error { // lock for ddl operation @@ -210,7 +211,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { Data: ids[pchan], }) } - err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOp) + err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr) if err != nil { t.core.dmlChannels.RemoveProducerChannels(chanNames...) // it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic @@ -264,19 +265,20 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { CollectionID: collMeta.ID, } - // build DdOperation and save it into etcd, when ddmsg send fail, - // system can restore ddmsg from etcd and re-send - ddOp := func(ts typeutil.Timestamp) (string, error) { - ddReq.Base.Timestamp = ts - return EncodeDdOperation(&ddReq, DropCollectionDDType) - } - reason := fmt.Sprintf("drop collection %d", collMeta.ID) ts, err := t.core.TSOAllocator(1) if err != nil { return fmt.Errorf("TSO alloc fail, error = %w", err) } + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddReq.Base.Timestamp = ts + ddOpStr, err := EncodeDdOperation(&ddReq, DropCollectionDDType) + if err != nil { + return fmt.Errorf("EncodeDdOperation fail, error = %w", err) + } + aliases := t.core.MetaTable.ListAliases(collMeta.ID) // use lambda function here to guarantee all resources to be released @@ -289,7 +291,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { // clear ddl timetick in all conditions defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) - err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOp) + err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr) if err != nil { return err } @@ -493,19 +495,20 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { PartitionID: partID, } - // build DdOperation and save it into etcd, when ddmsg send fail, - // system can restore ddmsg from etcd and re-send - ddOp := func(ts typeutil.Timestamp) (string, error) { - ddReq.Base.Timestamp = ts - return EncodeDdOperation(&ddReq, CreatePartitionDDType) - } - reason := fmt.Sprintf("create partition %s", t.Req.PartitionName) ts, err := t.core.TSOAllocator(1) if err != nil { return fmt.Errorf("TSO alloc fail, error = %w", err) } + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddReq.Base.Timestamp = ts + ddOpStr, err := EncodeDdOperation(&ddReq, CreatePartitionDDType) + if err != nil { + return fmt.Errorf("EncodeDdOperation fail, error = %w", err) + } + // use lambda function here to guarantee all resources to be released createPartitionFn := func() error { // lock for ddl operation @@ -516,7 +519,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { // clear ddl timetick in all conditions defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) - err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOp) + err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr) if err != nil { return err } @@ -588,19 +591,20 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { PartitionID: partID, } - // build DdOperation and save it into etcd, when ddmsg send fail, - // system can restore ddmsg from etcd and re-send - ddOp := func(ts typeutil.Timestamp) (string, error) { - ddReq.Base.Timestamp = ts - return EncodeDdOperation(&ddReq, DropPartitionDDType) - } - reason := fmt.Sprintf("drop partition %s", t.Req.PartitionName) ts, err := t.core.TSOAllocator(1) if err != nil { return fmt.Errorf("TSO alloc fail, error = %w", err) } + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddReq.Base.Timestamp = ts + ddOpStr, err := EncodeDdOperation(&ddReq, DropPartitionDDType) + if err != nil { + return fmt.Errorf("EncodeDdOperation fail, error = %w", err) + } + // use lambda function here to guarantee all resources to be released dropPartitionFn := func() error { // lock for ddl operation @@ -611,7 +615,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { // clear ddl timetick in all conditions defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) - _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOp) + _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr) if err != nil { return err }