enhance: [2.5] Only download necessary fields during clustering analyze phase (#43362)

issue: #43310 

master pr: #43322

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-07-23 10:18:53 +08:00 committed by GitHub
parent 3ed3bf92e5
commit 2a516697c2
2 changed files with 67 additions and 14 deletions


@@ -1163,8 +1163,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	fieldBinlogPaths := make([][]string, 0)
 	// initial timestampFrom, timestampTo = -1, -1 is an illegal value, only to mark initial state
 	var (
-		timestampTo   int64                 = -1
-		timestampFrom int64                 = -1
 		remained      int64                 = 0
 		analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
 	)
@@ -1183,10 +1181,16 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 		return nil, merr.WrapErrIllegalCompactionPlan("all segments' binlogs are empty")
 	}
 	log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))
+	requiredFields := typeutil.NewSet[int64]()
+	requiredFields.Insert(0, 1, t.primaryKeyField.GetFieldID(), t.clusteringKeyField.GetFieldID())
 	for idx := 0; idx < binlogNum; idx++ {
 		var ps []string
 		for _, f := range segment.GetFieldBinlogs() {
-			ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
+			// Only download necessary fields during clustering analyze phase
+			// system fields, pk field, clustering key field
+			if requiredFields.Contain(f.GetFieldID()) {
+				ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
+			}
 		}
 		fieldBinlogPaths = append(fieldBinlogPaths, ps)
 	}
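For readers skimming the diff, the filtering above boils down to the following standalone sketch. It substitutes a plain map for Milvus' typeutil.Set, and the FieldBinlog shape and field IDs are simplified assumptions rather than the real datapb types:

package main

import "fmt"

// FieldBinlog is a simplified stand-in for datapb.FieldBinlog.
type FieldBinlog struct {
	FieldID  int64
	LogPaths []string // one path per binlog batch
}

func main() {
	// Field IDs 0 and 1 are Milvus' RowID and Timestamp system fields;
	// 100 and 101 stand in for the pk and clustering key fields.
	required := map[int64]bool{0: true, 1: true, 100: true, 101: true}

	fields := []FieldBinlog{
		{FieldID: 0, LogPaths: []string{"log/0/0"}},
		{FieldID: 1, LogPaths: []string{"log/1/0"}},
		{FieldID: 100, LogPaths: []string{"log/100/0"}},
		{FieldID: 101, LogPaths: []string{"log/101/0"}},
		{FieldID: 102, LogPaths: []string{"log/102/0"}}, // e.g. a vector field: never downloaded
	}

	binlogNum := len(fields[0].LogPaths)
	fieldBinlogPaths := make([][]string, 0, binlogNum)
	for idx := 0; idx < binlogNum; idx++ {
		var ps []string
		for _, f := range fields {
			if required[f.FieldID] { // skip fields the analyze phase never reads
				ps = append(ps, f.LogPaths[idx])
			}
		}
		fieldBinlogPaths = append(fieldBinlogPaths, ps)
	}
	fmt.Println(fieldBinlogPaths) // [[log/0/0 log/1/0 log/100/0 log/101/0]]
}

The payoff scales with schema width: vector and other payload fields, typically the bulk of a segment's bytes, are no longer fetched just to analyze clustering key values.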
@@ -1225,14 +1229,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 			if expiredFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
 				continue
 			}
-			// Update timestampFrom, timestampTo
-			if v.Timestamp < timestampFrom || timestampFrom == -1 {
-				timestampFrom = v.Timestamp
-			}
-			if v.Timestamp > timestampTo || timestampFrom == -1 {
-				timestampTo = v.Timestamp
-			}
-			// rowValue := vIter.GetData().(*iterators.InsertRow).GetValue()
 			row, ok := v.Value.(map[typeutil.UniqueID]interface{})
 			if !ok {
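With the unused timestamp bookkeeping removed, the loop's remaining job is to build analyzeResult, a histogram of rows per clustering key value. A minimal sketch of that aggregation, assuming an illustrative row layout keyed by field ID (not the exact Milvus types):

package main

import "fmt"

func main() {
	const clusteringKeyFieldID int64 = 101 // assumed field ID

	// Rows as field-ID-keyed maps, mirroring the shape of v.Value above.
	rows := []map[int64]interface{}{
		{clusteringKeyFieldID: int64(7)},
		{clusteringKeyFieldID: int64(7)},
		{clusteringKeyFieldID: int64(9)},
	}

	analyzeResult := make(map[interface{}]int64)
	var remained int64
	for _, row := range rows {
		analyzeResult[row[clusteringKeyFieldID]]++ // rows per clustering key value
		remained++                                 // rows that survived the expiration filter
	}
	fmt.Println(analyzeResult, remained) // map[7:2 9:1] 3
}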


@@ -173,6 +173,8 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
 }
 
 func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
+	s.mockBinlogIO = io.NewMockBinlogIO(s.T())
+
 	dblobs, err := getInt64DeltaBlobs(
 		1,
 		[]int64{100},
@@ -199,7 +201,24 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 	kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
 	s.NoError(err)
-	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
+	var ml sync.RWMutex
+	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, m map[string][]byte) error {
+		ml.Lock()
+		defer ml.Unlock()
+		for k, v := range m {
+			kvs[k] = v
+		}
+		return nil
+	})
+	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, strings []string) ([][]byte, error) {
+		result := make([][]byte, 0, len(strings))
+		ml.RLock()
+		defer ml.RUnlock()
+		for _, path := range strings {
+			result = append(result, kvs[path])
+		}
+		return result, nil
+	})
 
 	s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
 		{
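The test change follows from the production change: the old stub returned lo.Values(kvs) wholesale, which no longer matches a task that requests only a subset of paths and relies on the response lining up with the request. The new mocks effectively implement a small path-keyed blob store; here is a standalone sketch of that pattern (blobStore is illustrative, not a Milvus type):

package main

import (
	"fmt"
	"sync"
)

// blobStore mimics what the kvs map plus sync.RWMutex give the mocks:
// concurrent-safe writes and path-addressed, order-preserving reads.
type blobStore struct {
	mu  sync.RWMutex
	kvs map[string][]byte
}

func (s *blobStore) Upload(m map[string][]byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k, v := range m {
		s.kvs[k] = v
	}
	return nil
}

func (s *blobStore) Download(paths []string) ([][]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	result := make([][]byte, 0, len(paths))
	for _, p := range paths {
		result = append(result, s.kvs[p]) // i-th blob answers the i-th path
	}
	return result, nil
}

func main() {
	store := &blobStore{kvs: map[string][]byte{}}
	_ = store.Upload(map[string][]byte{"log/100/0": {1}, "log/102/0": {2}})
	got, _ := store.Download([]string{"log/100/0"}) // only the requested field comes back
	fmt.Println(len(got), got)                      // 1 [[1]]
}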
@@ -211,6 +230,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 		},
 	}
+	s.task.binlogIO = s.mockBinlogIO
 	s.task.plan.Schema = genCollectionSchema()
 	s.task.plan.ClusteringKeyField = 100
 	s.task.plan.PreferSegmentRows = 2048
@@ -281,6 +301,16 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
 	kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
 	s.NoError(err)
+	var ml sync.RWMutex
+	s.mockBinlogIO = io.NewMockBinlogIO(s.T())
+	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, m map[string][]byte) error {
+		ml.Lock()
+		defer ml.Unlock()
+		for k, v := range m {
+			kvs[k] = v
+		}
+		return nil
+	})
 	var one sync.Once
 	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(
 		func(ctx context.Context, strings []string) ([][]byte, error) {
@@ -288,7 +318,13 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
 			one.Do(func() {
 				s.task.memoryBufferSize = 32 * 1024 * 1024
 			})
-			return lo.Values(kvs), nil
+			result := make([][]byte, 0, len(strings))
+			ml.RLock()
+			defer ml.RUnlock()
+			for _, path := range strings {
+				result = append(result, kvs[path])
+			}
+			return result, nil
 		})
 	s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
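One detail worth calling out in the hunk above: the sync.Once wrapper shrinks the task's memory budget exactly once, on the first download, so the test deterministically steers the task into its memory-limit flush path even if Download is called repeatedly or concurrently. A sketch of the trick with illustrative names:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once
	memoryBufferSize := int64(256 * 1024 * 1024) // generous initial budget

	download := func() {
		once.Do(func() {
			// Shrink the budget after the first fetch so later buffered
			// writes exceed it and trigger the flush-by-memory path.
			memoryBufferSize = 32 * 1024 * 1024
		})
	}

	for i := 0; i < 3; i++ {
		download() // only the first call mutates the budget
	}
	fmt.Println(memoryBufferSize) // 33554432: set once, stable afterwards
}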
@@ -298,6 +334,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
 		},
 	}
+	s.task.binlogIO = s.mockBinlogIO
 	s.task.plan.Schema = genCollectionSchema()
 	s.task.plan.ClusteringKeyField = 100
 	s.task.plan.PreferSegmentRows = 3000
@@ -365,7 +402,26 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
 	kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
 	s.NoError(err)
-	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
+	var ml sync.RWMutex
+	s.mockBinlogIO = io.NewMockBinlogIO(s.T())
+	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, m map[string][]byte) error {
+		ml.Lock()
+		defer ml.Unlock()
+		for k, v := range m {
+			kvs[k] = v
+		}
+		return nil
+	})
+	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, strings []string) ([][]byte, error) {
+		result := make([][]byte, 0, len(strings))
+		ml.RLock()
+		defer ml.RUnlock()
+		for _, path := range strings {
+			result = append(result, kvs[path])
+		}
+		return result, nil
+	})
+
 	s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
 		{
@@ -374,6 +430,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
 		},
 	}
+	s.task.binlogIO = s.mockBinlogIO
 	s.task.bm25FieldIds = []int64{102}
 	s.task.plan.Schema = schema
 	s.task.plan.ClusteringKeyField = 100