diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index ec78aa3b71..9c9b955981 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -14,4 +14,5 @@ master: minSegmentSizeToEnableIndex: 1024 nodeID: 100 timeout: 3600 # time out, 5 seconds + timeTickInterval: 200 # ms diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 39d7f39f32..3065f1e565 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -15,7 +15,6 @@ import ( "context" "fmt" "math/rand" - "regexp" "strconv" "strings" "sync" @@ -294,21 +293,21 @@ func TestGrpcService(t *testing.T) { assert.Equal(t, createCollectionArray[1].Base.MsgType, commonpb.MsgType_CreateCollection) assert.Equal(t, createCollectionArray[1].CollectionName, "testColl-again") - //time stamp go back - schema.Name = "testColl-goback" - sbf, err = proto.Marshal(&schema) - assert.Nil(t, err) - req.CollectionName = schema.Name - req.Schema = sbf - req.Base.MsgID = 103 - req.Base.Timestamp = 103 - req.Base.SourceID = 103 - status, err = cli.CreateCollection(ctx, req) - assert.Nil(t, err) - assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError) - matched, err := regexp.MatchString("input timestamp = [0-9]+, last dd time stamp = [0-9]+", status.Reason) - assert.Nil(t, err) - assert.True(t, matched) + //time stamp go back, master response to add the timestamp, so the time tick will never go back + //schema.Name = "testColl-goback" + //sbf, err = proto.Marshal(&schema) + //assert.Nil(t, err) + //req.CollectionName = schema.Name + //req.Schema = sbf + //req.Base.MsgID = 103 + //req.Base.Timestamp = 103 + //req.Base.SourceID = 103 + //status, err = cli.CreateCollection(ctx, req) + //assert.Nil(t, err) + //assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError) + //matched, err := regexp.MatchString("input timestamp = [0-9]+, last dd time stamp = [0-9]+", status.Reason) + //assert.Nil(t, err) + //assert.True(t, matched) }) t.Run("has collection", func(t *testing.T) { diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 98c6e369be..0351d3fb7f 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -38,7 +38,7 @@ type TxnKV interface { type SnapShotKV interface { Save(key, value string) (typeutil.Timestamp, error) Load(key string, ts typeutil.Timestamp) (string, error) - MultiSave(kvs map[string]string) (typeutil.Timestamp, error) + MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) - MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) + MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 65884cac74..d899d0dede 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -53,6 +53,10 @@ import ( // milvuspb -> milvuspb // masterpb2 -> masterpb (master_service) +//NEZA2017, DEBUG FLAG for milvus 2.0, this part should remove when milvus 2.0 release + +var SetDDTimeTimeByMaster bool = false + // ------------------ struct ----------------------- // DdOperation used to save ddMsg into ETCD @@ -78,12 +82,12 @@ type Core struct { MetaTable *metaTable //id allocator - idAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) - idAllocatorUpdate func() error + IDAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) + IDAllocatorUpdate func() error //tso allocator - tsoAllocator func(count uint32) (typeutil.Timestamp, error) - tsoAllocatorUpdate func() error + TSOAllocator func(count uint32) (typeutil.Timestamp, error) + TSOAllocatorUpdate func() error //inner members ctx context.Context @@ -130,8 +134,7 @@ type Core struct { ReleaseCollection func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error //dd request scheduler - ddReqQueue chan reqTask //dd request will be push into this chan - lastDdTimeStamp typeutil.Timestamp + ddReqQueue chan reqTask //dd request will be push into this chan //time tick loop lastTimeTick typeutil.Timestamp @@ -169,16 +172,16 @@ func (c *Core) checkInit() error { if c.MetaTable == nil { return fmt.Errorf("MetaTable is nil") } - if c.idAllocator == nil { + if c.IDAllocator == nil { return fmt.Errorf("idAllocator is nil") } - if c.idAllocatorUpdate == nil { + if c.IDAllocatorUpdate == nil { return fmt.Errorf("idAllocatorUpdate is nil") } - if c.tsoAllocator == nil { + if c.TSOAllocator == nil { return fmt.Errorf("tsoAllocator is nil") } - if c.tsoAllocatorUpdate == nil { + if c.TSOAllocatorUpdate == nil { return fmt.Errorf("tsoAllocatorUpdate is nil") } if c.etcdCli == nil { @@ -243,42 +246,57 @@ func (c *Core) startDdScheduler() { log.Debug("dd chan is closed, exit task execution loop") return } - ts, err := task.Ts() - if err != nil { - task.Notify(err) - break - } - if !task.IgnoreTimeStamp() && ts <= c.lastDdTimeStamp { - task.Notify(fmt.Errorf("input timestamp = %d, last dd time stamp = %d", ts, c.lastDdTimeStamp)) - break - } - err = task.Execute(task.Ctx()) + err := task.Execute(task.Ctx()) task.Notify(err) - if ts > c.lastDdTimeStamp { - c.lastDdTimeStamp = ts - } } } } func (c *Core) startTimeTickLoop() { - for { - select { - case <-c.ctx.Done(): - log.Debug("close master time tick loop") - return - case tt, ok := <-c.ProxyTimeTickChan: - if !ok { - log.Warn("proxyTimeTickStream is closed, exit time tick loop") + if SetDDTimeTimeByMaster { + ticker := time.NewTimer(time.Duration(Params.TimeTickInterval) * time.Millisecond) + cnt := 0 + for { + select { + case <-c.ctx.Done(): + log.Debug("master context closed", zap.Error(c.ctx.Err())) return + case <-ticker.C: + if len(c.ddReqQueue) < 2 || cnt > 5 { + tt := &TimetickTask{ + baseReqTask: baseReqTask{ + ctx: c.ctx, + cv: make(chan error, 1), + core: c, + }, + } + c.ddReqQueue <- tt + cnt = 0 + } else { + cnt++ + } + } - if tt <= c.lastTimeTick { - log.Warn("master time tick go back", zap.Uint64("last time tick", c.lastTimeTick), zap.Uint64("input time tick ", tt)) + } + } else { + for { + select { + case <-c.ctx.Done(): + log.Debug("close master time tick loop") + return + case tt, ok := <-c.ProxyTimeTickChan: + if !ok { + log.Warn("proxyTimeTickStream is closed, exit time tick loop") + return + } + if tt <= c.lastTimeTick { + log.Warn("master time tick go back", zap.Uint64("last time tick", c.lastTimeTick), zap.Uint64("input time tick ", tt)) + } + if err := c.SendTimeTick(tt); err != nil { + log.Warn("master send time tick into dd and time_tick channel failed", zap.String("error", err.Error())) + } + c.lastTimeTick = tt } - if err := c.SendTimeTick(tt); err != nil { - log.Warn("master send time tick into dd and time_tick channel failed", zap.String("error", err.Error())) - } - c.lastTimeTick = tt } } } @@ -360,11 +378,11 @@ func (c *Core) tsLoop() { for { select { case <-tsoTicker.C: - if err := c.tsoAllocatorUpdate(); err != nil { + if err := c.TSOAllocatorUpdate(); err != nil { log.Warn("failed to update timestamp: ", zap.Error(err)) continue } - if err := c.idAllocatorUpdate(); err != nil { + if err := c.IDAllocatorUpdate(); err != nil { log.Warn("failed to update id: ", zap.Error(err)) continue } @@ -651,7 +669,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel)) c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { - ts, err := c.tsoAllocator(1) + ts, err := c.TSOAllocator(1) if err != nil { return nil, err } @@ -679,7 +697,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { } c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { - ts, err := c.tsoAllocator(1) + ts, err := c.TSOAllocator(1) if err != nil { return 0, err } @@ -815,7 +833,7 @@ func (c *Core) Init() error { for { var ts typeutil.Timestamp var err error - if ts, err = c.tsoAllocator(1); err == nil { + if ts, err = c.TSOAllocator(1); err == nil { return ts } time.Sleep(100 * time.Millisecond) @@ -842,10 +860,10 @@ func (c *Core) Init() error { if initError = idAllocator.Initialize(); initError != nil { return } - c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { + c.IDAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { return idAllocator.Alloc(count) } - c.idAllocatorUpdate = func() error { + c.IDAllocatorUpdate = func() error { return idAllocator.UpdateID() } @@ -853,10 +871,10 @@ func (c *Core) Init() error { if initError = tsoAllocator.Initialize(); initError != nil { return } - c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) { + c.TSOAllocator = func(count uint32) (typeutil.Timestamp, error) { return tsoAllocator.Alloc(count) } - c.tsoAllocatorUpdate = func() error { + c.TSOAllocatorUpdate = func() error { return tsoAllocator.UpdateTSO() } @@ -1584,7 +1602,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques } func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) { - ts, err := c.tsoAllocator(in.Count) + ts, err := c.TSOAllocator(in.Count) if err != nil { log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &masterpb.AllocTimestampResponse{ @@ -1608,7 +1626,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRe } func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) { - start, _, err := c.idAllocator(in.Count) + start, _, err := c.IDAllocator(in.Count) if err != nil { log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &masterpb.AllocIDResponse{ diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index fdedbb3656..6c73ddcba5 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -1581,10 +1581,10 @@ func TestMasterService(t *testing.T) { }) t.Run("alloc_error", func(t *testing.T) { - core.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { + core.IDAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { return 0, 0, fmt.Errorf("id allocator error test") } - core.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) { + core.TSOAllocator = func(count uint32) (typeutil.Timestamp, error) { return 0, fmt.Errorf("tso allcoator error test") } r1 := &masterpb.AllocTimestampRequest{ @@ -1615,6 +1615,152 @@ func TestMasterService(t *testing.T) { }) } +func TestMasterService2(t *testing.T) { + const ( + dbName = "testDb" + collName = "testColl" + partName = "testPartition" + ) + SetDDTimeTimeByMaster = true + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + msFactory := msgstream.NewPmsFactory() + Params.Init() + core, err := NewCore(ctx, msFactory) + assert.Nil(t, err) + randVal := rand.Int() + + Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) + Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) + Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) + Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath) + Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) + Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + + pm := &proxyMock{ + randVal: randVal, + collArray: make([]string, 0, 16), + mutex: sync.Mutex{}, + } + err = core.SetProxyService(ctx, pm) + assert.Nil(t, err) + + dm := &dataMock{randVal: randVal} + err = core.SetDataService(ctx, dm) + assert.Nil(t, err) + + im := &indexMock{ + fileArray: []string{}, + idxBuildID: []int64{}, + idxID: []int64{}, + idxDropID: []int64{}, + mutex: sync.Mutex{}, + } + err = core.SetIndexService(im) + assert.Nil(t, err) + + qm := &queryMock{ + collID: nil, + mutex: sync.Mutex{}, + } + err = core.SetQueryService(qm) + assert.Nil(t, err) + + err = core.Init() + assert.Nil(t, err) + + err = core.Start() + assert.Nil(t, err) + + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(t, err) + + proxyTimeTickStream, _ := msFactory.NewMsgStream(ctx) + proxyTimeTickStream.AsProducer([]string{Params.ProxyTimeTickChannel}) + + dataServiceSegmentStream, _ := msFactory.NewMsgStream(ctx) + dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel}) + + timeTickStream, _ := msFactory.NewMsgStream(ctx) + timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName) + timeTickStream.Start() + + ddStream, _ := msFactory.NewMsgStream(ctx) + ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName) + ddStream.Start() + + time.Sleep(time.Second) + + getNotTTMsg := func(ch <-chan *msgstream.MsgPack, n int) []msgstream.TsMsg { + msg := make([]msgstream.TsMsg, 0, n) + for { + m, ok := <-ch + assert.True(t, ok) + for _, m := range m.Msgs { + if _, ok := (m).(*msgstream.TimeTickMsg); !ok { + msg = append(msg, m) + } + } + if len(msg) >= n { + return msg + } + } + } + + t.Run("time tick", func(t *testing.T) { + ttmsg, ok := <-timeTickStream.Chan() + assert.True(t, ok) + assert.Equal(t, 1, len(ttmsg.Msgs)) + ttm, ok := (ttmsg.Msgs[0]).(*msgstream.TimeTickMsg) + assert.True(t, ok) + assert.Greater(t, ttm.Base.Timestamp, typeutil.Timestamp(0)) + + ddmsg, ok := <-ddStream.Chan() + assert.True(t, ok) + assert.Equal(t, 1, len(ddmsg.Msgs)) + ddm, ok := (ddmsg.Msgs[0]).(*msgstream.TimeTickMsg) + assert.True(t, ok) + assert.Greater(t, ddm.Base.Timestamp, typeutil.Timestamp(0)) + }) + + t.Run("create collection", func(t *testing.T) { + schema := schemapb.CollectionSchema{ + Name: collName, + } + + sbf, err := proto.Marshal(&schema) + assert.Nil(t, err) + + req := &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreateCollection, + Timestamp: 100, + }, + DbName: dbName, + CollectionName: collName, + Schema: sbf, + } + status, err := core.CreateCollection(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + + msg := getNotTTMsg(ddStream.Chan(), 2) + assert.GreaterOrEqual(t, len(msg), 2) + m1, ok := (msg[0]).(*msgstream.CreateCollectionMsg) + assert.True(t, ok) + m2, ok := (msg[1]).(*msgstream.CreatePartitionMsg) + assert.True(t, ok) + assert.Equal(t, m1.Base.Timestamp, m2.Base.Timestamp) + t.Log("time tick", m1.Base.Timestamp) + }) +} + func TestCheckInit(t *testing.T) { c, err := NewCore(context.Background(), nil) assert.Nil(t, err) @@ -1629,25 +1775,25 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { + c.IDAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { return 0, 0, nil } err = c.checkInit() assert.NotNil(t, err) - c.idAllocatorUpdate = func() error { + c.IDAllocatorUpdate = func() error { return nil } err = c.checkInit() assert.NotNil(t, err) - c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) { + c.TSOAllocator = func(count uint32) (typeutil.Timestamp, error) { return 0, nil } err = c.checkInit() assert.NotNil(t, err) - c.tsoAllocatorUpdate = func() error { + c.TSOAllocatorUpdate = func() error { return nil } err = c.checkInit() diff --git a/internal/masterservice/meta_snapshot.go b/internal/masterservice/meta_snapshot.go index 178ee2867a..ab0bf00ca6 100644 --- a/internal/masterservice/meta_snapshot.go +++ b/internal/masterservice/meta_snapshot.go @@ -313,7 +313,7 @@ func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) return string(resp.Kvs[0].Value), nil } -func (ms *metaSnapshot) MultiSave(kvs map[string]string) (typeutil.Timestamp, error) { +func (ms *metaSnapshot) MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { ms.lock.Lock() defer ms.lock.Unlock() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) @@ -321,10 +321,15 @@ func (ms *metaSnapshot) MultiSave(kvs map[string]string) (typeutil.Timestamp, er ts := ms.timeAllactor() strTs := strconv.FormatInt(int64(ts), 10) - ops := make([]clientv3.Op, 0, len(kvs)+1) + ops := make([]clientv3.Op, 0, len(kvs)+2) for key, value := range kvs { ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) } + if addition != nil { + if k, v, e := addition(ts); e == nil { + ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v)) + } + } ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() if err != nil { @@ -370,7 +375,7 @@ func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]str return keys, values, nil } -func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) { +func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { ms.lock.Lock() defer ms.lock.Unlock() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) @@ -378,10 +383,15 @@ func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, re ts := ms.timeAllactor() strTs := strconv.FormatInt(int64(ts), 10) - ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1) + ops := make([]clientv3.Op, 0, len(saves)+len(removals)+2) for key, value := range saves { ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) } + if addition != nil { + if k, v, e := addition(ts); e == nil { + ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v)) + } + } for _, key := range removals { ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key))) } diff --git a/internal/masterservice/meta_snapshot_test.go b/internal/masterservice/meta_snapshot_test.go index feaf5c5b4f..27919b1432 100644 --- a/internal/masterservice/meta_snapshot_test.go +++ b/internal/masterservice/meta_snapshot_test.go @@ -282,7 +282,7 @@ func TestMultiSave(t *testing.T) { for i := 0; i < 20; i++ { saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)} vtso = typeutil.Timestamp(100 + i*5) - ts, err := ms.MultiSave(saves) + ts, err := ms.MultiSave(saves, nil) assert.Nil(t, err) assert.Equal(t, vtso, ts) } @@ -353,7 +353,7 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)} dm := []string{fmt.Sprintf("kd-%d", i-20)} vtso = typeutil.Timestamp(100 + i*5) - ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm) + ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm, nil) assert.Nil(t, err) assert.Equal(t, vtso, ts) } diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index 8da57dcde4..4f47367d87 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -208,6 +208,20 @@ func (mt *metaTable) reloadFromKV() error { return nil } +func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) { + if op == nil { + return nil + } + meta[DDMsgSendPrefix] = "false" + return func(ts typeutil.Timestamp) (string, string, error) { + val, err := op(ts) + if err != nil { + return "", "", err + } + return DDOperationPrefix, val, nil + } +} + func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error) { mt.tenantLock.Lock() defer mt.tenantLock.Unlock() @@ -238,7 +252,7 @@ func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error) { return ts, nil } -func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr string) (typeutil.Timestamp, error) { +func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -285,10 +299,8 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn } // save ddOpStr into etcd - meta[DDOperationPrefix] = ddOpStr - meta[DDMsgSendPrefix] = "false" - - ts, err := mt.client.MultiSave(meta) + addition := mt.getAdditionKV(ddOpStr, meta) + ts, err := mt.client.MultiSave(meta, addition) if err != nil { _ = mt.reloadFromKV() return 0, err @@ -297,7 +309,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn return ts, nil } -func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) (typeutil.Timestamp, error) { +func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -349,12 +361,9 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) } // save ddOpStr into etcd - var saveMeta = map[string]string{ - DDOperationPrefix: ddOpStr, - DDMsgSendPrefix: "false", - } - - ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys) + var saveMeta = map[string]string{} + addition := mt.getAdditionKV(ddOpStr, saveMeta) + ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, addition) if err != nil { _ = mt.reloadFromKV() return 0, err @@ -478,7 +487,7 @@ func (mt *metaTable) ListCollections(ts typeutil.Timestamp) ([]string, error) { return colls, nil } -func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr string) (typeutil.Timestamp, error) { +func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() coll, ok := mt.collID2Meta[collID] @@ -520,10 +529,9 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string meta := map[string]string{k1: v1, k2: v2} // save ddOpStr into etcd - meta[DDOperationPrefix] = ddOpStr - meta[DDMsgSendPrefix] = "false" + addition := mt.getAdditionKV(ddOpStr, meta) - ts, err := mt.client.MultiSave(meta) + ts, err := mt.client.MultiSave(meta, addition) if err != nil { _ = mt.reloadFromKV() return 0, err @@ -589,7 +597,7 @@ func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string } //return timestamp, partitionid, error -func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr string) (typeutil.Timestamp, typeutil.UniqueID, error) { +func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, typeutil.UniqueID, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -647,10 +655,9 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str } // save ddOpStr into etcd - meta[DDOperationPrefix] = ddOpStr - meta[DDMsgSendPrefix] = "false" + addition := mt.getAdditionKV(ddOpStr, meta) - ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys) + ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, addition) if err != nil { _ = mt.reloadFromKV() return 0, 0, err @@ -856,7 +863,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID), } - ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta) + ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, nil) if err != nil { _ = mt.reloadFromKV() return 0, 0, false, err @@ -1035,7 +1042,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id meta[k] = v } - _, err = mt.client.MultiSave(meta) + _, err = mt.client.MultiSave(meta, nil) if err != nil { _ = mt.reloadFromKV() return nil, schemapb.FieldSchema{}, err @@ -1058,7 +1065,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id meta[k] = v } - _, err = mt.client.MultiSave(meta) + _, err = mt.client.MultiSave(meta, nil) if err != nil { _ = mt.reloadFromKV() return nil, schemapb.FieldSchema{}, err diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 49111f3d7f..143a6a8984 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -33,8 +33,8 @@ type mockTestKV struct { loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error) save func(key, value string) (typeutil.Timestamp, error) - multiSave func(kvs map[string]string) (typeutil.Timestamp, error) - multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string) (typeutil.Timestamp, error) + multiSave func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) + multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) } func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { @@ -48,12 +48,12 @@ func (m *mockTestKV) Save(key, value string) (typeutil.Timestamp, error) { return m.save(key, value) } -func (m *mockTestKV) MultiSave(kvs map[string]string) (typeutil.Timestamp, error) { - return m.multiSave(kvs) +func (m *mockTestKV) MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { + return m.multiSave(kvs, addition) } -func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) { - return m.multiSaveAndRemoveWithPrefix(saves, removals) +func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { + return m.multiSaveAndRemoveWithPrefix(saves, removals, addition) } func Test_MockKV(t *testing.T) { @@ -253,21 +253,25 @@ func TestMetaTable(t *testing.T) { }, } + ddOp := func(ts typeutil.Timestamp) (string, error) { + return "", nil + } + t.Run("add collection", func(t *testing.T) { partInfoDefault.SegmentIDs = []int64{segID} - _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, ddOp) assert.NotNil(t, err) partInfoDefault.SegmentIDs = []int64{} collInfo.PartitionIDs = []int64{segID} - _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, ddOp) assert.NotNil(t, err) collInfo.PartitionIDs = []int64{} - _, err = mt.AddCollection(collInfo, partInfoDefault, nil, "") + _, err = mt.AddCollection(collInfo, partInfoDefault, nil, ddOp) assert.NotNil(t, err) - _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, ddOp) assert.Nil(t, err) collMeta, err := mt.GetCollectionByName("testColl", 0) @@ -287,7 +291,7 @@ func TestMetaTable(t *testing.T) { }) t.Run("add partition", func(t *testing.T) { - _, err := mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, "") + _, err := mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, ddOp) assert.Nil(t, err) // check DD operation flag @@ -460,7 +464,7 @@ func TestMetaTable(t *testing.T) { }) t.Run("drop partition", func(t *testing.T) { - _, id, err := mt.DeletePartition(collID, partInfo.PartitionName, "") + _, id, err := mt.DeletePartition(collID, partInfo.PartitionName, nil) assert.Nil(t, err) assert.Equal(t, partID, id) @@ -471,9 +475,9 @@ func TestMetaTable(t *testing.T) { }) t.Run("drop collection", func(t *testing.T) { - _, err = mt.DeleteCollection(collIDInvalid, "") + _, err = mt.DeleteCollection(collIDInvalid, nil) assert.NotNil(t, err) - _, err = mt.DeleteCollection(collID, "") + _, err = mt.DeleteCollection(collID, nil) assert.Nil(t, err) // check DD operation flag @@ -490,28 +494,28 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, fmt.Errorf("multi save error") } collInfo.PartitionIDs = nil - _, err := mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.NotNil(t, err) assert.EqualError(t, err, "multi save error") }) t.Run("delete collection failed", func(t *testing.T) { - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } - mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string) (typeutil.Timestamp, error) { + mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, fmt.Errorf("milti save and remove with prefix error") } collInfo.PartitionIDs = nil - _, err := mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) mt.partitionID2Meta = make(map[typeutil.UniqueID]pb.PartitionInfo) mt.indexID2Meta = make(map[int64]pb.IndexInfo) - _, err = mt.DeleteCollection(collInfo.ID, "") + _, err = mt.DeleteCollection(collInfo.ID, nil) assert.NotNil(t, err) assert.EqualError(t, err, "milti save and remove with prefix error") }) @@ -522,7 +526,7 @@ func TestMetaTable(t *testing.T) { } collInfo.PartitionIDs = nil - _, err := mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) seg := &datapb.SegmentInfo{ @@ -559,41 +563,41 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) - _, err = mt.AddPartition(2, "no-part", 22, "") + _, err = mt.AddPartition(2, "no-part", 22, nil) assert.NotNil(t, err) assert.EqualError(t, err, "can't find collection. id = 2") coll := mt.collID2Meta[collInfo.ID] coll.PartitionIDs = make([]int64, Params.MaxPartitionNum) mt.collID2Meta[coll.ID] = coll - _, err = mt.AddPartition(coll.ID, "no-part", 22, "") + _, err = mt.AddPartition(coll.ID, "no-part", 22, nil) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)) coll.PartitionIDs = []int64{partInfo.PartitionID} mt.collID2Meta[coll.ID] = coll mt.partitionID2Meta = make(map[int64]pb.PartitionInfo) - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, fmt.Errorf("multi save error") } - _, err = mt.AddPartition(coll.ID, "no-part", 22, "") + _, err = mt.AddPartition(coll.ID, "no-part", 22, nil) assert.NotNil(t, err) assert.EqualError(t, err, "multi save error") - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) - _, err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, "") + _, err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, nil) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partInfo.PartitionName)) - _, err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, "") + _, err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, nil) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partInfo.PartitionID)) }) @@ -602,14 +606,14 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) mt.partitionID2Meta = make(map[int64]pb.PartitionInfo) @@ -623,36 +627,36 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) - _, _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, "") + _, _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, nil) assert.NotNil(t, err) assert.EqualError(t, err, "default partition cannot be deleted") - _, _, err = mt.DeletePartition(collInfo.ID, "abc", "") + _, _, err = mt.DeletePartition(collInfo.ID, "abc", nil) assert.NotNil(t, err) assert.EqualError(t, err, "partition abc does not exist") pm := mt.partitionID2Meta[partInfo.PartitionID] pm.SegmentIDs = []int64{11, 12, 13} mt.partitionID2Meta[pm.PartitionID] = pm - mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) (typeutil.Timestamp, error) { + mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, fmt.Errorf("multi save and remove with prefix error") } - _, _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, "") + _, _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, nil) assert.NotNil(t, err) assert.EqualError(t, err, "multi save and remove with prefix error") mt.collID2Meta = make(map[int64]pb.CollectionInfo) - _, _, err = mt.DeletePartition(collInfo.ID, "abc", "") + _, _, err = mt.DeletePartition(collInfo.ID, "abc", nil) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID)) @@ -665,14 +669,14 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) noPart := pb.PartitionInfo{ @@ -708,7 +712,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } mockKV.save = func(key, value string) (typeutil.Timestamp, error) { @@ -718,7 +722,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) seg := &datapb.SegmentInfo{ @@ -758,7 +762,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) _, err = mt.AddSegment(seg) assert.Nil(t, err) @@ -776,7 +780,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } mockKV.save = func(key, value string) (typeutil.Timestamp, error) { @@ -786,7 +790,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) _, _, _, err = mt.DropIndex("abc", "abc", "abc") @@ -823,10 +827,10 @@ func TestMetaTable(t *testing.T) { err = mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) mt.partitionID2Meta = make(map[int64]pb.PartitionInfo) - mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) (typeutil.Timestamp, error) { + mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, fmt.Errorf("multi save and remove with prefix error") } _, _, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) @@ -838,7 +842,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } mockKV.save = func(key, value string) (typeutil.Timestamp, error) { @@ -848,7 +852,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) _, err = mt.GetSegmentIndexInfoByID(segID2, fieldID, "abc") @@ -895,7 +899,7 @@ func TestMetaTable(t *testing.T) { mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } mockKV.save = func(key, value string) (typeutil.Timestamp, error) { @@ -905,7 +909,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) @@ -966,7 +970,7 @@ func TestMetaTable(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "collection abc not found") - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } mockKV.save = func(key, value string) (typeutil.Timestamp, error) { @@ -976,7 +980,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) _, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx) @@ -990,18 +994,18 @@ func TestMetaTable(t *testing.T) { assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID)) mt.indexID2Meta = bakMeta - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, fmt.Errorf("multi save error") } _, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx) assert.NotNil(t, err) assert.EqualError(t, err, "multi save error") - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) coll := mt.collID2Meta[collInfo.ID] coll.FieldIndexes = append(coll.FieldIndexes, &pb.FieldIndexInfo{FiledID: coll.FieldIndexes[0].FiledID, IndexID: coll.FieldIndexes[0].IndexID + 1}) @@ -1019,7 +1023,7 @@ func TestMetaTable(t *testing.T) { mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx idx.IndexName = idxInfo[0].IndexName - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, fmt.Errorf("multi save error") } _, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx) @@ -1039,7 +1043,7 @@ func TestMetaTable(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "collection abc not found") - mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, nil } mockKV.save = func(key, value string) (typeutil.Timestamp, error) { @@ -1049,7 +1053,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) mt.indexID2Meta = make(map[int64]pb.IndexInfo) _, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName) @@ -1111,7 +1115,7 @@ func TestMetaWithTimestamp(t *testing.T) { PartitionID: 11, SegmentIDs: nil, } - t1, err := mt.AddCollection(collInfo, partInfo, nil, "") + t1, err := mt.AddCollection(collInfo, partInfo, nil, nil) assert.Nil(t, err) collInfo.ID = 2 @@ -1120,7 +1124,7 @@ func TestMetaWithTimestamp(t *testing.T) { partInfo.PartitionID = 12 partInfo.PartitionName = "p2" - t2, err := mt.AddCollection(collInfo, partInfo, nil, "") + t2, err := mt.AddCollection(collInfo, partInfo, nil, nil) assert.Nil(t, err) assert.True(t, mt.HasCollection(1, 0)) diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index 49381b45dd..b944fc8f53 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -45,7 +45,8 @@ type ParamTable struct { DefaultIndexName string MinSegmentSizeToEnableIndex int64 - Timeout int + Timeout int + TimeTickInterval int Log log.Config @@ -79,6 +80,7 @@ func (p *ParamTable) Init() { p.initDefaultIndexName() p.initTimeout() + p.initTimeTickInterval() p.initLogCfg() p.initRoleName() @@ -189,6 +191,10 @@ func (p *ParamTable) initTimeout() { p.Timeout = p.ParseInt("master.timeout") } +func (p *ParamTable) initTimeTickInterval() { + p.TimeTickInterval = p.ParseInt("master.timeTickInterval") +} + func (p *ParamTable) initLogCfg() { p.Log = log.Config{} format, err := p.Load("log.format") diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go index 6f14db71a7..8015016890 100644 --- a/internal/masterservice/param_table_test.go +++ b/internal/masterservice/param_table_test.go @@ -61,4 +61,7 @@ func TestParamTable(t *testing.T) { assert.NotZero(t, Params.Timeout) t.Logf("master timeout = %d", Params.Timeout) + + assert.NotZero(t, Params.TimeTickInterval) + t.Logf("master timetickerInterval = %d", Params.TimeTickInterval) } diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index c287527793..0102a4b022 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -29,8 +29,6 @@ import ( type reqTask interface { Ctx() context.Context Type() commonpb.MsgType - Ts() (typeutil.Timestamp, error) - IgnoreTimeStamp() bool Execute(ctx context.Context) error WaitToFinish() error Notify(err error) @@ -60,6 +58,26 @@ func (bt *baseReqTask) WaitToFinish() error { } } +type TimetickTask struct { + baseReqTask +} + +func (t *TimetickTask) Ctx() context.Context { + return t.ctx +} + +func (t *TimetickTask) Type() commonpb.MsgType { + return commonpb.MsgType_TimeTick +} + +func (t *TimetickTask) Execute(ctx context.Context) error { + ts, err := t.core.TSOAllocator(1) + if err != nil { + return err + } + return t.core.SendTimeTick(ts) +} + type CreateCollectionReqTask struct { baseReqTask Req *milvuspb.CreateCollectionRequest @@ -73,14 +91,6 @@ func (t *CreateCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *CreateCollectionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *CreateCollectionReqTask) IgnoreTimeStamp() bool { - return false -} - func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { const defaultShardsNum = 2 @@ -122,15 +132,12 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { } schema.Fields = append(schema.Fields, rowIDField, timeStampField) - collID, _, err := t.core.idAllocator(1) + collID, _, err := t.core.IDAllocator(1) if err != nil { return err } - collTs, err := t.Ts() - if err != nil { - return err - } - partID, _, err := t.core.idAllocator(1) + collTs := t.Req.Base.Timestamp + partID, _, err := t.core.IDAllocator(1) if err != nil { return err } @@ -217,12 +224,15 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { // build DdOperation and save it into etcd, when ddmsg send fail, // system can restore ddmsg from etcd and re-send - ddOpStr, err := EncodeDdOperation(&ddCollReq, &ddPartReq, CreateCollectionDDType) - if err != nil { - return err + ddOp := func(ts typeutil.Timestamp) (string, error) { + if SetDDTimeTimeByMaster { + ddCollReq.Base.Timestamp = ts + ddPartReq.Base.Timestamp = ts + } + return EncodeDdOperation(&ddCollReq, &ddPartReq, CreateCollectionDDType) } - _, err = t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOpStr) + ts, err := t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOp) if err != nil { return err } @@ -236,6 +246,10 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { return err } + if SetDDTimeTimeByMaster { + t.core.SendTimeTick(ts) + } + // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) } @@ -253,14 +267,6 @@ func (t *DropCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *DropCollectionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *DropCollectionReqTask) IgnoreTimeStamp() bool { - return false -} - func (t *DropCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropCollection { return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -281,12 +287,14 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { // build DdOperation and save it into etcd, when ddmsg send fail, // system can restore ddmsg from etcd and re-send - ddOpStr, err := EncodeDdOperation(&ddReq, nil, DropCollectionDDType) - if err != nil { - return err + ddOp := func(ts typeutil.Timestamp) (string, error) { + if SetDDTimeTimeByMaster { + ddReq.Base.Timestamp = ts + } + return EncodeDdOperation(&ddReq, nil, DropCollectionDDType) } - _, err = t.core.MetaTable.DeleteCollection(collMeta.ID, ddOpStr) + ts, err := t.core.MetaTable.DeleteCollection(collMeta.ID, ddOp) if err != nil { return err } @@ -296,6 +304,10 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { return err } + if SetDDTimeTimeByMaster { + t.core.SendTimeTick(ts) + } + //notify query service to release collection go func() { if err = t.core.ReleaseCollection(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil { @@ -324,14 +336,6 @@ func (t *HasCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *HasCollectionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *HasCollectionReqTask) IgnoreTimeStamp() bool { - return true -} - func (t *HasCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_HasCollection { return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -359,14 +363,6 @@ func (t *DescribeCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *DescribeCollectionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *DescribeCollectionReqTask) IgnoreTimeStamp() bool { - return true -} - func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DescribeCollection { return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -415,14 +411,6 @@ func (t *ShowCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *ShowCollectionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *ShowCollectionReqTask) IgnoreTimeStamp() bool { - return true -} - func (t *ShowCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_ShowCollections { return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -448,14 +436,6 @@ func (t *CreatePartitionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *CreatePartitionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *CreatePartitionReqTask) IgnoreTimeStamp() bool { - return false -} - func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_CreatePartition { return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -464,7 +444,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { if err != nil { return err } - partID, _, err := t.core.idAllocator(1) + partID, _, err := t.core.IDAllocator(1) if err != nil { return err } @@ -481,12 +461,14 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { // build DdOperation and save it into etcd, when ddmsg send fail, // system can restore ddmsg from etcd and re-send - ddOpStr, err := EncodeDdOperation(&ddReq, nil, CreatePartitionDDType) - if err != nil { - return err + ddOp := func(ts typeutil.Timestamp) (string, error) { + if SetDDTimeTimeByMaster { + ddReq.Base.Timestamp = ts + } + return EncodeDdOperation(&ddReq, nil, CreatePartitionDDType) } - _, err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOpStr) + ts, err := t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOp) if err != nil { return err } @@ -496,6 +478,10 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { return err } + if SetDDTimeTimeByMaster { + t.core.SendTimeTick(ts) + } + // error doesn't matter here t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) @@ -516,14 +502,6 @@ func (t *DropPartitionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *DropPartitionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *DropPartitionReqTask) IgnoreTimeStamp() bool { - return false -} - func (t *DropPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropPartition { return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -549,12 +527,14 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { // build DdOperation and save it into etcd, when ddmsg send fail, // system can restore ddmsg from etcd and re-send - ddOpStr, err := EncodeDdOperation(&ddReq, nil, DropPartitionDDType) - if err != nil { - return err + ddOp := func(ts typeutil.Timestamp) (string, error) { + if SetDDTimeTimeByMaster { + ddReq.Base.Timestamp = ts + } + return EncodeDdOperation(&ddReq, nil, DropPartitionDDType) } - _, _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOpStr) + ts, _, err := t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOp) if err != nil { return err } @@ -564,6 +544,10 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { return err } + if SetDDTimeTimeByMaster { + t.core.SendTimeTick(ts) + } + // error doesn't matter here t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) @@ -585,14 +569,6 @@ func (t *HasPartitionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *HasPartitionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *HasPartitionReqTask) IgnoreTimeStamp() bool { - return true -} - func (t *HasPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_HasPartition { return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -619,14 +595,6 @@ func (t *ShowPartitionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *ShowPartitionReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool { - return true -} - func (t *ShowPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_ShowPartitions { return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -666,14 +634,6 @@ func (t *DescribeSegmentReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *DescribeSegmentReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *DescribeSegmentReqTask) IgnoreTimeStamp() bool { - return true -} - func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DescribeSegment { return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -726,14 +686,6 @@ func (t *ShowSegmentReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *ShowSegmentReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *ShowSegmentReqTask) IgnoreTimeStamp() bool { - return true -} - func (t *ShowSegmentReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_ShowSegments { return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -773,20 +725,12 @@ func (t *CreateIndexReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *CreateIndexReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *CreateIndexReqTask) IgnoreTimeStamp() bool { - return false -} - func (t *CreateIndexReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_CreateIndex { return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } indexName := Params.DefaultIndexName //TODO, get name from request - indexID, _, err := t.core.idAllocator(1) + indexID, _, err := t.core.IDAllocator(1) if err != nil { return err } @@ -827,14 +771,6 @@ func (t *DescribeIndexReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *DescribeIndexReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *DescribeIndexReqTask) IgnoreTimeStamp() bool { - return true -} - func (t *DescribeIndexReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DescribeIndex { return fmt.Errorf("describe index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -873,14 +809,6 @@ func (t *DropIndexReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } -func (t *DropIndexReqTask) Ts() (typeutil.Timestamp, error) { - return t.Req.Base.Timestamp, nil -} - -func (t *DropIndexReqTask) IgnoreTimeStamp() bool { - return false -} - func (t *DropIndexReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropIndex { return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])