From c2b8b5fe842f1043c6b70ebfe3be44f9bdf814b9 Mon Sep 17 00:00:00 2001
From: wayblink
Date: Sun, 21 Jul 2024 19:23:40 +0800
Subject: [PATCH] enhance: refine clustering compaction configs and logs (#34784)

#30633

---------

Signed-off-by: wayblink
---
 configs/milvus.yaml                                | 33 ++++++++++---------
 .../datacoord/compaction_policy_clustering.go      | 14 ++++----
 internal/datacoord/compaction_policy_l0.go         |  1 +
 pkg/util/paramtable/component_param.go             |  6 ++--
 4 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 52b4e52528..4a2a62cdc7 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -466,28 +466,29 @@ dataCoord:
     maxParallelTaskNum: 10
     workerMaxParallelTaskNum: 2
     clustering:
-      enable: true
-      autoEnable: false
-      triggerInterval: 600
+      enable: true # Enable clustering compaction
+      autoEnable: false # Enable auto background clustering compaction
+      triggerInterval: 600 # clustering compaction trigger interval in seconds
       stateCheckInterval: 10
       gcInterval: 600
-      minInterval: 3600
-      maxInterval: 259200
-      newDataRatioThreshold: 0.2
-      newDataSizeThreshold: 512m
-      timeout: 7200
+      minInterval: 3600 # The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction
+      maxInterval: 259200 # If a collection hasn't been clustering compacted for longer than maxInterval, force a compaction
+      newDataSizeThreshold: 512m # If the new data size is larger than newDataSizeThreshold, execute clustering compaction
+      timeout: 7200 # timeout in seconds for clustering compaction; the task will stop if it times out
       dropTolerance: 86400
       # clustering compaction will try best to distribute data into segments with size range in [preferSegmentSize, maxSegmentSize].
       # data will be clustered by preferSegmentSize, if a cluster is larger than maxSegmentSize, will spilt it into multi segment
       # buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster(range), to avoid globally redistribute too often
       preferSegmentSize: 512m
       maxSegmentSize: 1024m
-      maxTrainSizeRatio: 0.8 # max data size ratio in analyze, if data is larger than it, will down sampling to meet this limit
-      maxCentroidsNum: 10240
-      minCentroidsNum: 16
-      minClusterSizeRatio: 0.01
-      maxClusterSizeRatio: 10
-      maxClusterSize: 5g
+
+      # vector clustering related
+      maxTrainSizeRatio: 0.8 # max data size ratio in Kmeans train; if the data is larger, it will be down-sampled to meet this limit
+      maxCentroidsNum: 10240 # maximum number of centroids in Kmeans train
+      minCentroidsNum: 16 # minimum number of centroids in Kmeans train
+      minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans train
+      maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans train
+      maxClusterSize: 5g # maximum cluster size in Kmeans train

     levelzero:
       forceTrigger:
@@ -579,8 +580,8 @@ dataNode:
   slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.

   clusteringCompaction:
-    memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.
-    workPoolSize: 8
+    memoryBufferRatio: 0.1 # The memory buffer ratio for clustering compaction. Data larger than the threshold will be flushed to storage.
+    workPoolSize: 8 # worker pool size for one clustering compaction job

 streamingNode:
   # can specify ip for example
diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go
index 847346ce02..c8f201da8a 100644
--- a/internal/datacoord/compaction_policy_clustering.go
+++ b/internal/datacoord/compaction_policy_clustering.go
@@ -53,6 +53,7 @@ func (policy *clusteringCompactionPolicy) Enable() bool {
 }

 func (policy *clusteringCompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
+	log.Info("start trigger clusteringCompactionPolicy...")
 	ctx := context.Background()
 	collections := policy.meta.GetCollections()
 	ts, err := policy.allocator.allocTimestamp(ctx)
@@ -97,7 +98,8 @@ func (policy *clusteringCompactionPolicy) checkAllL2SegmentsContains(ctx context
 }

 func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64, ts Timestamp, manual bool) ([]CompactionView, int64, error) {
-	log.Info("trigger collection clustering compaction", zap.Int64("collectionID", collectionID))
+	log := log.With(zap.Int64("collectionID", collectionID))
+	log.Info("trigger collection clustering compaction")
 	collection, err := policy.handler.GetCollection(ctx, collectionID)
 	if err != nil {
 		log.Warn("fail to get collection")
@@ -105,6 +107,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
 	}
 	clusteringKeyField := clustering.GetClusteringKeyField(collection.Schema)
 	if clusteringKeyField == nil {
+		log.Info("the collection has no clustering key, skip triggering clustering compaction")
 		return nil, 0, nil
 	}
@@ -120,7 +123,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte

 	compacting, triggerID := policy.collectionIsClusteringCompacting(collection.ID)
 	if compacting {
-		log.Info("collection is clustering compacting", zap.Int64("collectionID", collection.ID), zap.Int64("triggerID", triggerID))
+		log.Info("collection is clustering compacting", zap.Int64("triggerID", triggerID))
 		return nil, triggerID, nil
 	}
@@ -142,10 +145,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
 	views := make([]CompactionView, 0)
 	// partSegments is list of chanPartSegments, which is channel-partition organized segments
 	for _, group := range partSegments {
-		log := log.Ctx(ctx).With(zap.Int64("collectionID", group.collectionID),
-			zap.Int64("partitionID", group.partitionID),
-			zap.String("channel", group.channelName))
-
+		log := log.With(zap.Int64("partitionID", group.partitionID), zap.String("channel", group.channelName))
 		if !policy.checkAllL2SegmentsContains(ctx, group.collectionID, group.partitionID, group.channelName) {
 			log.Warn("clustering compaction cannot be done, otherwise the performance will fall back")
 			continue
@@ -184,7 +184,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
 		views = append(views, view)
 	}

-	log.Info("trigger collection clustering compaction", zap.Int64("collectionID", collectionID), zap.Int("viewNum", len(views)))
+	log.Info("finish trigger collection clustering compaction", zap.Int("viewNum", len(views)))
 	return views, newTriggerID, nil
 }
diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go
index 353e520da5..413a404e59 100644
--- a/internal/datacoord/compaction_policy_l0.go
+++ b/internal/datacoord/compaction_policy_l0.go
@@ -30,6 +30,7 @@ func (policy *l0CompactionPolicy) Enable() bool {
 }

 func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
+	log.Info("start trigger l0CompactionPolicy...")
 	// support config hot refresh
 	events := policy.generateEventForLevelZeroViewChange()
 	if len(events) != 0 {
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 845d0d718f..fc19a1a120 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2824,7 +2824,7 @@ user-task-polling:
 		Key:          "queryNode.enableSegmentPrune",
 		Version:      "2.3.4",
 		DefaultValue: "false",
-		Doc:          "use partition prune function on shard delegator",
+		Doc:          "use partition stats to prune data in search/query on shard delegator",
 		Export:       true,
 	}
 	p.EnableSegmentPrune.Init(base.mgr)
@@ -3369,6 +3369,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 		Key:          "dataCoord.compaction.clustering.triggerInterval",
 		Version:      "2.4.6",
 		DefaultValue: "600",
+		Doc:          "clustering compaction trigger interval in seconds",
 	}
 	p.ClusteringCompactionTriggerInterval.Init(base.mgr)
@@ -3414,6 +3415,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 		Key:          "dataCoord.compaction.clustering.timeout",
 		Version:      "2.4.6",
 		DefaultValue: "3600",
+		Doc:          "timeout in seconds for clustering compaction; the task will stop if it times out",
 	}
 	p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)
@@ -4163,7 +4165,7 @@ if this parameter <= 0, will set it as 10`,
 	p.ClusteringCompactionMemoryBufferRatio = ParamItem{
 		Key:          "dataNode.clusteringCompaction.memoryBufferRatio",
 		Version:      "2.4.6",
-		Doc:          "The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.",
+		Doc:          "The memory buffer ratio for clustering compaction. Data larger than the threshold will be flushed to storage.",
 		DefaultValue: "0.1",
 		PanicIfEmpty: false,
 		Export:       true,
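
For reference, a minimal sketch of how the refined clustering compaction settings above might be combined in a deployment's milvus.yaml. The keys and defaults are taken from this patch; the values are illustrative, not recommendations:

dataCoord:
  compaction:
    clustering:
      enable: true # turn the feature on
      autoEnable: false # set true to let dataCoord trigger it in the background every triggerInterval
      triggerInterval: 600 # scan for trigger conditions every 600 seconds
      newDataSizeThreshold: 512m # compact once at least this much new data has accumulated
      timeout: 7200 # stop a clustering compaction task that runs longer than 7200 seconds
dataNode:
  clusteringCompaction:
    memoryBufferRatio: 0.1 # data beyond this memory buffer ratio is flushed to storage
    workPoolSize: 8 # worker pool size for one clustering compaction job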
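
The log changes follow a scoped-logger pattern: bind the identifying fields once with log.With, then every later call carries them automatically. A minimal, self-contained sketch of that pattern, assuming the pkg/log wrapper and zap used in the files above; demoTrigger is a hypothetical function for illustration, not code from this patch:

package main

import (
	"github.com/milvus-io/milvus/pkg/log"
	"go.uber.org/zap"
)

// demoTrigger mirrors the pattern applied in triggerOneCollection: derive a
// child logger that already carries collectionID, so the field is not
// repeated on every log call.
func demoTrigger(collectionID int64) {
	log := log.With(zap.Int64("collectionID", collectionID)) // child logger with the field attached
	log.Info("trigger collection clustering compaction")
	// ... trigger work would happen here ...
	log.Info("finish trigger collection clustering compaction", zap.Int("viewNum", 0))
}

func main() {
	demoTrigger(100)
}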