diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 7261ee2e1e..73401bc092 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -444,7 +444,7 @@ func (c *ChannelManager) Watch(ch *channel) error { // fillChannelWatchInfo updates the channel op by filling in channel watch info. func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) { for _, ch := range op.Channels { - vcInfo := c.h.GetVChanPositions(ch, allPartitionID) + vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID) info := &datapb.ChannelWatchInfo{ Vchan: vcInfo, StartTs: time.Now().Unix(), @@ -462,7 +462,7 @@ func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state data startTs := time.Now().Unix() timeoutTs := time.Now().Add(maxWatchDuration).UnixNano() for _, ch := range op.Channels { - vcInfo := c.h.GetVChanPositions(ch, allPartitionID) + vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID) info := &datapb.ChannelWatchInfo{ Vchan: vcInfo, StartTs: startTs, diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index a53a11a21f..26881260c9 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -30,8 +30,10 @@ import ( // Handler handles some channel method for ChannelManager type Handler interface { - // GetVChanPositions gets the information recovery needed of a channel - GetVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo + // GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord + GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo + // GetDataVChanPositions gets the information recovery needed of a channel for DataNode + GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo CheckShouldDropChannel(channel string) bool FinishDropChannel(channel string) } @@ -46,10 +48,79 @@ func newServerHandler(s *Server) *ServerHandler { return &ServerHandler{s: s} } -// GetVChanPositions gets vchannel latest postitions with provided dml channel names, +// GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode. +func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { + segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { + return s.InsertChannel == channel.Name + }) + log.Info("GetDataVChanPositions", + zap.Int64("collectionID", channel.CollectionID), + zap.String("channel", channel.Name), + zap.Int("numOfSegments", len(segments)), + ) + var ( + flushedIDs = make(typeutil.UniqueSet) + unflushedIDs = make(typeutil.UniqueSet) + droppedIDs = make(typeutil.UniqueSet) + seekPosition *internalpb.MsgPosition + ) + for _, s := range segments { + if (partitionID > allPartitionID && s.PartitionID != partitionID) || + (s.GetStartPosition() == nil && s.GetDmlPosition() == nil) { + continue + } + if s.GetIsImporting() { + // Skip bulk load segments. + continue + } + + if s.GetState() == commonpb.SegmentState_Dropped { + droppedIDs.Insert(s.GetID()) + continue + } else if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed { + flushedIDs.Insert(s.GetID()) + } else { + unflushedIDs.Insert(s.GetID()) + } + + var segmentPosition *internalpb.MsgPosition + if s.GetDmlPosition() != nil { + segmentPosition = s.GetDmlPosition() + } else { + segmentPosition = s.GetStartPosition() + } + if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp { + seekPosition = segmentPosition + } + } + + // use collection start position when segment position is not found + if seekPosition == nil { + if channel.StartPositions == nil { + collection := h.GetCollection(h.s.ctx, channel.CollectionID) + if collection != nil { + seekPosition = getCollectionStartPosition(channel.Name, collection) + } + } else { + // use passed start positions, skip to ask rootcoord. + seekPosition = toMsgPosition(channel.Name, channel.StartPositions) + } + } + + return &datapb.VchannelInfo{ + CollectionID: channel.CollectionID, + ChannelName: channel.Name, + SeekPosition: seekPosition, + FlushedSegmentIds: flushedIDs.Collect(), + UnflushedSegmentIds: unflushedIDs.Collect(), + DroppedSegmentIds: droppedIDs.Collect(), + } +} + +// GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord, // we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments, // the unflushed segments are actually the segments without index, even they are flushed. -func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { +func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { // cannot use GetSegmentsByChannel since dropped segments are needed here segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { return s.InsertChannel == channel.Name @@ -60,10 +131,10 @@ func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID for _, segment := range indexedSegments { indexed.Insert(segment.GetID()) } - log.Info("GetSegmentsByChannel", - zap.Any("collectionID", channel.CollectionID), - zap.Any("channel", channel), - zap.Any("numOfSegments", len(segments)), + log.Info("GetQueryVChanPositions", + zap.Int64("collectionID", channel.CollectionID), + zap.String("channel", channel.Name), + zap.Int("numOfSegments", len(segments)), ) var ( indexedIDs = make(typeutil.UniqueSet) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 40fc6bfe32..a8cb273850 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -736,7 +736,14 @@ func newMockHandler() *mockHandler { return &mockHandler{} } -func (h *mockHandler) GetVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { +func (h *mockHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { + return &datapb.VchannelInfo{ + CollectionID: channel.CollectionID, + ChannelName: channel.Name, + } +} + +func (h *mockHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { return &datapb.VchannelInfo{ CollectionID: channel.CollectionID, ChannelName: channel.Name, diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 1c774bb42e..5b5f073224 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1130,7 +1130,6 @@ func TestSaveBinlogPaths(t *testing.T) { err := svr.meta.AddSegment(NewSegmentInfo(s)) assert.Nil(t, err) } - svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) err := svr.channelManager.AddNode(0) assert.Nil(t, err) @@ -1301,7 +1300,6 @@ func TestDropVirtualChannel(t *testing.T) { err := svr.meta.AddSegment(NewSegmentInfo(s)) assert.Nil(t, err) } - svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) // add non matched segments os := &datapb.SegmentInfo{ ID: maxOperationsPerTxn + 100, @@ -1646,7 +1644,127 @@ func TestDataNodeTtChannel(t *testing.T) { }) } -func TestGetVChannelPos(t *testing.T) { +func TestGetDataVChanPositions(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: 0, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch1", + Data: []byte{8, 9, 10}, + }, + }, + }) + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: 1, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch0", + Data: []byte{8, 9, 10}, + }, + }, + }) + + s1 := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + }, + } + err := svr.meta.AddSegment(NewSegmentInfo(s1)) + require.Nil(t, err) + s2 := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + }, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(NewSegmentInfo(s2)) + require.Nil(t, err) + s3 := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + }, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{11, 12, 13}, + Timestamp: 2, + }, + } + err = svr.meta.AddSegment(NewSegmentInfo(s3)) + require.Nil(t, err) + + t.Run("get unexisted channel", func(t *testing.T) { + vchan := svr.handler.GetDataVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID) + assert.Empty(t, vchan.UnflushedSegmentIds) + assert.Empty(t, vchan.FlushedSegmentIds) + }) + + t.Run("get existed channel", func(t *testing.T) { + vchan := svr.handler.GetDataVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) + assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0]) + assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) + assert.ElementsMatch(t, []int64{s2.ID, s3.ID}, vchan.UnflushedSegmentIds) + assert.EqualValues(t, []byte{1, 2, 3}, vchan.GetSeekPosition().GetMsgID()) + }) + + t.Run("empty collection", func(t *testing.T) { + infos := svr.handler.GetDataVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID) + assert.EqualValues(t, 1, infos.CollectionID) + assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) + assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds)) + assert.EqualValues(t, []byte{8, 9, 10}, infos.SeekPosition.MsgID) + }) + + t.Run("filter partition", func(t *testing.T) { + infos := svr.handler.GetDataVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1) + assert.EqualValues(t, 0, infos.CollectionID) + assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) + assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds)) + assert.EqualValues(t, []byte{11, 12, 13}, infos.SeekPosition.MsgID) + }) + + t.Run("empty collection with passed positions", func(t *testing.T) { + vchannel := "ch_no_segment_1" + pchannel := funcutil.ToPhysicalChannel(vchannel) + infos := svr.handler.GetDataVChanPositions(&channel{ + Name: vchannel, + CollectionID: 0, + StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}}, + }, allPartitionID) + assert.EqualValues(t, 0, infos.CollectionID) + assert.EqualValues(t, vchannel, infos.ChannelName) + assert.EqualValues(t, []byte{14, 15, 16}, infos.SeekPosition.MsgID) + }) +} + +func TestGetQueryVChanPositions(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) schema := newTestSchema() @@ -1746,21 +1864,22 @@ func TestGetVChannelPos(t *testing.T) { svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil) t.Run("get unexisted channel", func(t *testing.T) { - vchan := svr.handler.GetVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID) + vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID) assert.Empty(t, vchan.UnflushedSegmentIds) assert.Empty(t, vchan.FlushedSegmentIds) }) t.Run("get existed channel", func(t *testing.T) { - vchan := svr.handler.GetVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0]) assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) + assert.ElementsMatch(t, []int64{s2.ID, s3.ID}, vchan.UnflushedSegmentIds) assert.EqualValues(t, []byte{1, 2, 3}, vchan.GetSeekPosition().GetMsgID()) }) t.Run("empty collection", func(t *testing.T) { - infos := svr.handler.GetVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID) + infos := svr.handler.GetQueryVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID) assert.EqualValues(t, 1, infos.CollectionID) assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds)) @@ -1768,7 +1887,7 @@ func TestGetVChannelPos(t *testing.T) { }) t.Run("filter partition", func(t *testing.T) { - infos := svr.handler.GetVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1) + infos := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1) assert.EqualValues(t, 0, infos.CollectionID) assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds)) @@ -1778,7 +1897,7 @@ func TestGetVChannelPos(t *testing.T) { t.Run("empty collection with passed positions", func(t *testing.T) { vchannel := "ch_no_segment_1" pchannel := funcutil.ToPhysicalChannel(vchannel) - infos := svr.handler.GetVChanPositions(&channel{ + infos := svr.handler.GetQueryVChanPositions(&channel{ Name: vchannel, CollectionID: 0, StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}}, @@ -1787,6 +1906,18 @@ func TestGetVChannelPos(t *testing.T) { assert.EqualValues(t, vchannel, infos.ChannelName) assert.EqualValues(t, []byte{14, 15, 16}, infos.SeekPosition.MsgID) }) + + t.Run("filter non indexed segments", func(t *testing.T) { + svr.indexCoord = mocks.NewMockIndexCoord(t) + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return( + &indexpb.GetIndexInfoResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil) + + vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds)) + assert.EqualValues(t, 3, len(vchan.UnflushedSegmentIds)) + assert.ElementsMatch(t, []int64{s1.ID, s2.ID, s3.ID}, vchan.UnflushedSegmentIds) + assert.EqualValues(t, []byte{1, 2, 3}, vchan.GetSeekPosition().GetMsgID()) + }) } func TestShouldDropChannel(t *testing.T) { @@ -2925,7 +3056,6 @@ func TestDataCoord_SaveImportSegment(t *testing.T) { NodeID: 110, Address: "localhost:8080", }) - svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) err := svr.channelManager.AddNode(110) assert.Nil(t, err) err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 100}) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b2cc9ff570..6b9057d7f5 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -609,7 +609,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) flushedIDs := make(typeutil.UniqueSet) for _, c := range channels { - channelInfo := s.handler.GetVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID) + channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID) channelInfos = append(channelInfos, channelInfo) log.Debug("datacoord append channelInfo in GetRecoveryInfo", zap.Any("channelInfo", channelInfo),