diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 505820e970..7f7bf15046 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -145,25 +145,52 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs .. unIndexedIDs.Insert(s.GetID()) } } - hasUnIndexed := true - for hasUnIndexed { - hasUnIndexed = false - for id := range unIndexedIDs { - // Indexed segments are compacted to a raw segment, - // replace it with the indexed ones - if len(segmentInfos[id].GetCompactionFrom()) > 0 { - unIndexedIDs.Remove(id) - for _, segID := range segmentInfos[id].GetCompactionFrom() { - if indexed.Contain(segID) { - indexedIDs.Insert(segID) - } else { - unIndexedIDs.Insert(segID) - hasUnIndexed = true - } - } - droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...) + // ================================================ + // Segments blood relationship: + // a b + // \ / + // c d + // \ / + // e + // + // GC: a, b + // Indexed: c, d, e + // || + // || (Index dropped and creating new index and not finished) + // \/ + // UnIndexed: c, d, e + // + // Retrieve unIndexed expected result: + // unIndexed: c, d + // ================================================ + isValid := func(ids ...UniqueID) bool { + for _, id := range ids { + if seg, ok := segmentInfos[id]; !ok || seg == nil { + return false } } + return true + } + retrieveUnIndexed := func() bool { + continueRetrieve := false + for id := range unIndexedIDs { + compactionFrom := segmentInfos[id].GetCompactionFrom() + if len(compactionFrom) > 0 && isValid(compactionFrom...) { + for _, fromID := range compactionFrom { + if indexed.Contain(fromID) { + indexedIDs.Insert(fromID) + } else { + unIndexedIDs.Insert(fromID) + continueRetrieve = true + } + } + unIndexedIDs.Remove(id) + droppedIDs.Remove(compactionFrom...) + } + } + return continueRetrieve + } + for retrieveUnIndexed() { } return &datapb.VchannelInfo{ diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 111b253143..dd2fcecf72 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2371,6 +2371,253 @@ func TestGetQueryVChanPositions(t *testing.T) { }) } +func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { + t.Run("ab GC-ed, cde unIndexed", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + }) + err := svr.meta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.Nil(t, err) + c := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed + } + err = svr.meta.AddSegment(NewSegmentInfo(c)) + assert.Nil(t, err) + d := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(NewSegmentInfo(d)) + assert.Nil(t, err) + e := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{1, 2}, // c, d + } + + err = svr.meta.AddSegment(NewSegmentInfo(e)) + assert.Nil(t, err) + vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds)) + assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) + assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.UnflushedSegmentIds) // expected c, d + }) + + t.Run("a GC-ed, bcde unIndexed", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + }) + err := svr.meta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.Nil(t, err) + a := &datapb.SegmentInfo{ + ID: 99, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(NewSegmentInfo(a)) + assert.Nil(t, err) + + c := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed + } + err = svr.meta.AddSegment(NewSegmentInfo(c)) + assert.Nil(t, err) + d := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(NewSegmentInfo(d)) + assert.Nil(t, err) + e := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{1, 2}, // c, d + } + + err = svr.meta.AddSegment(NewSegmentInfo(e)) + assert.Nil(t, err) + vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds)) + assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) + assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.UnflushedSegmentIds) // expected c, d + }) + + t.Run("ab GC-ed, c unIndexed, de indexed", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + }) + err := svr.meta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.Nil(t, err) + c := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed + } + err = svr.meta.AddSegment(NewSegmentInfo(c)) + assert.Nil(t, err) + d := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(NewSegmentInfo(d)) + assert.Nil(t, err) + err = svr.meta.AddSegmentIndex(&model.SegmentIndex{ + SegmentID: 2, + BuildID: 1, + IndexID: 1, + }) + assert.Nil(t, err) + err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{ + BuildID: 1, + State: commonpb.IndexState_Finished, + }) + assert.Nil(t, err) + e := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{1, 2}, // c, d + } + err = svr.meta.AddSegment(NewSegmentInfo(e)) + assert.Nil(t, err) + err = svr.meta.AddSegmentIndex(&model.SegmentIndex{ + SegmentID: 3, + BuildID: 2, + IndexID: 1, + }) + assert.Nil(t, err) + err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{ + BuildID: 2, + State: commonpb.IndexState_Finished, + }) + assert.Nil(t, err) + + vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) + assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds)) + assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e + }) +} + func TestShouldDropChannel(t *testing.T) { type myRootCoord struct { mocks.RootCoord