diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index dddc526e52..937974f000 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -21,7 +21,6 @@ import ( "fmt" "os" "os/signal" - "path/filepath" "runtime/debug" "strings" "sync" @@ -43,6 +42,7 @@ import ( kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/internal/util/initcore" internalmetrics "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/internal/util/pathutil" "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/v2/config" "github.com/milvus-io/milvus/pkg/v2/log" @@ -85,11 +85,6 @@ type component interface { Stop() error } -const ( - TmpInvertedIndexPrefix = "/tmp/milvus/inverted-index/" - TmpTextLogPrefix = "/tmp/milvus/text-log/" -) - func cleanLocalDir(path string) { _, statErr := os.Stat(path) // path exist, but stat error @@ -193,17 +188,12 @@ func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync. func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) // clear local storage - rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() - queryDataLocalPath := filepath.Join(rootPath, typeutil.QueryNodeRole) - cleanLocalDir(queryDataLocalPath) - // clear mmap dir - mmapDir := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() - if len(mmapDir) > 0 { - cleanLocalDir(mmapDir) + queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0) + if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() { + // under non-posix mode, we need to clean local storage when starting query node + // under posix mode, this clean task will be done by mixcoord + cleanLocalDir(queryDataLocalPath) } - cleanLocalDir(TmpInvertedIndexPrefix) - cleanLocalDir(TmpTextLogPrefix) - return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode) } diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 36d4c19435..84496fc322 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1016,6 +1016,7 @@ common: enabledJSONKeyStats: false # Indicates sealedsegment whether to enable JSON key stats enabledGrowingSegmentJSONKeyStats: false # Indicates growingsegment whether to enable JSON key stats enableConfigParamTypeCheck: true # Indicates whether to enable config param type check + enablePosixMode: false # Specifies whether to run in POSIX mode for enhanced file system compatibility # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/internal/coordinator/mix_coord.go b/internal/coordinator/mix_coord.go index 520e7ad329..dc39136dc7 100644 --- a/internal/coordinator/mix_coord.go +++ b/internal/coordinator/mix_coord.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "os" + "path/filepath" + "strconv" "sync" "time" @@ -24,6 +26,7 @@ import ( streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/pathutil" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/v2/common" @@ -78,6 +81,12 @@ type mixCoordImpl struct { metaKVCreator func() kv.MetaKv mixCoordClient types.MixCoordClient + + // POSIX directory cleanup task + posixCleanupCancel context.CancelFunc + posixCleanupWg sync.WaitGroup + posixCleanupStartOnce sync.Once + posixCleanupStopOnce sync.Once } func NewMixCoordServer(c context.Context, factory dependency.Factory) (*mixCoordImpl, error) { @@ -221,12 +230,108 @@ func (s *mixCoordImpl) initKVCreator() { func (s *mixCoordImpl) Start() error { s.UpdateStateCode(commonpb.StateCode_Healthy) + s.startPosixCleanupTask() + var startErr error return startErr } +func (s *mixCoordImpl) IsServerActive(serverID int64) bool { + return s.queryCoordServer.ServerExist(serverID) || s.datacoordServer.ServerExist(serverID) +} + +func (s *mixCoordImpl) checkExpiredPOSIXDIR() { + if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() { + return + } + log := log.Ctx(s.ctx) + rootCachePath := pathutil.GetPath(pathutil.RootCachePath, 0) + var entries []os.DirEntry + var err error + if entries, err = os.ReadDir(rootCachePath); err != nil { + log.Warn("failed to read root cache directory", zap.String("path", rootCachePath), zap.String("error", err.Error())) + return + } + var subdirs []string + var removedDirs []string + + for _, entry := range entries { + if entry.IsDir() { + subdirs = append(subdirs, entry.Name()) + if nodeID, err := strconv.ParseInt(entry.Name(), 10, 64); err != nil { + log.Warn("invalid node directory name", zap.String("dirName", entry.Name()), zap.String("error", err.Error())) + } else { + if !s.IsServerActive(nodeID) { + expiredDirPath := filepath.Join(rootCachePath, entry.Name()) + if err := os.RemoveAll(expiredDirPath); err != nil { + log.Error("failed to remove expired node directory", + zap.String("path", expiredDirPath), + zap.Int64("nodeID", nodeID), + zap.String("error", err.Error())) + } else { + log.Info("removed expired node directory", + zap.String("path", expiredDirPath), + zap.Int64("nodeID", nodeID)) + removedDirs = append(removedDirs, entry.Name()) + } + } + } + } + } + if len(removedDirs) > 0 { + log.Info("root cache directory cleanup completed", + zap.String("path", rootCachePath), + zap.Strings("allSubdirectories", subdirs), + zap.Strings("removedDirectories", removedDirs), + zap.Int("totalDirs", len(subdirs)), + zap.Int("removedDirs", len(removedDirs))) + } +} + +func (s *mixCoordImpl) startPosixCleanupTask() { + s.posixCleanupStartOnce.Do(func() { + ctx, cancel := context.WithCancel(s.ctx) + s.posixCleanupCancel = cancel + + s.posixCleanupWg.Add(1) + go s.posixCleanupLoop(ctx) + }) +} + +func (s *mixCoordImpl) stopPosixCleanupTask() { + s.posixCleanupStopOnce.Do(func() { + if s.posixCleanupCancel != nil { + s.posixCleanupCancel() + } + s.posixCleanupWg.Wait() + }) +} + +func (s *mixCoordImpl) posixCleanupLoop(ctx context.Context) { + defer s.posixCleanupWg.Done() + + log := log.Ctx(ctx) + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + log.Info("POSIX directory cleanup task started") + + for { + select { + case <-ctx.Done(): + log.Info("POSIX directory cleanup task stopped") + return + case <-ticker.C: + s.checkExpiredPOSIXDIR() + } + } +} + func (s *mixCoordImpl) Stop() error { log.Info("graceful stop") + + s.stopPosixCleanupTask() + s.GracefulStop() log.Info("graceful stop done") diff --git a/internal/coordinator/mix_coord_test.go b/internal/coordinator/mix_coord_test.go index 5cb5f167dc..bfc3722074 100644 --- a/internal/coordinator/mix_coord_test.go +++ b/internal/coordinator/mix_coord_test.go @@ -19,6 +19,8 @@ import ( "context" "fmt" "math/rand" + "os" + "path/filepath" "testing" "time" @@ -29,8 +31,10 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/datacoord" + "github.com/milvus-io/milvus/internal/querycoordv2" "github.com/milvus-io/milvus/internal/util/dependency" kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" + "github.com/milvus-io/milvus/internal/util/pathutil" "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/etcd" @@ -189,3 +193,255 @@ func TestMixCoord_FlushAll(t *testing.T) { }) }) } + +func TestMixCoord_checkExpiredPOSIXDIR(t *testing.T) { + t.Run("POSIX mode disabled", func(t *testing.T) { + paramtable.Init() + paramtable.Get().Save(Params.CommonCfg.EnablePosixMode.Key, "false") + + // Create temporary directory for testing + tempDir := t.TempDir() + rootCachePath := filepath.Join(tempDir, "cache") + err := os.MkdirAll(rootCachePath, 0o755) + assert.NoError(t, err) + + // Create some directories + nodeIDs := []int64{1001, 1002, 2001} + for _, nodeID := range nodeIDs { + nodeDir := filepath.Join(rootCachePath, fmt.Sprintf("%d", nodeID)) + err := os.MkdirAll(nodeDir, 0o755) + assert.NoError(t, err) + } + + coord := &mixCoordImpl{ + ctx: context.Background(), + } + + // Should not remove any directories when POSIX mode is disabled + coord.checkExpiredPOSIXDIR() + + // Verify all directories still exist + for _, nodeID := range nodeIDs { + nodeDir := filepath.Join(rootCachePath, fmt.Sprintf("%d", nodeID)) + assert.DirExists(t, nodeDir) + } + }) + + t.Run("POSIX mode enabled - no expired directories", func(t *testing.T) { + paramtable.Init() + paramtable.Get().Save(Params.CommonCfg.EnablePosixMode.Key, "true") + + // Create temporary directory for testing + tempDir := t.TempDir() + rootCachePath := filepath.Join(tempDir, "cache") + err := os.MkdirAll(rootCachePath, 0o755) + assert.NoError(t, err) + + // Create some valid node directories + activeNodeIDs := []int64{1001, 1002, 2001} + for _, nodeID := range activeNodeIDs { + nodeDir := filepath.Join(rootCachePath, fmt.Sprintf("%d", nodeID)) + err := os.MkdirAll(nodeDir, 0o755) + assert.NoError(t, err) + } + + // Mock queryCoord and dataCoord servers + mockQueryCoord := &querycoordv2.Server{} + mockDataCoord := &datacoord.Server{} + + coord := &mixCoordImpl{ + ctx: context.Background(), + queryCoordServer: mockQueryCoord, + datacoordServer: mockDataCoord, + } + + // Mock ServerExist methods + mockey.PatchConvey("test POSIX cleanup with no expired dirs", t, func() { + // Mock ServerExist to return true for QueryCoord, false for DataCoord + mockey.Mock((*querycoordv2.Server).ServerExist).Return(true).Build() + mockey.Mock((*datacoord.Server).ServerExist).Return(false).Build() + mockey.Mock(pathutil.GetPath).Return(rootCachePath).Build() + + // Should not remove any directories + coord.checkExpiredPOSIXDIR() + + // Verify all directories still exist + for _, nodeID := range activeNodeIDs { + nodeDir := filepath.Join(rootCachePath, fmt.Sprintf("%d", nodeID)) + assert.DirExists(t, nodeDir) + } + }) + }) + + t.Run("POSIX mode enabled - with expired directories", func(t *testing.T) { + paramtable.Init() + paramtable.Get().Save(Params.CommonCfg.EnablePosixMode.Key, "true") + + // Create temporary directory for testing + tempDir := t.TempDir() + rootCachePath := filepath.Join(tempDir, "cache") + err := os.MkdirAll(rootCachePath, 0o755) + assert.NoError(t, err) + + // Create node directories (some active, some expired) + activeNodeIDs := []int64{1001, 1002} + expiredNodeIDs := []int64{1003, 2002} + allNodeIDs := append(activeNodeIDs, expiredNodeIDs...) + + for _, nodeID := range allNodeIDs { + nodeDir := filepath.Join(rootCachePath, fmt.Sprintf("%d", nodeID)) + err := os.MkdirAll(nodeDir, 0o755) + assert.NoError(t, err) + } + + // Mock queryCoord and dataCoord servers + mockQueryCoord := &querycoordv2.Server{} + mockDataCoord := &datacoord.Server{} + + coord := &mixCoordImpl{ + ctx: context.Background(), + queryCoordServer: mockQueryCoord, + datacoordServer: mockDataCoord, + } + + // Mock ServerExist methods - return false for all nodes (simulating expired state) + mockey.PatchConvey("test POSIX cleanup with expired dirs", t, func() { + mockey.Mock((*querycoordv2.Server).ServerExist).Return(false).Build() + mockey.Mock((*datacoord.Server).ServerExist).Return(false).Build() + mockey.Mock(pathutil.GetPath).Return(rootCachePath).Build() + + // Should remove all directories since all nodes are expired + coord.checkExpiredPOSIXDIR() + + // Verify all directories are removed + for _, nodeID := range allNodeIDs { + nodeDir := filepath.Join(rootCachePath, fmt.Sprintf("%d", nodeID)) + assert.NoDirExists(t, nodeDir) + } + }) + }) + + t.Run("POSIX mode enabled - all nodes active", func(t *testing.T) { + paramtable.Init() + paramtable.Get().Save(Params.CommonCfg.EnablePosixMode.Key, "true") + + // Create temporary directory for testing + tempDir := t.TempDir() + rootCachePath := filepath.Join(tempDir, "cache") + err := os.MkdirAll(rootCachePath, 0o755) + assert.NoError(t, err) + + // Create node directories + nodeIDs := []int64{1001, 1002, 2001, 2002} + + for _, nodeID := range nodeIDs { + nodeDir := filepath.Join(rootCachePath, fmt.Sprintf("%d", nodeID)) + err := os.MkdirAll(nodeDir, 0o755) + assert.NoError(t, err) + } + + // Mock queryCoord and dataCoord servers + mockQueryCoord := &querycoordv2.Server{} + mockDataCoord := &datacoord.Server{} + + coord := &mixCoordImpl{ + ctx: context.Background(), + queryCoordServer: mockQueryCoord, + datacoordServer: mockDataCoord, + } + + // Mock ServerExist methods + mockey.PatchConvey("test POSIX cleanup with all nodes active", t, func() { + // Mock ServerExist to return true for both QueryCoord and DataCoord (all nodes active) + mockey.Mock((*querycoordv2.Server).ServerExist).Return(true).Build() + mockey.Mock((*datacoord.Server).ServerExist).Return(true).Build() + mockey.Mock(pathutil.GetPath).Return(rootCachePath).Build() + + // Should not remove any directories + coord.checkExpiredPOSIXDIR() + + // Verify all directories still exist + for _, nodeID := range nodeIDs { + nodeDir := filepath.Join(rootCachePath, fmt.Sprintf("%d", nodeID)) + assert.DirExists(t, nodeDir) + } + }) + }) + + t.Run("POSIX mode enabled - invalid directory names", func(t *testing.T) { + paramtable.Init() + paramtable.Get().Save(Params.CommonCfg.EnablePosixMode.Key, "true") + + // Create temporary directory for testing + tempDir := t.TempDir() + rootCachePath := filepath.Join(tempDir, "cache") + err := os.MkdirAll(rootCachePath, 0o755) + assert.NoError(t, err) + + // Create directories with invalid names + invalidDirs := []string{"invalid_dir", "node_abc", "123abc"} + for _, dirName := range invalidDirs { + invalidDir := filepath.Join(rootCachePath, dirName) + err := os.MkdirAll(invalidDir, 0o755) + assert.NoError(t, err) + } + + // Create one valid directory + validDir := filepath.Join(rootCachePath, "1001") + err = os.MkdirAll(validDir, 0o755) + assert.NoError(t, err) + + // Mock queryCoord and dataCoord servers + mockQueryCoord := &querycoordv2.Server{} + mockDataCoord := &datacoord.Server{} + + coord := &mixCoordImpl{ + ctx: context.Background(), + queryCoordServer: mockQueryCoord, + datacoordServer: mockDataCoord, + } + + // Mock ServerExist methods + mockey.PatchConvey("test POSIX cleanup with invalid dir names", t, func() { + // Mock ServerExist for the valid node + mockey.Mock((*querycoordv2.Server).ServerExist).Return(true).Build() + mockey.Mock((*datacoord.Server).ServerExist).Return(false).Build() + mockey.Mock(pathutil.GetPath).Return(rootCachePath).Build() + + // Should handle invalid directory names gracefully + coord.checkExpiredPOSIXDIR() + + // Verify valid directory still exists + assert.DirExists(t, validDir) + + // Verify invalid directories are not removed (they should be ignored) + for _, dirName := range invalidDirs { + invalidDir := filepath.Join(rootCachePath, dirName) + assert.DirExists(t, invalidDir) + } + }) + }) + + t.Run("POSIX mode enabled - read directory error", func(t *testing.T) { + paramtable.Init() + paramtable.Get().Save(Params.CommonCfg.EnablePosixMode.Key, "true") + + // Mock queryCoord and dataCoord servers + mockQueryCoord := &querycoordv2.Server{} + mockDataCoord := &datacoord.Server{} + + coord := &mixCoordImpl{ + ctx: context.Background(), + queryCoordServer: mockQueryCoord, + datacoordServer: mockDataCoord, + } + + // Mock ServerExist methods + mockey.PatchConvey("test POSIX cleanup with read dir error", t, func() { + mockey.Mock(pathutil.GetPath).Return("/non/existent/path").Build() + + // Should handle read directory error gracefully + coord.checkExpiredPOSIXDIR() + }) + }) +} diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h index 6e4dc1f76a..5bac1c15eb 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadInfo.h @@ -36,9 +36,6 @@ struct FieldBinlogInfo { struct LoadFieldDataInfo { std::map field_infos; - // Set empty to disable mmap, - // mmap file path will be {mmap_dir_path}/{segment_id}/{field_id} - std::string mmap_dir_path = ""; int64_t storage_version = 0; milvus::proto::common::LoadPriority load_priority = milvus::proto::common::LoadPriority::HIGH; diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index b1c020e138..d765ee12f0 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -68,6 +68,7 @@ #include "milvus-storage/format/parquet/file_reader.h" #include "milvus-storage/filesystem/fs.h" #include "cachinglayer/CacheSlot.h" +#include "storage/LocalChunkManagerSingleton.h" namespace milvus::segcore { using namespace milvus::cachinglayer; @@ -292,19 +293,24 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal( schema_->ShouldLoadField(milvus_field_ids[i]); } + auto mmap_dir_path = + milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager() + ->GetRootPath(); auto column_group_info = FieldDataInfo(column_group_id.get(), num_rows, - load_info.mmap_dir_path, + mmap_dir_path, merged_in_load_list); LOG_INFO( "[StorageV2] segment {} loads column group {} with field ids {} " "with " "num_rows " - "{}", + "{} mmap_dir_path={}", this->get_segment_id(), column_group_id.get(), field_id_list.ToString(), - num_rows); + num_rows, + mmap_dir_path); auto field_metas = schema_->get_field_metas(milvus_field_ids); @@ -353,10 +359,14 @@ ChunkedSegmentSealedImpl::load_field_data_internal( auto field_id = FieldId(id); + auto mmap_dir_path = + milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager() + ->GetRootPath(); auto field_data_info = FieldDataInfo(field_id.get(), num_rows, - load_info.mmap_dir_path, + mmap_dir_path, schema_->ShouldLoadField(field_id)); LOG_INFO("segment {} loads field {} with num_rows {}, sorted by pk {}", this->get_segment_id(), diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 1dad33c598..c14d4b7d8f 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -445,9 +445,8 @@ SegmentGrowingImpl::load_column_group_data_internal( storage::SortByPath(insert_files); auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() .GetArrowFileSystem(); - auto column_group_info = - FieldDataInfo(column_group_id.get(), num_rows, infos.mmap_dir_path); + FieldDataInfo(column_group_id.get(), num_rows, ""); column_group_info.arrow_reader_channel->set_capacity(parallel_degree); LOG_INFO( diff --git a/internal/core/src/segcore/load_field_data_c.cpp b/internal/core/src/segcore/load_field_data_c.cpp index f844dade04..6d9b3c9ade 100644 --- a/internal/core/src/segcore/load_field_data_c.cpp +++ b/internal/core/src/segcore/load_field_data_c.cpp @@ -95,16 +95,6 @@ AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info, } } -void -AppendMMapDirPath(CLoadFieldDataInfo c_load_field_data_info, - const char* c_dir_path) { - SCOPE_CGO_CALL_METRIC(); - - auto load_field_data_info = - static_cast(c_load_field_data_info); - load_field_data_info->mmap_dir_path = std::string(c_dir_path); -} - void SetStorageVersion(CLoadFieldDataInfo c_load_field_data_info, int64_t storage_version) { diff --git a/internal/core/src/segcore/load_field_data_c.h b/internal/core/src/segcore/load_field_data_c.h index c4a5916dc0..47775aa057 100644 --- a/internal/core/src/segcore/load_field_data_c.h +++ b/internal/core/src/segcore/load_field_data_c.h @@ -44,10 +44,6 @@ AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info, int64_t memory_size, const char* file_path); -void -AppendMMapDirPath(CLoadFieldDataInfo c_load_field_data_info, - const char* dir_path); - void SetStorageVersion(CLoadFieldDataInfo c_load_field_data_info, int64_t storage_version); diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 9aa7453354..ef324ab2ec 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -316,14 +316,15 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { LOG_INFO( "[collection={}][segment={}][field={}][enable_mmap={}][load_" - "priority={}] load index " - "{}", + "priority={}] load index {}, " + "mmap_dir_path={}", load_index_info->collection_id, load_index_info->segment_id, load_index_info->field_id, load_index_info->enable_mmap, load_priority_str, - load_index_info->index_id); + load_index_info->index_id, + load_index_info->mmap_dir_path); // get index type AssertInfo(index_params.find("index_type") != index_params.end(), @@ -370,10 +371,6 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { load_index_info->field_id, load_index_info->index_build_id, load_index_info->index_version}; - auto remote_chunk_manager = - milvus::storage::RemoteChunkManagerSingleton::GetInstance() - .GetRemoteChunkManager(); - config[milvus::index::INDEX_FILES] = load_index_info->index_files; if (load_index_info->field_type == milvus::DataType::JSON) { @@ -381,6 +378,9 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { config.at(JSON_CAST_TYPE).get()); index_info.json_path = config.at(JSON_PATH).get(); } + auto remote_chunk_manager = + milvus::storage::RemoteChunkManagerSingleton::GetInstance() + .GetRemoteChunkManager(); milvus::storage::FileManagerContext fileManagerContext( field_meta, index_meta, remote_chunk_manager); fileManagerContext.set_for_loading_index(true); @@ -400,12 +400,13 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { LOG_INFO( "[collection={}][segment={}][field={}][enable_mmap={}] load index " - "{} done", + "{} done, mmap_dir_path={}", load_index_info->collection_id, load_index_info->segment_id, load_index_info->field_id, load_index_info->enable_mmap, - load_index_info->index_id); + load_index_info->index_id, + load_index_info->mmap_dir_path); auto status = CStatus(); status.error_code = milvus::Success; @@ -552,7 +553,6 @@ FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info, load_index_info->element_type = static_cast( info_proto->field().element_type()); load_index_info->enable_mmap = info_proto->enable_mmap(); - load_index_info->mmap_dir_path = info_proto->mmap_dir_path(); load_index_info->index_id = info_proto->indexid(); load_index_info->index_build_id = info_proto->index_buildid(); load_index_info->index_version = info_proto->index_version(); @@ -569,6 +569,14 @@ FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info, info_proto->index_engine_version(); load_index_info->schema = info_proto->field(); load_index_info->index_size = info_proto->index_file_size(); + + auto remote_chunk_manager = + milvus::storage::RemoteChunkManagerSingleton::GetInstance() + .GetRemoteChunkManager(); + load_index_info->mmap_dir_path = + milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager() + ->GetRootPath(); } auto status = CStatus(); status.error_code = milvus::Success; diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index c17a8721e2..27ff34960f 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -598,13 +598,30 @@ GenIndexPathPrefix(ChunkManagerPtr cm, int64_t segment_id, int64_t field_id, bool is_temp) { + return GenIndexPathPrefixByType(cm, + build_id, + index_version, + segment_id, + field_id, + INDEX_ROOT_PATH, + is_temp); +} + +std::string +GenIndexPathPrefixByType(ChunkManagerPtr cm, + int64_t build_id, + int64_t index_version, + int64_t segment_id, + int64_t field_id, + const std::string& index_type, + bool is_temp) { boost::filesystem::path prefix = cm->GetRootPath(); if (is_temp) { prefix = prefix / TEMP; } - boost::filesystem::path path = std::string(INDEX_ROOT_PATH); + boost::filesystem::path path = std::string(index_type); boost::filesystem::path path1 = GenIndexPathIdentifier(build_id, index_version, segment_id, field_id); return (prefix / path / path1).string(); @@ -617,16 +634,13 @@ GenTextIndexPathPrefix(ChunkManagerPtr cm, int64_t segment_id, int64_t field_id, bool is_temp) { - boost::filesystem::path prefix = cm->GetRootPath(); - - if (is_temp) { - prefix = prefix / TEMP; - } - - boost::filesystem::path path = std::string(TEXT_LOG_ROOT_PATH); - boost::filesystem::path path1 = - GenIndexPathIdentifier(build_id, index_version, segment_id, field_id); - return (prefix / path / path1).string(); + return GenIndexPathPrefixByType(cm, + build_id, + index_version, + segment_id, + field_id, + TEXT_LOG_ROOT_PATH, + is_temp); } std::string @@ -636,16 +650,13 @@ GenJsonKeyIndexPathPrefix(ChunkManagerPtr cm, int64_t segment_id, int64_t field_id, bool is_temp) { - boost::filesystem::path prefix = cm->GetRootPath(); - - if (is_temp) { - prefix = prefix / TEMP; - } - - boost::filesystem::path path = std::string(JSON_KEY_INDEX_LOG_ROOT_PATH); - boost::filesystem::path path1 = - GenIndexPathIdentifier(build_id, index_version, segment_id, field_id); - return (prefix / path / path1).string(); + return GenIndexPathPrefixByType(cm, + build_id, + index_version, + segment_id, + field_id, + JSON_KEY_INDEX_LOG_ROOT_PATH, + is_temp); } std::string @@ -686,16 +697,13 @@ GenNgramIndexPrefix(ChunkManagerPtr cm, int64_t segment_id, int64_t field_id, bool is_temp) { - boost::filesystem::path prefix = cm->GetRootPath(); - - if (is_temp) { - prefix = prefix / TEMP; - } - - boost::filesystem::path path = std::string(NGRAM_LOG_ROOT_PATH); - boost::filesystem::path path1 = - GenIndexPathIdentifier(build_id, index_version, segment_id, field_id); - return (prefix / path / path1).string(); + return GenIndexPathPrefixByType(cm, + build_id, + index_version, + segment_id, + field_id, + NGRAM_LOG_ROOT_PATH, + is_temp); } std::string diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h index ff5d27a30c..c86b07db32 100644 --- a/internal/core/src/storage/Util.h +++ b/internal/core/src/storage/Util.h @@ -94,6 +94,17 @@ GenIndexPathIdentifier(int64_t build_id, int64_t segment_id, int64_t field_id); +// is_temp: true for temporary path used during index building, +// false for path to store pre-built index contents downloaded from remote storage +std::string +GenIndexPathPrefixByType(ChunkManagerPtr cm, + int64_t build_id, + int64_t index_version, + int64_t segment_id, + int64_t field_id, + const std::string& index_type, + bool is_temp); + // is_temp: true for temporary path used during index building, // false for path to store pre-built index contents downloaded from remote storage std::string diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 4af2cf7cfc..44cd860650 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -1896,7 +1896,7 @@ TEST(CApiTest, LoadIndexInfo) { c_load_index_info, index_param_key2.data(), index_param_value2.data()); ASSERT_EQ(status.error_code, Success); std::string field_name = "field0"; - status = AppendFieldInfo( + status = AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 0, CDataType::FloatVector, false, ""); ASSERT_EQ(status.error_code, Success); AppendIndexEngineVersionToLoadInfo( @@ -2074,7 +2074,7 @@ Test_Indexing_Without_Predicate() { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, TraitType::c_data_type, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -2230,7 +2230,7 @@ TEST(CApiTest, Indexing_Expr_Without_Predicate) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -2408,7 +2408,7 @@ TEST(CApiTest, Indexing_With_float_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -2588,7 +2588,7 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -2760,7 +2760,7 @@ TEST(CApiTest, Indexing_With_float_Predicate_Term) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -2933,7 +2933,7 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Term) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -3114,7 +3114,7 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -3295,7 +3295,7 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -3470,7 +3470,7 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -3668,7 +3668,7 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -3823,7 +3823,7 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, @@ -4054,7 +4054,7 @@ TEST(CApiTest, SealedSegment_search_float_With_Expr_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo( + AppendFieldInfoForTest( c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, false, ""); AppendIndexEngineVersionToLoadInfo( c_load_index_info, diff --git a/internal/core/unittest/test_chunked_segment_storage_v2.cpp b/internal/core/unittest/test_chunked_segment_storage_v2.cpp index a2d4206f47..98a25096af 100644 --- a/internal/core/unittest/test_chunked_segment_storage_v2.cpp +++ b/internal/core/unittest/test_chunked_segment_storage_v2.cpp @@ -191,13 +191,11 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam { std::vector(chunk_num * test_data_count * 4), false, std::vector({paths[2]})}); - load_info.mmap_dir_path = ""; load_info.storage_version = 2; segment->AddFieldDataInfoForSealed(load_info); for (auto& [id, info] : load_info.field_infos) { LoadFieldDataInfo load_field_info; load_field_info.storage_version = 2; - load_field_info.mmap_dir_path = ""; load_field_info.field_infos.emplace(id, info); segment->LoadFieldData(load_field_info); } diff --git a/internal/core/unittest/test_utils/c_api_test_utils.h b/internal/core/unittest/test_utils/c_api_test_utils.h index 3ca4c9d7ce..a147cee68c 100644 --- a/internal/core/unittest/test_utils/c_api_test_utils.h +++ b/internal/core/unittest/test_utils/c_api_test_utils.h @@ -39,9 +39,9 @@ using namespace milvus; using namespace milvus::segcore; -// Test utility function for AppendFieldInfo +// Test utility function for AppendFieldInfoForTest inline CStatus -AppendFieldInfo(CLoadIndexInfo c_load_index_info, +AppendFieldInfoForTest(CLoadIndexInfo c_load_index_info, int64_t collection_id, int64_t partition_id, int64_t segment_id, diff --git a/internal/core/unittest/test_utils/storage_test_utils.h b/internal/core/unittest/test_utils/storage_test_utils.h index ba505953d7..da03162f18 100644 --- a/internal/core/unittest/test_utils/storage_test_utils.h +++ b/internal/core/unittest/test_utils/storage_test_utils.h @@ -76,7 +76,6 @@ PrepareInsertBinlog(int64_t collection_id, std::vector excluded_field_ids = {}) { bool enable_mmap = !mmap_dir_path.empty(); LoadFieldDataInfo load_info; - load_info.mmap_dir_path = mmap_dir_path; auto row_count = dataset.row_ids_.size(); const std::string prefix = TestRemotePath; @@ -148,7 +147,6 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id, const std::string& mmap_dir_path = "") { bool enable_mmap = !mmap_dir_path.empty(); LoadFieldDataInfo load_info; - load_info.mmap_dir_path = mmap_dir_path; std::vector files; files.reserve(field_datas.size()); std::vector row_counts; diff --git a/internal/core/unittest/test_vector_array_storage_v2.cpp b/internal/core/unittest/test_vector_array_storage_v2.cpp index 00ec3ac442..794b4e43d0 100644 --- a/internal/core/unittest/test_vector_array_storage_v2.cpp +++ b/internal/core/unittest/test_vector_array_storage_v2.cpp @@ -208,13 +208,11 @@ class TestVectorArrayStorageV2 : public testing::Test { false, std::vector({paths[1]})}); - load_info.mmap_dir_path = ""; load_info.storage_version = 2; segment_->AddFieldDataInfoForSealed(load_info); for (auto& [id, info] : load_info.field_infos) { LoadFieldDataInfo load_field_info; load_field_info.storage_version = 2; - load_field_info.mmap_dir_path = ""; load_field_info.field_infos.emplace(id, info); segment_->LoadFieldData(load_field_info); } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 5d3c3f84fe..6d8beb474b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -240,6 +240,19 @@ func (s *Server) Register() error { return nil } +func (s *Server) ServerExist(serverID int64) bool { + sessions, _, err := s.session.GetSessions(typeutil.DataNodeRole) + if err != nil { + log.Ctx(s.ctx).Warn("failed to get sessions", zap.Error(err)) + return false + } + sessionMap := lo.MapKeys(sessions, func(s *sessionutil.Session, _ string) int64 { + return s.ServerID + }) + _, exists := sessionMap[serverID] + return exists +} + // Init change server state to Initializing func (s *Server) Init() error { s.registerMetricsRequest() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 0f97a4fc69..f37a21ea8d 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -204,7 +204,7 @@ func (node *DataNode) Init() error { node.importTaskMgr = importv2.NewTaskManager() node.importScheduler = importv2.NewScheduler(node.importTaskMgr) - err := index.InitSegcore() + err := index.InitSegcore(serverID) if err != nil { initError = err } diff --git a/internal/datanode/index/init_segcore.go b/internal/datanode/index/init_segcore.go index c8b8dc701b..59b9edaba3 100644 --- a/internal/datanode/index/init_segcore.go +++ b/internal/datanode/index/init_segcore.go @@ -29,16 +29,16 @@ import "C" import ( "path" - "path/filepath" "unsafe" "github.com/milvus-io/milvus/internal/util/initcore" + "github.com/milvus-io/milvus/internal/util/pathutil" "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -func InitSegcore() error { +func InitSegcore(nodeID int64) error { cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf)) C.IndexBuilderInit(cGlogConf) C.free(unsafe.Pointer(cGlogConf)) @@ -76,7 +76,7 @@ func InitSegcore() error { } C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereThreadPoolSize) - localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.IndexNodeRole) + localDataRootPath := pathutil.GetPath(pathutil.LocalChunkPath, nodeID) initcore.InitLocalChunkManager(localDataRootPath) cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 0a2d941f49..1b130f0958 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -157,6 +157,19 @@ func (s *Server) SetSession(session sessionutil.SessionInterface) error { return nil } +func (s *Server) ServerExist(serverID int64) bool { + sessions, _, err := s.session.GetSessions(typeutil.QueryNodeRole) + if err != nil { + log.Ctx(s.ctx).Warn("failed to get sessions", zap.Error(err)) + return false + } + sessionMap := lo.MapKeys(sessions, func(s *sessionutil.Session, _ string) int64 { + return s.ServerID + }) + _, exists := sessionMap[serverID] + return exists +} + func (s *Server) registerMetricsRequest() { getSystemInfoAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return s.getSystemInfoMetrics(ctx, req) diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 6c79744f72..e79c0b41f3 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -86,7 +86,7 @@ func (s *DelegatorDataSuite) SetupSuite() { paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key, "1") localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) initcore.InitLocalChunkManager(localDataRootPath) - initcore.InitMmapManager(paramtable.Get()) + initcore.InitMmapManager(paramtable.Get(), 1) s.collectionID = 1000 s.replicaID = 65535 diff --git a/internal/querynodev2/delegator/idf_oracle.go b/internal/querynodev2/delegator/idf_oracle.go index b170ff5200..03e6005187 100644 --- a/internal/querynodev2/delegator/idf_oracle.go +++ b/internal/querynodev2/delegator/idf_oracle.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/pathutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/conc" @@ -404,7 +405,7 @@ func (o *idfOracle) syncloop() { func (o *idfOracle) localloop() { pool := conc.NewPool[struct{}](paramtable.Get().QueryNodeCfg.IDFWriteConcurrenct.GetAsInt()) - o.dirPath = path.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), "bm25", fmt.Sprintf("%d", o.collectionID)) + o.dirPath = pathutil.GetPath(pathutil.BM25Path, paramtable.GetNodeID()) defer o.wg.Done() for { diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 30931385e6..5f9050a1ca 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -43,7 +43,7 @@ func (s *ManagerSuite) SetupSuite() { s.levels = []datapb.SegmentLevel{datapb.SegmentLevel_Legacy, datapb.SegmentLevel_Legacy, datapb.SegmentLevel_L1, datapb.SegmentLevel_L0} localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) initcore.InitLocalChunkManager(localDataRootPath) - initcore.InitMmapManager(paramtable.Get()) + initcore.InitMmapManager(paramtable.Get(), 1) } func (s *ManagerSuite) SetupTest() { diff --git a/internal/querynodev2/segments/search_test.go b/internal/querynodev2/segments/search_test.go index 1772a2636a..b4604dda5f 100644 --- a/internal/querynodev2/segments/search_test.go +++ b/internal/querynodev2/segments/search_test.go @@ -58,7 +58,7 @@ func (suite *SearchSuite) SetupTest() { suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx) initcore.InitRemoteChunkManager(paramtable.Get()) initcore.InitLocalChunkManager(suite.T().Name()) - initcore.InitMmapManager(paramtable.Get()) + initcore.InitMmapManager(paramtable.Get(), 1) suite.collectionID = 100 suite.partitionID = 10 diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 500b4da5d9..e4d67235cc 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -745,7 +745,6 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error { ) req := &segcore.LoadFieldDataRequest{ - MMapDir: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(), RowCount: rowCount, StorageVersion: loadInfo.StorageVersion, } @@ -803,7 +802,6 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun } mmapEnabled := isDataMmapEnable(fieldSchema) req := &segcore.LoadFieldDataRequest{ - MMapDir: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(), Fields: []segcore.LoadFieldDataInfo{{ Field: field, EnableMMap: mmapEnabled, @@ -972,14 +970,12 @@ func GetCLoadInfoWithFunc(ctx context.Context, } enableMmap := isIndexMmapEnable(fieldSchema, indexInfo) - indexInfoProto := &cgopb.LoadIndexInfo{ CollectionID: loadInfo.GetCollectionID(), PartitionID: loadInfo.GetPartitionID(), SegmentID: loadInfo.GetSegmentID(), Field: fieldSchema, EnableMmap: enableMmap, - MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(), IndexID: indexInfo.GetIndexID(), IndexBuildID: indexInfo.GetBuildID(), IndexVersion: indexInfo.GetIndexVersion(), diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index 708b2c9f61..3c82199bea 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -84,7 +84,7 @@ func (suite *SegmentLoaderSuite) SetupTest() { suite.loader = NewLoader(ctx, suite.manager, suite.chunkManager) initcore.InitRemoteChunkManager(paramtable.Get()) initcore.InitLocalChunkManager(suite.rootPath) - initcore.InitMmapManager(paramtable.Get()) + initcore.InitMmapManager(paramtable.Get(), 1) // Data suite.schema = mock_segcore.GenTestCollectionSchema("test", schemapb.DataType_Int64, false) diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index b5b6194f2f..c401ba0754 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -48,7 +48,7 @@ func (suite *SegmentSuite) SetupTest() { initcore.InitRemoteChunkManager(paramtable.Get()) localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) initcore.InitLocalChunkManager(localDataRootPath) - initcore.InitMmapManager(paramtable.Get()) + initcore.InitMmapManager(paramtable.Get(), 1) suite.collectionID = 100 suite.partitionID = 10 diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 4bfeff2715..2aaedd88eb 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -34,7 +34,6 @@ import ( "fmt" "os" "path" - "path/filepath" "plugin" "strings" "sync" @@ -61,6 +60,7 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/internal/util/initcore" + "github.com/milvus-io/milvus/internal/util/pathutil" "github.com/milvus-io/milvus/internal/util/searchutil/optimizers" "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/segcore" @@ -321,7 +321,8 @@ func (node *QueryNode) InitSegcore() error { cExprResCacheCapacityBytes := C.int64_t(paramtable.Get().QueryNodeCfg.ExprResCacheCapacityBytes.GetAsInt64()) C.SetExprResCacheCapacityBytes(cExprResCacheCapacityBytes) - localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + localDataRootPath := pathutil.GetPath(pathutil.LocalChunkPath, node.GetNodeID()) + initcore.InitLocalChunkManager(localDataRootPath) err := initcore.InitRemoteChunkManager(paramtable.Get()) @@ -339,7 +340,7 @@ func (node *QueryNode) InitSegcore() error { return err } - err = initcore.InitMmapManager(paramtable.Get()) + err = initcore.InitMmapManager(paramtable.Get(), node.GetNodeID()) if err != nil { return err } @@ -409,6 +410,11 @@ func (node *QueryNode) InitSegcore() error { diskMaxBytes := C.int64_t(diskMaxRatio * float64(osDiskBytes)) evictionEnabled := C.bool(paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool()) + + if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() && paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() { + panic("tiered storage eviction is not supported in POSIX mode, change config and restart") + } + cacheTouchWindowMs := C.int64_t(paramtable.Get().QueryNodeCfg.TieredCacheTouchWindowMs.GetAsInt64()) evictionIntervalMs := C.int64_t(paramtable.Get().QueryNodeCfg.TieredEvictionIntervalMs.GetAsInt64()) cacheCellUnaccessedSurvivalTime := C.int64_t(paramtable.Get().QueryNodeCfg.CacheCellUnaccessedSurvivalTime.GetAsInt64()) diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index 9123e96446..74533b4025 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -32,7 +32,6 @@ import ( "context" "encoding/base64" "fmt" - "path" "strconv" "sync" "time" @@ -43,6 +42,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/util/hookutil" + "github.com/milvus-io/milvus/internal/util/pathutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) @@ -250,18 +250,18 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error { return HandleCStatus(&status, "InitRemoteChunkManagerSingleton failed") } -func InitMmapManager(params *paramtable.ComponentParam) error { - mmapDirPath := params.QueryNodeCfg.MmapDirPath.GetValue() - cMmapChunkManagerDir := C.CString(path.Join(mmapDirPath, "/mmap_chunk_manager/")) +func InitMmapManager(params *paramtable.ComponentParam, nodeID int64) error { + growingMMapDir := pathutil.GetPath(pathutil.GrowingMMapPath, nodeID) + cGrowingMMapDir := C.CString(growingMMapDir) cCacheReadAheadPolicy := C.CString(params.QueryNodeCfg.ReadAheadPolicy.GetValue()) - defer C.free(unsafe.Pointer(cMmapChunkManagerDir)) + defer C.free(unsafe.Pointer(cGrowingMMapDir)) defer C.free(unsafe.Pointer(cCacheReadAheadPolicy)) diskCapacity := params.QueryNodeCfg.DiskCapacityLimit.GetAsUint64() diskLimit := uint64(float64(params.QueryNodeCfg.MaxMmapDiskPercentageForMmapManager.GetAsUint64()*diskCapacity) * 0.01) mmapFileSize := params.QueryNodeCfg.FixedFileSizeForMmapManager.GetAsFloat() * 1024 * 1024 mmapConfig := C.CMmapConfig{ cache_read_ahead_policy: cCacheReadAheadPolicy, - mmap_path: cMmapChunkManagerDir, + mmap_path: cGrowingMMapDir, disk_limit: C.uint64_t(diskLimit), fix_file_size: C.uint64_t(mmapFileSize), growing_enable_mmap: C.bool(params.QueryNodeCfg.GrowingMmapEnabled.GetAsBool()), diff --git a/internal/util/pathutil/path_util.go b/internal/util/pathutil/path_util.go new file mode 100644 index 0000000000..42e042638a --- /dev/null +++ b/internal/util/pathutil/path_util.go @@ -0,0 +1,44 @@ +package pathutil + +import ( + "fmt" + "path/filepath" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" +) + +type PathType int + +const ( + GrowingMMapPath PathType = iota + LocalChunkPath + BM25Path + RootCachePath +) + +const ( + CachePathPrefix = "cache" + GrowingMMapPathPrefix = "growing_mmap" + LocalChunkPathPrefix = "local_chunk" + BM25PathPrefix = "bm25" +) + +func GetPath(pathType PathType, nodeID int64) string { + rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() + + path := filepath.Join(rootPath, CachePathPrefix) + switch pathType { + case GrowingMMapPath: + path = filepath.Join(path, fmt.Sprintf("%d", nodeID), GrowingMMapPathPrefix) + case LocalChunkPath: + path = filepath.Join(path, fmt.Sprintf("%d", nodeID), LocalChunkPathPrefix) + case BM25Path: + path = filepath.Join(path, fmt.Sprintf("%d", nodeID), BM25PathPrefix) + case RootCachePath: + } + log.Info("Get path for", zap.Any("pathType", pathType), zap.Int64("nodeID", nodeID), zap.String("path", path)) + return path +} diff --git a/internal/util/segcore/reduce_test.go b/internal/util/segcore/reduce_test.go index 09205ca8b0..dbdf4f8f6e 100644 --- a/internal/util/segcore/reduce_test.go +++ b/internal/util/segcore/reduce_test.go @@ -63,7 +63,7 @@ func (suite *ReduceSuite) SetupSuite() { func (suite *ReduceSuite) SetupTest() { localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) initcore.InitLocalChunkManager(localDataRootPath) - err := initcore.InitMmapManager(paramtable.Get()) + err := initcore.InitMmapManager(paramtable.Get(), 1) suite.NoError(err) ctx := context.Background() msgLength := 100 diff --git a/internal/util/segcore/requests.go b/internal/util/segcore/requests.go index bf0d8f157f..e31b0bbc3b 100644 --- a/internal/util/segcore/requests.go +++ b/internal/util/segcore/requests.go @@ -80,12 +80,6 @@ func (req *LoadFieldDataRequest) getCLoadFieldDataRequest() (result *cLoadFieldD C.EnableMmap(cLoadFieldDataInfo, cFieldID, C.bool(field.EnableMMap)) } - - if len(req.MMapDir) > 0 { - mmapDir := C.CString(req.MMapDir) - defer C.free(unsafe.Pointer(mmapDir)) - C.AppendMMapDirPath(cLoadFieldDataInfo, mmapDir) - } C.SetLoadPriority(cLoadFieldDataInfo, C.int32_t(req.LoadPriority)) return &cLoadFieldDataRequest{ cLoadFieldDataInfo: cLoadFieldDataInfo, diff --git a/internal/util/segcore/segment_test.go b/internal/util/segcore/segment_test.go index 158f97b2ea..bfc770ffc8 100644 --- a/internal/util/segcore/segment_test.go +++ b/internal/util/segcore/segment_test.go @@ -23,7 +23,7 @@ func TestGrowingSegment(t *testing.T) { paramtable.Init() localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) initcore.InitLocalChunkManager(localDataRootPath) - err := initcore.InitMmapManager(paramtable.Get()) + err := initcore.InitMmapManager(paramtable.Get(), 1) assert.NoError(t, err) collectionID := int64(100) diff --git a/pkg/proto/cgo_msg.proto b/pkg/proto/cgo_msg.proto index 1ebc91dcbe..74f5208488 100644 --- a/pkg/proto/cgo_msg.proto +++ b/pkg/proto/cgo_msg.proto @@ -11,7 +11,7 @@ message LoadIndexInfo { int64 segmentID = 3; schema.FieldSchema field = 5; bool enable_mmap = 6; - string mmap_dir_path = 7; + string mmap_dir_path = 7; //Deprecated int64 indexID = 8; int64 index_buildID = 9; int64 index_version = 10; diff --git a/pkg/proto/cgopb/cgo_msg.pb.go b/pkg/proto/cgopb/cgo_msg.pb.go index 90c297cc6b..0c28b06bf5 100644 --- a/pkg/proto/cgopb/cgo_msg.pb.go +++ b/pkg/proto/cgopb/cgo_msg.pb.go @@ -31,7 +31,7 @@ type LoadIndexInfo struct { SegmentID int64 `protobuf:"varint,3,opt,name=segmentID,proto3" json:"segmentID,omitempty"` Field *schemapb.FieldSchema `protobuf:"bytes,5,opt,name=field,proto3" json:"field,omitempty"` EnableMmap bool `protobuf:"varint,6,opt,name=enable_mmap,json=enableMmap,proto3" json:"enable_mmap,omitempty"` - MmapDirPath string `protobuf:"bytes,7,opt,name=mmap_dir_path,json=mmapDirPath,proto3" json:"mmap_dir_path,omitempty"` + MmapDirPath string `protobuf:"bytes,7,opt,name=mmap_dir_path,json=mmapDirPath,proto3" json:"mmap_dir_path,omitempty"` //Deprecated IndexID int64 `protobuf:"varint,8,opt,name=indexID,proto3" json:"indexID,omitempty"` IndexBuildID int64 `protobuf:"varint,9,opt,name=index_buildID,json=indexBuildID,proto3" json:"index_buildID,omitempty"` IndexVersion int64 `protobuf:"varint,10,opt,name=index_version,json=indexVersion,proto3" json:"index_version,omitempty"` diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 804c776293..dfdd02d903 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -315,6 +315,8 @@ type commonConfig struct { EnabledGrowingSegmentJSONKeyStats ParamItem `refreshable:"true"` EnableConfigParamTypeCheck ParamItem `refreshable:"true"` + + EnablePosixMode ParamItem `refreshable:"false"` } func (p *commonConfig) init(base *BaseTable) { @@ -1154,6 +1156,15 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.EnableConfigParamTypeCheck.Init(base.mgr) + + p.EnablePosixMode = ParamItem{ + Key: "common.enablePosixMode", + Version: "2.6.0", + DefaultValue: "false", + Doc: "Specifies whether to run in POSIX mode for enhanced file system compatibility", + Export: true, + } + p.EnablePosixMode.Init(base.mgr) } type gpuConfig struct { @@ -2927,7 +2938,8 @@ type queryNodeConfig struct { // cache limit // Deprecated: Never used CacheMemoryLimit ParamItem `refreshable:"false"` - MmapDirPath ParamItem `refreshable:"false"` + // Deprecated: Since 2.6.0, use local storage path instead + MmapDirPath ParamItem `refreshable:"false"` // Deprecated: Since 2.4.7, use `MmapVectorField`/`MmapVectorIndex`/`MmapScalarField`/`MmapScalarIndex` instead MmapEnabled ParamItem `refreshable:"false"` MmapVectorField ParamItem `refreshable:"false"` @@ -3535,7 +3547,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin Version: "2.3.0", DefaultValue: "", FallbackKeys: []string{"queryNode.mmapDirPath"}, - Doc: "The folder that storing data files for mmap, setting to a path will enable Milvus to load data with mmap", + Doc: "Deprecated: The folder that storing data files for mmap, setting to a path will enable Milvus to load data with mmap", Formatter: func(v string) string { if len(v) == 0 { return path.Join(base.Get("localStorage.path"), "mmap")