diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 0967f87aa6..8db2e57242 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -569,6 +569,11 @@ dataCoord: # exceeds this threshold, the earliest growing segments will be sealed. blockingL0SizeInMB: 64 autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version + forceRebuildSegmentIndex: false # force rebuild segment index to specify index engine's version + # if param forceRebuildSegmentIndex is enabled, the vector index will be rebuilt to aligned with targetVecIndexVersion. + # if param forceRebuildSegmentIndex is not enabled, the newly created vector index will be aligned with the newer one of index engine's version and targetVecIndexVersion. + # if param targetVecIndexVersion is not set, the default value is -1, which means no target vec index version, then the vector index will be aligned with index engine's version + targetVecIndexVersion: -1 segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flusing operation on same segment # Switch value to control if to enable segment compaction. # Compaction merges small-size segments into a large segment, and clears the entities deleted beyond the rentention duration of Time Travel. diff --git a/internal/core/src/config/ConfigKnowhere.cpp b/internal/core/src/config/ConfigKnowhere.cpp index 29d0f1134e..f1f6991f77 100644 --- a/internal/core/src/config/ConfigKnowhere.cpp +++ b/internal/core/src/config/ConfigKnowhere.cpp @@ -120,4 +120,9 @@ GetCurrentIndexVersion() { return knowhere::Version::GetCurrentVersion().VersionNumber(); } +int32_t +GetMaximumIndexVersion() { + return knowhere::Version::GetMaximumVersion().VersionNumber(); +} + } // namespace milvus::config diff --git a/internal/core/src/config/ConfigKnowhere.h b/internal/core/src/config/ConfigKnowhere.h index 57a0713014..18239c4a37 100644 --- a/internal/core/src/config/ConfigKnowhere.h +++ b/internal/core/src/config/ConfigKnowhere.h @@ -41,6 +41,9 @@ GetMinimalIndexVersion(); int32_t GetCurrentIndexVersion(); +int32_t +GetMaximumIndexVersion(); + void KnowhereInitGPUMemoryPool(const uint32_t init_size, const uint32_t max_size); diff --git a/internal/core/src/segcore/segcore_init_c.cpp b/internal/core/src/segcore/segcore_init_c.cpp index eadb96a8bd..6c2fe5516b 100644 --- a/internal/core/src/segcore/segcore_init_c.cpp +++ b/internal/core/src/segcore/segcore_init_c.cpp @@ -107,6 +107,11 @@ GetMinimalIndexVersion() { return milvus::config::GetMinimalIndexVersion(); } +extern "C" int32_t +GetMaximumIndexVersion() { + return milvus::config::GetMaximumIndexVersion(); +} + extern "C" void SetThreadName(const char* name) { #ifdef __linux__ diff --git a/internal/core/src/segcore/segcore_init_c.h b/internal/core/src/segcore/segcore_init_c.h index 1c1cf0dac2..8dcc6c7e0d 100644 --- a/internal/core/src/segcore/segcore_init_c.h +++ b/internal/core/src/segcore/segcore_init_c.h @@ -59,6 +59,9 @@ GetCurrentIndexVersion(); int32_t GetMinimalIndexVersion(); +int32_t +GetMaximumIndexVersion(); + void SetThreadName(const char*); diff --git a/internal/core/thirdparty/knowhere/CMakeLists.txt b/internal/core/thirdparty/knowhere/CMakeLists.txt index 942bc2d9a0..418b73c90a 100644 --- a/internal/core/thirdparty/knowhere/CMakeLists.txt +++ b/internal/core/thirdparty/knowhere/CMakeLists.txt @@ -14,10 +14,10 @@ # Update KNOWHERE_VERSION for the first occurrence milvus_add_pkg_config("knowhere") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( KNOWHERE_VERSION 810a0c8d ) +set( KNOWHERE_VERSION f8c4269) set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git") message(STATUS "Knowhere repo: ${GIT_REPOSITORY}") -message(STATUS "Knowhere version: ${KNOWHERE_VERSION}") + message(STATUS "Knowhere version: ${KNOWHERE_VERSION}") message(STATUS "Building knowhere-${KNOWHERE_SOURCE_VER} from source") message(STATUS ${CMAKE_BUILD_TYPE}) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index dd291abb8b..1b1727230e 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -700,6 +700,14 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa return true } + if t.ShouldRebuildSegmentIndex(segment) { + return true + } + + return false +} + +func (t *compactionTrigger) ShouldRebuildSegmentIndex(segment *SegmentInfo) bool { if Params.DataCoordCfg.AutoUpgradeSegmentIndex.GetAsBool() { // index version of segment lower than current version and IndexFileKeys should have value, trigger compaction indexIDToSegIdxes := t.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID) @@ -717,6 +725,24 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa } } + // enable force rebuild index with target index version + if Params.DataCoordCfg.ForceRebuildSegmentIndex.GetAsBool() && Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt64() != -1 { + // index version of segment lower than current version and IndexFileKeys should have value, trigger compaction + indexIDToSegIdxes := t.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID) + for _, index := range indexIDToSegIdxes { + if index.CurrentIndexVersion != Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt32() && + len(index.IndexFileKeys) > 0 { + log.Info("index version is not equal to target vec index version, trigger compaction", + zap.Int64("segmentID", segment.ID), + zap.Int64("indexID", index.IndexID), + zap.Strings("indexFileKeys", index.IndexFileKeys), + zap.Int32("currentIndexVersion", index.CurrentIndexVersion), + zap.Int32("targetIndexVersion", Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt32())) + return true + } + } + } + return false } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 30c3379918..192cdfb409 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -2062,6 +2062,13 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300}) assert.False(t, couldDo) + Params.Save(Params.DataCoordCfg.ForceRebuildSegmentIndex.Key, "true") + defer Params.Save(Params.DataCoordCfg.ForceRebuildSegmentIndex.Key, "false") + Params.Save(Params.DataCoordCfg.TargetVecIndexVersion.Key, "5") + defer Params.Save(Params.DataCoordCfg.TargetVecIndexVersion.Key, "-1") + couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300}) + assert.True(t, couldDo) + indexMeta.updateSegmentIndex(&model.SegmentIndex{ SegmentID: 1, IndexID: 101, diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index 6c76d3a22b..b2531bd5e7 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -253,6 +253,19 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule Key: common.IndexNonEncoding, Value: indexNonEncoding, }) + + currentVecIndexVersion := dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion() + // if specify target vec index version, use it with high priority + if Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt64() != -1 { + // if force rebuild segment index is true, use target vec index version directly + if Params.DataCoordCfg.ForceRebuildSegmentIndex.GetAsBool() { + currentVecIndexVersion = Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt32() + } else { + // if force rebuild segment index is not enabled, use newer index version between current index version and target index version + currentVecIndexVersion = max(currentVecIndexVersion, Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt32()) + } + } + it.req = &workerpb.CreateJobRequest{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), @@ -262,7 +275,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule IndexParams: indexParams, TypeParams: typeParams, NumRows: segIndex.NumRows, - CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), + CurrentIndexVersion: currentVecIndexVersion, CurrentScalarIndexVersion: dependency.indexEngineVersionManager.GetCurrentScalarIndexEngineVersion(), CollectionID: segment.GetCollectionID(), PartitionID: segment.GetPartitionID(), diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index aaa3799d37..2bb2c313a3 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3610,6 +3610,8 @@ type dataCoordConfig struct { SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"` GrowingSegmentsMemSizeInMB ParamItem `refreshable:"true"` AutoUpgradeSegmentIndex ParamItem `refreshable:"true"` + ForceRebuildSegmentIndex ParamItem `refreshable:"true"` + TargetVecIndexVersion ParamItem `refreshable:"true"` SegmentFlushInterval ParamItem `refreshable:"true"` BlockingL0EntryNum ParamItem `refreshable:"true"` BlockingL0SizeInMB ParamItem `refreshable:"true"` @@ -4477,6 +4479,28 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.AutoUpgradeSegmentIndex.Init(base.mgr) + p.ForceRebuildSegmentIndex = ParamItem{ + Key: "dataCoord.forceRebuildSegmentIndex", + Version: "2.5.10", + DefaultValue: "false", + PanicIfEmpty: true, + Doc: "force rebuild segment index to specify index engine's version", + Export: true, + } + p.ForceRebuildSegmentIndex.Init(base.mgr) + + p.TargetVecIndexVersion = ParamItem{ + Key: "dataCoord.targetVecIndexVersion", + Version: "2.5.10", + DefaultValue: "-1", + PanicIfEmpty: true, + Doc: `if param forceRebuildSegmentIndex is enabled, the vector index will be rebuilt to aligned with targetVecIndexVersion. +if param forceRebuildSegmentIndex is not enabled, the newly created vector index will be aligned with the newer one of index engine's version and targetVecIndexVersion. +if param targetVecIndexVersion is not set, the default value is -1, which means no target vec index version, then the vector index will be aligned with index engine's version `, + Export: true, + } + p.TargetVecIndexVersion.Init(base.mgr) + p.SegmentFlushInterval = ParamItem{ Key: "dataCoord.segmentFlushInterval", Version: "2.4.6",