diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index a2ad7580bc..56b945c8b5 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -21,18 +21,15 @@ import ( "math/rand" "os" "strconv" - "strings" "testing" "time" "github.com/stretchr/testify/assert" - "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" util2 "github.com/milvus-io/milvus/internal/flushcommon/util" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -40,25 +37,12 @@ import ( func TestMain(t *testing.M) { rand.Seed(time.Now().Unix()) - // init embed etcd - embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer() - if err != nil { - log.Fatal("failed to start embed etcd server", zap.Error(err)) - } - defer os.RemoveAll(tempDir) - defer embedetcdServer.Close() - - addrs := etcd.GetEmbedEtcdEndpoints(embedetcdServer) - // setup env for etcd endpoint - os.Setenv("etcd.endpoints", strings.Join(addrs, ",")) - path := "/tmp/milvus_ut/rdb_data" os.Setenv("ROCKSMQ_PATH", path) defer os.RemoveAll(path) paramtable.Init() // change to specific channel for test - paramtable.Get().Save(Params.EtcdCfg.Endpoints.Key, strings.Join(addrs, ",")) paramtable.Get().Save(Params.CommonCfg.DataCoordTimeTick.Key, Params.CommonCfg.DataCoordTimeTick.GetValue()+strconv.Itoa(rand.Int())) code := t.Run() diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index d36a88ecbd..c920f97c9a 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/importutilv2/binlog" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" @@ -164,8 +165,15 @@ func (t *L0ImportTask) Execute() []*conc.Future[any] { if err != nil { return } + + // Parse ts parameters from options + tsStart, tsEnd, err := importutilv2.ParseTimeRange(t.req.GetOptions()) + if err != nil { + return + } + var reader binlog.L0Reader - reader, err = binlog.NewL0Reader(t.ctx, t.cm, pkField, file, bufferSize) + reader, err = binlog.NewL0Reader(t.ctx, t.cm, pkField, file, bufferSize, tsStart, tsEnd) if err != nil { return } diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go index 8e3dab269d..0760e58198 100644 --- a/internal/datanode/importv2/task_l0_preimport.go +++ b/internal/datanode/importv2/task_l0_preimport.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/importutilv2/binlog" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" @@ -152,7 +153,14 @@ func (t *L0PreImportTask) Execute() []*conc.Future[any] { if err != nil { return } - reader, err := binlog.NewL0Reader(t.ctx, t.cm, pkField, file, bufferSize) + + // Parse ts parameters from 
options + tsStart, tsEnd, err := importutilv2.ParseTimeRange(t.req.GetOptions()) + if err != nil { + return + } + + reader, err := binlog.NewL0Reader(t.ctx, t.cm, pkField, file, bufferSize, tsStart, tsEnd) if err != nil { return } diff --git a/internal/util/importutilv2/binlog/filter.go b/internal/util/importutilv2/binlog/filter.go index 593490bd00..6c8333865a 100644 --- a/internal/util/importutilv2/binlog/filter.go +++ b/internal/util/importutilv2/binlog/filter.go @@ -17,6 +17,7 @@ package binlog import ( + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -44,3 +45,11 @@ func FilterWithTimeRange(tsStart, tsEnd uint64) Filter { return uint64(ts) >= tsStart && uint64(ts) <= tsEnd } } + +type L0Filter func(dl *storage.DeleteLog) bool + +func FilterDeleteWithTimeRange(tsStart, tsEnd uint64) L0Filter { + return func(dl *storage.DeleteLog) bool { + return dl.Ts >= tsStart && dl.Ts <= tsEnd + } +} diff --git a/internal/util/importutilv2/binlog/l0_reader.go b/internal/util/importutilv2/binlog/l0_reader.go index 2a87717cbc..df84ee8bca 100644 --- a/internal/util/importutilv2/binlog/l0_reader.go +++ b/internal/util/importutilv2/binlog/l0_reader.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "math" "go.uber.org/zap" @@ -42,6 +43,9 @@ type l0Reader struct { bufferSize int deltaLogs []string readIdx int + + // new filter-based approach + filters []L0Filter } func NewL0Reader(ctx context.Context, @@ -49,6 +53,8 @@ func NewL0Reader(ctx context.Context, pkField *schemapb.FieldSchema, importFile *internalpb.ImportFile, bufferSize int, + tsStart, + tsEnd uint64, ) (*l0Reader, error) { r := &l0Reader{ ctx: ctx, @@ -56,6 +62,10 @@ func NewL0Reader(ctx context.Context, pkField: pkField, bufferSize: bufferSize, } + + // Initialize filters + r.initFilters(tsStart, tsEnd) + if len(importFile.GetPaths()) != 1 { return nil, merr.WrapErrImportFailed( fmt.Sprintf("there should be one prefix, but got %s", importFile.GetPaths())) @@ -72,6 +82,24 @@ func NewL0Reader(ctx context.Context, return r, nil } +// initFilters initializes the filter chain for L0 reader +func (r *l0Reader) initFilters(tsStart, tsEnd uint64) { + // Add time range filter if specified + if tsStart != 0 || tsEnd != math.MaxUint64 { + r.filters = append(r.filters, FilterDeleteWithTimeRange(tsStart, tsEnd)) + } +} + +// filter applies all filters to a delete log record +func (r *l0Reader) filter(dl *storage.DeleteLog) bool { + for _, f := range r.filters { + if !f(dl) { + return false + } + } + return true +} + func (r *l0Reader) Read() (*storage.DeleteData, error) { deleteData := storage.NewDeleteData(nil, nil) for { @@ -109,6 +137,11 @@ func (r *l0Reader) Read() (*storage.DeleteData, error) { return nil, err } + // Apply filters + if !r.filter(*dl) { + continue + } + deleteData.Append((*dl).Pk, (*dl).Ts) } diff --git a/internal/util/importutilv2/binlog/l0_reader_test.go b/internal/util/importutilv2/binlog/l0_reader_test.go index 53963cedf2..49f41b1bbd 100644 --- a/internal/util/importutilv2/binlog/l0_reader_test.go +++ b/internal/util/importutilv2/binlog/l0_reader_test.go @@ -20,9 +20,11 @@ import ( "context" "fmt" "io" + "math" "os" "testing" + "github.com/bytedance/mockey" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -39,13 +41,13 @@ func TestL0Reader_NewL0Reader(t *testing.T) { t.Run("normal", func(t *testing.T) { cm := mocks.NewChunkManager(t) 
cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100) + r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100, 0, math.MaxUint64) assert.NoError(t, err) assert.NotNil(t, r) }) t.Run("invalid path", func(t *testing.T) { - r, err := NewL0Reader(ctx, nil, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix", "mock-prefix2"}}, 100) + r, err := NewL0Reader(ctx, nil, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix", "mock-prefix2"}}, 100, 0, math.MaxUint64) assert.Error(t, err) assert.Nil(t, r) }) @@ -53,7 +55,7 @@ func TestL0Reader_NewL0Reader(t *testing.T) { t.Run("list failed", func(t *testing.T) { cm := mocks.NewChunkManager(t) cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock error")) - r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100) + r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100, 0, math.MaxUint64) assert.Error(t, err) assert.Nil(t, r) }) @@ -83,7 +85,7 @@ func TestL0Reader_Read(t *testing.T) { }) cm.EXPECT().Read(mock.Anything, mock.Anything).Return(blob.Value, nil) - r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100) + r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100, 0, math.MaxUint64) assert.NoError(t, err) res, err := r.Read() @@ -100,3 +102,74 @@ func TestMain(m *testing.M) { paramtable.Init() os.Exit(m.Run()) } + +func TestL0Reader_NoFilters(t *testing.T) { + ctx := context.Background() + const ( + delCnt = 100 + ) + + deleteData := storage.NewDeleteData(nil, nil) + for i := 0; i < delCnt; i++ { + deleteData.Append(storage.NewVarCharPrimaryKey(fmt.Sprintf("No.%d", i)), uint64(i+1)) + } + deleteCodec := storage.NewDeleteCodec() + blob, err := deleteCodec.Serialize(1, 2, 3, deleteData) + assert.NoError(t, err) + + // Mock storage.ListAllChunkWithPrefix + mockListAllChunk := mockey.Mock(storage.ListAllChunkWithPrefix).Return([]string{"a/b/c/"}, nil, nil).Build() + defer mockListAllChunk.UnPatch() + + // Mock ChunkManager.Read using interface + mockChunkManager := storage.NewLocalChunkManager() + mockRead := mockey.Mock((*storage.LocalChunkManager).Read).Return(blob.Value, nil).Build() + defer mockRead.UnPatch() + + // Create reader without any filters + r, err := NewL0Reader(ctx, mockChunkManager, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100, 0, math.MaxUint64) + assert.NoError(t, err) + + res, err := r.Read() + assert.NoError(t, err) + // Should include all records + assert.Equal(t, int64(delCnt), res.RowCount) +} + +func TestFilterDeleteWithTimeRange(t *testing.T) { + t.Run("normal range", func(t *testing.T) { + filter := FilterDeleteWithTimeRange(10, 20) + + // Test within range + dl := &storage.DeleteLog{Ts: 15} + assert.True(t, filter(dl)) + + // Test at boundaries + dl.Ts = 10 + assert.True(t, filter(dl)) + dl.Ts = 20 + assert.True(t, filter(dl)) + + // Test outside range + dl.Ts = 9 + assert.False(t, filter(dl)) + dl.Ts = 21 + assert.False(t, filter(dl)) + }) + + t.Run("edge cases", func(t *testing.T) { + // Test with max uint64 + filter := FilterDeleteWithTimeRange(0, math.MaxUint64) + dl := &storage.DeleteLog{Ts: 1000} + assert.True(t, filter(dl)) + + // Test with same start and end + filter = 
FilterDeleteWithTimeRange(5, 5) + dl.Ts = 5 + assert.True(t, filter(dl)) + dl.Ts = 4 + assert.False(t, filter(dl)) + dl.Ts = 6 + assert.False(t, filter(dl)) + }) +}
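
Reviewer note: the sketch below is not part of the patch. It is a minimal, self-contained Go illustration of the filter-chain mechanism this diff introduces in l0_reader.go, using simplified stand-in types (deleteLog, l0Filter, buildFilters, keep) in place of the real storage.DeleteLog, binlog.L0Filter, (*l0Reader).initFilters, and (*l0Reader).filter. It shows how a time-range filter is only installed when the requested range is narrower than [0, MaxUint64], and how Read() then skips any delete record rejected by the chain.

package main

import (
	"fmt"
	"math"
)

// deleteLog is a simplified stand-in for storage.DeleteLog; only the
// timestamp consulted by the time-range filter is kept.
type deleteLog struct {
	Ts uint64
}

// l0Filter mirrors the new binlog.L0Filter shape: it reports whether a
// delete record should be kept.
type l0Filter func(dl *deleteLog) bool

// filterDeleteWithTimeRange mirrors binlog.FilterDeleteWithTimeRange:
// keep records whose timestamp falls inside [tsStart, tsEnd].
func filterDeleteWithTimeRange(tsStart, tsEnd uint64) l0Filter {
	return func(dl *deleteLog) bool {
		return dl.Ts >= tsStart && dl.Ts <= tsEnd
	}
}

// buildFilters mirrors (*l0Reader).initFilters: the time-range filter is
// only added when a non-default range was requested.
func buildFilters(tsStart, tsEnd uint64) []l0Filter {
	var filters []l0Filter
	if tsStart != 0 || tsEnd != math.MaxUint64 {
		filters = append(filters, filterDeleteWithTimeRange(tsStart, tsEnd))
	}
	return filters
}

// keep applies every installed filter, as (*l0Reader).filter does before
// appending a record to the DeleteData in Read().
func keep(filters []l0Filter, dl *deleteLog) bool {
	for _, f := range filters {
		if !f(dl) {
			return false
		}
	}
	return true
}

func main() {
	filters := buildFilters(10, 20)
	for _, ts := range []uint64{5, 10, 15, 20, 25} {
		fmt.Printf("ts=%d kept=%v\n", ts, keep(filters, &deleteLog{Ts: ts}))
	}
}

With the default range (tsStart=0, tsEnd=MaxUint64) no filter is installed and every record is kept, which is why the existing tests and the new TestL0Reader_NoFilters pass those defaults through the widened NewL0Reader signature.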