milvus/pkg/util/metautil/binlog.go
sijie-ni-0214 0a54c93227
fix: etcd RPC size limit exceeded when dropping collection (#46414)
issue: https://github.com/milvus-io/milvus/issues/46410

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant: etcd metadata and in-memory Segment/TextIndex records
must store only compact filenames for text-index files; full object keys
are deterministically reconstructed at use-sites from a stable root +
common.TextIndexPath + IDs via metautil.BuildTextLogPaths.

- Bug & fix (issue #46410): the etcd RPC size overflow was caused by
persisting full upload keys in segment/TextIndex metadata. Fix: at
upload/creation sites (internal/datanode/compactor/sort_compaction.go
and internal/datanode/index/task_stats.go) store only filenames using
metautil.ExtractTextLogFilenames; at consumption/use sites
(internal/datacoord/garbage_collector.go,
internal/querynodev2/segments/segment.go, and other GC/loader code)
reconstruct full paths with metautil.BuildTextLogPaths before accessing
object storage.

- Simplified/removed logic: removed the redundant practice of carrying
full object keys through metadata and in-memory structures; callers now
persist compact filenames and perform on-demand path reconstruction.
This eliminates large payloads in etcd and reduces memory pressure while
preserving the same runtime control flow and error handling.

- No data loss / no regression: filename extraction is a deterministic
suffix operation (metautil.ExtractTextLogFilenames) and reloadFromKV
performs backward compatibility (internal/datacoord/meta.go converts
existing full-path entries to filenames before caching). All read paths
reconstruct full paths at runtime (garbage_collector.getTextLogs,
LocalSegment.LoadTextIndex, GC/loader), so no files are modified/deleted
and access semantics remain identical.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: sijie-ni-0214 <sijie.ni@zilliz.com>
2025-12-28 15:31:19 +08:00

131 lines
4.0 KiB
Go

package metautil
import (
"path"
"strconv"
"strings"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// pathSep is the separator used to split log paths into their components.
const pathSep = "/"
// BuildInsertLogPath returns the insert log path for the given IDs, laid out as
// rootPath/<common.SegmentInsertLogPath>/collectionID/partitionID/segmentID/fieldID/logID.
// The result is cleaned by path.Join.
func BuildInsertLogPath(rootPath string, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) string {
	return path.Join(
		rootPath,
		common.SegmentInsertLogPath,
		JoinIDPath(collectionID, partitionID, segmentID, fieldID, logID),
	)
}
// ParseInsertLogPath extracts the five trailing ID components
// (collectionID/partitionID/segmentID/fieldID/logID) from an insert log path.
//
// The path must split into at least six "/"-separated components and each of
// the last five must be a base-10 int64. On any failure every ID is returned
// as zero and ok is false.
func ParseInsertLogPath(path string) (collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID, ok bool) {
	const idCount = 5

	parts := strings.Split(path, pathSep)
	// One extra leading component is required in front of the five IDs.
	if len(parts) < idCount+1 {
		return 0, 0, 0, 0, 0, false
	}

	var ids [idCount]int64
	for i, raw := range parts[len(parts)-idCount:] {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			// Discard anything parsed so far: callers only ever see all-zero IDs on failure.
			return 0, 0, 0, 0, 0, false
		}
		ids[i] = parsed
	}
	return ids[0], ids[1], ids[2], ids[3], ids[4], true
}
// GetSegmentIDFromInsertLogPath returns the segment ID embedded in an insert
// log path, or 0 when it cannot be extracted.
func GetSegmentIDFromInsertLogPath(logPath string) typeutil.UniqueID {
	// Insert log paths end in .../segmentID/fieldID/logID, so the segment ID
	// is the third component counted from the end.
	const segmentPosFromEnd = 3
	return getSegmentIDFromPath(logPath, segmentPosFromEnd)
}
// BuildStatsLogPath returns the stats log path for the given IDs, laid out as
// rootPath/<common.SegmentStatslogPath>/collectionID/partitionID/segmentID/fieldID/logID.
// The result is cleaned by path.Join.
func BuildStatsLogPath(rootPath string, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) string {
	return path.Join(
		rootPath,
		common.SegmentStatslogPath,
		JoinIDPath(collectionID, partitionID, segmentID, fieldID, logID),
	)
}
// BuildBm25LogPath returns the BM25 log path for the given IDs, laid out as
// rootPath/<common.SegmentBm25LogPath>/collectionID/partitionID/segmentID/fieldID/logID.
// The result is cleaned by path.Join.
func BuildBm25LogPath(rootPath string, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) string {
	return path.Join(
		rootPath,
		common.SegmentBm25LogPath,
		JoinIDPath(collectionID, partitionID, segmentID, fieldID, logID),
	)
}
// GetSegmentIDFromStatsLogPath returns the segment ID embedded in a stats log
// path, or 0 when it cannot be extracted.
func GetSegmentIDFromStatsLogPath(logPath string) typeutil.UniqueID {
	// Stats log paths end in .../segmentID/fieldID/logID, so the segment ID
	// is the third component counted from the end.
	const segmentPosFromEnd = 3
	return getSegmentIDFromPath(logPath, segmentPosFromEnd)
}
// BuildDeltaLogPath returns the delta log path for the given IDs, laid out as
// rootPath/<common.SegmentDeltaLogPath>/collectionID/partitionID/segmentID/logID.
// Unlike insert and stats logs, delta log paths carry no field ID component.
func BuildDeltaLogPath(rootPath string, collectionID, partitionID, segmentID, logID typeutil.UniqueID) string {
	return path.Join(
		rootPath,
		common.SegmentDeltaLogPath,
		JoinIDPath(collectionID, partitionID, segmentID, logID),
	)
}
// GetSegmentIDFromDeltaLogPath returns the segment ID embedded in a delta log
// path, or 0 when it cannot be extracted.
func GetSegmentIDFromDeltaLogPath(logPath string) typeutil.UniqueID {
	// Delta log paths end in .../segmentID/logID, so the segment ID is the
	// second component counted from the end.
	const segmentPosFromEnd = 2
	return getSegmentIDFromPath(logPath, segmentPosFromEnd)
}
// getSegmentIDFromPath splits logPath on "/" and parses the component that is
// segmentIndex positions from the end (1 = last component) as a base-10 int64.
// It returns 0 when the path has fewer than segmentIndex components or the
// selected component is not a valid integer.
func getSegmentIDFromPath(logPath string, segmentIndex int) typeutil.UniqueID {
	parts := strings.Split(logPath, pathSep)
	if segmentIndex > len(parts) {
		return 0
	}

	if id, err := strconv.ParseInt(parts[len(parts)-segmentIndex], 10, 64); err == nil {
		return id
	}
	return 0
}
// JoinIDPath formats each ID in base 10 and joins them, in order, into a
// slash-separated path such as "1/2/3". With no IDs it returns "".
func JoinIDPath(ids ...typeutil.UniqueID) string {
	parts := make([]string, len(ids))
	for i, id := range ids {
		parts[i] = strconv.FormatInt(id, 10)
	}
	return path.Join(parts...)
}
// ExtractTextLogFilenames reduces each full path to the part after its last
// "/" so that only the filename has to be stored.
//
// A path without any separator is returned unchanged, and a path ending in a
// separator yields an empty name. The result always has one entry per input,
// in the same order.
func ExtractTextLogFilenames(files []string) []string {
	names := make([]string, len(files))
	for i, p := range files {
		// LastIndex reports -1 when no separator exists; adding 1 then gives 0,
		// which keeps the whole string.
		names[i] = p[strings.LastIndex(p, pathSep)+1:]
	}
	return names
}
// BuildTextLogPaths turns text index filenames back into full paths.
//
// Every filename is joined onto the prefix
// rootPath/<common.TextIndexPath>/buildID/version/collectionID/partitionID/segmentID/fieldID.
// The result has one entry per filename, in the same order.
func BuildTextLogPaths(rootPath string, buildID, version, collectionID, partitionID, segmentID, fieldID typeutil.UniqueID, filenames []string) []string {
	// The IDs appear in the prefix in exactly this order.
	ids := [...]int64{buildID, version, collectionID, partitionID, segmentID, fieldID}

	elems := make([]string, 0, len(ids)+2)
	elems = append(elems, rootPath, common.TextIndexPath)
	for _, id := range ids {
		elems = append(elems, strconv.FormatInt(id, 10))
	}
	dir := path.Join(elems...)

	out := make([]string, len(filenames))
	for i, name := range filenames {
		out[i] = path.Join(dir, name)
	}
	return out
}