yihao.dai 7ce876a072
fix: Decoupling importing segment from flush process (#30402)
This PR decouples the importing segment from the flush process by:
1. Excluding the importing segment from the flush policy. This approach
avoids notifying the datanode to flush the importing segment, which may
not exist on the node.
2. When RootCoord calls Flush, DataCoord directly sets the importing
segment state to `Flushed`.

issue: https://github.com/milvus-io/milvus/issues/30359

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2024-02-03 13:01:12 +08:00
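The change described above lives in DataCoord rather than in the file below; the following is only a rough, self-contained sketch of the idea, with a hypothetical `segment` struct and `isImporting` flag standing in for the real datapb types: importing segments are filtered out of the set the datanode is asked to flush, and are instead marked Flushed directly on the coordinator side.

package main

import "fmt"

// segment is a hypothetical, simplified stand-in for a segment record.
type segment struct {
	id          int64
	isImporting bool
}

// splitForFlush separates segments the datanode should be told to flush from
// importing segments, which are marked Flushed directly because the datanode
// may not know about them yet.
func splitForFlush(segments []*segment) (notify, markFlushed []*segment) {
	for _, seg := range segments {
		if seg.isImporting {
			markFlushed = append(markFlushed, seg)
			continue
		}
		notify = append(notify, seg)
	}
	return notify, markFlushed
}

func main() {
	segs := []*segment{{id: 1}, {id: 2, isImporting: true}}
	notify, markFlushed := splitForFlush(segs)
	fmt.Printf("notify=%d markFlushed=%d\n", len(notify), len(markFlushed)) // notify=1 markFlushed=1
}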


package writebuffer
import (
"context"
"fmt"
"sync"
"github.com/bits-and-blooms/bloom/v3"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
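// nonFlushTS is the sentinel flush timestamp indicating that no flush has been requested for this channel yet.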
nonFlushTS uint64 = 0
)
// WriteBuffer is the interface for channel write buffer.
// It provides the abstraction for buffering channel data together with the pk bloom filter & L0 delta logic.
type WriteBuffer interface {
// HasSegment checks whether certain segment exists in this buffer.
HasSegment(segmentID int64) bool
// BufferData is the method to buffer dml data msgs.
BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
// SetFlushTimestamp sets the flush timestamp for the write buffer.
SetFlushTimestamp(flushTs uint64)
// GetFlushTimestamp returns the current flush timestamp.
GetFlushTimestamp() uint64
// SealSegments marks the segments with the provided IDs as sealed if they are currently growing.
SealSegments(ctx context.Context, segmentIDs []int64) error
// GetCheckpoint returns current channel checkpoint.
// If there are any non-empty segment buffers, it returns the earliest buffer start position.
// Otherwise, it returns the latest buffered checkpoint.
GetCheckpoint() *msgpb.MsgPosition
// MemorySize returns the size in bytes currently used by this write buffer.
MemorySize() int64
// EvictBuffer evicts the buffers matching the provided sync policies to the sync manager.
EvictBuffer(policies ...SyncPolicy)
// Close closes the write buffer; when drop is true, the remaining buffer data is synced with the drop flag before the channel is dropped.
Close(drop bool)
}
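// A minimal usage sketch for this factory (illustrative only: `cache`, `syncMgr`
// and the message/position variables are assumed to exist elsewhere, and passing
// a nil StorageV2Cache is only sensible when the storage v2 path is disabled):
//
//	wb, err := NewWriteBuffer(channelName, cache, nil, syncMgr)
//	if err != nil {
//		return err
//	}
//	if err := wb.BufferData(insertMsgs, deleteMsgs, startPos, endPos); err != nil {
//		return err
//	}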
func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
option := defaultWBOption(metacache)
for _, opt := range opts {
opt(option)
}
switch option.deletePolicy {
case DeletePolicyBFPkOracle:
return NewBFWriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
case DeletePolicyL0Delta:
return NewL0WriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
default:
return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
}
}
// writeBufferBase is the common component for buffering data
type writeBufferBase struct {
mut sync.RWMutex
collectionID int64
channelName string
metaWriter syncmgr.MetaWriter
collSchema *schemapb.CollectionSchema
estSizePerRecord int
metaCache metacache.MetaCache
syncMgr syncmgr.SyncManager
broker broker.Broker
serializer syncmgr.Serializer
buffers map[int64]*segmentBuffer // segmentID => segmentBuffer
syncPolicies []SyncPolicy
checkpoint *msgpb.MsgPosition
flushTimestamp *atomic.Uint64
storagev2Cache *metacache.StorageV2Cache
}
func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (*writeBufferBase, error) {
flushTs := atomic.NewUint64(nonFlushTS)
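// the flush-ts policy is registered unconditionally, so that the affected segment
// buffers are eventually selected for sync once a flush timestamp is set via SetFlushTimestamp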
flushTsPolicy := GetFlushTsPolicy(flushTs, metacache)
option.syncPolicies = append(option.syncPolicies, flushTsPolicy)
var serializer syncmgr.Serializer
var err error
if params.Params.CommonCfg.EnableStorageV2.GetAsBool() {
serializer, err = syncmgr.NewStorageV2Serializer(
storageV2Cache,
metacache,
option.metaWriter,
)
} else {
serializer, err = syncmgr.NewStorageSerializer(
metacache,
option.metaWriter,
)
}
if err != nil {
return nil, err
}
schema := metacache.Schema()
estSize, err := typeutil.EstimateSizePerRecord(schema)
if err != nil {
return nil, err
}
return &writeBufferBase{
channelName: channel,
collectionID: metacache.Collection(),
collSchema: schema,
estSizePerRecord: estSize,
syncMgr: syncMgr,
metaWriter: option.metaWriter,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
storagev2Cache: storageV2Cache,
}, nil
}
func (wb *writeBufferBase) HasSegment(segmentID int64) bool {
wb.mut.RLock()
defer wb.mut.RUnlock()
_, ok := wb.buffers[segmentID]
return ok
}
func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64) error {
wb.mut.RLock()
defer wb.mut.RUnlock()
return wb.sealSegments(ctx, segmentIDs)
}
func (wb *writeBufferBase) SetFlushTimestamp(flushTs uint64) {
wb.flushTimestamp.Store(flushTs)
}
func (wb *writeBufferBase) GetFlushTimestamp() uint64 {
return wb.flushTimestamp.Load()
}
func (wb *writeBufferBase) MemorySize() int64 {
wb.mut.RLock()
defer wb.mut.RUnlock()
var size int64
for _, segBuf := range wb.buffers {
size += segBuf.MemorySize()
}
return size
}
func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) {
wb.mut.Lock()
defer wb.mut.Unlock()
log := log.Ctx(context.Background()).With(
zap.Int64("collectionID", wb.collectionID),
zap.String("channel", wb.channelName),
)
// need valid checkpoint before triggering syncing
if wb.checkpoint == nil {
log.Warn("evict buffer before buffering data")
return
}
ts := wb.checkpoint.GetTimestamp()
segmentIDs := wb.getSegmentsToSync(ts, policies...)
if len(segmentIDs) > 0 {
log.Info("evict buffer find segments to sync", zap.Int64s("segmentIDs", segmentIDs))
wb.syncSegments(context.Background(), segmentIDs)
}
}
func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
log := log.Ctx(context.Background()).
With(zap.String("channel", wb.channelName)).
WithRateGroup(fmt.Sprintf("writebuffer_cp_%s", wb.channelName), 1, 60)
wb.mut.RLock()
defer wb.mut.RUnlock()
// syncCandidate from sync manager
syncSegmentID, syncCandidate := wb.syncMgr.GetEarliestPosition(wb.channelName)
type checkpointCandidate struct {
segmentID int64
position *msgpb.MsgPosition
}
var bufferCandidate *checkpointCandidate
candidates := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *checkpointCandidate {
return &checkpointCandidate{buf.segmentID, buf.EarliestPosition()}
})
candidates = lo.Filter(candidates, func(candidate *checkpointCandidate, _ int) bool {
return candidate.position != nil
})
if len(candidates) > 0 {
bufferCandidate = lo.MinBy(candidates, func(a, b *checkpointCandidate) bool {
return a.position.GetTimestamp() < b.position.GetTimestamp()
})
}
var checkpoint *msgpb.MsgPosition
var segmentID int64
var cpSource string
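// pick the earlier of the two candidates: the oldest position still held by the
// sync manager versus the oldest start position among in-memory buffers;
// if neither exists, fall back to the latest consumed checkpoint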
switch {
case bufferCandidate == nil && syncCandidate == nil:
// all buffers are empty
log.RatedInfo(60, "checkpoint from latest consumed msg")
return wb.checkpoint
case bufferCandidate == nil && syncCandidate != nil:
checkpoint = syncCandidate
segmentID = syncSegmentID
cpSource = "syncManager"
case syncCandidate == nil && bufferCandidate != nil:
checkpoint = bufferCandidate.position
segmentID = bufferCandidate.segmentID
cpSource = "segmentBuffer"
case syncCandidate.GetTimestamp() >= bufferCandidate.position.GetTimestamp():
checkpoint = bufferCandidate.position
segmentID = bufferCandidate.segmentID
cpSource = "segmentBuffer"
case syncCandidate.GetTimestamp() < bufferCandidate.position.GetTimestamp():
checkpoint = syncCandidate
segmentID = syncSegmentID
cpSource = "syncManager"
}
log.RatedInfo(20, "checkpoint evaluated",
zap.String("cpSource", cpSource),
zap.Int64("segmentID", segmentID),
zap.Uint64("cpTimestamp", checkpoint.GetTimestamp()))
return checkpoint
}
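// triggerSync applies the registered sync policies against the current checkpoint
// and hands the selected segments over to the sync manager.
// Like getSegmentsToSync, it shall be invoked within mutex protection.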
func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp(), wb.syncPolicies...)
if len(segmentsToSync) > 0 {
log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
wb.syncSegments(context.Background(), segmentsToSync)
}
return segmentsToSync
}
func (wb *writeBufferBase) cleanupCompactedSegments() {
segmentIDs := wb.metaCache.GetSegmentIDsBy(metacache.WithCompacted(), metacache.WithNoSyncingTask())
// remove compacted segments only when they no longer have a write buffer
targetIDs := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
_, ok := wb.buffers[segmentID]
return !ok
})
if len(targetIDs) == 0 {
return
}
removed := wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(targetIDs...))
if len(removed) > 0 {
log.Info("remove compacted segments", zap.Int64s("removed", removed))
}
}
func (wb *writeBufferBase) sealSegments(ctx context.Context, segmentIDs []int64) error {
// mark segments as sealed if they are currently growing
wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Sealed),
metacache.WithSegmentIDs(segmentIDs...),
metacache.WithSegmentState(commonpb.SegmentState_Growing))
return nil
}
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) {
log := log.Ctx(ctx)
for _, segmentID := range segmentIDs {
syncTask, err := wb.getSyncTask(ctx, segmentID)
if err != nil {
if errors.Is(err, merr.ErrSegmentNotFound) {
log.Warn("segment not found in meta", zap.Int64("segmentID", segmentID))
continue
} else {
log.Fatal("failed to get sync task", zap.Int64("segmentID", segmentID), zap.Error(err))
}
}
// discard Future here, handle error in callback
_ = wb.syncMgr.SyncData(ctx, syncTask)
}
}
// getSegmentsToSync applies all policies to get segments list to sync.
// **NOTE** shall be invoked within mutex protection
func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp, policies ...SyncPolicy) []int64 {
buffers := lo.Values(wb.buffers)
segments := typeutil.NewSet[int64]()
for _, policy := range policies {
result := policy.SelectSegments(buffers, ts)
if len(result) > 0 {
log.Info("SyncPolicy selects segments", zap.Int64s("segmentIDs", result), zap.String("reason", policy.Reason()))
segments.Insert(result...)
}
}
return segments.Collect()
}
func (wb *writeBufferBase) getOrCreateBuffer(segmentID int64) *segmentBuffer {
buffer, ok := wb.buffers[segmentID]
if !ok {
var err error
buffer, err = newSegmentBuffer(segmentID, wb.collSchema)
if err != nil {
// TODO avoid panic here
panic(err)
}
wb.buffers[segmentID] = buffer
}
return buffer
}
func (wb *writeBufferBase) yieldBuffer(segmentID int64) (*storage.InsertData, *storage.DeleteData, *TimeRange, *msgpb.MsgPosition) {
buffer, ok := wb.buffers[segmentID]
if !ok {
return nil, nil, nil, nil
}
// remove buffer and move it to sync manager
delete(wb.buffers, segmentID)
start := buffer.EarliestPosition()
timeRange := buffer.GetTimeRange()
insert, delta := buffer.Yield()
return insert, delta, timeRange, start
}
type inData struct {
segmentID int64
partitionID int64
data []*storage.InsertData
pkField []storage.FieldData
tsField []*storage.Int64FieldData
rowNum int64
batchBF *storage.PkStatistics
}
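// generatePkStats builds a batch-level bloom filter over all buffered primary keys,
// sized by the total row count and the configured maximum false-positive rate.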
func (id *inData) generatePkStats() {
id.batchBF = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(uint(id.rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
}
for _, ids := range id.pkField {
id.batchBF.UpdatePKRange(ids)
}
}
func (id *inData) pkExists(pk storage.PrimaryKey, ts uint64) bool {
if !id.batchBF.PkExist(pk) {
return false
}
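// the bloom filter may report false positives, so confirm the hit by scanning the
// buffered batches; only rows written strictly before ts count as existing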
for batchIdx, timestamps := range id.tsField {
ids := id.pkField[batchIdx]
var primaryKey storage.PrimaryKey
switch pk.Type() {
case schemapb.DataType_Int64:
primaryKey = storage.NewInt64PrimaryKey(0)
case schemapb.DataType_VarChar:
primaryKey = storage.NewVarCharPrimaryKey("")
}
for idx := 0; idx < timestamps.RowNum(); idx++ {
timestamp := timestamps.GetRow(idx).(int64)
if int64(ts) <= timestamp {
continue
}
primaryKey.SetValue(ids.GetRow(idx))
if pk.EQ(primaryKey) {
return true
}
}
}
return false
}
// prepareInsert transforms InsertMsgs into InsertData grouped by segmentID,
// along with the primary key and timestamp field data for each batch.
func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*inData, error) {
groups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.SegmentID })
segmentPartition := lo.SliceToMap(insertMsgs, func(msg *msgstream.InsertMsg) (int64, int64) { return msg.GetSegmentID(), msg.GetPartitionID() })
result := make([]*inData, 0, len(groups))
for segment, msgs := range groups {
inData := &inData{
segmentID: segment,
partitionID: segmentPartition[segment],
data: make([]*storage.InsertData, 0, len(msgs)),
pkField: make([]storage.FieldData, 0, len(msgs)),
}
for _, msg := range msgs {
data, err := storage.InsertMsgToInsertData(msg, wb.collSchema)
if err != nil {
log.Warn("failed to transfer insert msg to insert data", zap.Error(err))
return nil, err
}
pkFieldData, err := storage.GetPkFromInsertData(wb.collSchema, data)
if err != nil {
return nil, err
}
if pkFieldData.RowNum() != data.GetRowNum() {
return nil, merr.WrapErrServiceInternal("pk column row num not match")
}
tsFieldData, err := storage.GetTimestampFromInsertData(data)
if err != nil {
return nil, err
}
if tsFieldData.RowNum() != data.GetRowNum() {
return nil, merr.WrapErrServiceInternal("timestamp column row num not match")
}
inData.data = append(inData.data, data)
inData.pkField = append(inData.pkField, pkFieldData)
inData.tsField = append(inData.tsField, tsFieldData)
inData.rowNum += int64(data.GetRowNum())
}
inData.generatePkStats()
result = append(result, inData)
}
return result, nil
}
// bufferInsert buffers the prepared inData into the corresponding segment buffer, registering the segment in the metacache if it is not tracked yet.
func (wb *writeBufferBase) bufferInsert(inData *inData, startPos, endPos *msgpb.MsgPosition) error {
_, ok := wb.metaCache.GetSegmentByID(inData.segmentID)
// segment not tracked yet, register it in the metacache as a growing segment
if !ok {
wb.metaCache.AddSegment(&datapb.SegmentInfo{
ID: inData.segmentID,
PartitionID: inData.partitionID,
CollectionID: wb.collectionID,
InsertChannel: wb.channelName,
StartPosition: startPos,
State: commonpb.SegmentState_Growing,
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
}, metacache.SetStartPosRecorded(false))
}
segBuf := wb.getOrCreateBuffer(inData.segmentID)
totalMemSize := segBuf.insertBuffer.Buffer(inData, startPos, endPos)
wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
metacache.WithSegmentIDs(inData.segmentID))
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(totalMemSize))
return nil
}
// bufferDelete buffers DeleteMsg into DeleteData.
func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) {
segBuf := wb.getOrCreateBuffer(segmentID)
bufSize := segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos)
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(bufSize))
}
func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segmentID),
)
segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID) // wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
if !ok {
log.Warn("segment info not found in meta cache", zap.Int64("segmentID", segmentID))
return nil, merr.WrapErrSegmentNotFound(segmentID)
}
var batchSize int64
var totalMemSize float64 = 0
var tsFrom, tsTo uint64
insert, delta, timeRange, startPos := wb.yieldBuffer(segmentID)
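// yieldBuffer removed the in-memory buffer for this segment; from this point on the
// data is owned by the sync pack assembled below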
if timeRange != nil {
tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax
}
actions := []metacache.SegmentAction{}
if insert != nil {
batchSize = int64(insert.GetRowNum())
totalMemSize += float64(insert.GetMemorySize())
}
if delta != nil {
totalMemSize += float64(delta.Size())
}
actions = append(actions, metacache.StartSyncing(batchSize))
wb.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(segmentID))
pack := &syncmgr.SyncPack{}
pack.WithInsertData(insert).
WithDeleteData(delta).
WithCollectionID(wb.collectionID).
WithPartitionID(segmentInfo.PartitionID()).
WithChannelName(wb.channelName).
WithSegmentID(segmentID).
WithStartPosition(startPos).
WithTimeRange(tsFrom, tsTo).
WithLevel(segmentInfo.Level()).
WithCheckpoint(wb.checkpoint).
WithBatchSize(batchSize)
if segmentInfo.State() == commonpb.SegmentState_Flushing ||
segmentInfo.Level() == datapb.SegmentLevel_L0 { // level zero segments are always synced as flushed
pack.WithFlush()
}
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize)
return wb.serializer.EncodeBuffer(ctx, pack)
}
// getEstBatchSize returns the batch size based on the estimated size per record and the FlushInsertBufferSize configuration value.
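// For example, a 16 MiB buffer limit with an estimated 1 KiB per record yields a
// batch size of 16384 rows (illustrative figures, not necessarily the configured defaults).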
func (wb *writeBufferBase) getEstBatchSize() uint {
sizeLimit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
return uint(sizeLimit / int64(wb.estSizePerRecord))
}
func (wb *writeBufferBase) Close(drop bool) {
// sink all data and call Drop for meta writer
wb.mut.Lock()
defer wb.mut.Unlock()
if !drop {
return
}
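// drop path: sync every remaining buffer with the drop flag set, wait for all
// sync tasks to finish, then ask the meta writer to drop the channel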
var futures []*conc.Future[error]
for id := range wb.buffers {
syncTask, err := wb.getSyncTask(context.Background(), id)
if err != nil {
// TODO
continue
}
switch t := syncTask.(type) {
case *syncmgr.SyncTask:
t.WithDrop()
case *syncmgr.SyncTaskV2:
t.WithDrop()
}
f := wb.syncMgr.SyncData(context.Background(), syncTask)
futures = append(futures, f)
}
err := conc.AwaitAll(futures...)
if err != nil {
log.Error("failed to sink write buffer data", zap.String("channel", wb.channelName), zap.Error(err))
// TODO change to remove channel in the future
panic(err)
}
err = wb.metaWriter.DropChannel(wb.channelName)
if err != nil {
log.Error("failed to drop channel", zap.String("channel", wb.channelName), zap.Error(err))
// TODO change to remove channel in the future
panic(err)
}
}