l0 compactor: decompress stats binlogs with the configured root path in loadBF.

The previous call discarded the result of binlog.DecompressBinLog with `_ =`.
loadBF now calls binlog.DecompressBinLogWithRootPath with
t.compactionParams.StorageConfig.GetRootPath(), logs a warning on failure and
fails the submitted task with that error (nil result, non-nil error).

diff --git a/internal/datanode/compactor/l0_compactor.go b/internal/datanode/compactor/l0_compactor.go
index 63fc4675e3..147f10e21e 100644
--- a/internal/datanode/compactor/l0_compactor.go
+++ b/internal/datanode/compactor/l0_compactor.go
@@ -439,8 +439,20 @@ func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*
 		segment := segment
 		innerCtx := ctx
 		future := pool.Submit(func() (any, error) {
-			_ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(),
-				segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
+			err := binlog.DecompressBinLogWithRootPath(
+				t.compactionParams.StorageConfig.GetRootPath(),
+				storage.StatsBinlog,
+				segment.GetCollectionID(),
+				segment.GetPartitionID(),
+				segment.GetSegmentID(),
+				segment.GetField2StatslogPaths())
+			if err != nil {
+				log.Warn("failed to decompress segment stats log",
+					zap.Int64("planID", t.plan.GetPlanID()),
+					zap.String("type", t.plan.GetType().String()),
+					zap.Error(err))
+				return nil, err
+			}
 			pks, err := compaction.LoadStats(innerCtx, t.cm,
 				t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
 			if err != nil {
diff --git a/internal/datanode/compactor/l0_compactor_test.go b/internal/datanode/compactor/l0_compactor_test.go
index ecfab8e437..3641b7c161 100644
--- a/internal/datanode/compactor/l0_compactor_test.go
+++ b/internal/datanode/compactor/l0_compactor_test.go
@@ -661,3 +661,221 @@ func (s *LevelZeroCompactionTaskSuite) TestFailed() {
 		s.Error(err)
 	})
 }
+
+func (s *LevelZeroCompactionTaskSuite) TestLoadBFWithDecompression() {
+	s.Run("successful decompression with root path", func() {
+		// Happy path: loadBF succeeds for one segment whose stats binlog carries only a LogID.
+		plan := &datapb.CompactionPlan{
+			PlanID: 19530,
+			Type:   datapb.CompactionType_Level0DeleteCompaction,
+			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+				{
+					CollectionID: 100,
+					PartitionID:  200,
+					SegmentID:    300,
+					Level:        datapb.SegmentLevel_L1,
+					Field2StatslogPaths: []*datapb.FieldBinlog{
+						{
+							FieldID: 1,
+							Binlogs: []*datapb.Binlog{
+								{LogID: 9999, LogSize: 100},
+							},
+						},
+					},
+				},
+			},
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      1,
+						IsPrimaryKey: true,
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+		}
+
+		s.task.plan = plan
+
+		data := &storage.Int64FieldData{
+			Data: []int64{1, 2, 3, 4, 5},
+		}
+		sw := &storage.StatsWriter{}
+		err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
+		s.NoError(err)
+
+		cm := mocks.NewChunkManager(s.T())
+		cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil)
+		s.task.cm = cm
+
+		bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
+		s.NoError(err)
+		s.Len(bfs, 1)
+		s.Contains(bfs, int64(300))
+	})
+
+	s.Run("multiple segments with decompression", func() {
+		plan := &datapb.CompactionPlan{
+			PlanID: 19531,
+			Type:   datapb.CompactionType_Level0DeleteCompaction,
+			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+				{
+					CollectionID: 100,
+					PartitionID:  200,
+					SegmentID:    301,
+					Level:        datapb.SegmentLevel_L1,
+					Field2StatslogPaths: []*datapb.FieldBinlog{
+						{
+							FieldID: 1,
+							Binlogs: []*datapb.Binlog{
+								{LogID: 10001, LogSize: 100},
+							},
+						},
+					},
+				},
+				{
+					CollectionID: 100,
+					PartitionID:  200,
+					SegmentID:    302,
+					Level:        datapb.SegmentLevel_L1,
+					Field2StatslogPaths: []*datapb.FieldBinlog{
+						{
+							FieldID: 1,
+							Binlogs: []*datapb.Binlog{
+								{LogID: 10002, LogSize: 100},
+							},
+						},
+					},
+				},
+			},
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      1,
+						IsPrimaryKey: true,
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+		}
+
+		s.task.plan = plan
+
+		data1 := &storage.Int64FieldData{
+			Data: []int64{1, 2, 3},
+		}
+		data2 := &storage.Int64FieldData{
+			Data: []int64{4, 5, 6},
+		}
+		sw1 := &storage.StatsWriter{}
+		err := sw1.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data1)
+		s.NoError(err)
+		sw2 := &storage.StatsWriter{}
+		err = sw2.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data2)
+		s.NoError(err)
+
+		cm := mocks.NewChunkManager(s.T())
+		cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw1.GetBuffer()}, nil).Once()
+		cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw2.GetBuffer()}, nil).Once()
+		s.task.cm = cm
+
+		bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
+		s.NoError(err)
+		s.Len(bfs, 2)
+		s.Contains(bfs, int64(301))
+		s.Contains(bfs, int64(302))
+	})
+
+	s.Run("decompression with empty field2statslogpaths", func() {
+		plan := &datapb.CompactionPlan{
+			PlanID: 19532,
+			Type:   datapb.CompactionType_Level0DeleteCompaction,
+			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+				{
+					CollectionID:        100,
+					PartitionID:         200,
+					SegmentID:           303,
+					Level:               datapb.SegmentLevel_L1,
+					Field2StatslogPaths: []*datapb.FieldBinlog{},
+				},
+			},
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      1,
+						IsPrimaryKey: true,
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+		}
+
+		s.task.plan = plan
+
+		cm := mocks.NewChunkManager(s.T())
+		// Expect no calls since there are no stats log paths
+		s.task.cm = cm
+
+		bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
+		s.NoError(err)
+		s.Len(bfs, 1)
+		s.Contains(bfs, int64(303))
+		// Bloom filter should be empty since no stats were loaded
+	})
+
+	s.Run("verify root path parameter usage", func() {
+		// Test that the root path from compactionParams.StorageConfig is used correctly
+		plan := &datapb.CompactionPlan{
+			PlanID: 19533,
+			Type:   datapb.CompactionType_Level0DeleteCompaction,
+			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+				{
+					CollectionID: 100,
+					PartitionID:  200,
+					SegmentID:    304,
+					Level:        datapb.SegmentLevel_L1,
+					Field2StatslogPaths: []*datapb.FieldBinlog{
+						{
+							FieldID: 1,
+							Binlogs: []*datapb.Binlog{
+								{LogID: 10003, LogSize: 100, LogPath: ""},
+							},
+						},
+					},
+				},
+			},
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      1,
+						IsPrimaryKey: true,
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+		}
+
+		s.task.plan = plan
+		// Precondition: the task's storage config must carry a non-empty root path
+		s.NotEmpty(s.task.compactionParams.StorageConfig.GetRootPath())
+
+		data := &storage.Int64FieldData{
+			Data: []int64{1, 2, 3},
+		}
+		sw := &storage.StatsWriter{}
+		err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
+		s.NoError(err)
+
+		cm := mocks.NewChunkManager(s.T())
+		cm.EXPECT().MultiRead(mock.Anything, mock.MatchedBy(func(paths []string) bool {
+			// NOTE(review): only checks that at least one path is requested; the root-path prefix is not asserted
+			return len(paths) > 0
+		})).Return([][]byte{sw.GetBuffer()}, nil)
+		s.task.cm = cm
+
+		bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
+		s.NoError(err)
+		s.Len(bfs, 1)
+		s.Contains(bfs, int64(304))
+	})
+}