From 305fa9c17eb3870ebc23806dcfd874e314f0d13a Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 18 Aug 2021 14:36:10 +0800 Subject: [PATCH] Fix timestamp go back issue for timeticksync (#7123) Signed-off-by: yudong.cai --- .../distributed/rootcoord/service_test.go | 2 +- internal/kv/kv.go | 6 +- internal/rootcoord/dml_channels.go | 2 +- internal/rootcoord/meta_snapshot.go | 55 ++-- internal/rootcoord/meta_snapshot_test.go | 31 +- internal/rootcoord/meta_table.go | 98 +++--- internal/rootcoord/meta_table_test.go | 302 ++++++++++-------- internal/rootcoord/root_coord.go | 52 +-- internal/rootcoord/task.go | 184 ++++++++--- internal/rootcoord/timeticksync.go | 99 +++++- 10 files changed, 514 insertions(+), 317 deletions(-) diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index 6cdcbd674e..ca43ecdfe2 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -167,7 +167,7 @@ func TestGrpcService(t *testing.T) { timeTickArray := make([]typeutil.Timestamp, 0, 16) timeTickLock := sync.Mutex{} - core.SendTimeTick = func(ts typeutil.Timestamp) error { + core.SendTimeTick = func(ts typeutil.Timestamp, reason string) error { timeTickLock.Lock() defer timeTickLock.Unlock() t.Logf("send time tick %d", ts) diff --git a/internal/kv/kv.go b/internal/kv/kv.go index cb68a12090..37ff9e278b 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -36,9 +36,9 @@ type TxnKV interface { } type SnapShotKV interface { - Save(key, value string) (typeutil.Timestamp, error) + Save(key string, value string, ts typeutil.Timestamp) error Load(key string, ts typeutil.Timestamp) (string, error) - MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) + MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error LoadWithPrefix(key string, ts 
typeutil.Timestamp) ([]string, []string, error) - MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) + MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error } diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 3019e46e40..18abbc2484 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -146,7 +146,7 @@ func (d *dmlChannels) RemoveProducerChannels(names ...string) { defer d.lock.Unlock() for _, name := range names { - log.Debug("delete dml channel", zap.String("channel name", name)) + //log.Debug("delete dml channel", zap.String("channel name", name)) if ds, ok := d.dml[name]; ok { ds.valid = false } diff --git a/internal/rootcoord/meta_snapshot.go b/internal/rootcoord/meta_snapshot.go index affad8e66e..62acd54ae2 100644 --- a/internal/rootcoord/meta_snapshot.go +++ b/internal/rootcoord/meta_snapshot.go @@ -35,11 +35,10 @@ type rtPair struct { } type metaSnapshot struct { - cli *clientv3.Client - root string - tsKey string - lock sync.RWMutex - timeAllactor func() typeutil.Timestamp + cli *clientv3.Client + root string + tsKey string + lock sync.RWMutex ts2Rev []rtPair minPos int @@ -47,20 +46,19 @@ type metaSnapshot struct { numTs int } -func newMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int, timeAllactor func() typeutil.Timestamp) (*metaSnapshot, error) { +func newMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*metaSnapshot, error) { if bufSize <= 0 { bufSize = 1024 } ms := &metaSnapshot{ - cli: cli, - root: root, - tsKey: tsKey, - lock: sync.RWMutex{}, - timeAllactor: timeAllactor, - ts2Rev: make([]rtPair, bufSize), - minPos: 0, - maxPos: 0, - numTs: 0, + cli: cli, + root: root, + tsKey: tsKey, + lock: sync.RWMutex{}, + 
ts2Rev: make([]rtPair, bufSize), + minPos: 0, + maxPos: 0, + numTs: 0, } if err := ms.loadTs(); err != nil { return nil, err @@ -263,24 +261,23 @@ func (ms *metaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) { return 0, fmt.Errorf("can't find revision on ts=%d", ts) } -func (ms *metaSnapshot) Save(key, value string) (typeutil.Timestamp, error) { +func (ms *metaSnapshot) Save(key, value string, ts typeutil.Timestamp) error { ms.lock.Lock() defer ms.lock.Unlock() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() - ts := ms.timeAllactor() strTs := strconv.FormatInt(int64(ts), 10) resp, err := ms.cli.Txn(ctx).If().Then( clientv3.OpPut(path.Join(ms.root, key), value), clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs), ).Commit() if err != nil { - return 0, err + return err } ms.putTs(resp.Header.Revision, ts) - return ts, nil + return nil } func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) { @@ -313,18 +310,18 @@ func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) return string(resp.Kvs[0].Value), nil } -func (ms *metaSnapshot) MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { +func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { ms.lock.Lock() defer ms.lock.Unlock() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() - ts := ms.timeAllactor() - strTs := strconv.FormatInt(int64(ts), 10) ops := make([]clientv3.Op, 0, len(kvs)+2) for key, value := range kvs { ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) } + + strTs := strconv.FormatInt(int64(ts), 10) for _, addition := range additions { if addition == nil { continue @@ -336,10 +333,10 @@ func (ms *metaSnapshot) MultiSave(kvs map[string]string, additions ...func(ts ty ops = append(ops, 
clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() if err != nil { - return 0, err + return err } ms.putTs(resp.Header.Revision, ts) - return ts, nil + return nil } func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { ms.lock.RLock() @@ -378,18 +375,18 @@ func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]str return keys, values, nil } -func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { +func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { ms.lock.Lock() defer ms.lock.Unlock() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() - ts := ms.timeAllactor() - strTs := strconv.FormatInt(int64(ts), 10) ops := make([]clientv3.Op, 0, len(saves)+len(removals)+2) for key, value := range saves { ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) } + + strTs := strconv.FormatInt(int64(ts), 10) for _, addition := range additions { if addition == nil { continue @@ -404,8 +401,8 @@ func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, re ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() if err != nil { - return 0, err + return err } ms.putTs(resp.Header.Revision, ts) - return ts, nil + return nil } diff --git a/internal/rootcoord/meta_snapshot_test.go b/internal/rootcoord/meta_snapshot_test.go index 27b5720a3f..1a93cc17d6 100644 --- a/internal/rootcoord/meta_snapshot_test.go +++ b/internal/rootcoord/meta_snapshot_test.go @@ -41,20 +41,21 @@ func TestMetaSnapshot(t *testing.T) { return vtso } - ms, err := 
newMetaSnapshot(etcdCli, rootPath, tsKey, 4, ftso) + ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 4) assert.Nil(t, err) assert.NotNil(t, ms) for i := 0; i < 8; i++ { vtso = typeutil.Timestamp(100 + i) - ts, err := ms.Save("abc", fmt.Sprintf("value-%d", i)) + ts := ftso() + err = ms.Save("abc", fmt.Sprintf("value-%d", i), ts) assert.Nil(t, err) assert.Equal(t, vtso, ts) _, err = etcdCli.Put(context.Background(), "other", fmt.Sprintf("other-%d", i)) assert.Nil(t, err) } - ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 4, ftso) + ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 4) assert.Nil(t, err) assert.NotNil(t, ms) } @@ -224,13 +225,14 @@ func TestLoad(t *testing.T) { return vtso } - ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso) + ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7) assert.Nil(t, err) assert.NotNil(t, ms) for i := 0; i < 20; i++ { vtso = typeutil.Timestamp(100 + i*5) - ts, err := ms.Save("key", fmt.Sprintf("value-%d", i)) + ts := ftso() + err = ms.Save("key", fmt.Sprintf("value-%d", i), ts) assert.Nil(t, err) assert.Equal(t, vtso, ts) } @@ -243,7 +245,7 @@ func TestLoad(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "value-19", val) - ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso) + ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11) assert.Nil(t, err) assert.NotNil(t, ms) @@ -271,14 +273,15 @@ func TestMultiSave(t *testing.T) { return vtso } - ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso) + ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7) assert.Nil(t, err) assert.NotNil(t, ms) for i := 0; i < 20; i++ { saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)} vtso = typeutil.Timestamp(100 + i*5) - ts, err := ms.MultiSave(saves) + ts := ftso() + err = ms.MultiSave(saves, ts) assert.Nil(t, err) assert.Equal(t, vtso, ts) } @@ -301,7 +304,7 @@ func TestMultiSave(t *testing.T) { assert.Equal(t, vals[0], "v1-19") assert.Equal(t, vals[1], 
"v2-19") - ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso) + ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11) assert.Nil(t, err) assert.NotNil(t, ms) @@ -334,13 +337,14 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { } defer etcdCli.Close() - ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso) + ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7) assert.Nil(t, err) assert.NotNil(t, ms) for i := 0; i < 20; i++ { vtso = typeutil.Timestamp(100 + i*5) - ts, err := ms.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i)) + ts := ftso() + err = ms.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i), ts) assert.Nil(t, err) assert.Equal(t, vtso, ts) } @@ -348,7 +352,8 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)} dm := []string{fmt.Sprintf("kd-%04d", i-20)} vtso = typeutil.Timestamp(100 + i*5) - ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm) + ts := ftso() + err = ms.MultiSaveAndRemoveWithPrefix(sm, dm, ts) assert.Nil(t, err) assert.Equal(t, vtso, ts) } @@ -370,7 +375,7 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { assert.Equal(t, 39-i, len(vals)) } - ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso) + ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11) assert.Nil(t, err) assert.NotNil(t, ms) diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 1838184f90..7648067b7b 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -191,52 +191,52 @@ func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error } } -func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error) { +func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error { mt.tenantLock.Lock() defer mt.tenantLock.Unlock() k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID) v := proto.MarshalTextString(te) - ts, err := mt.client.Save(k, v) + err 
:= mt.client.Save(k, v, ts) if err != nil { log.Error("SnapShotKV Save fail", zap.Error(err)) panic("SnapShotKV Save fail") } mt.tenantID2Meta[te.ID] = *te - return ts, nil + return nil } -func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error) { +func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error { mt.proxyLock.Lock() defer mt.proxyLock.Unlock() k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID) v := proto.MarshalTextString(po) - ts, err := mt.client.Save(k, v) + err := mt.client.Save(k, v, ts) if err != nil { log.Error("SnapShotKV Save fail", zap.Error(err)) panic("SnapShotKV Save fail") } mt.proxyID2Meta[po.ID] = *po - return ts, nil + return nil } -func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) { +func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() if len(coll.PartitionIDs) != len(coll.PartitionNames) || len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) || (len(coll.PartitionIDs) != 1 && len(coll.PartitionIDs) != 0) { - return 0, fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection") + return fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection") } if _, ok := mt.collName2ID[coll.Schema.Name]; ok { - return 0, fmt.Errorf("collection %s exist", coll.Schema.Name) + return fmt.Errorf("collection %s exist", coll.Schema.Name) } if len(coll.FieldIndexes) != len(idx) { - return 0, fmt.Errorf("incorrect index id when creating collection") + return fmt.Errorf("incorrect index id when creating collection") } for _, i := range idx { @@ -266,22 +266,22 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx 
[]*pb.IndexInfo, return k1, v1, nil } - ts, err := mt.client.MultiSave(meta, addition, saveColl) + err := mt.client.MultiSave(meta, ts, addition, saveColl) if err != nil { log.Error("SnapShotKV MultiSave fail", zap.Error(err)) panic("SnapShotKV MultiSave fail") } - return ts, nil + return nil } -func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) { +func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() collMeta, ok := mt.collID2Meta[collID] if !ok { - return 0, fmt.Errorf("can't find collection. id = %d", collID) + return fmt.Errorf("can't find collection. id = %d", collID) } delete(mt.collID2Meta, collID) @@ -319,13 +319,13 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts // save ddOpStr into etcd var saveMeta = map[string]string{} addition := mt.getAdditionKV(ddOpStr, saveMeta) - ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, addition) + err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts, addition) if err != nil { log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err)) panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail") } - return ts, nil + return nil } func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool { @@ -452,37 +452,37 @@ func (mt *metaTable) ListCollectionPhysicalChannels() []string { return plist } -func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) { +func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { 
mt.ddLock.Lock() defer mt.ddLock.Unlock() coll, ok := mt.collID2Meta[collID] if !ok { - return 0, fmt.Errorf("can't find collection. id = %d", collID) + return fmt.Errorf("can't find collection. id = %d", collID) } // number of partition tags (except _default) should be limited to 4096 by default if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum { - return 0, fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum) + return fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum) } if len(coll.PartitionIDs) != len(coll.PartitionNames) { - return 0, fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames)) + return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames)) } if len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) { - return 0, fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps)) + return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps)) } if len(coll.PartitionNames) != len(coll.PartitionCreatedTimestamps) { - return 0, fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps)) + return fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps)) } for idx := range coll.PartitionIDs { if coll.PartitionIDs[idx] == partitionID { - return 0, fmt.Errorf("partition id = %d already exists", partitionID) + return fmt.Errorf("partition id = %d already exists", partitionID) } if coll.PartitionNames[idx] == partitionName { - return 0, fmt.Errorf("partition name = %s already exists", partitionName) + return 
fmt.Errorf("partition name = %s already exists", partitionName) } // no necessary to check created timestamp } @@ -504,12 +504,12 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string return k1, v1, nil } - ts, err := mt.client.MultiSave(meta, addition, saveColl) + err := mt.client.MultiSave(meta, ts, addition, saveColl) if err != nil { log.Error("SnapShotKV MultiSave fail", zap.Error(err)) panic("SnapShotKV MultiSave fail") } - return ts, nil + return nil } func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) { @@ -589,18 +589,17 @@ func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string return err == nil } -//return timestamp, partition id, error -func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, typeutil.UniqueID, error) { +func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() if partitionName == Params.DefaultPartitionName { - return 0, 0, fmt.Errorf("default partition cannot be deleted") + return 0, fmt.Errorf("default partition cannot be deleted") } collMeta, ok := mt.collID2Meta[collID] if !ok { - return 0, 0, fmt.Errorf("can't find collection id = %d", collID) + return 0, fmt.Errorf("can't find collection id = %d", collID) } // check tag exists @@ -621,7 +620,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str } } if !exist { - return 0, 0, fmt.Errorf("partition %s does not exist", partitionName) + return 0, fmt.Errorf("partition %s does not exist", partitionName) } collMeta.PartitionIDs = pd collMeta.PartitionNames = pn @@ -646,21 +645,21 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str 
// save ddOpStr into etcd addition := mt.getAdditionKV(ddOpStr, meta) - ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, addition) + err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition) if err != nil { log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err)) panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail") } - return ts, partID, nil + return partID, nil } -func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) { +func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID] if !ok { - return 0, fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID) + return fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID) } exist := false for _, fidx := range collMeta.FieldIndexes { @@ -670,7 +669,7 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timesta } } if !exist { - return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID) + return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID) } segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID] @@ -686,9 +685,9 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timesta if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) { if segIdxInfo.BuildID == tmpInfo.BuildID { log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID)) - return 0, nil + return nil } - return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID) + return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID) } } } @@ -699,31 +698,31 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timesta k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID) v := 
proto.MarshalTextString(segIdxInfo) - ts, err := mt.client.Save(k, v) + err := mt.client.Save(k, v, ts) if err != nil { log.Error("SnapShotKV Save fail", zap.Error(err)) panic("SnapShotKV Save fail") } - return ts, nil + return nil } //return timestamp, index id, is dropped, error -func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.Timestamp, typeutil.UniqueID, bool, error) { +func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() collID, ok := mt.collName2ID[collName] if !ok { - return 0, 0, false, fmt.Errorf("collection name = %s not exist", collName) + return 0, false, fmt.Errorf("collection name = %s not exist", collName) } collMeta, ok := mt.collID2Meta[collID] if !ok { - return 0, 0, false, fmt.Errorf("collection name = %s not has meta", collName) + return 0, false, fmt.Errorf("collection name = %s not has meta", collName) } fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName) if err != nil { - return 0, 0, false, err + return 0, false, err } fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes)) var dropIdxID typeutil.UniqueID @@ -748,7 +747,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. } if len(fieldIdxInfo) == len(collMeta.FieldIndexes) { log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName)) - return 0, 0, false, nil + return 0, false, nil } collMeta.FieldIndexes = fieldIdxInfo mt.collID2Meta[collID] = collMeta @@ -772,13 +771,13 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. 
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID), } - ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta) + err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, ts) if err != nil { log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err)) panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail") } - return ts, dropIdxID, true, nil + return dropIdxID, true, nil } func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) { @@ -946,12 +945,11 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id meta[k] = v } - _, err = mt.client.MultiSave(meta) + err = mt.client.MultiSave(meta, 0) if err != nil { log.Error("SnapShotKV MultiSave fail", zap.Error(err)) panic("SnapShotKV MultiSave fail") } - } else { idxInfo.IndexID = existInfo.IndexID if existInfo.IndexName != idxInfo.IndexName { //replace index name @@ -969,7 +967,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id meta[k] = v } - _, err = mt.client.MultiSave(meta) + err = mt.client.MultiSave(meta, 0) if err != nil { log.Error("SnapShotKV MultiSave fail", zap.Error(err)) panic("SnapShotKV MultiSave fail") diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 3ddecaffc8..0c2ab8c002 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -30,9 +30,9 @@ type mockTestKV struct { kv.TxnKV loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error) - save func(key, value string) (typeutil.Timestamp, error) - multiSave func(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) - multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) + save func(key, value 
string, ts typeutil.Timestamp) error + multiSave func(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error + multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error } func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { @@ -42,16 +42,16 @@ func (m *mockTestKV) Load(key string, ts typeutil.Timestamp) (string, error) { return "", nil } -func (m *mockTestKV) Save(key, value string) (typeutil.Timestamp, error) { - return m.save(key, value) +func (m *mockTestKV) Save(key, value string, ts typeutil.Timestamp) error { + return m.save(key, value, ts) } -func (m *mockTestKV) MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return m.multiSave(kvs, additions...) +func (m *mockTestKV) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { + return m.multiSave(kvs, ts, additions...) } -func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return m.multiSaveAndRemoveWithPrefix(saves, removals, additions...) +func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error { + return m.multiSaveAndRemoveWithPrefix(saves, removals, ts, additions...) 
} func Test_MockKV(t *testing.T) { @@ -118,19 +118,19 @@ func Test_MockKV(t *testing.T) { m1, err := NewMetaTable(k1) assert.Nil(t, err) - k1.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("save tenant error") + k1.save = func(key string, value string, ts typeutil.Timestamp) error { + return fmt.Errorf("save tenant error") } - assert.Panics(t, func() { m1.AddTenant(&pb.TenantMeta{}) }) - //_, err = m1.AddTenant(&pb.TenantMeta{}) + assert.Panics(t, func() { m1.AddTenant(&pb.TenantMeta{}, 0) }) + //err = m1.AddTenant(&pb.TenantMeta{}, 0) //assert.NotNil(t, err) //assert.EqualError(t, err, "save tenant error") - k1.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("save proxy error") + k1.save = func(key string, value string, ts typeutil.Timestamp) error { + return fmt.Errorf("save proxy error") } - assert.Panics(t, func() { m1.AddProxy(&pb.ProxyMeta{}) }) - //_, err = m1.AddProxy(&pb.ProxyMeta{}) + assert.Panics(t, func() { m1.AddProxy(&pb.ProxyMeta{}, 0) }) + //err = m1.AddProxy(&pb.ProxyMeta{}, 0) //assert.NotNil(t, err) //assert.EqualError(t, err, "save proxy error") } @@ -167,7 +167,7 @@ func TestMetaTable(t *testing.T) { etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) defer etcdCli.Close() - skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7, ftso) + skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7) assert.Nil(t, err) assert.NotNil(t, skv) mt, err := NewMetaTable(skv) @@ -242,17 +242,18 @@ func TestMetaTable(t *testing.T) { } t.Run("add collection", func(t *testing.T) { - _, err = mt.AddCollection(collInfo, nil, ddOp) + ts := ftso() + err = mt.AddCollection(collInfo, ts, nil, ddOp) assert.NotNil(t, err) - ts, err := mt.AddCollection(collInfo, idxInfo, ddOp) + err = mt.AddCollection(collInfo, ts, idxInfo, ddOp) assert.Nil(t, err) - assert.Equal(t, ts, uint64(1)) + assert.Equal(t, uint64(1), ts) collMeta, err := 
mt.GetCollectionByName("testColl", 0) assert.Nil(t, err) - assert.Equal(t, ts, collMeta.CreateTime) - assert.Equal(t, ts, collMeta.PartitionCreatedTimestamps[0]) + assert.Equal(t, collMeta.CreateTime, ts) + assert.Equal(t, collMeta.PartitionCreatedTimestamps[0], ts) assert.Equal(t, partIDDefault, collMeta.PartitionIDs[0]) assert.Equal(t, 1, len(collMeta.PartitionIDs)) @@ -269,7 +270,8 @@ func TestMetaTable(t *testing.T) { }) t.Run("add partition", func(t *testing.T) { - ts, err := mt.AddPartition(collID, partName, partID, ddOp) + ts := ftso() + err = mt.AddPartition(collID, partName, partID, ts, ddOp) assert.Nil(t, err) assert.Equal(t, ts, uint64(2)) @@ -294,15 +296,15 @@ func TestMetaTable(t *testing.T) { IndexID: indexID, BuildID: buildID, } - _, err := mt.AddIndex(&segIdxInfo) + err = mt.AddIndex(&segIdxInfo, 0) assert.Nil(t, err) // it's legal to add index twice - _, err = mt.AddIndex(&segIdxInfo) + err = mt.AddIndex(&segIdxInfo, 0) assert.Nil(t, err) segIdxInfo.BuildID = 202 - _, err = mt.AddIndex(&segIdxInfo) + err = mt.AddIndex(&segIdxInfo, 0) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID)) }) @@ -388,12 +390,12 @@ func TestMetaTable(t *testing.T) { te := pb.TenantMeta{ ID: 100, } - _, err := mt.AddTenant(&te) + err := mt.AddTenant(&te, 0) assert.Nil(t, err) po := pb.ProxyMeta{ ID: 101, } - _, err = mt.AddProxy(&po) + err = mt.AddProxy(&po, 0) assert.Nil(t, err) _, err = NewMetaTable(skv) @@ -401,12 +403,12 @@ func TestMetaTable(t *testing.T) { }) t.Run("drop index", func(t *testing.T) { - _, idx, ok, err := mt.DropIndex("testColl", "field110", "field110") + idx, ok, err := mt.DropIndex("testColl", "field110", "field110", 0) assert.Nil(t, err) assert.True(t, ok) assert.Equal(t, indexID, idx) - _, _, ok, err = mt.DropIndex("testColl", "field110", "field110-error") + _, ok, err = mt.DropIndex("testColl", "field110", "field110-error", 0) assert.Nil(t, err) assert.False(t, ok) @@ -424,7 +426,8 @@ func 
TestMetaTable(t *testing.T) { }) t.Run("drop partition", func(t *testing.T) { - _, id, err := mt.DeletePartition(collID, partName, nil) + ts := ftso() + id, err := mt.DeletePartition(collID, partName, ts, nil) assert.Nil(t, err) assert.Equal(t, partID, id) @@ -435,9 +438,10 @@ func TestMetaTable(t *testing.T) { }) t.Run("drop collection", func(t *testing.T) { - _, err = mt.DeleteCollection(collIDInvalid, nil) + ts := ftso() + err = mt.DeleteCollection(collIDInvalid, ts, nil) assert.NotNil(t, err) - _, err = mt.DeleteCollection(collID, nil) + err = mt.DeleteCollection(collID, ts, nil) assert.Nil(t, err) // check DD operation flag @@ -454,52 +458,54 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("multi save error") + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return fmt.Errorf("multi save error") } collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - assert.Panics(t, func() { mt.AddCollection(collInfo, idxInfo, nil) }) - //_, err := mt.AddCollection(collInfo, idxInfo, nil) + assert.Panics(t, func() { mt.AddCollection(collInfo, 0, idxInfo, nil) }) + //err = mt.AddCollection(collInfo, 0, idxInfo, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save error") }) t.Run("delete collection failed", func(t *testing.T) { - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range 
addition { if a != nil { a(ts) } } - return 0, nil + return nil } - mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("multi save and remove with prefix error") + mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return fmt.Errorf("multi save and remove with prefix error") } collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err := mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) mt.indexID2Meta = make(map[int64]pb.IndexInfo) - assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, nil) }) - //_, err = mt.DeleteCollection(collInfo.ID, nil) + ts = ftso() + assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, ts, nil) }) + //err = mt.DeleteCollection(collInfo.ID, ts, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save and remove with prefix error") }) t.Run("get collection failed", func(t *testing.T) { - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, nil + mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { + return nil } collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err := mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) @@ -510,8 +516,8 @@ func TestMetaTable(t *testing.T) { }) t.Run("add partition failed", func(t *testing.T) { - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, nil + mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { + 
return nil } mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil @@ -522,17 +528,19 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) - _, err = mt.AddPartition(2, "no-part", 22, nil) + ts = ftso() + err = mt.AddPartition(2, "no-part", 22, ts, nil) assert.NotNil(t, err) assert.EqualError(t, err, "can't find collection. id = 2") coll := mt.collID2Meta[collInfo.ID] coll.PartitionIDs = make([]int64, Params.MaxPartitionNum) mt.collID2Meta[coll.ID] = coll - _, err = mt.AddPartition(coll.ID, "no-part", 22, nil) + err = mt.AddPartition(coll.ID, "no-part", 22, ts, nil) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)) @@ -540,22 +548,21 @@ func TestMetaTable(t *testing.T) { coll.PartitionNames = []string{partName} coll.PartitionCreatedTimestamps = []uint64{ftso()} mt.collID2Meta[coll.ID] = coll - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("multi save error") + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return fmt.Errorf("multi save error") } - assert.Panics(t, func() { mt.AddPartition(coll.ID, "no-part", 22, nil) }) - //_, err = mt.AddPartition(coll.ID, "no-part", 22, nil) + assert.Panics(t, func() { mt.AddPartition(coll.ID, "no-part", 22, ts, nil) }) + //err = mt.AddPartition(coll.ID, "no-part", 22, ts, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save error") - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, 
error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range addition { if a != nil { a(ts) } } - return 0, nil + return nil } collInfo.PartitionIDs = nil collInfo.PartitionNames = nil @@ -565,10 +572,11 @@ func TestMetaTable(t *testing.T) { //assert.Nil(t, err) //_, err = mt.AddPartition(coll.ID, partName, partID, nil) //assert.Nil(t, err) - _, err = mt.AddPartition(coll.ID, partName, 22, nil) + ts = ftso() + err = mt.AddPartition(coll.ID, partName, 22, ts, nil) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partName)) - _, err = mt.AddPartition(coll.ID, "no-part", partID, nil) + err = mt.AddPartition(coll.ID, "no-part", partID, ts, nil) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partID)) }) @@ -577,8 +585,8 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, nil + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return nil } err := mt.reloadFromKV() assert.Nil(t, err) @@ -586,7 +594,8 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) assert.False(t, mt.HasPartition(collInfo.ID, "no-partName", 0)) @@ -599,14 +608,13 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, 
error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range addition { if a != nil { a(ts) } } - return 0, nil + return nil } err := mt.reloadFromKV() assert.Nil(t, err) @@ -614,27 +622,29 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = []int64{partID} collInfo.PartitionNames = []string{partName} collInfo.PartitionCreatedTimestamps = []uint64{ftso()} - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) - _, _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, nil) + ts = ftso() + _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, ts, nil) assert.NotNil(t, err) assert.EqualError(t, err, "default partition cannot be deleted") - _, _, err = mt.DeletePartition(collInfo.ID, "abc", nil) + _, err = mt.DeletePartition(collInfo.ID, "abc", ts, nil) assert.NotNil(t, err) assert.EqualError(t, err, "partition abc does not exist") - mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("multi save and remove with prefix error") + mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return fmt.Errorf("multi save and remove with prefix error") } - assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, nil) }) - //_, _, err = mt.DeletePartition(collInfo.ID, partName, nil) + assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, ts, nil) }) + //_, err = 
mt.DeletePartition(collInfo.ID, partName, ts, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save and remove with prefix error") mt.collID2Meta = make(map[int64]pb.CollectionInfo) - _, _, err = mt.DeletePartition(collInfo.ID, "abc", nil) + _, err = mt.DeletePartition(collInfo.ID, "abc", ts, nil) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID)) }) @@ -643,25 +653,25 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range addition { if a != nil { a(ts) } } - return 0, nil + return nil } - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, nil + mockKV.save = func(key, value string, ts typeutil.Timestamp) error { + return nil } - err := mt.reloadFromKV() + err = mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) segIdxInfo := pb.SegmentIndexInfo{ @@ -672,12 +682,13 @@ func TestMetaTable(t *testing.T) { IndexID: indexID2, BuildID: buildID, } - _, err = mt.AddIndex(&segIdxInfo) + ts = ftso() + err = mt.AddIndex(&segIdxInfo, ts) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID)) mt.collID2Meta = make(map[int64]pb.CollectionInfo) - _, err = mt.AddIndex(&segIdxInfo) + err = mt.AddIndex(&segIdxInfo, ts) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("collection id = %d 
not found", collInfo.ID)) @@ -687,15 +698,17 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts = ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) segIdxInfo.IndexID = indexID - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("save error") + mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { + return fmt.Errorf("save error") } - assert.Panics(t, func() { mt.AddIndex(&segIdxInfo) }) - //_, err = mt.AddIndex(&segIdxInfo) + ts = ftso() + assert.Panics(t, func() { mt.AddIndex(&segIdxInfo, ts) }) + //err = mt.AddIndex(&segIdxInfo, ts) //assert.NotNil(t, err) //assert.EqualError(t, err, "save error") }) @@ -704,17 +717,16 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range addition { if a != nil { a(ts) } } - return 0, nil + return nil } - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, nil + mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { + return nil } err := mt.reloadFromKV() assert.Nil(t, err) @@ -722,19 +734,21 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) - _, _, _, err = mt.DropIndex("abc", "abc", "abc") + ts = ftso() + _, 
_, err = mt.DropIndex("abc", "abc", "abc", ts) assert.NotNil(t, err) assert.EqualError(t, err, "collection name = abc not exist") mt.collName2ID["abc"] = 2 - _, _, _, err = mt.DropIndex("abc", "abc", "abc") + _, _, err = mt.DropIndex("abc", "abc", "abc", ts) assert.NotNil(t, err) assert.EqualError(t, err, "collection name = abc not has meta") - _, _, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc") + _, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc", ts) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("collection %s doesn't have filed abc", collInfo.Schema.Name)) @@ -751,7 +765,8 @@ func TestMetaTable(t *testing.T) { } mt.collID2Meta[coll.ID] = coll mt.indexID2Meta = make(map[int64]pb.IndexInfo) - _, idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) + ts = ftso() + idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts) assert.Zero(t, idxID) assert.False(t, isDroped) assert.Nil(t, err) @@ -761,13 +776,15 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil coll.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts = ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) - mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("multi save and remove with prefix error") + mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return fmt.Errorf("multi save and remove with prefix error") } - assert.Panics(t, func() { mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) }) - //_, _, _, err = 
mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) + ts = ftso() + assert.Panics(t, func() { mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts) }) + //_, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save and remove with prefix error") }) @@ -776,17 +793,16 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range addition { if a != nil { a(ts) } } - return 0, nil + return nil } - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, nil + mockKV.save = func(key, value string, ts typeutil.Timestamp) error { + return nil } err := mt.reloadFromKV() assert.Nil(t, err) @@ -794,7 +810,8 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) seg, err := mt.GetSegmentIndexInfoByID(segID2, fieldID, "abc") @@ -811,7 +828,8 @@ func TestMetaTable(t *testing.T) { IndexID: indexID, BuildID: buildID, } - _, err = mt.AddIndex(&segIdxInfo) + ts = ftso() + err = mt.AddIndex(&segIdxInfo, ts) assert.Nil(t, err) idx, err := mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, segIdxInfo.FieldID, idxInfo[0].IndexName) assert.Nil(t, err) @@ -830,11 +848,11 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts 
typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, nil + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return nil } - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, nil + mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { + return nil } err := mt.reloadFromKV() assert.Nil(t, err) @@ -842,7 +860,8 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) @@ -901,17 +920,16 @@ func TestMetaTable(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "collection abc not found") - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range addition { if a != nil { a(ts) } } - return 0, nil + return nil } - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, nil + mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { + return nil } err = mt.reloadFromKV() assert.Nil(t, err) @@ -919,7 +937,8 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) 
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx, nil) @@ -933,27 +952,26 @@ func TestMetaTable(t *testing.T) { assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID)) mt.indexID2Meta = bakMeta - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("multi save error") + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return fmt.Errorf("multi save error") } assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) }) //_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save error") - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range addition { if a != nil { a(ts) } } - return 0, nil + return nil } collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - //_, err = mt.AddCollection(collInfo, idxInfo, nil) + //err = mt.AddCollection(collInfo, ts, idxInfo, nil) //assert.Nil(t, err) coll, ok := mt.collID2Meta[collInfo.ID] assert.True(t, ok) @@ -973,8 +991,8 @@ func TestMetaTable(t *testing.T) { mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx idx.IndexName = idxInfo[0].IndexName - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - return 0, fmt.Errorf("multi save error") + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, 
addition ...func(ts typeutil.Timestamp) (string, string, error)) error { + return fmt.Errorf("multi save error") } assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) }) //_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) @@ -994,17 +1012,16 @@ func TestMetaTable(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "collection abc not found") - mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { - ts := ftso() + mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error { for _, a := range addition { if a != nil { a(ts) } } - return 0, nil + return nil } - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { - return 0, nil + mockKV.save = func(key string, value string, ts typeutil.Timestamp) error { + return nil } err = mt.reloadFromKV() assert.Nil(t, err) @@ -1012,7 +1029,8 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil collInfo.PartitionNames = nil collInfo.PartitionCreatedTimestamps = nil - _, err = mt.AddCollection(collInfo, idxInfo, nil) + ts := ftso() + err = mt.AddCollection(collInfo, ts, idxInfo, nil) assert.Nil(t, err) mt.indexID2Meta = make(map[int64]pb.IndexInfo) _, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName) @@ -1052,7 +1070,7 @@ func TestMetaWithTimestamp(t *testing.T) { assert.Nil(t, err) defer etcdCli.Close() - skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7, ftso) + skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7) assert.Nil(t, err) assert.NotNil(t, skv) mt, err := NewMetaTable(skv) @@ -1068,7 +1086,8 @@ func TestMetaWithTimestamp(t *testing.T) { collInfo.PartitionIDs = []int64{partID1} collInfo.PartitionNames = []string{partName1} 
collInfo.PartitionCreatedTimestamps = []uint64{ftso()} - t1, err := mt.AddCollection(collInfo, nil, nil) + t1 := ftso() + err = mt.AddCollection(collInfo, t1, nil, nil) assert.Nil(t, err) collInfo.ID = 2 @@ -1077,7 +1096,8 @@ func TestMetaWithTimestamp(t *testing.T) { collInfo.PartitionCreatedTimestamps = []uint64{ftso()} collInfo.Schema.Name = collName2 - t2, err := mt.AddCollection(collInfo, nil, nil) + t2 := ftso() + err = mt.AddCollection(collInfo, t2, nil, nil) assert.Nil(t, err) assert.True(t, mt.HasCollection(collID1, 0)) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index c6b9c975cb..ece6b02b12 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -17,6 +17,7 @@ import ( "fmt" "math/rand" "os" + "strconv" "sync" "sync/atomic" "time" @@ -85,8 +86,11 @@ type Core struct { etcdCli *clientv3.Client kvBase *etcdkv.EtcdKV + //DDL lock + ddlLock sync.Mutex + //setMsgStreams, send time tick into dd channel and time tick channel - SendTimeTick func(t typeutil.Timestamp) error + SendTimeTick func(t typeutil.Timestamp, reason string) error //setMsgStreams, send create collection into dd channel SendDdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) error @@ -152,6 +156,7 @@ func NewCore(c context.Context, factory ms.Factory) (*Core, error) { core := &Core{ ctx: ctx, cancel: cancel, + ddlLock: sync.Mutex{}, msFactory: factory, } core.UpdateStateCode(internalpb.StateCode_Abnormal) @@ -232,9 +237,11 @@ func (c *Core) startTimeTickLoop() { log.Debug("rootcoord context closed", zap.Error(c.ctx.Err())) return case <-ticker.C: + c.ddlLock.Lock() if ts, err := c.TSOAllocator(1); err == nil { - c.SendTimeTick(ts) + c.SendTimeTick(ts, "timetick loop") } + c.ddlLock.Unlock() } } } @@ -352,8 +359,12 @@ func (c *Core) checkFlushedSegmentsLoop() { if info.BuildID != 0 { info.EnableIndex = true } - if _, err := c.MetaTable.AddIndex(&info); err != nil { - 
log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err)) + if err := c.MetaTable.AddIndex(&info, 0); err != nil { + log.Debug("Add index into meta table failed", + zap.Int64("collection_id", collMeta.ID), + zap.Int64("index_id", info.IndexID), + zap.Int64("build_id", info.BuildID), + zap.Error(err)) } } } @@ -396,11 +407,8 @@ func (c *Core) setDdMsgSendFlag(b bool) error { return nil } - if b { - _, err = c.MetaTable.client.Save(DDMsgSendPrefix, "true") - return err - } - _, err = c.MetaTable.client.Save(DDMsgSendPrefix, "false") + ts, _ := c.TSOAllocator(1) + err = c.MetaTable.client.Save(DDMsgSendPrefix, strconv.FormatBool(b), ts) return err } @@ -420,7 +428,7 @@ func (c *Core) setMsgStreams() error { timeTickStream.AsProducer([]string{Params.TimeTickChannel}) log.Debug("rootcoord AsProducer: " + Params.TimeTickChannel) - c.SendTimeTick = func(t typeutil.Timestamp) error { + c.SendTimeTick = func(t typeutil.Timestamp, reason string) error { msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ BeginTimestamp: t, @@ -462,7 +470,11 @@ func (c *Core) setMsgStreams() error { Timestamps: pt, DefaultTimestamp: t, } - return c.chanTimeTick.UpdateTimeTick(&ttMsg) + log.Debug("update timetick", + zap.Any("DefaultTs", t), + zap.Any("sourceID", c.session.ServerID), + zap.Any("reason", reason)) + return c.chanTimeTick.UpdateTimeTick(&ttMsg, reason) } c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) error { @@ -883,19 +895,8 @@ func (c *Core) Init() error { if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints, DialTimeout: 5 * time.Second}); initError != nil { return initError } - tsAlloc := func() typeutil.Timestamp { - for { - var ts typeutil.Timestamp - var err error - if ts, err = c.TSOAllocator(1); err == nil { - return ts - } - time.Sleep(100 * 
time.Millisecond) - log.Debug("alloc time stamp error", zap.Error(err)) - } - } var ms *metaSnapshot - ms, initError = newMetaSnapshot(c.etcdCli, Params.MetaRootPath, TimestampPrefix, 1024, tsAlloc) + ms, initError = newMetaSnapshot(c.etcdCli, Params.MetaRootPath, TimestampPrefix, 1024) if initError != nil { return initError } @@ -1832,7 +1833,7 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel status.Reason = fmt.Sprintf("UpdateChannelTimeTick receive invalid message %d", in.Base.GetMsgType()) return status, nil } - err := c.chanTimeTick.UpdateTimeTick(in) + err := c.chanTimeTick.UpdateTimeTick(in, "gRPC") if err != nil { status.ErrorCode = commonpb.ErrorCode_UnexpectedError status.Reason = err.Error() @@ -1913,7 +1914,8 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus log.Error("build index fail", zap.Int64("buildid", info.BuildID), zap.Error(err)) continue } - _, err = c.MetaTable.AddIndex(&info) + ts, _ := c.TSOAllocator(1) + err = c.MetaTable.AddIndex(&info, ts) if err != nil { log.Error("AddIndex fail", zap.String("err", err.Error())) } diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index 28164ab0f2..2c74b00305 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -15,9 +15,9 @@ import ( "context" "fmt" - "github.com/milvus-io/milvus/internal/util/tsoutil" - "github.com/golang/protobuf/proto" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -25,8 +25,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" - "go.uber.org/zap" ) type reqTask interface { @@ -180,27 +180,47 @@ func (t 
*CreateCollectionReqTask) Execute(ctx context.Context) error { return EncodeDdOperation(&ddCollReq, CreateCollectionDDType) } - ts, err := t.core.MetaTable.AddCollection(&collInfo, idxInfo, ddOp) + reason := fmt.Sprintf("create collection %d", collID) + ts, err := t.core.TSOAllocator(1) if err != nil { - return fmt.Errorf("meta table add collection failed,error = %w", err) + return fmt.Errorf("TSO alloc fail, error = %w", err) } - // add dml channel before send dd msg - t.core.dmlChannels.AddProducerChannels(chanNames...) + // use lambda function here to guarantee all resources to be released + createCollectionFn := func() error { + // lock for ddl operation + t.core.ddlLock.Lock() + defer t.core.ddlLock.Unlock() - err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames) - if err != nil { - return fmt.Errorf("send dd create collection req failed, error = %w", err) + t.core.chanTimeTick.AddDdlTimeTick(ts, reason) + // clear ddl timetick in all conditions + defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + + err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOp) + if err != nil { + return fmt.Errorf("meta table add collection failed,error = %w", err) + } + + // add dml channel before send dd msg + t.core.dmlChannels.AddProducerChannels(chanNames...) 
+ + err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames) + if err != nil { + return fmt.Errorf("send dd create collection req failed, error = %w", err) + } + + t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + t.core.SendTimeTick(ts, reason) + return nil } - t.core.SendTimeTick(ts) + err = createCollectionFn() + if err != nil { + return err + } // Update DDOperation in etcd - err = t.core.setDdMsgSendFlag(true) - if err != nil { - return fmt.Errorf("send dd msg send flag failed,error = %w", err) - } - return nil + return t.core.setDdMsgSendFlag(true) } type DropCollectionReqTask struct { @@ -237,21 +257,45 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { return EncodeDdOperation(&ddReq, DropCollectionDDType) } - ts, err := t.core.MetaTable.DeleteCollection(collMeta.ID, ddOp) + reason := fmt.Sprintf("drop collection %d", collMeta.ID) + ts, err := t.core.TSOAllocator(1) + if err != nil { + return fmt.Errorf("TSO alloc fail, error = %w", err) + } + + // use lambda function here to guarantee all resources to be released + dropCollectionFn := func() error { + // lock for ddl operation + t.core.ddlLock.Lock() + defer t.core.ddlLock.Unlock() + + t.core.chanTimeTick.AddDdlTimeTick(ts, reason) + // clear ddl timetick in all conditions + defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + + err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOp) + if err != nil { + return err + } + + err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames) + if err != nil { + return err + } + + t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + t.core.SendTimeTick(ts, reason) + + // remove dml channel after send dd msg + t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...) 
+ return nil + } + + err = dropCollectionFn() if err != nil { return err } - err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames) - if err != nil { - return err - } - - t.core.SendTimeTick(ts) - - // remove dml channel after send dd msg - t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...) - //notify query service to release collection if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil { log.Error("CallReleaseCollectionService failed", zap.String("error", err.Error())) @@ -414,18 +458,42 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { return EncodeDdOperation(&ddReq, CreatePartitionDDType) } - ts, err := t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOp) + reason := fmt.Sprintf("create partition %s", t.Req.PartitionName) + ts, err := t.core.TSOAllocator(1) + if err != nil { + return fmt.Errorf("TSO alloc fail, error = %w", err) + } + + // use lambda function here to guarantee all resources to be released + createPartitionFn := func() error { + // lock for ddl operation + t.core.ddlLock.Lock() + defer t.core.ddlLock.Unlock() + + t.core.chanTimeTick.AddDdlTimeTick(ts, reason) + // clear ddl timetick in all conditions + defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + + err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOp) + if err != nil { + return err + } + + err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames) + if err != nil { + return err + } + + t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + t.core.SendTimeTick(ts, reason) + return nil + } + + err = createPartitionFn() if err != nil { return err } - err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames) - if err != nil { - return err - } - - t.core.SendTimeTick(ts) - req := proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ MsgType: 0, //TODO, msg type @@ 
-482,18 +550,42 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { return EncodeDdOperation(&ddReq, DropPartitionDDType) } - ts, _, err := t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOp) + reason := fmt.Sprintf("drop partition %s", t.Req.PartitionName) + ts, err := t.core.TSOAllocator(1) + if err != nil { + return fmt.Errorf("TSO alloc fail, error = %w", err) + } + + // use lambda function here to guarantee all resources to be released + dropPartitionFn := func() error { + // lock for ddl operation + t.core.ddlLock.Lock() + defer t.core.ddlLock.Unlock() + + t.core.chanTimeTick.AddDdlTimeTick(ts, reason) + // clear ddl timetick in all conditions + defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + + _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOp) + if err != nil { + return err + } + + err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames) + if err != nil { + return err + } + + t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) + t.core.SendTimeTick(ts, reason) + return nil + } + + err = dropPartitionFn() if err != nil { return err } - err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames) - if err != nil { - return err - } - - t.core.SendTimeTick(ts) - req := proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ MsgType: 0, //TODO, msg type @@ -724,7 +816,8 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error { if info.BuildID != 0 { info.EnableIndex = true } - if _, err := t.core.MetaTable.AddIndex(&info); err != nil { + ts, _ := t.core.TSOAllocator(1) + if err := t.core.MetaTable.AddIndex(&info, ts); err != nil { log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err)) } } @@ -795,6 +888,7 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error { if err != nil { return err } - 
_, _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) + ts, _ := t.core.TSOAllocator(1) + _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName, ts) return err } diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index baf3dfc2a6..facea9124b 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -13,6 +13,7 @@ package rootcoord import ( "fmt" + "math" "sync" "github.com/milvus-io/milvus/internal/log" @@ -26,11 +27,21 @@ import ( "go.uber.org/zap" ) +type ddlTimetickInfo struct { + ddlMinTs typeutil.Timestamp + ddlTsSet map[typeutil.Timestamp]struct{} +} + type timetickSync struct { core *Core lock sync.Mutex proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg + + // record ddl timetick info + ddlLock sync.RWMutex + ddlMinTs typeutil.Timestamp + ddlTsSet map[typeutil.Timestamp]struct{} } type channelTimeTickMsg struct { @@ -63,18 +74,22 @@ func newTimeTickSync(core *Core) *timetickSync { core: core, proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg), sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16), + + ddlLock: sync.RWMutex{}, + ddlMinTs: typeutil.Timestamp(math.MaxUint64), + ddlTsSet: make(map[typeutil.Timestamp]struct{}), } } // sendToChannel send all channels' timetick to sendChan // lock is needed by the invoker -func (t *timetickSync) sendToChannel() { +func (t *timetickSync) sendToChannel() error { if len(t.proxyTimeTick) == 0 { - return + return fmt.Errorf("proxyTimeTick empty") } for _, v := range t.proxyTimeTick { if v == nil { - return + return fmt.Errorf("proxyTimeTick has not been fulfilled") } } // clear proxyTimeTick and send a clone @@ -84,10 +99,57 @@ func (t *timetickSync) sendToChannel() { t.proxyTimeTick[k] = nil } t.sendChan <- ptt + return nil +} + +// AddDmlTimeTick add ts into 
ddlTimetickInfos[sourceID], +// can be used to tell if DDL operation is in process. +func (t *timetickSync) AddDdlTimeTick(ts typeutil.Timestamp, reason string) { + t.ddlLock.Lock() + defer t.ddlLock.Unlock() + + if ts < t.ddlMinTs { + t.ddlMinTs = ts + } + t.ddlTsSet[ts] = struct{}{} + + log.Debug("add ddl timetick", zap.Uint64("minTs", t.ddlMinTs), zap.Uint64("ts", ts), + zap.Int("len(ddlTsSet)", len(t.ddlTsSet)), zap.String("reason", reason)) +} + +// RemoveDdlTimeTick is invoked by the DDL tasks once their DDL message has been sent (or the task failed). +// It clears the ts recorded by AddDdlTimeTick and re-computes ddlMinTs over the DDL timestamps still in flight. +func (t *timetickSync) RemoveDdlTimeTick(ts typeutil.Timestamp, reason string) { + t.ddlLock.Lock() + defer t.ddlLock.Unlock() + + delete(t.ddlTsSet, ts) + log.Debug("remove ddl timetick", zap.Uint64("ts", ts), zap.Int("len(ddlTsSet)", len(t.ddlTsSet)), + zap.String("reason", reason)) + if len(t.ddlTsSet) == 0 { + t.ddlMinTs = typeutil.Timestamp(math.MaxUint64) + } else if t.ddlMinTs == ts { + // the removed ts was the minimum: re-calculate minTs from the remaining set + minTs := typeutil.Timestamp(math.MaxUint64) + for tt := range t.ddlTsSet { + if tt < minTs { + minTs = tt + } + } + t.ddlMinTs = minTs + log.Debug("update ddl minTs", zap.Uint64("minTs", minTs)) + } +} + +func (t *timetickSync) GetDdlMinTimeTick() typeutil.Timestamp { + t.ddlLock.RLock() + defer t.ddlLock.RUnlock() + + return t.ddlMinTs } // UpdateTimeTick check msg validation and send it to local channel -func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error { +func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason string) error { t.lock.Lock() defer t.lock.Unlock() if len(in.ChannelNames) == 0 && in.DefaultTimestamp == 0 { @@ -101,20 +163,40 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error { if !ok { return fmt.Errorf("Skip ChannelTimeTickMsg from un-recognized proxy node %d", in.Base.SourceID) } + + // if ddl operation not finished, skip current ts update + ddlMinTs := t.GetDdlMinTimeTick() + if
in.DefaultTimestamp > ddlMinTs { + log.Debug("ddl not finished", zap.Int64("source id", in.Base.SourceID), + zap.Uint64("curr ts", in.DefaultTimestamp), + zap.Uint64("ddlMinTs", ddlMinTs), + zap.String("reason", reason)) + return nil + } + if in.Base.SourceID == t.core.session.ServerID { - if prev != nil && prev.in.DefaultTimestamp >= in.DefaultTimestamp { - log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("prev ts", prev.in.DefaultTimestamp), zap.Uint64("curr ts", in.DefaultTimestamp)) + if prev != nil && in.DefaultTimestamp <= prev.in.DefaultTimestamp { + log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), + zap.Uint64("curr ts", in.DefaultTimestamp), + zap.Uint64("prev ts", prev.in.DefaultTimestamp), + zap.String("reason", reason)) return nil } } if in.DefaultTimestamp == 0 { mints := minTimeTick(in.Timestamps...) - log.Debug("default time stamp is zero, set it to the min value of inputs", zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints)) + log.Debug("default time stamp is zero, set it to the min value of inputs", + zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints)) in.DefaultTimestamp = mints } t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in) - t.sendToChannel() + log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID), + zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) + + if err := t.sendToChannel(); err != nil { + log.Debug("sendToChannel fail", zap.Any("err", err.Error())) + } return nil } @@ -168,7 +250,6 @@ func (t *timetickSync) StartWatch() { log.Debug("SendChannelTimeTick fail", zap.Error(err)) } } - } } }