mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
enhance: Skip update index for L0 segment (#34099)
try to update index for l0 segment, will failed by `index not found` This PR skip update index for l0 segment Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
0426390f06
commit
f7ecafe77d
@ -43,12 +43,12 @@ type BalanceChecker struct {
|
|||||||
nodeManager *session.NodeManager
|
nodeManager *session.NodeManager
|
||||||
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
|
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
|
||||||
scheduler task.Scheduler
|
scheduler task.Scheduler
|
||||||
targetMgr *meta.TargetManager
|
targetMgr meta.TargetManagerInterface
|
||||||
getBalancerFunc GetBalancerFunc
|
getBalancerFunc GetBalancerFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBalanceChecker(meta *meta.Meta,
|
func NewBalanceChecker(meta *meta.Meta,
|
||||||
targetMgr *meta.TargetManager,
|
targetMgr meta.TargetManagerInterface,
|
||||||
nodeMgr *session.NodeManager,
|
nodeMgr *session.NodeManager,
|
||||||
scheduler task.Scheduler,
|
scheduler task.Scheduler,
|
||||||
getBalancerFunc GetBalancerFunc,
|
getBalancerFunc GetBalancerFunc,
|
||||||
|
|||||||
@ -38,7 +38,7 @@ type ChannelChecker struct {
|
|||||||
*checkerActivation
|
*checkerActivation
|
||||||
meta *meta.Meta
|
meta *meta.Meta
|
||||||
dist *meta.DistributionManager
|
dist *meta.DistributionManager
|
||||||
targetMgr *meta.TargetManager
|
targetMgr meta.TargetManagerInterface
|
||||||
nodeMgr *session.NodeManager
|
nodeMgr *session.NodeManager
|
||||||
getBalancerFunc GetBalancerFunc
|
getBalancerFunc GetBalancerFunc
|
||||||
}
|
}
|
||||||
@ -46,7 +46,7 @@ type ChannelChecker struct {
|
|||||||
func NewChannelChecker(
|
func NewChannelChecker(
|
||||||
meta *meta.Meta,
|
meta *meta.Meta,
|
||||||
dist *meta.DistributionManager,
|
dist *meta.DistributionManager,
|
||||||
targetMgr *meta.TargetManager,
|
targetMgr meta.TargetManagerInterface,
|
||||||
nodeMgr *session.NodeManager,
|
nodeMgr *session.NodeManager,
|
||||||
getBalancerFunc GetBalancerFunc,
|
getBalancerFunc GetBalancerFunc,
|
||||||
) *ChannelChecker {
|
) *ChannelChecker {
|
||||||
|
|||||||
@ -42,7 +42,7 @@ type CheckerController struct {
|
|||||||
manualCheckChs map[utils.CheckerType]chan struct{}
|
manualCheckChs map[utils.CheckerType]chan struct{}
|
||||||
meta *meta.Meta
|
meta *meta.Meta
|
||||||
dist *meta.DistributionManager
|
dist *meta.DistributionManager
|
||||||
targetMgr *meta.TargetManager
|
targetMgr meta.TargetManagerInterface
|
||||||
broker meta.Broker
|
broker meta.Broker
|
||||||
nodeMgr *session.NodeManager
|
nodeMgr *session.NodeManager
|
||||||
balancer balance.Balance
|
balancer balance.Balance
|
||||||
@ -56,7 +56,7 @@ type CheckerController struct {
|
|||||||
func NewCheckerController(
|
func NewCheckerController(
|
||||||
meta *meta.Meta,
|
meta *meta.Meta,
|
||||||
dist *meta.DistributionManager,
|
dist *meta.DistributionManager,
|
||||||
targetMgr *meta.TargetManager,
|
targetMgr meta.TargetManagerInterface,
|
||||||
nodeMgr *session.NodeManager,
|
nodeMgr *session.NodeManager,
|
||||||
scheduler task.Scheduler,
|
scheduler task.Scheduler,
|
||||||
broker meta.Broker,
|
broker meta.Broker,
|
||||||
@ -68,7 +68,7 @@ func NewCheckerController(
|
|||||||
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
|
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
|
||||||
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
|
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
|
||||||
utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, nodeMgr, scheduler, getBalancerFunc),
|
utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, nodeMgr, scheduler, getBalancerFunc),
|
||||||
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr),
|
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr, targetMgr),
|
||||||
// todo temporary work around must fix
|
// todo temporary work around must fix
|
||||||
// utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true),
|
// utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true),
|
||||||
utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr),
|
utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr),
|
||||||
|
|||||||
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||||
@ -43,6 +44,8 @@ type IndexChecker struct {
|
|||||||
dist *meta.DistributionManager
|
dist *meta.DistributionManager
|
||||||
broker meta.Broker
|
broker meta.Broker
|
||||||
nodeMgr *session.NodeManager
|
nodeMgr *session.NodeManager
|
||||||
|
|
||||||
|
targetMgr meta.TargetManagerInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIndexChecker(
|
func NewIndexChecker(
|
||||||
@ -50,6 +53,7 @@ func NewIndexChecker(
|
|||||||
dist *meta.DistributionManager,
|
dist *meta.DistributionManager,
|
||||||
broker meta.Broker,
|
broker meta.Broker,
|
||||||
nodeMgr *session.NodeManager,
|
nodeMgr *session.NodeManager,
|
||||||
|
targetMgr meta.TargetManagerInterface,
|
||||||
) *IndexChecker {
|
) *IndexChecker {
|
||||||
return &IndexChecker{
|
return &IndexChecker{
|
||||||
checkerActivation: newCheckerActivation(),
|
checkerActivation: newCheckerActivation(),
|
||||||
@ -57,6 +61,7 @@ func NewIndexChecker(
|
|||||||
dist: dist,
|
dist: dist,
|
||||||
broker: broker,
|
broker: broker,
|
||||||
nodeMgr: nodeMgr,
|
nodeMgr: nodeMgr,
|
||||||
|
targetMgr: targetMgr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -112,6 +117,13 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
|
|||||||
if roNodeSet.Contain(segment.Node) {
|
if roNodeSet.Contain(segment.Node) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// skip update index for l0 segment
|
||||||
|
segmentInTarget := c.targetMgr.GetSealedSegment(collection.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst)
|
||||||
|
if segmentInTarget == nil || segmentInTarget.GetLevel() == datapb.SegmentLevel_L0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
missing := c.checkSegment(segment, indexInfos)
|
missing := c.checkSegment(segment, indexInfos)
|
||||||
if len(missing) > 0 {
|
if len(missing) > 0 {
|
||||||
targets[segment.GetID()] = missing
|
targets[segment.GetID()] = missing
|
||||||
|
|||||||
@ -26,6 +26,7 @@ import (
|
|||||||
|
|
||||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||||
@ -40,11 +41,12 @@ import (
|
|||||||
|
|
||||||
type IndexCheckerSuite struct {
|
type IndexCheckerSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
kv kv.MetaKv
|
kv kv.MetaKv
|
||||||
checker *IndexChecker
|
checker *IndexChecker
|
||||||
meta *meta.Meta
|
meta *meta.Meta
|
||||||
broker *meta.MockBroker
|
broker *meta.MockBroker
|
||||||
nodeMgr *session.NodeManager
|
nodeMgr *session.NodeManager
|
||||||
|
targetMgr *meta.MockTargetManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *IndexCheckerSuite) SetupSuite() {
|
func (suite *IndexCheckerSuite) SetupSuite() {
|
||||||
@ -73,7 +75,15 @@ func (suite *IndexCheckerSuite) SetupTest() {
|
|||||||
distManager := meta.NewDistributionManager()
|
distManager := meta.NewDistributionManager()
|
||||||
suite.broker = meta.NewMockBroker(suite.T())
|
suite.broker = meta.NewMockBroker(suite.T())
|
||||||
|
|
||||||
suite.checker = NewIndexChecker(suite.meta, distManager, suite.broker, suite.nodeMgr)
|
suite.targetMgr = meta.NewMockTargetManager(suite.T())
|
||||||
|
suite.checker = NewIndexChecker(suite.meta, distManager, suite.broker, suite.nodeMgr, suite.targetMgr)
|
||||||
|
|
||||||
|
suite.targetMgr.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(cid, sid int64, i3 int32) *datapb.SegmentInfo {
|
||||||
|
return &datapb.SegmentInfo{
|
||||||
|
ID: sid,
|
||||||
|
Level: datapb.SegmentLevel_L1,
|
||||||
|
}
|
||||||
|
}).Maybe()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *IndexCheckerSuite) TearDownTest() {
|
func (suite *IndexCheckerSuite) TearDownTest() {
|
||||||
|
|||||||
@ -37,14 +37,14 @@ type LeaderChecker struct {
|
|||||||
*checkerActivation
|
*checkerActivation
|
||||||
meta *meta.Meta
|
meta *meta.Meta
|
||||||
dist *meta.DistributionManager
|
dist *meta.DistributionManager
|
||||||
target *meta.TargetManager
|
target meta.TargetManagerInterface
|
||||||
nodeMgr *session.NodeManager
|
nodeMgr *session.NodeManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLeaderChecker(
|
func NewLeaderChecker(
|
||||||
meta *meta.Meta,
|
meta *meta.Meta,
|
||||||
dist *meta.DistributionManager,
|
dist *meta.DistributionManager,
|
||||||
target *meta.TargetManager,
|
target meta.TargetManagerInterface,
|
||||||
nodeMgr *session.NodeManager,
|
nodeMgr *session.NodeManager,
|
||||||
) *LeaderChecker {
|
) *LeaderChecker {
|
||||||
return &LeaderChecker{
|
return &LeaderChecker{
|
||||||
|
|||||||
@ -43,7 +43,7 @@ type SegmentChecker struct {
|
|||||||
*checkerActivation
|
*checkerActivation
|
||||||
meta *meta.Meta
|
meta *meta.Meta
|
||||||
dist *meta.DistributionManager
|
dist *meta.DistributionManager
|
||||||
targetMgr *meta.TargetManager
|
targetMgr meta.TargetManagerInterface
|
||||||
nodeMgr *session.NodeManager
|
nodeMgr *session.NodeManager
|
||||||
getBalancerFunc GetBalancerFunc
|
getBalancerFunc GetBalancerFunc
|
||||||
}
|
}
|
||||||
@ -51,7 +51,7 @@ type SegmentChecker struct {
|
|||||||
func NewSegmentChecker(
|
func NewSegmentChecker(
|
||||||
meta *meta.Meta,
|
meta *meta.Meta,
|
||||||
dist *meta.DistributionManager,
|
dist *meta.DistributionManager,
|
||||||
targetMgr *meta.TargetManager,
|
targetMgr meta.TargetManagerInterface,
|
||||||
nodeMgr *session.NodeManager,
|
nodeMgr *session.NodeManager,
|
||||||
getBalancerFunc GetBalancerFunc,
|
getBalancerFunc GetBalancerFunc,
|
||||||
) *SegmentChecker {
|
) *SegmentChecker {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user