Clarify the index building process (#15044)
Reduce peak memory usage.

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
parent 6323907566
commit 1aafb56694
@@ -135,16 +135,17 @@ func (index *CIndex) BuildBinaryVecIndexWithoutIds(vectors []byte) error {
 	return HandleCStatus(&status, "BuildBinaryVecIndexWithoutIds failed")
 }
 
-// Delete removes the pointer to build the index in 'C'.
+// Delete removes the pointer to build the index in 'C'. We can ensure that it is idempotent.
 func (index *CIndex) Delete() error {
 	/*
 		void
 		DeleteIndex(CIndex index);
 	*/
+	if index.close {
+		return nil
+	}
 	C.DeleteIndex(index.indexPtr)
+	index.close = true
 	// TODO: check if index.indexPtr will be released by golang, though it occupies little memory
 	// C.free(index.indexPtr)
 	return nil
 }
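Note: the guard flag added above makes Delete safe to call more than once. The first call frees the C-side memory; later calls are no-ops, which is what allows the task code below to delete the index eagerly and still keep a deferred Delete as a safety net. A minimal, self-contained sketch of the same pattern, using a hypothetical releaseHandle in place of the real C.DeleteIndex cgo call:

    package main

    import "fmt"

    type handle struct{ freed bool }

    // releaseHandle is a hypothetical stand-in for the real C.DeleteIndex cgo call.
    func releaseHandle(h *handle) { h.freed = true }

    type cIndex struct {
        ptr   *handle
        close bool // set once the underlying C memory has been released
    }

    // Delete releases the C memory at most once, so callers may invoke it
    // eagerly after serialization and again from a deferred cleanup.
    func (idx *cIndex) Delete() error {
        if idx.close {
            return nil // already released: idempotent no-op
        }
        releaseHandle(idx.ptr)
        idx.close = true
        return nil
    }

    func main() {
        idx := &cIndex{ptr: &handle{}}
        defer idx.Delete() // safety net; harmless if already deleted

        // ... build and serialize the index here ...

        _ = idx.Delete() // early release to lower peak memory
        fmt.Println("freed:", idx.ptr.freed)
    }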
@@ -110,6 +110,12 @@ type IndexBuildTask struct {
 	req            *indexpb.CreateIndexRequest
 	nodeID         UniqueID
 	serializedSize uint64
+	collectionID   UniqueID
+	partitionID    UniqueID
+	segmentID      UniqueID
+	newTypeParams  map[string]string
+	newIndexParams map[string]string
+	tr             *timerecord.TimeRecorder
 }
 
 // Ctx is the context of index tasks.
@@ -136,6 +142,7 @@ func (bt *BaseTask) Name() string {
 func (it *IndexBuildTask) OnEnqueue() error {
 	it.SetID(it.req.IndexBuildID)
 	log.Debug("IndexNode IndexBuilderTask Enqueue", zap.Int64("taskID", it.ID()), zap.Int64("index buildID", it.req.IndexBuildID))
+	it.tr = timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildTask %d", it.req.IndexBuildID))
 	return nil
 }
@@ -215,6 +222,8 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
 	if err != nil {
 		log.Error("IndexNode failed to checkIndexMeta", zap.Error(err))
 	}
+	msg := fmt.Sprintf("check index meta pre: %v", pre)
+	it.tr.Record(msg)
 	return err
 }
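Note: the tr field now threads one stage timer through the whole task, so every step (meta check, load, build, save) records against the same recorder instead of the per-call tr local that the old Execute created. A rough sketch of such a stage recorder, assuming a simplified API with only Record and Elapse (the real timerecord package may differ):

    package main

    import (
        "fmt"
        "time"
    )

    // TimeRecorder logs the duration of each named stage and the total elapsed time.
    type TimeRecorder struct {
        header string
        start  time.Time // fixed at construction; used by Elapse
        last   time.Time // advances on every Record
    }

    func NewTimeRecorder(header string) *TimeRecorder {
        now := time.Now()
        return &TimeRecorder{header: header, start: now, last: now}
    }

    // Record reports the time spent since the previous Record (or construction).
    func (tr *TimeRecorder) Record(msg string) {
        now := time.Now()
        fmt.Printf("%s: %s (%v)\n", tr.header, msg, now.Sub(tr.last))
        tr.last = now
    }

    // Elapse reports the total time since construction.
    func (tr *TimeRecorder) Elapse(msg string) {
        fmt.Printf("%s: %s (%v)\n", tr.header, msg, time.Since(tr.start))
    }

    func main() {
        tr := NewTimeRecorder("IndexBuildTask 42")
        time.Sleep(10 * time.Millisecond)
        tr.Record("load vector data done")
        time.Sleep(10 * time.Millisecond)
        tr.Record("build index done")
        tr.Elapse("index building all done")
    }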
@@ -235,14 +244,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
 	return it.checkIndexMeta(ctx, false)
 }
 
-// Execute actually performs the task of building an index.
-func (it *IndexBuildTask) Execute(ctx context.Context) error {
-	log.Debug("IndexNode IndexBuildTask Execute ...", zap.Int64("buildId", it.req.IndexBuildID))
-	sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute")
-	defer sp.Finish()
-	tr := timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildTask %d", it.req.IndexBuildID))
-	var err error
-
+func (it *IndexBuildTask) executePrepareParams(ctx context.Context) error {
 	typeParams := make(map[string]string)
 	for _, kvPair := range it.req.GetTypeParams() {
 		key, value := kvPair.GetKey(), kvPair.GetValue()
@@ -282,28 +284,12 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
 			indexParams[key] = value
 		}
 	}
+	it.newTypeParams = typeParams
+	it.newIndexParams = indexParams
+	return nil
+}
 
-	it.index, err = NewCIndex(typeParams, indexParams)
-	if err != nil {
-		log.Error("IndexNode IndexBuildTask Execute NewCIndex failed",
-			zap.Int64("buildId", it.req.IndexBuildID),
-			zap.Error(err))
-		return err
-	}
-	defer func() {
-		err = it.index.Delete()
-		if err != nil {
-			log.Warn("IndexNode IndexBuildTask Execute CIndexDelete failed",
-				zap.Int64("buildId", it.req.IndexBuildID),
-				zap.Error(err))
-		}
-	}()
-
-	getKeyByPathNaive := func(path string) string {
-		// splitElements := strings.Split(path, "/")
-		// return splitElements[len(splitElements)-1]
-		return path
-	}
+func (it *IndexBuildTask) executeStepLoad(ctx context.Context) (storage.FieldID, storage.FieldData, error) {
 	getValueByPath := func(path string) ([]byte, error) {
 		data, err := it.kv.Load(path)
 		if err != nil {
@@ -317,172 +303,234 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
 			return nil, err
 		}
 		return &Blob{
-			Key:   getKeyByPathNaive(path),
+			Key:   path,
 			Value: value,
 		}, nil
 	}
-	getStorageBlobs := func(blobs []*Blob) []*storage.Blob {
-		return blobs
-	}
-
 	toLoadDataPaths := it.req.GetDataPaths()
 	keys := make([]string, len(toLoadDataPaths))
 	blobs := make([]*Blob, len(toLoadDataPaths))
 
 	loadKey := func(idx int) error {
-		keys[idx] = getKeyByPathNaive(toLoadDataPaths[idx])
+		keys[idx] = toLoadDataPaths[idx]
 		blob, err := getBlobByPath(toLoadDataPaths[idx])
 		if err != nil {
 			return err
 		}
 
 		blobs[idx] = blob
 
 		return nil
 	}
 	// Use runtime.GOMAXPROCS(0) instead of runtime.NumCPU()
 	// to respect CPU quota of container/pod
-	err = funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.GOMAXPROCS(0), loadKey, "loadKey")
+	// gomaxproc will be set by `automaxproc`, passing 0 will just retrieve the value
+	err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.GOMAXPROCS(0), loadKey, "loadKey")
 	if err != nil {
 		log.Warn("loadKey from minio failed", zap.Error(err))
-		it.internalErr = err
-		// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
-		return nil
+		return storage.InvalidUniqueID, nil, err
 	}
 	log.Debug("IndexNode load data success", zap.Int64("buildId", it.req.IndexBuildID))
-	tr.Record("loadKey done")
-
-	storageBlobs := getStorageBlobs(blobs)
+	it.tr.Record("load vector data done")
 
 	var insertCodec storage.InsertCodec
-	collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(storageBlobs)
+	collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(blobs)
 	if err2 != nil {
-		return err2
+		return storage.InvalidUniqueID, nil, err2
 	}
 	if len(insertData.Data) != 1 {
-		return errors.New("we expect only one field in deserialized insert data")
+		return storage.InvalidUniqueID, nil, errors.New("we expect only one field in deserialized insert data")
 	}
+	it.collectionID = collectionID
+	it.partitionID = partitionID
+	it.segmentID = segmentID
 
 	log.Debug("IndexNode deserialize data success",
 		zap.Int64("taskID", it.ID()),
 		zap.Int64("IndexID", it.req.IndexID),
 		zap.Int64("index buildID", it.req.IndexBuildID),
-		zap.Int64("collectionID", collectionID),
-		zap.Int64("partitionID", partitionID),
-		zap.Int64("segmentID", segmentID))
-	tr.Record("deserialize storage blobs done")
+		zap.Int64("collectionID", it.collectionID),
+		zap.Int64("partitionID", it.partitionID),
+		zap.Int64("segmentID", it.segmentID))
 
-	for fieldID, value := range insertData.Data {
-		// TODO: BinaryVectorFieldData
-		floatVectorFieldData, fOk := value.(*storage.FloatVectorFieldData)
-		if fOk {
-			err = it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data)
-			if err != nil {
-				log.Error("IndexNode BuildFloatVecIndexWithoutIds failed", zap.Error(err))
-				return err
-			}
-			tr.Record("build float vector index done")
+	it.tr.Record("deserialize vector data done")
+
+	// we can ensure that these blobs are in one Field
+	var data storage.FieldData
+	var fieldID storage.FieldID
+	for fID, value := range insertData.Data {
+		data = value
+		fieldID = fID
+		break
+	}
+	return fieldID, data, nil
+}
 
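Note: the load step fans out one goroutine-bounded worker per binlog path, capped at runtime.GOMAXPROCS(0) rather than runtime.NumCPU(), so a container CPU quota (applied at startup by what the comment calls `automaxproc`, presumably the uber-go/automaxprocs library) limits the parallelism. A self-contained sketch of that bounded fan-out, with a hypothetical loadOne in place of the real MinIO load; funcutil.ProcessFuncParallel works along these lines, though its actual implementation differs:

    package main

    import (
        "fmt"
        "runtime"
        "sync"
    )

    // processParallel runs fn(0..total-1) using at most maxParallel concurrent
    // workers and returns the first error encountered, if any.
    func processParallel(total, maxParallel int, fn func(idx int) error) error {
        sem := make(chan struct{}, maxParallel) // counting semaphore
        var wg sync.WaitGroup
        var mu sync.Mutex
        var firstErr error

        for i := 0; i < total; i++ {
            wg.Add(1)
            go func(idx int) {
                defer wg.Done()
                sem <- struct{}{}        // acquire a worker slot
                defer func() { <-sem }() // release it
                if err := fn(idx); err != nil {
                    mu.Lock()
                    if firstErr == nil {
                        firstErr = err
                    }
                    mu.Unlock()
                }
            }(i)
        }
        wg.Wait()
        return firstErr
    }

    func main() {
        paths := []string{"binlog/1", "binlog/2", "binlog/3"}
        blobs := make([][]byte, len(paths))

        loadOne := func(idx int) error { // hypothetical stand-in for a MinIO load
            blobs[idx] = []byte("data of " + paths[idx])
            return nil
        }

        // GOMAXPROCS(0) only reads the current setting; in a container it
        // reflects the CPU quota once automaxprocs has adjusted it at startup.
        if err := processParallel(len(paths), runtime.GOMAXPROCS(0), loadOne); err != nil {
            fmt.Println("load failed:", err)
            return
        }
        fmt.Printf("loaded %d blobs\n", len(blobs))
    }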
+func (it *IndexBuildTask) executeStepBuild(ctx context.Context) ([]*storage.Blob, error) {
+	var fieldID storage.FieldID
+	{
+		var err error
+		var fieldData storage.FieldData
+		fieldID, fieldData, err = it.executeStepLoad(ctx)
+		if err != nil {
+			return nil, err
+		}
 
-		binaryVectorFieldData, bOk := value.(*storage.BinaryVectorFieldData)
+		floatVectorFieldData, fOk := fieldData.(*storage.FloatVectorFieldData)
+		if fOk {
+			err := it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data)
+			if err != nil {
+				log.Error("IndexNode BuildFloatVecIndexWithoutIds failed", zap.Error(err))
+				return nil, err
+			}
+		}
+		binaryVectorFieldData, bOk := fieldData.(*storage.BinaryVectorFieldData)
 		if bOk {
-			err = it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data)
+			err := it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data)
 			if err != nil {
 				log.Error("IndexNode BuildBinaryVecIndexWithoutIds failed", zap.Error(err))
-				return err
+				return nil, err
 			}
-			tr.Record("build binary vector index done")
 		}
 
 		if !fOk && !bOk {
-			return errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData")
+			return nil, errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData")
 		}
+		it.tr.Record("build index done")
+	}
 
-	indexBlobs, err := it.index.Serialize()
-	if err != nil {
-		log.Error("IndexNode index Serialize failed", zap.Error(err))
-		return err
-	}
-	tr.Record("serialize index done")
+	indexBlobs, err := it.index.Serialize()
+	if err != nil {
+		log.Error("IndexNode index Serialize failed", zap.Error(err))
+		return nil, err
+	}
+	it.tr.Record("index serialize done")
 
-	codec := storage.NewIndexFileBinlogCodec()
-	serializedIndexBlobs, err := codec.Serialize(
-		it.req.IndexBuildID,
-		it.req.Version,
-		collectionID,
-		partitionID,
-		segmentID,
-		fieldID,
-		indexParams,
-		it.req.IndexName,
-		it.req.IndexID,
-		getStorageBlobs(indexBlobs),
-	)
-	if err != nil {
-		return err
-	}
-	tr.Record("serialize index codec done")
-	it.serializedSize = 0
-	for i := range serializedIndexBlobs {
-		it.serializedSize += uint64(len(serializedIndexBlobs[i].Value))
-	}
-	log.Debug("serialize index codec done", zap.Uint64("serialized index size", it.serializedSize))
+	// early release index for gc, and we can ensure that Delete is idempotent.
+	if err := it.index.Delete(); err != nil {
+		log.Error("IndexNode IndexBuildTask Execute CIndexDelete failed",
+			zap.Int64("buildId", it.req.IndexBuildID),
+			zap.Error(err))
+	}
 
+	var serializedIndexBlobs []*storage.Blob
+	codec := storage.NewIndexFileBinlogCodec()
+	serializedIndexBlobs, err = codec.Serialize(
+		it.req.IndexBuildID,
+		it.req.Version,
+		it.collectionID,
+		it.partitionID,
+		it.segmentID,
+		fieldID,
+		it.newIndexParams,
+		it.req.IndexName,
+		it.req.IndexID,
+		indexBlobs,
+	)
+	if err != nil {
+		return nil, err
+	}
+	it.tr.Record("index codec serialize done")
+	return serializedIndexBlobs, nil
+}
 
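Note: this is where the peak-memory reduction in the commit message comes from. The bare { } block scopes fieldData (the raw vectors) so it becomes unreachable as soon as building finishes, and the idempotent it.index.Delete() then frees the C-side index before the binlog codec materializes the serialized blobs. A toy illustration of the scoping half of that idea (allocation sizes are made up for the example):

    package main

    import (
        "fmt"
        "runtime"
    )

    func heapInUse() uint64 {
        var m runtime.MemStats
        runtime.ReadMemStats(&m)
        return m.HeapInuse
    }

    func main() {
        var summary int
        {
            // rawData is only needed to compute the summary; declaring it
            // inside a block lets it become unreachable right after use.
            rawData := make([]float32, 1<<22) // ~16 MB of "vectors"
            for i := range rawData {
                rawData[i] = float32(i)
            }
            summary = len(rawData)
        } // rawData is out of scope here and eligible for collection

        runtime.GC() // force a collection to make the effect visible
        fmt.Printf("summary=%d, heap in use after GC: %d bytes\n", summary, heapInUse())
    }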
-	getSavePathByKey := func(key string) string {
-		return path.Join(Params.IndexNodeCfg.IndexStorageRootPath, strconv.Itoa(int(it.req.IndexBuildID)), strconv.Itoa(int(it.req.Version)),
-			strconv.Itoa(int(partitionID)), strconv.Itoa(int(segmentID)), key)
-	}
-	saveBlob := func(path string, value []byte) error {
-		return it.kv.Save(path, string(value))
-	}
 
+func (it *IndexBuildTask) executeSave(ctx context.Context, blobs []*storage.Blob) error {
+	blobCnt := len(blobs)
+	it.serializedSize = 0
+	for i := range blobs {
+		it.serializedSize += uint64(len(blobs[i].Value))
+	}
+
+	getSavePathByKey := func(key string) string {
+		return path.Join(Params.IndexNodeCfg.IndexStorageRootPath, strconv.Itoa(int(it.req.IndexBuildID)), strconv.Itoa(int(it.req.Version)),
+			strconv.Itoa(int(it.partitionID)), strconv.Itoa(int(it.segmentID)), key)
+	}
+
-	it.savePaths = make([]string, len(serializedIndexBlobs))
+	it.savePaths = make([]string, blobCnt)
 	saveIndexFile := func(idx int) error {
-		blob := serializedIndexBlobs[idx]
-		key, value := blob.Key, blob.Value
-
-		savePath := getSavePathByKey(key)
-
+		blob := blobs[idx]
+		savePath := getSavePathByKey(blob.Key)
 		saveIndexFileFn := func() error {
 			v, err := it.etcdKV.Load(it.req.MetaPath)
 			if err != nil {
 				log.Warn("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err))
 				return err
 			}
 			indexMeta := indexpb.IndexMeta{}
 			err = proto.Unmarshal([]byte(v), &indexMeta)
 			if err != nil {
 				log.Warn("IndexNode Unmarshal indexMeta error ", zap.Error(err))
 				return err
 			}
 			//log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta))
 			if indexMeta.Version > it.req.Version {
 				log.Warn("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version),
 					zap.Any("indexMeta.Version", indexMeta.Version))
 				return errors.New("This task has been reassigned, check indexMeta.version and request ")
 			}
-			return saveBlob(savePath, value)
+			return it.kv.Save(savePath, string(blob.Value))
 		}
 		err := retry.Do(ctx, saveIndexFileFn, retry.Attempts(5))
 		if err != nil {
 			log.Warn("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath))
 			return err
 		}
-
 		it.savePaths[idx] = savePath
-
 		return nil
 	}
 
-	err = funcutil.ProcessFuncParallel(len(serializedIndexBlobs), runtime.NumCPU(), saveIndexFile, "saveIndexFile")
+	err := funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile")
 	if err != nil {
 		log.Warn("saveIndexFile to minio failed", zap.Error(err))
-		it.internalErr = err
-		// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
-		return nil
+		// In this case, we intend not to return err, otherwise the task will be marked as failed.
+		it.internalErr = err
 	}
-	tr.Record("save index file done")
-	log.Info("IndexNode CreateIndex successfully ", zap.Int64("collect", collectionID),
-		zap.Int64("partition", partitionID), zap.Int64("segment", segmentID))
-	tr.Elapse("all done")
 	return nil
 }
 
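Note: index files are written under a deterministic layout, root/buildID/version/partitionID/segmentID/key, as built by getSavePathByKey above. A small sketch of that path construction; the root value here is a made-up placeholder for Params.IndexNodeCfg.IndexStorageRootPath:

    package main

    import (
        "fmt"
        "path"
        "strconv"
    )

    // savePath mirrors the layout used above:
    // <root>/<buildID>/<version>/<partitionID>/<segmentID>/<key>
    func savePath(root string, buildID, version, partitionID, segmentID int64, key string) string {
        return path.Join(root,
            strconv.FormatInt(buildID, 10),
            strconv.FormatInt(version, 10),
            strconv.FormatInt(partitionID, 10),
            strconv.FormatInt(segmentID, 10),
            key)
    }

    func main() {
        // "files/index_files" is an assumed root, not the real config value.
        p := savePath("files/index_files", 437, 1, 21, 105, "IVF_FLAT")
        fmt.Println(p) // files/index_files/437/1/21/105/IVF_FLAT
    }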
+// Execute actually performs the task of building an index.
+func (it *IndexBuildTask) Execute(ctx context.Context) error {
+	log.Debug("IndexNode IndexBuildTask Execute ...", zap.Int64("buildId", it.req.IndexBuildID))
+	sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute")
+	defer sp.Finish()
+
+	if err := it.executePrepareParams(ctx); err != nil {
+		return err
+	}
+
+	var err error
+	it.index, err = NewCIndex(it.newTypeParams, it.newIndexParams)
+	if err != nil {
+		log.Error("IndexNode IndexBuildTask Execute NewCIndex failed",
+			zap.Int64("buildId", it.req.IndexBuildID),
+			zap.Error(err))
+		return err
+	}
+
+	defer func() {
+		err := it.index.Delete()
+		if err != nil {
+			log.Error("IndexNode IndexBuildTask Execute CIndexDelete failed",
+				zap.Int64("buildId", it.req.IndexBuildID),
+				zap.Error(err))
+		}
+	}()
+
+	var blobs []*storage.Blob
+	blobs, err = it.executeStepBuild(ctx)
+	if err != nil {
+		return err
+	}
+
+	err = it.executeSave(ctx, blobs)
+	if err != nil {
+		return err
+	}
+	it.tr.Record("index file save done")
+	it.tr.Elapse("index building all done")
+	log.Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID),
+		zap.Int64("partition", it.partitionID), zap.Int64("segment", it.segmentID))
+	return nil
+}
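Note: each file write above is wrapped in a retried closure that re-reads the task's metadata from etcd on every attempt. If the recorded version has moved past the request's version, the task has been reassigned and the write is abandoned rather than retried to completion. A compact sketch of that retry-with-freshness-check shape, with in-memory stand-ins for the etcd load, the object-store save, and the retry helper (the real retry.Do also supports options such as attempt limits and backoff):

    package main

    import (
        "errors"
        "fmt"
    )

    // retryDo calls fn up to attempts times, stopping at the first success.
    func retryDo(attempts int, fn func() error) error {
        var err error
        for i := 0; i < attempts; i++ {
            if err = fn(); err == nil {
                return nil
            }
        }
        return err
    }

    var errReassigned = errors.New("task reassigned: meta version is newer than request version")

    func main() {
        metaVersion := int64(3) // stand-in for the version stored in etcd
        reqVersion := int64(2)  // the version this task was issued with
        store := map[string]string{}

        saveFn := func() error {
            // Re-check freshness on every attempt, not just once up front.
            if metaVersion > reqVersion {
                return errReassigned
            }
            store["files/index_files/437/2/21/105/key"] = "blob bytes"
            return nil
        }

        if err := retryDo(5, saveFn); err != nil {
            fmt.Println("save abandoned:", err)
            return
        }
        fmt.Println("saved", len(store), "file(s)")
    }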
@@ -45,3 +45,5 @@ const (
 	// DataNodeRole is a constant represent DataNode
 	DataNodeRole = "datanode"
 )
+
+const InvalidUniqueID = UniqueID(-1)
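Note: InvalidUniqueID gives executeStepLoad a typed sentinel to return on its error paths, so callers can never mistake a zero value for a real ID. A minimal sketch of the sentinel pattern; UniqueID is an int64 alias in Milvus, and lookupSegment is a hypothetical example function:

    package main

    import (
        "errors"
        "fmt"
    )

    type UniqueID = int64

    const InvalidUniqueID = UniqueID(-1)

    // lookupSegment is a hypothetical function that fails for unknown names;
    // on error it returns the sentinel rather than a zero value that could
    // collide with a legitimate ID.
    func lookupSegment(name string) (UniqueID, error) {
        if name == "" {
            return InvalidUniqueID, errors.New("empty segment name")
        }
        return 105, nil
    }

    func main() {
        id, err := lookupSegment("")
        if err != nil {
            fmt.Println("lookup failed:", err, "id:", id) // id == -1, clearly invalid
            return
        }
        fmt.Println("segment id:", id)
    }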