fix: use correct delete checkpoint to prevent premature data cleanup (#40366)

issue: #40292
related to #39552

- Fix incorrect delete checkpoint usage in SyncDistribution
- Change the checkpoint parameter passed to SyncTargetVersion from
  action.GetCheckpoint() to action.GetDeleteCP()
- This resolves the issue where delete buffer data was cleaned up
  prematurely because the wrong checkpoint was used as the cleanup
  position (a minimal sketch of the corrected logic follows below)
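
The gist of the change, as a minimal standalone sketch. The MsgPosition and
SyncAction types and the resolveDeleteCP helper below are simplified stand-ins
for illustration only, not the actual msgpb/querypb messages or the QueryNode
code touched by this commit:

    // Sketch only: stand-in types, not the real Milvus API.
    package main

    import "fmt"

    type MsgPosition struct{ Timestamp uint64 }

    type SyncAction struct {
        Checkpoint *MsgPosition // channel checkpoint, always set
        DeleteCP   *MsgPosition // delete checkpoint, nil when sent by an older (2.4) coordinator
    }

    // resolveDeleteCP mirrors the fixed selection logic: prefer the dedicated
    // delete checkpoint, and fall back to the channel checkpoint only when the
    // coordinator did not provide one (2.4 compatibility).
    func resolveDeleteCP(a *SyncAction) *MsgPosition {
        if a.DeleteCP != nil {
            return a.DeleteCP
        }
        return a.Checkpoint
    }

    func main() {
        a := &SyncAction{
            Checkpoint: &MsgPosition{Timestamp: 300},
            DeleteCP:   &MsgPosition{Timestamp: 200},
        }
        // Before the fix, the channel checkpoint (300 here) was handed to
        // SyncTargetVersion as the delete buffer cleanup position, so entries
        // newer than the delete checkpoint could be unregistered too early;
        // with the fix the earlier delete checkpoint (200) is used instead.
        fmt.Println(resolveDeleteCP(a).Timestamp) // prints 200
    }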

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Author: wei liu, 2025-03-12 15:00:08 +08:00 (committed by GitHub)
Parent: c0e03b6ca4
Commit: 0420dc1eb1
9 changed files with 789 additions and 724 deletions


@@ -213,15 +213,22 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
 		zap.Any("partition stats", partStatsVersionsMap),
 	)
+	seekPosition := h.GetChannelSeekPosition(channel, partitionIDs...)
+	// if no l0 segment exist, use checkpoint as delete checkpoint
+	if len(levelZeroIDs) == 0 {
+		deleteCheckPoint = seekPosition
+	}
 	return &datapb.VchannelInfo{
 		CollectionID:           channel.GetCollectionID(),
 		ChannelName:            channel.GetName(),
-		SeekPosition:           h.GetChannelSeekPosition(channel, partitionIDs...),
+		SeekPosition:           seekPosition,
 		FlushedSegmentIds:      flushedIDs.Collect(),
 		UnflushedSegmentIds:    growingIDs.Collect(),
 		DroppedSegmentIds:      droppedIDs.Collect(),
 		LevelZeroSegmentIds:    levelZeroIDs.Collect(),
 		PartitionStatsVersions: partStatsVersionsMap,
+		DeleteCheckpoint:       deleteCheckPoint,
 	}
 }


@@ -238,6 +238,7 @@ func TestGetQueryVChanPositions(t *testing.T) {
 				ChannelName: "ch1",
 				MsgID:       []byte{8, 9, 10},
 				MsgGroup:    "",
+				Timestamp:   1,
 			},
 			DmlPosition: &msgpb.MsgPosition{
 				ChannelName: "ch1",
@@ -268,6 +269,7 @@ func TestGetQueryVChanPositions(t *testing.T) {
 		infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, 1)
 		assert.EqualValues(t, 0, infos.CollectionID)
 		assert.EqualValues(t, 1, len(infos.GetLevelZeroSegmentIds()))
+		assert.EqualValues(t, uint64(1), infos.GetDeleteCheckpoint().GetTimestamp())
 	})

 	t.Run("empty collection with passed positions", func(t *testing.T) {


@@ -897,6 +897,8 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
 			zap.Int("# of dropped segments", len(channelInfo.GetDroppedSegmentIds())),
 			zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())),
 			zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())),
+			zap.Time("# of check point", tsoutil.PhysicalTime(channelInfo.GetSeekPosition().GetTimestamp())),
+			zap.Time("# of delete check point", tsoutil.PhysicalTime(channelInfo.GetDeleteCheckpoint().GetTimestamp())),
 		)
 		flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...)
 	}


@@ -111,6 +111,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
 			UnflushedSegmentIds: t.GetGrowingSegmentIDs(),
 			FlushedSegmentIds:   lo.Keys(segments),
 			DroppedSegmentIds:   t.GetDroppedSegmentIDs(),
+			DeleteCheckpoint:    t.GetDeleteCheckpoint(),
 		},
 	}
 }
@@ -173,6 +174,7 @@ func (p *CollectionTarget) toPbMsg() *querypb.CollectionTarget {
 			GrowingSegmentIDs: channel.GetUnflushedSegmentIds(),
 			DroppedSegmentIDs: channel.GetDroppedSegmentIds(),
 			PartitionTargets:  lo.Values(partitionTargets),
+			DeleteCheckpoint:  channel.GetDeleteCheckpoint(),
 		}
 	}


@@ -25,6 +25,7 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"

+	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -116,10 +117,16 @@ func (suite *TargetObserverSuite) SetupTest() {
 		{
 			CollectionID: suite.collectionID,
 			ChannelName:  "channel-1",
+			DeleteCheckpoint: &msgpb.MsgPosition{
+				Timestamp: 200,
+			},
 		},
 		{
 			CollectionID: suite.collectionID,
 			ChannelName:  "channel-2",
+			DeleteCheckpoint: &msgpb.MsgPosition{
+				Timestamp: 200,
+			},
 		},
 	}
@@ -197,8 +204,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
 	ready, err := suite.observer.UpdateNextTarget(suite.collectionID)
 	suite.NoError(err)

-	suite.distMgr.LeaderViewManager.Update(2,
-		&meta.LeaderView{
+	ch1View := &meta.LeaderView{
 		ID:           2,
 		CollectionID: suite.collectionID,
 		Channel:      "channel-1",
@@ -206,16 +212,18 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
 			11: {NodeID: 2},
 			13: {NodeID: 2},
 		},
-		},
-		&meta.LeaderView{
+	}
+
+	ch2View := &meta.LeaderView{
 		ID:           2,
 		CollectionID: suite.collectionID,
 		Channel:      "channel-2",
 		Segments: map[int64]*querypb.SegmentDist{
 			12: {NodeID: 2},
 		},
-		},
-	)
+	}
+
+	suite.distMgr.LeaderViewManager.Update(2, ch1View, ch2View)

 	suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
 	suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
@@ -233,6 +241,9 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
 		len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.CurrentTarget)) == 3 &&
 			len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.CurrentTarget)) == 2
 	}, 7*time.Second, 1*time.Second)
+
+	action := suite.observer.checkNeedUpdateTargetVersion(ctx, ch1View, 100)
+	suite.Equal(action.GetDeleteCP().Timestamp, uint64(200))
 }

 func (suite *TargetObserverSuite) TestTriggerRelease() {


@@ -533,7 +533,7 @@ func (sd *shardDelegator) LoadL0(ctx context.Context, infos []*querypb.SegmentLo
 	}
 	segmentIDs = lo.Map(loaded, func(segment segments.Segment, _ int) int64 { return segment.ID() })
-	log.Info("load growing segments done", zap.Int64s("segmentIDs", segmentIDs))
+	log.Info("load l0 segments done", zap.Int64s("segmentIDs", segmentIDs))

 	sd.deleteBuffer.RegisterL0(loaded...)
 	// register l0 segment
@@ -623,6 +623,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 	for _, info := range infos {
 		log := log.With(
 			zap.Int64("segmentID", info.GetSegmentID()),
+			zap.Time("startPosition", tsoutil.PhysicalTime(info.GetStartPosition().GetTimestamp())),
 		)
 		candidate := idCandidates[info.GetSegmentID()]
 		// after L0 segment feature
@@ -651,8 +652,11 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 		// }
 		// list buffered delete
 		deleteRecords := sd.deleteBuffer.ListAfter(info.GetStartPosition().GetTimestamp())
+		tsHitDeleteRows := int64(0)
+		start := time.Now()
 		for _, entry := range deleteRecords {
 			for _, record := range entry.Data {
+				tsHitDeleteRows += int64(len(record.DeleteData.Pks))
 				if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID {
 					continue
 				}
@@ -676,7 +680,11 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 		}
 		// if delete count not empty, apply
 		if deleteData.RowCount > 0 {
-			log.Info("forward delete to worker...", zap.Int64("deleteRowNum", deleteData.RowCount))
+			log.Info("forward delete to worker...",
+				zap.Int64("tsHitDeleteRowNum", tsHitDeleteRows),
+				zap.Int64("bfHitDeleteRowNum", deleteData.RowCount),
+				zap.Int64("bfCost", time.Since(start).Milliseconds()),
+			)
 			err := worker.Delete(ctx, &querypb.DeleteRequest{
 				Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
 				CollectionId: info.GetCollectionID(),
@@ -993,8 +1001,21 @@ func (sd *shardDelegator) SyncTargetVersion(
 	}
 	sd.distribution.SyncTargetVersion(newVersion, partitions, growingInTarget, sealedInTarget, redundantGrowingIDs)
 	start := time.Now()
+	sizeBeforeClean, _ := sd.deleteBuffer.Size()
 	sd.deleteBuffer.UnRegister(deleteSeekPos.GetTimestamp())
-	log.Info("clean delete buffer cost", zap.Duration("cost", time.Since(start)))
+	sizeAfterClean, _ := sd.deleteBuffer.Size()
+	if sizeAfterClean < sizeBeforeClean {
+		log.Info("clean delete buffer",
+			zap.String("channel", sd.vchannelName),
+			zap.Time("deleteSeekPos", tsoutil.PhysicalTime(deleteSeekPos.GetTimestamp())),
+			zap.Time("channelCP", tsoutil.PhysicalTime(checkpoint.GetTimestamp())),
+			zap.Int64("sizeBeforeClean", sizeBeforeClean),
+			zap.Int64("sizeAfterClean", sizeAfterClean),
+			zap.Duration("cost", time.Since(start)),
+		)
+	}
 	sd.RefreshLevel0DeletionStats()
 }


@@ -55,6 +55,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
+	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -1317,9 +1318,13 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
 			return id, action.GetCheckpoint().Timestamp
 		})
 		shardDelegator.AddExcludedSegments(flushedInfo)
-		deleteCP := &msgpb.MsgPosition{}
-		if deleteCP.GetTimestamp() == 0 {
+		deleteCP := action.GetDeleteCP()
+		if deleteCP == nil {
+			// for compatible with 2.4, we use checkpoint as deleteCP when deleteCP is nil
 			deleteCP = action.GetCheckpoint()
+			log.Info("use checkpoint as deleteCP",
+				zap.String("channelName", req.GetChannel()),
+				zap.Time("deleteSeekPos", tsoutil.PhysicalTime(action.GetCheckpoint().GetTimestamp())))
 		}
 		shardDelegator.SyncTargetVersion(action.GetTargetVersion(), req.GetLoadMeta().GetPartitionIDs(), action.GetGrowingInTarget(),
 			action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint(), deleteCP)


@@ -855,6 +855,7 @@ message ChannelTarget {
   repeated int64 growing_segmentIDs = 3;
   repeated PartitionTarget partition_targets = 4;
   msg.MsgPosition seek_position = 5;
+  msg.MsgPosition delete_checkpoint = 6;
 }

 message CollectionTarget {

File diff suppressed because it is too large.