diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go
index 44bc68f647..e909f2087d 100644
--- a/internal/datanode/writebuffer/bf_write_buffer.go
+++ b/internal/datanode/writebuffer/bf_write_buffer.go
@@ -6,6 +6,7 @@ import (
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -28,30 +29,8 @@ func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
 	}, nil
 }
 
-func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
-	wb.mut.Lock()
-	defer wb.mut.Unlock()
-
-	// process insert msgs
-	pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos)
-	if err != nil {
-		return err
-	}
-
-	// update pk oracle
-	for segmentID, dataList := range pkData {
-		segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
-		for _, segment := range segments {
-			for _, fieldData := range dataList {
-				err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
-				if err != nil {
-					return err
-				}
-			}
-		}
-	}
-
-	// distribute delete msg
+func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
+	// distribute delete msg for previous data
 	for _, delMsg := range deleteMsgs {
 		pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
 		segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
@@ -72,6 +51,59 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
 				wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
 			}
 		}
+
+		for _, inData := range groups {
+			if delMsg.GetPartitionID() == common.InvalidPartitionID || delMsg.GetPartitionID() == inData.partitionID {
+				var deletePks []storage.PrimaryKey
+				var deleteTss []typeutil.Timestamp
+				for idx, pk := range pks {
+					ts := delMsg.GetTimestamps()[idx]
+					if inData.pkExists(pk, ts) {
+						deletePks = append(deletePks, pk)
+						deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
+					}
+				}
+				if len(deletePks) > 0 {
+					wb.bufferDelete(inData.segmentID, deletePks, deleteTss, startPos, endPos)
+				}
+			}
+		}
+	}
+}
+
+func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
+	wb.mut.Lock()
+	defer wb.mut.Unlock()
+
+	groups, err := wb.prepareInsert(insertMsgs)
+	if err != nil {
+		return err
+	}
+
+	// buffer insert data and add segment if not exists
+	for _, inData := range groups {
+		err := wb.bufferInsert(inData, startPos, endPos)
+		if err != nil {
+			return err
+		}
+	}
+
+	// distribute delete msg
+	// the bf write buffer checks the bloom filter of each segment and of the current insert batch to decide which segments the delete data shall be written to
+	wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
+
+	// update pk oracle
+	for _, inData := range groups {
+		// segment shall always exist after buffer insert
+		segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
+		for _, segment := range segments {
+			for _, fieldData := range inData.pkField {
+				err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
+				if err != nil {
+					return err
+				}
+			}
+		}
 	}
 
 	// update buffer last checkpoint
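Note: the behavioral change above is that deletes are now also checked against the insert batch buffered in the same BufferData call, not only against segments already known to the pk oracle. Below is a self-contained sketch of that routing rule with toy types (not the Milvus API); only the bloom-filter-then-timestamp check of inData.pkExists is mirrored:

```go
// Toy sketch of the in-batch delete check: a delete hits the current insert
// batch only if the same pk was inserted with a strictly smaller timestamp.
package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

type batch struct {
	bf  *bloom.BloomFilter // coarse membership test over buffered pks
	pks map[int64]uint64   // pk -> insert timestamp (stand-in for a column scan)
}

func newBatch() *batch {
	return &batch{bf: bloom.NewWithEstimates(1000, 0.001), pks: map[int64]uint64{}}
}

func (b *batch) insert(pk int64, ts uint64) {
	b.bf.Add([]byte(fmt.Sprint(pk)))
	b.pks[pk] = ts
}

// pkExists mirrors inData.pkExists: bloom filter first, then an exact check
// that the buffered insert happened strictly before the delete timestamp.
func (b *batch) pkExists(pk int64, deleteTs uint64) bool {
	if !b.bf.Test([]byte(fmt.Sprint(pk))) {
		return false
	}
	insertTs, ok := b.pks[pk]
	return ok && insertTs < deleteTs
}

func main() {
	b := newBatch()
	b.insert(42, 100)
	fmt.Println(b.pkExists(42, 200)) // true: delete arrives after the insert
	fmt.Println(b.pkExists(42, 50))  // false: delete precedes the buffered insert
}
```

The timestamp guard matters because an insert and a delete of the same pk can arrive in one flowgraph tick; only inserts strictly older than the delete are candidates.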
diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go
index fa7ce89dd0..3c255bc6ec 100644
--- a/internal/datanode/writebuffer/bf_write_buffer_test.go
+++ b/internal/datanode/writebuffer/bf_write_buffer_test.go
@@ -33,19 +33,21 @@ import (
 
 type BFWriteBufferSuite struct {
 	testutils.PromMetricsSuite
-	collID         int64
-	channelName    string
-	collSchema     *schemapb.CollectionSchema
-	syncMgr        *syncmgr.MockSyncManager
-	metacache      *metacache.MockMetaCache
-	broker         *broker.MockBroker
-	storageV2Cache *metacache.StorageV2Cache
+	collID            int64
+	channelName       string
+	collInt64Schema   *schemapb.CollectionSchema
+	collVarcharSchema *schemapb.CollectionSchema
+	syncMgr           *syncmgr.MockSyncManager
+	metacacheInt64    *metacache.MockMetaCache
+	metacacheVarchar  *metacache.MockMetaCache
+	broker            *broker.MockBroker
+	storageV2Cache    *metacache.StorageV2Cache
 }
 
 func (s *BFWriteBufferSuite) SetupSuite() {
 	paramtable.Get().Init(paramtable.NewBaseTable())
 	s.collID = 100
-	s.collSchema = &schemapb.CollectionSchema{
+	s.collInt64Schema = &schemapb.CollectionSchema{
 		Name: "test_collection",
 		Fields: []*schemapb.FieldSchema{
 			{
@@ -65,17 +67,69 @@ func (s *BFWriteBufferSuite) SetupSuite() {
 			},
 		},
 	}
+	s.collVarcharSchema = &schemapb.CollectionSchema{
+		Name: "test_collection",
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
+			},
+			{
+				FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
+			},
+			{
+				FieldID: 100, Name: "pk", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true, TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.MaxLengthKey, Value: "100"},
+				},
+			},
+			{
+				FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "128"},
+				},
+			},
+		},
+	}
 
-	storageCache, err := metacache.NewStorageV2Cache(s.collSchema)
+	storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
 	s.Require().NoError(err)
 	s.storageV2Cache = storageCache
 }
 
-func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) {
+func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int, pkType schemapb.DataType) ([]int64, *msgstream.InsertMsg) {
 	tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
 	vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
 		return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
 	})
+
+	var pkField *schemapb.FieldData
+	switch pkType {
+	case schemapb.DataType_Int64:
+		pkField = &schemapb.FieldData{
+			FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
+			Field: &schemapb.FieldData_Scalars{
+				Scalars: &schemapb.ScalarField{
+					Data: &schemapb.ScalarField_LongData{
+						LongData: &schemapb.LongArray{
+							Data: tss,
+						},
+					},
+				},
+			},
+		}
+	case schemapb.DataType_VarChar:
+		pkField = &schemapb.FieldData{
+			FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_VarChar,
+			Field: &schemapb.FieldData_Scalars{
+				Scalars: &schemapb.ScalarField{
+					Data: &schemapb.ScalarField_StringData{
+						StringData: &schemapb.StringArray{
+							Data: lo.Map(tss, func(v int64, _ int) string { return fmt.Sprintf("%v", v) }),
+						},
+					},
+				},
+			},
+		}
+	}
 	flatten := lo.Flatten(vectors)
 	return tss, &msgstream.InsertMsg{
 		InsertRequest: msgpb.InsertRequest{
@@ -108,18 +162,7 @@ func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim
 						},
 					},
 				},
-				{
-					FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
-					Field: &schemapb.FieldData_Scalars{
-						Scalars: &schemapb.ScalarField{
-							Data: &schemapb.ScalarField_LongData{
-								LongData: &schemapb.LongArray{
-									Data: tss,
-								},
-							},
-						},
-					},
-				},
+				pkField,
 				{
 					FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
 					Field: &schemapb.FieldData_Vectors{
@@ -142,7 +185,7 @@ func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre
 	delMsg := &msgstream.DeleteMsg{
 		DeleteRequest: msgpb.DeleteRequest{
 			PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
-			Timestamps:  lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }),
+			Timestamps:  lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx+1)) }),
 		},
 	}
 	return delMsg
@@ -150,74 +193,152 @@ func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre
 
 func (s *BFWriteBufferSuite) SetupTest() {
 	s.syncMgr = syncmgr.NewMockSyncManager(s.T())
-	s.metacache = metacache.NewMockMetaCache(s.T())
-	s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe()
-	s.metacache.EXPECT().Collection().Return(s.collID).Maybe()
+	s.metacacheInt64 = metacache.NewMockMetaCache(s.T())
+	s.metacacheInt64.EXPECT().Schema().Return(s.collInt64Schema).Maybe()
+	s.metacacheInt64.EXPECT().Collection().Return(s.collID).Maybe()
+	s.metacacheVarchar = metacache.NewMockMetaCache(s.T())
+	s.metacacheVarchar.EXPECT().Schema().Return(s.collVarcharSchema).Maybe()
+	s.metacacheVarchar.EXPECT().Collection().Return(s.collID).Maybe()
+
 	s.broker = broker.NewMockBroker(s.T())
 	var err error
-	s.storageV2Cache, err = metacache.NewStorageV2Cache(s.collSchema)
+	s.storageV2Cache, err = metacache.NewStorageV2Cache(s.collInt64Schema)
 	s.Require().NoError(err)
 }
 
 func (s *BFWriteBufferSuite) TestBufferData() {
-	storageCache, err := metacache.NewStorageV2Cache(s.collSchema)
-	s.Require().NoError(err)
-	wb, err := NewBFWriteBuffer(s.channelName, s.metacache, storageCache, s.syncMgr, &writeBufferOption{})
-	s.NoError(err)
-
-	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
-	s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
-	s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
-	s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
-	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
-
-	pks, msg := s.composeInsertMsg(1000, 10, 128)
-	delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
-
-	metrics.DataNodeFlowGraphBufferDataSize.Reset()
-	err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-	s.NoError(err)
-
-	value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
-	s.NoError(err)
-	s.MetricsEqual(value, 5524)
-}
-
-func (s *BFWriteBufferSuite) TestAutoSync() {
-	paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
-
-	s.Run("normal_auto_sync", func() {
-		wb, err := NewBFWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{
-			syncPolicies: []SyncPolicy{
-				GetFullBufferPolicy(),
-				GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
-				GetFlushingSegmentsPolicy(s.metacache),
-			},
-		})
+	s.Run("normal_run_int64", func() {
+		storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
+		s.Require().NoError(err)
+		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, storageCache, s.syncMgr, &writeBufferOption{})
 		s.NoError(err)
 
 		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
-		seg1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1002}, metacache.NewBloomFilterSet())
-		s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
-		s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
-		s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
-		s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
-		s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
-		s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
-		s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
-		s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-		s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
-		s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
+		s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
+		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
+		s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
 
-		pks, msg := s.composeInsertMsg(1000, 10, 128)
+		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
 		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
 
 		metrics.DataNodeFlowGraphBufferDataSize.Reset()
 		err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
 		s.NoError(err)
 
-		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
+		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
+		s.NoError(err)
+		s.MetricsEqual(value, 5524)
+
+		delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+		err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+		s.NoError(err)
+		s.MetricsEqual(value, 5684)
+	})
+
+	s.Run("normal_run_varchar", func() {
+		storageCache, err := metacache.NewStorageV2Cache(s.collVarcharSchema)
+		s.Require().NoError(err)
+		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheVarchar, storageCache, s.syncMgr, &writeBufferOption{})
+		s.NoError(err)
+
+		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
+		s.metacacheVarchar.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
+		s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
+		s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+		s.metacacheVarchar.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
+
+		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
+		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewVarCharPrimaryKey(fmt.Sprintf("%v", id)) }))
+
+		metrics.DataNodeFlowGraphBufferDataSize.Reset()
+		err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+		s.NoError(err)
+
+		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
+		s.NoError(err)
+		s.MetricsEqual(value, 5884)
+	})
+
+	s.Run("int_pk_type_not_match", func() {
+		storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
+		s.Require().NoError(err)
+		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, storageCache, s.syncMgr, &writeBufferOption{})
+		s.NoError(err)
+
+		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
+		s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
+		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
+		s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
+
+		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
+		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+
+		metrics.DataNodeFlowGraphBufferDataSize.Reset()
+		err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+		s.Error(err)
+	})
+
+	s.Run("varchar_pk_not_match", func() {
+		storageCache, err := metacache.NewStorageV2Cache(s.collVarcharSchema)
+		s.Require().NoError(err)
+		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheVarchar, storageCache, s.syncMgr, &writeBufferOption{})
+		s.NoError(err)
+
+		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
+		s.metacacheVarchar.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
+		s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
+		s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+		s.metacacheVarchar.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
+
+		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
+		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+
+		metrics.DataNodeFlowGraphBufferDataSize.Reset()
+		err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+		s.Error(err)
+	})
+}
+
+func (s *BFWriteBufferSuite) TestAutoSync() {
+	paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
+
+	s.Run("normal_auto_sync", func() {
+		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, nil, s.syncMgr, &writeBufferOption{
+			syncPolicies: []SyncPolicy{
+				GetFullBufferPolicy(),
+				GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
+				GetFlushingSegmentsPolicy(s.metacacheInt64),
+			},
+		})
+		s.NoError(err)
+
+		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
+		seg1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1002}, metacache.NewBloomFilterSet())
+		s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
+		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
+		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
+		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
+		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
+		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
+		s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
+
+		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
+		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+
+		metrics.DataNodeFlowGraphBufferDataSize.Reset()
+		err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+		s.NoError(err)
+
+		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
 		s.NoError(err)
 		s.MetricsEqual(value, 0)
 	})
@@ -228,7 +349,7 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
 	defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
 	params.Params.CommonCfg.StorageScheme.SwapTempValue("file")
 	tmpDir := s.T().TempDir()
-	arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields)
+	arrowSchema, err := typeutil.ConvertToArrowSchema(s.collInt64Schema.Fields)
 	s.Require().NoError(err)
 
 	space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder().
 		SetSchema(schema.NewSchema(arrowSchema, &schema.SchemaOptions{
@@ -236,17 +357,17 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
 		})).Build())
 	s.Require().NoError(err)
 	s.storageV2Cache.SetSpace(1000, space)
-	wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{})
+	wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.storageV2Cache, s.syncMgr, &writeBufferOption{})
 	s.NoError(err)
 
 	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
-	s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
-	s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
-	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
-	s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
-	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+	s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
+	s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
+	s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
+	s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+	s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 
-	pks, msg := s.composeInsertMsg(1000, 10, 128)
+	pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
 	delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
 
 	err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
@@ -258,7 +379,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
 	defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
 	paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
 	tmpDir := s.T().TempDir()
-	arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields)
+	arrowSchema, err := typeutil.ConvertToArrowSchema(s.collInt64Schema.Fields)
 	s.Require().NoError(err)
 
 	space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder().
@@ -269,11 +390,11 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
 	s.storageV2Cache.SetSpace(1002, space)
 
 	s.Run("normal_auto_sync", func() {
-		wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{
+		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.storageV2Cache, s.syncMgr, &writeBufferOption{
 			syncPolicies: []SyncPolicy{
 				GetFullBufferPolicy(),
 				GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
-				GetFlushingSegmentsPolicy(s.metacache),
+				GetFlushingSegmentsPolicy(s.metacacheInt64),
 			},
 		})
 		s.NoError(err)
@@ -283,26 +404,26 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
 		segCompacted := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
 		metacache.CompactTo(2001)(segCompacted)
 
-		s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg, segCompacted})
-		s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
-		s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
-		s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
-		s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
-		s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{1003}) // mocked compacted
-		s.metacache.EXPECT().RemoveSegments(mock.Anything).Return([]int64{1003})
-		s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
-		s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-		s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg, segCompacted})
+		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
+		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
+		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
+		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
+		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{1003}) // mocked compacted
+		s.metacacheInt64.EXPECT().RemoveSegments(mock.Anything).Return([]int64{1003})
+		s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
 		s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
 
-		pks, msg := s.composeInsertMsg(1000, 10, 128)
+		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
 		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
 
 		metrics.DataNodeFlowGraphBufferDataSize.Reset()
 		err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
 		s.NoError(err)
 
-		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
+		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
 		s.NoError(err)
 		s.MetricsEqual(value, 0)
 	})
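Note on the test changes above: composeDeleteMsg now shifts the logical counter of every delete timestamp by one. Presumably this is so each delete sorts strictly after the insert that produced the same pk, which the new in-batch pkExists check requires. A minimal illustration, assuming Milvus' pkg/util/tsoutil (the low bits of a hybrid timestamp hold the logical counter):

```go
// With the same physical time, a delete at logical idx would not sort "after"
// an insert at logical idx, so inData.pkExists would skip it; idx+1 fixes that.
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

func main() {
	now := time.Now()
	insertTs := tsoutil.ComposeTSByTime(now, 0) // insert at logical counter 0
	deleteTs := tsoutil.ComposeTSByTime(now, 1) // delete at logical counter 0+1
	fmt.Println(deleteTs > insertTs)            // true: the delete sorts after the insert
}
```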
diff --git a/internal/datanode/writebuffer/insert_buffer.go b/internal/datanode/writebuffer/insert_buffer.go
index 1d66f05e5f..adc052d001 100644
--- a/internal/datanode/writebuffer/insert_buffer.go
+++ b/internal/datanode/writebuffer/insert_buffer.go
@@ -10,8 +10,6 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/log"
-	"github.com/milvus-io/milvus/pkg/mq/msgstream"
-	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -118,38 +116,17 @@ func (ib *InsertBuffer) Yield() *storage.InsertData {
 	return ib.buffer
 }
 
-func (ib *InsertBuffer) Buffer(msgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) ([]storage.FieldData, int64, error) {
-	pkData := make([]storage.FieldData, 0, len(msgs))
-	var totalMemSize int64 = 0
-	for _, msg := range msgs {
-		tmpBuffer, err := storage.InsertMsgToInsertData(msg, ib.collSchema)
-		if err != nil {
-			log.Warn("failed to transfer insert msg to insert data", zap.Error(err))
-			return nil, 0, err
-		}
-
-		pkFieldData, err := storage.GetPkFromInsertData(ib.collSchema, tmpBuffer)
-		if err != nil {
-			return nil, 0, err
-		}
-		if pkFieldData.RowNum() != tmpBuffer.GetRowNum() {
-			return nil, 0, merr.WrapErrServiceInternal("pk column row num not match")
-		}
-		pkData = append(pkData, pkFieldData)
-
-		storage.MergeInsertData(ib.buffer, tmpBuffer)
-
-		tsData, err := storage.GetTimestampFromInsertData(tmpBuffer)
-		if err != nil {
-			log.Warn("no timestamp field found in insert msg", zap.Error(err))
-			return nil, 0, err
-		}
+func (ib *InsertBuffer) Buffer(inData *inData, startPos, endPos *msgpb.MsgPosition) int64 {
+	totalMemSize := int64(0)
+	for idx, data := range inData.data {
+		storage.MergeInsertData(ib.buffer, data)
+		tsData := inData.tsField[idx]
 
 		// update buffer size
-		ib.UpdateStatistics(int64(tmpBuffer.GetRowNum()), int64(tmpBuffer.GetMemorySize()), ib.getTimestampRange(tsData), startPos, endPos)
-		totalMemSize += int64(tmpBuffer.GetMemorySize())
+		ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), ib.getTimestampRange(tsData), startPos, endPos)
+		totalMemSize += int64(data.GetMemorySize())
 	}
-	return pkData, totalMemSize, nil
+	return totalMemSize
 }
 
 func (ib *InsertBuffer) getTimestampRange(tsData *storage.Int64FieldData) TimeRange {
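With parsing and validation hoisted out into prepareInsert, InsertBuffer.Buffer can no longer fail. A standalone sketch of the resulting two-phase shape, with toy types rather than the Milvus API (validate once, then buffer infallibly):

```go
// Sketch: prepare() mirrors prepareInsert (all error paths live here);
// buffer() mirrors the new InsertBuffer.Buffer (no error path, returns size).
package main

import (
	"errors"
	"fmt"
)

type msg struct{ rows, pkRows int }

type batch struct{ rows int }

func prepare(msgs []msg) ([]batch, error) {
	out := make([]batch, 0, len(msgs))
	for _, m := range msgs {
		if m.pkRows != m.rows {
			return nil, errors.New("pk column row num not match")
		}
		out = append(out, batch{rows: m.rows})
	}
	return out, nil
}

func buffer(batches []batch) int {
	total := 0
	for _, b := range batches {
		total += b.rows // accumulate, nothing can fail anymore
	}
	return total
}

func main() {
	batches, err := prepare([]msg{{rows: 10, pkRows: 10}})
	if err != nil {
		panic(err)
	}
	fmt.Println(buffer(batches)) // 10
}
```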
diff --git a/internal/datanode/writebuffer/insert_buffer_test.go b/internal/datanode/writebuffer/insert_buffer_test.go
index 581119be65..9828ba6b88 100644
--- a/internal/datanode/writebuffer/insert_buffer_test.go
+++ b/internal/datanode/writebuffer/insert_buffer_test.go
@@ -11,7 +11,6 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -128,65 +127,28 @@ func (s *InsertBufferSuite) TestBasic() {
 }
 
 func (s *InsertBufferSuite) TestBuffer() {
-	s.Run("normal_buffer", func() {
-		pks, insertMsg := s.composeInsertMsg(10, 128)
+	wb := &writeBufferBase{
+		collSchema: s.collSchema,
+	}
+	_, insertMsg := s.composeInsertMsg(10, 128)
 
-		insertBuffer, err := NewInsertBuffer(s.collSchema)
-		s.Require().NoError(err)
+	insertBuffer, err := NewInsertBuffer(s.collSchema)
+	s.Require().NoError(err)
 
-		fieldData, memSize, err := insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-		s.NoError(err)
+	groups, err := wb.prepareInsert([]*msgstream.InsertMsg{insertMsg})
+	s.Require().NoError(err)
+	s.Require().Len(groups, 1)
 
-		pkData := lo.Map(fieldData, func(fd storage.FieldData, _ int) []int64 {
-			return lo.RepeatBy(fd.RowNum(), func(idx int) int64 { return fd.GetRow(idx).(int64) })
-		})
-		s.ElementsMatch(pks, lo.Flatten(pkData))
-		s.EqualValues(100, insertBuffer.MinTimestamp())
-		s.EqualValues(5364, memSize)
-	})
+	memSize := insertBuffer.Buffer(groups[0], &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
 
-	s.Run("pk_not_found", func() {
-		_, insertMsg := s.composeInsertMsg(10, 128)
-
-		insertMsg.FieldsData = []*schemapb.FieldData{insertMsg.FieldsData[0], insertMsg.FieldsData[1], insertMsg.FieldsData[3]}
-
-		insertBuffer, err := NewInsertBuffer(s.collSchema)
-		s.Require().NoError(err)
-
-		_, _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-		s.Error(err)
-	})
-
-	s.Run("schema_without_pk", func() {
-		badSchema := &schemapb.CollectionSchema{
-			Name: "test_collection",
-			Fields: []*schemapb.FieldSchema{
-				{
-					FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
-				},
-				{
-					FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
-				},
-				{
-					FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
-					TypeParams: []*commonpb.KeyValuePair{
-						{Key: common.DimKey, Value: "128"},
-					},
-				},
-			},
-		}
-
-		_, insertMsg := s.composeInsertMsg(10, 128)
-
-		insertBuffer, err := NewInsertBuffer(badSchema)
-		s.Require().NoError(err)
-
-		_, _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-		s.Error(err)
-	})
+	s.EqualValues(100, insertBuffer.MinTimestamp())
+	s.EqualValues(5364, memSize)
 }
 
 func (s *InsertBufferSuite) TestYield() {
+	wb := &writeBufferBase{
+		collSchema: s.collSchema,
+	}
 	insertBuffer, err := NewInsertBuffer(s.collSchema)
 	s.Require().NoError(err)
 
@@ -197,8 +159,11 @@ func (s *InsertBufferSuite) TestYield() {
 	s.Require().NoError(err)
 
 	pks, insertMsg := s.composeInsertMsg(10, 128)
-	_, _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+	groups, err := wb.prepareInsert([]*msgstream.InsertMsg{insertMsg})
 	s.Require().NoError(err)
+	s.Require().Len(groups, 1)
+
+	insertBuffer.Buffer(groups[0], &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
 
 	result = insertBuffer.Yield()
 	s.NotNil(result)
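The dropped pk_not_found and schema_without_pk subtests are not lost coverage: the same validation now lives in prepareInsert. Equivalent coverage could look like this inside the existing suite (illustrative only; TestPrepareInsertPkMissing is a hypothetical name, not part of this patch):

```go
// Hypothetical re-homed validation test: removing the pk column from the
// composed message should make prepareInsert, not Buffer, return the error.
func (s *InsertBufferSuite) TestPrepareInsertPkMissing() {
	wb := &writeBufferBase{collSchema: s.collSchema}
	_, insertMsg := s.composeInsertMsg(10, 128)
	// drop the pk field (index 2) so GetPkFromInsertData must fail
	insertMsg.FieldsData = []*schemapb.FieldData{insertMsg.FieldsData[0], insertMsg.FieldsData[1], insertMsg.FieldsData[3]}

	_, err := wb.prepareInsert([]*msgstream.InsertMsg{insertMsg})
	s.Error(err)
}
```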
diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go
index 5423efa80e..c98f1b6440 100644
--- a/internal/datanode/writebuffer/l0_write_buffer.go
+++ b/internal/datanode/writebuffer/l0_write_buffer.go
@@ -12,10 +12,12 @@ import (
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/retry"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 type l0WriteBuffer struct {
@@ -45,22 +47,75 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
 	}, nil
 }
 
+func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
+	for _, delMsg := range deleteMsgs {
+		l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
+		pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
+		segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
+			metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
+		for _, segment := range segments {
+			if segment.CompactTo() != 0 {
+				continue
+			}
+			var deletePks []storage.PrimaryKey
+			var deleteTss []typeutil.Timestamp
+			for idx, pk := range pks {
+				if segment.GetBloomFilterSet().PkExists(pk) {
+					deletePks = append(deletePks, pk)
+					deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
+				}
+			}
+			if len(deletePks) > 0 {
+				wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
+			}
+		}
+
+		for _, inData := range groups {
+			if delMsg.GetPartitionID() == common.InvalidPartitionID || delMsg.GetPartitionID() == inData.partitionID {
+				var deletePks []storage.PrimaryKey
+				var deleteTss []typeutil.Timestamp
+				for idx, pk := range pks {
+					ts := delMsg.GetTimestamps()[idx]
+					if inData.pkExists(pk, ts) {
+						deletePks = append(deletePks, pk)
+						deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
+					}
+				}
+				if len(deletePks) > 0 {
+					wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
+				}
+			}
+		}
+	}
+}
+
 func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
 	wb.mut.Lock()
 	defer wb.mut.Unlock()
 
-	// process insert msgs
-	pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos)
+	groups, err := wb.prepareInsert(insertMsgs)
 	if err != nil {
-		log.Warn("failed to buffer insert data", zap.Error(err))
 		return err
 	}
 
+	// buffer insert data and add segment if not exists
+	for _, inData := range groups {
+		err := wb.bufferInsert(inData, startPos, endPos)
+		if err != nil {
+			return err
+		}
+	}
+
+	// distribute delete msg
+	// the l0 write buffer checks the bloom filter of each segment and of the current insert batch to decide which delete records to buffer into the L0 segment
+	wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
+
 	// update pk oracle
-	for segmentID, dataList := range pkData {
-		segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
+	for _, inData := range groups {
+		// segment shall always exist after buffer insert
+		segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
 		for _, segment := range segments {
-			for _, fieldData := range dataList {
+			for _, fieldData := range inData.pkField {
 				err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
 				if err != nil {
 					return err
@@ -69,16 +124,6 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
 		}
 	}
 
-	for _, msg := range deleteMsgs {
-		l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
-		pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
-		err := wb.bufferDelete(l0SegmentID, pks, msg.GetTimestamps(), startPos, endPos)
-		if err != nil {
-			log.Warn("failed to buffer delete data", zap.Error(err))
-			return err
-		}
-	}
-
 	// update buffer last checkpoint
 	wb.checkpoint = endPos
 
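For the L0 buffer the observable change is that delete records are filtered before being buffered into the level-0 segment: a pk is kept only if some growing/flushing/flushed segment's bloom filter, or the current insert batch, might contain it. Bloom filters can report false positives but never false negatives, so a few extra pks may be kept while no applicable delete is ever dropped. A toy sketch of that filtering step:

```go
// Standalone sketch (not Milvus code): keep only the delete pks that pass a
// membership predicate. An over-inclusive predicate (bloom filter) is safe;
// an under-inclusive one would silently lose deletes.
package main

import "fmt"

func filterHits(pks []int64, mightContain func(int64) bool) []int64 {
	var hits []int64
	for _, pk := range pks {
		if mightContain(pk) {
			hits = append(hits, pk)
		}
	}
	return hits
}

func main() {
	seen := map[int64]bool{1: true, 3: true}
	fmt.Println(filterHits([]int64{1, 2, 3, 4}, func(pk int64) bool { return seen[pk] })) // [1 3]
}
```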
diff --git a/internal/datanode/writebuffer/l0_write_buffer_test.go b/internal/datanode/writebuffer/l0_write_buffer_test.go
index 60e65f9b79..f6e1c4d81a 100644
--- a/internal/datanode/writebuffer/l0_write_buffer_test.go
+++ b/internal/datanode/writebuffer/l0_write_buffer_test.go
@@ -67,12 +67,41 @@ func (s *L0WriteBufferSuite) SetupSuite() {
 	s.storageCache = storageCache
 }
 
-func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) {
+func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int, pkType schemapb.DataType) ([]int64, *msgstream.InsertMsg) {
 	tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
 	vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
 		return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
 	})
 	flatten := lo.Flatten(vectors)
+	var pkField *schemapb.FieldData
+	switch pkType {
+	case schemapb.DataType_Int64:
+		pkField = &schemapb.FieldData{
+			FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
+			Field: &schemapb.FieldData_Scalars{
+				Scalars: &schemapb.ScalarField{
+					Data: &schemapb.ScalarField_LongData{
+						LongData: &schemapb.LongArray{
+							Data: tss,
+						},
+					},
+				},
+			},
+		}
+	case schemapb.DataType_VarChar:
+		pkField = &schemapb.FieldData{
+			FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_VarChar,
+			Field: &schemapb.FieldData_Scalars{
+				Scalars: &schemapb.ScalarField{
+					Data: &schemapb.ScalarField_StringData{
+						StringData: &schemapb.StringArray{
+							Data: lo.Map(tss, func(v int64, _ int) string { return fmt.Sprintf("%v", v) }),
+						},
+					},
+				},
+			},
+		}
+	}
 	return tss, &msgstream.InsertMsg{
 		InsertRequest: msgpb.InsertRequest{
 			SegmentID: segmentID,
@@ -104,18 +133,7 @@ func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim
 						},
 					},
 				},
-				{
-					FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
-					Field: &schemapb.FieldData_Scalars{
-						Scalars: &schemapb.ScalarField{
-							Data: &schemapb.ScalarField_LongData{
-								LongData: &schemapb.LongArray{
-									Data: tss,
-								},
-							},
-						},
-					},
-				},
+				pkField,
 				{
 					FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
 					Field: &schemapb.FieldData_Vectors{
@@ -138,7 +156,7 @@ func (s *L0WriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre
 	delMsg := &msgstream.DeleteMsg{
 		DeleteRequest: msgpb.DeleteRequest{
 			PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
-			Timestamps:  lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }),
+			Timestamps:  lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)+1) }),
 		},
 	}
 	return delMsg
@@ -154,28 +172,55 @@
 }
 
 func (s *L0WriteBufferSuite) TestBufferData() {
-	wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
-		idAllocator: s.allocator,
+	s.Run("normal_run", func() {
+		wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
+			idAllocator: s.allocator,
+		})
+		s.NoError(err)
+
+		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
+		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+
+		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
+		s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
+		s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
+		s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+		s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
+
+		metrics.DataNodeFlowGraphBufferDataSize.Reset()
+		err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+		s.NoError(err)
+
+		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
+		s.NoError(err)
+		s.MetricsEqual(value, 5524)
+
+		delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+		err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+		s.NoError(err)
+		s.MetricsEqual(value, 5684)
 	})
-	s.NoError(err)
 
-	pks, msg := s.composeInsertMsg(1000, 10, 128)
-	delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
+	s.Run("pk_type_not_match", func() {
+		wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
+			idAllocator: s.allocator,
+		})
+		s.NoError(err)
 
-	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
-	s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
-	s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
-	s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
-	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
+		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
+		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
 
-	metrics.DataNodeFlowGraphBufferDataSize.Reset()
-	err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
-	s.NoError(err)
+		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
+		s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
+		s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+		s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
 
-	value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
-	s.NoError(err)
-	s.MetricsEqual(value, 5524)
+		metrics.DataNodeFlowGraphBufferDataSize.Reset()
+		err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
+		s.Error(err)
+	})
 }
 
 func (s *L0WriteBufferSuite) TestCreateFailure() {
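The write_buffer.go change below builds its per-batch bookkeeping with two samber/lo helpers. A quick standalone demo of the grouping pattern with stand-in types:

```go
// Demo of the prepareInsert grouping pattern: group messages by segment ID
// and record each segment's partition in a side map.
package main

import (
	"fmt"

	"github.com/samber/lo"
)

type insertMsg struct{ segmentID, partitionID int64 }

func main() {
	msgs := []insertMsg{{1, 10}, {2, 20}, {1, 10}}
	groups := lo.GroupBy(msgs, func(m insertMsg) int64 { return m.segmentID })
	parts := lo.SliceToMap(msgs, func(m insertMsg) (int64, int64) { return m.segmentID, m.partitionID })
	fmt.Println(len(groups[1]), parts[2]) // 2 20
}
```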
diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go
index 2a328e20aa..065037db56 100644
--- a/internal/datanode/writebuffer/write_buffer.go
+++ b/internal/datanode/writebuffer/write_buffer.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"sync"
 
+	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"go.uber.org/atomic"
@@ -325,51 +326,138 @@ func (wb *writeBufferBase) yieldBuffer(segmentID int64) (*storage.InsertData, *s
 	return insert, delta, timeRange, start
 }
 
-// bufferInsert transform InsertMsg into bufferred InsertData and returns primary key field data for future usage.
-func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) (map[int64][]storage.FieldData, error) {
-	insertGroups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.GetSegmentID() })
-	segmentPKData := make(map[int64][]storage.FieldData)
-	segmentPartition := lo.SliceToMap(insertMsgs, func(msg *msgstream.InsertMsg) (int64, int64) { return msg.GetSegmentID(), msg.GetPartitionID() })
+type inData struct {
+	segmentID   int64
+	partitionID int64
+	data        []*storage.InsertData
+	pkField     []storage.FieldData
+	tsField     []*storage.Int64FieldData
+	rowNum      int64
 
-	for segmentID, msgs := range insertGroups {
-		_, ok := wb.metaCache.GetSegmentByID(segmentID)
-		// new segment
-		if !ok {
-			wb.metaCache.AddSegment(&datapb.SegmentInfo{
-				ID:            segmentID,
-				PartitionID:   segmentPartition[segmentID],
-				CollectionID:  wb.collectionID,
-				InsertChannel: wb.channelName,
-				StartPosition: startPos,
-				State:         commonpb.SegmentState_Growing,
-			}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
-				return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
-			}, metacache.SetStartPosRecorded(false))
-		}
+	batchBF *storage.PkStatistics
+}
 
-		segBuf := wb.getOrCreateBuffer(segmentID)
-
-		pkData, totalMemSize, err := segBuf.insertBuffer.Buffer(msgs, startPos, endPos)
-		if err != nil {
-			log.Warn("failed to buffer insert data", zap.Int64("segmentID", segmentID), zap.Error(err))
-			return nil, err
-		}
-		segmentPKData[segmentID] = pkData
-		wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
-			metacache.WithSegmentIDs(segmentID))
-
-		metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(totalMemSize))
+func (id *inData) generatePkStats() {
+	id.batchBF = &storage.PkStatistics{
+		PkFilter: bloom.NewWithEstimates(uint(id.rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
 	}
 
-	return segmentPKData, nil
+	for _, ids := range id.pkField {
+		id.batchBF.UpdatePKRange(ids)
+	}
+}
+
+func (id *inData) pkExists(pk storage.PrimaryKey, ts uint64) bool {
+	if !id.batchBF.PkExist(pk) {
+		return false
+	}
+
+	for batchIdx, timestamps := range id.tsField {
+		ids := id.pkField[batchIdx]
+		var primaryKey storage.PrimaryKey
+		switch pk.Type() {
+		case schemapb.DataType_Int64:
+			primaryKey = storage.NewInt64PrimaryKey(0)
+		case schemapb.DataType_VarChar:
+			primaryKey = storage.NewVarCharPrimaryKey("")
+		}
+		for idx := 0; idx < timestamps.RowNum(); idx++ {
+			timestamp := timestamps.GetRow(idx).(int64)
+			if int64(ts) <= timestamp {
+				continue
+			}
+			primaryKey.SetValue(ids.GetRow(idx))
+
+			if pk.EQ(primaryKey) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// prepareInsert transforms InsertMsgs into InsertData batches grouped by segmentID,
+// extracting the primary key and timestamp field data of each batch.
+func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*inData, error) {
+	groups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.SegmentID })
+	segmentPartition := lo.SliceToMap(insertMsgs, func(msg *msgstream.InsertMsg) (int64, int64) { return msg.GetSegmentID(), msg.GetPartitionID() })
+
+	result := make([]*inData, 0, len(groups))
+	for segment, msgs := range groups {
+		inData := &inData{
+			segmentID:   segment,
+			partitionID: segmentPartition[segment],
+			data:        make([]*storage.InsertData, 0, len(msgs)),
+			pkField:     make([]storage.FieldData, 0, len(msgs)),
+		}
+		for _, msg := range msgs {
+			data, err := storage.InsertMsgToInsertData(msg, wb.collSchema)
+			if err != nil {
+				log.Warn("failed to transfer insert msg to insert data", zap.Error(err))
+				return nil, err
+			}
+
+			pkFieldData, err := storage.GetPkFromInsertData(wb.collSchema, data)
+			if err != nil {
+				return nil, err
+			}
+			if pkFieldData.RowNum() != data.GetRowNum() {
+				return nil, merr.WrapErrServiceInternal("pk column row num not match")
+			}
+
+			tsFieldData, err := storage.GetTimestampFromInsertData(data)
+			if err != nil {
+				return nil, err
+			}
+			if tsFieldData.RowNum() != data.GetRowNum() {
+				return nil, merr.WrapErrServiceInternal("timestamp column row num not match")
+			}
+
+			inData.data = append(inData.data, data)
+			inData.pkField = append(inData.pkField, pkFieldData)
+			inData.tsField = append(inData.tsField, tsFieldData)
+			inData.rowNum += int64(data.GetRowNum())
+		}
+		inData.generatePkStats()
+		result = append(result, inData)
+	}
+
+	return result, nil
+}
+
+// bufferInsert buffers a prepared insert batch and registers a new growing segment in metacache if it is not tracked yet.
+func (wb *writeBufferBase) bufferInsert(inData *inData, startPos, endPos *msgpb.MsgPosition) error {
+	_, ok := wb.metaCache.GetSegmentByID(inData.segmentID)
+	// new segment
+	if !ok {
+		wb.metaCache.AddSegment(&datapb.SegmentInfo{
+			ID:            inData.segmentID,
+			PartitionID:   inData.partitionID,
+			CollectionID:  wb.collectionID,
+			InsertChannel: wb.channelName,
+			StartPosition: startPos,
+			State:         commonpb.SegmentState_Growing,
+		}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
+		}, metacache.SetStartPosRecorded(false))
+	}
+
+	segBuf := wb.getOrCreateBuffer(inData.segmentID)
+
+	totalMemSize := segBuf.insertBuffer.Buffer(inData, startPos, endPos)
+	wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
+		metacache.WithSegmentIDs(inData.segmentID))
+
+	metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(totalMemSize))
+
+	return nil
 }
 
 // bufferDelete buffers DeleteMsg into DeleteData.
-func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) error {
+func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) {
 	segBuf := wb.getOrCreateBuffer(segmentID)
 
 	bufSize := segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos)
 	metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(bufSize))
-	return nil
 }
 
 func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {
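A closing note on generatePkStats: the per-batch filter is sized from the batch's row count and the configured false-positive rate, so short-lived batch filters stay small. A standalone sketch with bits-and-blooms (the 0.001 rate stands in for a typical maxBloomFalsePositive setting; the actual configured value may differ):

```go
// Sketch of the per-batch bloom filter sizing used by generatePkStats:
// capacity and hash count are derived from the estimates.
package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	rowNum := uint(10000)                      // batch row count, as in inData.rowNum
	bf := bloom.NewWithEstimates(rowNum, 0.001) // assumed false-positive rate
	fmt.Println(bf.Cap(), bf.K())               // bit-array size and hash-function count
}
```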