Add compacted segment into check list when drop collection (#12272)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-11-25 15:13:15 +08:00 committed by GitHub
parent d2dae89f4c
commit 2ee0c7c074
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 135 additions and 2 deletions

View File

@ -132,11 +132,13 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
segments := h.s.meta.GetSegmentsByChannel(channel)
for _, segment := range segments {
if segment.GetStartPosition() != nil && // fitler empty segment
if segment.GetStartPosition() != nil && // filter empty segment
// FIXME: we filter compaction generated segments
// because datanode may not know the segment due to the network lag or
// datacoord crash when handling CompleteCompaction.
len(segment.CompactionFrom) == 0 &&
// FIXME: cancel this limitation for #12265
// need to change a unified DropAndFlush to solve the root problem
//len(segment.CompactionFrom) == 0 &&
segment.GetState() != commonpb.SegmentState_Dropped {
return false
}

View File

@ -45,6 +45,7 @@ import (
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)
@ -1241,6 +1242,136 @@ func TestGetVChannelPos(t *testing.T) {
})
}
func TestShouldDropChannel(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: 0,
Schema: schema,
StartPositions: []*commonpb.KeyDataPair{
{
Key: "ch1",
Data: []byte{8, 9, 10},
},
},
})
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: 1,
Schema: schema,
StartPositions: []*commonpb.KeyDataPair{
{
Key: "ch0",
Data: []byte{8, 9, 10},
},
},
})
s1 := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
Timestamp: 0,
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
}
s2 := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
CompactionFrom: []int64{4, 5},
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
s3 := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 2,
},
}
s4 := &datapb.SegmentInfo{
ID: 4,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
}
t.Run("channel without segments", func(t *testing.T) {
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
})
t.Run("channel with all dropped segments", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s1))
require.NoError(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
})
t.Run("channel with all dropped segments and compacted segments", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s2))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
})
t.Run("channel with other state segments", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s3))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.False(t, r)
})
t.Run("channel with dropped segment and with segment without start position", func(t *testing.T) {
err := svr.meta.DropSegment(3)
require.Nil(t, err)
err = svr.meta.AddSegment(NewSegmentInfo(s4))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
})
}
func TestGetRecoveryInfo(t *testing.T) {
t.Run("test get recovery info with no segments", func(t *testing.T) {