diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go
index cfae3a7e35..3914e86a9c 100644
--- a/internal/datacoord/handler.go
+++ b/internal/datacoord/handler.go
@@ -138,79 +138,27 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
 			unIndexedIDs.Insert(s.GetID())
 		}
 	}
-	for id := range unIndexedIDs {
-		// Indexed segments are compacted to a raw segment,
-		// replace it with the indexed ones
-		if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
-			indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
-			unIndexedIDs.Remove(id)
-			indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
-			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
+	hasUnIndexed := true
+	for hasUnIndexed {
+		hasUnIndexed = false
+		for id := range unIndexedIDs {
+			// Indexed segments are compacted to a raw segment,
+			// replace it with the indexed ones
+			if len(segmentInfos[id].GetCompactionFrom()) > 0 {
+				unIndexedIDs.Remove(id)
+				for _, segID := range segmentInfos[id].GetCompactionFrom() {
+					if indexed.Contain(segID) {
+						indexedIDs.Insert(segID)
+					} else {
+						unIndexedIDs.Insert(segID)
+						hasUnIndexed = true
+					}
+				}
+				droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
+			}
 		}
 	}
 
-	//var (
-	//	indexedIDs   = make(typeutil.UniqueSet)
-	//	unIndexedIDs = make(typeutil.UniqueSet)
-	//	growingIDs   = make(typeutil.UniqueSet)
-	//	droppedIDs   = make(typeutil.UniqueSet)
-	//)
-	//for _, s := range segments {
-	//	if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
-	//		(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
-	//		continue
-	//	}
-	//	if s.GetIsImporting() {
-	//		// Skip bulk insert segments.
-	//		continue
-	//	}
-	//	segmentInfos[s.GetID()] = s
-	//	if s.GetState() == commonpb.SegmentState_Dropped {
-	//		droppedIDs.Insert(s.GetID())
-	//	} else if s.GetState() == commonpb.SegmentState_Growing {
-	//		growingIDs.Insert(s.GetID())
-	//	} else if indexed.Contain(s.GetID()) {
-	//		indexedIDs.Insert(s.GetID())
-	//	} else {
-	//		unIndexedIDs.Insert(s.GetID())
-	//	}
-	//}
-	//hasUnIndexed := true
-	//for hasUnIndexed {
-	//	hasUnIndexed = false
-	//	for id := range unIndexedIDs {
-	//		// Indexed segments are compacted to a raw segment,
-	//		// replace it with the indexed ones
-	//		if indexed.Contain(id) {
-	//			unIndexedIDs.Remove(id)
-	//			indexedIDs.Insert(id)
-	//			continue
-	//		}
-	//		if len(segmentInfos[id].GetCompactionFrom()) > 0 {
-	//			unIndexedIDs.Remove(id)
-	//			for _, segID := range segmentInfos[id].GetCompactionFrom() {
-	//				if indexed.Contain(segID) {
-	//					indexedIDs.Insert(segID)
-	//				} else {
-	//					unIndexedIDs.Insert(id)
-	//					hasUnIndexed = true
-	//				}
-	//			}
-	//			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
-	//		}
-	//	}
-	//}
-	//
-	//return &datapb.VchannelInfo{
-	//	CollectionID:        channel.CollectionID,
-	//	ChannelName:         channel.Name,
-	//	SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
-	//	IndexedSegmentIds:   indexed.Collect(),
-	//	FlushedSegmentIds:   unIndexedIDs.Collect(),
-	//	UnflushedSegmentIds: growingIDs.Collect(),
-	//	DroppedSegmentIds:   droppedIDs.Collect(),
-	//}
-
 	return &datapb.VchannelInfo{
 		CollectionID:        channel.CollectionID,
 		ChannelName:         channel.Name,
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index 2f901939c6..2d2f643234 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -2697,6 +2697,87 @@ func TestGetRecoveryInfo(t *testing.T) {
 		assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
 	})
 
+	t.Run("with continuous compaction", func(t *testing.T) {
+		svr := newTestServer(t, nil)
+		defer closeTestServer(t, svr)
+
+		svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
+			return newMockRootCoordService(), nil
+		}
+
+		svr.meta.AddCollection(&collectionInfo{
+			ID:     0,
+			Schema: newTestSchema(),
+		})
+
+		err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{
+			ChannelName: "vchan1",
+			Timestamp:   0,
+		})
+		assert.NoError(t, err)
+
+		seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped)
+		seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
+		seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
+		seg3.CompactionFrom = []int64{9, 10}
+		seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
+		seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed)
+		seg5.CompactionFrom = []int64{11, 12}
+		err = svr.meta.AddSegment(NewSegmentInfo(seg1))
+		assert.Nil(t, err)
+		err = svr.meta.AddSegment(NewSegmentInfo(seg2))
+		assert.Nil(t, err)
+		err = svr.meta.AddSegment(NewSegmentInfo(seg3))
+		assert.Nil(t, err)
+		err = svr.meta.AddSegment(NewSegmentInfo(seg4))
+		assert.Nil(t, err)
+		err = svr.meta.AddSegment(NewSegmentInfo(seg5))
+		assert.Nil(t, err)
+		err = svr.meta.CreateIndex(&model.Index{
+			TenantID:        "",
+			CollectionID:    0,
+			FieldID:         2,
+			IndexID:         0,
+			IndexName:       "_default_idx_2",
+			IsDeleted:       false,
+			CreateTime:      0,
+			TypeParams:      nil,
+			IndexParams:     nil,
+			IsAutoIndex:     false,
+			UserIndexParams: nil,
+		})
+		assert.Nil(t, err)
+		svr.meta.segments.SetSegmentIndex(seg4.ID, &model.SegmentIndex{
+			SegmentID:     seg4.ID,
+			CollectionID:  0,
+			PartitionID:   0,
+			NumRows:       100,
+			IndexID:       0,
+			BuildID:       0,
+			NodeID:        0,
+			IndexVersion:  1,
+			IndexState:    commonpb.IndexState_Finished,
+			FailReason:    "",
+			IsDeleted:     false,
+			CreateTime:    0,
+			IndexFileKeys: nil,
+			IndexSize:     0,
+		})
+
+		req := &datapb.GetRecoveryInfoRequest{
+			CollectionID: 0,
+			PartitionID:  0,
+		}
+		resp, err := svr.GetRecoveryInfo(context.TODO(), req)
+		assert.Nil(t, err)
+		assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
+		assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
+		assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
+		assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0)
+		assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds())
+		assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds())
+	})
+
 	t.Run("with closed server", func(t *testing.T) {
 		svr := newTestServer(t, nil)
 		closeTestServer(t, svr)
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 322a64fb45..ad4fe7bbc7 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -483,6 +483,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
 		segmentsToSync = append(segmentsToSync, task.segmentID)
 		ibNode.channel.rollInsertBuffer(task.segmentID)
 		ibNode.channel.RollPKstats(task.segmentID, pkStats)
+		ibNode.channel.setSegmentLastSyncTs(task.segmentID, endPosition.GetTimestamp())
 		metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc()
 		metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TotalLabel).Inc()
 		if task.auto {
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 2816d6daa9..f99c6ae596 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
 	"github.com/milvus-io/milvus/internal/util/paramtable"
 	"github.com/milvus-io/milvus/internal/util/retry"
+	"github.com/milvus-io/milvus/internal/util/tsoutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
@@ -228,7 +229,12 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	}
 
 	inMsg := genFlowGraphInsertMsg(insertChannelName)
-	assert.NotPanics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
+	iBNode.channel.setSegmentLastSyncTs(UniqueID(1), tsoutil.ComposeTSByTime(time.Now().Add(-11*time.Minute), 0))
+	assert.NotPanics(t, func() {
+		res := iBNode.Operate([]flowgraph.Msg{&inMsg})
+		assert.Subset(t, res[0].(*flowGraphMsg).segmentsToSync, []UniqueID{1})
+	})
+	assert.NotSubset(t, iBNode.channel.listSegmentIDsToSync(tsoutil.ComposeTSByTime(time.Now(), 0)), []UniqueID{1})
 
 	resendTTChan <- resendTTMsg{
 		segmentIDs: []int64{0, 1, 2},
diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go
index 8662af4718..8a60e87d28 100644
--- a/internal/datanode/flush_manager.go
+++ b/internal/datanode/flush_manager.go
@@ -864,6 +864,5 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
 		dsService.flushingSegCache.Remove(req.GetSegmentID())
 		dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos)
 		dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos)
-		dsService.channel.setSegmentLastSyncTs(req.GetSegmentID(), pack.pos.GetTimestamp())
 	}
 }
diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go
index 0442ae8571..053bbbe1a9 100644
--- a/internal/datanode/mock_test.go
+++ b/internal/datanode/mock_test.go
@@ -898,7 +898,7 @@ func genFlowGraphInsertMsg(chanName string) flowGraphMsg {
 		{
 			ChannelName: chanName,
 			MsgID:       make([]byte, 0),
-			Timestamp:   0,
+			Timestamp:   tsoutil.ComposeTSByTime(time.Now(), 0),
 		},
 	}
 
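
The handler.go hunk above replaces a single-pass compaction lookup with a fixed-point loop: a flushed-but-unindexed segment that was produced by compaction is swapped for its CompactionFrom ancestors, and the scan repeats until no compacted segment is left unresolved, so lineages from continuous compaction (9,10 -> 11, then 11,12 -> 13 in the new test) unwind correctly. Below is a minimal standalone sketch of that resolution step; segment, set, and resolve are illustrative stand-ins, not the Milvus datapb.SegmentInfo/typeutil.UniqueSet types.

package main

import "fmt"

// segment is an illustrative stand-in for datapb.SegmentInfo.
type segment struct {
	id             int64
	compactionFrom []int64 // segments this one was compacted from
}

// set is an illustrative stand-in for typeutil.UniqueSet.
type set map[int64]struct{}

func (s set) insert(ids ...int64) {
	for _, id := range ids {
		s[id] = struct{}{}
	}
}

func (s set) remove(ids ...int64) {
	for _, id := range ids {
		delete(s, id)
	}
}

func (s set) contain(id int64) bool { _, ok := s[id]; return ok }

// resolve mirrors the new hasUnIndexed loop: every unindexed segment produced
// by compaction is replaced with its source segments. Keys inserted while
// ranging over a Go map are not guaranteed to be visited in the same pass,
// so the outer loop reruns the scan until a full pass makes no progress.
func resolve(segments map[int64]*segment, indexed, indexedIDs, unIndexedIDs, droppedIDs set) {
	hasUnIndexed := true
	for hasUnIndexed {
		hasUnIndexed = false
		for id := range unIndexedIDs {
			from := segments[id].compactionFrom
			if len(from) == 0 {
				continue
			}
			unIndexedIDs.remove(id)
			for _, segID := range from {
				if indexed.contain(segID) {
					indexedIDs.insert(segID)
				} else {
					// Source still unindexed: keep unwinding its lineage
					// on the next pass.
					unIndexedIDs.insert(segID)
					hasUnIndexed = true
				}
			}
			droppedIDs.remove(from...)
		}
	}
}

func main() {
	// Topology from the "with continuous compaction" test:
	// 9,10 -> 11 and 11,12 -> 13; only 12 has a finished index.
	segments := map[int64]*segment{
		9:  {id: 9},
		10: {id: 10},
		11: {id: 11, compactionFrom: []int64{9, 10}},
		12: {id: 12},
		13: {id: 13, compactionFrom: []int64{11, 12}},
	}
	indexed := set{12: {}}                           // segments with a finished index
	indexedIDs, unIndexedIDs := set{}, set{13: {}}   // 13 is flushed but unindexed
	droppedIDs := set{9: {}, 10: {}, 11: {}, 12: {}} // compaction sources were dropped

	resolve(segments, indexed, indexedIDs, unIndexedIDs, droppedIDs)
	fmt.Println(indexedIDs, unIndexedIDs, droppedIDs)
	// indexedIDs = {12}, unIndexedIDs = {9, 10}, droppedIDs = {} -- matching the
	// FlushedSegmentIds/UnflushedSegmentIds/DroppedSegmentIds test assertions.
}

Termination holds for the same reason as in the real handler: compaction lineage is acyclic, so each pass only moves toward segments with no CompactionFrom ancestors.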
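
On the datanode side, the diff moves setSegmentLastSyncTs out of the flush-notify callback in flush_manager.go into Sync() itself, and the updated test marks segment 1 as last synced 11 minutes in the past and expects Operate to pick it up for auto-sync. The following is a hedged sketch of that staleness check, assuming the Milvus hybrid-timestamp layout (physical milliseconds shifted left over 18 logical bits, which is what tsoutil.ComposeTSByTime produces) and a 10-minute sync window inferred from the test's -11*time.Minute offset; composeTSByTime, physicalTime, and dueForSync are local illustrative helpers, not the datanode's actual API.

package main

import (
	"fmt"
	"time"
)

// logicalBits is the number of low bits reserved for the logical counter in a
// Milvus hybrid timestamp; the remaining high bits hold milliseconds since epoch.
const logicalBits = 18

// composeTSByTime is a local stand-in for tsoutil.ComposeTSByTime.
func composeTSByTime(t time.Time, logical int64) uint64 {
	return uint64(t.UnixMilli())<<logicalBits | uint64(logical)
}

// physicalTime extracts the wall-clock part of a hybrid timestamp.
func physicalTime(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> logicalBits))
}

// dueForSync reports whether a segment last synced at lastSyncTs has gone
// stale by ts, given the sync window (assumed 10 minutes here).
func dueForSync(lastSyncTs, ts uint64, window time.Duration) bool {
	return physicalTime(ts).Sub(physicalTime(lastSyncTs)) >= window
}

func main() {
	now := time.Now()
	lastSync := composeTSByTime(now.Add(-11*time.Minute), 0) // as in the test
	current := composeTSByTime(now, 0)
	fmt.Println(dueForSync(lastSync, current, 10*time.Minute)) // true: due for auto-sync
}

Because Sync() now refreshes the timestamp as soon as the buffer is rolled, a just-synced segment is no longer stale, which is what the test's final NotSubset assertion on listSegmentIDsToSync verifies right after Operate returns.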