diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index c041d16c27..f74f4a9d28 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -519,6 +519,31 @@ func (c *Core) setDdMsgSendFlag(b bool) error { return err } +func (c *Core) startMsgStreamAndSeek(chanName string, subName string, key string) (*ms.MsgStream, error) { + stream, err := c.msFactory.NewMsgStream(c.ctx) + if err != nil { + return nil, err + } + stream.AsConsumer([]string{chanName}, subName) + log.Debug("AsConsumer: " + chanName + ":" + subName) + + msgPosStr, err := c.MetaTable.client.Load(key, 0) + if err == nil { + msgPositions := make([]*ms.MsgPosition, 0) + if err := DecodeMsgPositions(msgPosStr, &msgPositions); err != nil { + return nil, fmt.Errorf("decode msg positions fail, err %s", err.Error()) + } + if len(msgPositions) > 0 { + if err := stream.Seek(msgPositions); err != nil { + return nil, fmt.Errorf("msg stream seek fail, err %s", err.Error()) + } + log.Debug("msg stream: " + chanName + ":" + subName + " seek to stored position") + } + } + stream.Start() + return &stream, nil +} + func (c *Core) setMsgStreams() error { if Params.PulsarAddress == "" { return fmt.Errorf("PulsarAddress is empty") @@ -689,20 +714,22 @@ func (c *Core) setMsgStreams() error { } // data service will put msg into this channel when create segment - dsStream, _ := c.msFactory.NewMsgStream(c.ctx) + dsChanName := Params.DataServiceSegmentChannel dsSubName := Params.MsgChannelSubName + "ds" - dsStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dsSubName) - log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + dsSubName) - dsStream.Start() - c.DataServiceSegmentChan = dsStream.Chan() + dsStream, err := c.startMsgStreamAndSeek(dsChanName, dsSubName, SegInfoMsgEndPosPrefix) + if err != nil { + return err + } + c.DataServiceSegmentChan = (*dsStream).Chan() // data node will put msg into this channel when flush segment - dnStream, _ := c.msFactory.NewMsgStream(c.ctx) + dnChanName := Params.DataServiceSegmentChannel dnSubName := Params.MsgChannelSubName + "dn" - dnStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dnSubName) - log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + dnSubName) - dnStream.Start() - c.DataNodeFlushedSegmentChan = dnStream.Chan() + dnStream, err := c.startMsgStreamAndSeek(dnChanName, dnSubName, FlushedSegMsgEndPosPrefix) + if err != nil { + return err + } + c.DataNodeFlushedSegmentChan = (*dnStream).Chan() return nil } diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 66b90f21b9..c7cd873b8c 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -199,11 +199,58 @@ func consumeMsgChan(timeout time.Duration, targetChan <-chan *msgstream.MsgPack) } } +func GenSegInfoMsgPack(seg *datapb.SegmentInfo) *msgstream.MsgPack { + msgPack := msgstream.MsgPack{} + baseMsg := msgstream.BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []uint32{0}, + } + segMsg := &msgstream.SegmentInfoMsg{ + BaseMsg: baseMsg, + SegmentMsg: datapb.SegmentMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SegmentInfo, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + Segment: seg, + }, + } + msgPack.Msgs = append(msgPack.Msgs, segMsg) + return &msgPack +} + +func GenFlushedSegMsgPack(segID typeutil.UniqueID) *msgstream.MsgPack { + msgPack := msgstream.MsgPack{} + baseMsg := msgstream.BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []uint32{0}, + } + segMsg := &msgstream.FlushCompletedMsg{ + BaseMsg: baseMsg, + SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SegmentFlushDone, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + SegmentID: segID, + }, + } + msgPack.Msgs = append(msgPack.Msgs, segMsg) + return &msgPack +} + func TestMasterService(t *testing.T) { const ( dbName = "testDb" collName = "testColl" partName = "testPartition" + segID = 1001 ) ctx, cancel := context.WithCancel(context.Background()) @@ -278,15 +325,9 @@ func TestMasterService(t *testing.T) { err = core.SetQueryService(qm) assert.Nil(t, err) - err = core.Init() - assert.Nil(t, err) - - err = core.Start() - assert.Nil(t, err) - m := map[string]interface{}{ - "receiveBufSize": 1024, "pulsarAddress": Params.PulsarAddress, + "receiveBufSize": 1024, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) @@ -305,6 +346,25 @@ func TestMasterService(t *testing.T) { ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName) ddStream.Start() + // test dataServiceSegmentStream seek + dataNodeSubName := Params.MsgChannelSubName + "dn" + flushedSegStream, _ := msFactory.NewMsgStream(ctx) + flushedSegStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dataNodeSubName) + flushedSegStream.Start() + msgPack := GenFlushedSegMsgPack(9999) + err = dataServiceSegmentStream.Produce(msgPack) + assert.Nil(t, err) + flushedSegMsgPack := flushedSegStream.Consume() + flushedSegPosStr, _ := EncodeMsgPositions(flushedSegMsgPack.EndPositions) + _, err = etcdCli.Put(ctx, path.Join(Params.MetaRootPath, FlushedSegMsgEndPosPrefix), flushedSegPosStr) + assert.Nil(t, err) + + err = core.Init() + assert.Nil(t, err) + + err = core.Start() + assert.Nil(t, err) + time.Sleep(time.Second) t.Run("time tick", func(t *testing.T) { @@ -458,7 +518,7 @@ func TestMasterService(t *testing.T) { ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0) assert.Nil(t, err) var ddOp DdOperation - err = json.Unmarshal([]byte(ddOpStr), &ddOp) + err = DecodeDdOperation(ddOpStr, &ddOp) assert.Nil(t, err) assert.Equal(t, CreateCollectionDDType, ddOp.Type) @@ -602,7 +662,7 @@ func TestMasterService(t *testing.T) { ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0) assert.Nil(t, err) var ddOp DdOperation - err = json.Unmarshal([]byte(ddOpStr), &ddOp) + err = DecodeDdOperation(ddOpStr, &ddOp) assert.Nil(t, err) assert.Equal(t, CreatePartitionDDType, ddOp.Type) @@ -665,27 +725,8 @@ func TestMasterService(t *testing.T) { CollectionID: coll.ID, PartitionID: part.PartitionID, } - - msgPack := msgstream.MsgPack{} - baseMsg := msgstream.BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{0}, - } - segMsg := &msgstream.SegmentInfoMsg{ - BaseMsg: baseMsg, - SegmentMsg: datapb.SegmentMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SegmentInfo, - MsgID: 0, - Timestamp: 0, - SourceID: 0, - }, - Segment: seg, - }, - } - msgPack.Msgs = append(msgPack.Msgs, segMsg) - err = dataServiceSegmentStream.Broadcast(&msgPack) + segInfoMsgPack := GenSegInfoMsgPack(seg) + err = dataServiceSegmentStream.Broadcast(segInfoMsgPack) assert.Nil(t, err) time.Sleep(time.Second) @@ -821,31 +862,12 @@ func TestMasterService(t *testing.T) { assert.Equal(t, 1, len(part.SegmentIDs)) seg := &datapb.SegmentInfo{ - ID: 1001, + ID: segID, CollectionID: coll.ID, PartitionID: part.PartitionID, } - - msgPack := msgstream.MsgPack{} - baseMsg := msgstream.BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{0}, - } - segMsg := &msgstream.SegmentInfoMsg{ - BaseMsg: baseMsg, - SegmentMsg: datapb.SegmentMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SegmentInfo, - MsgID: 0, - Timestamp: 0, - SourceID: 0, - }, - Segment: seg, - }, - } - msgPack.Msgs = append(msgPack.Msgs, segMsg) - err = dataServiceSegmentStream.Broadcast(&msgPack) + segInfoMsgPack := GenSegInfoMsgPack(seg) + err = dataServiceSegmentStream.Broadcast(segInfoMsgPack) assert.Nil(t, err) time.Sleep(time.Second) @@ -853,20 +875,8 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 2, len(part.SegmentIDs)) - flushMsg := &msgstream.FlushCompletedMsg{ - BaseMsg: baseMsg, - SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SegmentFlushDone, - MsgID: 0, - Timestamp: 0, - SourceID: 0, - }, - SegmentID: 1001, - }, - } - msgPack.Msgs = []msgstream.TsMsg{flushMsg} - err = dataServiceSegmentStream.Broadcast(&msgPack) + flushedSegMsgPack := GenFlushedSegMsgPack(segID) + err = dataServiceSegmentStream.Broadcast(flushedSegMsgPack) assert.Nil(t, err) time.Sleep(time.Second) @@ -1007,7 +1017,7 @@ func TestMasterService(t *testing.T) { ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0) assert.Nil(t, err) var ddOp DdOperation - err = json.Unmarshal([]byte(ddOpStr), &ddOp) + err = DecodeDdOperation(ddOpStr, &ddOp) assert.Nil(t, err) assert.Equal(t, DropPartitionDDType, ddOp.Type) @@ -1081,7 +1091,7 @@ func TestMasterService(t *testing.T) { ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0) assert.Nil(t, err) var ddOp DdOperation - err = json.Unmarshal([]byte(ddOpStr), &ddOp) + err = DecodeDdOperation(ddOpStr, &ddOp) assert.Nil(t, err) assert.Equal(t, DropCollectionDDType, ddOp.Type) diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index 0b2513e089..bab84dccc2 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -41,8 +41,10 @@ const ( TimestampPrefix = ComponentPrefix + "/timestamp" - MsgStartPositionPrefix = ComponentPrefix + "/msg-start-position" - MsgEndPositionPrefix = ComponentPrefix + "/msg-end-position" + SegInfoMsgStartPosPrefix = ComponentPrefix + "/seg-info-msg-start-position" + SegInfoMsgEndPosPrefix = ComponentPrefix + "/seg-info-msg-end-position" + FlushedSegMsgStartPosPrefix = ComponentPrefix + "/flushed-seg-msg-start-position" + FlushedSegMsgEndPosPrefix = ComponentPrefix + "/flushed-seg-msg-end-position" DDOperationPrefix = ComponentPrefix + "/dd-operation" DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send" @@ -735,9 +737,10 @@ func (mt *metaTable) AddSegment(segInfos []*datapb.SegmentInfo, msgStartPos stri meta[k] = v } + // AddSegment is invoked from DataService if msgStartPos != "" && msgEndPos != "" { - meta[MsgStartPositionPrefix] = msgStartPos - meta[MsgEndPositionPrefix] = msgEndPos + meta[SegInfoMsgStartPosPrefix] = msgStartPos + meta[SegInfoMsgEndPosPrefix] = msgEndPos } ts, err := mt.client.MultiSave(meta, nil) @@ -803,9 +806,10 @@ func (mt *metaTable) AddIndex(segIdxInfos []*pb.SegmentIndexInfo, msgStartPos st meta[k] = v } + // AddIndex is invoked from DataNode flush operation if msgStartPos != "" && msgEndPos != "" { - meta[MsgStartPositionPrefix] = msgStartPos - meta[MsgEndPositionPrefix] = msgEndPos + meta[FlushedSegMsgStartPosPrefix] = msgStartPos + meta[FlushedSegMsgEndPosPrefix] = msgEndPos } ts, err := mt.client.MultiSave(meta, nil) diff --git a/internal/masterservice/util.go b/internal/masterservice/util.go index 52b32bbd68..7e7cd12dc8 100644 --- a/internal/masterservice/util.go +++ b/internal/masterservice/util.go @@ -86,7 +86,12 @@ func EncodeDdOperation(m proto.Message, m1 proto.Message, ddType string) (string return string(ddOpByte), nil } -// SegmentIndexInfoEqual return true if 2 SegmentIndexInfo are identical +// DecodeDdOperation deserialize string to DdOperation +func DecodeDdOperation(str string, ddOp *DdOperation) error { + return json.Unmarshal([]byte(str), ddOp) +} + +// SegmentIndexInfoEqual return true if SegmentIndexInfos are identical func SegmentIndexInfoEqual(info1 *etcdpb.SegmentIndexInfo, info2 *etcdpb.SegmentIndexInfo) bool { return info1.SegmentID == info2.SegmentID && info1.FieldID == info2.FieldID && @@ -106,3 +111,11 @@ func EncodeMsgPositions(msgPositions []*msgstream.MsgPosition) (string, error) { } return string(resByte), nil } + +// DecodeMsgPositions deserialize string to []*MsgPosition +func DecodeMsgPositions(str string, msgPositions *[]*msgstream.MsgPosition) error { + if str == "" || str == "null" { + return nil + } + return json.Unmarshal([]byte(str), msgPositions) +}