fix: etcd RPC size limit exceeded when dropping collection (#46414)
issue: https://github.com/milvus-io/milvus/issues/46410

Release notes (auto-generated by coderabbit.ai):

- Core invariant: etcd metadata and in-memory Segment/TextIndex records store only compact filenames for text-index files; full object keys are deterministically reconstructed at use sites from a stable root + common.TextIndexPath + IDs via metautil.BuildTextLogPaths.
- Bug & fix (issue #46410): the etcd RPC size overflow was caused by persisting full upload keys in segment TextIndex metadata. Fix: at upload/creation sites (internal/datanode/compactor/sort_compaction.go and internal/datanode/index/task_stats.go) store only filenames using metautil.ExtractTextLogFilenames; at consumption sites (internal/datacoord/garbage_collector.go, internal/querynodev2/segments/segment.go, and other GC/loader code) reconstruct full paths with metautil.BuildTextLogPaths before accessing object storage.
- Simplified/removed logic: full object keys are no longer carried through metadata and in-memory structures; callers persist compact filenames and reconstruct paths on demand. This eliminates large payloads in etcd and reduces memory pressure while preserving the same runtime control flow and error handling.
- No data loss / no regression: filename extraction is a deterministic suffix operation (metautil.ExtractTextLogFilenames), and reloadFromKV handles backward compatibility (internal/datacoord/meta.go converts existing full-path entries to filenames before caching). All read paths reconstruct full paths at runtime (garbage_collector.getTextLogs, LocalSegment.LoadTextIndex, GC/loader), so no files are modified or deleted and access semantics remain identical.

Signed-off-by: sijie-ni-0214 <sijie.ni@zilliz.com>
This commit is contained in: parent 6c8e11da4f, commit 0a54c93227
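At a glance, the change stops persisting full object-storage keys in datapb.TextIndexStats and stores bare filenames instead, rebuilding the keys wherever they are consumed. A minimal sketch of the round trip using the new metautil helpers introduced by this commit (the IDs and filenames are illustrative, taken from the test cases further below):

    // Producer side (sort_compaction.go / task_stats.go): persist filenames only.
    uploaded := []string{"files/text_log/123/0/456/789/101112/131415/test_file.pos_0"}
    stats := &datapb.TextIndexStats{
        FieldID: 131415,
        BuildID: 123,
        Version: 0,
        Files:   metautil.ExtractTextLogFilenames(uploaded), // ["test_file.pos_0"]
    }

    // Consumer side (garbage_collector.go / segment.go): rebuild full keys on demand.
    fullPaths := metautil.BuildTextLogPaths("files", stats.GetBuildID(), stats.GetVersion(),
        456 /* collectionID */, 789 /* partitionID */, 101112 /* segmentID */,
        stats.GetFieldID(), stats.GetFiles())
    // fullPaths[0] == "files/text_log/123/0/456/789/101112/131415/test_file.pos_0"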
@@ -743,7 +743,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context, signal <
     binlog.DecompressBinLogs(cloned.SegmentInfo)

     logs := getLogs(cloned)
-    for key := range getTextLogs(cloned) {
+    for key := range getTextLogs(cloned, gc) {
         logs[key] = struct{}{}
     }
@@ -868,11 +868,23 @@ func getLogs(sinfo *SegmentInfo) map[string]struct{} {
     return logs
 }

-func getTextLogs(sinfo *SegmentInfo) map[string]struct{} {
+func getTextLogs(sinfo *SegmentInfo, gc *garbageCollector) map[string]struct{} {
     textLogs := make(map[string]struct{})
     for _, flog := range sinfo.GetTextStatsLogs() {
-        for _, file := range flog.GetFiles() {
-            textLogs[file] = struct{}{}
+        // Reconstruct full paths from filenames
+        // Files stored in TextIndexStats only contain filenames to save space
+        fullPaths := metautil.BuildTextLogPaths(
+            gc.option.cli.RootPath(),
+            flog.GetBuildID(),
+            flog.GetVersion(),
+            sinfo.GetCollectionID(),
+            sinfo.GetPartitionID(),
+            sinfo.GetID(),
+            flog.GetFieldID(),
+            flog.GetFiles(),
+        )
+        for _, fullPath := range fullPaths {
+            textLogs[fullPath] = struct{}{}
         }
     }
@@ -275,6 +275,12 @@ func (m *meta) reloadFromKV(ctx context.Context, broker broker.Broker) error {
     for _, segments := range collectionSegments {
         numSegments += len(segments)
         for _, segment := range segments {
+            // Convert old text log paths (full paths) to filenames to save memory
+            // This handles backward compatibility during recovery
+            for _, textStatsLog := range segment.GetTextStatsLogs() {
+                textStatsLog.Files = metautil.ExtractTextLogFilenames(textStatsLog.GetFiles())
+            }
+
             // segments from catalog.ListSegments will not have logPath
             m.segments.SetSegment(segment.ID, NewSegmentInfo(segment))
             metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc()
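The conversion above is safe to repeat: entries that are already bare filenames contain no path separator, so ExtractTextLogFilenames returns them unchanged and a second recovery pass is a no-op. A short illustration (values taken from the test cases further below):

    paths := []string{"files/text_log/123/0/456/789/101112/131415/test_file.pos_0"}
    once := metautil.ExtractTextLogFilenames(paths)  // ["test_file.pos_0"]
    twice := metautil.ExtractTextLogFilenames(once)  // still ["test_file.pos_0"]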
@@ -472,13 +472,15 @@ func (t *sortCompactionTask) createTextIndex(ctx context.Context,
     if err != nil {
         return err
     }
+    // Extract only filenames from full paths to save space
+    filenames := metautil.ExtractTextLogFilenames(lo.Keys(uploaded))

     mu.Lock()
     textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{
         FieldID: field.GetFieldID(),
         Version: 0,
         BuildID: taskID,
-        Files:   lo.Keys(uploaded),
+        Files:   filenames,
     }
     mu.Unlock()
@@ -503,13 +503,15 @@ func (st *statsTask) createTextIndex(ctx context.Context,
     if err != nil {
         return err
     }
+    // Extract only filenames from full paths to save space
+    filenames := metautil.ExtractTextLogFilenames(lo.Keys(uploaded))

     mu.Lock()
     textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{
         FieldID: field.GetFieldID(),
         Version: version,
         BuildID: taskID,
-        Files:   lo.Keys(uploaded),
+        Files:   filenames,
     }
     mu.Unlock()
@@ -43,6 +43,7 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+    "github.com/milvus-io/milvus/internal/metastore/kv/binlog"
     "github.com/milvus-io/milvus/internal/querycoordv2/params"
     "github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
     "github.com/milvus-io/milvus/internal/querynodev2/segments/state"
@@ -1189,11 +1190,25 @@ func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextI

     // Text match index mmap config is based on the raw data mmap.
     enableMmap := isDataMmapEnable(f)

+    // Reconstruct full paths from filenames
+    // Files stored in TextIndexStats only contain filenames to save space
+    fullPaths := metautil.BuildTextLogPaths(
+        binlog.GetRootPath(),
+        textLogs.GetBuildID(),
+        textLogs.GetVersion(),
+        s.Collection(),
+        s.Partition(),
+        s.ID(),
+        textLogs.GetFieldID(),
+        textLogs.GetFiles(),
+    )
+
     cgoProto := &indexcgopb.LoadTextIndexInfo{
         FieldID:      textLogs.GetFieldID(),
         Version:      textLogs.GetVersion(),
         BuildID:      textLogs.GetBuildID(),
-        Files:        textLogs.GetFiles(),
+        Files:        fullPaths,
         Schema:       f,
         CollectionID: s.Collection(),
         PartitionID:  s.Partition(),
@@ -92,3 +92,39 @@ func JoinIDPath(ids ...typeutil.UniqueID) string {
     }
     return path.Join(idStr...)
 }
+
+// ExtractTextLogFilenames extracts only filenames from full paths to save space.
+// It takes a slice of full paths and returns a slice of filenames.
+func ExtractTextLogFilenames(files []string) []string {
+    filenames := make([]string, 0, len(files))
+    for _, fullPath := range files {
+        idx := strings.LastIndex(fullPath, pathSep)
+        if idx < 0 {
+            filenames = append(filenames, fullPath)
+        } else {
+            filenames = append(filenames, fullPath[idx+1:])
+        }
+    }
+    return filenames
+}
+
+// BuildTextLogPaths reconstructs full paths from filenames for text index logs.
+// Files stored in TextIndexStats only contain filenames to save space.
+func BuildTextLogPaths(rootPath string, buildID, version, collectionID, partitionID, segmentID, fieldID typeutil.UniqueID, filenames []string) []string {
+    prefix := path.Join(
+        rootPath,
+        common.TextIndexPath,
+        strconv.FormatInt(buildID, 10),
+        strconv.FormatInt(version, 10),
+        strconv.FormatInt(collectionID, 10),
+        strconv.FormatInt(partitionID, 10),
+        strconv.FormatInt(segmentID, 10),
+        strconv.FormatInt(fieldID, 10),
+    )
+
+    fullPaths := make([]string, 0, len(filenames))
+    for _, filename := range filenames {
+        fullPaths = append(fullPaths, path.Join(prefix, filename))
+    }
+    return fullPaths
+}
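For reference, the key layout the two helpers agree on is the root path, then common.TextIndexPath, then the IDs in the order buildID/version/collectionID/partitionID/segmentID/fieldID, then the filename:

    {rootPath}/{common.TextIndexPath}/{buildID}/{version}/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{filename}
    // e.g. "files/text_log/123/0/456/789/101112/131415/test_file.pos_0" (values from the tests below)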
@@ -18,6 +18,7 @@ package metautil

 import (
     "reflect"
+    "sort"
     "testing"

     "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -138,3 +139,113 @@ func TestParseInsertLogPath(t *testing.T) {
         })
     }
 }
+
+func TestExtractTextLogFilenames(t *testing.T) {
+    tests := []struct {
+        name  string
+        files []string
+        want  []string
+    }{
+        {
+            name: "test extract filenames from full paths",
+            files: []string{
+                "files/text_log/123/0/456/789/101112/131415/test_file.pos_0",
+                "files/text_log/123/0/456/789/101112/131415/test_file.pos_1",
+                "files/text_log/123/0/456/789/101112/131416/another_file.pos_0",
+            },
+            want: []string{
+                "test_file.pos_0",
+                "test_file.pos_1",
+                "another_file.pos_0",
+            },
+        },
+        {
+            name: "test extract filename without path",
+            files: []string{
+                "filename.txt",
+            },
+            want: []string{
+                "filename.txt",
+            },
+        },
+        {
+            name:  "test empty slice",
+            files: []string{},
+            want:  []string{},
+        },
+        {
+            name: "test single file",
+            files: []string{
+                "root/path/to/file.log",
+            },
+            want: []string{
+                "file.log",
+            },
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            got := ExtractTextLogFilenames(tt.files)
+            // Sort both slices for comparison
+            sort.Strings(got)
+            sort.Strings(tt.want)
+            if !reflect.DeepEqual(got, tt.want) {
+                t.Errorf("ExtractTextLogFilenames() = %v, want %v", got, tt.want)
+            }
+        })
+    }
+}
+
+func TestBuildTextLogPaths(t *testing.T) {
+    tests := []struct {
+        name         string
+        rootPath     string
+        buildID      typeutil.UniqueID
+        version      typeutil.UniqueID
+        collectionID typeutil.UniqueID
+        partitionID  typeutil.UniqueID
+        segmentID    typeutil.UniqueID
+        fieldID      typeutil.UniqueID
+        filenames    []string
+        want         []string
+    }{
+        {
+            name:         "test build text log paths with multiple files",
+            rootPath:     "files",
+            buildID:      123,
+            version:      0,
+            collectionID: 456,
+            partitionID:  789,
+            segmentID:    101112,
+            fieldID:      131415,
+            filenames:    []string{"test_file.pos_0", "test_file.pos_1", "another_file.pos_0"},
+            want: []string{
+                "files/text_log/123/0/456/789/101112/131415/test_file.pos_0",
+                "files/text_log/123/0/456/789/101112/131415/test_file.pos_1",
+                "files/text_log/123/0/456/789/101112/131415/another_file.pos_0",
+            },
+        },
+        {
+            name:         "test build text log paths with empty filenames",
+            rootPath:     "files",
+            buildID:      123,
+            version:      0,
+            collectionID: 456,
+            partitionID:  789,
+            segmentID:    101112,
+            fieldID:      131415,
+            filenames:    []string{},
+            want:         []string{},
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            got := BuildTextLogPaths(tt.rootPath, tt.buildID, tt.version, tt.collectionID, tt.partitionID, tt.segmentID, tt.fieldID, tt.filenames)
+            if !reflect.DeepEqual(got, tt.want) {
+                t.Errorf("BuildTextLogPaths() = %v, want %v", got, tt.want)
+            }
+        })
+    }
+}