diff --git a/internal/util/importutilv2/binlog/filter.go b/internal/util/importutilv2/binlog/filter.go index fd7982905c..593490bd00 100644 --- a/internal/util/importutilv2/binlog/filter.go +++ b/internal/util/importutilv2/binlog/filter.go @@ -31,10 +31,8 @@ func FilterWithDelete(r *reader) (Filter, error) { return func(row map[int64]interface{}) bool { rowPk := row[pkField.GetFieldID()] rowTs := row[common.TimeStampField] - for i, pk := range r.deleteData.Pks { - if pk.GetValue() == rowPk && int64(r.deleteData.Tss[i]) > rowTs.(int64) { - return false - } + if ts, ok := r.deleteData[rowPk]; ok && int64(ts) > rowTs.(int64) { + return false } return true }, nil diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go index 6cbf19b8ce..b3a32c75c7 100644 --- a/internal/util/importutilv2/binlog/reader.go +++ b/internal/util/importutilv2/binlog/reader.go @@ -24,9 +24,11 @@ import ( "github.com/samber/lo" "go.uber.org/atomic" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -37,8 +39,8 @@ type reader struct { schema *schemapb.CollectionSchema fileSize *atomic.Int64 - deleteData *storage.DeleteData - insertLogs map[int64][]string // fieldID -> binlogs + deleteData map[any]typeutil.Timestamp // pk2ts + insertLogs map[int64][]string // fieldID -> binlogs readIdx int filters []Filter @@ -101,6 +103,11 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error { return err } + log.Ctx(context.TODO()).Info("read delete done", + zap.String("collection", r.schema.GetName()), + zap.Int("deleteRows", len(r.deleteData)), + ) + deleteFilter, err := FilterWithDelete(r) if err != nil { return err @@ -109,8 +116,8 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error { return nil } -func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage.DeleteData, error) { - deleteData := storage.NewDeleteData(nil, nil) +func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (map[any]typeutil.Timestamp, error) { + deleteData := make(map[any]typeutil.Timestamp) for _, path := range deltaLogs { reader, err := newBinlogReader(r.ctx, r.cm, path) if err != nil { @@ -129,7 +136,10 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage return nil, err } if dl.Ts >= tsStart && dl.Ts <= tsEnd { - deleteData.Append(dl.Pk, dl.Ts) + pk := dl.Pk.GetValue() + if ts, ok := deleteData[pk]; !ok || ts < dl.Ts { + deleteData[pk] = dl.Ts + } } } }