mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 10:08:42 +08:00
enhance: Remove bf from streaming node (#35902)
Remove bf from streaming node: 1. When watching vchannels, skip loading bloom filters for segments. 2. Bypass bloom filter checks for delete messages, directly writing to L0 segments. 3. Remove flushed segments proactively after flush. issue: https://github.com/milvus-io/milvus/issues/33285, https://github.com/milvus-io/milvus/issues/34585 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
325f1987d9
commit
6130a85444
@ -366,11 +366,12 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// init metaCache meta
|
// In streaming service mode, flushed segments no longer maintain a bloom filter.
|
||||||
metaCache, err := initMetaCache(initCtx, pipelineParams.ChunkManager, info, nil, unflushedSegmentInfos, flushedSegmentInfos)
|
// So, here we skip loading the bloom filter for flushed segments.
|
||||||
if err != nil {
|
info.Vchan.UnflushedSegments = unflushedSegmentInfos
|
||||||
return nil, err
|
metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
|
||||||
}
|
return pkoracle.NewBloomFilterSet()
|
||||||
|
})
|
||||||
|
|
||||||
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input)
|
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,7 +8,6 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:generate mockery --name=Task --structname=MockTask --output=./ --filename=mock_task.go --with-expecter --inpackage
|
|
||||||
type Task interface {
|
type Task interface {
|
||||||
SegmentID() int64
|
SegmentID() int64
|
||||||
Checkpoint() *msgpb.MsgPosition
|
Checkpoint() *msgpb.MsgPosition
|
||||||
@ -16,6 +15,7 @@ type Task interface {
|
|||||||
ChannelName() string
|
ChannelName() string
|
||||||
Run(context.Context) error
|
Run(context.Context) error
|
||||||
HandleError(error)
|
HandleError(error)
|
||||||
|
IsFlush() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type keyLockDispatcher[K comparable] struct {
|
type keyLockDispatcher[K comparable] struct {
|
||||||
|
|||||||
@ -139,6 +139,47 @@ func (_c *MockTask_HandleError_Call) RunAndReturn(run func(error)) *MockTask_Han
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsFlush provides a mock function with given fields:
|
||||||
|
func (_m *MockTask) IsFlush() bool {
|
||||||
|
ret := _m.Called()
|
||||||
|
|
||||||
|
var r0 bool
|
||||||
|
if rf, ok := ret.Get(0).(func() bool); ok {
|
||||||
|
r0 = rf()
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockTask_IsFlush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsFlush'
|
||||||
|
type MockTask_IsFlush_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsFlush is a helper method to define mock.On call
|
||||||
|
func (_e *MockTask_Expecter) IsFlush() *MockTask_IsFlush_Call {
|
||||||
|
return &MockTask_IsFlush_Call{Call: _e.mock.On("IsFlush")}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockTask_IsFlush_Call) Run(run func()) *MockTask_IsFlush_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run()
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockTask_IsFlush_Call) Return(_a0 bool) *MockTask_IsFlush_Call {
|
||||||
|
_c.Call.Return(_a0)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockTask_IsFlush_Call) RunAndReturn(run func() bool) *MockTask_IsFlush_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// Run provides a mock function with given fields: _a0
|
// Run provides a mock function with given fields: _a0
|
||||||
func (_m *MockTask) Run(_a0 context.Context) error {
|
func (_m *MockTask) Run(_a0 context.Context) error {
|
||||||
ret := _m.Called(_a0)
|
ret := _m.Called(_a0)
|
||||||
|
|||||||
@ -344,6 +344,10 @@ func (t *SyncTask) ChannelName() string {
|
|||||||
return t.channelName
|
return t.channelName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *SyncTask) IsFlush() bool {
|
||||||
|
return t.isFlush
|
||||||
|
}
|
||||||
|
|
||||||
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) {
|
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) {
|
||||||
return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog
|
return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
|
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/streamingutil"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
@ -139,6 +140,17 @@ func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wb *l0WriteBuffer) dispatchDeleteMsgsWithoutFilter(deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
|
||||||
|
for _, msg := range deleteMsgs {
|
||||||
|
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
|
||||||
|
pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
|
||||||
|
pkTss := msg.GetTimestamps()
|
||||||
|
if len(pks) > 0 {
|
||||||
|
wb.bufferDelete(l0SegmentID, pks, pkTss, startPos, endPos)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
|
func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
|
||||||
wb.mut.Lock()
|
wb.mut.Lock()
|
||||||
defer wb.mut.Unlock()
|
defer wb.mut.Unlock()
|
||||||
@ -156,9 +168,15 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// distribute delete msg
|
if streamingutil.IsStreamingServiceEnabled() {
|
||||||
// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
|
// In streaming service mode, flushed segments no longer maintain a bloom filter.
|
||||||
wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
|
// So, here we skip filtering delete entries by bf.
|
||||||
|
wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos)
|
||||||
|
} else {
|
||||||
|
// distribute delete msg
|
||||||
|
// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
|
||||||
|
wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
|
||||||
|
}
|
||||||
|
|
||||||
// update pk oracle
|
// update pk oracle
|
||||||
for _, inData := range groups {
|
for _, inData := range groups {
|
||||||
|
|||||||
@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
|
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/streamingutil"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
@ -345,6 +346,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
|
|||||||
if syncTask.StartPosition() != nil {
|
if syncTask.StartPosition() != nil {
|
||||||
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
|
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if streamingutil.IsStreamingServiceEnabled() && syncTask.IsFlush() {
|
||||||
|
wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID()))
|
||||||
|
log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()), zap.String("channel", syncTask.ChannelName()))
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user