From 6bc2efe4292b1c6e3ea641f612fff57a301f90b2 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Mon, 24 Jan 2022 17:18:46 +0800 Subject: [PATCH] Fixbug: IndexNode should panic when save meta failed to MetaKV (#15347) Signed-off-by: zhenshan.cao --- Makefile | 4 + internal/indexnode/indexnode_test.go | 20 +-- internal/indexnode/task.go | 188 ++++++++++++++++---------- internal/indexnode/task_scheduler.go | 7 + internal/indexnode/task_state.go | 41 ++++++ internal/indexnode/task_state_test.go | 30 ++++ 6 files changed, 209 insertions(+), 81 deletions(-) create mode 100644 internal/indexnode/task_state.go create mode 100644 internal/indexnode/task_state_test.go diff --git a/Makefile b/Makefile index 7fd218c2e8..0f6d0d1c3c 100644 --- a/Makefile +++ b/Makefile @@ -120,6 +120,10 @@ build-cpp-with-unittest: # Run the tests. unittest: test-cpp test-go +test-indexnode: + @echo "Running go unittests..." + go test -race -coverpkg=./... -coverprofile=profile.out -covermode=atomic -timeout 5m github.com/milvus-io/milvus/internal/indexnode -v + test-proxy: @echo "Running go unittests..." go test -race -coverpkg=./... -coverprofile=profile.out -covermode=atomic -timeout 5m github.com/milvus-io/milvus/internal/proxy -v diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index e96b08486a..334c071a28 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -305,7 +305,7 @@ func TestIndexNode(t *testing.T) { defer in.etcdKV.RemoveWithPrefix(metaPath2) }) - t.Run("Create Deleted_Index", func(t *testing.T) { + t.Run("Create DeletedIndex", func(t *testing.T) { var insertCodec storage.InsertCodec insertCodec.Schema = &etcdpb.CollectionMeta{ @@ -398,19 +398,21 @@ func TestIndexNode(t *testing.T) { status, err2 := in.CreateIndex(ctx, req) assert.Nil(t, err2) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - + time.Sleep(100 * time.Millisecond) strValue, err3 := in.etcdKV.Load(metaPath3) assert.Nil(t, err3) indexMetaTmp := indexpb.IndexMeta{} err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) assert.Nil(t, err) - for indexMetaTmp.State != commonpb.IndexState_Finished { - time.Sleep(100 * time.Millisecond) - strValue, err := in.etcdKV.Load(metaPath3) - assert.Nil(t, err) - err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) - assert.Nil(t, err) - } + assert.Equal(t, true, indexMetaTmp.MarkDeleted) + assert.Equal(t, int64(1), indexMetaTmp.Version) + //for indexMetaTmp.State != commonpb.IndexState_Finished { + // time.Sleep(100 * time.Millisecond) + // strValue, err := in.etcdKV.Load(metaPath3) + // assert.Nil(t, err) + // err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) + // assert.Nil(t, err) + //} defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths) defer func() { for k := range kvs { diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index bc83963185..773b293516 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -60,6 +60,8 @@ type task interface { Notify(err error) OnEnqueue() error SetError(err error) + SetState(state TaskState) + GetState() TaskState } // BaseTask is an basic instance of task. @@ -69,6 +71,17 @@ type BaseTask struct { id UniqueID err error internalErr error + state TaskState +} + +// SetState sets task's state. +func (bt *BaseTask) SetState(state TaskState) { + bt.state = state +} + +// GetState gets task's state. +func (bt *BaseTask) GetState() TaskState { + return bt.state } // SetError sets an error to task. @@ -142,90 +155,105 @@ func (bt *BaseTask) Name() string { // OnEnqueue enqueues indexing tasks. func (it *IndexBuildTask) OnEnqueue() error { it.SetID(it.req.IndexBuildID) + it.SetState(TaskStateNormal) log.Debug("IndexNode IndexBuilderTask Enqueue", zap.Int64("taskID", it.ID()), zap.Int64("index buildID", it.req.IndexBuildID)) it.tr = timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildTask %d", it.req.IndexBuildID)) return nil } -// checkIndexMeta load meta from etcd to determine whether the task should continue execution. -func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error { +// loadIndexMeta load meta from etcd. +func (it *IndexBuildTask) loadIndexMeta(ctx context.Context) (*indexpb.IndexMeta, int64, error) { + indexMeta := indexpb.IndexMeta{} + var source int64 fn := func() error { //TODO error handling need to be optimized, return Unrecoverable to avoid retry - indexMeta := indexpb.IndexMeta{} _, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath) if err != nil { - log.Error("IndexNode checkIndexMeta", zap.Any("load meta error with path", it.req.MetaPath), - zap.Error(err), zap.Any("pre", pre)) return err } if len(values) == 0 { - return fmt.Errorf("IndexNode checkIndexMeta the indexMeta is empty") + return fmt.Errorf("IndexNode loadIndexMeta get empty") } - log.Debug("IndexNode checkIndexMeta load meta success", zap.Any("path", it.req.MetaPath), zap.Any("pre", pre)) err = proto.Unmarshal([]byte(values[0]), &indexMeta) if err != nil { - log.Error("IndexNode failed to unmarshal index meta", zap.Error(err)) return err } - log.Debug("IndexNode checkIndexMeta Unmarshal success", zap.Any("IndexMeta", indexMeta)) - if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished { - log.Info("IndexNode checkIndexMeta version mismatch", - zap.Any("req version", it.req.Version), - zap.Any("index meta version", indexMeta.Version)) - return nil - } - if indexMeta.MarkDeleted { - indexMeta.State = commonpb.IndexState_Finished - v, err := proto.Marshal(&indexMeta) - if err != nil { - return err - } - err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], string(v)) - if err != nil { - return err - } - errMsg := fmt.Sprintf("the index has been deleted with indexBuildID %d", indexMeta.IndexBuildID) - log.Warn(errMsg) - return fmt.Errorf(errMsg) - } - if pre { - return nil - } - indexMeta.IndexFilePaths = it.savePaths - indexMeta.State = commonpb.IndexState_Finished - indexMeta.SerializeSize = it.serializedSize - // Under normal circumstances, it.err and it.internalErr will not be non-nil at the same time, but for the sake of insurance, the else judgment is added. - if it.err != nil { - log.Error("IndexNode CreateIndex failed and can not be retried", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", it.err)) - indexMeta.State = commonpb.IndexState_Failed - indexMeta.FailReason = it.err.Error() - } else if it.internalErr != nil { - log.Error("IndexNode CreateIndex failed, but it can retried", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", it.internalErr)) - indexMeta.State = commonpb.IndexState_Unissued - } + source = versions[0] + return nil + } + err := retry.Do(ctx, fn, retry.Attempts(3)) + if err != nil { + return nil, -1, err + } + return &indexMeta, source, nil +} - log.Debug("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State)) - var metaValue []byte - metaValue, err = proto.Marshal(&indexMeta) - if err != nil { - log.Warn("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State), - zap.Any("proto.Marshal failed:", err)) - return err - } - err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], string(metaValue)) - if err != nil { - log.Warn("IndexNode checkIndexMeta CompareVersionAndSwap", zap.Error(err)) - } +func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta) TaskState { + if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished { + it.SetState(TaskStateAbandon) + } else if indexMeta.MarkDeleted { + it.SetState(TaskStateAbandon) + } + return it.GetState() +} + +// saveIndexMeta try to save index meta to metaKV. +// if failed, IndexNode will panic to inform indexcoord. +func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error { + defer it.tr.Record("IndexNode IndexBuildTask saveIndexMeta") + indexMeta, version, err := it.loadIndexMeta(ctx) + if err != nil { + errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to load index meta, IndexBuildID=%d", indexMeta.IndexBuildID) + panic(errMsg) + } + + taskState := it.updateTaskState(indexMeta) + if taskState == TaskStateAbandon { + log.Info("IndexNode IndexBuildTask saveIndexMeta", zap.String("TaskState", taskState.String()), + zap.Int64("IndexBuildID", indexMeta.IndexBuildID)) return nil } - err := retry.Do(ctx, fn, retry.Attempts(3)) - if err != nil { - log.Error("IndexNode failed to checkIndexMeta", zap.Error(err)) + indexMeta.IndexFilePaths = it.savePaths + indexMeta.SerializeSize = it.serializedSize + + if taskState == TaskStateFailed { + log.Error("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Failed", + zap.String("TaskState", taskState.String()), + zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.err)) + indexMeta.State = commonpb.IndexState_Failed + indexMeta.FailReason = it.err.Error() + } else if taskState == TaskStateRetry { + log.Info("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Unissued", + zap.String("TaskState", taskState.String()), + zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.internalErr)) + indexMeta.State = commonpb.IndexState_Unissued + } else { // TaskStateNormal + log.Info("IndexNode IndexBuildTask saveIndexmeta indexMeta.state to IndexState_Unissued", + zap.String("TaskState", taskState.String()), + zap.Int64("IndexBuildID", indexMeta.IndexBuildID)) + indexMeta.State = commonpb.IndexState_Finished } - msg := fmt.Sprintf("check index meta pre: %v", pre) - it.tr.Record(msg) - return err + + var metaValue []byte + metaValue, err = proto.Marshal(indexMeta) + if err != nil { + errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to marshal index meta, IndexBuildID=%d, err=%s", + indexMeta.IndexBuildID, err.Error()) + panic(errMsg) + } + + strMetaValue := string(metaValue) + + fn := func() error { + return it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, version, strMetaValue) + } + + err = retry.Do(ctx, fn, retry.Attempts(3)) + if err != nil { + panic(err.Error()) + } + return nil } // PreExecute does some checks before building the index, for example, whether the index has been deleted. @@ -233,7 +261,13 @@ func (it *IndexBuildTask) PreExecute(ctx context.Context) error { log.Debug("IndexNode IndexBuildTask preExecute...", zap.Int64("buildId", it.req.IndexBuildID)) sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-PreExecute") defer sp.Finish() - return it.checkIndexMeta(ctx, true) + indexMeta, _, err := it.loadIndexMeta(ctx) + if err != nil { + // assume that we can loadIndexMeta later... + return nil + } + it.updateTaskState(indexMeta) + return nil } // PostExecute does some checks after building the index, for example, whether the index has been deleted or @@ -242,10 +276,10 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error { log.Debug("IndexNode IndexBuildTask PostExecute...", zap.Int64("buildId", it.req.IndexBuildID)) sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-PostExecute") defer sp.Finish() - return it.checkIndexMeta(ctx, false) + return it.saveIndexMeta(ctx) } -func (it *IndexBuildTask) executePrepareParams(ctx context.Context) error { +func (it *IndexBuildTask) prepareParams(ctx context.Context) error { typeParams := make(map[string]string) for _, kvPair := range it.req.GetTypeParams() { key, value := kvPair.GetKey(), kvPair.GetValue() @@ -290,7 +324,7 @@ func (it *IndexBuildTask) executePrepareParams(ctx context.Context) error { return nil } -func (it *IndexBuildTask) executeStepLoad(ctx context.Context) (storage.FieldID, storage.FieldData, error) { +func (it *IndexBuildTask) loadVector(ctx context.Context) (storage.FieldID, storage.FieldData, error) { getValueByPath := func(path string) ([]byte, error) { data, err := it.kv.Load(path) if err != nil { @@ -369,12 +403,12 @@ func (it *IndexBuildTask) executeStepLoad(ctx context.Context) (storage.FieldID, return fieldID, data, nil } -func (it *IndexBuildTask) executeStepBuild(ctx context.Context) ([]*storage.Blob, error) { +func (it *IndexBuildTask) buildIndex(ctx context.Context) ([]*storage.Blob, error) { var fieldID storage.FieldID { var err error var fieldData storage.FieldData - fieldID, fieldData, err = it.executeStepLoad(ctx) + fieldID, fieldData, err = it.loadVector(ctx) if err != nil { return nil, err } @@ -437,7 +471,7 @@ func (it *IndexBuildTask) executeStepBuild(ctx context.Context) ([]*storage.Blob return serializedIndexBlobs, nil } -func (it *IndexBuildTask) executeSave(ctx context.Context, blobs []*storage.Blob) error { +func (it *IndexBuildTask) saveIndex(ctx context.Context, blobs []*storage.Blob) error { blobCnt := len(blobs) it.serializedSize = 0 for i := range blobs { @@ -501,7 +535,11 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute") defer sp.Finish() - if err := it.executePrepareParams(ctx); err != nil { + if err := it.prepareParams(ctx); err != nil { + it.SetState(TaskStateFailed) + log.Error("IndexNode IndexBuildTask Execute prepareParams failed", + zap.Int64("buildId", it.req.IndexBuildID), + zap.Error(err)) return err } @@ -510,6 +548,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { var err error it.index, err = NewCIndex(it.newTypeParams, it.newIndexParams) if err != nil { + it.SetState(TaskStateFailed) log.Error("IndexNode IndexBuildTask Execute NewCIndex failed", zap.Int64("buildId", it.req.IndexBuildID), zap.Error(err)) @@ -526,13 +565,18 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { }() var blobs []*storage.Blob - blobs, err = it.executeStepBuild(ctx) + blobs, err = it.buildIndex(ctx) if err != nil { + it.SetState(TaskStateFailed) + log.Error("IndexNode IndexBuildTask Execute buildIndex failed", + zap.Int64("buildId", it.req.IndexBuildID), + zap.Error(err)) return err } - err = it.executeSave(ctx, blobs) + err = it.saveIndex(ctx, blobs) if err != nil { + it.SetState(TaskStateRetry) return err } it.tr.Record("index file save done") diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index 4cd5e48e0e..69ce2aaead 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -245,6 +245,13 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { err := t.PreExecute(ctx) t.SetError(err) + if t.GetState() == TaskStateAbandon { + log.Info("IndexNode scheduler abandon task", + zap.String("TaskState", t.GetState().String()), + zap.Int64("taskID", t.ID())) + return + } + defer func() { span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID())) err := t.PostExecute(ctx) diff --git a/internal/indexnode/task_state.go b/internal/indexnode/task_state.go new file mode 100644 index 0000000000..6613ff8c3d --- /dev/null +++ b/internal/indexnode/task_state.go @@ -0,0 +1,41 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexnode + +type TaskState int32 + +const ( + TaskStateNormal TaskState = 0 + TaskStateAbandon TaskState = 1 + TaskStateRetry TaskState = 2 + TaskStateFailed TaskState = 3 +) + +var TaskStateNames = map[TaskState]string{ + 0: "Normal", + 1: "Abandon", + 2: "Retry", + 3: "Failed", +} + +func (x TaskState) String() string { + ret, ok := TaskStateNames[x] + if !ok { + return "None" + } + return ret +} diff --git a/internal/indexnode/task_state_test.go b/internal/indexnode/task_state_test.go new file mode 100644 index 0000000000..60ba3673a8 --- /dev/null +++ b/internal/indexnode/task_state_test.go @@ -0,0 +1,30 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexnode + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTaskState_String(t *testing.T) { + assert.Equal(t, TaskStateNormal.String(), "Normal") + assert.Equal(t, TaskStateAbandon.String(), "Abandon") + assert.Equal(t, TaskStateRetry.String(), "Retry") + assert.Equal(t, TaskStateFailed.String(), "Failed") +}