diff --git a/internal/indexnode/index.go b/internal/indexnode/index.go index 8f3e9500e5..e2827cbb84 100644 --- a/internal/indexnode/index.go +++ b/internal/indexnode/index.go @@ -135,16 +135,17 @@ func (index *CIndex) BuildBinaryVecIndexWithoutIds(vectors []byte) error { return HandleCStatus(&status, "BuildBinaryVecIndexWithoutIds failed") } -// Delete removes the pointer to build the index in 'C'. +// Delete releases the index pointer held in 'C'. Delete is idempotent: calls after the first are no-ops. func (index *CIndex) Delete() error { /* void DeleteIndex(CIndex index); */ + if index.close { + return nil + } C.DeleteIndex(index.indexPtr) index.close = true - // TODO: check if index.indexPtr will be released by golang, though it occupies little memory - // C.free(index.indexPtr) return nil } diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 5c5ce24cce..95d2d5f8fe 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -110,6 +110,12 @@ type IndexBuildTask struct { req *indexpb.CreateIndexRequest nodeID UniqueID serializedSize uint64 + collectionID UniqueID + partitionID UniqueID + segmentID UniqueID + newTypeParams map[string]string + newIndexParams map[string]string + tr *timerecord.TimeRecorder } // Ctx is the context of index tasks. 
@@ -136,6 +142,7 @@ func (bt *BaseTask) Name() string { func (it *IndexBuildTask) OnEnqueue() error { it.SetID(it.req.IndexBuildID) log.Debug("IndexNode IndexBuilderTask Enqueue", zap.Int64("taskID", it.ID()), zap.Int64("index buildID", it.req.IndexBuildID)) + it.tr = timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildTask %d", it.req.IndexBuildID)) return nil } @@ -215,6 +222,8 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error { if err != nil { log.Error("IndexNode failed to checkIndexMeta", zap.Error(err)) } + msg := fmt.Sprintf("check index meta pre: %v", pre) + it.tr.Record(msg) return err } @@ -235,14 +244,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error { return it.checkIndexMeta(ctx, false) } -// Execute actually performs the task of building an index. -func (it *IndexBuildTask) Execute(ctx context.Context) error { - log.Debug("IndexNode IndexBuildTask Execute ...", zap.Int64("buildId", it.req.IndexBuildID)) - sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute") - defer sp.Finish() - tr := timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildTask %d", it.req.IndexBuildID)) - var err error - +func (it *IndexBuildTask) executePrepareParams(ctx context.Context) error { typeParams := make(map[string]string) for _, kvPair := range it.req.GetTypeParams() { key, value := kvPair.GetKey(), kvPair.GetValue() @@ -282,28 +284,12 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { indexParams[key] = value } } + it.newTypeParams = typeParams + it.newIndexParams = indexParams + return nil +} - it.index, err = NewCIndex(typeParams, indexParams) - if err != nil { - log.Error("IndexNode IndexBuildTask Execute NewCIndex failed", - zap.Int64("buildId", it.req.IndexBuildID), - zap.Error(err)) - return err - } - defer func() { - err = it.index.Delete() - if err != nil { - log.Warn("IndexNode IndexBuildTask Execute CIndexDelete failed", - zap.Int64("buildId", it.req.IndexBuildID), - 
zap.Error(err)) - } - }() - - getKeyByPathNaive := func(path string) string { - // splitElements := strings.Split(path, "/") - // return splitElements[len(splitElements)-1] - return path - } +func (it *IndexBuildTask) executeStepLoad(ctx context.Context) (storage.FieldID, storage.FieldData, error) { getValueByPath := func(path string) ([]byte, error) { data, err := it.kv.Load(path) if err != nil { @@ -317,172 +303,234 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { return nil, err } return &Blob{ - Key: getKeyByPathNaive(path), + Key: path, Value: value, }, nil } - getStorageBlobs := func(blobs []*Blob) []*storage.Blob { - return blobs - } toLoadDataPaths := it.req.GetDataPaths() keys := make([]string, len(toLoadDataPaths)) blobs := make([]*Blob, len(toLoadDataPaths)) loadKey := func(idx int) error { - keys[idx] = getKeyByPathNaive(toLoadDataPaths[idx]) + keys[idx] = toLoadDataPaths[idx] blob, err := getBlobByPath(toLoadDataPaths[idx]) if err != nil { return err } - blobs[idx] = blob - return nil } // Use runtime.GOMAXPROCS(0) instead of runtime.NumCPU() // to respect CPU quota of container/pod // gomaxproc will be set by `automaxproc`, passing 0 will just retrieve the value - err = funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.GOMAXPROCS(0), loadKey, "loadKey") + err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.GOMAXPROCS(0), loadKey, "loadKey") if err != nil { log.Warn("loadKey from minio failed", zap.Error(err)) it.internalErr = err // In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned. 
- return nil + return storage.InvalidUniqueID, nil, err } - log.Debug("IndexNode load data success", zap.Int64("buildId", it.req.IndexBuildID)) - tr.Record("loadKey done") - storageBlobs := getStorageBlobs(blobs) + log.Debug("IndexNode load data success", zap.Int64("buildId", it.req.IndexBuildID)) + it.tr.Record("load vector data done") + var insertCodec storage.InsertCodec - collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(storageBlobs) + collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(blobs) if err2 != nil { - return err2 + return storage.InvalidUniqueID, nil, err2 } if len(insertData.Data) != 1 { - return errors.New("we expect only one field in deserialized insert data") + return storage.InvalidUniqueID, nil, errors.New("we expect only one field in deserialized insert data") } + it.collectionID = collectionID + it.partitionID = partitionID + it.segmentID = segmentID + log.Debug("IndexNode deserialize data success", zap.Int64("taskID", it.ID()), + zap.Int64("IndexID", it.req.IndexID), zap.Int64("index buildID", it.req.IndexBuildID), - zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID)) - tr.Record("deserialize storage blobs done") + zap.Int64("collectionID", it.collectionID), + zap.Int64("partitionID", it.partitionID), + zap.Int64("segmentID", it.segmentID)) - for fieldID, value := range insertData.Data { - // TODO: BinaryVectorFieldData - floatVectorFieldData, fOk := value.(*storage.FloatVectorFieldData) - if fOk { - err = it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data) - if err != nil { - log.Error("IndexNode BuildFloatVecIndexWithoutIds failed", zap.Error(err)) - return err - } - tr.Record("build float vector index done") + it.tr.Record("deserialize vector data done") + + // we can ensure that there blobs are in one Field + var data storage.FieldData + var fieldID storage.FieldID + for fID, value := range 
insertData.Data { + data = value + fieldID = fID + break + } + return fieldID, data, nil +} + +func (it *IndexBuildTask) executeStepBuild(ctx context.Context) ([]*storage.Blob, error) { + var fieldID storage.FieldID + { + var err error + var fieldData storage.FieldData + fieldID, fieldData, err = it.executeStepLoad(ctx) + if err != nil { + return nil, err } - binaryVectorFieldData, bOk := value.(*storage.BinaryVectorFieldData) + floatVectorFieldData, fOk := fieldData.(*storage.FloatVectorFieldData) + if fOk { + err := it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data) + if err != nil { + log.Error("IndexNode BuildFloatVecIndexWithoutIds failed", zap.Error(err)) + return nil, err + } + } + binaryVectorFieldData, bOk := fieldData.(*storage.BinaryVectorFieldData) if bOk { - err = it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data) + err := it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data) if err != nil { log.Error("IndexNode BuildBinaryVecIndexWithoutIds failed", zap.Error(err)) - return err + return nil, err } - tr.Record("build binary vector index done") } if !fOk && !bOk { - return errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData") + return nil, errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData") } + it.tr.Record("build index done") + } - indexBlobs, err := it.index.Serialize() - if err != nil { - log.Error("IndexNode index Serialize failed", zap.Error(err)) - return err - } - tr.Record("serialize index done") + indexBlobs, err := it.index.Serialize() + if err != nil { + log.Error("IndexNode index Serialize failed", zap.Error(err)) + return nil, err + } + it.tr.Record("index serialize done") - codec := storage.NewIndexFileBinlogCodec() - serializedIndexBlobs, err := codec.Serialize( - it.req.IndexBuildID, - it.req.Version, - collectionID, - partitionID, - segmentID, - fieldID, - indexParams, - it.req.IndexName, - it.req.IndexID, - getStorageBlobs(indexBlobs), - ) - if err != nil { - 
return err - } - tr.Record("serialize index codec done") - it.serializedSize = 0 - for i := range serializedIndexBlobs { - it.serializedSize += uint64(len(serializedIndexBlobs[i].Value)) - } - log.Debug("serialize index codec done", zap.Uint64("serialized index size", it.serializedSize)) + // early release index for gc, and we can ensure that Delete is idempotent. + if err := it.index.Delete(); err != nil { + log.Error("IndexNode IndexBuildTask Execute CIndexDelete failed", + zap.Int64("buildId", it.req.IndexBuildID), + zap.Error(err)) + } - getSavePathByKey := func(key string) string { + var serializedIndexBlobs []*storage.Blob + codec := storage.NewIndexFileBinlogCodec() + serializedIndexBlobs, err = codec.Serialize( + it.req.IndexBuildID, + it.req.Version, + it.collectionID, + it.partitionID, + it.segmentID, + fieldID, + it.newIndexParams, + it.req.IndexName, + it.req.IndexID, + indexBlobs, + ) + if err != nil { + return nil, err + } + it.tr.Record("index codec serialize done") + return serializedIndexBlobs, nil +} - return path.Join(Params.IndexNodeCfg.IndexStorageRootPath, strconv.Itoa(int(it.req.IndexBuildID)), strconv.Itoa(int(it.req.Version)), - strconv.Itoa(int(partitionID)), strconv.Itoa(int(segmentID)), key) - } - saveBlob := func(path string, value []byte) error { - return it.kv.Save(path, string(value)) - } +func (it *IndexBuildTask) executeSave(ctx context.Context, blobs []*storage.Blob) error { + blobCnt := len(blobs) + it.serializedSize = 0 + for i := range blobs { + it.serializedSize += uint64(len(blobs[i].Value)) + } - it.savePaths = make([]string, len(serializedIndexBlobs)) - saveIndexFile := func(idx int) error { - blob := serializedIndexBlobs[idx] - key, value := blob.Key, blob.Value + getSavePathByKey := func(key string) string { + return path.Join(Params.IndexNodeCfg.IndexStorageRootPath, strconv.Itoa(int(it.req.IndexBuildID)), strconv.Itoa(int(it.req.Version)), + strconv.Itoa(int(it.partitionID)), strconv.Itoa(int(it.segmentID)), key) + } - 
savePath := getSavePathByKey(key) - - saveIndexFileFn := func() error { - v, err := it.etcdKV.Load(it.req.MetaPath) - if err != nil { - log.Warn("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err)) - return err - } - indexMeta := indexpb.IndexMeta{} - err = proto.Unmarshal([]byte(v), &indexMeta) - if err != nil { - log.Warn("IndexNode Unmarshal indexMeta error ", zap.Error(err)) - return err - } - //log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta)) - if indexMeta.Version > it.req.Version { - log.Warn("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version), - zap.Any("indexMeta.Version", indexMeta.Version)) - return errors.New("This task has been reassigned, check indexMeta.version and request ") - } - return saveBlob(savePath, value) - } - err := retry.Do(ctx, saveIndexFileFn, retry.Attempts(5)) + it.savePaths = make([]string, blobCnt) + saveIndexFile := func(idx int) error { + blob := blobs[idx] + savePath := getSavePathByKey(blob.Key) + saveIndexFileFn := func() error { + v, err := it.etcdKV.Load(it.req.MetaPath) if err != nil { - log.Warn("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath)) + log.Warn("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err)) return err } - - it.savePaths[idx] = savePath - - return nil + indexMeta := indexpb.IndexMeta{} + err = proto.Unmarshal([]byte(v), &indexMeta) + if err != nil { + log.Warn("IndexNode Unmarshal indexMeta error ", zap.Error(err)) + return err + } + //log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta)) + if indexMeta.Version > it.req.Version { + log.Warn("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version), + zap.Any("indexMeta.Version", indexMeta.Version)) + return errors.New("This task has been reassigned, check indexMeta.version and request ") + } + return it.kv.Save(savePath, 
string(blob.Value)) } - err = funcutil.ProcessFuncParallel(len(serializedIndexBlobs), runtime.NumCPU(), saveIndexFile, "saveIndexFile") + err := retry.Do(ctx, saveIndexFileFn, retry.Attempts(5)) if err != nil { - log.Warn("saveIndexFile to minio failed", zap.Error(err)) - it.internalErr = err - // In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned. - return nil + log.Warn("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath)) + return err } - tr.Record("save index file done") + it.savePaths[idx] = savePath + return nil + } + + err := funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile") + if err != nil { + log.Warn("saveIndexFile to minio failed", zap.Error(err)) + // In this case, we intend not to return err, otherwise the task will be marked as failed. + it.internalErr = err } - log.Info("IndexNode CreateIndex successfully ", zap.Int64("collect", collectionID), - zap.Int64("partition", partitionID), zap.Int64("segment", segmentID)) - tr.Elapse("all done") + return nil +} + +// Execute actually performs the task of building an index. 
+func (it *IndexBuildTask) Execute(ctx context.Context) error { + log.Debug("IndexNode IndexBuildTask Execute ...", zap.Int64("buildId", it.req.IndexBuildID)) + sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute") + defer sp.Finish() + + if err := it.executePrepareParams(ctx); err != nil { + return err + } + + var err error + it.index, err = NewCIndex(it.newTypeParams, it.newIndexParams) + if err != nil { + log.Error("IndexNode IndexBuildTask Execute NewCIndex failed", + zap.Int64("buildId", it.req.IndexBuildID), + zap.Error(err)) + return err + } + + defer func() { + err := it.index.Delete() + if err != nil { + log.Error("IndexNode IndexBuildTask Execute CIndexDelete failed", + zap.Int64("buildId", it.req.IndexBuildID), + zap.Error(err)) + } + }() + + var blobs []*storage.Blob + blobs, err = it.executeStepBuild(ctx) + if err != nil { + return err + } + + err = it.executeSave(ctx, blobs) + if err != nil { + return err + } + it.tr.Record("index file save done") + it.tr.Elapse("index building all done") + log.Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID), + zap.Int64("partition", it.partitionID), zap.Int64("segment", it.segmentID)) + return nil } diff --git a/internal/util/typeutil/type.go b/internal/util/typeutil/type.go index 616ee76f54..adcc01f72f 100644 --- a/internal/util/typeutil/type.go +++ b/internal/util/typeutil/type.go @@ -45,3 +45,6 @@ const ( // DataNodeRole is a constant represent DataNode DataNodeRole = "datanode" ) + +// InvalidUniqueID is a sentinel UniqueID value (-1) indicating an invalid or unassigned ID. +const InvalidUniqueID = UniqueID(-1)