diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 4f36fd9d31..7ab7eb4893 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -165,6 +165,9 @@ func (c *SegmentChecker) findNeedReleasedGrowingSegments(replica *meta.Replica) leaders := c.dist.ChannelDistManager.GetShardLeadersByReplica(replica) for shard, leaderID := range leaders { lview := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, shard) + if lview == nil { + continue + } // find growing segments from leaderview's sealed segments // because growing segments should be released only after loading the compaction created segment successfully. for sid := range lview.Segments { @@ -172,6 +175,7 @@ func (c *SegmentChecker) findNeedReleasedGrowingSegments(replica *meta.Replica) if segment == nil { continue } + sources := append(segment.GetCompactionFrom(), segment.GetID()) for _, source := range sources { if lview.GrowingSegments.Contain(source) { diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go index 4acb0bbca3..7fc6ab02b5 100644 --- a/internal/querycoordv2/meta/channel_dist_manager.go +++ b/internal/querycoordv2/meta/channel_dist_manager.go @@ -94,7 +94,9 @@ func (m *ChannelDistManager) GetShardLeadersByReplica(replica *Replica) map[stri for node := range replica.Nodes { channels := m.channels[node] for _, dmc := range channels { - ret[dmc.GetChannelName()] = node + if dmc.GetCollectionID() == replica.GetCollectionID() { + ret[dmc.GetChannelName()] = node + } } } return ret diff --git a/internal/querycoordv2/meta/channel_dist_manager_test.go b/internal/querycoordv2/meta/channel_dist_manager_test.go index 62359febca..306e090ace 100644 --- a/internal/querycoordv2/meta/channel_dist_manager_test.go +++ b/internal/querycoordv2/meta/channel_dist_manager_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/suite" ) @@ -83,8 +84,18 @@ func (suite *ChannelDistManagerSuite) TestGetBy() { func (suite *ChannelDistManagerSuite) TestGetShardLeader() { replicas := []*Replica{ - {Nodes: typeutil.NewUniqueSet(suite.nodes[0], suite.nodes[2])}, - {Nodes: typeutil.NewUniqueSet(suite.nodes[1])}, + { + Replica: &querypb.Replica{ + CollectionID: suite.collection, + }, + Nodes: typeutil.NewUniqueSet(suite.nodes[0], suite.nodes[2]), + }, + { + Replica: &querypb.Replica{ + CollectionID: suite.collection, + }, + Nodes: typeutil.NewUniqueSet(suite.nodes[1]), + }, } // Test on replica 0 diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index b3ca6d911f..0c7a99eb73 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -1187,6 +1187,9 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get channelVersionInfos := make([]*querypb.ChannelVersionInfo, 0, len(shardClusters)) leaderViews := make([]*querypb.LeaderView, 0, len(shardClusters)) for _, sc := range shardClusters { + if !node.queryShardService.hasQueryShard(sc.vchannelName) { + continue + } segmentInfos := sc.GetSegmentInfos() mapping := make(map[int64]int64) for _, info := range segmentInfos { diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 938f5124cd..4c3213b23f 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -730,11 +730,16 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.String("scope", req.GetScope().String())) - if req.GetScope() != querypb.DataScope_Streaming { - offlineSegments := make(typeutil.UniqueSet) + offlineSegments := make(typeutil.UniqueSet) + if req.Scope != querypb.DataScope_Streaming { offlineSegments.Insert(req.GetSegmentIDs()...) + } + var lastVersionID int64 + var err error + func() { sc.mutVersion.Lock() + defer sc.mutVersion.Unlock() var allocations SegmentsStatus if sc.currentVersion != nil { @@ -750,7 +755,6 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas version := NewShardClusterVersion(versionID, allocations, sc.currentVersion) sc.versions.Store(versionID, version) - var lastVersionID int64 // currentVersion shall be not nil if sc.currentVersion != nil { // wait for last version search done @@ -760,10 +764,31 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas // set current version to new one sc.currentVersion = version - sc.mutVersion.Unlock() - sc.cleanupVersion(lastVersionID) - sc.mut.Lock() + // try to release segments from nodes + node, ok := sc.getNode(req.GetNodeID()) + if !ok { + log.Warn("node not in cluster", zap.Int64("nodeID", req.NodeID)) + err = fmt.Errorf("node %d not in cluster ", req.NodeID) + return + } + + resp, rerr := node.client.ReleaseSegments(ctx, req) + if err != nil { + log.Warn("failed to dispatch release segment request", zap.Error(err)) + err = rerr + return + } + if resp.GetErrorCode() != commonpb.ErrorCode_Success { + log.Warn("follower release segment failed", zap.String("reason", resp.GetReason())) + err = fmt.Errorf("follower %d failed to release segment, reason %s", req.NodeID, resp.GetReason()) + } + }() + sc.cleanupVersion(lastVersionID) + + sc.mut.Lock() + // do not delete segment if data scope is streaming + if req.GetScope() != querypb.DataScope_Streaming { for _, segmentID := range req.SegmentIDs { info, ok := sc.segments[segmentID] if ok { @@ -773,26 +798,10 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas } } } - sc.mut.Unlock() } + sc.mut.Unlock() - // try to release segments from nodes - node, ok := sc.getNode(req.GetNodeID()) - if !ok { - log.Warn("node not in cluster", zap.Int64("nodeID", req.NodeID)) - return fmt.Errorf("node %d not in cluster ", req.NodeID) - } - - resp, err := node.client.ReleaseSegments(ctx, req) - if err != nil { - log.Warn("failed to dispatch release segment request", zap.Error(err)) - return err - } - if resp.GetErrorCode() != commonpb.ErrorCode_Success { - log.Warn("follower release segment failed", zap.String("reason", resp.GetReason())) - return fmt.Errorf("follower %d failed to release segment, reason %s", req.NodeID, resp.GetReason()) - } - return nil + return err } // appendHandoff adds the change info into pending list and returns the token. diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index c611202914..ed336b5415 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -1728,6 +1728,7 @@ class TestUtilityAdvanced(TestcaseBase): check_items={ct.err_code: 1, ct.err_msg: "must be in the same replica group"}) @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.skip(reason="querycoordv2") def test_handoff_query_search(self): """ target: test query search after handoff