From 539f56220f35f2a91c37378e8f1d0d80623bf7da Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Tue, 22 Oct 2024 11:15:28 +0800
Subject: [PATCH] enhance: Remove bf from datanode (#36367) (#37027)

Remove bf from datanode:
1. When watching vchannels, skip loading **flushed** segments' bf. To
   generate the merged bf, we still need to keep loading **growing**
   segments' bf.
2. Bypass bloom filter checks for delete messages, writing them directly
   to L0 segments.
3. In version 2.4, when dropping a partition, marking segments as dropped
   depends on having the full segment list in the DataNode, so we keep
   syncing the segments every 10 minutes.

issue: https://github.com/milvus-io/milvus/issues/34585
pr: https://github.com/milvus-io/milvus/pull/35902, https://github.com/milvus-io/milvus/pull/36367, https://github.com/milvus-io/milvus/pull/36592

---------

Signed-off-by: bigsheeper
---
 internal/datanode/data_sync_service.go       | 26 ++++++++++++++-----
 .../datanode/writebuffer/l0_write_buffer.go  | 23 +++++++++++++---
 pkg/util/paramtable/component_param.go       |  3 ++-
 3 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 31804d3998..f399e45508 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -178,8 +178,11 @@ func initMetaCache(initCtx context.Context, storageV2Cache *metacache.StorageV2C
 		}
 	}
 
+	// growing segments's stats should always be loaded, for generating merged pk bf.
 	loadSegmentStats("growing", unflushed)
-	loadSegmentStats("sealed", flushed)
+	if !paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
+		loadSegmentStats("sealed", flushed)
+	}
 
 	// use fetched segment info
 	info.Vchan.FlushedSegments = flushed
@@ -345,13 +348,22 @@ func newServiceWithEtcdTickler(initCtx context.Context, node *DataNode, info *da
 // NOTE: compactiable for event manager
 func newDataSyncService(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *tickler) (*dataSyncService, error) {
 	// recover segment checkpoints
-	unflushedSegmentInfos, err := node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds())
-	if err != nil {
-		return nil, err
+	var (
+		err                   error
+		unflushedSegmentInfos []*datapb.SegmentInfo
+		flushedSegmentInfos   []*datapb.SegmentInfo
+	)
+	if len(info.GetVchan().GetUnflushedSegmentIds()) > 0 {
+		unflushedSegmentInfos, err = node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds())
+		if err != nil {
+			return nil, err
+		}
 	}
-	flushedSegmentInfos, err := node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds())
-	if err != nil {
-		return nil, err
+	if len(info.GetVchan().GetFlushedSegmentIds()) > 0 {
+		flushedSegmentInfos, err = node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds())
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	var storageCache *metacache.StorageV2Cache
diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go
index 0a30001883..1a9966020f 100644
--- a/internal/datanode/writebuffer/l0_write_buffer.go
+++ b/internal/datanode/writebuffer/l0_write_buffer.go
@@ -135,6 +135,17 @@ func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs
 	})
 }
 
+func (wb *l0WriteBuffer) dispatchDeleteMsgsWithoutFilter(deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
+	for _, msg := range deleteMsgs {
+		l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
+		pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
+		pkTss := msg.GetTimestamps()
+		if len(pks) > 0 {
+			wb.bufferDelete(l0SegmentID, pks, pkTss, startPos, endPos)
+		}
+	}
+}
+
 func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
 	wb.mut.Lock()
 	defer wb.mut.Unlock()
@@ -152,9 +163,15 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
 		}
 	}
 
-	// distribute delete msg
-	// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
-	wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
+	if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
+		// In Skip BF mode, datanode no longer maintains bloom filters.
+		// So, here we skip filtering delete entries.
+		wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos)
+	} else {
+		// distribute delete msg
+		// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
+		wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
+	}
 
 	// update pk oracle
 	for _, inData := range groups {
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index b40de160cb..b68d2b7eaa 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -4317,7 +4317,8 @@ Setting this parameter too small causes the system to store a small amount of da
 		Key:          "dataNode.skip.BFStats.Load",
 		Version:      "2.2.5",
 		PanicIfEmpty: false,
-		DefaultValue: "false",
+		DefaultValue: "true",
+		Forbidden:    true, // The SkipBFStatsLoad is a static config that not allow dynamic refresh.
 	}
 	p.SkipBFStatsLoad.Init(base.mgr)
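
Reviewer note, not part of the patch: below is a minimal, self-contained Go sketch of the delete-dispatch decision this change adds to BufferData. The names used here (writeBuffer, segmentFilter, dispatchDeletes, bufferDelete) are illustrative stand-ins, not the Milvus types. With skip-BF mode enabled, every delete is buffered straight into the partition's L0 segment; with it disabled, per-segment bloom filters decide which segments receive the delete primary keys.

package main

import "fmt"

type primaryKey = int64

// segmentFilter stands in for a per-segment bloom filter (pk oracle).
// A real bloom filter may also return false positives.
type segmentFilter struct {
	segmentID int64
	pks       map[primaryKey]struct{}
}

func (f *segmentFilter) mayContain(pk primaryKey) bool {
	_, ok := f.pks[pk]
	return ok
}

type writeBuffer struct {
	skipBFStatsLoad bool                   // mirrors dataNode.skip.BFStats.Load
	filters         []*segmentFilter       // only populated when skip-BF is off
	deletes         map[int64][]primaryKey // segment ID -> buffered delete pks
}

// bufferDelete appends delete pks to the buffer of the given segment.
func (wb *writeBuffer) bufferDelete(segmentID int64, pks []primaryKey) {
	wb.deletes[segmentID] = append(wb.deletes[segmentID], pks...)
}

// dispatchDeletes mirrors the branch added to BufferData: in skip-BF mode all
// deletes go straight to the partition's L0 segment; otherwise bloom filters
// decide which segments the delete pks are buffered for.
func (wb *writeBuffer) dispatchDeletes(l0SegmentID int64, pks []primaryKey) {
	if wb.skipBFStatsLoad {
		wb.bufferDelete(l0SegmentID, pks)
		return
	}
	for _, f := range wb.filters {
		var hit []primaryKey
		for _, pk := range pks {
			if f.mayContain(pk) {
				hit = append(hit, pk)
			}
		}
		if len(hit) > 0 {
			wb.bufferDelete(f.segmentID, hit)
		}
	}
}

func main() {
	wb := &writeBuffer{skipBFStatsLoad: true, deletes: map[int64][]primaryKey{}}
	wb.dispatchDeletes(100, []primaryKey{1, 2, 3})
	fmt.Println(wb.deletes) // map[100:[1 2 3]]
}

The apparent trade-off is that without bloom filters the DataNode can no longer narrow deletes to the segments that might actually contain the pk, so every delete lands in L0 delta data and is presumably resolved later by the compaction and query paths.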
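A second sketch, equally illustrative (initStats and loadSegmentStats are made-up stand-ins, not DataNode functions), shows the stats-loading gate added to initMetaCache: growing segments' stats are still loaded because they feed the merged pk bloom filter, while flushed (sealed) segments' stats are loaded only when dataNode.skip.BFStats.Load is false; the patch also flips that default to true and marks the item Forbidden, i.e. not dynamically refreshable.

package main

import "fmt"

// loadSegmentStats is a stand-in for loading pk statistics (bloom filters)
// for one group of segments into the meta cache.
func loadSegmentStats(segType string, segmentIDs []int64) {
	fmt.Printf("loading %s stats for %d segment(s)\n", segType, len(segmentIDs))
}

// initStats mirrors the gate added to initMetaCache: growing segments' stats
// are always loaded (they are needed for the merged pk bf), sealed segments'
// stats only when skip-BF mode is disabled.
func initStats(skipBFStatsLoad bool, growing, sealed []int64) {
	loadSegmentStats("growing", growing)
	if !skipBFStatsLoad {
		loadSegmentStats("sealed", sealed)
	}
}

func main() {
	initStats(true, []int64{1, 2}, []int64{10, 11, 12})
	// With skip-BF enabled (the new default), only growing stats are loaded.
}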