diff --git a/internal/flushcommon/pipeline/data_sync_service.go b/internal/flushcommon/pipeline/data_sync_service.go index bdf5b3d5ff..d97bd4e6fc 100644 --- a/internal/flushcommon/pipeline/data_sync_service.go +++ b/internal/flushcommon/pipeline/data_sync_service.go @@ -366,11 +366,12 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut } } - // init metaCache meta - metaCache, err := initMetaCache(initCtx, pipelineParams.ChunkManager, info, nil, unflushedSegmentInfos, flushedSegmentInfos) - if err != nil { - return nil, err - } + // In streaming service mode, flushed segments no longer maintain a bloom filter. + // So, here we skip loading the bloom filter for flushed segments. + info.Vchan.UnflushedSegments = unflushedSegmentInfos + metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat { + return pkoracle.NewBloomFilterSet() + }) return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input) } diff --git a/internal/flushcommon/syncmgr/key_lock_dispatcher.go b/internal/flushcommon/syncmgr/key_lock_dispatcher.go index 42ba32f12f..9dbeead8fe 100644 --- a/internal/flushcommon/syncmgr/key_lock_dispatcher.go +++ b/internal/flushcommon/syncmgr/key_lock_dispatcher.go @@ -8,7 +8,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/lock" ) -//go:generate mockery --name=Task --structname=MockTask --output=./ --filename=mock_task.go --with-expecter --inpackage type Task interface { SegmentID() int64 Checkpoint() *msgpb.MsgPosition @@ -16,6 +15,7 @@ type Task interface { ChannelName() string Run(context.Context) error HandleError(error) + IsFlush() bool } type keyLockDispatcher[K comparable] struct { diff --git a/internal/flushcommon/syncmgr/mock_task.go b/internal/flushcommon/syncmgr/mock_task.go index 6087e5ba3e..01a80a59c5 100644 --- a/internal/flushcommon/syncmgr/mock_task.go +++ b/internal/flushcommon/syncmgr/mock_task.go @@ -139,6 +139,47 @@ func (_c *MockTask_HandleError_Call) RunAndReturn(run func(error)) *MockTask_Han return _c } +// IsFlush provides a mock function with given fields: +func (_m *MockTask) IsFlush() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockTask_IsFlush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsFlush' +type MockTask_IsFlush_Call struct { + *mock.Call +} + +// IsFlush is a helper method to define mock.On call +func (_e *MockTask_Expecter) IsFlush() *MockTask_IsFlush_Call { + return &MockTask_IsFlush_Call{Call: _e.mock.On("IsFlush")} +} + +func (_c *MockTask_IsFlush_Call) Run(run func()) *MockTask_IsFlush_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTask_IsFlush_Call) Return(_a0 bool) *MockTask_IsFlush_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTask_IsFlush_Call) RunAndReturn(run func() bool) *MockTask_IsFlush_Call { + _c.Call.Return(run) + return _c +} + // Run provides a mock function with given fields: _a0 func (_m *MockTask) Run(_a0 context.Context) error { ret := _m.Called(_a0) diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index ed63b3586c..e8dd6054ef 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -344,6 +344,10 @@ func (t *SyncTask) ChannelName() string { return t.channelName } +func (t *SyncTask) IsFlush() bool { + return t.isFlush +} + func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) { return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog } diff --git a/internal/flushcommon/writebuffer/l0_write_buffer.go b/internal/flushcommon/writebuffer/l0_write_buffer.go index 60e2b634c8..45adda5f50 100644 --- a/internal/flushcommon/writebuffer/l0_write_buffer.go +++ b/internal/flushcommon/writebuffer/l0_write_buffer.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/conc" @@ -139,6 +140,17 @@ func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs }) } +func (wb *l0WriteBuffer) dispatchDeleteMsgsWithoutFilter(deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) { + for _, msg := range deleteMsgs { + l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos) + pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys()) + pkTss := msg.GetTimestamps() + if len(pks) > 0 { + wb.bufferDelete(l0SegmentID, pks, pkTss, startPos, endPos) + } + } +} + func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error { wb.mut.Lock() defer wb.mut.Unlock() @@ -156,9 +168,15 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg } } - // distribute delete msg - // bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data - wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos) + if streamingutil.IsStreamingServiceEnabled() { + // In streaming service mode, flushed segments no longer maintain a bloom filter. + // So, here we skip filtering delete entries by bf. + wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos) + } else { + // distribute delete msg + // bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data + wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos) + } // update pk oracle for _, inData := range groups { diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 5554e84a33..e94e6a09ed 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -18,6 +18,7 @@ import ( "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -345,6 +346,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) if syncTask.StartPosition() != nil { wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp()) } + + if streamingutil.IsStreamingServiceEnabled() && syncTask.IsFlush() { + wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID())) + log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()), zap.String("channel", syncTask.ChannelName())) + } return nil })) }