enhance: add force rebuild index configuration (#41473)

issue: #41431

Signed-off-by: xianliang.li <xianliang.li@zilliz.com>
This commit is contained in:
foxspy 2025-04-29 16:20:56 +08:00 committed by GitHub
parent b6f3fd0de1
commit 1d99f8bd67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 94 additions and 3 deletions

View File

@ -569,6 +569,11 @@ dataCoord:
# exceeds this threshold, the earliest growing segments will be sealed.
blockingL0SizeInMB: 64
autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
forceRebuildSegmentIndex: false # force rebuild segment index to specify index engine's version
# if param forceRebuildSegmentIndex is enabled, the vector index will be rebuilt to aligned with targetVecIndexVersion.
# if param forceRebuildSegmentIndex is not enabled, the newly created vector index will be aligned with the newer one of index engine's version and targetVecIndexVersion.
# if param targetVecIndexVersion is not set, the default value is -1, which means no target vec index version, then the vector index will be aligned with index engine's version
targetVecIndexVersion: -1
segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flushing operations on the same segment
# Switch value to control if to enable segment compaction.
# Compaction merges small-size segments into a large segment, and clears the entities deleted beyond the retention duration of Time Travel.

View File

@ -120,4 +120,9 @@ GetCurrentIndexVersion() {
return knowhere::Version::GetCurrentVersion().VersionNumber();
}
// Reports the version number of the maximum index version that knowhere
// advertises via knowhere::Version::GetMaximumVersion().
int32_t
GetMaximumIndexVersion() {
    auto&& maximum_version = knowhere::Version::GetMaximumVersion();
    return maximum_version.VersionNumber();
}
} // namespace milvus::config

View File

@ -41,6 +41,9 @@ GetMinimalIndexVersion();
int32_t
GetCurrentIndexVersion();
int32_t
GetMaximumIndexVersion();
void
KnowhereInitGPUMemoryPool(const uint32_t init_size, const uint32_t max_size);

View File

@ -107,6 +107,11 @@ GetMinimalIndexVersion() {
return milvus::config::GetMinimalIndexVersion();
}
// C-linkage entry point that forwards to milvus::config::GetMaximumIndexVersion()
// and hands its result back unchanged.
extern "C" int32_t
GetMaximumIndexVersion() {
    const int32_t maximum_version = milvus::config::GetMaximumIndexVersion();
    return maximum_version;
}
extern "C" void
SetThreadName(const char* name) {
#ifdef __linux__

View File

@ -59,6 +59,9 @@ GetCurrentIndexVersion();
int32_t
GetMinimalIndexVersion();
int32_t
GetMaximumIndexVersion();
void
SetThreadName(const char*);

View File

@ -14,10 +14,10 @@
# Update KNOWHERE_VERSION for the first occurrence
milvus_add_pkg_config("knowhere")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( KNOWHERE_VERSION 810a0c8d )
set( KNOWHERE_VERSION f8c4269)
set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git")
message(STATUS "Knowhere repo: ${GIT_REPOSITORY}")
message(STATUS "Knowhere version: ${KNOWHERE_VERSION}")
message(STATUS "Knowhere version: ${KNOWHERE_VERSION}")
message(STATUS "Building knowhere-${KNOWHERE_SOURCE_VER} from source")
message(STATUS ${CMAKE_BUILD_TYPE})

View File

@ -700,6 +700,14 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
return true
}
if t.ShouldRebuildSegmentIndex(segment) {
return true
}
return false
}
func (t *compactionTrigger) ShouldRebuildSegmentIndex(segment *SegmentInfo) bool {
if Params.DataCoordCfg.AutoUpgradeSegmentIndex.GetAsBool() {
// index version of segment lower than current version and IndexFileKeys should have value, trigger compaction
indexIDToSegIdxes := t.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID)
@ -717,6 +725,24 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
}
}
// Force-rebuild path: only active when forceRebuildSegmentIndex is enabled and
// targetVecIndexVersion has been set to something other than -1.
if Params.DataCoordCfg.ForceRebuildSegmentIndex.GetAsBool() && Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt64() != -1 {
// Trigger compaction when a segment index has index files (IndexFileKeys is non-empty)
// and its version differs from the target version. The check is "not equal", so an
// index that is newer than the target also triggers a rebuild, not only an older one.
indexIDToSegIdxes := t.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID)
for _, index := range indexIDToSegIdxes {
if index.CurrentIndexVersion != Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt32() &&
len(index.IndexFileKeys) > 0 {
log.Info("index version is not equal to target vec index version, trigger compaction",
zap.Int64("segmentID", segment.ID),
zap.Int64("indexID", index.IndexID),
zap.Strings("indexFileKeys", index.IndexFileKeys),
zap.Int32("currentIndexVersion", index.CurrentIndexVersion),
zap.Int32("targetIndexVersion", Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt32()))
return true
}
}
}
return false
}

