diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index f9f82bc426..c5b1d33b83 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -97,11 +97,13 @@ func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID Uni } } -// GetQueryVChanPositions gets vchannel latest positions with provided dml channel names for QueryCoord, -// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments, -// the unflushed segments are actually the segments without index, even they are flushed. +// GetQueryVChanPositions gets vchannel latest positions with provided dml channel names for QueryCoord. +// unflushed segmentIDs ---> L1, growing segments +// flushed segmentIDs ---> L1&L2, flushed segments, including indexed or unindexed +// dropped segmentIDs ---> dropped segments +// level zero segmentIDs ---> L0 segments func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs ...UniqueID) *datapb.VchannelInfo { - // cannot use GetSegmentsByChannel since dropped segments are needed here + partStatsVersionsMap := make(map[int64]int64) validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID }) if len(validPartitions) <= 0 { collInfo, err := h.s.handler.GetCollection(h.s.ctx, channel.GetCollectionID()) @@ -111,25 +113,26 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . } validPartitions = collInfo.Partitions } - partStatsVersionsMap := make(map[int64]int64) + for _, partitionID := range validPartitions { + currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName()) + partStatsVersionsMap[partitionID] = currentPartitionStatsVersion + } + var ( - indexedIDs = make(typeutil.UniqueSet) + flushedIDs = make(typeutil.UniqueSet) droppedIDs = make(typeutil.UniqueSet) growingIDs = make(typeutil.UniqueSet) levelZeroIDs = make(typeutil.UniqueSet) ) + // cannot use GetSegmentsByChannel since dropped segments are needed here segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName()) - segmentInfos := make(map[int64]*SegmentInfo) + validSegmentInfos := make(map[int64]*SegmentInfo) indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...) - indexed := make(typeutil.UniqueSet) - for _, segment := range indexedSegments { - indexed.Insert(segment.GetID()) - } + indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...) unIndexedIDs := make(typeutil.UniqueSet) - for _, s := range segments { if s.GetStartPosition() == nil && s.GetDmlPosition() == nil { continue } @@ -147,7 +150,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . continue } - segmentInfos[s.GetID()] = s + validSegmentInfos[s.GetID()] = s switch { case s.GetState() == commonpb.SegmentState_Dropped: if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion {
// it must have been indexed, this is guaranteed by clustering compaction process // this is to ensure that the current valid L2 compaction produce is available to search/query // to avoid insufficient data - indexedIDs.Insert(s.GetID()) + flushedIDs.Insert(s.GetID()) continue } droppedIDs.Insert(s.GetID()) @@ -163,11 +166,11 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . growingIDs.Insert(s.GetID()) case s.GetLevel() == datapb.SegmentLevel_L0: levelZeroIDs.Insert(s.GetID()) - case indexed.Contain(s.GetID()): - indexedIDs.Insert(s.GetID()) - case s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64(): // treat small flushed segment as indexed - indexedIDs.Insert(s.GetID()) + case indexed.Contain(s.GetID()) || s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64(): + // indexed segments and small segments go into flushed directly + flushedIDs.Insert(s.GetID()) default: + // unIndexed segments will be checked later to see whether their parents are all indexed unIndexedIDs.Insert(s.GetID()) } } @@ -178,10 +181,11 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // \ / // c d // \ / - // e + // / \ + // e f // // GC: a, b - // Indexed: c, d, e + // Indexed: c, d, e, f // || // || (Index dropped and creating new index and not finished) // \/ @@ -192,26 +196,33 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // ================================================ isValid := func(ids ...UniqueID) bool { for _, id := range ids { - if seg, ok := segmentInfos[id]; !ok || seg == nil { + if seg, ok := validSegmentInfos[id]; !ok || seg == nil { return false } } return true } + retrieveUnIndexed := func() bool { continueRetrieve := false for id := range unIndexedIDs { - compactionFrom := segmentInfos[id].GetCompactionFrom() + compactionFrom := validSegmentInfos[id].GetCompactionFrom() + compactTos := []UniqueID{} // neighbors and itself if len(compactionFrom) > 0 && isValid(compactionFrom...) { for _, fromID := range compactionFrom { + if len(compactTos) == 0 { + compactToInfo, _ := h.s.meta.GetCompactionTo(fromID) + compactTos = lo.Map(compactToInfo, func(s *SegmentInfo, _ int) UniqueID { return s.GetID() }) + } if indexed.Contain(fromID) { - indexedIDs.Insert(fromID) + flushedIDs.Insert(fromID) } else { unIndexedIDs.Insert(fromID) continueRetrieve = true } } - unIndexedIDs.Remove(id) + unIndexedIDs.Remove(compactTos...) + flushedIDs.Remove(compactTos...) droppedIDs.Remove(compactionFrom...) } } @@ -221,21 +232,15 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . } // unindexed is flushed segments as well - indexedIDs.Insert(unIndexedIDs.Collect()...) - - for _, partitionID := range validPartitions { - currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName()) - partStatsVersionsMap[partitionID] = currentPartitionStatsVersion - } + flushedIDs.Insert(unIndexedIDs.Collect()...)
log.Info("GetQueryVChanPositions", zap.Int64("collectionID", channel.GetCollectionID()), zap.String("channel", channel.GetName()), zap.Int("numOfSegments", len(segments)), - zap.Int("indexed segment", len(indexedSegments)), - zap.Int("result indexed", len(indexedIDs)), - zap.Int("result unIndexed", len(unIndexedIDs)), + zap.Int("result flushed", len(flushedIDs)), zap.Int("result growing", len(growingIDs)), + zap.Int("result L0", len(levelZeroIDs)), zap.Any("partition stats", partStatsVersionsMap), ) @@ -243,7 +248,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . CollectionID: channel.GetCollectionID(), ChannelName: channel.GetName(), SeekPosition: h.GetChannelSeekPosition(channel, partitionIDs...), - FlushedSegmentIds: indexedIDs.Collect(), + FlushedSegmentIds: flushedIDs.Collect(), UnflushedSegmentIds: growingIDs.Collect(), DroppedSegmentIds: droppedIDs.Collect(), LevelZeroSegmentIds: levelZeroIDs.Collect(), diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go new file mode 100644 index 0000000000..6ec69eede6 --- /dev/null +++ b/internal/datacoord/handler_test.go @@ -0,0 +1,731 @@ +package datacoord + +import ( + "context" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/metastore/model" + mocks2 "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/proto/workerpb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +func TestGetQueryVChanPositionsRetrieveM2N(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + + channel := "ch1" + svr.meta.AddCollection(&collectionInfo{ + ID: 1, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: channel, + Data: []byte{8, 9, 10}, + }, + }, + }) + err := svr.meta.indexMeta.CreateIndex(&model.Index{ + CollectionID: 1, + FieldID: 2, + IndexID: 1, + }) + require.NoError(t, err) + + segArgs := []struct { + segID int64 + state commonpb.SegmentState + level datapb.SegmentLevel + indexed bool + }{ + {100, commonpb.SegmentState_Growing, datapb.SegmentLevel_L1, false}, + {200, commonpb.SegmentState_Flushing, datapb.SegmentLevel_L1, false}, + {300, commonpb.SegmentState_Flushed, datapb.SegmentLevel_L1, false}, + {400, commonpb.SegmentState_Dropped, datapb.SegmentLevel_L1, true}, + {401, commonpb.SegmentState_Dropped, datapb.SegmentLevel_L1, true}, + {402, commonpb.SegmentState_Dropped, datapb.SegmentLevel_L1, true}, + {403, commonpb.SegmentState_Flushed, datapb.SegmentLevel_L1, true}, + {404, commonpb.SegmentState_Flushed, datapb.SegmentLevel_L1, false}, + // (400[indexed], 401[indexed], 402(indexed) -> 403(indexed), 404(no index)) + {500, commonpb.SegmentState_Flushed, datapb.SegmentLevel_L2, true}, + {600, commonpb.SegmentState_Flushed, datapb.SegmentLevel_L2, false}, + {700, commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, false}, + {800, commonpb.SegmentState_Dropped, datapb.SegmentLevel_L1, false}, + } + + compactFroms := []int64{400, 401, 402} + compactTos := []int64{403, 
404} + + for _, arg := range segArgs { + seg := NewSegmentInfo(&datapb.SegmentInfo{ + ID: arg.segID, + CollectionID: 1, + InsertChannel: channel, + State: arg.state, + Level: arg.level, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: channel, + MsgID: []byte{1, 2, 3}, + }, + NumOfRows: 2048, + }) + + if lo.Contains(compactTos, arg.segID) { + seg.CompactionFrom = compactFroms + } + err := svr.meta.AddSegment(context.TODO(), seg) + require.NoError(t, err) + + if arg.indexed { + err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{ + SegmentID: arg.segID, + BuildID: arg.segID, + IndexID: 1, + }) + assert.NoError(t, err) + err = svr.meta.indexMeta.FinishTask(&workerpb.IndexTaskInfo{ + BuildID: arg.segID, + State: commonpb.IndexState_Finished, + }) + assert.NoError(t, err) + } + } + + info := svr.handler.GetQueryVChanPositions(&channelMeta{Name: channel, CollectionID: 1}, -1) + + totalSegs := len(info.GetLevelZeroSegmentIds()) + + len(info.GetUnflushedSegmentIds()) + + len(info.GetFlushedSegmentIds()) + + len(info.GetDroppedSegmentIds()) + assert.EqualValues(t, 1, info.CollectionID) + assert.EqualValues(t, len(segArgs)-2, totalSegs) + assert.ElementsMatch(t, []int64{700}, info.GetLevelZeroSegmentIds()) + assert.ElementsMatch(t, []int64{100}, info.GetUnflushedSegmentIds()) + assert.ElementsMatch(t, []int64{200, 300, 400, 401, 402, 500, 600}, info.GetFlushedSegmentIds()) + assert.ElementsMatch(t, []int64{800}, info.GetDroppedSegmentIds()) + + assert.Empty(t, info.GetUnflushedSegments()) + assert.Empty(t, info.GetFlushedSegments()) + assert.Empty(t, info.GetDroppedSegments()) + assert.Empty(t, info.GetIndexedSegments()) + assert.Empty(t, info.GetIndexedSegmentIds()) +} + +func TestGetQueryVChanPositions(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch1", + Data: []byte{8, 9, 10}, + }, + }, + }) + svr.meta.AddCollection(&collectionInfo{ + ID: 1, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch0", + Data: []byte{8, 9, 10}, + }, + }, + }) + + err := svr.meta.indexMeta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.NoError(t, err) + + s1 := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 0, + }, + NumOfRows: 2048, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s1)) + assert.NoError(t, err) + err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{ + SegmentID: 1, + BuildID: 1, + IndexID: 1, + }) + assert.NoError(t, err) + err = svr.meta.indexMeta.FinishTask(&workerpb.IndexTaskInfo{ + BuildID: 1, + State: commonpb.IndexState_Finished, + }) + assert.NoError(t, err) + s2 := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + StartPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + MsgGroup: "", + }, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s2)) + assert.NoError(t, err) + s3 := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: 
commonpb.SegmentState_Growing, + StartPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + MsgGroup: "", + }, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{11, 12, 13}, + MsgGroup: "", + Timestamp: 2, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s3)) + assert.NoError(t, err) + + s4 := &datapb.SegmentInfo{ + ID: 4, + CollectionID: 0, + PartitionID: common.AllPartitionsID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + StartPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + MsgGroup: "", + }, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{11, 12, 13}, + MsgGroup: "", + Timestamp: 2, + }, + Level: datapb.SegmentLevel_L0, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s4)) + assert.NoError(t, err) + + t.Run("get unexisted channel", func(t *testing.T) { + vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "chx1", CollectionID: 0}) + assert.Empty(t, vchan.UnflushedSegmentIds) + assert.Empty(t, vchan.FlushedSegmentIds) + }) + + t.Run("empty collection", func(t *testing.T) { + infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch0_suffix", CollectionID: 1}) + assert.EqualValues(t, 1, infos.CollectionID) + assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) + assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds)) + assert.EqualValues(t, 0, len(infos.GetLevelZeroSegmentIds())) + }) + + t.Run("filter partition", func(t *testing.T) { + infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, 1) + assert.EqualValues(t, 0, infos.CollectionID) + assert.EqualValues(t, 1, len(infos.GetLevelZeroSegmentIds())) + }) + + t.Run("empty collection with passed positions", func(t *testing.T) { + vchannel := "ch_no_segment_1" + pchannel := funcutil.ToPhysicalChannel(vchannel) + infos := svr.handler.GetQueryVChanPositions(&channelMeta{ + Name: vchannel, + CollectionID: 0, + StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}}, + }) + assert.EqualValues(t, 0, infos.CollectionID) + assert.EqualValues(t, vchannel, infos.ChannelName) + assert.EqualValues(t, 0, len(infos.GetLevelZeroSegmentIds())) + }) +} + +func TestGetQueryVChanPositions_PartitionStats(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + collectionID := int64(0) + partitionID := int64(1) + vchannel := "test_vchannel" + version := int64(100) + svr.meta.AddCollection(&collectionInfo{ + ID: collectionID, + Schema: schema, + }) + svr.meta.partitionStatsMeta.partitionStatsInfos = map[string]map[int64]*partitionStatsInfo{ + vchannel: { + partitionID: { + currentVersion: version, + infos: map[int64]*datapb.PartitionStatsInfo{ + version: {Version: version}, + }, + }, + }, + } + partitionIDs := make([]UniqueID, 0) + partitionIDs = append(partitionIDs, partitionID) + vChannelInfo := svr.handler.GetQueryVChanPositions(&channelMeta{Name: vchannel, CollectionID: collectionID}, partitionIDs...) 
+ statsVersions := vChannelInfo.GetPartitionStatsVersions() + assert.Equal(t, 1, len(statsVersions)) + assert.Equal(t, int64(100), statsVersions[partitionID]) +} + +func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { + t.Run("ab GC-ed, cde unIndexed", func(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + }) + err := svr.meta.indexMeta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.NoError(t, err) + c := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) + assert.NoError(t, err) + d := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) + assert.NoError(t, err) + e := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{1, 2}, // c, d + NumOfRows: 2048, + } + + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) + assert.NoError(t, err) + // vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) + // assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds)) + // assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds)) + // assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d + }) + + t.Run("a GC-ed, bcde unIndexed", func(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + }) + err := svr.meta.indexMeta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.NoError(t, err) + a := &datapb.SegmentInfo{ + ID: 99, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(a)) + assert.NoError(t, err) + + c := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) + assert.NoError(t, err) + d := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) 
+ assert.NoError(t, err) + e := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{1, 2}, // c, d + NumOfRows: 2048, + } + + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) + assert.NoError(t, err) + // vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) + // assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds)) + // assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds)) + // assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d + }) + + t.Run("ab GC-ed, c unIndexed, de indexed", func(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + }) + err := svr.meta.indexMeta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.NoError(t, err) + c := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) + assert.NoError(t, err) + d := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) + assert.NoError(t, err) + err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{ + SegmentID: 2, + BuildID: 1, + IndexID: 1, + }) + assert.NoError(t, err) + err = svr.meta.indexMeta.FinishTask(&workerpb.IndexTaskInfo{ + BuildID: 1, + State: commonpb.IndexState_Finished, + }) + assert.NoError(t, err) + e := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + CompactionFrom: []int64{1, 2}, // c, d + NumOfRows: 2048, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) + assert.NoError(t, err) + err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{ + SegmentID: 3, + BuildID: 2, + IndexID: 1, + }) + assert.NoError(t, err) + err = svr.meta.indexMeta.FinishTask(&workerpb.IndexTaskInfo{ + BuildID: 2, + State: commonpb.IndexState_Finished, + }) + assert.NoError(t, err) + + // vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) + // assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) + // assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds)) + // assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e + }) +} + +func TestShouldDropChannel(t *testing.T) { + type myRootCoord struct { + mocks2.MockRootCoordClient + } + myRoot := &myRootCoord{} + myRoot.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{ + Status: merr.Success(), + Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0), + Count: 1, + }, nil) + + 
myRoot.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{ + Status: merr.Success(), + ID: int64(tsoutil.ComposeTSByTime(time.Now(), 0)), + Count: 1, + }, nil) + + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch1", + Data: []byte{8, 9, 10}, + }, + }, + }) + svr.meta.AddCollection(&collectionInfo{ + ID: 1, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch0", + Data: []byte{8, 9, 10}, + }, + }, + }) + + t.Run("channel name not in kv ", func(t *testing.T) { + assert.False(t, svr.handler.CheckShouldDropChannel("ch99")) + }) + + t.Run("channel in remove flag", func(t *testing.T) { + err := svr.meta.catalog.MarkChannelDeleted(context.TODO(), "ch1") + require.NoError(t, err) + assert.True(t, svr.handler.CheckShouldDropChannel("ch1")) + }) +} + +func TestGetDataVChanPositions(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch1", + Data: []byte{8, 9, 10}, + }, + }, + }) + svr.meta.AddCollection(&collectionInfo{ + ID: 1, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch0", + Data: []byte{8, 9, 10}, + }, + }, + }) + + s1 := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + }, + } + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s1)) + require.Nil(t, err) + s2 := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + StartPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + }, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s2)) + require.Nil(t, err) + s3 := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + StartPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + }, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{11, 12, 13}, + Timestamp: 2, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s3)) + require.Nil(t, err) + + t.Run("get unexisted channel", func(t *testing.T) { + vchan := svr.handler.GetDataVChanPositions(&channelMeta{Name: "chx1", CollectionID: 0}, allPartitionID) + assert.Empty(t, vchan.UnflushedSegmentIds) + assert.Empty(t, vchan.FlushedSegmentIds) + }) + + t.Run("get existed channel", func(t *testing.T) { + vchan := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID) + assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) + assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0]) + assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) + assert.ElementsMatch(t, []int64{s2.ID, s3.ID}, vchan.UnflushedSegmentIds) + }) + + t.Run("empty collection", func(t *testing.T) { + infos := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch0_suffix", CollectionID: 1}, allPartitionID) + assert.EqualValues(t, 1, infos.CollectionID) + assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) + 
assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds)) + }) + + t.Run("filter partition", func(t *testing.T) { + infos := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, 1) + assert.EqualValues(t, 0, infos.CollectionID) + assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) + assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds)) + }) + + t.Run("empty collection with passed positions", func(t *testing.T) { + vchannel := "ch_no_segment_1" + pchannel := funcutil.ToPhysicalChannel(vchannel) + infos := svr.handler.GetDataVChanPositions(&channelMeta{ + Name: vchannel, + CollectionID: 0, + StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}}, + }, allPartitionID) + assert.EqualValues(t, 0, infos.CollectionID) + assert.EqualValues(t, vchannel, infos.ChannelName) + }) +} diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index e9a20e3415..a9ac2dc671 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -166,7 +166,7 @@ func (s *SegmentsInfo) GetRealSegmentsForChannel(channel string) []*SegmentInfo // GetCompactionTo returns the segment that the provided segment is compacted to. // Return (nil, false) if given segmentID can not found in the meta. -// Return (nil, true) if given segmentID can be found not no compaction to. +// Return (nil, true) if given segmentID can be found with no compaction to. // Return (notnil, true) if given segmentID can be found and has compaction to. func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) ([]*SegmentInfo, bool) { if _, ok := s.segments[fromSegmentID]; !ok { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index f9c4d55d23..00f10e9480 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -49,7 +49,6 @@ import ( "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/proto/workerpb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" @@ -57,7 +56,6 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -1140,637 +1138,6 @@ func TestGetChannelSeekPosition(t *testing.T) { } } -func TestGetDataVChanPositions(t *testing.T) { - svr := newTestServer(t) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - Schema: schema, - StartPositions: []*commonpb.KeyDataPair{ - { - Key: "ch1", - Data: []byte{8, 9, 10}, - }, - }, - }) - svr.meta.AddCollection(&collectionInfo{ - ID: 1, - Schema: schema, - StartPositions: []*commonpb.KeyDataPair{ - { - Key: "ch0", - Data: []byte{8, 9, 10}, - }, - }, - }) - - s1 := &datapb.SegmentInfo{ - ID: 1, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - }, - } - err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s1)) - require.Nil(t, err) - s2 := &datapb.SegmentInfo{ - ID: 2, - CollectionID: 0, - PartitionID: 0, - 
InsertChannel: "ch1", - State: commonpb.SegmentState_Growing, - StartPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{8, 9, 10}, - }, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - Timestamp: 1, - }, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s2)) - require.Nil(t, err) - s3 := &datapb.SegmentInfo{ - ID: 3, - CollectionID: 0, - PartitionID: 1, - InsertChannel: "ch1", - State: commonpb.SegmentState_Growing, - StartPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{8, 9, 10}, - }, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{11, 12, 13}, - Timestamp: 2, - }, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s3)) - require.Nil(t, err) - - t.Run("get unexisted channel", func(t *testing.T) { - vchan := svr.handler.GetDataVChanPositions(&channelMeta{Name: "chx1", CollectionID: 0}, allPartitionID) - assert.Empty(t, vchan.UnflushedSegmentIds) - assert.Empty(t, vchan.FlushedSegmentIds) - }) - - t.Run("get existed channel", func(t *testing.T) { - vchan := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID) - assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) - assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0]) - assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) - assert.ElementsMatch(t, []int64{s2.ID, s3.ID}, vchan.UnflushedSegmentIds) - }) - - t.Run("empty collection", func(t *testing.T) { - infos := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch0_suffix", CollectionID: 1}, allPartitionID) - assert.EqualValues(t, 1, infos.CollectionID) - assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) - assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds)) - }) - - t.Run("filter partition", func(t *testing.T) { - infos := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, 1) - assert.EqualValues(t, 0, infos.CollectionID) - assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) - assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds)) - }) - - t.Run("empty collection with passed positions", func(t *testing.T) { - vchannel := "ch_no_segment_1" - pchannel := funcutil.ToPhysicalChannel(vchannel) - infos := svr.handler.GetDataVChanPositions(&channelMeta{ - Name: vchannel, - CollectionID: 0, - StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}}, - }, allPartitionID) - assert.EqualValues(t, 0, infos.CollectionID) - assert.EqualValues(t, vchannel, infos.ChannelName) - }) -} - -func TestGetQueryVChanPositions(t *testing.T) { - svr := newTestServer(t) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - Schema: schema, - StartPositions: []*commonpb.KeyDataPair{ - { - Key: "ch1", - Data: []byte{8, 9, 10}, - }, - }, - }) - svr.meta.AddCollection(&collectionInfo{ - ID: 1, - Schema: schema, - StartPositions: []*commonpb.KeyDataPair{ - { - Key: "ch0", - Data: []byte{8, 9, 10}, - }, - }, - }) - - err := svr.meta.indexMeta.CreateIndex(&model.Index{ - TenantID: "", - CollectionID: 0, - FieldID: 2, - IndexID: 1, - }) - assert.NoError(t, err) - - s1 := &datapb.SegmentInfo{ - ID: 1, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 0, - }, - NumOfRows: 2048, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s1)) - assert.NoError(t, err) - 
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{ - SegmentID: 1, - BuildID: 1, - IndexID: 1, - }) - assert.NoError(t, err) - err = svr.meta.indexMeta.FinishTask(&workerpb.IndexTaskInfo{ - BuildID: 1, - State: commonpb.IndexState_Finished, - }) - assert.NoError(t, err) - s2 := &datapb.SegmentInfo{ - ID: 2, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Growing, - StartPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{8, 9, 10}, - MsgGroup: "", - }, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s2)) - assert.NoError(t, err) - s3 := &datapb.SegmentInfo{ - ID: 3, - CollectionID: 0, - PartitionID: 1, - InsertChannel: "ch1", - State: commonpb.SegmentState_Growing, - StartPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{8, 9, 10}, - MsgGroup: "", - }, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{11, 12, 13}, - MsgGroup: "", - Timestamp: 2, - }, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s3)) - assert.NoError(t, err) - - s4 := &datapb.SegmentInfo{ - ID: 4, - CollectionID: 0, - PartitionID: common.AllPartitionsID, - InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, - StartPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{8, 9, 10}, - MsgGroup: "", - }, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{11, 12, 13}, - MsgGroup: "", - Timestamp: 2, - }, - Level: datapb.SegmentLevel_L0, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s4)) - assert.NoError(t, err) - //mockResp := &indexpb.GetIndexInfoResponse{ - // Status: &commonpb.Status{}, - // SegmentInfo: map[int64]*indexpb.SegmentInfo{ - // s1.ID: { - // CollectionID: s1.CollectionID, - // SegmentID: s1.ID, - // EnableIndex: true, - // IndexInfos: []*indexpb.IndexFilePathInfo{ - // { - // SegmentID: s1.ID, - // FieldID: 2, - // }, - // }, - // }, - // }, - //} - - t.Run("get unexisted channel", func(t *testing.T) { - vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "chx1", CollectionID: 0}) - assert.Empty(t, vchan.UnflushedSegmentIds) - assert.Empty(t, vchan.FlushedSegmentIds) - }) - - // t.Run("get existed channel", func(t *testing.T) { - // vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) - // assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) - // assert.ElementsMatch(t, []int64{1}, vchan.FlushedSegmentIds) - // assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) - // assert.EqualValues(t, 1, len(vchan.GetLevelZeroSegmentIds())) - // }) - - t.Run("empty collection", func(t *testing.T) { - infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch0_suffix", CollectionID: 1}) - assert.EqualValues(t, 1, infos.CollectionID) - assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) - assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds)) - assert.EqualValues(t, 0, len(infos.GetLevelZeroSegmentIds())) - }) - - t.Run("filter partition", func(t *testing.T) { - infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, 1) - assert.EqualValues(t, 0, infos.CollectionID) - // assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) - // assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds)) - assert.EqualValues(t, 1, len(infos.GetLevelZeroSegmentIds())) - }) - - t.Run("empty collection with passed positions", func(t 
*testing.T) { - vchannel := "ch_no_segment_1" - pchannel := funcutil.ToPhysicalChannel(vchannel) - infos := svr.handler.GetQueryVChanPositions(&channelMeta{ - Name: vchannel, - CollectionID: 0, - StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}}, - }) - assert.EqualValues(t, 0, infos.CollectionID) - assert.EqualValues(t, vchannel, infos.ChannelName) - assert.EqualValues(t, 0, len(infos.GetLevelZeroSegmentIds())) - }) -} - -func TestGetQueryVChanPositions_PartitionStats(t *testing.T) { - svr := newTestServer(t) - defer closeTestServer(t, svr) - schema := newTestSchema() - collectionID := int64(0) - partitionID := int64(1) - vchannel := "test_vchannel" - version := int64(100) - svr.meta.AddCollection(&collectionInfo{ - ID: collectionID, - Schema: schema, - }) - svr.meta.partitionStatsMeta.partitionStatsInfos = map[string]map[int64]*partitionStatsInfo{ - vchannel: { - partitionID: { - currentVersion: version, - infos: map[int64]*datapb.PartitionStatsInfo{ - version: {Version: version}, - }, - }, - }, - } - partitionIDs := make([]UniqueID, 0) - partitionIDs = append(partitionIDs, partitionID) - vChannelInfo := svr.handler.GetQueryVChanPositions(&channelMeta{Name: vchannel, CollectionID: collectionID}, partitionIDs...) - statsVersions := vChannelInfo.GetPartitionStatsVersions() - assert.Equal(t, 1, len(statsVersions)) - assert.Equal(t, int64(100), statsVersions[partitionID]) -} - -func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { - t.Run("ab GC-ed, cde unIndexed", func(t *testing.T) { - svr := newTestServer(t) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - Schema: schema, - }) - err := svr.meta.indexMeta.CreateIndex(&model.Index{ - TenantID: "", - CollectionID: 0, - FieldID: 2, - IndexID: 1, - }) - assert.NoError(t, err) - c := &datapb.SegmentInfo{ - ID: 1, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) - assert.NoError(t, err) - d := &datapb.SegmentInfo{ - ID: 2, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) - assert.NoError(t, err) - e := &datapb.SegmentInfo{ - ID: 3, - CollectionID: 0, - PartitionID: 1, - InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - CompactionFrom: []int64{1, 2}, // c, d - NumOfRows: 2048, - } - - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) - assert.NoError(t, err) - // vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) - // assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds)) - // assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds)) - // assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d - }) - - t.Run("a GC-ed, bcde unIndexed", func(t *testing.T) { - svr := newTestServer(t) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: 
0, - Schema: schema, - }) - err := svr.meta.indexMeta.CreateIndex(&model.Index{ - TenantID: "", - CollectionID: 0, - FieldID: 2, - IndexID: 1, - }) - assert.NoError(t, err) - a := &datapb.SegmentInfo{ - ID: 99, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(a)) - assert.NoError(t, err) - - c := &datapb.SegmentInfo{ - ID: 1, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) - assert.NoError(t, err) - d := &datapb.SegmentInfo{ - ID: 2, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) - assert.NoError(t, err) - e := &datapb.SegmentInfo{ - ID: 3, - CollectionID: 0, - PartitionID: 1, - InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - CompactionFrom: []int64{1, 2}, // c, d - NumOfRows: 2048, - } - - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) - assert.NoError(t, err) - // vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) - // assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds)) - // assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds)) - // assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d - }) - - t.Run("ab GC-ed, c unIndexed, de indexed", func(t *testing.T) { - svr := newTestServer(t) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - Schema: schema, - }) - err := svr.meta.indexMeta.CreateIndex(&model.Index{ - TenantID: "", - CollectionID: 0, - FieldID: 2, - IndexID: 1, - }) - assert.NoError(t, err) - c := &datapb.SegmentInfo{ - ID: 1, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) - assert.NoError(t, err) - d := &datapb.SegmentInfo{ - ID: 2, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) - assert.NoError(t, err) - err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{ - SegmentID: 2, - BuildID: 1, - IndexID: 1, - }) - assert.NoError(t, err) - err = svr.meta.indexMeta.FinishTask(&workerpb.IndexTaskInfo{ - BuildID: 1, - State: commonpb.IndexState_Finished, - }) - assert.NoError(t, err) - e := &datapb.SegmentInfo{ - ID: 3, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "ch1", - 
State: commonpb.SegmentState_Flushed, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 1, - }, - CompactionFrom: []int64{1, 2}, // c, d - NumOfRows: 2048, - } - err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) - assert.NoError(t, err) - err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{ - SegmentID: 3, - BuildID: 2, - IndexID: 1, - }) - assert.NoError(t, err) - err = svr.meta.indexMeta.FinishTask(&workerpb.IndexTaskInfo{ - BuildID: 2, - State: commonpb.IndexState_Finished, - }) - assert.NoError(t, err) - - // vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) - // assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) - // assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds)) - // assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e - }) -} - -func TestShouldDropChannel(t *testing.T) { - type myRootCoord struct { - mocks.MockRootCoordClient - } - myRoot := &myRootCoord{} - myRoot.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{ - Status: merr.Success(), - Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0), - Count: 1, - }, nil) - - myRoot.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{ - Status: merr.Success(), - ID: int64(tsoutil.ComposeTSByTime(time.Now(), 0)), - Count: 1, - }, nil) - - svr := newTestServer(t) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - Schema: schema, - StartPositions: []*commonpb.KeyDataPair{ - { - Key: "ch1", - Data: []byte{8, 9, 10}, - }, - }, - }) - svr.meta.AddCollection(&collectionInfo{ - ID: 1, - Schema: schema, - StartPositions: []*commonpb.KeyDataPair{ - { - Key: "ch0", - Data: []byte{8, 9, 10}, - }, - }, - }) - - t.Run("channel name not in kv ", func(t *testing.T) { - assert.False(t, svr.handler.CheckShouldDropChannel("ch99")) - }) - - t.Run("channel in remove flag", func(t *testing.T) { - err := svr.meta.catalog.MarkChannelDeleted(context.TODO(), "ch1") - require.NoError(t, err) - assert.True(t, svr.handler.CheckShouldDropChannel("ch1")) - }) -} - func TestGetRecoveryInfo(t *testing.T) { t.Run("test get recovery info with no segments", func(t *testing.T) { svr := newTestServer(t) @@ -2372,9 +1739,6 @@ func TestManualCompaction(t *testing.T) { }, }, } - // mockMeta =: - // mockHandler := newCompactionPlanHandler(nil, nil, nil, mockMeta, nil) - // svr.compactionHandler = mockHandler resp, err := svr.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{ CollectionID: 1, Timetravel: 1, diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 52b1f3e3a3..4f960da240 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -284,8 +284,8 @@ message VchannelInfo { repeated int64 unflushedSegmentIds = 7; repeated int64 flushedSegmentIds = 8; repeated int64 dropped_segmentIds = 9; - repeated int64 indexed_segmentIds = 10; - repeated SegmentInfo indexed_segments = 11; + repeated int64 indexed_segmentIds = 10; // deprecated, keep it for compatibility + repeated SegmentInfo indexed_segments = 11; // deprecated, keep it for compatibility repeated int64 level_zero_segment_ids = 12; map<int64, int64> partition_stats_versions = 13; }
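To make the M->N lineage handling in the handler change easier to follow, here is a minimal, self-contained Go sketch of the idea behind the new retrieveUnIndexed loop. The segment struct, resolveFlushed function, and the IDs in main are invented for illustration (they loosely mirror the 400/401/402 -> 403/404 case in TestGetQueryVChanPositionsRetrieveM2N) and are not Milvus APIs; the real handler additionally tracks dropped, growing, and L0 sets plus partition stats versions.

package main

import "fmt"

// Illustrative-only types: "segment" and "resolveFlushed" are invented for this
// sketch and do not exist in the Milvus codebase.
type segment struct {
	id             int64
	indexed        bool
	compactionFrom []int64 // parents this segment was compacted from
	compactionTo   []int64 // all results of that compaction (itself and its siblings)
}

// resolveFlushed mirrors the retrieveUnIndexed loop conceptually: an unindexed
// flushed segment whose parents all still exist is replaced by those parents,
// and the whole compaction output (itself plus siblings) is dropped from the
// reported sets, so QueryCoord keeps serving the indexed parents until the new
// M->N compaction result finishes building its index.
func resolveFlushed(meta map[int64]*segment, unIndexed map[int64]bool) []int64 {
	flushed := map[int64]bool{}
	for id, s := range meta {
		if s.indexed {
			flushed[id] = true
		}
	}
	for changed := true; changed; {
		changed = false
		for id := range unIndexed {
			s := meta[id]
			if len(s.compactionFrom) == 0 {
				continue
			}
			valid := true
			for _, p := range s.compactionFrom {
				if _, ok := meta[p]; !ok {
					valid = false // a parent was GC-ed; keep the child itself
					break
				}
			}
			if !valid {
				continue
			}
			for _, p := range s.compactionFrom {
				if meta[p].indexed {
					flushed[p] = true
				} else {
					unIndexed[p] = true // walk further up the lineage next round
					changed = true
				}
			}
			for _, sib := range s.compactionTo {
				delete(unIndexed, sib)
				delete(flushed, sib)
			}
		}
	}
	for id := range unIndexed {
		flushed[id] = true // whatever remains unindexed is still reported as flushed
	}
	out := make([]int64, 0, len(flushed))
	for id := range flushed {
		out = append(out, id)
	}
	return out
}

func main() {
	// 400, 401, 402 (all indexed) were compacted into 403 (indexed) and 404 (not yet indexed).
	lineage := []int64{400, 401, 402}
	results := []int64{403, 404}
	meta := map[int64]*segment{
		400: {id: 400, indexed: true},
		401: {id: 401, indexed: true},
		402: {id: 402, indexed: true},
		403: {id: 403, indexed: true, compactionFrom: lineage, compactionTo: results},
		404: {id: 404, compactionFrom: lineage, compactionTo: results},
	}
	fmt.Println(resolveFlushed(meta, map[int64]bool{404: true})) // 400, 401, 402 in some order
}

The behavioral change mirrored here is that the whole compaction output (compactTos, i.e. the child and its siblings) is removed together once its parents are reinstated, so a partially indexed M->N result is never mixed with the parents it was compacted from.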