fix: [cherry-pick] exclude insertData before growing checkpoint (#29559)
Cherry-pick from master pr: #29558
See also: #29556

Refine exclude segment function signature
Add exclude growing before checkpoint logic

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent a13fc4d346
commit 67313ccc86
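In short, the exclude list kept by the query node's stream pipeline changes from segmentID -> *datapb.SegmentInfo to segmentID -> checkpoint timestamp (uint64): growing segments are now excluded up to their checkpoint, while flushed and dropped segments keep using typeutil.MaxTimestamp so all of their streamed insert data is skipped. A minimal, self-contained sketch of that filtering rule (the helper name and the values are illustrative, not code from this commit):

package main

import "fmt"

// maxTimestamp stands in for typeutil.MaxTimestamp in this sketch.
const maxTimestamp = ^uint64(0)

// shouldDropInsert mirrors the rule applied by InsertExcluded below: an insert
// message is dropped when its segment is in the exclude map and the message's
// end timestamp does not pass the recorded exclude-before timestamp.
func shouldDropInsert(excluded map[int64]uint64, segmentID int64, endTs uint64) bool {
	ts, ok := excluded[segmentID]
	if !ok {
		return false
	}
	return endTs <= ts
}

func main() {
	excluded := map[int64]uint64{
		100: 50,           // growing segment: exclude data up to checkpoint ts 50
		200: maxTimestamp, // flushed/dropped segment: exclude everything
	}
	fmt.Println(shouldDropInsert(excluded, 100, 40)) // true: covered by the checkpoint
	fmt.Println(shouldDropInsert(excluded, 100, 60)) // false: newer than the checkpoint
	fmt.Println(shouldDropInsert(excluded, 300, 10)) // false: segment not excluded
}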
@@ -23,7 +23,6 @@ import (
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
 	base "github.com/milvus-io/milvus/internal/util/pipeline"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
@@ -39,7 +38,7 @@ type filterNode struct {
 	*BaseNode
 	collectionID UniqueID
 	manager      *DataManager
-	excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo]
+	excludedSegments *typeutil.ConcurrentMap[int64, uint64]
 	channel          string
 	InsertMsgPolicys []InsertMsgFilter
 	DeleteMsgPolicys []DeleteMsgFilter
@@ -134,7 +133,7 @@ func newFilterNode(
 	collectionID int64,
 	channel string,
 	manager *DataManager,
-	excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo],
+	excludedSegments *typeutil.ConcurrentMap[int64, uint64],
 	maxQueueLength int32,
 ) *filterNode {
 	return &filterNode{
@@ -22,8 +22,6 @@ import (
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/suite"

-	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querynodev2/segments"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
@@ -40,7 +38,7 @@ type FilterNodeSuite struct {
 	channel string

 	validSegmentIDs  []int64
-	excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo]
+	excludedSegments *typeutil.ConcurrentMap[int64, uint64]
 	excludedSegmentIDs []int64
 	insertSegmentIDs   []int64
 	deleteSegmentSum   int
@@ -64,13 +62,9 @@ func (suite *FilterNodeSuite) SetupSuite() {
 	suite.errSegmentID = 7

 	// init excludedSegment
-	suite.excludedSegments = typeutil.NewConcurrentMap[int64, *datapb.SegmentInfo]()
+	suite.excludedSegments = typeutil.NewConcurrentMap[int64, uint64]()
 	for _, id := range suite.excludedSegmentIDs {
-		suite.excludedSegments.Insert(id, &datapb.SegmentInfo{
-			DmlPosition: &msgpb.MsgPosition{
-				Timestamp: 1,
-			},
-		})
+		suite.excludedSegments.Insert(id, 1)
 	}
 }

@@ -54,11 +54,11 @@ func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error {
 }

 func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error {
-	segInfo, ok := n.excludedSegments.Get(msg.SegmentID)
+	ts, ok := n.excludedSegments.Get(msg.SegmentID)
 	if !ok {
 		return nil
 	}
-	if msg.EndTimestamp <= segInfo.GetDmlPosition().GetTimestamp() {
+	if msg.EndTimestamp <= ts {
 		m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID())
 		return merr.WrapErrSegmentLack(msg.GetSegmentID(), m)
 	}
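The exclude list itself is a typeutil.ConcurrentMap keyed by segment ID with a plain uint64 timestamp value, as exercised by the filter policy and test setup above. A small hedged sketch of that map usage (assuming the pkg/util/typeutil import path; the helper is illustrative, not part of this commit):

package example

import "github.com/milvus-io/milvus/pkg/util/typeutil"

// newExcludeMap builds the segmentID -> exclude-before-timestamp map the same
// way the test suite does above.
func newExcludeMap(segmentIDs []int64, ts uint64) *typeutil.ConcurrentMap[int64, uint64] {
	excluded := typeutil.NewConcurrentMap[int64, uint64]()
	for _, id := range segmentIDs {
		excluded.Insert(id, ts)
	}
	return excluded
}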
@@ -19,7 +19,6 @@ package pipeline
 import (
 	"go.uber.org/zap"

-	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/querynodev2/delegator"
 	base "github.com/milvus-io/milvus/internal/util/pipeline"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -32,23 +31,23 @@ import (
 // pipeline used for querynode
 type Pipeline interface {
 	base.StreamPipeline
-	ExcludedSegments(segInfos ...*datapb.SegmentInfo)
+	ExcludedSegments(info map[int64]uint64)
 }

 type pipeline struct {
 	base.StreamPipeline

-	excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo]
+	excludedSegments *typeutil.ConcurrentMap[int64, uint64]
 	collectionID     UniqueID
 }

-func (p *pipeline) ExcludedSegments(segInfos ...*datapb.SegmentInfo) {
-	for _, segInfo := range segInfos {
+func (p *pipeline) ExcludedSegments(excludeInfo map[int64]uint64) { //(segInfos ...*datapb.SegmentInfo) {
+	for segmentID, ts := range excludeInfo {
 		log.Debug("pipeline add exclude info",
-			zap.Int64("segmentID", segInfo.GetID()),
-			zap.Uint64("ts", segInfo.GetDmlPosition().GetTimestamp()),
+			zap.Int64("segmentID", segmentID),
+			zap.Uint64("ts", ts),
 		)
-		p.excludedSegments.Insert(segInfo.GetID(), segInfo)
+		p.excludedSegments.Insert(segmentID, ts)
 	}
 }

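With the refined signature, callers pass a plain segmentID -> timestamp map instead of SegmentInfo protos. A hedged usage sketch, assumed to live in the same pipeline package (the IDs and the growingCheckpointTs value are made up for illustration):

// registerExcludes is an illustrative caller of the refined interface above;
// it is not part of this commit.
func registerExcludes(p Pipeline, growingCheckpointTs uint64) {
	p.ExcludedSegments(map[int64]uint64{
		1001: growingCheckpointTs,   // growing segment: exclude up to its checkpoint
		1002: typeutil.MaxTimestamp, // dropped segment: exclude all streamed inserts
	})
}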
@@ -66,7 +65,7 @@ func NewPipeLine(
 	delegator delegator.ShardDelegator,
 ) (Pipeline, error) {
 	pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()
-	excludedSegments := typeutil.NewConcurrentMap[int64, *datapb.SegmentInfo]()
+	excludedSegments := typeutil.NewConcurrentMap[int64, uint64]()

 	p := &pipeline{
 		collectionID: collectionID,
@@ -32,7 +32,6 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
@@ -300,25 +299,21 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 		}
 	}()

-	flushedSegments := lo.Map(channel.GetFlushedSegmentIds(), func(id int64, _ int) *datapb.SegmentInfo {
-		return &datapb.SegmentInfo{
-			ID: id,
-			DmlPosition: &msgpb.MsgPosition{
-				Timestamp: typeutil.MaxTimestamp,
-			},
-		}
+	growingInfo := lo.SliceToMap(channel.GetUnflushedSegmentIds(), func(id int64) (int64, uint64) {
+		info := req.GetSegmentInfos()[id]
+		return id, info.GetDmlPosition().GetTimestamp()
 	})
-	pipeline.ExcludedSegments(flushedSegments...)
+	pipeline.ExcludedSegments(growingInfo)
+
+	flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) {
+		return id, typeutil.MaxTimestamp
+	})
+	pipeline.ExcludedSegments(flushedInfo)
 	for _, channelInfo := range req.GetInfos() {
-		droppedInfos := lo.Map(channelInfo.GetDroppedSegmentIds(), func(id int64, _ int) *datapb.SegmentInfo {
-			return &datapb.SegmentInfo{
-				ID: id,
-				DmlPosition: &msgpb.MsgPosition{
-					Timestamp: typeutil.MaxTimestamp,
-				},
-			}
+		droppedInfos := lo.SliceToMap(channelInfo.GetDroppedSegmentIds(), func(id int64) (int64, uint64) {
+			return id, typeutil.MaxTimestamp
 		})
-		pipeline.ExcludedSegments(droppedInfos...)
+		pipeline.ExcludedSegments(droppedInfos)
 	}

 	err = loadGrowingSegments(ctx, delegator, req)
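WatchDmChannels builds those maps with lo.SliceToMap: unflushed (growing) segment IDs map to the checkpoint timestamps carried in the request's segment infos, while flushed and dropped IDs map to typeutil.MaxTimestamp. A standalone sketch of the same pattern with samber/lo (IDs and timestamps are invented for illustration):

package main

import (
	"fmt"

	"github.com/samber/lo"
)

const maxTimestamp = ^uint64(0) // stands in for typeutil.MaxTimestamp in this sketch

func main() {
	growingIDs := []int64{1, 2}
	checkpoints := map[int64]uint64{1: 100, 2: 250} // stands in for req.GetSegmentInfos()
	flushedIDs := []int64{3, 4}

	// growing segments: exclude insert data at or before their checkpoints
	growingInfo := lo.SliceToMap(growingIDs, func(id int64) (int64, uint64) {
		return id, checkpoints[id]
	})
	// flushed segments: exclude all streamed insert data
	flushedInfo := lo.SliceToMap(flushedIDs, func(id int64) (int64, uint64) {
		return id, maxTimestamp
	})

	fmt.Println(growingInfo) // map[1:100 2:250]
	fmt.Println(flushedInfo) // map[3:18446744073709551615 4:18446744073709551615]
}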
@@ -568,15 +563,10 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
 		// in case of consumed it's growing segment again
 		pipeline := node.pipelineManager.Get(req.GetShard())
 		if pipeline != nil {
-			droppedInfos := lo.Map(req.GetSegmentIDs(), func(id int64, _ int) *datapb.SegmentInfo {
-				return &datapb.SegmentInfo{
-					ID: id,
-					DmlPosition: &msgpb.MsgPosition{
-						Timestamp: typeutil.MaxTimestamp,
-					},
-				}
+			droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) {
+				return id, typeutil.MaxTimestamp
 			})
-			pipeline.ExcludedSegments(droppedInfos...)
+			pipeline.ExcludedSegments(droppedInfos)
 		}

 		req.NeedTransfer = false
@@ -1376,15 +1366,11 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
 			log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
 			pipeline := node.pipelineManager.Get(req.GetChannel())
 			if pipeline != nil {
-				droppedInfos := lo.Map(action.GetDroppedInTarget(), func(id int64, _ int) *datapb.SegmentInfo {
-					return &datapb.SegmentInfo{
-						ID: id,
-						DmlPosition: &msgpb.MsgPosition{
-							Timestamp: typeutil.MaxTimestamp,
-						},
-					}
+				droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) {
+					return id, typeutil.MaxTimestamp
 				})
-				pipeline.ExcludedSegments(droppedInfos...)
-
+				pipeline.ExcludedSegments(droppedInfos)
 			}
 			shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),
 				action.GetSealedInTarget(), action.GetDroppedInTarget())