fix: add proportion for capacity seal policy in streaming flusher (#36761)

issue: #36760

Signed-off-by: chyezh <chyezh@outlook.com>

parent 00e7e8c661
commit 8905b042f1
@@ -221,6 +221,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
 func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
     info *datapb.ChannelWatchInfo, metacache metacache.MetaCache,
     unflushed, flushed []*datapb.SegmentInfo, input <-chan *msgstream.MsgPack,
+    wbTaskObserverCallback writebuffer.TaskObserverCallback,
 ) (*DataSyncService, error) {
     var (
         channelName = info.GetVchan().GetChannelName()
@@ -316,7 +317,8 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
     // if fail to init flowgraph nodes.
     err = params.WriteBufferManager.Register(channelName, metacache,
         writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
-        writebuffer.WithIDAllocator(params.Allocator))
+        writebuffer.WithIDAllocator(params.Allocator),
+        writebuffer.WithTaskObserverCallback(wbTaskObserverCallback))
     if err != nil {
         log.Warn("failed to register channel buffer", zap.String("channel", channelName), zap.Error(err))
         return nil, err
@@ -353,10 +355,16 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa
     if metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
         return nil, err
     }
-    return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil)
+    return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil, nil)
 }
 
-func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *util.PipelineParams, info *datapb.ChannelWatchInfo, input <-chan *msgstream.MsgPack) (*DataSyncService, error) {
+func NewStreamingNodeDataSyncService(
+    initCtx context.Context,
+    pipelineParams *util.PipelineParams,
+    info *datapb.ChannelWatchInfo,
+    input <-chan *msgstream.MsgPack,
+    wbTaskObserverCallback writebuffer.TaskObserverCallback,
+) (*DataSyncService, error) {
     // recover segment checkpoints
     var (
         err error
@@ -381,7 +389,7 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut
     if metaCache, err = getMetaCacheForStreaming(initCtx, pipelineParams, info, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
         return nil, err
     }
-    return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input)
+    return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input, wbTaskObserverCallback)
 }
 
 func NewDataSyncServiceWithMetaCache(metaCache metacache.MetaCache) *DataSyncService {
@@ -19,14 +19,17 @@ const (
 
 type WriteBufferOption func(opt *writeBufferOption)
 
+type TaskObserverCallback func(t syncmgr.Task, err error)
+
 type writeBufferOption struct {
     deletePolicy string
     idAllocator  allocator.Interface
     syncPolicies []SyncPolicy
 
-    pkStatsFactory metacache.PkStatsFactory
-    metaWriter     syncmgr.MetaWriter
-    errorHandler   func(error)
+    pkStatsFactory       metacache.PkStatsFactory
+    metaWriter           syncmgr.MetaWriter
+    errorHandler         func(error)
+    taskObserverCallback TaskObserverCallback
 }
 
 func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
@@ -85,3 +88,11 @@ func WithErrorHandler(handler func(err error)) WriteBufferOption {
         opt.errorHandler = handler
     }
 }
+
+// WithTaskObserverCallback sets the callback function for observing task status.
+// The callback will be called when every task is executed, should be concurrent safe to be called.
+func WithTaskObserverCallback(callback TaskObserverCallback) WriteBufferOption {
+    return func(opt *writeBufferOption) {
+        opt.taskObserverCallback = callback
+    }
+}
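For orientation, here is a minimal, self-contained sketch of the functional-options pattern this hunk extends, with an observer hook wired in. The `Task`, `bufferOption`, `BufferOption`, and `fakeTask` names below are simplified stand-ins invented for illustration, not the actual Milvus types.

package main

import "fmt"

// Task is a simplified stand-in for syncmgr.Task.
type Task interface {
	SegmentID() int64
}

// TaskObserverCallback mirrors the shape of the callback added in this commit:
// it is invoked once per finished task, together with the task's terminal error.
type TaskObserverCallback func(t Task, err error)

// bufferOption and BufferOption are hypothetical stand-ins for
// writeBufferOption / WriteBufferOption.
type bufferOption struct {
	taskObserverCallback TaskObserverCallback
}

type BufferOption func(*bufferOption)

// WithTaskObserverCallback stores the observer on the option struct, in the
// same style as the option introduced above, against the simplified types here.
func WithTaskObserverCallback(cb TaskObserverCallback) BufferOption {
	return func(o *bufferOption) { o.taskObserverCallback = cb }
}

type fakeTask struct{ id int64 }

func (t fakeTask) SegmentID() int64 { return t.id }

func main() {
	opt := &bufferOption{}
	WithTaskObserverCallback(func(t Task, err error) {
		fmt.Printf("segment %d synced, err=%v\n", t.SegmentID(), err)
	})(opt)

	// The write buffer would call the stored callback after every sync task;
	// here we invoke it directly to show the flow.
	if opt.taskObserverCallback != nil {
		opt.taskObserverCallback(fakeTask{id: 6000}, nil)
	}
}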
@@ -151,7 +151,8 @@ type writeBufferBase struct {
     checkpoint     *msgpb.MsgPosition
     flushTimestamp *atomic.Uint64
 
-    errHandler func(err error)
+    errHandler           func(err error)
+    taskObserverCallback func(t syncmgr.Task, err error) // execute when a sync task finished, should be concurrent safe.
 
     // pre build logger
     logger *log.MLogger
@@ -181,19 +182,20 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr s
     }
 
     wb := &writeBufferBase{
-        channelName:      channel,
-        collectionID:     metacache.Collection(),
-        collSchema:       schema,
-        estSizePerRecord: estSize,
-        syncMgr:          syncMgr,
-        metaWriter:       option.metaWriter,
-        buffers:          make(map[int64]*segmentBuffer),
-        metaCache:        metacache,
-        serializer:       serializer,
-        syncCheckpoint:   newCheckpointCandiates(),
-        syncPolicies:     option.syncPolicies,
-        flushTimestamp:   flushTs,
-        errHandler:       option.errorHandler,
+        channelName:          channel,
+        collectionID:         metacache.Collection(),
+        collSchema:           schema,
+        estSizePerRecord:     estSize,
+        syncMgr:              syncMgr,
+        metaWriter:           option.metaWriter,
+        buffers:              make(map[int64]*segmentBuffer),
+        metaCache:            metacache,
+        serializer:           serializer,
+        syncCheckpoint:       newCheckpointCandiates(),
+        syncPolicies:         option.syncPolicies,
+        flushTimestamp:       flushTs,
+        errHandler:           option.errorHandler,
+        taskObserverCallback: option.taskObserverCallback,
     }
 
     wb.logger = log.With(zap.Int64("collectionID", wb.collectionID),
@@ -342,6 +344,10 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
         }
 
         result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
+            if wb.taskObserverCallback != nil {
+                wb.taskObserverCallback(syncTask, err)
+            }
+
             if err != nil {
                 return err
             }
@@ -654,6 +660,10 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
         }
 
         f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
+            if wb.taskObserverCallback != nil {
+                wb.taskObserverCallback(syncTask, err)
+            }
+
             if err != nil {
                 return err
             }
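The shape of the completion hook above can be summarized in a minimal sketch (the `observer` and `onTaskDone` names are invented for illustration): the callback runs first behind a nil check, the pre-existing error handling is untouched, and because sync tasks for different segments finish concurrently, the observer must be safe for concurrent calls.

package main

import (
	"fmt"
	"sync"
)

// observer plays the role of wb.taskObserverCallback: it may be called from
// several sync completions at once, so it must tolerate concurrent use.
type observer func(segmentID int64, err error)

// onTaskDone mirrors the guarded invocation added inside the SyncData
// completion closures: notify the observer first (if any), then keep the
// original error handling unchanged.
func onTaskDone(cb observer, segmentID int64, err error) error {
	if cb != nil {
		cb(segmentID, err)
	}
	if err != nil {
		return err
	}
	// ... the rest of the original completion logic would run here ...
	return nil
}

func main() {
	var mu sync.Mutex
	counts := map[int64]int{}
	cb := observer(func(segmentID int64, err error) {
		if err != nil {
			return
		}
		mu.Lock()
		defer mu.Unlock()
		counts[segmentID]++
	})

	var wg sync.WaitGroup
	for _, seg := range []int64{1, 2, 1} {
		wg.Add(1)
		go func(seg int64) {
			defer wg.Done()
			_ = onTaskDone(cb, seg, nil)
		}(seg)
	}
	wg.Wait()

	mu.Lock()
	fmt.Println(counts) // map[1:2 2:1]
	mu.Unlock()
}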
@@ -26,10 +26,12 @@ import (
     "go.uber.org/zap"
 
     "github.com/milvus-io/milvus/internal/flushcommon/pipeline"
+    "github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/streamingnode/server/resource"
     "github.com/milvus-io/milvus/internal/streamingnode/server/wal"
     adaptor2 "github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor"
+    "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
     "github.com/milvus-io/milvus/pkg/streaming/util/options"
@@ -116,7 +118,17 @@ func (c *channelLifetime) Run() error {
 
     // Build and add pipeline.
     ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, c.f.pipelineParams,
-        &datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan())
+        &datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan(), func(t syncmgr.Task, err error) {
+            if err != nil || t == nil {
+                return
+            }
+            if tt, ok := t.(*syncmgr.SyncTask); ok {
+                insertLogs, _, _ := tt.Binlogs()
+                resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(),
+                    stats.SyncOperationMetrics{BinLogCounterIncr: uint64(len(insertLogs))},
+                )
+            }
+        })
     if err != nil {
         return err
     }
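A simplified sketch of what the flusher's callback does (the `Task`, `SyncTask`, and `statsManager` types below are illustrative stand-ins, not the real syncmgr or stats-manager APIs): failed or nil tasks are ignored, and for each successful sync task the number of newly written insert binlogs is reported to the stats manager as an increment, which is what the capacity seal policy later reads.

package main

import "fmt"

// Task and SyncTask are simplified stand-ins for syncmgr.Task / *syncmgr.SyncTask.
type Task interface {
	SegmentID() int64
}

type SyncTask struct {
	segmentID  int64
	insertLogs []string
}

func (t *SyncTask) SegmentID() int64  { return t.segmentID }
func (t *SyncTask) Binlogs() []string { return t.insertLogs }

// statsManager stands in for the segment-assignment stats manager; it only
// tracks the binlog counter needed by the capacity seal policy.
type statsManager struct {
	binlogCounter map[int64]uint64
}

func (m *statsManager) UpdateOnSync(segmentID int64, incr uint64) {
	m.binlogCounter[segmentID] += incr
}

func main() {
	m := &statsManager{binlogCounter: map[int64]uint64{}}

	// The observer installed by the flusher: skip failed or nil tasks, then
	// report how many insert binlogs the finished sync task produced.
	observe := func(t Task, err error) {
		if err != nil || t == nil {
			return
		}
		if tt, ok := t.(*SyncTask); ok {
			m.UpdateOnSync(tt.SegmentID(), uint64(len(tt.Binlogs())))
		}
	}

	observe(&SyncTask{segmentID: 6000, insertLogs: []string{"binlog-1", "binlog-2"}}, nil)
	fmt.Println(m.binlogCounter[6000]) // 2
}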
@@ -135,8 +135,8 @@ func TestSegmentAllocManager(t *testing.T) {
     assert.True(t, m.IsNoWaitSeal())
 
     // Try to seal with a policy
-    resource.Resource().SegmentAssignStatsManager().UpdateOnFlush(6000, stats.FlushOperationMetrics{
-        BinLogCounter: 100,
+    resource.Resource().SegmentAssignStatsManager().UpdateOnSync(6000, stats.SyncOperationMetrics{
+        BinLogCounterIncr: 100,
     })
     // ask a unacknowledgement seal for partition 3 to avoid seal operation.
     result, err = m.AssignSegment(ctx, &AssignSegmentRequest{
@@ -266,6 +266,7 @@ func initializeTestState(t *testing.T) {
     // s 6000g
 
     paramtable.Init()
+    paramtable.Get().DataCoordCfg.SegmentSealProportion.SwapTempValue("1.0")
     paramtable.Get().DataCoordCfg.SegmentSealProportionJitter.SwapTempValue("0.0")
     paramtable.Get().DataCoordCfg.SegmentMaxSize.SwapTempValue("1")
 
@@ -29,6 +29,7 @@ type SegmentLimitationPolicy interface {
 type jitterSegmentLimitationPolicyExtraInfo struct {
     Jitter         float64
     JitterRatio    float64
+    Proportion     float64
     MaxSegmentSize uint64
 }
 
@@ -46,13 +47,15 @@ func (p jitterSegmentLimitationPolicy) GenerateLimitation() SegmentLimitation {
         jitterRatio = 1
     }
     maxSegmentSize := uint64(paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024)
-    segmentSize := uint64(jitterRatio * float64(maxSegmentSize))
+    proportion := paramtable.Get().DataCoordCfg.SegmentSealProportion.GetAsFloat()
+    segmentSize := uint64(jitterRatio * float64(maxSegmentSize) * proportion)
     return SegmentLimitation{
         PolicyName:  "jitter_segment_limitation",
         SegmentSize: segmentSize,
         ExtraInfo: jitterSegmentLimitationPolicyExtraInfo{
             Jitter:         jitter,
             JitterRatio:    jitterRatio,
+            Proportion:     proportion,
             MaxSegmentSize: maxSegmentSize,
         },
     }
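The effect of the new multiplication is easiest to see with numbers. A minimal sketch, assuming a 1024 MB max segment size, no jitter, and a seal proportion of 0.12; these concrete values are assumptions for illustration, not taken from this diff:

package main

import "fmt"

func main() {
	// Hypothetical inputs: a 1024 MB max segment, jitterRatio of 1 (no jitter),
	// and a sealProportion-style value of 0.12.
	var (
		maxSegmentSize = uint64(1024 * 1024 * 1024) // bytes
		jitterRatio    = 1.0
		proportion     = 0.12
	)

	before := uint64(jitterRatio * float64(maxSegmentSize))             // old: proportion ignored
	after := uint64(jitterRatio * float64(maxSegmentSize) * proportion) // new: scaled by the proportion

	fmt.Printf("capacity seal threshold before: %d bytes\n", before) // 1073741824
	fmt.Printf("capacity seal threshold after:  %d bytes\n", after)  // 128849018
}

With the proportion applied, the streaming flusher's capacity seal policy triggers well before a growing segment reaches its full max size, consistent with how the seal proportion is used elsewhere.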
@@ -48,9 +48,9 @@ func NewProtoFromSegmentStat(stat *SegmentStats) *streamingpb.SegmentAssignmentS
     }
 }
 
-// FlushOperationMetrics is the metrics of flush operation.
-type FlushOperationMetrics struct {
-    BinLogCounter uint64
+// SyncOperationMetrics is the metrics of sync operation.
+type SyncOperationMetrics struct {
+    BinLogCounterIncr uint64 // the counter increment of bin log.
 }
 
 // AllocRows alloc space of rows on current segment.
@@ -71,9 +71,9 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 {
     return s.MaxBinarySize - s.Insert.BinarySize
 }
 
-// UpdateOnFlush updates the stats of segment on flush.
-func (s *SegmentStats) UpdateOnFlush(f FlushOperationMetrics) {
-    s.BinLogCounter = f.BinLogCounter
+// UpdateOnSync updates the stats of segment on sync.
+func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) {
+    s.BinLogCounter += f.BinLogCounterIncr
 }
 
 // Copy copies the segment stats.
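The rename is not purely cosmetic: UpdateOnFlush overwrote the counter with an absolute value, while UpdateOnSync accumulates increments. A minimal sketch of the new semantics, with the types trimmed to the one field involved, using the same numbers as the updated test later in this diff (a counter already at 3 receiving an increment of 4):

package main

import "fmt"

// SyncOperationMetrics and SegmentStats mirror the renamed types above,
// reduced to the single field that matters for the semantics change.
type SyncOperationMetrics struct {
	BinLogCounterIncr uint64
}

type SegmentStats struct {
	BinLogCounter uint64
}

// UpdateOnSync accumulates increments; the old UpdateOnFlush assigned the
// reported absolute value instead.
func (s *SegmentStats) UpdateOnSync(m SyncOperationMetrics) {
	s.BinLogCounter += m.BinLogCounterIncr
}

func main() {
	s := &SegmentStats{BinLogCounter: 3}
	s.UpdateOnSync(SyncOperationMetrics{BinLogCounterIncr: 4})
	fmt.Println(s.BinLogCounter) // 7, matching the updated expectation in the test hunk below
}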
@@ -112,9 +112,9 @@ func (m *StatsManager) GetStatsOfSegment(segmentID int64) *SegmentStats {
     return m.segmentStats[segmentID].Copy()
 }
 
-// UpdateOnFlush updates the stats of segment on flush.
+// UpdateOnSync updates the stats of segment on sync.
 // It's an async update operation, so it's not necessary to do success.
-func (m *StatsManager) UpdateOnFlush(segmentID int64, flush FlushOperationMetrics) {
+func (m *StatsManager) UpdateOnSync(segmentID int64, syncMetric SyncOperationMetrics) {
     m.mu.Lock()
     defer m.mu.Unlock()
 
@@ -122,7 +122,7 @@ func (m *StatsManager) UpdateOnFlush(segmentID int64, flush FlushOperationMetric
     if _, ok := m.segmentIndex[segmentID]; !ok {
         return
     }
-    m.segmentStats[segmentID].UpdateOnFlush(flush)
+    m.segmentStats[segmentID].UpdateOnSync(syncMetric)
 
     // binlog counter is updated, notify seal manager to do seal scanning.
     m.sealNotifier.AddAndNotify(m.segmentIndex[segmentID])
@@ -60,11 +60,11 @@ func TestStatsManager(t *testing.T) {
     assert.Equal(t, uint64(350), m.pchannelStats["pchannel"].BinarySize)
     assert.Equal(t, uint64(250), m.pchannelStats["pchannel2"].BinarySize)
 
-    m.UpdateOnFlush(3, FlushOperationMetrics{BinLogCounter: 100})
+    m.UpdateOnSync(3, SyncOperationMetrics{BinLogCounterIncr: 100})
     <-m.SealNotifier().WaitChan()
     infos = m.SealNotifier().Get()
     assert.Len(t, infos, 1)
-    m.UpdateOnFlush(1000, FlushOperationMetrics{BinLogCounter: 100})
+    m.UpdateOnSync(1000, SyncOperationMetrics{BinLogCounterIncr: 100})
     shouldBlock(t, m.SealNotifier().WaitChan())
 
     m.AllocRows(3, InsertMetrics{Rows: 400, BinarySize: 400})
@@ -68,8 +68,8 @@ func TestSegmentStats(t *testing.T) {
     assert.Equal(t, stat.Insert.Rows, uint64(160))
     assert.Equal(t, stat.Insert.BinarySize, uint64(320))
 
-    stat.UpdateOnFlush(FlushOperationMetrics{
-        BinLogCounter: 4,
+    stat.UpdateOnSync(SyncOperationMetrics{
+        BinLogCounterIncr: 4,
     })
-    assert.Equal(t, uint64(4), stat.BinLogCounter)
+    assert.Equal(t, uint64(7), stat.BinLogCounter)
 }