View File

@ -2062,6 +2062,13 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300})
assert.False(t, couldDo)
Params.Save(Params.DataCoordCfg.ForceRebuildSegmentIndex.Key, "true")
defer Params.Save(Params.DataCoordCfg.ForceRebuildSegmentIndex.Key, "false")
Params.Save(Params.DataCoordCfg.TargetVecIndexVersion.Key, "5")
defer Params.Save(Params.DataCoordCfg.TargetVecIndexVersion.Key, "-1")
couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300})
assert.True(t, couldDo)
indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: 1,
IndexID: 101,

View File

@ -253,6 +253,19 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
Key: common.IndexNonEncoding,
Value: indexNonEncoding,
})
// Choose the vector index version to request for this build job; start from the
// index engine version reported by the version manager.
currentVecIndexVersion := dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion()
// A configured targetVecIndexVersion (any value other than -1) takes part in the choice.
if Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt64() != -1 {
// Force rebuild enabled: use the target version as-is, even when it is lower
// than the index engine's current version.
// NOTE(review): nothing in this block checks the target against the versions the
// index engine supports — confirm it is validated elsewhere.
if Params.DataCoordCfg.ForceRebuildSegmentIndex.GetAsBool() {
currentVecIndexVersion = Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt32()
} else {
// Force rebuild disabled: use the newer of the index engine's current version
// and the target version.
currentVecIndexVersion = max(currentVecIndexVersion, Params.DataCoordCfg.TargetVecIndexVersion.GetAsInt32())
}
}
it.req = &workerpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
@ -262,7 +275,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: segIndex.NumRows,
CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
CurrentIndexVersion: currentVecIndexVersion,
CurrentScalarIndexVersion: dependency.indexEngineVersionManager.GetCurrentScalarIndexEngineVersion(),
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),

View File

@ -3610,6 +3610,8 @@ type dataCoordConfig struct {
SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"`
GrowingSegmentsMemSizeInMB ParamItem `refreshable:"true"`
AutoUpgradeSegmentIndex ParamItem `refreshable:"true"`
ForceRebuildSegmentIndex ParamItem `refreshable:"true"`
TargetVecIndexVersion ParamItem `refreshable:"true"`
SegmentFlushInterval ParamItem `refreshable:"true"`
BlockingL0EntryNum ParamItem `refreshable:"true"`
BlockingL0SizeInMB ParamItem `refreshable:"true"`
@ -4477,6 +4479,28 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.AutoUpgradeSegmentIndex.Init(base.mgr)
p.ForceRebuildSegmentIndex = ParamItem{
Key: "dataCoord.forceRebuildSegmentIndex",
Version: "2.5.10",
DefaultValue: "false",
PanicIfEmpty: true,
Doc: "force rebuild segment index to specify index engine's version",
Export: true,
}
p.ForceRebuildSegmentIndex.Init(base.mgr)
p.TargetVecIndexVersion = ParamItem{
Key: "dataCoord.targetVecIndexVersion",
Version: "2.5.10",
DefaultValue: "-1",
PanicIfEmpty: true,
Doc: `if param forceRebuildSegmentIndex is enabled, the vector index will be rebuilt to aligned with targetVecIndexVersion.
if param forceRebuildSegmentIndex is not enabled, the newly created vector index will be aligned with the newer one of index engine's version and targetVecIndexVersion.
if param targetVecIndexVersion is not set, the default value is -1, which means no target vec index version, then the vector index will be aligned with index engine's version `,
Export: true,
}
p.TargetVecIndexVersion.Init(base.mgr)
p.SegmentFlushInterval = ParamItem{
Key: "dataCoord.segmentFlushInterval",
Version: "2.4.6",