congqixia 277849a915
enhance: separate serializer logic from sync task (#29413)
See also #27675

Since serializing a segment buffer does not depend on the sync manager, it
can be done before the task is submitted into the sync manager. This makes
the pk statistics file more accurate and reduces the complex logic inside
the sync manager.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2023-12-26 10:40:47 +08:00
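
As a minimal sketch of the separation this change describes (all names below are illustrative stand-ins, not the actual Milvus APIs): the write-buffer path encodes the segment buffer first, and the sync manager only receives the already-encoded task; the real implementation follows in the file below.

package example

import (
	"context"
	"fmt"
)

// encodedTask stands in for an already-serialized sync task.
type encodedTask struct {
	segmentID int64
	blob      []byte
}

// serializer stands in for the serializer now owned by the write buffer:
// it encodes buffered segment data before the task reaches the sync manager.
type serializer interface {
	Encode(ctx context.Context, segmentID int64, rows [][]byte) (*encodedTask, error)
}

// syncManager stands in for the sync manager: it only schedules
// already-encoded tasks and performs no serialization itself.
type syncManager interface {
	Submit(ctx context.Context, task *encodedTask) error
}

// syncSegment shows the ordering introduced by this change:
// serialize first, then submit to the sync manager.
func syncSegment(ctx context.Context, ser serializer, mgr syncManager, segmentID int64, rows [][]byte) error {
	task, err := ser.Encode(ctx, segmentID, rows) // serialization happens in the write-buffer path
	if err != nil {
		return fmt.Errorf("encode segment %d: %w", segmentID, err)
	}
	return mgr.Submit(ctx, task) // the sync manager just schedules the encoded task
}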

454 lines
15 KiB
Go

package writebuffer
import (
"context"
"fmt"
"sync"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
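// nonFlushTS indicates that no flush timestamp has been set for the channel.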
const (
nonFlushTS uint64 = 0
)
// WriteBuffer is the interface for channel write buffer.
// It provides abstraction for channel write buffer and pk bloom filter & L0 delta logic.
type WriteBuffer interface {
// HasSegment checks whether a certain segment exists in this buffer.
HasSegment(segmentID int64) bool
// BufferData buffers the DML data messages (inserts and deletes) along with their start and end positions.
BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
// SetFlushTimestamp sets the flush timestamp for the write buffer.
SetFlushTimestamp(flushTs uint64)
// GetFlushTimestamp returns the current flush timestamp.
GetFlushTimestamp() uint64
// FlushSegments triggers the flush (`Sync`) procedure for the provided segments.
FlushSegments(ctx context.Context, segmentIDs []int64) error
// GetCheckpoint returns the current channel checkpoint.
// If there is any non-empty segment buffer, it returns the earliest buffer start position.
// Otherwise, it returns the latest buffered checkpoint.
GetCheckpoint() *msgpb.MsgPosition
// Close closes the write buffer and, when drop is true, sinks the remaining buffered data.
Close(drop bool)
}
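// NewWriteBuffer creates a WriteBuffer for the channel, choosing the BF pk-oracle
// or L0-delta implementation according to the configured delete policy.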
func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
option := defaultWBOption(metacache)
for _, opt := range opts {
opt(option)
}
switch option.deletePolicy {
case DeletePolicyBFPkOracle:
return NewBFWriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
case DeletePolicyL0Delta:
return NewL0WriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
default:
return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
}
}
// writeBufferBase is the common component for buffering data
type writeBufferBase struct {
mut sync.RWMutex
collectionID int64
channelName string
metaWriter syncmgr.MetaWriter
collSchema *schemapb.CollectionSchema
metaCache metacache.MetaCache
syncMgr syncmgr.SyncManager
broker broker.Broker
serializer syncmgr.Serializer
buffers map[int64]*segmentBuffer // segmentID => segmentBuffer
syncPolicies []SyncPolicy
checkpoint *msgpb.MsgPosition
flushTimestamp *atomic.Uint64
storagev2Cache *metacache.StorageV2Cache
}
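// newWriteBufferBase builds the shared writeBufferBase: it appends the flush-timestamp
// sync policy and selects the storage V2 or V1 serializer according to the EnableStorageV2 config.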
func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (*writeBufferBase, error) {
flushTs := atomic.NewUint64(nonFlushTS)
flushTsPolicy := GetFlushTsPolicy(flushTs, metacache)
option.syncPolicies = append(option.syncPolicies, flushTsPolicy)
var serializer syncmgr.Serializer
var err error
if params.Params.CommonCfg.EnableStorageV2.GetAsBool() {
serializer, err = syncmgr.NewStorageV2Serializer(
storageV2Cache,
metacache,
option.metaWriter,
)
} else {
serializer, err = syncmgr.NewStorageSerializer(
metacache,
option.metaWriter,
)
}
if err != nil {
return nil, err
}
return &writeBufferBase{
channelName: channel,
collectionID: metacache.Collection(),
collSchema: metacache.Schema(),
syncMgr: syncMgr,
metaWriter: option.metaWriter,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
storagev2Cache: storageV2Cache,
}, nil
}
func (wb *writeBufferBase) HasSegment(segmentID int64) bool {
wb.mut.RLock()
defer wb.mut.RUnlock()
_, ok := wb.buffers[segmentID]
return ok
}
func (wb *writeBufferBase) FlushSegments(ctx context.Context, segmentIDs []int64) error {
wb.mut.RLock()
defer wb.mut.RUnlock()
return wb.flushSegments(ctx, segmentIDs)
}
func (wb *writeBufferBase) SetFlushTimestamp(flushTs uint64) {
wb.flushTimestamp.Store(flushTs)
}
func (wb *writeBufferBase) GetFlushTimestamp() uint64 {
return wb.flushTimestamp.Load()
}
func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
log := log.Ctx(context.Background()).
With(zap.String("channel", wb.channelName)).
WithRateGroup(fmt.Sprintf("writebuffer_cp_%s", wb.channelName), 1, 60)
wb.mut.RLock()
defer wb.mut.RUnlock()
// syncCandidate from sync manager
syncSegmentID, syncCandidate := wb.syncMgr.GetEarliestPosition(wb.channelName)
type checkpointCandidate struct {
segmentID int64
position *msgpb.MsgPosition
}
var bufferCandidate *checkpointCandidate
candidates := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *checkpointCandidate {
return &checkpointCandidate{buf.segmentID, buf.EarliestPosition()}
})
candidates = lo.Filter(candidates, func(candidate *checkpointCandidate, _ int) bool {
return candidate.position != nil
})
if len(candidates) > 0 {
bufferCandidate = lo.MinBy(candidates, func(a, b *checkpointCandidate) bool {
return a.position.GetTimestamp() < b.position.GetTimestamp()
})
}
var checkpoint *msgpb.MsgPosition
var segmentID int64
var cpSource string
switch {
case bufferCandidate == nil && syncCandidate == nil:
// all buffers are empty
log.RatedInfo(60, "checkpoint from latest consumed msg")
return wb.checkpoint
case bufferCandidate == nil && syncCandidate != nil:
checkpoint = syncCandidate
segmentID = syncSegmentID
cpSource = "syncManager"
case syncCandidate == nil && bufferCandidate != nil:
checkpoint = bufferCandidate.position
segmentID = bufferCandidate.segmentID
cpSource = "segmentBuffer"
case syncCandidate.GetTimestamp() >= bufferCandidate.position.GetTimestamp():
checkpoint = bufferCandidate.position
segmentID = bufferCandidate.segmentID
cpSource = "segmentBuffer"
case syncCandidate.GetTimestamp() < bufferCandidate.position.GetTimestamp():
checkpoint = syncCandidate
segmentID = syncSegmentID
cpSource = "syncManager"
}
log.RatedInfo(20, "checkpoint evaluated",
zap.String("cpSource", cpSource),
zap.Int64("segmentID", segmentID),
zap.Uint64("cpTimestamp", checkpoint.GetTimestamp()))
return checkpoint
}
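// triggerSync evaluates the sync policies against the current checkpoint timestamp
// and submits the selected segments to the sync manager, returning their IDs.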
func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp())
if len(segmentsToSync) > 0 {
log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
wb.syncSegments(context.Background(), segmentsToSync)
}
return segmentsToSync
}
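// cleanupCompactedSegments removes compacted segments from the meta cache
// once they have no remaining write buffer and no ongoing syncing task.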
func (wb *writeBufferBase) cleanupCompactedSegments() {
segmentIDs := wb.metaCache.GetSegmentIDsBy(metacache.WithCompacted(), metacache.WithNoSyncingTask())
// remove compacted segments only when they no longer have a write buffer
targetIDs := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
_, ok := wb.buffers[segmentID]
return !ok
})
if len(targetIDs) == 0 {
return
}
removed := wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(targetIDs...))
if len(removed) > 0 {
log.Info("remove compacted segments", zap.Int64s("removed", removed))
}
}
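// flushSegments marks growing or importing segments as Flushing in the meta cache.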
func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64) error {
// mark segment flushing if segment was growing
wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing),
metacache.WithSegmentIDs(segmentIDs...),
metacache.WithSegmentState(commonpb.SegmentState_Growing))
// mark segment flushing if segment was importing
wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing),
metacache.WithSegmentIDs(segmentIDs...),
metacache.WithImporting())
return nil
}
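// syncSegments builds a sync task for each segment and submits it to the sync manager;
// segments whose info is missing from the meta cache are skipped.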
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) {
for _, segmentID := range segmentIDs {
syncTask, err := wb.getSyncTask(ctx, segmentID)
if err != nil {
// TODO check err type
// segment info not found
log.Ctx(ctx).Warn("segment not found in meta", zap.Int64("segmentID", segmentID))
continue
}
// discard Future here, handle error in callback
_ = wb.syncMgr.SyncData(ctx, syncTask)
}
}
// getSegmentsToSync applies all policies to get segments list to sync.
// **NOTE** shall be invoked within mutex protection
func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp) []int64 {
buffers := lo.Values(wb.buffers)
segments := typeutil.NewSet[int64]()
for _, policy := range wb.syncPolicies {
result := policy.SelectSegments(buffers, ts)
if len(result) > 0 {
log.Info("SyncPolicy selects segments", zap.Int64s("segmentIDs", result), zap.String("reason", policy.Reason()))
segments.Insert(result...)
}
}
return segments.Collect()
}
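// getOrCreateBuffer returns the buffer for the given segment, creating a new one when it does not exist yet.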
func (wb *writeBufferBase) getOrCreateBuffer(segmentID int64) *segmentBuffer {
buffer, ok := wb.buffers[segmentID]
if !ok {
var err error
buffer, err = newSegmentBuffer(segmentID, wb.collSchema)
if err != nil {
// TODO avoid panic here
panic(err)
}
wb.buffers[segmentID] = buffer
}
return buffer
}
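// yieldBuffer detaches the segment buffer and returns its insert data, delete data,
// time range, and earliest position; it returns nils when the segment has no buffer.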
func (wb *writeBufferBase) yieldBuffer(segmentID int64) (*storage.InsertData, *storage.DeleteData, *TimeRange, *msgpb.MsgPosition) {
buffer, ok := wb.buffers[segmentID]
if !ok {
return nil, nil, nil, nil
}
// remove the buffer from the write buffer; its data is handed over to the sync path by the caller
delete(wb.buffers, segmentID)
start := buffer.EarliestPosition()
timeRange := buffer.GetTimeRange()
insert, delta := buffer.Yield()
return insert, delta, timeRange, start
}
// bufferInsert transforms InsertMsgs into buffered InsertData and returns the primary key field data for later usage.
func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) (map[int64][]storage.FieldData, error) {
insertGroups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.GetSegmentID() })
segmentPKData := make(map[int64][]storage.FieldData)
segmentPartition := lo.SliceToMap(insertMsgs, func(msg *msgstream.InsertMsg) (int64, int64) { return msg.GetSegmentID(), msg.GetPartitionID() })
for segmentID, msgs := range insertGroups {
_, ok := wb.metaCache.GetSegmentByID(segmentID)
// new segment
if !ok {
wb.metaCache.AddSegment(&datapb.SegmentInfo{
ID: segmentID,
PartitionID: segmentPartition[segmentID],
CollectionID: wb.collectionID,
InsertChannel: wb.channelName,
StartPosition: startPos,
State: commonpb.SegmentState_Growing,
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, metacache.SetStartPosRecorded(false))
}
segBuf := wb.getOrCreateBuffer(segmentID)
pkData, err := segBuf.insertBuffer.Buffer(msgs, startPos, endPos)
if err != nil {
log.Warn("failed to buffer insert data", zap.Int64("segmentID", segmentID), zap.Error(err))
return nil, err
}
segmentPKData[segmentID] = pkData
wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
metacache.WithSegmentIDs(segmentID))
totalSize := lo.SumBy(pkData, func(iData storage.FieldData) float64 {
return float64(iData.GetMemorySize())
})
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(totalSize)
}
return segmentPKData, nil
}
// bufferDelete buffers DeleteMsg into DeleteData.
func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) error {
segBuf := wb.getOrCreateBuffer(segmentID)
bufSize := segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos)
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(bufSize))
return nil
}
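// getSyncTask yields the segment buffer (insert data, delete data, time range and start position),
// marks the segment as syncing in the meta cache, and encodes the data into a sync task via the serializer.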
func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segmentID),
)
segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID) // wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
if !ok {
log.Warn("segment info not found in meta cache", zap.Int64("segmentID", segmentID))
return nil, merr.WrapErrSegmentNotFound(segmentID)
}
var batchSize int64
var totalMemSize float64
var tsFrom, tsTo uint64
insert, delta, timeRange, startPos := wb.yieldBuffer(segmentID)
if timeRange != nil {
tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax
}
actions := []metacache.SegmentAction{}
if insert != nil {
batchSize = int64(insert.GetRowNum())
totalMemSize += float64(insert.GetMemorySize())
}
if delta != nil {
totalMemSize += float64(delta.Size())
}
actions = append(actions, metacache.StartSyncing(batchSize))
wb.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(segmentID))
pack := &syncmgr.SyncPack{}
pack.WithInsertData(insert).
WithDeleteData(delta).
WithCollectionID(wb.collectionID).
WithPartitionID(segmentInfo.PartitionID()).
WithChannelName(wb.channelName).
WithSegmentID(segmentID).
WithStartPosition(startPos).
WithTimeRange(tsFrom, tsTo).
WithLevel(segmentInfo.Level()).
WithCheckpoint(wb.checkpoint).
WithBatchSize(batchSize)
if segmentInfo.State() == commonpb.SegmentState_Flushing {
pack.WithFlush()
}
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize)
return wb.serializer.EncodeBuffer(ctx, pack)
}
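// Close implements WriteBuffer. When drop is true it builds a sync task with the drop flag
// for every remaining segment buffer, waits for all of them, and then drops the channel
// via the meta writer; otherwise it is a no-op.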
func (wb *writeBufferBase) Close(drop bool) {
// sink all data and call Drop for meta writer
wb.mut.Lock()
defer wb.mut.Unlock()
if !drop {
return
}
var futures []*conc.Future[error]
for id := range wb.buffers {
syncTask, err := wb.getSyncTask(context.Background(), id)
if err != nil {
// TODO
continue
}
switch t := syncTask.(type) {
case *syncmgr.SyncTask:
t.WithDrop()
case *syncmgr.SyncTaskV2:
t.WithDrop()
}
f := wb.syncMgr.SyncData(context.Background(), syncTask)
futures = append(futures, f)
}
err := conc.AwaitAll(futures...)
if err != nil {
log.Error("failed to sink write buffer data", zap.String("channel", wb.channelName), zap.Error(err))
// TODO change to remove channel in the future
panic(err)
}
err = wb.metaWriter.DropChannel(wb.channelName)
if err != nil {
log.Error("failed to drop channel", zap.String("channel", wb.channelName), zap.Error(err))
// TODO change to remove channel in the future
panic(err)
}
}