fix: Pooling datanode decompress statslog without rootpath (#44288)

This bug made the pooling datanode unable to execute L0 compactions.

See also: #44289

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2025-09-10 16:11:56 +08:00 committed by GitHub
parent f5618d5153
commit c5a8aace18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 232 additions and 2 deletions

View File

@ -439,8 +439,20 @@ func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*
segment := segment
innerCtx := ctx
future := pool.Submit(func() (any, error) {
_ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(),
segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
err := binlog.DecompressBinLogWithRootPath(
t.compactionParams.StorageConfig.GetRootPath(),
storage.StatsBinlog,
segment.GetCollectionID(),
segment.GetPartitionID(),
segment.GetSegmentID(),
segment.GetField2StatslogPaths())
if err != nil {
log.Warn("failed to decompress segment stats log",
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
zap.Error(err))
return err, err
}
pks, err := compaction.LoadStats(innerCtx, t.cm,
t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
if err != nil {

View File

@ -661,3 +661,221 @@ func (s *LevelZeroCompactionTaskSuite) TestFailed() {
s.Error(err)
})
}
// TestLoadBFWithDecompression verifies that loadBF decompresses segment stats
// logs before building bloom filters, so that stats-log paths are resolved
// against the storage root path configured in compactionParams.StorageConfig.
func (s *LevelZeroCompactionTaskSuite) TestLoadBFWithDecompression() {
	s.Run("successful decompression with root path", func() {
		// A single L1 segment with one stats binlog; decompression must
		// populate the log paths so LoadStats can read them through the
		// chunk manager.
		plan := &datapb.CompactionPlan{
			PlanID: 19530,
			Type:   datapb.CompactionType_Level0DeleteCompaction,
			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
				{
					CollectionID: 100,
					PartitionID:  200,
					SegmentID:    300,
					Level:        datapb.SegmentLevel_L1,
					Field2StatslogPaths: []*datapb.FieldBinlog{
						{
							FieldID: 1,
							Binlogs: []*datapb.Binlog{
								{LogID: 9999, LogSize: 100},
							},
						},
					},
				},
			},
			Schema: &schemapb.CollectionSchema{
				Fields: []*schemapb.FieldSchema{
					{
						FieldID:      1,
						IsPrimaryKey: true,
						DataType:     schemapb.DataType_Int64,
					},
				},
			},
		}
		s.task.plan = plan

		data := &storage.Int64FieldData{
			Data: []int64{1, 2, 3, 4, 5},
		}
		sw := &storage.StatsWriter{}
		err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
		s.NoError(err)

		cm := mocks.NewChunkManager(s.T())
		cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil)
		s.task.cm = cm

		bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
		s.NoError(err)
		s.Len(bfs, 1)
		s.Contains(bfs, int64(300))
	})

	s.Run("multiple segments with decompression", func() {
		// Two segments, each with its own stats binlog; each decompressed
		// stats log is read independently, yielding one bloom filter per
		// segment.
		plan := &datapb.CompactionPlan{
			PlanID: 19531,
			Type:   datapb.CompactionType_Level0DeleteCompaction,
			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
				{
					CollectionID: 100,
					PartitionID:  200,
					SegmentID:    301,
					Level:        datapb.SegmentLevel_L1,
					Field2StatslogPaths: []*datapb.FieldBinlog{
						{
							FieldID: 1,
							Binlogs: []*datapb.Binlog{
								{LogID: 10001, LogSize: 100},
							},
						},
					},
				},
				{
					CollectionID: 100,
					PartitionID:  200,
					SegmentID:    302,
					Level:        datapb.SegmentLevel_L1,
					Field2StatslogPaths: []*datapb.FieldBinlog{
						{
							FieldID: 1,
							Binlogs: []*datapb.Binlog{
								{LogID: 10002, LogSize: 100},
							},
						},
					},
				},
			},
			Schema: &schemapb.CollectionSchema{
				Fields: []*schemapb.FieldSchema{
					{
						FieldID:      1,
						IsPrimaryKey: true,
						DataType:     schemapb.DataType_Int64,
					},
				},
			},
		}
		s.task.plan = plan

		data1 := &storage.Int64FieldData{
			Data: []int64{1, 2, 3},
		}
		data2 := &storage.Int64FieldData{
			Data: []int64{4, 5, 6},
		}
		sw1 := &storage.StatsWriter{}
		err := sw1.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data1)
		s.NoError(err)
		sw2 := &storage.StatsWriter{}
		err = sw2.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data2)
		s.NoError(err)

		cm := mocks.NewChunkManager(s.T())
		// One MultiRead per segment; .Once() pins the expected call count.
		cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw1.GetBuffer()}, nil).Once()
		cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw2.GetBuffer()}, nil).Once()
		s.task.cm = cm

		bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
		s.NoError(err)
		s.Len(bfs, 2)
		s.Contains(bfs, int64(301))
		s.Contains(bfs, int64(302))
	})

	s.Run("decompression with empty field2statslogpaths", func() {
		// No stats binlogs: loadBF must still succeed and produce an (empty)
		// bloom filter entry for the segment without touching storage.
		plan := &datapb.CompactionPlan{
			PlanID: 19532,
			Type:   datapb.CompactionType_Level0DeleteCompaction,
			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
				{
					CollectionID:        100,
					PartitionID:         200,
					SegmentID:           303,
					Level:               datapb.SegmentLevel_L1,
					Field2StatslogPaths: []*datapb.FieldBinlog{},
				},
			},
			Schema: &schemapb.CollectionSchema{
				Fields: []*schemapb.FieldSchema{
					{
						FieldID:      1,
						IsPrimaryKey: true,
						DataType:     schemapb.DataType_Int64,
					},
				},
			},
		}
		s.task.plan = plan

		// No EXPECT() registered: the mock fails the test if any chunk
		// manager call is made, since there are no stats log paths to read.
		cm := mocks.NewChunkManager(s.T())
		s.task.cm = cm

		bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
		s.NoError(err)
		s.Len(bfs, 1)
		s.Contains(bfs, int64(303))
		// The bloom filter is empty since no stats were loaded.
	})

	s.Run("verify root path parameter usage", func() {
		// The root path from compactionParams.StorageConfig must be used
		// when decompressing, so every path handed to the chunk manager is
		// expected to start with it.
		rootPath := s.task.compactionParams.StorageConfig.GetRootPath()
		s.NotEmpty(rootPath)

		plan := &datapb.CompactionPlan{
			PlanID: 19533,
			Type:   datapb.CompactionType_Level0DeleteCompaction,
			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
				{
					CollectionID: 100,
					PartitionID:  200,
					SegmentID:    304,
					Level:        datapb.SegmentLevel_L1,
					Field2StatslogPaths: []*datapb.FieldBinlog{
						{
							FieldID: 1,
							Binlogs: []*datapb.Binlog{
								// Empty LogPath: decompression is what fills it in.
								{LogID: 10003, LogSize: 100, LogPath: ""},
							},
						},
					},
				},
			},
			Schema: &schemapb.CollectionSchema{
				Fields: []*schemapb.FieldSchema{
					{
						FieldID:      1,
						IsPrimaryKey: true,
						DataType:     schemapb.DataType_Int64,
					},
				},
			},
		}
		s.task.plan = plan

		data := &storage.Int64FieldData{
			Data: []int64{1, 2, 3},
		}
		sw := &storage.StatsWriter{}
		err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
		s.NoError(err)

		cm := mocks.NewChunkManager(s.T())
		cm.EXPECT().MultiRead(mock.Anything, mock.MatchedBy(func(paths []string) bool {
			if len(paths) == 0 {
				return false
			}
			// Every decompressed path must be prefixed with the configured
			// root path (manual prefix check to avoid an extra import).
			for _, p := range paths {
				if len(p) < len(rootPath) || p[:len(rootPath)] != rootPath {
					return false
				}
			}
			return true
		})).Return([][]byte{sw.GetBuffer()}, nil)
		s.task.cm = cm

		bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
		s.NoError(err)
		s.Len(bfs, 1)
		s.Contains(bfs, int64(304))
	})
}