diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index b97a9882b0..18540b9c99 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -743,7 +743,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context, signal < binlog.DecompressBinLogs(cloned.SegmentInfo) logs := getLogs(cloned) - for key := range getTextLogs(cloned, gc) { + for key := range getTextLogs(cloned) { logs[key] = struct{}{} } @@ -868,23 +868,11 @@ func getLogs(sinfo *SegmentInfo) map[string]struct{} { return logs } -func getTextLogs(sinfo *SegmentInfo, gc *garbageCollector) map[string]struct{} { +func getTextLogs(sinfo *SegmentInfo) map[string]struct{} { textLogs := make(map[string]struct{}) for _, flog := range sinfo.GetTextStatsLogs() { - // Reconstruct full paths from filenames - // Files stored in TextIndexStats only contain filenames to save space - fullPaths := metautil.BuildTextLogPaths( - gc.option.cli.RootPath(), - flog.GetBuildID(), - flog.GetVersion(), - sinfo.GetCollectionID(), - sinfo.GetPartitionID(), - sinfo.GetID(), - flog.GetFieldID(), - flog.GetFiles(), - ) - for _, fullPath := range fullPaths { - textLogs[fullPath] = struct{}{} + for _, file := range flog.GetFiles() { + textLogs[file] = struct{}{} } } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index d86c5e55ed..3b94cd5fe5 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -275,12 +275,6 @@ func (m *meta) reloadFromKV(ctx context.Context, broker broker.Broker) error { for _, segments := range collectionSegments { numSegments += len(segments) for _, segment := range segments { - // Convert old text log paths (full paths) to filenames to save memory - // This handles backward compatibility during recovery - for _, textStatsLog := range segment.GetTextStatsLogs() { - textStatsLog.Files = metautil.ExtractTextLogFilenames(textStatsLog.GetFiles()) - } - // segments from catalog.ListSegments will not have logPath m.segments.SetSegment(segment.ID, NewSegmentInfo(segment)) metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc() diff --git a/internal/datanode/compactor/sort_compaction.go b/internal/datanode/compactor/sort_compaction.go index 30a3e8bfcd..3d5e9c97d9 100644 --- a/internal/datanode/compactor/sort_compaction.go +++ b/internal/datanode/compactor/sort_compaction.go @@ -491,15 +491,13 @@ func (t *sortCompactionTask) createTextIndex(ctx context.Context, if err != nil { return err } - // Extract only filenames from full paths to save space - filenames := metautil.ExtractTextLogFilenames(lo.Keys(uploaded)) mu.Lock() textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{ FieldID: field.GetFieldID(), Version: 0, BuildID: taskID, - Files: filenames, + Files: lo.Keys(uploaded), } mu.Unlock() diff --git a/internal/datanode/index/task_stats.go b/internal/datanode/index/task_stats.go index f232a1a5a0..9cae7de3b1 100644 --- a/internal/datanode/index/task_stats.go +++ b/internal/datanode/index/task_stats.go @@ -503,15 +503,13 @@ func (st *statsTask) createTextIndex(ctx context.Context, if err != nil { return err } - // Extract only filenames from full paths to save space - filenames := metautil.ExtractTextLogFilenames(lo.Keys(uploaded)) mu.Lock() textIndexLogs[field.GetFieldID()] = &datapb.TextIndexStats{ FieldID: field.GetFieldID(), Version: version, BuildID: taskID, - Files: filenames, + Files: lo.Keys(uploaded), } mu.Unlock() diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 8fa89e7ec5..ecd3089058 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -45,6 +45,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util" "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/metautil" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -133,6 +134,16 @@ func (kc *Catalog) listSegments(ctx context.Context, collectionID int64) ([]*dat return err } + // Restore full paths for text index logs (compatible with old version) + // segments from etcd may have filenames only in TextStatsLogs + metautil.BuildTextLogPaths( + kc.ChunkManagerRootPath, + segmentInfo.GetCollectionID(), + segmentInfo.GetPartitionID(), + segmentInfo.GetID(), + segmentInfo.GetTextStatsLogs(), + ) + segments = append(segments, segmentInfo) return nil } @@ -382,11 +393,11 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d noBinlogsSegment, _, _, _, _ := CloneSegmentWithExcludeBinlogs(s) // `s` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `s`. segmentutil.ReCalcRowCount(s, noBinlogsSegment) - segBytes, err := proto.Marshal(noBinlogsSegment) + segBytes, err := marshalSegmentInfo(noBinlogsSegment) if err != nil { return fmt.Errorf("failed to marshal segment: %d, err: %w", s.GetID(), err) } - kvs[key] = string(segBytes) + kvs[key] = segBytes } saveFn := func(partialKvs map[string]string) error { diff --git a/internal/metastore/kv/datacoord/util.go b/internal/metastore/kv/datacoord/util.go index e155f980de..42c1ed6601 100644 --- a/internal/metastore/kv/datacoord/util.go +++ b/internal/metastore/kv/datacoord/util.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util" + "github.com/milvus-io/milvus/pkg/v2/util/metautil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -225,6 +226,9 @@ func CloneSegmentWithExcludeBinlogs(segment *datapb.SegmentInfo) (*datapb.Segmen } func marshalSegmentInfo(segment *datapb.SegmentInfo) (string, error) { + // Compress TextStatsLogs paths to filenames before marshaling to save etcd space + metautil.ExtractTextLogFilenames(segment.GetTextStatsLogs()) + segBytes, err := proto.Marshal(segment) if err != nil { return "", fmt.Errorf("failed to marshal segment: %d, err: %w", segment.ID, err) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 93d03036cb..f843a52df5 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -43,7 +43,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments/state" @@ -1190,25 +1189,11 @@ func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextI // Text match index mmap config is based on the raw data mmap. enableMmap := isDataMmapEnable(f) - - // Reconstruct full paths from filenames - // Files stored in TextIndexStats only contain filenames to save space - fullPaths := metautil.BuildTextLogPaths( - binlog.GetRootPath(), - textLogs.GetBuildID(), - textLogs.GetVersion(), - s.Collection(), - s.Partition(), - s.ID(), - textLogs.GetFieldID(), - textLogs.GetFiles(), - ) - cgoProto := &indexcgopb.LoadTextIndexInfo{ FieldID: textLogs.GetFieldID(), Version: textLogs.GetVersion(), BuildID: textLogs.GetBuildID(), - Files: fullPaths, + Files: textLogs.GetFiles(), Schema: f, CollectionID: s.Collection(), PartitionID: s.Partition(), diff --git a/pkg/util/metautil/binlog.go b/pkg/util/metautil/binlog.go index c6ae1af190..c671f5ad07 100644 --- a/pkg/util/metautil/binlog.go +++ b/pkg/util/metautil/binlog.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -94,37 +95,55 @@ func JoinIDPath(ids ...typeutil.UniqueID) string { } // ExtractTextLogFilenames extracts only filenames from full paths to save space. -// It takes a slice of full paths and returns a slice of filenames. -func ExtractTextLogFilenames(files []string) []string { - filenames := make([]string, 0, len(files)) - for _, fullPath := range files { - idx := strings.LastIndex(fullPath, pathSep) - if idx < 0 { - filenames = append(filenames, fullPath) - } else { - filenames = append(filenames, fullPath[idx+1:]) +// It modifies the TextStatsLogs map in place, compressing full paths to filenames. +func ExtractTextLogFilenames(textStatsLogs map[int64]*datapb.TextIndexStats) { + for _, textStats := range textStatsLogs { + if textStats == nil { + continue } + filenames := make([]string, 0, len(textStats.GetFiles())) + for _, fullPath := range textStats.GetFiles() { + idx := strings.LastIndex(fullPath, pathSep) + if idx < 0 { + filenames = append(filenames, fullPath) + } else { + filenames = append(filenames, fullPath[idx+1:]) + } + } + textStats.Files = filenames } - return filenames } // BuildTextLogPaths reconstructs full paths from filenames for text index logs. -// Files stored in TextIndexStats only contain filenames to save space. -func BuildTextLogPaths(rootPath string, buildID, version, collectionID, partitionID, segmentID, fieldID typeutil.UniqueID, filenames []string) []string { - prefix := path.Join( - rootPath, - common.TextIndexPath, - strconv.FormatInt(buildID, 10), - strconv.FormatInt(version, 10), - strconv.FormatInt(collectionID, 10), - strconv.FormatInt(partitionID, 10), - strconv.FormatInt(segmentID, 10), - strconv.FormatInt(fieldID, 10), - ) +// This function is compatible with both old version (full paths) and new version (filenames only). +func BuildTextLogPaths(rootPath string, collectionID, partitionID, segmentID typeutil.UniqueID, textStatsLogs map[int64]*datapb.TextIndexStats) { + for _, textStats := range textStatsLogs { + if textStats == nil { + continue + } + prefix := path.Join( + rootPath, + common.TextIndexPath, + strconv.FormatInt(textStats.GetBuildID(), 10), + strconv.FormatInt(textStats.GetVersion(), 10), + strconv.FormatInt(collectionID, 10), + strconv.FormatInt(partitionID, 10), + strconv.FormatInt(segmentID, 10), + strconv.FormatInt(textStats.GetFieldID(), 10), + ) - fullPaths := make([]string, 0, len(filenames)) - for _, filename := range filenames { - fullPaths = append(fullPaths, path.Join(prefix, filename)) + filenames := textStats.GetFiles() + fullPaths := make([]string, 0, len(filenames)) + for _, filename := range filenames { + // Check if filename is already a full path (compatible with old version) + // If it contains the text_log path segment, treat it as a full path + if strings.Contains(filename, common.TextIndexPath+pathSep) { + fullPaths = append(fullPaths, filename) + } else { + // New version: filename only, need to join with prefix + fullPaths = append(fullPaths, path.Join(prefix, filename)) + } + } + textStats.Files = fullPaths } - return fullPaths } diff --git a/pkg/util/metautil/binlog_test.go b/pkg/util/metautil/binlog_test.go index 97cab3cc8f..fb0e93a669 100644 --- a/pkg/util/metautil/binlog_test.go +++ b/pkg/util/metautil/binlog_test.go @@ -17,10 +17,12 @@ package metautil import ( + "path" "reflect" - "sort" "testing" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -141,111 +143,55 @@ func TestParseInsertLogPath(t *testing.T) { } func TestExtractTextLogFilenames(t *testing.T) { - tests := []struct { - name string - files []string - want []string - }{ - { - name: "test extract filenames from full paths", - files: []string{ - "files/text_log/123/0/456/789/101112/131415/test_file.pos_0", - "files/text_log/123/0/456/789/101112/131415/test_file.pos_1", - "files/text_log/123/0/456/789/101112/131416/another_file.pos_0", - }, - want: []string{ - "test_file.pos_0", - "test_file.pos_1", - "another_file.pos_0", - }, - }, - { - name: "test extract filename without path", - files: []string{ - "filename.txt", - }, - want: []string{ - "filename.txt", - }, - }, - { - name: "test empty slice", - files: []string{}, - want: []string{}, - }, - { - name: "test single file", - files: []string{ - "root/path/to/file.log", - }, - want: []string{ - "file.log", + textStatsLogs := map[int64]*datapb.TextIndexStats{ + 100: { + FieldID: 100, + Files: []string{ + "/root/text_log/1/2/10/20/30/100/file1.txt", + "/root/text_log/1/2/10/20/30/100/file2.txt", }, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := ExtractTextLogFilenames(tt.files) - // Sort both slices for comparison - sort.Strings(got) - sort.Strings(tt.want) - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("ExtractTextLogFilenames() = %v, want %v", got, tt.want) - } - }) + ExtractTextLogFilenames(textStatsLogs) + + wantFiles := []string{"file1.txt", "file2.txt"} + if !reflect.DeepEqual(textStatsLogs[100].Files, wantFiles) { + t.Errorf("ExtractTextLogFilenames() Files = %v, want %v", textStatsLogs[100].Files, wantFiles) } } func TestBuildTextLogPaths(t *testing.T) { - tests := []struct { - name string - rootPath string - buildID typeutil.UniqueID - version typeutil.UniqueID - collectionID typeutil.UniqueID - partitionID typeutil.UniqueID - segmentID typeutil.UniqueID - fieldID typeutil.UniqueID - filenames []string - want []string - }{ - { - name: "test build text log paths with multiple files", - rootPath: "files", - buildID: 123, - version: 0, - collectionID: 456, - partitionID: 789, - segmentID: 101112, - fieldID: 131415, - filenames: []string{"test_file.pos_0", "test_file.pos_1", "another_file.pos_0"}, - want: []string{ - "files/text_log/123/0/456/789/101112/131415/test_file.pos_0", - "files/text_log/123/0/456/789/101112/131415/test_file.pos_1", - "files/text_log/123/0/456/789/101112/131415/another_file.pos_0", - }, - }, - { - name: "test build text log paths with empty filenames", - rootPath: "files", - buildID: 123, - version: 0, - collectionID: 456, - partitionID: 789, - segmentID: 101112, - fieldID: 131415, - filenames: []string{}, - want: []string{}, + rootPath := "/root" + collectionID := typeutil.UniqueID(10) + partitionID := typeutil.UniqueID(20) + segmentID := typeutil.UniqueID(30) + + // Test building paths from filenames (new version) + textStatsLogs := map[int64]*datapb.TextIndexStats{ + 100: { + FieldID: 100, + BuildID: 1, + Version: 2, + Files: []string{"file1.txt", "file2.txt"}, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := BuildTextLogPaths(tt.rootPath, tt.buildID, tt.version, tt.collectionID, tt.partitionID, tt.segmentID, tt.fieldID, tt.filenames) - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("BuildTextLogPaths() = %v, want %v", got, tt.want) - } - }) + BuildTextLogPaths(rootPath, collectionID, partitionID, segmentID, textStatsLogs) + + wantFiles := []string{ + path.Join(rootPath, common.TextIndexPath, "1", "2", "10", "20", "30", "100", "file1.txt"), + path.Join(rootPath, common.TextIndexPath, "1", "2", "10", "20", "30", "100", "file2.txt"), + } + if !reflect.DeepEqual(textStatsLogs[100].Files, wantFiles) { + t.Errorf("BuildTextLogPaths() Files = %v, want %v", textStatsLogs[100].Files, wantFiles) + } + + // Test old version compatibility (already full paths) + fullPath := path.Join(rootPath, common.TextIndexPath, "1", "2", "10", "20", "30", "100", "file3.txt") + textStatsLogs[100].Files = []string{fullPath} + BuildTextLogPaths(rootPath, collectionID, partitionID, segmentID, textStatsLogs) + if textStatsLogs[100].Files[0] != fullPath { + t.Errorf("BuildTextLogPaths() should keep full path unchanged, got %v", textStatsLogs[100].Files[0]) } }