Fix querynode data distribution return false positive dm channel (#19249)
Co-authored-by: yah01 <yang.cen@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
commit 50ea4eeef1
parent 5143e2a75e
@@ -165,6 +165,9 @@ func (c *SegmentChecker) findNeedReleasedGrowingSegments(replica *meta.Replica)
	leaders := c.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
	for shard, leaderID := range leaders {
		lview := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, shard)
		if lview == nil {
			continue
		}
		// find growing segments from leaderview's sealed segments
		// because growing segments should be released only after loading the compaction created segment successfully.
		for sid := range lview.Segments {
@@ -172,6 +175,7 @@ func (c *SegmentChecker) findNeedReleasedGrowingSegments(replica *meta.Replica)
			if segment == nil {
				continue
			}

			sources := append(segment.GetCompactionFrom(), segment.GetID())
			for _, source := range sources {
				if lview.GrowingSegments.Contain(source) {
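The two hunks above add a guard in the segment checker: GetLeaderShardView can apparently return nil for a shard whose leader has not reported a view, and the checker now skips that shard instead of dereferencing a nil view. Below is a minimal, self-contained sketch of that guard; the leaderView type, the views map, and the channel names are hypothetical stand-ins, not the real querycoordv2 meta structures.

package main

import "fmt"

// Hypothetical stand-in for the leader view the checker reads.
type leaderView struct {
	Segments map[int64]struct{}
}

// views keyed by "<leaderID>/<shard>"; a missing entry models a leader that has
// not reported a view for that shard, which is when a lookup returns nil.
var views = map[string]*leaderView{
	"7/dml_ch_v0": {Segments: map[int64]struct{}{42: {}}},
}

func getLeaderShardView(leaderID int64, shard string) *leaderView {
	return views[fmt.Sprintf("%d/%s", leaderID, shard)] // nil when absent
}

func main() {
	for _, shard := range []string{"dml_ch_v0", "dml_ch_v1"} {
		lview := getLeaderShardView(7, shard)
		if lview == nil {
			// Without this guard, lview.Segments below would dereference a nil pointer.
			fmt.Println("no leader view yet for", shard, "- skip")
			continue
		}
		for sid := range lview.Segments {
			fmt.Println(shard, "sealed segment", sid)
		}
	}
}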
@@ -94,7 +94,9 @@ func (m *ChannelDistManager) GetShardLeadersByReplica(replica *Replica) map[stri
	for node := range replica.Nodes {
		channels := m.channels[node]
		for _, dmc := range channels {
			ret[dmc.GetChannelName()] = node
			if dmc.GetCollectionID() == replica.GetCollectionID() {
				ret[dmc.GetChannelName()] = node
			}
		}
	}
	return ret
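This hunk carries the core of the fix: GetShardLeadersByReplica previously recorded every dm channel found on a replica's nodes, so channels of other collections hosted on the same node could leak into the result, while the guarded version keeps only channels whose collection ID matches the replica's. A small self-contained sketch of the filtering idea follows; the channel and replica types, the channelsByNode map, and the channel names are made up for illustration and are not the real meta types.

package main

import "fmt"

// Hypothetical stand-ins for the real meta types, just to show the filter.
type channel struct {
	name         string
	collectionID int64
}

type replica struct {
	collectionID int64
	nodes        []int64
}

// channels per query node, as the channel dist manager tracks them
var channelsByNode = map[int64][]channel{
	1: {
		{name: "by-dev-rootcoord-dml_0_collA", collectionID: 100},
		{name: "by-dev-rootcoord-dml_1_collB", collectionID: 200},
	},
}

// getShardLeadersByReplica mirrors the fixed behaviour: a node's dm channel is
// reported as a shard of the replica only when it belongs to the replica's collection.
func getShardLeadersByReplica(r replica) map[string]int64 {
	ret := make(map[string]int64)
	for _, node := range r.nodes {
		for _, dmc := range channelsByNode[node] {
			if dmc.collectionID == r.collectionID {
				ret[dmc.name] = node
			}
		}
	}
	return ret
}

func main() {
	leaders := getShardLeadersByReplica(replica{collectionID: 100, nodes: []int64{1}})
	fmt.Println(leaders) // map[by-dev-rootcoord-dml_0_collA:1]; the collection-200 channel is no longer reported
}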
@@ -4,6 +4,7 @@ import (
	"testing"

	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"github.com/stretchr/testify/suite"
)
@@ -83,8 +84,18 @@ func (suite *ChannelDistManagerSuite) TestGetBy() {

func (suite *ChannelDistManagerSuite) TestGetShardLeader() {
	replicas := []*Replica{
		{Nodes: typeutil.NewUniqueSet(suite.nodes[0], suite.nodes[2])},
		{Nodes: typeutil.NewUniqueSet(suite.nodes[1])},
		{
			Replica: &querypb.Replica{
				CollectionID: suite.collection,
			},
			Nodes: typeutil.NewUniqueSet(suite.nodes[0], suite.nodes[2]),
		},
		{
			Replica: &querypb.Replica{
				CollectionID: suite.collection,
			},
			Nodes: typeutil.NewUniqueSet(suite.nodes[1]),
		},
	}

	// Test on replica 0
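The test fixtures above now attach an explicit CollectionID to each Replica so the collection filter can be exercised, while node membership is still built with typeutil.NewUniqueSet. For readers unfamiliar with that helper, here is a hypothetical miniature of such a unique-ID set with the same Insert/Contain-style surface; it is not the real typeutil implementation.

package main

import "fmt"

// uniqueSet is an illustrative stand-in: a set of int64 IDs.
type uniqueSet map[int64]struct{}

func newUniqueSet(ids ...int64) uniqueSet {
	s := make(uniqueSet, len(ids))
	s.Insert(ids...)
	return s
}

func (s uniqueSet) Insert(ids ...int64) {
	for _, id := range ids {
		s[id] = struct{}{}
	}
}

func (s uniqueSet) Contain(id int64) bool {
	_, ok := s[id]
	return ok
}

func main() {
	nodes := newUniqueSet(1, 3)
	fmt.Println(nodes.Contain(1), nodes.Contain(2)) // true false
}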
@@ -1187,6 +1187,9 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
	channelVersionInfos := make([]*querypb.ChannelVersionInfo, 0, len(shardClusters))
	leaderViews := make([]*querypb.LeaderView, 0, len(shardClusters))
	for _, sc := range shardClusters {
		if !node.queryShardService.hasQueryShard(sc.vchannelName) {
			continue
		}
		segmentInfos := sc.GetSegmentInfos()
		mapping := make(map[int64]int64)
		for _, info := range segmentInfos {
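GetDataDistribution now skips any shard cluster for which the node holds no query shard, which appears to be the "false positive dm channel" the commit title refers to: the node stops reporting channels it does not actually serve. A minimal sketch of the check follows, with hypothetical stand-in types for the shard cluster and query shard service and made-up channel names.

package main

import "fmt"

// Hypothetical stand-ins; only the shape of the check matters here.
type shardCluster struct{ vchannelName string }

type queryShardService struct{ shards map[string]bool }

func (s *queryShardService) hasQueryShard(ch string) bool { return s.shards[ch] }

func main() {
	clusters := []shardCluster{{"by-dev-rootcoord-dml_0_v0"}, {"by-dev-rootcoord-dml_1_v0"}}
	svc := &queryShardService{shards: map[string]bool{"by-dev-rootcoord-dml_0_v0": true}}

	// Only channels that actually have a query shard on this node are reported back
	// to the coordinator, so the distribution no longer claims leadership of the
	// second channel.
	var reported []string
	for _, sc := range clusters {
		if !svc.hasQueryShard(sc.vchannelName) {
			continue
		}
		reported = append(reported, sc.vchannelName)
	}
	fmt.Println(reported) // [by-dev-rootcoord-dml_0_v0]
}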
@@ -730,11 +730,16 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas
		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
		zap.String("scope", req.GetScope().String()))

	if req.GetScope() != querypb.DataScope_Streaming {
		offlineSegments := make(typeutil.UniqueSet)
	offlineSegments := make(typeutil.UniqueSet)
	if req.Scope != querypb.DataScope_Streaming {
		offlineSegments.Insert(req.GetSegmentIDs()...)
	}

	var lastVersionID int64
	var err error
	func() {
		sc.mutVersion.Lock()
		defer sc.mutVersion.Unlock()

		var allocations SegmentsStatus
		if sc.currentVersion != nil {
@@ -750,7 +755,6 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas
		version := NewShardClusterVersion(versionID, allocations, sc.currentVersion)
		sc.versions.Store(versionID, version)

		var lastVersionID int64
		// currentVersion shall be not nil
		if sc.currentVersion != nil {
			// wait for last version search done
@@ -760,10 +764,31 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas

		// set current version to new one
		sc.currentVersion = version
		sc.mutVersion.Unlock()
		sc.cleanupVersion(lastVersionID)

		sc.mut.Lock()
		// try to release segments from nodes
		node, ok := sc.getNode(req.GetNodeID())
		if !ok {
			log.Warn("node not in cluster", zap.Int64("nodeID", req.NodeID))
			err = fmt.Errorf("node %d not in cluster ", req.NodeID)
			return
		}

		resp, rerr := node.client.ReleaseSegments(ctx, req)
		if err != nil {
			log.Warn("failed to dispatch release segment request", zap.Error(err))
			err = rerr
			return
		}
		if resp.GetErrorCode() != commonpb.ErrorCode_Success {
			log.Warn("follower release segment failed", zap.String("reason", resp.GetReason()))
			err = fmt.Errorf("follower %d failed to release segment, reason %s", req.NodeID, resp.GetReason())
		}
	}()
	sc.cleanupVersion(lastVersionID)

	sc.mut.Lock()
	// do not delete segment if data scope is streaming
	if req.GetScope() != querypb.DataScope_Streaming {
		for _, segmentID := range req.SegmentIDs {
			info, ok := sc.segments[segmentID]
			if ok {
@@ -773,26 +798,10 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas
				}
			}
		}
		sc.mut.Unlock()
	}
	sc.mut.Unlock()

	// try to release segments from nodes
	node, ok := sc.getNode(req.GetNodeID())
	if !ok {
		log.Warn("node not in cluster", zap.Int64("nodeID", req.NodeID))
		return fmt.Errorf("node %d not in cluster ", req.NodeID)
	}

	resp, err := node.client.ReleaseSegments(ctx, req)
	if err != nil {
		log.Warn("failed to dispatch release segment request", zap.Error(err))
		return err
	}
	if resp.GetErrorCode() != commonpb.ErrorCode_Success {
		log.Warn("follower release segment failed", zap.String("reason", resp.GetReason()))
		return fmt.Errorf("follower %d failed to release segment, reason %s", req.NodeID, resp.GetReason())
	}
	return nil
	return err
}

// appendHandoff adds the change info into pending list and returns the token.
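The releaseSegments hunks above restructure the function: the version switch and the release dispatch to the follower appear to move into an immediately-invoked closure with a deferred mutVersion.Unlock, the dispatch error is captured in an outer err variable, lastVersionID is hoisted out so cleanupVersion can run after the closure returns, and the old inline unlock/return sequence at the end of the function is dropped in favour of a single return err. Below is a compact, self-contained sketch of that closure-with-deferred-unlock pattern; the cluster type, its fields, and the error text are illustrative only, not the real ShardCluster.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// cluster is a hypothetical stand-in used only to show the locking pattern.
type cluster struct {
	mutVersion     sync.Mutex
	currentVersion int64
}

func (c *cluster) release(fail bool) error {
	var lastVersionID int64
	var err error
	func() {
		c.mutVersion.Lock()
		defer c.mutVersion.Unlock() // released on early return as well

		lastVersionID = c.currentVersion
		if fail {
			// captured in the outer err instead of returning from the method directly
			err = errors.New("follower failed to release segment")
			return
		}
		c.currentVersion++
	}()
	// cleanup runs outside the critical section, using the hoisted version ID
	fmt.Println("cleanup version", lastVersionID)
	return err
}

func main() {
	c := &cluster{currentVersion: 1}
	fmt.Println(c.release(false)) // <nil>
	fmt.Println(c.release(true))  // follower failed to release segment
}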
@@ -1728,6 +1728,7 @@ class TestUtilityAdvanced(TestcaseBase):
                                 check_items={ct.err_code: 1, ct.err_msg: "must be in the same replica group"})

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.skip(reason="querycoordv2")
    def test_handoff_query_search(self):
        """
        target: test query search after handoff