diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go
index 29ec489cda..efd7c4e034 100644
--- a/internal/datanode/compaction/clustering_compactor.go
+++ b/internal/datanode/compaction/clustering_compactor.go
@@ -1163,8 +1163,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	fieldBinlogPaths := make([][]string, 0)
 	// initial timestampFrom, timestampTo = -1, -1 is an illegal value, only to mark initial state
 	var (
-		timestampTo   int64                 = -1
-		timestampFrom int64                 = -1
 		remained      int64                 = 0
 		analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
 	)
@@ -1183,10 +1181,16 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 		return nil, merr.WrapErrIllegalCompactionPlan("all segments' binlogs are empty")
 	}
 	log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))
+	requiredFields := typeutil.NewSet[int64]()
+	requiredFields.Insert(0, 1, t.primaryKeyField.GetFieldID(), t.clusteringKeyField.GetFieldID())
 	for idx := 0; idx < binlogNum; idx++ {
 		var ps []string
 		for _, f := range segment.GetFieldBinlogs() {
-			ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
+			// Only download necessary fields during clustering analyze phase
+			// system fields, pk field, clustering key field
+			if requiredFields.Contain(f.GetFieldID()) {
+				ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
+			}
 		}
 		fieldBinlogPaths = append(fieldBinlogPaths, ps)
 	}
@@ -1225,14 +1229,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 			if expiredFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
 				continue
 			}
-
-			// Update timestampFrom, timestampTo
-			if v.Timestamp < timestampFrom || timestampFrom == -1 {
-				timestampFrom = v.Timestamp
-			}
-			if v.Timestamp > timestampTo || timestampFrom == -1 {
-				timestampTo = v.Timestamp
-			}
 			// rowValue := vIter.GetData().(*iterators.InsertRow).GetValue()
 			row, ok := v.Value.(map[typeutil.UniqueID]interface{})
 			if !ok {
diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go
index 022088c0ee..b8f7db6d94 100644
--- a/internal/datanode/compaction/clustering_compactor_test.go
+++ b/internal/datanode/compaction/clustering_compactor_test.go
@@ -173,6 +173,8 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
 }
 
 func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
+	s.mockBinlogIO = io.NewMockBinlogIO(s.T())
+
 	dblobs, err := getInt64DeltaBlobs(
 		1,
 		[]int64{100},
@@ -199,7 +201,24 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 	kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
 	s.NoError(err)
 
-	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
+	var ml sync.RWMutex
+	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, m map[string][]byte) error {
+		ml.Lock()
+		defer ml.Unlock()
+		for k, v := range m {
+			kvs[k] = v
+		}
+		return nil
+	})
+	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, strings []string) ([][]byte, error) {
+		result := make([][]byte, 0, len(strings))
+		ml.RLock()
+		defer ml.RUnlock()
+		for _, path := range strings {
+			result = append(result, kvs[path])
+		}
+		return result, nil
+	})
 
 	s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
 		{
@@ -211,6 +230,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 		},
 	}
 
+	s.task.binlogIO = s.mockBinlogIO
 	s.task.plan.Schema = genCollectionSchema()
 	s.task.plan.ClusteringKeyField = 100
 	s.task.plan.PreferSegmentRows = 2048
@@ -281,6 +301,16 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
 	kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
 	s.NoError(err)
 
+	var ml sync.RWMutex
+	s.mockBinlogIO = io.NewMockBinlogIO(s.T())
+	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, m map[string][]byte) error {
+		ml.Lock()
+		defer ml.Unlock()
+		for k, v := range m {
+			kvs[k] = v
+		}
+		return nil
+	})
 	var one sync.Once
 	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(
 		func(ctx context.Context, strings []string) ([][]byte, error) {
@@ -288,7 +318,13 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
 			one.Do(func() {
 				s.task.memoryBufferSize = 32 * 1024 * 1024
 			})
-			return lo.Values(kvs), nil
+			result := make([][]byte, 0, len(strings))
+			ml.RLock()
+			defer ml.RUnlock()
+			for _, path := range strings {
+				result = append(result, kvs[path])
+			}
+			return result, nil
 		})
 	s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
 		{
@@ -298,6 +334,7 @@
 		},
 	}
 
+	s.task.binlogIO = s.mockBinlogIO
 	s.task.plan.Schema = genCollectionSchema()
 	s.task.plan.ClusteringKeyField = 100
 	s.task.plan.PreferSegmentRows = 3000
@@ -365,7 +402,26 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
 	kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
 	s.NoError(err)
 
-	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
+	var ml sync.RWMutex
+	s.mockBinlogIO = io.NewMockBinlogIO(s.T())
+	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, m map[string][]byte) error {
+		ml.Lock()
+		defer ml.Unlock()
+		for k, v := range m {
+			kvs[k] = v
+		}
+		return nil
+	})
+
+	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, strings []string) ([][]byte, error) {
+		result := make([][]byte, 0, len(strings))
+		ml.RLock()
+		defer ml.RUnlock()
+		for _, path := range strings {
+			result = append(result, kvs[path])
+		}
+		return result, nil
+	})
 
 	s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
 		{
@@ -374,6 +430,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
 		},
 	}
 
+	s.task.binlogIO = s.mockBinlogIO
 	s.task.bm25FieldIds = []int64{102}
 	s.task.plan.Schema = schema
 	s.task.plan.ClusteringKeyField = 100