diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 18540b9c99..b97a9882b0 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -743,7 +743,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context, signal < binlog.DecompressBinLogs(cloned.SegmentInfo) logs := getLogs(cloned) - for key := range getTextLogs(cloned) { + for key := range getTextLogs(cloned, gc) { logs[key] = struct{}{} } @@ -868,11 +868,23 @@ func getLogs(sinfo *SegmentInfo) map[string]struct{} { return logs } -func getTextLogs(sinfo *SegmentInfo) map[string]struct{} { +func getTextLogs(sinfo *SegmentInfo, gc *garbageCollector) map[string]struct{} { textLogs := make(map[string]struct{}) for _, flog := range sinfo.GetTextStatsLogs() { - for _, file := range flog.GetFiles() { - textLogs[file] = struct{}{} + // Reconstruct full paths from filenames + // Files stored in TextIndexStats only contain filenames to save space + fullPaths := metautil.BuildTextLogPaths( + gc.option.cli.RootPath(), + flog.GetBuildID(), + flog.GetVersion(), + sinfo.GetCollectionID(), + sinfo.GetPartitionID(), + sinfo.GetID(), + flog.GetFieldID(), + flog.GetFiles(), + ) + for _, fullPath := range fullPaths { + textLogs[fullPath] = struct{}{} } } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 736fc57fa4..9cd8b87f3b 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -275,6 +275,12 @@ func (m *meta) reloadFromKV(ctx context.Context, broker broker.Broker) error { for _, segments := range collectionSegments { numSegments += len(segments) for _, segment := range segments { + // Convert old text log paths (full paths) to filenames to save memory + // This handles backward compatibility during recovery + for _, textStatsLog := range segment.GetTextStatsLogs() { + textStatsLog.Files = metautil.ExtractTextLogFilenames(textStatsLog.GetFiles()) + } + // segments from catalog.ListSegments will not have logPath m.segments.SetSegment(segment.ID, NewSegmentInfo(segment)) metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc() diff --git a/internal/datanode/compactor/sort_compaction.go b/internal/datanode/compactor/sort_compaction.go index 69774a58a6..3557b61fab 100644 --- a/internal/datanode/compactor/sort_compaction.go +++ b/internal/datanode/compactor/sort_compaction.go @@ -472,13 +472,15 @@ func (t *sortCompactionTask) createTextIndex(ctx context.Context, if err != nil { return err } + // Extract only filenames from full paths to save space + filenames := metautil.ExtractTextLogFilenames(lo.Keys(uploaded)) mu.Lock() textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{ FieldID: field.GetFieldID(), Version: 0, BuildID: taskID, - Files: lo.Keys(uploaded), + Files: filenames, } mu.Unlock() diff --git a/internal/datanode/index/task_stats.go b/internal/datanode/index/task_stats.go index 78a24f84c7..a0e0239283 100644 --- a/internal/datanode/index/task_stats.go +++ b/internal/datanode/index/task_stats.go @@ -503,13 +503,15 @@ func (st *statsTask) createTextIndex(ctx context.Context, if err != nil { return err } + // Extract only filenames from full paths to save space + filenames := metautil.ExtractTextLogFilenames(lo.Keys(uploaded)) mu.Lock() textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{ FieldID: field.GetFieldID(), Version: version, BuildID: taskID, - Files: lo.Keys(uploaded), + Files: filenames, } mu.Unlock() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 8e4562576e..ed1186786a 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments/state" @@ -1189,11 +1190,25 @@ func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextI // Text match index mmap config is based on the raw data mmap. enableMmap := isDataMmapEnable(f) + + // Reconstruct full paths from filenames + // Files stored in TextIndexStats only contain filenames to save space + fullPaths := metautil.BuildTextLogPaths( + binlog.GetRootPath(), + textLogs.GetBuildID(), + textLogs.GetVersion(), + s.Collection(), + s.Partition(), + s.ID(), + textLogs.GetFieldID(), + textLogs.GetFiles(), + ) + cgoProto := &indexcgopb.LoadTextIndexInfo{ FieldID: textLogs.GetFieldID(), Version: textLogs.GetVersion(), BuildID: textLogs.GetBuildID(), - Files: textLogs.GetFiles(), + Files: fullPaths, Schema: f, CollectionID: s.Collection(), PartitionID: s.Partition(), diff --git a/pkg/util/metautil/binlog.go b/pkg/util/metautil/binlog.go index ab9052e632..c6ae1af190 100644 --- a/pkg/util/metautil/binlog.go +++ b/pkg/util/metautil/binlog.go @@ -92,3 +92,39 @@ func JoinIDPath(ids ...typeutil.UniqueID) string { } return path.Join(idStr...) } + +// ExtractTextLogFilenames extracts only filenames from full paths to save space. +// It takes a slice of full paths and returns a slice of filenames. +func ExtractTextLogFilenames(files []string) []string { + filenames := make([]string, 0, len(files)) + for _, fullPath := range files { + idx := strings.LastIndex(fullPath, pathSep) + if idx < 0 { + filenames = append(filenames, fullPath) + } else { + filenames = append(filenames, fullPath[idx+1:]) + } + } + return filenames +} + +// BuildTextLogPaths reconstructs full paths from filenames for text index logs. +// Files stored in TextIndexStats only contain filenames to save space. +func BuildTextLogPaths(rootPath string, buildID, version, collectionID, partitionID, segmentID, fieldID typeutil.UniqueID, filenames []string) []string { + prefix := path.Join( + rootPath, + common.TextIndexPath, + strconv.FormatInt(buildID, 10), + strconv.FormatInt(version, 10), + strconv.FormatInt(collectionID, 10), + strconv.FormatInt(partitionID, 10), + strconv.FormatInt(segmentID, 10), + strconv.FormatInt(fieldID, 10), + ) + + fullPaths := make([]string, 0, len(filenames)) + for _, filename := range filenames { + fullPaths = append(fullPaths, path.Join(prefix, filename)) + } + return fullPaths +} diff --git a/pkg/util/metautil/binlog_test.go b/pkg/util/metautil/binlog_test.go index 404677875d..97cab3cc8f 100644 --- a/pkg/util/metautil/binlog_test.go +++ b/pkg/util/metautil/binlog_test.go @@ -18,6 +18,7 @@ package metautil import ( "reflect" + "sort" "testing" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -138,3 +139,113 @@ func TestParseInsertLogPath(t *testing.T) { }) } } + +func TestExtractTextLogFilenames(t *testing.T) { + tests := []struct { + name string + files []string + want []string + }{ + { + name: "test extract filenames from full paths", + files: []string{ + "files/text_log/123/0/456/789/101112/131415/test_file.pos_0", + "files/text_log/123/0/456/789/101112/131415/test_file.pos_1", + "files/text_log/123/0/456/789/101112/131416/another_file.pos_0", + }, + want: []string{ + "test_file.pos_0", + "test_file.pos_1", + "another_file.pos_0", + }, + }, + { + name: "test extract filename without path", + files: []string{ + "filename.txt", + }, + want: []string{ + "filename.txt", + }, + }, + { + name: "test empty slice", + files: []string{}, + want: []string{}, + }, + { + name: "test single file", + files: []string{ + "root/path/to/file.log", + }, + want: []string{ + "file.log", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ExtractTextLogFilenames(tt.files) + // Sort both slices for comparison + sort.Strings(got) + sort.Strings(tt.want) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ExtractTextLogFilenames() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBuildTextLogPaths(t *testing.T) { + tests := []struct { + name string + rootPath string + buildID typeutil.UniqueID + version typeutil.UniqueID + collectionID typeutil.UniqueID + partitionID typeutil.UniqueID + segmentID typeutil.UniqueID + fieldID typeutil.UniqueID + filenames []string + want []string + }{ + { + name: "test build text log paths with multiple files", + rootPath: "files", + buildID: 123, + version: 0, + collectionID: 456, + partitionID: 789, + segmentID: 101112, + fieldID: 131415, + filenames: []string{"test_file.pos_0", "test_file.pos_1", "another_file.pos_0"}, + want: []string{ + "files/text_log/123/0/456/789/101112/131415/test_file.pos_0", + "files/text_log/123/0/456/789/101112/131415/test_file.pos_1", + "files/text_log/123/0/456/789/101112/131415/another_file.pos_0", + }, + }, + { + name: "test build text log paths with empty filenames", + rootPath: "files", + buildID: 123, + version: 0, + collectionID: 456, + partitionID: 789, + segmentID: 101112, + fieldID: 131415, + filenames: []string{}, + want: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := BuildTextLogPaths(tt.rootPath, tt.buildID, tt.version, tt.collectionID, tt.partitionID, tt.segmentID, tt.fieldID, tt.filenames) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("BuildTextLogPaths() = %v, want %v", got, tt.want) + } + }) + } +}