enhance: Remove QueryCoord's scheduling of L0 segments (#39552)

issue: #39551
This PR removes QueryCoord's scheduling of L0 segments (see the sketch below):
- L0 segments are only loaded when a channel is watched
- L0 segments are only released when a channel is released or data distribution is synced
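
To make the new ownership model concrete, here is a minimal, hypothetical Go sketch of the L0 lifecycle on the QueryNode side. The types l0Segment and l0DeleteBuffer are invented stand-ins for illustration only; they mirror, but are not, the real segments.Segment and DeleteBuffer APIs (RegisterL0/UnRegister/Clear) and the ShardDelegator.LoadL0 / SyncTargetVersion / Close call sites changed in this diff.

package main

import "fmt"

// Hypothetical stand-ins for illustration only: l0Segment and l0DeleteBuffer
// mirror, but are not, the real segments.Segment and DeleteBuffer types.
type l0Segment struct {
	id      int64
	startTs uint64 // timestamp of the l0 segment's start position
}

type l0DeleteBuffer struct {
	l0Segments []l0Segment
}

// RegisterL0 is called once, when the channel is watched and LoadL0 runs.
func (b *l0DeleteBuffer) RegisterL0(segs ...l0Segment) {
	b.l0Segments = append(b.l0Segments, segs...)
}

// UnRegister drops l0 segments whose start position is before the delete
// checkpoint carried by the sync-distribution action (DeleteCP).
func (b *l0DeleteBuffer) UnRegister(deleteCP uint64) {
	kept := b.l0Segments[:0]
	for _, s := range b.l0Segments {
		if s.startTs >= deleteCP {
			kept = append(kept, s)
		}
	}
	b.l0Segments = kept
}

// Clear releases all l0 state when the channel itself is released.
func (b *l0DeleteBuffer) Clear() {
	b.l0Segments = nil
}

func main() {
	buf := &l0DeleteBuffer{}

	// watch channel: the delegator loads l0 segments itself, QueryCoord no longer schedules them
	buf.RegisterL0(l0Segment{id: 1, startTs: 10}, l0Segment{id: 2, startTs: 20})

	// sync data distribution: the delete checkpoint trims l0 segments that are no longer needed
	buf.UnRegister(15)
	fmt.Println(buf.l0Segments) // [{2 20}]

	// release channel: all l0 state goes away with the delegator
	buf.Clear()
	fmt.Println(buf.l0Segments) // []
}

The point of the sketch: after this change, QueryCoord no longer issues load or release tasks for L0 segments; the delegator owns them for the lifetime of the watched channel and trims them using the delete checkpoint it receives when data distribution is synced.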

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2025-02-26 21:38:00 +08:00 committed by GitHub
parent 75b74e9413
commit 69b8b89369
32 changed files with 3498 additions and 3692 deletions

View File

@ -133,10 +133,11 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
}
var (
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
deleteCheckPoint *msgpb.MsgPosition
)
// cannot use GetSegmentsByChannel since dropped segments are needed here
@ -171,6 +172,10 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
growingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())
// use the smallest start position among l0 segments as deleteCheckPoint, so QueryCoord only needs to maintain streaming delete records after this ts
if deleteCheckPoint == nil || s.GetStartPosition().GetTimestamp() < deleteCheckPoint.GetTimestamp() {
deleteCheckPoint = s.GetStartPosition()
}
default:
flushedIDs.Insert(s.GetID())
}

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -120,12 +119,6 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
continue
}
// skip update index for l0 segment
segmentInTarget := c.targetMgr.GetSealedSegment(ctx, collection.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst)
if segmentInTarget == nil || segmentInTarget.GetLevel() == datapb.SegmentLevel_L0 {
continue
}
missing := c.checkSegment(segment, indexInfos)
if len(missing) > 0 {
targets[segment.GetID()] = missing

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)
var _ Checker = (*LeaderChecker)(nil)
@ -169,10 +168,7 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
latestNodeDist := utils.FindMaxVersionSegments(dist)
for _, s := range latestNodeDist {
segment := c.target.GetSealedSegment(ctx, leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
// shouldn't set l0 segment location to delegator. l0 segment should be reload in delegator
if !existInTarget || isL0Segment {
if segment == nil {
continue
}
@ -223,8 +219,7 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica *me
_, ok := distMap[sid]
segment := c.target.GetSealedSegment(ctx, leaderView.CollectionID, sid, meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
if ok || existInTarget || isL0Segment {
if ok || existInTarget {
continue
}
log.Debug("leader checker append a segment to remove",

View File

@ -162,29 +162,6 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).GetLeaderID(), node2)
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
// test skip sync l0 segment
segments = []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(ctx, int64(1))
observer.target.UpdateCollectionCurrentTarget(ctx, 1)
// mock l0 segment exist on non delegator node, doesn't set to leader view
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, loadVersion, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
}
func (suite *LeaderCheckerTestSuite) TestActivation() {
@ -423,30 +400,6 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0))
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
// skip sync l0 segments
segments := []*datapb.SegmentInfo{
{
ID: 3,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(ctx, int64(1))
observer.target.UpdateCollectionCurrentTarget(ctx, 1)
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 1}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
}
func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() {

View File

@ -21,7 +21,6 @@ import (
"sort"
"time"
"github.com/blang/semver/v4"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
@ -245,37 +244,8 @@ func (c *SegmentChecker) getSealedSegmentDiff(
distMap[s.GetID()] = s.Node
}
versionRangeFilter := semver.MustParseRange(">2.3.x")
checkLeaderVersion := func(leader *meta.LeaderView, segmentID int64) bool {
// if current shard leader's node version < 2.4, skip load L0 segment
info := c.nodeMgr.Get(leader.ID)
if info != nil && !versionRangeFilter(info.Version()) {
log.Warn("l0 segment is not supported in current node version, skip it",
zap.Int64("collection", replica.GetCollectionID()),
zap.Int64("segmentID", segmentID),
zap.String("channel", leader.Channel),
zap.Int64("leaderID", leader.ID),
zap.String("nodeVersion", info.Version().String()))
return false
}
return true
}
isSegmentLack := func(segment *datapb.SegmentInfo) bool {
node, existInDist := distMap[segment.ID]
if segment.GetLevel() == datapb.SegmentLevel_L0 {
// the L0 segments have to been in the same node as the channel watched
leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
// if the leader node's version doesn't match load l0 segment's requirement, skip it
if leader != nil && checkLeaderVersion(leader, segment.ID) {
l0WithWrongLocation := node != leader.ID
return !existInDist || l0WithWrongLocation
}
return false
}
_, existInDist := distMap[segment.ID]
return !existInDist
}
@ -290,18 +260,6 @@ func (c *SegmentChecker) getSealedSegmentDiff(
}
}
// l0 Segment which exist on current target, but not on dist
for _, segment := range currentTargetMap {
// to avoid generate duplicate segment task
if nextTargetMap[segment.ID] != nil {
continue
}
if isSegmentLack(segment) {
toLoad = append(toLoad, segment)
}
}
// get segment which exist on dist, but not on current target and next target
for _, segment := range dist {
_, existOnCurrent := currentTargetMap[segment.GetID()]
@ -313,16 +271,6 @@ func (c *SegmentChecker) getSealedSegmentDiff(
}
}
level0Segments := lo.Filter(toLoad, func(segment *datapb.SegmentInfo, _ int) bool {
return segment.GetLevel() == datapb.SegmentLevel_L0
})
// L0 segment found,
// QueryCoord loads the L0 segments first,
// to make sure all L0 delta logs will be delivered to the other segments.
if len(level0Segments) > 0 {
toLoad = level0Segments
}
return
}
@ -336,14 +284,6 @@ func (c *SegmentChecker) findRepeatedSealedSegments(ctx context.Context, replica
dist := c.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithReplica(replica))
versions := make(map[int64]*meta.Segment)
for _, s := range dist {
// l0 segment should be release with channel together
segment := c.targetMgr.GetSealedSegment(ctx, s.GetCollectionID(), s.GetID(), meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
if isL0Segment {
continue
}
maxVer, ok := versions[s.GetID()]
if !ok {
versions[s.GetID()] = s
@ -414,7 +354,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
return nil
}
isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string {
return s.GetInsertChannel()
})
@ -432,11 +371,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
rwNodes = replica.GetRWNodes()
}
// L0 segment can only be assign to shard leader's node
if isLevel0 {
rwNodes = []int64{leader.ID}
}
segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment {
return &meta.Segment{
SegmentInfo: s,

View File

@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
@ -170,170 +169,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
suite.Len(tasks, 1)
}
func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
ctx := context.Background()
checker := suite.checker
// set meta
checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1))
checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1))
checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(1, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
Version: common.Version,
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
Version: common.Version,
}))
checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
// set target
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
// set dist
checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
// test load l0 segments in next target
tasks := checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeGrow, action.Type())
suite.EqualValues(1, action.GetSegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1))
// test load l0 segments in current target
tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeGrow, action.Type())
suite.EqualValues(1, action.GetSegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
// seg l0 segment exist on a non delegator node
checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
// test load l0 segments to delegator
tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeGrow, action.Type())
suite.EqualValues(1, action.GetSegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}
func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
ctx := context.Background()
checker := suite.checker
// set meta
checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1))
checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1))
checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(1, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
// set target
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1))
// set dist
checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
// seg l0 segment exist on a non delegator node
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 1, 2, 100, "test-insert-channel"))
// release duplicate l0 segment
tasks := checker.Check(context.TODO())
suite.Len(tasks, 0)
checker.dist.SegmentDistManager.Update(1)
// test release l0 segment which doesn't exist in target
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, nil, nil)
checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1))
checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(1, action.GetSegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}
func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
ctx := context.Background()
checker := suite.checker

View File

@ -151,6 +151,11 @@ func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetData
func (dh *distHandler) updateSegmentsDistribution(ctx context.Context, resp *querypb.GetDataDistributionResponse) {
updates := make([]*meta.Segment, 0, len(resp.GetSegments()))
for _, s := range resp.GetSegments() {
// To maintain compatibility with older versions of QueryNode,
// QueryCoord should neither process nor interact with L0 segments.
if s.GetLevel() == datapb.SegmentLevel_L0 {
continue
}
segmentInfo := dh.target.GetSealedSegment(ctx, s.GetCollection(), s.GetID(), meta.CurrentTargetFirst)
if segmentInfo == nil {
segmentInfo = &datapb.SegmentInfo{
@ -211,8 +216,13 @@ func (dh *distHandler) updateLeaderView(ctx context.Context, resp *querypb.GetDa
collectionsToSync := typeutil.NewUniqueSet()
for _, lview := range resp.GetLeaderViews() {
segments := make(map[int64]*meta.Segment)
for ID, position := range lview.GrowingSegments {
// To maintain compatibility with older versions of QueryNode,
// QueryCoord should neither process nor interact with L0 segments.
segmentInfo := dh.target.GetSealedSegment(ctx, lview.GetCollection(), ID, meta.CurrentTargetFirst)
if segmentInfo != nil && segmentInfo.GetLevel() == datapb.SegmentLevel_L0 {
continue
}
segments[ID] = &meta.Segment{
SegmentInfo: &datapb.SegmentInfo{
ID: ID,

View File

@ -25,7 +25,6 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/v2/common"
@ -159,22 +158,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
return partition.PartitionID
})
channelInfos := make(map[string][]*datapb.VchannelInfo)
segments := make(map[int64]*datapb.SegmentInfo, 0)
dmChannels := make(map[string]*DmChannel)
for _, info := range vChannelInfos {
channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info)
for _, segmentID := range info.GetLevelZeroSegmentIds() {
segments[segmentID] = &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
InsertChannel: info.GetChannelName(),
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
}
}
}
partitionSet := typeutil.NewUniqueSet(partitionIDs...)
for _, segmentInfo := range segmentInfos {
if partitionSet.Contain(segmentInfo.GetPartitionID()) || segmentInfo.GetPartitionID() == common.AllPartitionsID {
@ -182,9 +166,9 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
}
}
for _, infos := range channelInfos {
merged := mergeDmChannelInfo(infos)
dmChannels[merged.GetChannelName()] = merged
dmChannels := make(map[string]*DmChannel)
for _, channelInfo := range vChannelInfos {
dmChannels[channelInfo.ChannelName] = DmChannelFromVChannel(channelInfo)
}
if len(segments) == 0 && len(dmChannels) == 0 {
@ -574,12 +558,12 @@ func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCo
// if segment isn't l0 segment, and exist in current/next target, then it can be moved
func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool {
current := mgr.current.getCollectionTarget(collectionID)
if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
if current != nil && current.segments[segmentID] != nil {
return true
}
next := mgr.next.getCollectionTarget(collectionID)
if next != nil && next.segments[segmentID] != nil && next.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
if next != nil && next.segments[segmentID] != nil {
return true
}

View File

@ -257,7 +257,7 @@ func (suite *TargetManagerSuite) TestRemovePartition() {
suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(ctx, collectionID, CurrentTarget))
suite.mgr.RemovePartition(ctx, collectionID, 100)
suite.assertSegments(append([]int64{3, 4}, suite.level0Segments...), suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, NextTarget))
suite.assertSegments([]int64{3, 4}, suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, NextTarget))
suite.assertChannels(suite.channels[collectionID], suite.mgr.GetDmChannelsByCollection(ctx, collectionID, NextTarget))
suite.assertSegments([]int64{}, suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, CurrentTarget))
suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(ctx, collectionID, CurrentTarget))
@ -303,7 +303,7 @@ func (suite *TargetManagerSuite) getAllSegment(collectionID int64, partitionIDs
}
}
return append(allSegments, suite.level0Segments...)
return allSegments
}
func (suite *TargetManagerSuite) assertChannels(expected []string, actual map[string]*DmChannel) bool {

View File

@ -524,6 +524,8 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead
if channel != nil {
action.Checkpoint = channel.GetSeekPosition()
// used to clean the delete buffer in the delegator, since delete records before this ts have already been dispatched to sealed segments
action.DeleteCP = channel.GetDeleteCheckpoint()
}
return action

View File

@ -1676,108 +1676,6 @@ func (suite *TaskSuite) TestBalanceChannelTask() {
suite.Equal(2, task.step)
}
func (suite *TaskSuite) TestBalanceChannelWithL0SegmentTask() {
ctx := context.Background()
collectionID := int64(1)
partitionID := int64(1)
channel := "channel-1"
vchannel := &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: channel,
}
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
Level: datapb.SegmentLevel_L0,
},
{
ID: 2,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
Level: datapb.SegmentLevel_L0,
},
{
ID: 3,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
Level: datapb.SegmentLevel_L0,
},
}
suite.meta.PutCollection(ctx, utils.CreateTestCollection(collectionID, 1), utils.CreateTestPartition(collectionID, 1))
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return([]*datapb.VchannelInfo{vchannel}, segments, nil)
suite.target.UpdateCollectionNextTarget(ctx, collectionID)
suite.target.UpdateCollectionCurrentTarget(ctx, collectionID)
suite.target.UpdateCollectionNextTarget(ctx, collectionID)
suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{
ID: 2,
CollectionID: collectionID,
Channel: channel,
Segments: map[int64]*querypb.SegmentDist{
1: {NodeID: 2},
2: {NodeID: 2},
3: {NodeID: 2},
},
})
suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
ID: 1,
CollectionID: collectionID,
Channel: channel,
Segments: map[int64]*querypb.SegmentDist{
1: {NodeID: 2},
2: {NodeID: 2},
3: {NodeID: 2},
},
UnServiceableError: merr.ErrSegmentLack,
})
task, err := NewChannelTask(context.Background(),
10*time.Second,
WrapIDSource(2),
collectionID,
meta.NewReplica(
&querypb.Replica{
ID: 1,
},
typeutil.NewUniqueSet(),
),
NewChannelAction(1, ActionTypeGrow, channel),
NewChannelAction(2, ActionTypeReduce, channel),
)
suite.NoError(err)
// l0 hasn't been loaded into delegator, block balance
suite.scheduler.preProcess(task)
suite.Equal(0, task.step)
suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
ID: 1,
CollectionID: collectionID,
Channel: channel,
Segments: map[int64]*querypb.SegmentDist{
1: {NodeID: 1},
2: {NodeID: 1},
3: {NodeID: 1},
},
})
// new delegator distribution updated, task step up
suite.scheduler.preProcess(task)
suite.Equal(1, task.step)
suite.dist.LeaderViewManager.Update(2)
// old delegator removed
suite.scheduler.preProcess(task)
suite.Equal(2, task.step)
}
func (suite *TaskSuite) TestGetTasksJSON() {
ctx := context.Background()
scheduler := suite.newScheduler()

View File

@ -27,7 +27,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -70,18 +69,12 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target
}
segmentDist := targetMgr.GetSealedSegmentsByChannel(context.TODO(), leader.CollectionID, leader.Channel, scope)
// Check whether segments are fully loaded
for segmentID, info := range segmentDist {
for segmentID := range segmentDist {
_, exist := leader.Segments[segmentID]
if !exist {
log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
return merr.WrapErrSegmentLack(segmentID)
}
l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID
if l0WithWrongLocation {
log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID))
return merr.WrapErrSegmentLack(segmentID)
}
}
return nil
}

View File

@ -81,9 +81,10 @@ type ShardDelegator interface {
ProcessInsert(insertRecords map[int64]*InsertData)
ProcessDelete(deleteData []*DeleteData, ts uint64)
LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error
LoadL0(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error
LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error
SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition, deleteSeekPos *msgpb.MsgPosition)
GetTargetVersion() int64
GetDeleteBufferSize() (entryNum int64, memorySize int64)
@ -863,6 +864,11 @@ func (sd *shardDelegator) Close() {
sd.tsCond.Broadcast()
sd.lifetime.Wait()
// clean up l0 segment in delete buffer
start := time.Now()
sd.deleteBuffer.Clear()
log.Info("unregister all l0 segments", zap.Duration("cost", time.Since(start)))
metrics.QueryNodeDeleteBufferSize.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName)
metrics.QueryNodeDeleteBufferRowNum.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName)
}

View File

@ -410,6 +410,10 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
zap.Int64s("segments", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })),
)
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
return merr.WrapErrServiceInternal("load L0 segment is not supported, l0 segment should only be loaded by watchChannel")
}
worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
if err != nil {
log.Warn("delegator failed to find worker", zap.Error(err))
@ -420,17 +424,6 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
log.Debug("worker loads segments...")
sLoad := func(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
info := req.GetInfos()[0]
// put meta l0, instead of load actual delta data
if info.GetLevel() == datapb.SegmentLevel_L0 && sd.l0ForwardPolicy == L0ForwardPolicyRemoteLoad {
l0Seg, err := segments.NewL0Segment(sd.collection, segments.SegmentTypeSealed, req.GetVersion(), info)
if err != nil {
return err
}
sd.collection.Ref(1)
sd.segmentManager.Put(ctx, segments.SegmentTypeSealed, l0Seg)
return nil
}
segmentID := req.GetInfos()[0].GetSegmentID()
nodeID := req.GetDstNodeID()
_, err, _ := sd.sf.Do(fmt.Sprintf("%d-%d", nodeID, segmentID), func() (struct{}, error) {
@ -481,51 +474,76 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
Level: info.GetLevel(),
}
})
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
sd.RefreshLevel0DeletionStats()
} else {
// load bloom filter only when candidate not exists
infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool {
return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID)
})
// load bloom filter only when candidate not exists
infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool {
return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID)
})
var bm25Stats *typeutil.ConcurrentMap[int64, map[int64]*storage.BM25Stats]
if sd.idfOracle != nil {
bm25Stats, err = sd.loader.LoadBM25Stats(ctx, req.GetCollectionID(), infos...)
if err != nil {
log.Warn("failed to load bm25 stats for segment", zap.Error(err))
return err
}
}
candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...)
var bm25Stats *typeutil.ConcurrentMap[int64, map[int64]*storage.BM25Stats]
if sd.idfOracle != nil {
bm25Stats, err = sd.loader.LoadBM25Stats(ctx, req.GetCollectionID(), infos...)
if err != nil {
log.Warn("failed to load bloom filter set for segment", zap.Error(err))
log.Warn("failed to load bm25 stats for segment", zap.Error(err))
return err
}
}
log.Debug("load delete...")
err = sd.loadStreamDelete(ctx, candidates, bm25Stats, infos, req, targetNodeID, worker)
if err != nil {
log.Warn("load stream delete failed", zap.Error(err))
return err
}
candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...)
if err != nil {
log.Warn("failed to load bloom filter set for segment", zap.Error(err))
return err
}
log.Debug("load delete...")
err = sd.loadStreamDelete(ctx, candidates, bm25Stats, infos, req, targetNodeID, worker)
if err != nil {
log.Warn("load stream delete failed", zap.Error(err))
return err
}
// alter distribution
sd.distribution.AddDistributions(entries...)
partStatsToReload := make([]UniqueID, 0)
lo.ForEach(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) {
partStatsToReload = append(partStatsToReload, info.PartitionID)
})
return nil
}
// LoadL0 loads l0 segments and registers them into the delete buffer.
func (sd *shardDelegator) LoadL0(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
log := sd.getLogger(ctx)
segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
log.Info("loading l0 segments...", zap.Int64s("segmentIDs", segmentIDs))
loaded := make([]segments.Segment, 0)
if sd.l0ForwardPolicy == L0ForwardPolicyRemoteLoad {
for _, info := range infos {
l0Seg, err := segments.NewL0Segment(sd.collection, segments.SegmentTypeSealed, version, info)
if err != nil {
return err
}
loaded = append(loaded, l0Seg)
}
} else {
var err error
loaded, err = sd.loader.Load(ctx, sd.collectionID, segments.SegmentTypeSealed, version, infos...)
if err != nil {
log.Warn("failed to load l0 segment", zap.Error(err))
return err
}
}
segmentIDs = lo.Map(loaded, func(segment segments.Segment, _ int) int64 { return segment.ID() })
log.Info("load growing segments done", zap.Int64s("segmentIDs", segmentIDs))
sd.deleteBuffer.RegisterL0(loaded...)
// register l0 segment
sd.RefreshLevel0DeletionStats()
return nil
}
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) (storage.PrimaryKeys, []storage.Timestamp) {
// TODO: this could be large, host all L0 delete on delegator might be a dangerous, consider mmap it on local segment and stream processing it
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
level0Segments := sd.deleteBuffer.ListL0()
deltaData := storage.NewDeltaData(0)
for _, segment := range level0Segments {
@ -554,7 +572,7 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac
}
func (sd *shardDelegator) RefreshLevel0DeletionStats() {
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
level0Segments := sd.deleteBuffer.ListL0()
totalSize := int64(0)
for _, segment := range level0Segments {
segment := segment.(*segments.L0Segment)
@ -562,6 +580,13 @@ func (sd *shardDelegator) RefreshLevel0DeletionStats() {
totalSize += lo.SumBy(pks, func(pk storage.PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
}
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(sd.Collection()),
commonpb.SegmentState_Sealed.String(),
datapb.SegmentLevel_L0.String(),
).Set(float64(len(level0Segments)))
metrics.QueryNodeLevelZeroSize.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(sd.collectionID),
@ -582,8 +607,6 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
idCandidates := lo.SliceToMap(candidates, func(candidate *pkoracle.BloomFilterSet) (int64, *pkoracle.BloomFilterSet) {
return candidate.ID(), candidate
})
deltaPositions := req.GetDeltaPositions()
for _, info := range infos {
candidate := idCandidates[info.GetSegmentID()]
// forward l0 deletion
@ -602,16 +625,6 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
zap.Int64("segmentID", info.GetSegmentID()),
)
candidate := idCandidates[info.GetSegmentID()]
position := info.GetDeltaPosition()
if position == nil { // for compatibility of rolling upgrade from 2.2.x to 2.3
// During rolling upgrade, Querynode(2.3) may receive merged LoadSegmentRequest
// from QueryCoord(2.2); In version 2.2.x, only segments with the same dmlChannel
// can be merged, and deltaPositions will be merged into a single deltaPosition,
// so we should use `deltaPositions[0]` as the seek position for all the segments
// within the same LoadSegmentRequest.
position = deltaPositions[0]
}
// after L0 segment feature
// growing segemnts should have load stream delete as well
deleteScope := querypb.DataScope_All
@ -625,20 +638,19 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
deleteData := &storage.DeleteData{}
// start position is dml position for segment
// if this position is before deleteBuffer's safe ts, it means some delete shall be read from msgstream
if position.GetTimestamp() < sd.deleteBuffer.SafeTs() {
log.Info("load delete from stream...")
streamDeleteData, err := sd.readDeleteFromMsgstream(ctx, position, sd.deleteBuffer.SafeTs(), candidate)
if err != nil {
log.Warn("failed to read delete data from msgstream", zap.Error(err))
return err
}
deleteData.Merge(streamDeleteData)
log.Info("load delete from stream done")
}
// if info.GetStartPosition().GetTimestamp() < sd.deleteBuffer.SafeTs() {
// log.Info("load delete from stream...")
// streamDeleteData, err := sd.readDeleteFromMsgstream(ctx, info.GetStartPosition(), sd.deleteBuffer.SafeTs(), candidate)
// if err != nil {
// log.Warn("failed to read delete data from msgstream", zap.Error(err))
// return err
// }
// deleteData.Merge(streamDeleteData)
// log.Info("load delete from stream done")
// }
// list buffered delete
deleteRecords := sd.deleteBuffer.ListAfter(position.GetTimestamp())
deleteRecords := sd.deleteBuffer.ListAfter(info.GetStartPosition().GetTimestamp())
for _, entry := range deleteRecords {
for _, record := range entry.Data {
if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID {
@ -844,14 +856,14 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
log := sd.getLogger(ctx)
targetNodeID := req.GetNodeID()
level0Segments := typeutil.NewSet(lo.Map(sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName)), func(segment segments.Segment, _ int) int64 {
level0Segments := typeutil.NewSet(lo.Map(sd.deleteBuffer.ListL0(), func(segment segments.Segment, _ int) int64 {
return segment.ID()
})...)
hasLevel0 := false
for _, segmentID := range req.GetSegmentIDs() {
hasLevel0 = level0Segments.Contain(segmentID)
if hasLevel0 {
break
return merr.WrapErrServiceInternal("release L0 segment is not supported, l0 segment should only be released by unSubChannel/SyncDataDistribution")
}
}
@ -938,22 +950,17 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
if releaseErr != nil {
return releaseErr
}
if hasLevel0 {
sd.RefreshLevel0DeletionStats()
}
partitionsToReload := make([]UniqueID, 0)
lo.ForEach(req.GetSegmentIDs(), func(segmentID int64, _ int) {
segment := sd.segmentManager.Get(segmentID)
if segment != nil {
partitionsToReload = append(partitionsToReload, segment.Partition())
}
})
return nil
}
func (sd *shardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64,
sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition,
func (sd *shardDelegator) SyncTargetVersion(
newVersion int64,
partitions []int64,
growingInTarget []int64,
sealedInTarget []int64,
droppedInTarget []int64,
checkpoint *msgpb.MsgPosition,
deleteSeekPos *msgpb.MsgPosition,
) {
growings := sd.segmentManager.GetBy(
segments.WithType(segments.SegmentTypeGrowing),
@ -985,7 +992,10 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, partitions []int64
zap.Int64s("growingSegments", redundantGrowingIDs))
}
sd.distribution.SyncTargetVersion(newVersion, partitions, growingInTarget, sealedInTarget, redundantGrowingIDs)
sd.deleteBuffer.TryDiscard(checkpoint.GetTimestamp())
start := time.Now()
sd.deleteBuffer.UnRegister(deleteSeekPos.GetTimestamp())
log.Info("clean delete buffer cost", zap.Duration("cost", time.Since(start)))
sd.RefreshLevel0DeletionStats()
}
func (sd *shardDelegator) GetTargetVersion() int64 {

View File

@ -773,44 +773,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
},
},
})
s.NoError(err)
// err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
// Base: commonpbutil.NewMsgBase(),
// DstNodeID: 1,
// CollectionID: s.collectionID,
// Infos: []*querypb.SegmentLoadInfo{
// {
// SegmentID: 200,
// PartitionID: 500,
// StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
// DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
// Level: datapb.SegmentLevel_L1,
// InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
// },
// },
// })
s.NoError(err)
sealed, _ := s.delegator.GetSegmentInfo(false)
s.Require().Equal(1, len(sealed))
s.Equal(int64(1), sealed[0].NodeID)
s.ElementsMatch([]SegmentEntry{
{
SegmentID: 100,
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L1,
},
{
SegmentID: 200,
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L0,
},
}, sealed[0].Segments)
s.ErrorIs(err, merr.ErrServiceInternal)
})
s.Run("load_segments_with_l0_delete_failed", func() {
@ -866,84 +829,6 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
s.ErrorIs(err, mockErr)
})
s.Run("load_segments_with_streaming_delete_failed", func() {
defer func() {
s.workerManager.ExpectedCalls = nil
s.loader.ExpectedCalls = nil
}()
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloomfilter.NewBloomFilterWithType(
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
pks := &storage.PkStatistics{
PkFilter: bf,
}
pks.UpdatePKRange(&storage.Int64FieldData{
Data: []int64{10, 20, 30},
})
bfs.AddHistoricalStats(pks)
return bfs
})
}, func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) error {
return nil
})
workers := make(map[int64]*cluster.MockWorker)
worker1 := &cluster.MockWorker{}
workers[1] = worker1
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
Return(nil)
worker1.EXPECT().Delete(mock.Anything, mock.AnythingOfType("*querypb.DeleteRequest")).Return(nil)
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Call.Return(func(_ context.Context, nodeID int64) cluster.Worker {
return workers[nodeID]
}, nil)
s.delegator.ProcessDelete([]*DeleteData{
{
PartitionID: 500,
PrimaryKeys: []storage.PrimaryKey{
storage.NewInt64PrimaryKey(1),
storage.NewInt64PrimaryKey(10),
},
Timestamps: []uint64{10, 10},
RowCount: 2,
},
}, 10)
s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Seek(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().GetUnmarshalDispatcher().Return(nil)
s.mq.EXPECT().Close()
ch := make(chan *msgstream.ConsumeMsgPack, 10)
close(ch)
s.mq.EXPECT().Chan().Return(ch)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(),
DstNodeID: 1,
CollectionID: s.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 300,
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 2},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 2},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
s.Error(err)
})
s.Run("get_worker_fail", func() {
defer func() {
s.workerManager.ExpectedCalls = nil
@ -1503,7 +1388,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
s.manager.Segment.Put(context.Background(), segments.SegmentTypeGrowing, ms)
}
s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{})
s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{}, &msgpb.MsgPosition{})
s.Equal(int64(5), s.delegator.GetTargetVersion())
}
@ -1533,9 +1418,12 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
InsertChannel: delegator.vchannelName,
Level: datapb.SegmentLevel_L0,
NumOfRows: 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: 10,
},
})
l0.LoadDeltaData(context.TODO(), partitionDeleteData)
delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0)
delegator.deleteBuffer.RegisterL0(l0)
l0Global, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 2, &querypb.SegmentLoadInfo{
CollectionID: 1,
@ -1544,6 +1432,9 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
InsertChannel: delegator.vchannelName,
Level: datapb.SegmentLevel_L0,
NumOfRows: int64(1),
StartPosition: &msgpb.MsgPosition{
Timestamp: 20,
},
})
l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)
@ -1553,7 +1444,7 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)
delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
delegator.deleteBuffer.RegisterL0(l0Global)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
rawPks := make([]storage.PrimaryKey, 0, pks.Len())
for i := 0; i < pks.Len(); i++ {
@ -1569,14 +1460,14 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
s.Equal(pks.Len(), 1)
s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
delegator.deleteBuffer.UnRegister(uint64(11))
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
delegator.deleteBuffer.UnRegister(uint64(21))
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)
}

View File

@ -325,7 +325,7 @@ func (s *DelegatorSuite) initSegments() {
Version: 2001,
},
)
s.delegator.SyncTargetVersion(2001, []int64{500, 501}, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{})
s.delegator.SyncTargetVersion(2001, []int64{500, 501}, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{}, &msgpb.MsgPosition{})
}
func (s *DelegatorSuite) TestSearch() {

View File

@ -17,10 +17,13 @@
package deletebuffer
import (
"context"
"sort"
"sync"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
)
var errBufferFull = errors.New("buffer full")
@ -39,13 +42,24 @@ type DeleteBuffer[T timed] interface {
TryDiscard(uint64)
// Size returns current size information of delete buffer: entryNum and memory
Size() (entryNum, memorySize int64)
// RegisterL0 registers l0 segments into the delete buffer
RegisterL0(segments ...segments.Segment)
// ListL0 lists all registered l0 segments
ListL0() []segments.Segment
// UnRegister cleans delete data before the given ts, including registered l0 segments and buffered deletes
UnRegister(ts uint64)
// Clear cleans up the whole delete buffer and releases all l0 segments
Clear()
}
func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
return &doubleCacheBuffer[T]{
head: newCacheBlock[T](startTs, maxSize),
maxSize: maxSize,
ts: startTs,
head: newCacheBlock[T](startTs, maxSize),
maxSize: maxSize,
ts: startTs,
l0Segments: make([]segments.Segment, 0),
}
}
@ -55,6 +69,55 @@ type doubleCacheBuffer[T timed] struct {
head, tail *cacheBlock[T]
maxSize int64
ts uint64
// maintain l0 segment list
l0Segments []segments.Segment
}
func (c *doubleCacheBuffer[T]) RegisterL0(segmentList ...segments.Segment) {
c.mut.Lock()
defer c.mut.Unlock()
// Filter out nil segments
for _, seg := range segmentList {
if seg != nil {
c.l0Segments = append(c.l0Segments, seg)
}
}
}
func (c *doubleCacheBuffer[T]) ListL0() []segments.Segment {
c.mut.RLock()
defer c.mut.RUnlock()
return c.l0Segments
}
func (c *doubleCacheBuffer[T]) UnRegister(ts uint64) {
c.mut.Lock()
defer c.mut.Unlock()
var newSegments []segments.Segment
for _, s := range c.l0Segments {
if s.StartPosition().GetTimestamp() < ts {
s.Release(context.TODO())
continue
}
newSegments = append(newSegments, s)
}
c.l0Segments = newSegments
}
func (c *doubleCacheBuffer[T]) Clear() {
c.mut.Lock()
defer c.mut.Unlock()
for _, s := range c.l0Segments {
s.Release(context.TODO())
}
c.l0Segments = nil
// reset cache block
c.tail = c.head
c.head = newCacheBlock[T](c.ts, c.maxSize)
}
func (c *doubleCacheBuffer[T]) SafeTs() uint64 {

View File

@ -20,8 +20,11 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/storage"
)
@ -136,6 +139,67 @@ func (s *DoubleCacheBufferSuite) TestPut() {
s.EqualValues(234, memorySize)
}
func (s *DoubleCacheBufferSuite) TestL0SegmentOperations() {
buffer := NewDoubleCacheDeleteBuffer[*Item](10, 1000)
// Create mock segments with specific IDs
seg1 := segments.NewMockSegment(s.T())
seg1.On("ID").Return(int64(1))
seg1.On("Release", mock.Anything).Return()
seg1.On("StartPosition").Return(&msgpb.MsgPosition{
Timestamp: 10,
})
seg2 := segments.NewMockSegment(s.T())
seg2.On("ID").Return(int64(2))
seg2.On("Release", mock.Anything).Return()
seg2.On("StartPosition").Return(&msgpb.MsgPosition{
Timestamp: 20,
})
seg3 := segments.NewMockSegment(s.T())
seg3.On("Release", mock.Anything).Return()
// Test RegisterL0 with multiple segments
buffer.RegisterL0(seg1, seg2)
segments := buffer.ListL0()
s.Equal(2, len(segments))
// Verify segment IDs by collecting them first
ids := make([]int64, 0, len(segments))
for _, seg := range segments {
ids = append(ids, seg.ID())
}
s.ElementsMatch([]int64{1, 2}, ids, "expected segment IDs 1 and 2 in any order")
// Test ListL0 with empty buffer
emptyBuffer := NewDoubleCacheDeleteBuffer[*Item](10, 1000)
s.Equal(0, len(emptyBuffer.ListL0()))
// Test UnRegister
buffer.UnRegister(15)
segments = buffer.ListL0()
s.Equal(1, len(segments))
s.Equal(int64(2), segments[0].ID())
// Verify Release was called on unregistered segment
seg1.AssertCalled(s.T(), "Release", mock.Anything)
// Test Clear
buffer.RegisterL0(seg3)
s.Equal(2, len(buffer.ListL0()))
buffer.Clear()
s.Equal(0, len(buffer.ListL0()))
// Verify Release was called on all segments
seg2.AssertCalled(s.T(), "Release", mock.Anything)
seg3.AssertCalled(s.T(), "Release", mock.Anything)
// Test RegisterL0 with nil segment (should not panic)
buffer.RegisterL0(nil)
s.Equal(0, len(buffer.ListL0()))
}
func TestDoubleCacheDeleteBuffer(t *testing.T) {
suite.Run(t, new(DoubleCacheBufferSuite))
}

View File

@ -17,10 +17,12 @@
package deletebuffer
import (
"context"
"sync"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/v2/metrics"
)
@ -30,6 +32,7 @@ func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64, labels []s
sizePerBlock: sizePerBlock,
list: []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)},
labels: labels,
l0Segments: make([]segments.Segment, 0),
}
}
@ -50,6 +53,60 @@ type listDeleteBuffer[T timed] struct {
// metrics labels
labels []string
// maintain l0 segment list
l0Segments []segments.Segment
}
func (b *listDeleteBuffer[T]) RegisterL0(segmentList ...segments.Segment) {
b.mut.Lock()
defer b.mut.Unlock()
// Filter out nil segments
for _, seg := range segmentList {
if seg != nil {
b.l0Segments = append(b.l0Segments, seg)
}
}
b.updateMetrics()
}
func (b *listDeleteBuffer[T]) ListL0() []segments.Segment {
b.mut.RLock()
defer b.mut.RUnlock()
return b.l0Segments
}
func (b *listDeleteBuffer[T]) UnRegister(ts uint64) {
b.mut.Lock()
defer b.mut.Unlock()
var newSegments []segments.Segment
for _, s := range b.l0Segments {
if s.StartPosition().GetTimestamp() > ts {
newSegments = append(newSegments, s)
} else {
s.Release(context.TODO())
}
}
b.l0Segments = newSegments
b.tryCleanDelete(ts)
b.updateMetrics()
}
func (b *listDeleteBuffer[T]) Clear() {
b.mut.Lock()
defer b.mut.Unlock()
// clean l0 segments
for _, s := range b.l0Segments {
s.Release(context.TODO())
}
b.l0Segments = nil
// reset cache block
b.list = []*cacheBlock[T]{newCacheBlock[T](b.safeTs, b.sizePerBlock)}
b.updateMetrics()
}
func (b *listDeleteBuffer[T]) updateMetrics() {
@ -93,6 +150,10 @@ func (b *listDeleteBuffer[T]) SafeTs() uint64 {
func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) {
b.mut.Lock()
defer b.mut.Unlock()
b.tryCleanDelete(ts)
}
func (b *listDeleteBuffer[T]) tryCleanDelete(ts uint64) {
if len(b.list) == 1 {
return
}

View File

@ -19,8 +19,11 @@ package deletebuffer
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/storage"
)
@ -126,6 +129,67 @@ func (s *ListDeleteBufferSuite) TestTryDiscard() {
s.EqualValues(120, memorySize)
}
func (s *ListDeleteBufferSuite) TestL0SegmentOperations() {
buffer := NewListDeleteBuffer[*Item](10, 1000, []string{"1", "dml-1"})
// Create mock segments with specific IDs
seg1 := segments.NewMockSegment(s.T())
seg1.On("ID").Return(int64(1))
seg1.On("Release", mock.Anything).Return()
seg1.On("StartPosition").Return(&msgpb.MsgPosition{
Timestamp: 10,
})
seg2 := segments.NewMockSegment(s.T())
seg2.On("ID").Return(int64(2))
seg2.On("Release", mock.Anything).Return()
seg2.On("StartPosition").Return(&msgpb.MsgPosition{
Timestamp: 20,
})
seg3 := segments.NewMockSegment(s.T())
seg3.On("Release", mock.Anything).Return()
// Test RegisterL0 with multiple segments
buffer.RegisterL0(seg1, seg2)
segments := buffer.ListL0()
s.Equal(2, len(segments))
// Verify segment IDs by collecting them first
ids := make([]int64, 0, len(segments))
for _, seg := range segments {
ids = append(ids, seg.ID())
}
s.ElementsMatch([]int64{1, 2}, ids, "expected segment IDs 1 and 2 in any order")
// Test ListL0 with empty buffer
emptyBuffer := NewListDeleteBuffer[*Item](10, 1000, []string{})
s.Equal(0, len(emptyBuffer.ListL0()))
// Test UnRegister
buffer.UnRegister(15)
segments = buffer.ListL0()
s.Equal(1, len(segments))
s.Equal(int64(2), segments[0].ID())
// Verify Release was called on unregistered segment
seg1.AssertCalled(s.T(), "Release", mock.Anything)
// Test Clear
buffer.RegisterL0(seg3)
s.Equal(2, len(buffer.ListL0()))
buffer.Clear()
s.Equal(0, len(buffer.ListL0()))
// Verify Release was called on all segments
seg2.AssertCalled(s.T(), "Release", mock.Anything)
seg3.AssertCalled(s.T(), "Release", mock.Anything)
// Test RegisterL0 with nil segment (should not panic)
buffer.RegisterL0(nil)
s.Equal(0, len(buffer.ListL0()))
}
func TestListDeleteBuffer(t *testing.T) {
suite.Run(t, new(ListDeleteBufferSuite))
}

View File

@ -183,9 +183,7 @@ func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context,
}
func (sd *shardDelegator) getLevel0Deltalogs(partitionID int64) []*datapb.FieldBinlog {
level0Segments := sd.segmentManager.GetBy(
segments.WithLevel(datapb.SegmentLevel_L0),
segments.WithChannel(sd.vchannelName))
level0Segments := sd.deleteBuffer.ListL0()
var deltalogs []*datapb.FieldBinlog

View File

@ -416,7 +416,7 @@ func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() {
}
err = l0Segment.LoadDeltaData(context.Background(), deltaData)
s.Require().NoError(err)
s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment)
s.delegator.deleteBuffer.RegisterL0(l0Segment)
seg.EXPECT().ID().Return(10000)
seg.EXPECT().Partition().Return(100)
@ -463,7 +463,7 @@ func (s *GrowingMergeL0Suite) TestAddL0ForGrowingLoad() {
}
err = l0Segment.LoadDeltaData(context.Background(), deltaData)
s.Require().NoError(err)
s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment)
s.delegator.deleteBuffer.RegisterL0(l0Segment)
seg.EXPECT().ID().Return(10000)
seg.EXPECT().Partition().Return(100)

View File

@ -498,6 +498,54 @@ func (_c *MockShardDelegator_LoadGrowing_Call) RunAndReturn(run func(context.Con
return _c
}
// LoadL0 provides a mock function with given fields: ctx, infos, version
func (_m *MockShardDelegator) LoadL0(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
ret := _m.Called(ctx, infos, version)
if len(ret) == 0 {
panic("no return value specified for LoadL0")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []*querypb.SegmentLoadInfo, int64) error); ok {
r0 = rf(ctx, infos, version)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockShardDelegator_LoadL0_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadL0'
type MockShardDelegator_LoadL0_Call struct {
*mock.Call
}
// LoadL0 is a helper method to define mock.On call
// - ctx context.Context
// - infos []*querypb.SegmentLoadInfo
// - version int64
func (_e *MockShardDelegator_Expecter) LoadL0(ctx interface{}, infos interface{}, version interface{}) *MockShardDelegator_LoadL0_Call {
return &MockShardDelegator_LoadL0_Call{Call: _e.mock.On("LoadL0", ctx, infos, version)}
}
func (_c *MockShardDelegator_LoadL0_Call) Run(run func(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64)) *MockShardDelegator_LoadL0_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]*querypb.SegmentLoadInfo), args[2].(int64))
})
return _c
}
func (_c *MockShardDelegator_LoadL0_Call) Return(_a0 error) *MockShardDelegator_LoadL0_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockShardDelegator_LoadL0_Call) RunAndReturn(run func(context.Context, []*querypb.SegmentLoadInfo, int64) error) *MockShardDelegator_LoadL0_Call {
_c.Call.Return(run)
return _c
}
// LoadSegments provides a mock function with given fields: ctx, req
func (_m *MockShardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
ret := _m.Called(ctx, req)
@ -985,9 +1033,9 @@ func (_c *MockShardDelegator_SyncPartitionStats_Call) RunAndReturn(run func(cont
return _c
}
// SyncTargetVersion provides a mock function with given fields: newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint
func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) {
_m.Called(newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)
// SyncTargetVersion provides a mock function with given fields: newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint, deleteSeekPos
func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition, deleteSeekPos *msgpb.MsgPosition) {
_m.Called(newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint, deleteSeekPos)
}
// MockShardDelegator_SyncTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncTargetVersion'
@ -1002,13 +1050,14 @@ type MockShardDelegator_SyncTargetVersion_Call struct {
// - sealedInTarget []int64
// - droppedInTarget []int64
// - checkpoint *msgpb.MsgPosition
func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, partitions interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}) *MockShardDelegator_SyncTargetVersion_Call {
return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)}
// - deleteSeekPos *msgpb.MsgPosition
func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, partitions interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}, deleteSeekPos interface{}) *MockShardDelegator_SyncTargetVersion_Call {
return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint, deleteSeekPos)}
}
func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition, deleteSeekPos *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].([]int64), args[5].(*msgpb.MsgPosition))
run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].([]int64), args[5].(*msgpb.MsgPosition), args[6].(*msgpb.MsgPosition))
})
return _c
}
@ -1018,7 +1067,7 @@ func (_c *MockShardDelegator_SyncTargetVersion_Call) Return() *MockShardDelegato
return _c
}
func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, []int64, *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, []int64, *msgpb.MsgPosition, *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
_c.Call.Return(run)
return _c
}

View File

@ -68,18 +68,7 @@ func loadL0Segments(ctx context.Context, delegator delegator.ShardDelegator, req
}
}
return delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: req.GetBase(),
DstNodeID: req.GetNodeID(),
Infos: l0Segments,
Schema: req.GetSchema(),
CollectionID: req.GetCollectionID(),
LoadMeta: req.GetLoadMeta(),
ReplicaID: req.GetReplicaID(),
Version: req.GetVersion(),
NeedTransfer: false,
IndexInfoList: req.GetIndexInfoList(),
})
return delegator.LoadL0(ctx, l0Segments, req.GetVersion())
}
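loadL0Segments now hands the collected L0 load infos straight to the delegator instead of assembling a full LoadSegmentsRequest. A small sketch of the split a caller needs, assuming only that SegmentLoadInfo carries its segment level; splitL0 is a hypothetical helper, not part of this change:
// splitL0 separates L0 load infos from the rest; only the L0 part goes to
// the delegator, e.g. delegator.LoadL0(ctx, l0, req.GetVersion()).
func splitL0(infos []*querypb.SegmentLoadInfo) (l0, others []*querypb.SegmentLoadInfo) {
	for _, info := range infos {
		if info.GetLevel() == datapb.SegmentLevel_L0 {
			l0 = append(l0, info)
			continue
		}
		others = append(others, info)
	}
	return l0, others
}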
func loadGrowingSegments(ctx context.Context, delegator delegator.ShardDelegator, req *querypb.WatchDmChannelsRequest) error {

View File

@ -355,7 +355,9 @@ func (loader *segmentLoader) Load(ctx context.Context,
return errors.Wrap(err, "At LoadDeltaLogs")
}
loader.manager.Segment.Put(ctx, segmentType, segment)
if segment.Level() != datapb.SegmentLevel_L0 {
loader.manager.Segment.Put(ctx, segmentType, segment)
}
newSegments.GetAndRemove(segmentID)
loaded.Insert(segmentID, segment)
loader.notifyLoadFinish(loadInfo)

View File

@ -272,17 +272,10 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
defer func() {
if err != nil {
node.delegators.GetAndRemove(channel.GetChannelName())
delegator.Close()
}
}()
// create tSafe
// node.tSafeManager.Add(ctx, channel.ChannelName, channel.GetSeekPosition().GetTimestamp())
// defer func() {
// if err != nil {
// node.tSafeManager.Remove(ctx, channel.ChannelName)
// }
// }()
pipeline, err := node.pipelineManager.Add(req.GetCollectionID(), channel.GetChannelName())
if err != nil {
msg := "failed to create pipeline"
@ -306,9 +299,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
// remove legacy growing
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(channel.GetChannelName()),
segments.WithType(segments.SegmentTypeGrowing))
// remove legacy l0 segments
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(channel.GetChannelName()),
segments.WithLevel(datapb.SegmentLevel_L0))
}
}()
@ -371,10 +361,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
node.pipelineManager.Remove(req.GetChannelName())
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
_, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
// node.tSafeManager.Remove(ctx, req.GetChannelName())
node.manager.Collection.Unref(req.GetCollectionID(), uint32(1+sealed))
node.manager.Collection.Unref(req.GetCollectionID(), 1)
}
log.Info("unsubscribed channel")
@ -1322,8 +1309,12 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
return id, action.GetCheckpoint().Timestamp
})
shardDelegator.AddExcludedSegments(flushedInfo)
deleteCP := action.GetDeleteCP()
if deleteCP.GetTimestamp() == 0 {
// fall back to the channel checkpoint when the coord does not send a delete checkpoint
deleteCP = action.GetCheckpoint()
}
shardDelegator.SyncTargetVersion(action.GetTargetVersion(), req.GetLoadMeta().GetPartitionIDs(), action.GetGrowingInTarget(),
action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint())
action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint(), deleteCP)
case querypb.SyncType_UpdatePartitionStats:
log.Info("sync update partition stats versions")
shardDelegator.SyncPartitionStats(ctx, action.PartitionStatsVersions)
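The delete seek position handed to SyncTargetVersion above marks the earliest timestamp for which streamed delete records still matter; everything older is already covered by L0 deltalogs. Purely as an illustration of that contract (the real delete buffer keeps richer state and uses different names), pruning by that position could look like:
// Hypothetical sketch: drop buffered delete records older than the delete
// seek position; records at or after it must be kept for L0 forwarding.
type bufferedDelete struct {
	ts uint64
}
func pruneDeletes(records []bufferedDelete, seekPos *msgpb.MsgPosition) []bufferedDelete {
	kept := records[:0]
	for _, r := range records {
		if r.ts >= seekPos.GetTimestamp() {
			kept = append(kept, r)
		}
	}
	return kept
}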

View File

@ -543,16 +543,6 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
// prepare
suite.TestWatchDmChannelsInt64()
l0Segment := segments.NewMockSegment(suite.T())
l0Segment.EXPECT().ID().Return(10000)
l0Segment.EXPECT().Collection().Return(suite.collectionID)
l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0)
l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed)
l0Segment.EXPECT().Shard().Return(suite.channel)
l0Segment.EXPECT().Release(ctx).Return()
suite.node.manager.Segment.Put(ctx, segments.SegmentTypeSealed, l0Segment)
// data
req := &querypb.UnsubDmChannelRequest{
Base: &commonpb.MsgBase{
@ -567,10 +557,6 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
status, err := suite.node.UnsubDmChannel(ctx, req)
suite.NoError(merr.CheckRPCCall(status, err))
suite.Len(suite.node.manager.Segment.GetBy(
segments.WithChannel(suite.vchannel),
segments.WithLevel(datapb.SegmentLevel_L0)), 0)
}
func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {
@ -1417,7 +1403,7 @@ func (suite *ServiceSuite) TestSearch_Failed() {
syncVersionAction := &querypb.SyncAction{
Type: querypb.SyncType_UpdateVersion,
SealedInTarget: []int64{1, 2, 3, 4},
SealedInTarget: []int64{1, 2, 3},
TargetVersion: time.Now().UnixMilli(),
}
@ -2134,7 +2120,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
suite.True(ok)
sealedSegments, _ := delegator.GetSegmentInfo(false)
// 3 sealed segments; the level 0 segment is no longer reported by the delegator
suite.Len(sealedSegments[0].Segments, 4)
suite.Len(sealedSegments[0].Segments, 3)
// data
req := &querypb.SyncDistributionRequest{
@ -2158,7 +2144,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
sealedSegments, _ = delegator.GetSegmentInfo(false)
suite.Len(sealedSegments[0].Segments, 3)
suite.Len(sealedSegments[0].Segments, 2)
releaseAction = &querypb.SyncAction{
Type: querypb.SyncType_Remove,
@ -2172,7 +2158,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
sealedSegments, _ = delegator.GetSegmentInfo(false)
suite.Len(sealedSegments[0].Segments, 2)
suite.Len(sealedSegments[0].Segments, 1)
}
func (suite *ServiceSuite) TestSyncDistribution_Failed() {

View File

@ -288,6 +288,8 @@ message VchannelInfo {
repeated SegmentInfo indexed_segments = 11; // deprecated, keep it for compatibility
repeated int64 level_zero_segment_ids = 12;
map<int64, int64> partition_stats_versions = 13;
// delete records whose ts is smaller than delete_checkpoint have already been dispatched to sealed segments.
msg.MsgPosition delete_checkpoint = 14;
}
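On the consumer side, the new field maps to a generated Go getter. A hedged sketch of reading it with a fallback for coordinators that do not populate it yet; the getter names follow the usual protoc-gen-go mapping, and the fallback choice is an assumption:
// deleteCheckpointTs returns the timestamp before which streamed delete
// records are already covered by L0 deltalogs for this channel.
func deleteCheckpointTs(info *datapb.VchannelInfo) uint64 {
	cp := info.GetDeleteCheckpoint()
	if cp == nil {
		// older coordinators do not set the field; fall back to the channel
		// seek position so no delete record is dropped too early.
		cp = info.GetSeekPosition()
	}
	return cp.GetTimestamp()
}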
message WatchDmChannelsRequest {

File diff suppressed because it is too large

View File

@ -725,6 +725,7 @@ message SyncAction {
repeated int64 droppedInTarget = 10;
msg.MsgPosition checkpoint = 11;
map<int64, int64> partition_stats_versions = 12;
msg.MsgPosition deleteCP = 13;
}
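A sketch of how a coordinator-side caller might fill the new field when building an UpdateVersion action; the Go field names follow the usual protoc-gen-go mapping of the proto above, and the helper itself is hypothetical:
// buildUpdateVersionAction packs the target version, the channel checkpoint
// and the delete checkpoint into a single sync action.
func buildUpdateVersionAction(targetVersion int64, checkpoint, deleteCP *msgpb.MsgPosition) *querypb.SyncAction {
	return &querypb.SyncAction{
		Type:          querypb.SyncType_UpdateVersion,
		TargetVersion: targetVersion,
		Checkpoint:    checkpoint,
		// deleteCP: delete records earlier than this position are already
		// covered by L0 segments and need not be replayed by the delegator.
		DeleteCP: deleteCP,
	}
}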
message SyncDistributionRequest {

File diff suppressed because it is too large

View File

@ -182,7 +182,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
return len(resp.Channels) == 1 && len(resp.Segments) >= 2
return len(resp.Channels) == 1 && len(resp.Segments) == 2
}, 30*time.Second, 1*time.Second)
// check total segment number and total channel number
@ -195,7 +195,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
segNum += len(resp1.Segments)
chNum += len(resp1.Channels)
}
return segNum == 8 && chNum == 2
return segNum == 4 && chNum == 2
}, 30*time.Second, 1*time.Second)
}
@ -220,13 +220,13 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
s.Eventually(func() bool {
resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
return len(resp.Channels) == 1 && len(resp.Segments) >= 2
return len(resp.Channels) == 1 && len(resp.Segments) == 2
}, 30*time.Second, 1*time.Second)
s.Eventually(func() bool {
resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
return len(resp.Channels) == 1 && len(resp.Segments) >= 2
return len(resp.Channels) == 1 && len(resp.Segments) == 2
}, 30*time.Second, 1*time.Second)
// check total segment number and total channel number
@ -239,7 +239,7 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
segNum += len(resp1.Segments)
chNum += len(resp1.Channels)
}
return segNum == 16 && chNum == 4
return segNum == 8 && chNum == 4
}, 30*time.Second, 1*time.Second)
}
@ -250,12 +250,12 @@ func (s *BalanceTestSuit) TestNodeDown() {
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceChannel.Key, "false")
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.EnableStoppingBalance.Key, "false")
// init collection with 3 channel, each channel has 15 segment, each segment has 2000 row
// and load it with 2 replicas on 2 nodes.
// init collection with 2 channels, each channel has 15 segments, each segment has 2000 rows
// and load it with 1 replica on 2 nodes.
name := "test_balance_" + funcutil.GenRandomStr()
s.initCollection(name, 1, 2, 15, 2000, 500)
// then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments
// then we add 2 query nodes; after balance happens, each node is expected to have 10 segments
qn1 := s.Cluster.AddQueryNode()
qn2 := s.Cluster.AddQueryNode()
@ -264,7 +264,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", len(resp.Segments)))
return len(resp.Channels) == 0 && len(resp.Segments) >= 10
}, 30*time.Second, 1*time.Second)
@ -295,7 +295,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
return len(resp.Channels) == 1 && len(resp.Segments) >= 15
return len(resp.Channels) == 1 && len(resp.Segments) == 15
}, 30*time.Second, 1*time.Second)
// expect all delegator will recover to healthy