diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 38ffb16306..66b7c4bfc3 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -132,11 +132,13 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID func (h *ServerHandler) CheckShouldDropChannel(channel string) bool { segments := h.s.meta.GetSegmentsByChannel(channel) for _, segment := range segments { - if segment.GetStartPosition() != nil && // fitler empty segment + if segment.GetStartPosition() != nil && // filter empty segment // FIXME: we filter compaction generated segments // because datanode may not know the segment due to the network lag or // datacoord crash when handling CompleteCompaction. - len(segment.CompactionFrom) == 0 && + // FIXME: cancel this limitation for #12265 + // need to change a unified DropAndFlush to solve the root problem + //len(segment.CompactionFrom) == 0 && segment.GetState() != commonpb.SegmentState_Dropped { return false } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index a4aab25b73..78da195e4a 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -45,6 +45,7 @@ import ( "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -1241,6 +1242,136 @@ func TestGetVChannelPos(t *testing.T) { }) } +func TestShouldDropChannel(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: 0, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch1", + Data: []byte{8, 9, 10}, + }, + }, + }) + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: 1, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch0", + Data: []byte{8, 9, 10}, + }, + }, + }) + + s1 := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + MsgGroup: "", + Timestamp: 0, + }, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 0, + }, + } + s2 := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + CompactionFrom: []int64{4, 5}, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + MsgGroup: "", + }, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + s3 := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{8, 9, 10}, + MsgGroup: "", + }, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{11, 12, 13}, + MsgGroup: "", + Timestamp: 2, + }, + } + s4 := &datapb.SegmentInfo{ + ID: 4, + CollectionID: 0, + PartitionID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + } + + t.Run("channel without segments", func(t *testing.T) { + r := svr.handler.CheckShouldDropChannel("ch1") + assert.True(t, r) + + }) + + t.Run("channel with all dropped segments", func(t *testing.T) { + err := svr.meta.AddSegment(NewSegmentInfo(s1)) + require.NoError(t, err) + + r := svr.handler.CheckShouldDropChannel("ch1") + assert.True(t, r) + }) + + t.Run("channel with all dropped segments and compacted segments", func(t *testing.T) { + err := svr.meta.AddSegment(NewSegmentInfo(s2)) + require.Nil(t, err) + + r := svr.handler.CheckShouldDropChannel("ch1") + assert.True(t, r) + }) + + t.Run("channel with other state segments", func(t *testing.T) { + err := svr.meta.AddSegment(NewSegmentInfo(s3)) + require.Nil(t, err) + + r := svr.handler.CheckShouldDropChannel("ch1") + assert.False(t, r) + }) + + t.Run("channel with dropped segment and with segment without start position", func(t *testing.T) { + err := svr.meta.DropSegment(3) + require.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(s4)) + require.Nil(t, err) + + r := svr.handler.CheckShouldDropChannel("ch1") + assert.True(t, r) + }) +} + func TestGetRecoveryInfo(t *testing.T) { t.Run("test get recovery info with no segments", func(t *testing.T) {