mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-03 17:31:58 +08:00
DataService publishes flush-completed message into its channel (#5320)
DataService publishes flush-completed message into related msgstream See also: #5220 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
156d5b9f73
commit
7a82462da8
@ -70,6 +70,7 @@ type Server struct {
|
||||
name string
|
||||
}
|
||||
segmentInfoStream msgstream.MsgStream
|
||||
flushMsgStream msgstream.MsgStream
|
||||
insertChannels []string
|
||||
msFactory msgstream.Factory
|
||||
ttBarrier timesync.TimeTickBarrier
|
||||
@ -194,6 +195,15 @@ func (s *Server) initMsgProducer() error {
|
||||
return err
|
||||
}
|
||||
s.msgProducer.Start(s.ctx)
|
||||
// segment flush stream
|
||||
s.flushMsgStream, err = s.msFactory.NewMsgStream(s.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName})
|
||||
log.Debug("dataservice AsProducer:" + Params.SegmentInfoChannelName)
|
||||
s.flushMsgStream.Start()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -891,6 +901,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
|
||||
resp.Reason = "server is initializing"
|
||||
return resp, nil
|
||||
}
|
||||
if s.flushMsgStream == nil {
|
||||
resp.Reason = "flush msg stream nil"
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// check segment id & collection id matched
|
||||
_, err := s.meta.GetCollection(req.GetCollectionID())
|
||||
@ -926,12 +940,44 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
|
||||
for k, v := range ddlMeta {
|
||||
meta[k] = v
|
||||
}
|
||||
// Save into k-v store
|
||||
err = s.SaveBinLogMetaTxn(meta)
|
||||
if err != nil {
|
||||
resp.Reason = err.Error()
|
||||
return resp, err
|
||||
}
|
||||
// write flush msg into segmentInfo/flush stream
|
||||
msgPack := composeSegmentFlushMsgPack(req.SegmentID)
|
||||
err = s.flushMsgStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
resp.Reason = err.Error()
|
||||
return resp, err
|
||||
}
|
||||
|
||||
resp.ErrorCode = commonpb.ErrorCode_Success
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func composeSegmentFlushMsgPack(segmentID UniqueID) msgstream.MsgPack {
|
||||
msgPack := msgstream.MsgPack{
|
||||
Msgs: make([]msgstream.TsMsg, 0, 1),
|
||||
}
|
||||
completeFlushMsg := internalpb.SegmentFlushCompletedMsg{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_SegmentFlushDone,
|
||||
MsgID: 0, // TODO
|
||||
Timestamp: 0, // TODO
|
||||
SourceID: Params.NodeID,
|
||||
},
|
||||
SegmentID: segmentID,
|
||||
}
|
||||
var msg msgstream.TsMsg = &msgstream.FlushCompletedMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
HashValues: []uint32{0},
|
||||
},
|
||||
SegmentFlushCompletedMsg: completeFlushMsg,
|
||||
}
|
||||
|
||||
msgPack.Msgs = append(msgPack.Msgs, msg)
|
||||
return msgPack
|
||||
}
|
||||
|
||||
@ -525,6 +525,9 @@ func TestSaveBinlogPaths(t *testing.T) {
|
||||
t.Run("Normal SaveRequest", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
Timestamp: uint64(time.Now().Unix()),
|
||||
},
|
||||
SegmentID: 2,
|
||||
CollectionID: 0,
|
||||
Field2BinlogPaths: &datapb.ID2PathList{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user