diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 635a62f327..03fff17b6a 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -50,6 +50,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -61,7 +63,6 @@ import ( "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" - v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" ) // make sure QueryNode implements types.QueryNode @@ -376,6 +377,8 @@ func (node *QueryNode) handleSealedSegmentsChangeInfo(info *querypb.SealedSegmen log.Warn("failed to validate vchannel for SegmentChangeInfo", zap.Error(err)) continue } + // ignore segments that are online and offline in the same QueryNode + filterDuplicateChangeInfo(line) node.ShardClusterService.HandoffVChannelSegments(vchannel, line) } @@ -407,3 +410,41 @@ func validateChangeChannel(info *querypb.SegmentChangeInfo) (string, error) { return channelName, nil } + +// filterDuplicateChangeInfo filters out duplicated sealed segments which are both online and offline (Fix issue#17347) +func filterDuplicateChangeInfo(line *querypb.SegmentChangeInfo) { + if line.OnlineNodeID == line.OfflineNodeID { + dupSegmentIDs := make(map[UniqueID]struct{}) + for _, onlineSegment := range line.OnlineSegments { + for _, offlineSegment := range line.OfflineSegments { + if onlineSegment.SegmentID == offlineSegment.SegmentID && onlineSegment.SegmentState == segmentTypeSealed && offlineSegment.SegmentState == segmentTypeSealed { + dupSegmentIDs[onlineSegment.SegmentID] = struct{}{} + } + } + } + if len(dupSegmentIDs) == 0 { + return + } + + var dupSegmentIDsList []UniqueID + for sid := range dupSegmentIDs { + dupSegmentIDsList = append(dupSegmentIDsList, sid) + } + log.Warn("Found sealed segments are that are online and offline.", zap.Int64s("SegmentIDs", dupSegmentIDsList)) + + var filteredOnlineSegments []*querypb.SegmentInfo + for _, onlineSegment := range line.OnlineSegments { + if _, ok := dupSegmentIDs[onlineSegment.SegmentID]; !ok { + filteredOnlineSegments = append(filteredOnlineSegments, onlineSegment) + } + } + line.OnlineSegments = filteredOnlineSegments + var filteredOfflineSegments []*querypb.SegmentInfo + for _, offlineSegment := range line.OfflineSegments { + if _, ok := dupSegmentIDs[offlineSegment.SegmentID]; !ok { + filteredOfflineSegments = append(filteredOfflineSegments, offlineSegment) + } + } + line.OfflineSegments = filteredOfflineSegments + } +} diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 50e62fcb65..419d79250e 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -413,6 +413,74 @@ func TestQueryNode_validateChangeChannel(t *testing.T) { } } +func TestQueryNode_filterDuplicateChangeInfo(t *testing.T) { + t.Run("dup change info", func(t *testing.T) { + info := &querypb.SegmentChangeInfo{ + OnlineNodeID: 233, + OnlineSegments: []*querypb.SegmentInfo{ + { + SegmentID: 23333, + SegmentState: segmentTypeSealed, + }, + }, + OfflineNodeID: 233, + OfflineSegments: []*querypb.SegmentInfo{ + { + SegmentID: 23333, + SegmentState: segmentTypeSealed, + }, + }, + } + filterDuplicateChangeInfo(info) + assert.Equal(t, 0, len(info.OnlineSegments)) + assert.Equal(t, 0, len(info.OfflineSegments)) + }) + + t.Run("normal change info1", func(t *testing.T) { + info := &querypb.SegmentChangeInfo{ + OnlineNodeID: 233, + OnlineSegments: []*querypb.SegmentInfo{ + { + SegmentID: 23333, + SegmentState: segmentTypeSealed, + }, + }, + OfflineNodeID: 234, + OfflineSegments: []*querypb.SegmentInfo{ + { + SegmentID: 23333, + SegmentState: segmentTypeSealed, + }, + }, + } + filterDuplicateChangeInfo(info) + assert.Equal(t, 1, len(info.OnlineSegments)) + assert.Equal(t, 1, len(info.OfflineSegments)) + }) + + t.Run("normal change info2", func(t *testing.T) { + info := &querypb.SegmentChangeInfo{ + OnlineNodeID: 233, + OnlineSegments: []*querypb.SegmentInfo{ + { + SegmentID: 23333, + SegmentState: segmentTypeSealed, + }, + }, + OfflineNodeID: 234, + OfflineSegments: []*querypb.SegmentInfo{ + { + SegmentID: 23333, + SegmentState: segmentTypeSealed, + }, + }, + } + filterDuplicateChangeInfo(info) + assert.Equal(t, 1, len(info.OnlineSegments)) + assert.Equal(t, 1, len(info.OfflineSegments)) + }) +} + func TestQueryNode_handleSealedSegmentsChangeInfo(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()