diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 9fd965f94d..180c68ab2e 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -25,7 +25,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -129,28 +128,6 @@ func (t *compactionTask) getChannelName() string {
 	return t.plan.GetChannel()
 }
 
-func (t *compactionTask) getPlanTargetEntryNumber() int64 {
-	if t.plan == nil {
-		// if plan empty return default size
-		return int64(bloomFilterSize)
-	}
-	var result int64
-	for _, info := range t.plan.GetSegmentBinlogs() {
-		for _, fieldLog := range info.GetFieldBinlogs() {
-			for _, binlog := range fieldLog.GetBinlogs() {
-				result += binlog.GetEntriesNum()
-			}
-		}
-	}
-
-	// prevent bloom filter too small
-	if result == 0 {
-		log.Warn("compaction target entry number zero", zap.Int64("planID", t.getPlanID()))
-		return int64(bloomFilterSize)
-	}
-	return result
-}
-
 func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[interface{}]Timestamp, *DelDataBuf, error) {
 	log := log.With(zap.Int64("planID", t.getPlanID()))
 	mergeStart := time.Now()
@@ -210,7 +187,7 @@ func nano2Milli(nano time.Duration) float64 {
 	return float64(nano) / float64(time.Millisecond)
 }
 
-func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, []byte, int64, error) {
+func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, *Segment, int64, error) {
 	log := log.With(zap.Int64("planID", t.getPlanID()))
 	mergeStart := time.Now()
@@ -224,7 +201,6 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 		// statslog generation
 		segment *Segment // empty segment used for bf generation
 		pkID    UniqueID
-		pkType  schemapb.DataType
 
 		iDatas   = make([]*InsertData, 0)
 		fID2Type = make(map[UniqueID]schemapb.DataType)
@@ -239,19 +215,14 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 		return false
 	}
 
-	//
-	targetRowCount := t.getPlanTargetEntryNumber()
-	log.Debug("merge estimate target row count", zap.Int64("row count", targetRowCount))
-	segment = &Segment{
-		pkFilter: bloom.NewWithEstimates(uint(targetRowCount), maxBloomFalsePositive),
-	}
+	segment = &Segment{}
+	t.Replica.initSegmentBloomFilter(segment)
 
 	// get dim
 	for _, fs := range schema.GetFields() {
 		fID2Type[fs.GetFieldID()] = fs.GetDataType()
 		if fs.GetIsPrimaryKey() {
 			pkID = fs.GetFieldID()
-			pkType = fs.GetDataType()
 		}
 		if fs.GetDataType() == schemapb.DataType_FloatVector ||
 			fs.GetDataType() == schemapb.DataType_BinaryVector {
@@ -348,17 +319,10 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 		}
 	}
 
-	// marshal segment statslog
-	segStats, err := segment.getSegmentStatslog(pkID, pkType)
-	if err != nil {
-		log.Warn("failed to generate segment statslog", zap.Int64("pkID", pkID), zap.Error(err))
-		return nil, nil, 0, err
-	}
-
 	log.Debug("merge end", zap.Int64("remaining insert numRows", numRows),
 		zap.Int64("expired entities", expired),
 		zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
-	return iDatas, segStats, numRows, nil
+	return iDatas, segment, numRows, nil
 }
 
 func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
@@ -522,12 +486,19 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 		return nil, err
 	}
 
-	iDatas, segStats, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
+	iDatas, segment, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
 	if err != nil {
 		log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
 		return nil, err
 	}
 
+	// marshal segment statslog
+	segStats, err := segment.getSegmentStatslog(PKfieldID, PkType)
+	if err != nil {
+		log.Warn("failed to generate segment statslog", zap.Int64("pkID", PKfieldID), zap.Error(err))
+		return nil, err
+	}
+
 	uploadStart := time.Now()
 	segPaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, segStats, deltaBuf.delData, meta)
 	if err != nil {
@@ -583,7 +554,13 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 		}
 		// no need to shorten the PK range of a segment, deleting dup PKs is valid
 	} else {
-		err = t.mergeFlushedSegments(targetSegID, collID, partID, t.plan.GetPlanID(), segIDs, t.plan.GetChannel(), numRows)
+		segment.collectionID = collID
+		segment.partitionID = partID
+		segment.segmentID = targetSegID
+		segment.channelName = t.plan.GetChannel()
+		segment.numRows = numRows
+
+		err = t.mergeFlushedSegments(segment, t.plan.GetPlanID(), segIDs)
 		if err != nil {
 			log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
 			return nil, err
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index 0f01738777..f361514cb8 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -24,13 +24,16 @@ import (
 	"time"
 
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
+	"github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
+	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 )
 
@@ -247,10 +250,19 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 	})
 
 	t.Run("Test merge", func(t *testing.T) {
+		collectionID := int64(1)
+		meta := NewMetaFactory().GetCollectionMeta(collectionID, "test", schemapb.DataType_Int64)
+
+		rc := &mocks.RootCoord{}
+		rc.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
+			Return(&milvuspb.DescribeCollectionResponse{
+				Schema: meta.GetSchema(),
+			}, nil)
+		replica, err := newReplica(context.Background(), rc, nil, collectionID)
+		require.NoError(t, err)
 		t.Run("Merge without expiration", func(t *testing.T) {
 			Params.CommonCfg.EntityExpirationTTL = 0
 			iData := genInsertDataWithExpiredTS()
-			meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
 			iblobs, err := getInsertBlobs(100, iData, meta)
 			require.NoError(t, err)
@@ -264,13 +276,15 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 				1: 10000,
 			}
 
-			ct := &compactionTask{}
-			idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
+			ct := &compactionTask{
+				Replica: replica,
+			}
+			idata, segment, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
 			assert.NoError(t, err)
 			assert.Equal(t, int64(2), numOfRow)
 			assert.Equal(t, 1, len(idata))
 			assert.NotEmpty(t, idata[0].Data)
-			assert.NotEmpty(t, segStats)
+			assert.NotNil(t, segment)
 		})
 		t.Run("Merge without expiration2", func(t *testing.T) {
 			Params.CommonCfg.EntityExpirationTTL = 0
@@ -292,13 +306,15 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 
 			dm := map[interface{}]Timestamp{}
 
-			ct := &compactionTask{}
-			idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
+			ct := &compactionTask{
+				Replica: replica,
+			}
+			idata, segment, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
 			assert.NoError(t, err)
 			assert.Equal(t, int64(2), numOfRow)
 			assert.Equal(t, 2, len(idata))
 			assert.NotEmpty(t, idata[0].Data)
-			assert.NotEmpty(t, segStats)
+			assert.NotEmpty(t, segment)
 		})
 
 		t.Run("Merge with expiration", func(t *testing.T) {
@@ -317,12 +333,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 				1: 10000,
 			}
 
-			ct := &compactionTask{}
-			idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
+			ct := &compactionTask{
+				Replica: replica,
+			}
+			idata, segment, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
 			assert.NoError(t, err)
 			assert.Equal(t, int64(1), numOfRow)
 			assert.Equal(t, 1, len(idata))
-			assert.NotEmpty(t, segStats)
+			assert.NotEmpty(t, segment)
 		})
 
 		t.Run("Merge with meta error", func(t *testing.T) {
@@ -342,7 +360,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 				1: 10000,
 			}
 
-			ct := &compactionTask{}
+			ct := &compactionTask{
+				Replica: replica,
+			}
 			_, _, _, err = ct.merge(mitr, dm, &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
 				{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
 					{Key: "dim", Value: "64"},
@@ -368,7 +388,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 				1: 10000,
 			}
 
-			ct := &compactionTask{}
+			ct := &compactionTask{
+				Replica: replica,
+			}
 			_, _, _, err = ct.merge(mitr, dm, &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
 				{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
 					{Key: "dim", Value: "dim"},
@@ -595,7 +617,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 				},
 			},
 			StartTime:        0,
-			TimeoutInSeconds: 1,
+			TimeoutInSeconds: 10,
 			Type:             datapb.CompactionType_MergeCompaction,
 			Timetravel:       40000,
 			Channel:          "channelname",
@@ -727,7 +749,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 				},
 			},
 			StartTime:        0,
-			TimeoutInSeconds: 1,
+			TimeoutInSeconds: 10,
 			Type:             datapb.CompactionType_MergeCompaction,
 			Timetravel:       40000,
 			Channel:          "channelname",
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index f6bfc9d43c..e78c3ff1e5 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -187,7 +187,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 		}
 		err := retry.Do(ddn.ctx, func() error {
 			return ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax())
-		}, flowGraphRetryOpt)
+		}, getFlowGraphRetryOpt())
 		if err != nil {
 			err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vChannelName, err)
 			log.Error(err.Error())
diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go
index c5ab5be22b..afd27acb15 100644
--- a/internal/datanode/flow_graph_dd_node_test.go
+++ b/internal/datanode/flow_graph_dd_node_test.go
@@ -320,7 +320,7 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
 		var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
 
 		// Test
-		flowGraphRetryOpt = retry.Attempts(1)
+		setFlowGraphRetryOpt(retry.Attempts(1))
 		assert.Panics(t, func() {
 			ddn.Operate([]Msg{msgStreamMsg})
 		})
diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go
index 050c853bf6..ab4667e51a 100644
--- a/internal/datanode/flow_graph_delete_node.go
+++ b/internal/datanode/flow_graph_delete_node.go
@@ -183,7 +183,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
 		} else {
 			err := retry.Do(dn.ctx, func() error {
 				return dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0])
-			}, flowGraphRetryOpt)
+			}, getFlowGraphRetryOpt())
 			if err != nil {
 				err = fmt.Errorf("failed to flush delete data, err = %s", err)
 				log.Error(err.Error())
diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go
index 4aeddc1fb8..220035aea3 100644
--- a/internal/datanode/flow_graph_delete_node_test.go
+++ b/internal/datanode/flow_graph_delete_node_test.go
@@ -418,7 +418,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
 
 		var fgMsg flowgraph.Msg = &msg
 
-		flowGraphRetryOpt = retry.Attempts(1)
+		setFlowGraphRetryOpt(retry.Attempts(1))
 		assert.Panics(te, func() {
 			delNode.Operate([]flowgraph.Msg{fgMsg})
 		})
@@ -462,7 +462,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
 		delNode.flushManager = NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
 
 		var fgMsg flowgraph.Msg = &msg
-		flowGraphRetryOpt = retry.Attempts(1)
+		setFlowGraphRetryOpt(retry.Attempts(1))
 		assert.NotPanics(t, func() {
 			delNode.Operate([]flowgraph.Msg{fgMsg})
 		})
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 5e73e7e0f4..6ffd2d4672 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -416,7 +416,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
 				task.flushed,
 				task.dropped,
 				endPositions[0])
-		}, flowGraphRetryOpt)
+		}, getFlowGraphRetryOpt())
 		if err != nil {
 			metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
 			metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index d4602c22e3..dda842428e 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -235,7 +235,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
 
 	// test flushBufferData failed
-	flowGraphRetryOpt = retry.Attempts(1)
+	setFlowGraphRetryOpt(retry.Attempts(1))
 	inMsg = genFlowGraphInsertMsg(insertChannelName)
 	iBNode.flushManager = &mockFlushManager{returnError: true}
 	iBNode.insertBuffer.Store(inMsg.insertMessages[0].SegmentID, &BufferData{})
diff --git a/internal/datanode/flow_graph_node.go b/internal/datanode/flow_graph_node.go
index c1649982ba..526a0cc01c 100644
--- a/internal/datanode/flow_graph_node.go
+++ b/internal/datanode/flow_graph_node.go
@@ -17,6 +17,8 @@ package datanode
 
 import (
+	"sync/atomic"
+
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
 	"github.com/milvus-io/milvus/internal/util/retry"
 )
@@ -33,3 +35,19 @@ type (
 )
 
 var flowGraphRetryOpt = retry.Attempts(5)
+
+var fgRetryOptVal atomic.Value
+
+func init() {
+	setFlowGraphRetryOpt(retry.Attempts(5))
+}
+
+// setFlowGraphRetryOpt sets the retry option for flowgraph,
+// used for tests only
+func setFlowGraphRetryOpt(opt retry.Option) {
+	fgRetryOptVal.Store(opt)
+}
+
+func getFlowGraphRetryOpt() retry.Option {
+	return fgRetryOptVal.Load().(retry.Option)
+}
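Review note on the flowgraph changes above: the switch from assigning the package-level `flowGraphRetryOpt` directly in tests to going through `setFlowGraphRetryOpt`/`getFlowGraphRetryOpt` matters because the test overwrites the option while flowgraph node goroutines read it concurrently. A minimal, self-contained sketch of the same `atomic.Value` pattern (all names below are illustrative, not the Milvus API):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// option stands in for retry.Option; atomic.Value only requires that every
// Store call uses the same concrete type.
type option struct{ attempts uint }

var optVal atomic.Value

func setOpt(o option) { optVal.Store(o) }
func getOpt() option  { return optVal.Load().(option) }

func main() {
	setOpt(option{attempts: 5}) // default, stored once at init time

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // reader, e.g. a flowgraph node wiring up its retry call
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_ = getOpt()
		}
	}()
	go func() { // writer, e.g. a test lowering the attempt count
		defer wg.Done()
		setOpt(option{attempts: 1})
	}()
	wg.Wait()
	fmt.Println("attempts:", getOpt().attempts)
}
```

A plain `flowGraphRetryOpt = retry.Attempts(1)` assignment in a test is a data race under `go test -race` once any node goroutine reads the variable at the same time; routing both sides through `atomic.Value` removes that race without introducing a mutex.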
diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go
index cb0e7e612f..bb96f68b4f 100644
--- a/internal/datanode/segment_replica.go
+++ b/internal/datanode/segment_replica.go
@@ -34,11 +34,10 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/types"
+	"github.com/milvus-io/milvus/internal/util/typeutil"
 )
 
 const (
-	// TODO silverxia maybe need set from config
-	bloomFilterSize       uint    = 100000
 	maxBloomFalsePositive float64 = 0.005
 )
 
@@ -71,7 +70,7 @@ type Replica interface {
 	updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
 	updateSegmentCheckPoint(segID UniqueID)
 	updateSegmentPKRange(segID UniqueID, ids storage.FieldData)
-	mergeFlushedSegments(segID, collID, partID, planID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) error
+	mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
 	hasSegment(segID UniqueID, countFlushed bool) bool
 	removeSegments(segID ...UniqueID)
 	listCompactedSegmentIDs() map[UniqueID][]UniqueID
@@ -81,6 +80,7 @@ type Replica interface {
 	getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error)
 	segmentFlushed(segID UniqueID)
 	getSegmentStatslog(segID UniqueID) ([]byte, error)
+	initSegmentBloomFilter(seg *Segment) error
 }
 
 // Segment is the data structure of segments in data node replica.
@@ -198,6 +198,12 @@ func newReplica(ctx context.Context, rc types.RootCoord, cm storage.ChunkManager
 		metaService:  metaService,
 		chunkManager: cm,
 	}
+	// try to cache latest schema
+	_, err := replica.getCollectionSchema(collID, 0)
+	if err != nil {
+		log.Warn("failed to get schema when create replica", zap.Int64("collID", collID), zap.Error(err))
+		return nil, err
+	}
 
 	return replica, nil
 }
@@ -267,24 +273,58 @@ func (replica *SegmentReplica) getCollectionAndPartitionID(segID UniqueID) (coll
 	return 0, 0, fmt.Errorf("cannot find segment, id = %v", segID)
 }
 
+// maxRowCountPerSegment returns max row count for a segment based on estimation of row size.
+func (replica *SegmentReplica) maxRowCountPerSegment(ts Timestamp) (int64, error) {
+	log := log.With(zap.Int64("collectionID", replica.collectionID), zap.Uint64("timestamp", ts))
+	schema, err := replica.getCollectionSchema(replica.collectionID, ts)
+	if err != nil {
+		log.Warn("failed to get collection schema", zap.Error(err))
+		return 0, err
+	}
+	sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
+	if err != nil {
+		log.Warn("failed to estimate size per record", zap.Error(err))
+		return 0, err
+	}
+	threshold := Params.DataCoordCfg.SegmentMaxSize * 1024 * 1024
+	return int64(threshold / float64(sizePerRecord)), nil
+}
+
+// initSegmentBloomFilter initializes the segment pkFilter with a new bloom filter.
+// The new BF is sized with the estimated max row count and the default false positive rate.
+func (replica *SegmentReplica) initSegmentBloomFilter(s *Segment) error {
+	var ts Timestamp
+	if s.startPos != nil {
+		ts = s.startPos.Timestamp
+	}
+	maxRowCount, err := replica.maxRowCountPerSegment(ts)
+	if err != nil {
+		log.Warn("initSegmentBloomFilter failed, cannot estimate max row count", zap.Error(err))
+		return err
+	}
+
+	s.pkFilter = bloom.NewWithEstimates(uint(maxRowCount), maxBloomFalsePositive)
+	return nil
+}
+
 // addNewSegment adds a *New* and *NotFlushed* new segment. Before add, please make sure there's no
 // such segment by `hasSegment`
 func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error {
-	if collID != replica.collectionID {
-		log.Warn("Mismatch collection",
-			zap.Int64("input ID", collID),
-			zap.Int64("expected ID", replica.collectionID))
-		return fmt.Errorf("mismatch collection, ID=%d", collID)
-	}
-
-	log.Info("Add new segment",
+	log := log.With(
 		zap.Int64("segment ID", segID),
 		zap.Int64("collection ID", collID),
 		zap.Int64("partition ID", partitionID),
-		zap.String("channel name", channelName),
-	)
+		zap.String("channel name", channelName))
+
+	if collID != replica.collectionID {
+		log.Warn("Mismatch collection",
+			zap.Int64("expected collectionID", replica.collectionID))
+		return fmt.Errorf("mismatch collection, ID=%d", collID)
+	}
+
+	log.Info("Add new segment")
 
 	seg := &Segment{
 		collectionID: collID,
@@ -295,8 +335,12 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
 		checkPoint:   segmentCheckPoint{0, *startPos},
 		startPos:     startPos,
 		endPos:       endPos,
+	}
 
-		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
+	err := replica.initSegmentBloomFilter(seg)
+	if err != nil {
+		log.Warn("failed to addNewSegment, init segment bf returns error", zap.Error(err))
+		return err
 	}
 
 	seg.isNew.Store(true)
@@ -353,19 +397,19 @@ func (replica *SegmentReplica) filterSegments(channelName string, partitionID Un
 // addNormalSegment adds a *NotNew* and *NotFlushed* segment. Before add, please make sure there's no
 // such segment by `hasSegment`
 func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, cp *segmentCheckPoint, recoverTs Timestamp) error {
-	if collID != replica.collectionID {
-		log.Warn("Mismatch collection",
-			zap.Int64("input ID", collID),
-			zap.Int64("expected ID", replica.collectionID))
-		return fmt.Errorf("mismatch collection, ID=%d", collID)
-	}
-
-	log.Info("Add Normal segment",
+	log := log.With(
 		zap.Int64("segment ID", segID),
 		zap.Int64("collection ID", collID),
 		zap.Int64("partition ID", partitionID),
-		zap.String("channel name", channelName),
-	)
+		zap.String("channel name", channelName))
+
+	if collID != replica.collectionID {
+		log.Warn("Mismatch collection",
+			zap.Int64("expected collectionID", replica.collectionID))
+		return fmt.Errorf("mismatch collection, ID=%d", collID)
+	}
+
+	log.Info("Add Normal segment")
 
 	seg := &Segment{
 		collectionID: collID,
@@ -373,8 +417,6 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
 		segmentID:    segID,
 		channelName:  channelName,
 		numRows:      numOfRows,
-
-		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
 	}
 
 	if cp != nil {
@@ -401,19 +443,19 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
 // such segment by `hasSegment`
 func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, recoverTs Timestamp) error {
-	if collID != replica.collectionID {
-		log.Warn("Mismatch collection",
-			zap.Int64("input ID", collID),
-			zap.Int64("expected ID", replica.collectionID))
-		return fmt.Errorf("mismatch collection, ID=%d", collID)
-	}
-
-	log.Info("Add Flushed segment",
+	log := log.With(
 		zap.Int64("segment ID", segID),
 		zap.Int64("collection ID", collID),
 		zap.Int64("partition ID", partitionID),
-		zap.String("channel name", channelName),
-	)
+		zap.String("channel name", channelName))
+
+	if collID != replica.collectionID {
+		log.Warn("Mismatch collection",
+			zap.Int64("expected collectionID", replica.collectionID))
+		return fmt.Errorf("mismatch collection, ID=%d", collID)
+	}
+
+	log.Info("Add Flushed segment")
 
 	seg := &Segment{
 		collectionID: collID,
@@ -421,9 +463,6 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
 		segmentID:    segID,
 		channelName:  channelName,
 		numRows:      numOfRows,
-
-		//TODO silverxia, normal segments bloom filter and pk range should be loaded from serialized files
-		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
 	}
 
 	err := replica.initPKBloomFilter(seg, statsBinlogs, recoverTs)
@@ -442,9 +481,11 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
 }
 
 func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
+	log := log.With(zap.Int64("segmentID", s.segmentID))
 	log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs)))
 	schema, err := replica.getCollectionSchema(s.collectionID, ts)
 	if err != nil {
+		log.Warn("failed to initPKBloomFilter, get schema return error", zap.Error(err))
 		return err
 	}
 
@@ -468,6 +509,12 @@ func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*dat
 		}
 	}
 
+	// no stats log to parse, initialize a new BF
+	if len(bloomFilterFiles) == 0 {
+		log.Warn("no stats files to load, initialize a new one")
+		return replica.initSegmentBloomFilter(s)
+	}
+
 	values, err := replica.chunkManager.MultiRead(bloomFilterFiles)
 	if err != nil {
 		log.Warn("failed to load bloom filter files", zap.Error(err))
@@ -484,13 +531,22 @@ func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*dat
 		return err
 	}
 	for _, stat := range stats {
-		err = s.pkFilter.Merge(stat.BF)
-		if err != nil {
-			return err
+		// use the first BF as the merge base
+		if s.pkFilter == nil {
+			s.pkFilter = stat.BF
+		} else {
+			// for compatibility: statslogs before 2.1.2 use separate stats logs, which need to be merged,
+			// assuming all legacy BFs have the same attributes.
+			err = s.pkFilter.Merge(stat.BF)
+			if err != nil {
+				return err
+			}
 		}
+
 		s.updatePk(stat.MinPk)
 		s.updatePk(stat.MaxPk)
 	}
+
 	return nil
 }
 
@@ -723,32 +779,23 @@ func (replica *SegmentReplica) updateSegmentCheckPoint(segID UniqueID) {
 	log.Warn("There's no segment", zap.Int64("ID", segID))
 }
 
-func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID, planID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) error {
-	if collID != replica.collectionID {
-		log.Warn("Mismatch collection",
-			zap.Int64("input ID", collID),
-			zap.Int64("expected ID", replica.collectionID))
-		return fmt.Errorf("mismatch collection, ID=%d", collID)
-	}
+func (replica *SegmentReplica) mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
 
-	log.Info("merge flushed segments",
+	log := log.With(
+		zap.Int64("segment ID", seg.segmentID),
+		zap.Int64("collection ID", seg.collectionID),
+		zap.Int64("partition ID", seg.partitionID),
+		zap.Int64s("compacted from", compactedFrom),
 		zap.Int64("planID", planID),
-		zap.Int64("compacted To segmentID", segID),
-		zap.Int64s("compacted From segmentIDs", compactedFrom),
-		zap.Int64("partition ID", partID),
-		zap.String("channel name", channelName),
-	)
+		zap.String("channel name", seg.channelName))
 
-	seg := &Segment{
-		collectionID: collID,
-		partitionID:  partID,
-		segmentID:    segID,
-		channelName:  channelName,
-		numRows:      numOfRows,
-
-		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
+	if seg.collectionID != replica.collectionID {
+		log.Warn("Mismatch collection",
+			zap.Int64("expected collectionID", replica.collectionID))
+		return fmt.Errorf("mismatch collection, ID=%d", seg.collectionID)
 	}
 
+	log.Info("merge flushed segments")
 	replica.segMu.Lock()
 	for _, ID := range compactedFrom {
 		s, ok := replica.flushedSegments[ID]
@@ -758,11 +805,9 @@ func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID, planI
 			continue
 		}
 
-		s.compactedTo = segID
+		s.compactedTo = seg.segmentID
 		replica.compactedSegments[ID] = s
 		delete(replica.flushedSegments, ID)
-
-		seg.pkFilter.Merge(s.pkFilter)
 	}
 	replica.segMu.Unlock()
 
@@ -770,7 +815,7 @@ func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID, planI
 	seg.isFlushed.Store(true)
 
 	replica.segMu.Lock()
-	replica.flushedSegments[segID] = seg
+	replica.flushedSegments[seg.segmentID] = seg
 	replica.segMu.Unlock()
 
 	return nil
@@ -798,8 +843,11 @@ func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID Un
 		segmentID:    segID,
 		channelName:  channelName,
 		numRows:      numOfRows,
+	}
 
-		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
+	err := replica.initSegmentBloomFilter(seg)
+	if err != nil {
+		return err
 	}
 
 	seg.updatePKRange(ids)
diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go
index 3071487e19..51adef3390 100644
--- a/internal/datanode/segment_replica_test.go
+++ b/internal/datanode/segment_replica_test.go
@@ -19,6 +19,7 @@ package datanode
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"math/rand"
 	"testing"
@@ -56,7 +57,7 @@ func (kv *mockDataCM) MultiRead(keys []string) ([][]byte, error) {
 		FieldID: common.RowIDField,
 		Min:     0,
 		Max:     10,
-		BF:      bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
+		BF:      bloom.NewWithEstimates(100000, maxBloomFalsePositive),
 	}
 	buffer, _ := json.Marshal(stats)
 	return [][]byte{buffer}, nil
@@ -67,14 +68,16 @@ type mockPkfilterMergeError struct {
 }
 
 func (kv *mockPkfilterMergeError) MultiRead(keys []string) ([][]byte, error) {
-	stats := &storage.PrimaryKeyStats{
-		FieldID: common.RowIDField,
-		Min:     0,
-		Max:     10,
-		BF:      bloom.NewWithEstimates(1, 0.0001),
-	}
-	buffer, _ := json.Marshal(stats)
-	return [][]byte{buffer}, nil
+	/*
		stats := &storage.PrimaryKeyStats{
			FieldID: common.RowIDField,
			Min:     0,
			Max:     10,
			BF:      bloom.NewWithEstimates(1, 0.0001),
		}
		buffer, _ := json.Marshal(stats)
		return [][]byte{buffer}, nil*/
+	return nil, errors.New("mocked multi read error")
 }
 
 type mockDataCMError struct {
@@ -566,6 +569,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
 			assert.Nil(t, err)
 
 			if test.metaServiceErr {
+				sr.collSchema = nil
 				rc.setCollectionID(-1)
 			} else {
 				rc.setCollectionID(1)
@@ -657,7 +661,15 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
 		require.True(t, sr.hasSegment(1, true))
 		require.True(t, sr.hasSegment(2, true))
 
-		sr.mergeFlushedSegments(3, 1, 0, 100, []UniqueID{1, 2}, "channel", 15)
+		s := &Segment{
+			segmentID:    3,
+			collectionID: 1,
+			partitionID:  0,
+			channelName:  "channel",
+
+			numRows: 15,
+		}
+		sr.mergeFlushedSegments(s, 100, []UniqueID{1, 2})
 		assert.True(t, sr.hasSegment(3, true))
 		assert.False(t, sr.hasSegment(1, true))
 		assert.False(t, sr.hasSegment(2, true))
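Review note on the bloom filter sizing: `initSegmentBloomFilter` now derives the filter capacity from the schema's estimated row size (via `typeutil.EstimateSizePerRecord`) and the configured segment size (`Params.DataCoordCfg.SegmentMaxSize`), instead of the old fixed `bloomFilterSize = 100000`. A rough, self-contained sketch of that arithmetic follows; the 512 MB budget and 1 KB row size are example values only, and the helper name is illustrative, not the Milvus API:

```go
package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

const maxBloomFalsePositive = 0.005

// maxRowCount mirrors the idea of maxRowCountPerSegment: divide the segment
// size budget (in bytes) by the estimated size of one row.
func maxRowCount(segmentMaxSizeMB float64, bytesPerRow int) int64 {
	threshold := segmentMaxSizeMB * 1024 * 1024
	return int64(threshold / float64(bytesPerRow))
}

func main() {
	// Example values: a 512 MB segment filled with ~1 KB rows.
	rows := maxRowCount(512, 1024)

	// Size the filter for the rows the segment can actually hold, at the
	// default false-positive rate used by the datanode.
	bf := bloom.NewWithEstimates(uint(rows), maxBloomFalsePositive)
	fmt.Printf("estimated rows per segment: %d, bloom filter bits: %d\n", rows, bf.Cap())
}
```

Compared to a fixed 100000-entry filter, sizing per schema keeps the false-positive rate near 0.005 for segments holding many small rows, and avoids over-allocating filter memory for collections with wide rows.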