From 44e97b7cdaeff7bf5a714f80c4ac96c5885a8e15 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Tue, 4 Jun 2024 19:21:54 +0800
Subject: [PATCH] enhance: [2.4] Use map PK to timestamp in buffer insert
 (#33566) (#33582)

Cherry-pick from master PR: #33566
Related to #27675
Store a map from each PK to its minimal timestamp in `inData` instead of
a bloom filter to check whether a delete entry hits the current insert
batch

Signed-off-by: Congqi Xia
---
 internal/datanode/writebuffer/write_buffer.go | 87 +++++++++++--------
 1 file changed, 50 insertions(+), 37 deletions(-)

diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go
index 9f0f93f304..d18f374a73 100644
--- a/internal/datanode/writebuffer/write_buffer.go
+++ b/internal/datanode/writebuffer/write_buffer.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"sync"
 
-	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"go.uber.org/atomic"
@@ -85,6 +84,8 @@ type writeBufferBase struct {
 	metaWriter syncmgr.MetaWriter
 
 	collSchema       *schemapb.CollectionSchema
+	helper           *typeutil.SchemaHelper
+	pkField          *schemapb.FieldSchema
 	estSizePerRecord int
 	metaCache        metacache.MetaCache
 	syncMgr          syncmgr.SyncManager
@@ -132,11 +133,21 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2
 	if err != nil {
 		return nil, err
 	}
+	helper, err := typeutil.CreateSchemaHelper(schema)
+	if err != nil {
+		return nil, err
+	}
+	pkField, err := helper.GetPrimaryKeyField()
+	if err != nil {
+		return nil, err
+	}
 
 	wb := &writeBufferBase{
 		channelName:      channel,
 		collectionID:     metacache.Collection(),
 		collSchema:       schema,
+		helper:           helper,
+		pkField:          pkField,
 		estSizePerRecord: estSize,
 		syncMgr:          syncMgr,
 		metaWriter:       option.metaWriter,
@@ -392,46 +403,21 @@ type inData struct {
 	tsField []*storage.Int64FieldData
 	rowNum  int64
 
-	batchBF *storage.PkStatistics
-}
-
-func (id *inData) generatePkStats() {
-	id.batchBF = &storage.PkStatistics{
-		PkFilter: bloom.NewWithEstimates(uint(id.rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
-	}
-
-	for _, ids := range id.pkField {
-		id.batchBF.UpdatePKRange(ids)
-	}
+	intPKTs map[int64]int64
+	strPKTs map[string]int64
 }
 
 func (id *inData) pkExists(pk storage.PrimaryKey, ts uint64) bool {
-	if !id.batchBF.PkExist(pk) {
-		return false
+	var ok bool
+	var minTs int64
+	switch pk.Type() {
+	case schemapb.DataType_Int64:
+		minTs, ok = id.intPKTs[pk.GetValue().(int64)]
+	case schemapb.DataType_VarChar:
+		minTs, ok = id.strPKTs[pk.GetValue().(string)]
 	}
 
-	for batchIdx, timestamps := range id.tsField {
-		ids := id.pkField[batchIdx]
-		var primaryKey storage.PrimaryKey
-		switch pk.Type() {
-		case schemapb.DataType_Int64:
-			primaryKey = storage.NewInt64PrimaryKey(0)
-		case schemapb.DataType_VarChar:
-			primaryKey = storage.NewVarCharPrimaryKey("")
-		}
-		for idx := 0; idx < timestamps.RowNum(); idx++ {
-			timestamp := timestamps.GetRow(idx).(int64)
-			if int64(ts) <= timestamp {
-				continue
-			}
-			primaryKey.SetValue(ids.GetRow(idx))
-
-			if pk.EQ(primaryKey) {
-				return true
-			}
-		}
-	}
-	return false
+	return ok && ts > uint64(minTs)
 }
 
 // prepareInsert transfers InsertMsg into organized InsertData grouped by segmentID
@@ -448,6 +434,13 @@ func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*
 			data:    make([]*storage.InsertData, 0, len(msgs)),
 			pkField: make([]storage.FieldData, 0, len(msgs)),
 		}
+		switch wb.pkField.GetDataType() {
+		case schemapb.DataType_Int64:
+			inData.intPKTs = make(map[int64]int64)
+		case schemapb.DataType_VarChar:
+			inData.strPKTs = make(map[string]int64)
+		}
+
		for _, msg := range msgs {
 			data, err := storage.InsertMsgToInsertData(msg, wb.collSchema)
 			if err != nil {
@@ -471,12 +464,32 @@ func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*
 				return nil, merr.WrapErrServiceInternal("timestamp column row num not match")
 			}
 
+			timestamps := tsFieldData.GetRows().([]int64)
+
+			switch wb.pkField.GetDataType() {
+			case schemapb.DataType_Int64:
+				pks := pkFieldData.GetRows().([]int64)
+				for idx, pk := range pks {
+					ts, ok := inData.intPKTs[pk]
+					if !ok || timestamps[idx] < ts {
+						inData.intPKTs[pk] = timestamps[idx]
+					}
+				}
+			case schemapb.DataType_VarChar:
+				pks := pkFieldData.GetRows().([]string)
+				for idx, pk := range pks {
+					ts, ok := inData.strPKTs[pk]
+					if !ok || timestamps[idx] < ts {
+						inData.strPKTs[pk] = timestamps[idx]
+					}
+				}
+			}
+
 			inData.data = append(inData.data, data)
 			inData.pkField = append(inData.pkField, pkFieldData)
 			inData.tsField = append(inData.tsField, tsFieldData)
 			inData.rowNum += int64(data.GetRowNum())
 		}
-		inData.generatePkStats()
 
 		result = append(result, inData)
 	}
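
For illustration, the bookkeeping this patch introduces boils down to the
self-contained sketch below. This is not the Milvus code: `insertBatch`,
`observe`, and the bare `int64` key are hypothetical stand-ins for `inData`,
the per-batch loop in `prepareInsert`, and `storage.PrimaryKey`; the real
change also keeps a second `strPKTs` map for VarChar primary keys.

package main

import "fmt"

// insertBatch mimics the patched inData bookkeeping, reduced to int64 PKs.
type insertBatch struct {
	// intPKTs maps each primary key to the minimal insert timestamp
	// observed for it within this batch (the patched inData.intPKTs).
	intPKTs map[int64]int64
}

func newInsertBatch() *insertBatch {
	return &insertBatch{intPKTs: make(map[int64]int64)}
}

// observe records one inserted row, keeping only the smallest timestamp
// per PK, as the loop added to prepareInsert does.
func (b *insertBatch) observe(pk, ts int64) {
	if cur, ok := b.intPKTs[pk]; !ok || ts < cur {
		b.intPKTs[pk] = ts
	}
}

// pkExists reports whether a delete entry (pk, ts) hits the buffered batch:
// the PK must be present and the delete must be newer than the earliest
// buffered insert of that PK, i.e. `ok && ts > uint64(minTs)` in the patch.
func (b *insertBatch) pkExists(pk int64, ts uint64) bool {
	minTs, ok := b.intPKTs[pk]
	return ok && ts > uint64(minTs)
}

func main() {
	batch := newInsertBatch()
	batch.observe(100, 10) // pk=100 first inserted at ts=10
	batch.observe(100, 15) // duplicate insert keeps the minimum, 10
	batch.observe(200, 20)

	fmt.Println(batch.pkExists(100, 12)) // true: 12 > min insert ts 10
	fmt.Println(batch.pkExists(200, 20)) // false: delete not newer than insert
	fmt.Println(batch.pkExists(300, 99)) // false: pk never seen in the batch
}

Keeping only the minimal timestamp per PK is sufficient: a delete at `ts`
hits the batch exactly when at least one row with that PK was inserted
before `ts`, and if any row qualifies, the earliest one does. Compared with
the removed bloom-filter path, a hit now costs a single map lookup instead
of a filter probe plus a scan over every buffered row, and there are no
false positives; the trade-off is an exact map whose size grows with the
number of distinct PKs in the batch.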