mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Cherry-pick from master pr: #33566 Related to #27675 Store pk to minimal timestamp in `inData` instead of bloom filter to check whether some delete entry hit current insert batch Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
3a037884ac
commit
44e97b7cda
@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/bits-and-blooms/bloom/v3"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
@ -85,6 +84,8 @@ type writeBufferBase struct {
|
||||
|
||||
metaWriter syncmgr.MetaWriter
|
||||
collSchema *schemapb.CollectionSchema
|
||||
helper *typeutil.SchemaHelper
|
||||
pkField *schemapb.FieldSchema
|
||||
estSizePerRecord int
|
||||
metaCache metacache.MetaCache
|
||||
syncMgr syncmgr.SyncManager
|
||||
@ -132,11 +133,21 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
helper, err := typeutil.CreateSchemaHelper(schema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pkField, err := helper.GetPrimaryKeyField()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wb := &writeBufferBase{
|
||||
channelName: channel,
|
||||
collectionID: metacache.Collection(),
|
||||
collSchema: schema,
|
||||
helper: helper,
|
||||
pkField: pkField,
|
||||
estSizePerRecord: estSize,
|
||||
syncMgr: syncMgr,
|
||||
metaWriter: option.metaWriter,
|
||||
@ -392,46 +403,21 @@ type inData struct {
|
||||
tsField []*storage.Int64FieldData
|
||||
rowNum int64
|
||||
|
||||
batchBF *storage.PkStatistics
|
||||
}
|
||||
|
||||
func (id *inData) generatePkStats() {
|
||||
id.batchBF = &storage.PkStatistics{
|
||||
PkFilter: bloom.NewWithEstimates(uint(id.rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
|
||||
}
|
||||
|
||||
for _, ids := range id.pkField {
|
||||
id.batchBF.UpdatePKRange(ids)
|
||||
}
|
||||
intPKTs map[int64]int64
|
||||
strPKTs map[string]int64
|
||||
}
|
||||
|
||||
func (id *inData) pkExists(pk storage.PrimaryKey, ts uint64) bool {
|
||||
if !id.batchBF.PkExist(pk) {
|
||||
return false
|
||||
var ok bool
|
||||
var minTs int64
|
||||
switch pk.Type() {
|
||||
case schemapb.DataType_Int64:
|
||||
minTs, ok = id.intPKTs[pk.GetValue().(int64)]
|
||||
case schemapb.DataType_VarChar:
|
||||
minTs, ok = id.strPKTs[pk.GetValue().(string)]
|
||||
}
|
||||
|
||||
for batchIdx, timestamps := range id.tsField {
|
||||
ids := id.pkField[batchIdx]
|
||||
var primaryKey storage.PrimaryKey
|
||||
switch pk.Type() {
|
||||
case schemapb.DataType_Int64:
|
||||
primaryKey = storage.NewInt64PrimaryKey(0)
|
||||
case schemapb.DataType_VarChar:
|
||||
primaryKey = storage.NewVarCharPrimaryKey("")
|
||||
}
|
||||
for idx := 0; idx < timestamps.RowNum(); idx++ {
|
||||
timestamp := timestamps.GetRow(idx).(int64)
|
||||
if int64(ts) <= timestamp {
|
||||
continue
|
||||
}
|
||||
primaryKey.SetValue(ids.GetRow(idx))
|
||||
|
||||
if pk.EQ(primaryKey) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
return ok && ts > uint64(minTs)
|
||||
}
|
||||
|
||||
// prepareInsert transfers InsertMsg into organized InsertData grouped by segmentID
|
||||
@ -448,6 +434,13 @@ func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*
|
||||
data: make([]*storage.InsertData, 0, len(msgs)),
|
||||
pkField: make([]storage.FieldData, 0, len(msgs)),
|
||||
}
|
||||
switch wb.pkField.GetDataType() {
|
||||
case schemapb.DataType_Int64:
|
||||
inData.intPKTs = make(map[int64]int64)
|
||||
case schemapb.DataType_VarChar:
|
||||
inData.strPKTs = make(map[string]int64)
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
data, err := storage.InsertMsgToInsertData(msg, wb.collSchema)
|
||||
if err != nil {
|
||||
@ -471,12 +464,32 @@ func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*
|
||||
return nil, merr.WrapErrServiceInternal("timestamp column row num not match")
|
||||
}
|
||||
|
||||
timestamps := tsFieldData.GetRows().([]int64)
|
||||
|
||||
switch wb.pkField.GetDataType() {
|
||||
case schemapb.DataType_Int64:
|
||||
pks := pkFieldData.GetRows().([]int64)
|
||||
for idx, pk := range pks {
|
||||
ts, ok := inData.intPKTs[pk]
|
||||
if !ok || timestamps[idx] < ts {
|
||||
inData.intPKTs[pk] = timestamps[idx]
|
||||
}
|
||||
}
|
||||
case schemapb.DataType_VarChar:
|
||||
pks := pkFieldData.GetRows().([]string)
|
||||
for idx, pk := range pks {
|
||||
ts, ok := inData.strPKTs[pk]
|
||||
if !ok || timestamps[idx] < ts {
|
||||
inData.strPKTs[pk] = timestamps[idx]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inData.data = append(inData.data, data)
|
||||
inData.pkField = append(inData.pkField, pkFieldData)
|
||||
inData.tsField = append(inData.tsField, tsFieldData)
|
||||
inData.rowNum += int64(data.GetRowNum())
|
||||
}
|
||||
inData.generatePkStats()
|
||||
result = append(result, inData)
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user