mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: Accelerate delete filtering during binlog import (#41551)
Use map for deleteData instead of slice to accelerate delete filtering during binlog import. issue: https://github.com/milvus-io/milvus/issues/41550 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
3892451880
commit
16eb5eb921
@ -31,10 +31,8 @@ func FilterWithDelete(r *reader) (Filter, error) {
|
||||
return func(row map[int64]interface{}) bool {
|
||||
rowPk := row[pkField.GetFieldID()]
|
||||
rowTs := row[common.TimeStampField]
|
||||
for i, pk := range r.deleteData.Pks {
|
||||
if pk.GetValue() == rowPk && int64(r.deleteData.Tss[i]) > rowTs.(int64) {
|
||||
return false
|
||||
}
|
||||
if ts, ok := r.deleteData[rowPk]; ok && int64(ts) > rowTs.(int64) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}, nil
|
||||
|
||||
@ -24,9 +24,11 @@ import (
|
||||
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||
)
|
||||
@ -37,8 +39,8 @@ type reader struct {
|
||||
schema *schemapb.CollectionSchema
|
||||
|
||||
fileSize *atomic.Int64
|
||||
deleteData *storage.DeleteData
|
||||
insertLogs map[int64][]string // fieldID -> binlogs
|
||||
deleteData map[any]typeutil.Timestamp // pk2ts
|
||||
insertLogs map[int64][]string // fieldID -> binlogs
|
||||
|
||||
readIdx int
|
||||
filters []Filter
|
||||
@ -101,6 +103,11 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Ctx(context.TODO()).Info("read delete done",
|
||||
zap.String("collection", r.schema.GetName()),
|
||||
zap.Int("deleteRows", len(r.deleteData)),
|
||||
)
|
||||
|
||||
deleteFilter, err := FilterWithDelete(r)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -109,8 +116,8 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage.DeleteData, error) {
|
||||
deleteData := storage.NewDeleteData(nil, nil)
|
||||
func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (map[any]typeutil.Timestamp, error) {
|
||||
deleteData := make(map[any]typeutil.Timestamp)
|
||||
for _, path := range deltaLogs {
|
||||
reader, err := newBinlogReader(r.ctx, r.cm, path)
|
||||
if err != nil {
|
||||
@ -129,7 +136,10 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage
|
||||
return nil, err
|
||||
}
|
||||
if dl.Ts >= tsStart && dl.Ts <= tsEnd {
|
||||
deleteData.Append(dl.Pk, dl.Ts)
|
||||
pk := dl.Pk.GetValue()
|
||||
if ts, ok := deleteData[pk]; !ok || ts < dl.Ts {
|
||||
deleteData[pk] = dl.Ts
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user