Remove bf from datanode:

1. When watching vchannels, skip loading **flushed** segments' bf. To generate the merged bf, we still need to load **growing** segments' bf.
2. Bypass bloom filter checks for delete messages and write them directly to L0 segments.
3. In version 2.4, marking segments as dropped when a partition is dropped depends on the DataNode having the full segment list, so we keep syncing the segments every 10 minutes.

issue: https://github.com/milvus-io/milvus/issues/34585
pr: https://github.com/milvus-io/milvus/pull/35902, https://github.com/milvus-io/milvus/pull/36367, https://github.com/milvus-io/milvus/pull/36592

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
commit 539f56220f (parent a2d4520dc7)
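Item 3 of the commit message boils down to a periodic reporting loop on the DataNode. Below is a minimal sketch of such a loop under assumed names (`runSegmentSyncLoop`, `reportSegments`, and `syncInterval` are illustrative, not the actual Milvus identifiers); only the 10-minute cadence comes from the commit message.

package sketch

import (
	"context"
	"log"
	"time"
)

// syncInterval mirrors the 10-minute cadence mentioned in the commit message;
// the name and wiring here are illustrative, not the actual DataNode code.
const syncInterval = 10 * time.Minute

// runSegmentSyncLoop periodically invokes a hypothetical reportSegments
// callback that would push the node's full segment list to the coordinator,
// so that dropped partitions can still be reconciled.
func runSegmentSyncLoop(ctx context.Context, reportSegments func(ctx context.Context) error) {
	ticker := time.NewTicker(syncInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := reportSegments(ctx); err != nil {
				log.Printf("segment sync failed, will retry next tick: %v", err)
			}
		}
	}
}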
@@ -178,8 +178,11 @@ func initMetaCache(initCtx context.Context, storageV2Cache *metacache.StorageV2C
 		}
 	}
 
+	// growing segments's stats should always be loaded, for generating merged pk bf.
 	loadSegmentStats("growing", unflushed)
-	loadSegmentStats("sealed", flushed)
+	if !paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
+		loadSegmentStats("sealed", flushed)
+	}
 
 	// use fetched segment info
 	info.Vchan.FlushedSegments = flushed
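The new comment in this hunk notes that growing segments' stats must stay loaded because they feed a merged pk bf. As a rough illustration of what "merged" means here, the sketch below ORs per-segment membership answers; `PkStats`, `MayContain`, and `mergedPkStats` are hypothetical stand-ins, not the metacache types.

package sketch

// PkStats is a hypothetical stand-in for a per-segment primary-key filter
// (e.g. a bloom filter): MayContain may return false positives but never
// false negatives.
type PkStats interface {
	MayContain(pk int64) bool
}

// mergedPkStats answers "might this pk exist in any tracked growing segment?"
// by OR-ing the per-segment answers, which is the logical effect of merging
// the individual bloom filters into one.
type mergedPkStats struct {
	segments []PkStats
}

func (m *mergedPkStats) MayContain(pk int64) bool {
	for _, s := range m.segments {
		if s.MayContain(pk) {
			return true
		}
	}
	return false
}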
@@ -345,13 +348,22 @@ func newServiceWithEtcdTickler(initCtx context.Context, node *DataNode, info *da
 // NOTE: compactiable for event manager
 func newDataSyncService(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *tickler) (*dataSyncService, error) {
 	// recover segment checkpoints
-	unflushedSegmentInfos, err := node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds())
-	if err != nil {
-		return nil, err
+	var (
+		err                   error
+		unflushedSegmentInfos []*datapb.SegmentInfo
+		flushedSegmentInfos   []*datapb.SegmentInfo
+	)
+	if len(info.GetVchan().GetUnflushedSegmentIds()) > 0 {
+		unflushedSegmentInfos, err = node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds())
+		if err != nil {
+			return nil, err
+		}
 	}
-	flushedSegmentInfos, err := node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds())
-	if err != nil {
-		return nil, err
+	if len(info.GetVchan().GetFlushedSegmentIds()) > 0 {
+		flushedSegmentInfos, err = node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds())
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	var storageCache *metacache.StorageV2Cache
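This hunk's main change is guarding each GetSegmentInfo call with an emptiness check so the broker is not called at all for an empty ID list. The same guard can be expressed as a small helper; `Broker` and `SegmentInfo` below are hypothetical stand-ins for the datanode broker and `*datapb.SegmentInfo`.

package sketch

import "context"

// SegmentInfo stands in for *datapb.SegmentInfo.
type SegmentInfo struct {
	ID int64
}

// Broker is a hypothetical one-method interface modelling the
// GetSegmentInfo RPC used in the hunk above.
type Broker interface {
	GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*SegmentInfo, error)
}

// getSegmentInfoIfAny skips the RPC entirely for an empty ID list, returning
// a nil slice instead of paying for a round trip that can return nothing.
func getSegmentInfoIfAny(ctx context.Context, b Broker, segmentIDs []int64) ([]*SegmentInfo, error) {
	if len(segmentIDs) == 0 {
		return nil, nil
	}
	return b.GetSegmentInfo(ctx, segmentIDs)
}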
@@ -135,6 +135,17 @@ func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs
 	})
 }
 
+func (wb *l0WriteBuffer) dispatchDeleteMsgsWithoutFilter(deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
+	for _, msg := range deleteMsgs {
+		l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
+		pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
+		pkTss := msg.GetTimestamps()
+		if len(pks) > 0 {
+			wb.bufferDelete(l0SegmentID, pks, pkTss, startPos, endPos)
+		}
+	}
+}
+
 func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
 	wb.mut.Lock()
 	defer wb.mut.Unlock()
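dispatchDeleteMsgsWithoutFilter no longer consults any bloom filter: every delete in the batch is buffered against the L0 segment for its partition. A stripped-down sketch of that routing, with hypothetical types (`deleteMsg`, `l0Buffer`) in place of the msgstream and write-buffer types:

package sketch

// deleteMsg is a simplified, hypothetical stand-in for msgstream.DeleteMsg.
type deleteMsg struct {
	PartitionID int64
	PrimaryKeys []int64
	Timestamps  []uint64
}

// l0Buffer accumulates delete records per L0 segment; the map and the nextID
// callback only mimic the shape of the real write-buffer bookkeeping.
type l0Buffer struct {
	deletes map[int64][]int64 // l0SegmentID -> buffered pks
	nextID  func(partitionID int64) int64
}

// dispatchWithoutFilter writes every delete straight into the partition's L0
// segment, skipping any per-segment bloom-filter lookup.
func (b *l0Buffer) dispatchWithoutFilter(msgs []*deleteMsg) {
	for _, msg := range msgs {
		if len(msg.PrimaryKeys) == 0 {
			continue
		}
		l0SegmentID := b.nextID(msg.PartitionID)
		b.deletes[l0SegmentID] = append(b.deletes[l0SegmentID], msg.PrimaryKeys...)
	}
}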
@@ -152,9 +163,15 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
 		}
 	}
 
-	// distribute delete msg
-	// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
-	wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
+	if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
+		// In Skip BF mode, datanode no longer maintains bloom filters.
+		// So, here we skip filtering delete entries.
+		wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos)
+	} else {
+		// distribute delete msg
+		// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
+		wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
+	}
 
 	// update pk oracle
 	for _, inData := range groups {
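Skipping the bloom-filter dispatch is workable because, roughly speaking, a delete is ultimately applied by primary key and timestamp wherever the matching row lives, so the up-front segment targeting can be deferred. The sketch below shows that apply-by-pk-and-timestamp rule in isolation; `row` and `applyDeletes` are illustrative names, and the exact timestamp comparison is an assumption, not taken from the Milvus code.

package sketch

// row is an illustrative record with a primary key and insert timestamp.
type row struct {
	PK int64
	Ts uint64
}

// applyDeletes drops rows whose primary key appears in the delete map with a
// timestamp at or after the row's own timestamp (assumed semantics). This is
// how a delete buffered in L0 without any segment targeting can still hit the
// right rows later.
func applyDeletes(rows []row, deleteTs map[int64]uint64) []row {
	kept := make([]row, 0, len(rows))
	for _, r := range rows {
		if ts, ok := deleteTs[r.PK]; ok && ts >= r.Ts {
			continue // deleted
		}
		kept = append(kept, r)
	}
	return kept
}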
@@ -4317,7 +4317,8 @@ Setting this parameter too small causes the system to store a small amount of da
 		Key:          "dataNode.skip.BFStats.Load",
 		Version:      "2.2.5",
 		PanicIfEmpty: false,
-		DefaultValue: "false",
+		DefaultValue: "true",
+		Forbidden:    true, // SkipBFStatsLoad is a static config that does not allow dynamic refresh.
 	}
 	p.SkipBFStatsLoad.Init(base.mgr)