enhance: Deprecate disk params about indexing (#41045)

issue: #40863

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-04-07 11:36:34 +08:00 committed by GitHub
parent 0a378dc308
commit 05e25431d9
13 changed files with 8 additions and 245 deletions

View File

@@ -507,8 +507,6 @@ indexCoord:
 indexNode:
   scheduler:
     buildParallel: 1
-  enableDisk: true # enable build disk vector index
-  maxDiskUsagePercentage: 95
 dataCoord:
   channel:

View File

@@ -30,7 +30,6 @@ import (
     "github.com/milvus-io/milvus/internal/metastore/model"
     "github.com/milvus-io/milvus/internal/parser/planparserv2"
     "github.com/milvus-io/milvus/internal/util/indexparamcheck"
-    "github.com/milvus-io/milvus/internal/util/vecindexmgr"
     "github.com/milvus-io/milvus/pkg/v2/common"
     pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
     "github.com/milvus-io/milvus/pkg/v2/log"
@@ -340,13 +339,6 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
             metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
             return merr.Status(err), nil
         }
-        if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
-            errMsg := "all DataNodes do not support disk indexes, please verify"
-            log.Warn(errMsg)
-            err = merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams))
-            metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
-            return merr.Status(err), nil
-        }
     }
     // exclude the mmap.enable param, because it will be conflict with the index's mmap.enable param
     typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey})

View File

@@ -44,7 +44,6 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/common"
     "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
     "github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
-    "github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
     "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -259,7 +258,7 @@ func TestServer_CreateIndex(t *testing.T) {
         assert.Error(t, merr.CheckRPCCall(resp, err))
     })
-    t.Run("not support disk index", func(t *testing.T) {
+    t.Run("disk index", func(t *testing.T) {
         s.allocator = mock0Allocator
         s.meta.indexMeta.indexes = map[UniqueID]map[UniqueID]*model.Index{}
         req.IndexParams = []*commonpb.KeyValuePair{
@@ -270,7 +269,7 @@ func TestServer_CreateIndex(t *testing.T) {
         }
         s.indexNodeManager = session.NewNodeManager(ctx, defaultDataNodeCreatorFunc)
         resp, err := s.CreateIndex(ctx, req)
-        assert.Error(t, merr.CheckRPCCall(resp, err))
+        assert.NoError(t, merr.CheckRPCCall(resp, err))
     })
     t.Run("disk index with mmap", func(t *testing.T) {
@@ -290,10 +289,6 @@ func TestServer_CreateIndex(t *testing.T) {
         s.indexNodeManager = nodeManager
         mockNode := mocks.NewMockDataNodeClient(t)
         nodeManager.SetClient(1001, mockNode)
-        mockNode.EXPECT().GetJobStats(mock.Anything, mock.Anything).Return(&workerpb.GetJobStatsResponse{
-            Status: merr.Success(),
-            EnableDisk: true,
-        }, nil)
         resp, err := s.CreateIndex(ctx, req)
         assert.Error(t, merr.CheckRPCCall(resp, err))

View File

@@ -150,7 +150,7 @@ type Server struct {
     // indexCoord types.IndexCoord
     // segReferManager *SegmentReferenceManager
-    indexNodeManager *session.IndexNodeManager
+    indexNodeManager session.WorkerManager
     indexEngineVersionManager IndexEngineVersionManager
     taskScheduler *taskScheduler

View File

@@ -30,7 +30,6 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/metrics"
     "github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
     "github.com/milvus-io/milvus/pkg/v2/util/lock"
-    "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
     "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -45,7 +44,6 @@ type WorkerManager interface {
     StoppingNode(nodeID typeutil.UniqueID)
     PickClient() (typeutil.UniqueID, types.DataNodeClient)
     QuerySlots() map[int64]int64
-    ClientSupportDisk() bool
     GetAllClients() map[typeutil.UniqueID]types.DataNodeClient
     GetClientByID(nodeID typeutil.UniqueID) (types.DataNodeClient, bool)
 }
@@ -205,57 +203,6 @@ func (nm *IndexNodeManager) PickClient() (typeutil.UniqueID, types.DataNodeClien
     return 0, nil
 }
-func (nm *IndexNodeManager) ClientSupportDisk() bool {
-    log := log.Ctx(nm.ctx)
-    log.Debug("check if client support disk index")
-    allClients := nm.GetAllClients()
-    if len(allClients) == 0 {
-        log.Warn("there is no IndexNode online")
-        return false
-    }
-    // Note: In order to quickly end other goroutines, an error is returned when the client is successfully selected
-    ctx, cancel := context.WithCancel(nm.ctx)
-    var (
-        enableDisk = false
-        nodeMutex  = lock.Mutex{}
-        wg         = sync.WaitGroup{}
-    )
-    for nodeID, client := range allClients {
-        nodeID := nodeID
-        client := client
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            resp, err := client.GetJobStats(ctx, &workerpb.GetJobStatsRequest{})
-            if err := merr.CheckRPCCall(resp, err); err != nil {
-                log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err))
-                return
-            }
-            log.Debug("get job stats success", zap.Int64("nodeID", nodeID), zap.Bool("enable disk", resp.GetEnableDisk()))
-            if resp.GetEnableDisk() {
-                nodeMutex.Lock()
-                defer nodeMutex.Unlock()
-                cancel()
-                if !enableDisk {
-                    enableDisk = true
-                }
-                return
-            }
-        }()
-    }
-    wg.Wait()
-    cancel()
-    if enableDisk {
-        log.Info("IndexNode support disk index")
-        return true
-    }
-    log.Error("all IndexNodes do not support disk indexes")
-    return false
-}
 func (nm *IndexNodeManager) GetAllClients() map[typeutil.UniqueID]types.DataNodeClient {
     nm.lock.RLock()
     defer nm.lock.RUnlock()

View File

@@ -27,7 +27,6 @@ import (
     "github.com/milvus-io/milvus/internal/mocks"
     "github.com/milvus-io/milvus/internal/types"
     "github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
-    "github.com/milvus-io/milvus/pkg/v2/util/lock"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
     typeutil "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -100,95 +99,6 @@ func TestIndexNodeManager_PickClient(t *testing.T) {
     })
 }
-func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
-    paramtable.Init()
-    getMockedGetJobStatsClient := func(resp *workerpb.GetJobStatsResponse, err error) types.DataNodeClient {
-        ic := mocks.NewMockDataNodeClient(t)
-        ic.EXPECT().GetJobStats(mock.Anything, mock.Anything, mock.Anything).Return(resp, err)
-        return ic
-    }
-    err := errors.New("error")
-    t.Run("support", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx: context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.DataNodeClient{
-                1: getMockedGetJobStatsClient(&workerpb.GetJobStatsResponse{
-                    Status: merr.Success(),
-                    TaskSlots: 1,
-                    JobInfos: nil,
-                    EnableDisk: true,
-                }, nil),
-            },
-        }
-        support := nm.ClientSupportDisk()
-        assert.True(t, support)
-    })
-    t.Run("not support", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx: context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.DataNodeClient{
-                1: getMockedGetJobStatsClient(&workerpb.GetJobStatsResponse{
-                    Status: merr.Success(),
-                    TaskSlots: 1,
-                    JobInfos: nil,
-                    EnableDisk: false,
-                }, nil),
-            },
-        }
-        support := nm.ClientSupportDisk()
-        assert.False(t, support)
-    })
-    t.Run("no indexnode", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx: context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.DataNodeClient{},
-        }
-        support := nm.ClientSupportDisk()
-        assert.False(t, support)
-    })
-    t.Run("error", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx: context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.DataNodeClient{
-                1: getMockedGetJobStatsClient(nil, err),
-            },
-        }
-        support := nm.ClientSupportDisk()
-        assert.False(t, support)
-    })
-    t.Run("fail reason", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx: context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.DataNodeClient{
-                1: getMockedGetJobStatsClient(&workerpb.GetJobStatsResponse{
-                    Status: merr.Status(err),
-                    TaskSlots: 0,
-                    JobInfos: nil,
-                    EnableDisk: false,
-                }, nil),
-            },
-        }
-        support := nm.ClientSupportDisk()
-        assert.False(t, support)
-    })
-}
 func TestNodeManager_StoppingNode(t *testing.T) {
     paramtable.Init()
     nm := NewNodeManager(context.Background(), defaultIndexNodeCreatorFunc)

View File

@@ -23,7 +23,6 @@ import (
     "strings"
     "time"
-    "github.com/cockroachdb/errors"
     "go.uber.org/zap"
     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -219,35 +218,13 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
     indexType := it.newIndexParams[common.IndexTypeKey]
     var fieldDataSize uint64
+    var err error
     if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(indexType) {
-        // check index node support disk index
-        if !paramtable.Get().DataNodeCfg.EnableDisk.GetAsBool() {
-            log.Warn("don't support build disk index",
-                zap.String("index type", it.newIndexParams[common.IndexTypeKey]),
-                zap.Bool("enable disk", paramtable.Get().DataNodeCfg.EnableDisk.GetAsBool()))
-            return errors.New("index node don't support build disk index")
-        }
-        // check load size and size of field data
-        localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
-        if err != nil {
-            log.Warn("get local used size failed")
-            return err
-        }
         fieldDataSize, err = estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
         if err != nil {
             log.Warn("get local used size failed")
             return err
         }
-        usedLocalSizeWhenBuild := int64(float64(fieldDataSize)*DiskUsageRatio) + localUsedSize
-        maxUsedLocalSize := int64(paramtable.Get().DataNodeCfg.DiskCapacityLimit.GetAsFloat() * paramtable.Get().DataNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
-        if usedLocalSizeWhenBuild > maxUsedLocalSize {
-            log.Warn("don't has enough disk size to build disk ann index",
-                zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild),
-                zap.Int64("maxUsedLocalSize", maxUsedLocalSize))
-            return errors.New("index node don't has enough disk size to build disk ann index")
-        }
         err = indexparams.SetDiskIndexBuildParams(it.newIndexParams, int64(fieldDataSize))
         if err != nil {
@@ -314,8 +291,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
         LackBinlogRows: it.req.GetLackBinlogRows(),
     }
-    log.Info("debug create index", zap.Any("buildIndexParams", buildIndexParams))
+    log.Info("create index", zap.Any("buildIndexParams", buildIndexParams))
-    var err error
     it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
     if err != nil {
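
For readers skimming the removed guard above, here is a condensed sketch of the disk-headroom arithmetic this commit drops (names are taken from the deleted code; the standalone helper is illustrative only and not part of the repository):

    // Illustrative reconstruction of the removed check, not part of this commit.
    // DiskUsageRatio, DiskCapacityLimit, and MaxDiskUsagePercentage are the names
    // used by the deleted code; the config params themselves are removed later in this diff.
    func hasDiskHeadroom(fieldDataSize uint64, localUsedSize int64) bool {
        // Projected local-disk usage once the DiskANN build starts.
        usedLocalSizeWhenBuild := int64(float64(fieldDataSize)*DiskUsageRatio) + localUsedSize
        // Budget: total disk capacity scaled by the configured max usage percentage.
        maxUsedLocalSize := int64(paramtable.Get().DataNodeCfg.DiskCapacityLimit.GetAsFloat() *
            paramtable.Get().DataNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
        return usedLocalSizeWhenBuild <= maxUsedLocalSize
    }

The old code returned an error when this predicate was false; after this commit the build task proceeds straight to indexparams.SetDiskIndexBuildParams.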

View File

@@ -218,7 +218,6 @@ func (node *DataNode) GetJobStats(ctx context.Context, req *workerpb.GetJobStats
         InProgressJobNum: int64(active),
         EnqueueJobNum: int64(unissued),
         TaskSlots: int64(slots),
-        EnableDisk: paramtable.Get().DataNodeCfg.EnableDisk.GetAsBool(),
     }, nil
 }

View File

@@ -104,7 +104,6 @@ type SessionRaw struct {
     LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"`
     HostName string `json:"HostName,omitempty"`
-    EnableDisk bool `json:"EnableDisk,omitempty"`
     ServerLabels map[string]string `json:"ServerLabels,omitempty"`
 }
@@ -193,12 +192,6 @@ func WithScalarIndexEngineVersion(minimal, current int32) SessionOption {
     }
 }
-func WithEnableDisk(enableDisk bool) SessionOption {
-    return func(s *Session) {
-        s.EnableDisk = enableDisk
-    }
-}
 func (s *Session) apply(opts ...SessionOption) {
     for _, opt := range opts {
         opt(s)

View File

@@ -86,6 +86,7 @@ message GetJobStatsResponse {
   int64 enqueue_job_num = 4;
   int64 task_slots = 5;
   repeated index.JobInfo job_infos = 6;
+  // deprecated
   bool enable_disk = 7;
 }

View File

@@ -511,6 +511,7 @@ type GetJobStatsResponse struct {
     EnqueueJobNum int64 `protobuf:"varint,4,opt,name=enqueue_job_num,json=enqueueJobNum,proto3" json:"enqueue_job_num,omitempty"`
     TaskSlots int64 `protobuf:"varint,5,opt,name=task_slots,json=taskSlots,proto3" json:"task_slots,omitempty"`
     JobInfos []*indexpb.JobInfo `protobuf:"bytes,6,rep,name=job_infos,json=jobInfos,proto3" json:"job_infos,omitempty"`
+    // deprecated
     EnableDisk bool `protobuf:"varint,7,opt,name=enable_disk,json=enableDisk,proto3" json:"enable_disk,omitempty"`
 }
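
A hedged note on what the deprecation means for callers, sketched below (the helper is hypothetical and not part of this commit; the call shape mirrors the GetJobStats usage removed elsewhere in this diff): DataNodes running this version no longer populate enable_disk, so readers of GetJobStatsResponse always see the zero value and should not gate disk-index behavior on it.

    // Hypothetical caller-side sketch; not part of this commit.
    package example

    import (
        "context"

        "github.com/milvus-io/milvus/internal/types"
        "github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
        "github.com/milvus-io/milvus/pkg/v2/util/merr"
    )

    // enableDiskReported returns the node's reported enable_disk value, which is
    // always false once the node runs this version of the code.
    func enableDiskReported(ctx context.Context, client types.DataNodeClient) (bool, error) {
        resp, err := client.GetJobStats(ctx, &workerpb.GetJobStatsRequest{})
        if err := merr.CheckRPCCall(resp, err); err != nil {
            return false, err
        }
        return resp.GetEnableDisk(), nil
    }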

View File

@@ -5039,52 +5039,6 @@ if this parameter <= 0, will set it as 10`,
         Export: true,
     }
     p.BuildParallel.Init(base.mgr)
-    p.EnableDisk = ParamItem{
-        Key: "indexNode.enableDisk",
-        Version: "2.2.0",
-        DefaultValue: "false",
-        PanicIfEmpty: true,
-        Doc: "enable build disk vector index",
-        Export: true,
-    }
-    p.EnableDisk.Init(base.mgr)
-    p.DiskCapacityLimit = ParamItem{
-        Key: "LOCAL_STORAGE_SIZE",
-        Version: "2.2.0",
-        Formatter: func(v string) string {
-            if len(v) == 0 {
-                // use local storage path to check correct device
-                localStoragePath := base.Get("localStorage.path")
-                if _, err := os.Stat(localStoragePath); os.IsNotExist(err) {
-                    if err := os.MkdirAll(localStoragePath, os.ModePerm); err != nil {
-                        log.Fatal("failed to mkdir", zap.String("localStoragePath", localStoragePath), zap.Error(err))
-                    }
-                }
-                diskUsage, err := disk.Usage(localStoragePath)
-                if err != nil {
-                    log.Fatal("failed to get disk usage", zap.String("localStoragePath", localStoragePath), zap.Error(err))
-                }
-                return strconv.FormatUint(diskUsage.Total, 10)
-            }
-            diskSize := getAsInt64(v)
-            return strconv.FormatInt(diskSize*1024*1024*1024, 10)
-        },
-    }
-    p.DiskCapacityLimit.Init(base.mgr)
-    p.MaxDiskUsagePercentage = ParamItem{
-        Key: "indexNode.maxDiskUsagePercentage",
-        Version: "2.2.0",
-        DefaultValue: "95",
-        PanicIfEmpty: true,
-        Formatter: func(v string) string {
-            return fmt.Sprintf("%f", getAsFloat(v)/100)
-        },
-        Export: true,
-    }
-    p.MaxDiskUsagePercentage.Init(base.mgr)
 }
 type streamingConfig struct {

View File

@@ -689,9 +689,6 @@ func TestCachedParam(t *testing.T) {
     Init()
     params := Get()
-    assert.True(t, params.DataNodeCfg.EnableDisk.GetAsBool())
-    assert.True(t, params.DataNodeCfg.EnableDisk.GetAsBool())
     assert.Equal(t, 256*1024*1024, params.QueryCoordGrpcServerCfg.ServerMaxRecvSize.GetAsInt())
     assert.Equal(t, 256*1024*1024, params.QueryCoordGrpcServerCfg.ServerMaxRecvSize.GetAsInt())