diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 0ae1b4abc1..ad3d2ae65b 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -250,7 +250,7 @@ func (c *ChannelManager) Watch(ch *channel) error { func (c *ChannelManager) fillChannelPosition(update *ChannelOp) { for _, ch := range update.Channels { - vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, true) + vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, false) info := &datapb.ChannelWatchInfo{ Vchan: vchan, StartTs: time.Now().Unix(), diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 0794caec63..5695276472 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -683,29 +683,30 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr flushed := make([]*datapb.SegmentInfo, 0) unflushed := make([]*datapb.SegmentInfo, 0) var seekPosition *internalpb.MsgPosition - var useUnflushedPosition bool for _, s := range segments { if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed { flushed = append(flushed, trimSegmentInfo(s.SegmentInfo)) - if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) { + if seekPosition == nil || (s.DmlPosition.Timestamp < seekPosition.Timestamp) { seekPosition = s.DmlPosition } continue } - if s.DmlPosition == nil { + if s.DmlPosition == nil { // segment position all nil continue } unflushed = append(unflushed, trimSegmentInfo(s.SegmentInfo)) - if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp { - useUnflushedPosition = true - if !seekFromStartPosition { - seekPosition = s.DmlPosition - } else { - seekPosition = s.StartPosition - } + segmentPosition := s.DmlPosition + if seekFromStartPosition { + // need to use start position when load collection/partition, querynode does not support seek from checkpoint yet + // TODO silverxia remove seek from start logic after checkpoint supported in querynode + segmentPosition = s.StartPosition + } + + if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp { + seekPosition = segmentPosition } } // use collection start position when segment position is not found diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index d4314bed59..9cae21e78f 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1179,7 +1179,7 @@ func TestGetRecoveryInfo(t *testing.T) { } } - t.Run("test get largest position of flushed segments as seek position", func(t *testing.T) { + t.Run("test get earliest position of flushed segments as seek position", func(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) @@ -1204,7 +1204,7 @@ func TestGetRecoveryInfo(t *testing.T) { assert.EqualValues(t, 1, len(resp.GetChannels())) assert.EqualValues(t, 0, len(resp.GetChannels()[0].GetUnflushedSegments())) assert.ElementsMatch(t, []*datapb.SegmentInfo{trimSegmentInfo(seg1), trimSegmentInfo(seg2)}, resp.GetChannels()[0].GetFlushedSegments()) - assert.EqualValues(t, 20, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) + assert.EqualValues(t, 10, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) }) t.Run("test get recovery of unflushed segments ", func(t *testing.T) { @@ -1232,6 +1232,7 @@ func TestGetRecoveryInfo(t *testing.T) { assert.EqualValues(t, 0, len(resp.GetBinlogs())) assert.EqualValues(t, 1, len(resp.GetChannels())) assert.NotNil(t, resp.GetChannels()[0].SeekPosition) + assert.EqualValues(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) }) t.Run("test get binlogs", func(t *testing.T) { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index ca7c4d9f33..fb70322fa8 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -476,7 +476,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf channels := dresp.GetVirtualChannelNames() channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) for _, c := range channels { - channelInfo := s.GetVChanPositions(c, collectionID, false) + channelInfo := s.GetVChanPositions(c, collectionID, true) channelInfos = append(channelInfos, channelInfo) }