enhance: Get compaction params from request (#41125)

Make the DataNode use the compaction parameters carried in the compaction
request instead of reading its local configuration.

issue: https://github.com/milvus-io/milvus/issues/41123
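In outline: DataCoord now serializes a snapshot of the compaction settings into each CompactionPlan, and the DataNode decodes that snapshot rather than consulting its own paramtable. A minimal sketch of the flow, using the helpers added in this commit (planID and doCompaction are placeholders, not part of the change):

    // DataCoord side: freeze the current settings into the plan (sketch).
    jsonParams, err := compaction.GenerateJSONParams()
    if err != nil {
        return nil, err
    }
    plan := &datapb.CompactionPlan{
        PlanID:     planID,
        JsonParams: jsonParams, // new field, see data_coord.proto below
    }

    // DataNode side: decode the settings from the request, not paramtable.
    params, err := compaction.ParseParamsFromJSON(plan.GetJsonParams())
    if err != nil {
        return nil, err
    }
    doCompaction(params) // e.g. params.UseMergeSort, params.BinLogMaxSize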

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-04-15 10:28:53 +08:00 committed by GitHub
parent bc11feae74
commit dccfc69660
17 changed files with 1443 additions and 1195 deletions


@ -0,0 +1,57 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compaction
import (
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
type Params struct {
EnableStorageV2 bool `json:"enable_storage_v2,omitempty"`
BinLogMaxSize uint64 `json:"binlog_max_size,omitempty"`
UseMergeSort bool `json:"use_merge_sort,omitempty"`
MaxSegmentMergeSort int `json:"max_segment_merge_sort,omitempty"`
}
func genParams() Params {
return Params{
EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
}
}
func GenerateJSONParams() (string, error) {
compactionParams := genParams()
params, err := json.Marshal(compactionParams)
if err != nil {
return "", err
}
return string(params), nil
}
func ParseParamsFromJSON(jsonStr string) (Params, error) {
var compactionParams Params
err := json.Unmarshal([]byte(jsonStr), &compactionParams)
if err != nil && jsonStr == "" {
// Ensure compatibility with legacy requests sent by an older datacoord.
return genParams(), nil
}
return compactionParams, err
}
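Note the compatibility branch above: an empty JsonParams (as sent by an older DataCoord) falls back to the node-local defaults, while malformed JSON still surfaces an error. A test-style illustration of the three cases, assuming paramtable.Init() has run and a testing.T is in scope:

    p, err := ParseParamsFromJSON("") // legacy request from an old datacoord
    assert.NoError(t, err)
    assert.Equal(t, genParams(), p) // falls back to the node-local config

    p, err = ParseParamsFromJSON(`{"use_merge_sort":true}`)
    assert.NoError(t, err)
    assert.True(t, p.UseMergeSort) // unset fields keep their zero values

    _, err = ParseParamsFromJSON(`{not json}`)
    assert.Error(t, err) // invalid payloads are rejected, not defaulted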


@ -0,0 +1,81 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compaction
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func TestGetJSONParams(t *testing.T) {
paramtable.Init()
jsonStr, err := GenerateJSONParams()
assert.NoError(t, err)
var result Params
err = json.Unmarshal([]byte(jsonStr), &result)
assert.NoError(t, err)
assert.Equal(t, Params{
EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
}, result)
}
func TestGetParamsFromJSON(t *testing.T) {
input := `{
"enable_storage_v2": false,
"binlog_max_size": 4096,
"use_merge_sort": false,
"max_segment_merge_sort": 2
}`
expected := Params{
EnableStorageV2: false,
BinLogMaxSize: 4096,
UseMergeSort: false,
MaxSegmentMergeSort: 2,
}
result, err := ParseParamsFromJSON(input)
assert.NoError(t, err)
assert.Equal(t, expected, result)
}
func TestGetParamsFromJSON_InvalidJSON(t *testing.T) {
invalidJSON := `{ this is not valid json }`
_, err := ParseParamsFromJSON(invalidJSON)
assert.Error(t, err)
}
func TestGetParamsFromJSON_EmptyJSON(t *testing.T) {
// Test compatibility
emptyJSON := ``
result, err := ParseParamsFromJSON(emptyJSON)
assert.NoError(t, err)
assert.Equal(t, Params{
EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
}, result)
}


@ -30,6 +30,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/storage"
@ -190,6 +191,10 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
if err != nil {
return nil, err
}
compactionParams, err := compaction.GenerateJSONParams()
if err != nil {
return nil, err
}
plan := &datapb.CompactionPlan{
PlanID: taskProto.GetPlanID(),
StartTime: taskProto.GetStartTime(),
@ -209,6 +214,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
PreAllocatedLogIDs: logIDRange,
SlotUsage: t.GetSlotUsage(),
MaxSize: taskProto.GetMaxSize(),
JsonParams: compactionParams,
}
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))


@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/pkg/v2/common"
@ -282,6 +283,10 @@ func (t *l0CompactionTask) PreparePlan() bool {
}
func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
compactionParams, err := compaction.GenerateJSONParams()
if err != nil {
return nil, err
}
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
PlanID: taskProto.GetPlanID(),
@ -293,6 +298,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
TotalRows: taskProto.GetTotalRows(),
Schema: taskProto.GetSchema(),
SlotUsage: t.GetSlotUsage(),
JsonParams: compactionParams,
}
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))


@ -11,6 +11,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/pkg/v2/log"
@ -306,6 +307,10 @@ func (t *mixCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool
}
func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
compactionParams, err := compaction.GenerateJSONParams()
if err != nil {
return nil, err
}
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
@ -320,6 +325,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
PreAllocatedSegmentIDs: taskProto.GetPreAllocatedSegmentIDs(),
SlotUsage: t.GetSlotUsage(),
MaxSize: taskProto.GetMaxSize(),
JsonParams: compactionParams,
}
segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))


@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
@ -526,6 +527,11 @@ func Test_compactionTrigger_force(t *testing.T) {
im.segmentIndexes.Insert(2, segIdx2)
im.segmentIndexes.Insert(3, segIdx3)
params, err := compaction.GenerateJSONParams()
if err != nil {
panic(err)
}
tests := []struct {
name string
fields fields
@ -631,6 +637,7 @@ func Test_compactionTrigger_force(t *testing.T) {
PreAllocatedLogIDs: &datapb.IDRange{Begin: 100, End: 200},
MaxSize: 1342177280,
SlotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
JsonParams: params,
},
},
},


@ -316,6 +316,10 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
}
buckets, containsNull := t.splitClusterByScalarValue(analyzeDict)
scalarToClusterBufferMap := make(map[interface{}]*ClusterBuffer, 0)
compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
if err != nil {
return err
}
for id, bucket := range buckets {
fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
if err != nil {
@ -326,7 +330,10 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
}
alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
if err != nil {
return err
}
buffer := newClusterBuffer(id, writer, fieldStats)
t.clusterBuffers = append(t.clusterBuffers, buffer)
@ -342,7 +349,10 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
}
alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
if err != nil {
return err
}
nullBuffer = newClusterBuffer(len(buckets), writer, fieldStats)
t.clusterBuffers = append(t.clusterBuffers, nullBuffer)
@ -395,7 +405,14 @@ func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, buff
fieldStats.SetVectorCentroids(centroidValues...)
alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
if err != nil {
return err
}
writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
if err != nil {
return err
}
buffer := newClusterBuffer(id, writer, fieldStats)
t.clusterBuffers = append(t.clusterBuffers, buffer)


@ -53,6 +53,7 @@ func (s *ClusteringCompactionTaskStorageV2Suite) SetupTest() {
paramtable.Get().Save("common.storageType", "local")
paramtable.Get().Save("common.storage.enableV2", "true")
initcore.InitStorageV2FileSystem(paramtable.Get())
refreshPlanParams(s.plan)
}
func (s *ClusteringCompactionTaskStorageV2Suite) TearDownTest() {
@ -143,6 +144,7 @@ func (s *ClusteringCompactionTaskStorageV2Suite) TestScalarCompactionNormal_V2To
func (s *ClusteringCompactionTaskStorageV2Suite) TestScalarCompactionNormal_V2ToV1Format() {
paramtable.Get().Save("common.storage.enableV2", "false")
refreshPlanParams(s.plan)
var segmentID int64 = 1001
@ -220,6 +222,7 @@ func (s *ClusteringCompactionTaskStorageV2Suite) TestCompactionWithBM25Function(
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "45000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
refreshPlanParams(s.plan)
s.prepareCompactionWithBM25FunctionTask()
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
@ -242,6 +245,7 @@ func (s *ClusteringCompactionTaskStorageV2Suite) TestScalarCompactionNormalByMem
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)
refreshPlanParams(s.plan)
compactionResult, err := s.task.Compact()
s.Require().NoError(err)


@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
@ -41,6 +42,14 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func refreshPlanParams(plan *datapb.CompactionPlan) {
params, err := compaction.GenerateJSONParams()
if err != nil {
panic(err)
}
plan.JsonParams = params
}
func TestClusteringCompactionTaskSuite(t *testing.T) {
suite.Run(t, new(ClusteringCompactionTaskSuite))
}
@ -81,6 +90,10 @@ func (s *ClusteringCompactionTaskSuite) setupTest() {
s.task = NewClusteringCompactionTask(context.Background(), s.mockBinlogIO, nil)
paramtable.Get().Save(paramtable.Get().CommonCfg.EntityExpirationTTL.Key, "0")
params, err := compaction.GenerateJSONParams()
if err != nil {
panic(err)
}
s.plan = &datapb.CompactionPlan{
PlanID: 999,
@ -96,6 +109,7 @@ func (s *ClusteringCompactionTaskSuite) setupTest() {
Begin: 200,
End: 2000,
},
JsonParams: params,
}
s.task.plan = s.plan
}
@ -238,6 +252,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "60000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
refreshPlanParams(s.plan)
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
@ -331,6 +346,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)
refreshPlanParams(s.plan)
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
@ -411,6 +427,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "50000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
refreshPlanParams(s.plan)
s.prepareCompactionWithBM25FunctionTask()
compactionResult, err := s.task.Compact()


@ -41,7 +41,14 @@ func mergeSortMultipleSegments(ctx context.Context,
segIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedSegmentIDs().GetBegin(), plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedLogIDs().GetBegin(), plan.GetPreAllocatedLogIDs().GetEnd())
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
writer := NewMultiSegmentWriter(ctx, binlogIO, compAlloc, plan.GetMaxSize(), plan.GetSchema(), maxRows, partitionID, collectionID, plan.GetChannel(), 4096)
compactionParams, err := compaction.ParseParamsFromJSON(plan.GetJsonParams())
if err != nil {
return nil, err
}
writer, err := NewMultiSegmentWriter(ctx, binlogIO, compAlloc, plan.GetMaxSize(), plan.GetSchema(), compactionParams, maxRows, partitionID, collectionID, plan.GetChannel(), 4096)
if err != nil {
return nil, err
}
pkField, err := typeutil.GetPrimaryFieldSchema(plan.GetSchema())
if err != nil {


@ -143,7 +143,14 @@ func (t *mixCompactionTask) mergeSplit(
segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd())
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096)
compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
if err != nil {
return nil, err
}
mWriter, err := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096)
if err != nil {
return nil, err
}
deletedRowCount := int64(0)
expiredRowCount := int64(0)
@ -320,7 +327,12 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
return nil, errors.New("illegal compaction plan")
}
sortMergeApplicable := paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool()
compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
if err != nil {
return nil, err
}
sortMergeApplicable := compactionParams.UseMergeSort
if sortMergeApplicable {
for _, segment := range t.plan.GetSegmentBinlogs() {
if !segment.GetIsSorted() {
@ -329,14 +341,13 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
}
}
if len(t.plan.GetSegmentBinlogs()) <= 1 ||
len(t.plan.GetSegmentBinlogs()) > paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt() {
len(t.plan.GetSegmentBinlogs()) > compactionParams.MaxSegmentMergeSort {
// sort merge is not applicable if there is only one segment or too many segments
sortMergeApplicable = false
}
}
var res []*datapb.CompactionSegment
var err error
if sortMergeApplicable {
log.Info("compact by merge sort")
res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,

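The merge-sort decision above now depends only on the request: the plan must enable it, every input segment must be sorted, and the segment count must lie in (1, MaxSegmentMergeSort]. Restated as a standalone predicate for clarity (the helper name is ours, not part of this commit):

    // mergeSortApplicable is a hypothetical restatement of the check above.
    func mergeSortApplicable(plan *datapb.CompactionPlan, params compaction.Params) bool {
        if !params.UseMergeSort {
            return false
        }
        for _, segment := range plan.GetSegmentBinlogs() {
            if !segment.GetIsSorted() {
                return false
            }
        }
        n := len(plan.GetSegmentBinlogs())
        return n > 1 && n <= params.MaxSegmentMergeSort
    }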

@ -57,6 +57,7 @@ func (s *MixCompactionTaskStorageV2Suite) SetupTest() {
paramtable.Get().Save("common.storageType", "local")
paramtable.Get().Save("common.storage.enableV2", "true")
initcore.InitStorageV2FileSystem(paramtable.Get())
refreshPlanParams(s.task.plan)
}
func (s *MixCompactionTaskStorageV2Suite) TearDownTest() {
@ -177,6 +178,7 @@ func (s *MixCompactionTaskStorageV2Suite) TestCompactDupPK_V2ToV2Format() {
func (s *MixCompactionTaskStorageV2Suite) TestCompactDupPK_V2ToV1Format() {
paramtable.Get().Save("common.storage.enableV2", "false")
refreshPlanParams(s.task.plan)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64)
@ -241,6 +243,7 @@ func (s *MixCompactionTaskStorageV2Suite) TestCompactSortedSegment() {
s.prepareCompactSortedSegment()
paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
refreshPlanParams(s.task.plan)
result, err := s.task.Compact()
s.NoError(err)
@ -265,6 +268,7 @@ func (s *MixCompactionTaskStorageV2Suite) TestCompactSortedSegmentLackBinlog() {
s.prepareCompactSortedSegmentLackBinlog()
paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
refreshPlanParams(s.task.plan)
result, err := s.task.Compact()
s.NoError(err)


@ -71,6 +71,10 @@ func (s *MixCompactionTaskSuite) setupTest() {
s.meta = genTestCollectionMeta()
paramtable.Get().Save(paramtable.Get().CommonCfg.EntityExpirationTTL.Key, "0")
params, err := compaction.GenerateJSONParams()
if err != nil {
panic(err)
}
plan := &datapb.CompactionPlan{
PlanID: 999,
@ -86,6 +90,7 @@ func (s *MixCompactionTaskSuite) setupTest() {
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 19531, End: math.MaxInt64},
PreAllocatedLogIDs: &datapb.IDRange{Begin: 9530, End: 19530},
MaxSize: 64 * 1024 * 1024,
JsonParams: params,
}
s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, plan)
@ -98,6 +103,10 @@ func (s *MixCompactionTaskSuite) SetupTest() {
func (s *MixCompactionTaskSuite) SetupBM25() {
s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
s.meta = genTestCollectionMetaWithBM25()
params, err := compaction.GenerateJSONParams()
if err != nil {
panic(err)
}
plan := &datapb.CompactionPlan{
PlanID: 999,
@ -113,6 +122,7 @@ func (s *MixCompactionTaskSuite) SetupBM25() {
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 19531, End: math.MaxInt64},
PreAllocatedLogIDs: &datapb.IDRange{Begin: 9530, End: 19530},
MaxSize: 64 * 1024 * 1024,
JsonParams: params,
}
s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, plan)
@ -325,6 +335,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
s.prepareCompactSortedSegment()
paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
refreshPlanParams(s.task.plan)
result, err := s.task.Compact()
s.NoError(err)
@ -404,6 +415,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegmentLackBinlog() {
s.prepareCompactSortedSegmentLackBinlog()
paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
refreshPlanParams(s.task.plan)
result, err := s.task.Compact()
s.NoError(err)


@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
@ -55,11 +56,12 @@ type MultiSegmentWriter struct {
// segmentSize might be changed dynamically. To make sure a compaction plan is static,
// the target segmentSize is defined when the compaction plan is created.
schema *schemapb.CollectionSchema
partitionID int64
collectionID int64
channel string
batchSize int
schema *schemapb.CollectionSchema
partitionID int64
collectionID int64
channel string
batchSize int
binLogMaxSize uint64
res []*datapb.CompactionSegment
// DO NOT leave it empty if all segments are deleted; just return a segment with zero meta for datacoord
@ -85,13 +87,15 @@ func (alloc *compactionAlloactor) allocSegmentID() (typeutil.UniqueID, error) {
}
func NewMultiSegmentWriter(ctx context.Context, binlogIO io.BinlogIO, allocator *compactionAlloactor, segmentSize int64,
schema *schemapb.CollectionSchema,
schema *schemapb.CollectionSchema, params compaction.Params,
maxRows int64, partitionID, collectionID int64, channel string, batchSize int, rwOption ...storage.RwOption,
) *MultiSegmentWriter {
) (*MultiSegmentWriter, error) {
storageVersion := storage.StorageV1
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
if params.EnableStorageV2 {
storageVersion = storage.StorageV2
}
rwOpts := rwOption
if len(rwOption) == 0 {
rwOpts = make([]storage.RwOption, 0)
@ -107,10 +111,11 @@ func NewMultiSegmentWriter(ctx context.Context, binlogIO io.BinlogIO, allocator
collectionID: collectionID,
channel: channel,
batchSize: batchSize,
binLogMaxSize: params.BinLogMaxSize,
res: make([]*datapb.CompactionSegment, 0),
storageVersion: storageVersion,
rwOption: rwOpts,
}
}, nil
}
func (w *MultiSegmentWriter) closeWriter() error {
@ -155,7 +160,7 @@ func (w *MultiSegmentWriter) rotateWriter() error {
}
w.currentSegmentID = newSegmentID
chunkSize := paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
chunkSize := w.binLogMaxSize
rootPath := binlog.GetRootPath()
w.rwOption = append(w.rwOption,

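With the field change above, rotateWriter chunks binlogs at the size frozen into the plan instead of re-reading paramtable on each rotation, consistent with the "plans are static" comment. A typical call site now looks like this sketch, mirroring mergeSortMultipleSegments (identifiers illustrative):

    params, err := compaction.ParseParamsFromJSON(plan.GetJsonParams())
    if err != nil {
        return nil, err
    }
    writer, err := NewMultiSegmentWriter(ctx, binlogIO, compAlloc,
        plan.GetMaxSize(), plan.GetSchema(), params,
        maxRows, partitionID, collectionID, plan.GetChannel(), 4096)
    if err != nil {
        return nil, err
    }
    // rotateWriter now chunks at params.BinLogMaxSize via w.binLogMaxSize.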

@ -225,9 +225,7 @@ func (node *DataNode) Init() error {
var initError error
node.initOnce.Do(func() {
node.registerMetricsRequest()
log.Ctx(node.ctx).Info("DataNode server initializing",
zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick.GetValue()),
)
log.Ctx(node.ctx).Info("DataNode server initializing")
if err := node.initSession(); err != nil {
log.Error("DataNode server init session failed", zap.Error(err))
initError = err
@ -251,8 +249,7 @@ func (node *DataNode) Init() error {
node.allocator = alloc
node.factory.Init(Params)
log.Info("DataNode server init succeeded",
zap.String("MsgChannelSubName", Params.CommonCfg.DataNodeSubName.GetValue()))
log.Info("DataNode server init succeeded")
chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)
if err != nil {


@ -643,6 +643,7 @@ message CompactionPlan {
// bf path for importing
// collection is importing
IDRange pre_allocated_logIDs = 21;
string json_params = 22;
}
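The new json_params field carries the JSON encoding of the compaction.Params struct introduced in this commit; with illustrative (not default) values the payload looks like:

    {"enable_storage_v2":true,"binlog_max_size":67108864,"use_merge_sort":true,"max_segment_merge_sort":30}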
message CompactionSegment {

File diff suppressed because it is too large Load Diff