diff --git a/internal/indexcoord/flush_segment_watcher.go b/internal/indexcoord/flush_segment_watcher.go index c98dd2dfa9..f84c28003d 100644 --- a/internal/indexcoord/flush_segment_watcher.go +++ b/internal/indexcoord/flush_segment_watcher.go @@ -33,7 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util" - "github.com/milvus-io/milvus/internal/util/logutil" ) type flushedSegmentWatcher struct { @@ -72,7 +71,7 @@ func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, kvClient: kv, wg: sync.WaitGroup{}, internalTaskMutex: sync.RWMutex{}, - scheduleDuration: time.Second, + scheduleDuration: time.Second * 3, internalNotify: make(chan struct{}, 1), meta: meta, builder: builder, @@ -87,22 +86,23 @@ func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, } func (fsw *flushedSegmentWatcher) reloadFromKV() error { - log.Info("flushSegmentWatcher reloadFromKV") + log.Ctx(fsw.ctx).Info("flushSegmentWatcher reloadFromKV") fsw.internalTasks = make(map[UniqueID]*internalTask) _, values, version, err := fsw.kvClient.LoadWithRevision(util.FlushedSegmentPrefix) if err != nil { - log.Error("flushSegmentWatcher reloadFromKV fail", zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err)) + log.Ctx(fsw.ctx).Error("flushSegmentWatcher reloadFromKV fail", zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err)) return err } for _, value := range values { segID, err := strconv.ParseInt(value, 10, 64) if err != nil { - log.Error("flushSegmentWatcher parse segmentID fail", zap.String("value", value), zap.Error(err)) + log.Ctx(fsw.ctx).Error("flushSegmentWatcher parse segmentID fail", zap.String("value", value), zap.Error(err)) return err } fsw.enqueueInternalTask(segID) } fsw.etcdRevision = version + log.Ctx(fsw.ctx).Info("flushSegmentWatcher reloadFromKV success", zap.Int64("etcdRevision", version)) return nil } @@ -121,7 +121,7 @@ func (fsw *flushedSegmentWatcher) enqueueInternalTask(segmentID UniqueID) { fsw.internalTaskMutex.Lock() defer fsw.internalTaskMutex.Unlock() - logutil.Logger(fsw.ctx).Info("flushedSegmentWatcher enqueueInternalTask", zap.Int64("segmentID", segmentID)) + log.Ctx(fsw.ctx).Info("flushedSegmentWatcher enqueueInternalTask", zap.Int64("segmentID", segmentID)) if _, ok := fsw.internalTasks[segmentID]; !ok { fsw.internalTasks[segmentID] = &internalTask{ @@ -129,11 +129,12 @@ func (fsw *flushedSegmentWatcher) enqueueInternalTask(segmentID UniqueID) { segmentInfo: nil, } } - logutil.Logger(fsw.ctx).Info("flushedSegmentWatcher enqueueInternalTask success", zap.Int64("segmentID", segmentID)) + + log.Ctx(fsw.ctx).Info("flushedSegmentWatcher already have the task success", zap.Int64("segmentID", segmentID)) } func (fsw *flushedSegmentWatcher) internalScheduler() { - log.Info("IndexCoord flushedSegmentWatcher internalScheduler start...") + log.Ctx(fsw.ctx).Info("IndexCoord flushedSegmentWatcher internalScheduler start...") defer fsw.wg.Done() ticker := time.NewTicker(fsw.scheduleDuration) @@ -142,7 +143,7 @@ func (fsw *flushedSegmentWatcher) internalScheduler() { for { select { case <-fsw.ctx.Done(): - log.Warn("IndexCoord flushedSegmentWatcher context done") + log.Ctx(fsw.ctx).Warn("IndexCoord flushedSegmentWatcher context done") return case <-ticker.C: fsw.internalRun() @@ -156,7 +157,8 @@ func (fsw *flushedSegmentWatcher) internalRun() { fsw.internalTaskMutex.RLock() segmentIDs := make([]UniqueID, 0, 
len(fsw.internalTasks)) if len(fsw.internalTasks) > 0 { - log.Debug("IndexCoord flushedSegmentWatcher schedule internal tasks", zap.Int("internal task num", len(fsw.internalTasks))) + log.Ctx(fsw.ctx).Debug("IndexCoord flushedSegmentWatcher schedule internal tasks", + zap.Int("internal task num", len(fsw.internalTasks))) for segID := range fsw.internalTasks { segmentIDs = append(segmentIDs, segID) } @@ -188,7 +190,8 @@ func (fsw *flushedSegmentWatcher) Len() int { func (fsw *flushedSegmentWatcher) updateInternalTaskState(segID UniqueID, state indexTaskState) { fsw.internalTaskMutex.Lock() defer fsw.internalTaskMutex.Unlock() - log.Debug("flushedSegmentWatcher updateInternalTaskState", zap.Int64("segID", segID), zap.String("state", state.String())) + log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher updateInternalTaskState", zap.Int64("segID", segID), + zap.String("state", state.String())) if _, ok := fsw.internalTasks[segID]; ok { fsw.internalTasks[segID].state = state } @@ -199,7 +202,7 @@ func (fsw *flushedSegmentWatcher) deleteInternalTask(segID UniqueID) { defer fsw.internalTaskMutex.Unlock() delete(fsw.internalTasks, segID) - log.Debug("flushedSegmentWatcher delete the internal task", zap.Int64("segID", segID)) + log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher delete the internal task", zap.Int64("segID", segID)) } func (fsw *flushedSegmentWatcher) getInternalTask(segID UniqueID) *internalTask { @@ -219,24 +222,24 @@ func (fsw *flushedSegmentWatcher) setInternalTaskSegmentInfo(segID UniqueID, seg if _, ok := fsw.internalTasks[segID]; ok { fsw.internalTasks[segID].segmentInfo = segInfo } - log.Debug("flushedSegmentWatcher set internal task segment info success", zap.Int64("segID", segID)) + log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher set internal task segment info success", zap.Int64("segID", segID)) } func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) { t := fsw.getInternalTask(segID) - log.Debug("IndexCoord flushedSegmentWatcher process internal task", zap.Int64("segID", segID), + log.Ctx(fsw.ctx).RatedDebug(10, "flushedSegmentWatcher process internal task", zap.Int64("segID", segID), zap.String("state", t.state.String())) switch t.state { case indexTaskPrepare: if err := fsw.prepare(segID); err != nil { - log.Error("flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err)) + log.Ctx(fsw.ctx).RatedWarn(10, "flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err)) return } fsw.updateInternalTaskState(segID, indexTaskInit) case indexTaskInit: if err := fsw.constructTask(t); err != nil { - log.Error("flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err)) + log.Ctx(fsw.ctx).RatedWarn(10, "flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err)) return } fsw.updateInternalTaskState(segID, indexTaskInProgress) @@ -248,24 +251,22 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) { } case indexTaskDone: if err := fsw.removeFlushedSegment(t); err != nil { - log.Error("IndexCoord flushSegmentWatcher removeFlushedSegment fail", + log.Ctx(fsw.ctx).RatedWarn(10, "IndexCoord flushSegmentWatcher removeFlushedSegment fail", zap.Int64("segID", segID), zap.Error(err)) return } fsw.deleteInternalTask(segID) fsw.internalNotifyFunc() default: - log.Debug("IndexCoord flushedSegmentWatcher internal task get invalid state", zap.Int64("segID", segID), + log.Info("IndexCoord flushedSegmentWatcher internal task get invalid state", 
zap.Int64("segID", segID), zap.String("state", t.state.String())) } } func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error { - log.Debug("IndexCoord flushedSegmentWatcher construct tasks by segment info", zap.Int64("segID", t.segmentInfo.ID), - zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom)) fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "") if len(fieldIndexes) == 0 { - log.Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID), + log.Ctx(fsw.ctx).Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID), zap.Int64("num of rows", t.segmentInfo.NumOfRows), zap.Int("collection indexes num", len(fieldIndexes))) // no need to build index return nil @@ -285,7 +286,7 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error { // send to indexBuilder have, buildID, err := fsw.ic.createIndexForSegment(segIdx) if err != nil { - log.Warn("IndexCoord create index for segment fail", zap.Int64("segID", t.segmentInfo.ID), + log.Ctx(fsw.ctx).Warn("IndexCoord create index for segment fail", zap.Int64("segID", t.segmentInfo.ID), zap.Int64("indexID", index.IndexID), zap.Error(err)) return err } @@ -294,7 +295,7 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error { } } fsw.handoff.enqueue(t.segmentInfo.ID) - log.Debug("flushedSegmentWatcher construct children task success", zap.Int64("segID", t.segmentInfo.ID), + log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher construct task success", zap.Int64("segID", t.segmentInfo.ID), zap.Int("tasks num", len(fieldIndexes))) return nil } @@ -303,11 +304,11 @@ func (fsw *flushedSegmentWatcher) removeFlushedSegment(t *internalTask) error { deletedKeys := fmt.Sprintf("%s/%d/%d/%d", util.FlushedSegmentPrefix, t.segmentInfo.CollectionID, t.segmentInfo.PartitionID, t.segmentInfo.ID) err := fsw.kvClient.RemoveWithPrefix(deletedKeys) if err != nil { - log.Error("IndexCoord remove flushed segment fail", zap.Int64("collID", t.segmentInfo.CollectionID), + log.Ctx(fsw.ctx).Warn("IndexCoord remove flushed segment fail", zap.Int64("collID", t.segmentInfo.CollectionID), zap.Int64("partID", t.segmentInfo.PartitionID), zap.Int64("segID", t.segmentInfo.ID), zap.Error(err)) return err } - log.Info("IndexCoord remove flushed segment success", zap.Int64("collID", t.segmentInfo.CollectionID), + log.Ctx(fsw.ctx).Info("IndexCoord remove flushed segment success", zap.Int64("collID", t.segmentInfo.CollectionID), zap.Int64("partID", t.segmentInfo.PartitionID), zap.Int64("segID", t.segmentInfo.ID)) return nil } diff --git a/internal/indexcoord/garbage_collector.go b/internal/indexcoord/garbage_collector.go index 0136bc4aa5..f31bc229bb 100644 --- a/internal/indexcoord/garbage_collector.go +++ b/internal/indexcoord/garbage_collector.go @@ -76,14 +76,14 @@ func (gc *garbageCollector) Stop() { func (gc *garbageCollector) recycleUnusedIndexes() { defer gc.wg.Done() - log.Info("IndexCoord garbageCollector recycleUnusedIndexes start") + log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedIndexes start") ticker := time.NewTicker(gc.gcMetaDuration) defer ticker.Stop() for { select { case <-gc.ctx.Done(): - log.Info("IndexCoord garbageCollector recycleUnusedMetaLoop context has done") + log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedMetaLoop context has done") return case <-ticker.C: deletedIndexes := gc.metaTable.GetDeletedIndexes() @@ -91,7 +91,7 @@ func (gc *garbageCollector) recycleUnusedIndexes() { buildIDs := 
gc.metaTable.GetBuildIDsFromIndexID(index.IndexID) if len(buildIDs) == 0 { if err := gc.metaTable.RemoveIndex(index.CollectionID, index.IndexID); err != nil { - log.Warn("IndexCoord remove index on collection fail", zap.Int64("collID", index.CollectionID), + log.Ctx(gc.ctx).Warn("IndexCoord remove index on collection fail", zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID), zap.Error(err)) continue } @@ -99,7 +99,7 @@ func (gc *garbageCollector) recycleUnusedIndexes() { for _, buildID := range buildIDs { segIdx, ok := gc.metaTable.GetMeta(buildID) if !ok { - log.Debug("IndexCoord get segment index is not exist", zap.Int64("buildID", buildID)) + log.Ctx(gc.ctx).Debug("IndexCoord get segment index is not exist", zap.Int64("buildID", buildID)) continue } if segIdx.NodeID != 0 { @@ -107,13 +107,15 @@ func (gc *garbageCollector) recycleUnusedIndexes() { continue } if err := gc.metaTable.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID); err != nil { - log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID), + log.Ctx(gc.ctx).Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID), zap.Int64("nodeID", segIdx.NodeID), zap.Error(err)) continue } - log.Info("IndexCoord remove segment index meta success", zap.Int64("buildID", segIdx.BuildID), + log.Ctx(gc.ctx).Info("IndexCoord remove segment index meta success", zap.Int64("buildID", segIdx.BuildID), zap.Int64("nodeID", segIdx.NodeID)) } + log.Ctx(gc.ctx).Info("garbageCollector remove index success", zap.Int64("collID", index.CollectionID), + zap.Int64("indexID", index.IndexID)) } } } @@ -135,12 +137,12 @@ func (gc *garbageCollector) recycleSegIndexesMeta() { PartitionID: -1, }) if err != nil { - log.Warn("IndexCoord garbageCollector get flushed segments from DataCoord fail", + log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector get flushed segments from DataCoord fail", zap.Int64("collID", collID), zap.Error(err)) return } if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - log.Warn("IndexCoord garbageCollector get flushed segments from DataCoord fail", zap.Int64("collID", collID), + log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector get flushed segments from DataCoord fail", zap.Int64("collID", collID), zap.String("fail reason", resp.Status.Reason)) return } @@ -153,7 +155,7 @@ func (gc *garbageCollector) recycleSegIndexesMeta() { continue } if _, ok := flushedSegments[segID]; !ok { - log.Debug("segment is already not exist, mark it deleted", zap.Int64("collID", collID), + log.Ctx(gc.ctx).Debug("segment is already not exist, mark it deleted", zap.Int64("collID", collID), zap.Int64("segID", segID)) if err := gc.metaTable.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool { return segIndex.SegmentID == segID @@ -171,18 +173,18 @@ func (gc *garbageCollector) recycleSegIndexesMeta() { continue } if err := gc.metaTable.RemoveSegmentIndex(meta.CollectionID, meta.PartitionID, meta.SegmentID, meta.BuildID); err != nil { - log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", meta.BuildID), + log.Ctx(gc.ctx).Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", meta.BuildID), zap.Int64("nodeID", meta.NodeID), zap.Error(err)) continue } - log.Debug("index meta recycle success", zap.Int64("buildID", meta.BuildID)) + log.Ctx(gc.ctx).Debug("index meta recycle success", zap.Int64("buildID", meta.BuildID)) } } } func (gc *garbageCollector) 
recycleUnusedSegIndexes() { defer gc.wg.Done() - log.Info("IndexCoord garbageCollector recycleUnusedSegIndexes start") + log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedSegIndexes start") ticker := time.NewTicker(gc.gcMetaDuration) defer ticker.Stop() @@ -190,7 +192,7 @@ func (gc *garbageCollector) recycleUnusedSegIndexes() { for { select { case <-gc.ctx.Done(): - log.Info("IndexCoord garbageCollector recycleUnusedMetaLoop context has done") + log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedMetaLoop context has done") return case <-ticker.C: gc.recycleSegIndexesMeta() @@ -201,7 +203,7 @@ func (gc *garbageCollector) recycleUnusedSegIndexes() { // recycleUnusedIndexFiles is used to delete those index files that no longer exist in the meta. func (gc *garbageCollector) recycleUnusedIndexFiles() { defer gc.wg.Done() - log.Info("IndexCoord garbageCollector start recycleUnusedIndexFiles loop") + log.Ctx(gc.ctx).Info("IndexCoord garbageCollector start recycleUnusedIndexFiles loop") ticker := time.NewTicker(gc.gcFileDuration) defer ticker.Stop() @@ -215,35 +217,35 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() { // list dir first keys, _, err := gc.chunkManager.ListWithPrefix(prefix, false) if err != nil { - log.Error("IndexCoord garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err)) + log.Ctx(gc.ctx).Error("IndexCoord garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err)) continue } for _, key := range keys { - log.Debug("indexFiles keys", zap.String("key", key)) + log.Ctx(gc.ctx).Debug("indexFiles keys", zap.String("key", key)) buildID, err := parseBuildIDFromFilePath(key) if err != nil { - log.Error("IndexCoord garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err)) + log.Ctx(gc.ctx).Error("IndexCoord garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err)) continue } - log.Info("IndexCoord garbageCollector will recycle index files", zap.Int64("buildID", buildID)) + log.Ctx(gc.ctx).Info("IndexCoord garbageCollector will recycle index files", zap.Int64("buildID", buildID)) if !gc.metaTable.HasBuildID(buildID) { // buildID no longer exists in meta, remove all index files - log.Info("IndexCoord garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files", + log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files", zap.Int64("buildID", buildID)) err = gc.chunkManager.RemoveWithPrefix(key) if err != nil { - log.Warn("IndexCoord garbageCollector recycleUnusedIndexFiles remove index files failed", + log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector recycleUnusedIndexFiles remove index files failed", zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err)) continue } continue } - log.Info("index meta can be recycled, recycle index files", zap.Int64("buildID", buildID)) + log.Ctx(gc.ctx).Info("index meta can be recycled, recycle index files", zap.Int64("buildID", buildID)) canRecycle, indexFilePaths := gc.metaTable.GetIndexFilePathByBuildID(buildID) if !canRecycle { // Even if the index is marked as deleted, the index file will not be recycled, wait for the next gc, // and delete all index files about the buildID at one time. 
- log.Warn("IndexCoord garbageCollector can not recycle index files", zap.Int64("buildID", buildID)) + log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector can not recycle index files", zap.Int64("buildID", buildID)) continue } filesMap := make(map[string]bool) @@ -252,24 +254,24 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() { } files, _, err := gc.chunkManager.ListWithPrefix(key, true) if err != nil { - log.Warn("IndexCoord garbageCollector recycleUnusedIndexFiles list files failed", + log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector recycleUnusedIndexFiles list files failed", zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err)) continue } - log.Info("recycle index files", zap.Int64("buildID", buildID), zap.Int("meta files num", len(filesMap)), + log.Ctx(gc.ctx).Info("recycle index files", zap.Int64("buildID", buildID), zap.Int("meta files num", len(filesMap)), zap.Int("chunkManager files num", len(files))) deletedFilesNum := 0 for _, file := range files { if _, ok := filesMap[file]; !ok { if err = gc.chunkManager.Remove(file); err != nil { - log.Warn("IndexCoord garbageCollector recycleUnusedIndexFiles remove file failed", + log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector recycleUnusedIndexFiles remove file failed", zap.Int64("buildID", buildID), zap.String("file", file), zap.Error(err)) continue } deletedFilesNum++ } } - log.Info("index files recycle success", zap.Int64("buildID", buildID), + log.Ctx(gc.ctx).Info("index files recycle success", zap.Int64("buildID", buildID), zap.Int("delete index files num", deletedFilesNum)) } } diff --git a/internal/indexcoord/index_builder.go b/internal/indexcoord/index_builder.go index c28a8baae5..503ad95b8c 100644 --- a/internal/indexcoord/index_builder.go +++ b/internal/indexcoord/index_builder.go @@ -137,14 +137,14 @@ func (ib *indexBuilder) enqueue(buildID UniqueID) { func (ib *indexBuilder) schedule() { // receive notifyChan // time ticker - log.Info("index builder schedule loop start") + log.Ctx(ib.ctx).Info("index builder schedule loop start") defer ib.wg.Done() ticker := time.NewTicker(ib.scheduleDuration) defer ticker.Stop() for { select { case <-ib.ctx.Done(): - log.Warn("index builder ctx done") + log.Ctx(ib.ctx).Warn("index builder ctx done") return case _, ok := <-ib.notifyChan: if ok { @@ -169,14 +169,18 @@ func (ib *indexBuilder) run() { return buildIDs[i] < buildIDs[j] }) if len(buildIDs) > 0 { - log.Info("index builder task schedule", zap.Int("task num", len(buildIDs))) + log.Ctx(ib.ctx).Info("index builder task schedule", zap.Int("task num", len(buildIDs))) } for _, buildID := range buildIDs { - ib.process(buildID) + ok := ib.process(buildID) + if !ok { + log.Ctx(ib.ctx).Debug("there is no IndexNode available or etcd is not serviceable, wait a minute...") + break + } } } -func (ib *indexBuilder) process(buildID UniqueID) { +func (ib *indexBuilder) process(buildID UniqueID) bool { ib.taskMutex.RLock() state := ib.tasks[buildID] ib.taskMutex.RUnlock() @@ -193,23 +197,24 @@ func (ib *indexBuilder) process(buildID UniqueID) { delete(ib.tasks, buildID) } - log.Info("index task is processing", zap.Int64("buildID", buildID), zap.String("task state", state.String())) + log.Ctx(ib.ctx).RatedDebug(10, "index task is processing", zap.Int64("buildID", buildID), zap.String("task state", state.String())) meta, exist := ib.meta.GetMeta(buildID) if !exist { - log.Debug("index task has not exist in meta table, remove task", zap.Int64("buildID", buildID)) + log.Ctx(ib.ctx).RatedDebug(10, "index task has not exist in 
meta table, remove task", zap.Int64("buildID", buildID)) deleteFunc(buildID) - return + return true } switch state { case indexTaskInit: if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) { + log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID)) deleteFunc(buildID) - return + return true } - log.Debug("task state is init, build index ...", zap.Int64("buildID", buildID), - zap.Int64("segID", meta.SegmentID), zap.Int64("num rows", meta.NumRows)) if meta.NumRows < Params.IndexCoordCfg.MinSegmentNumRowsToEnableIndex { + log.Ctx(ib.ctx).Debug("segment num rows is too few, no need to build index", zap.Int64("buildID", buildID), + zap.Int64("segID", meta.SegmentID), zap.Int64("num rows", meta.NumRows)) if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{ BuildID: buildID, State: commonpb.IndexState_Finished, @@ -217,42 +222,42 @@ func (ib *indexBuilder) process(buildID UniqueID) { SerializedSize: 0, FailReason: "", }); err != nil { - log.Error("IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err)) - return + log.Ctx(ib.ctx).RatedWarn(10, "IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err)) + return false } updateStateFunc(buildID, indexTaskDone) - return + return true } // peek client // if all IndexNodes are executing task, wait for one of them to finish the task. nodeID, client := ib.ic.nodeManager.PeekClient(meta) if client == nil { - log.RatedDebug(30, "index builder peek client error, there is no available") - return + log.Ctx(ib.ctx).RatedDebug(10, "index builder peek client error, there is no available") + return false } // update version and set nodeID if err := ib.meta.UpdateVersion(buildID, nodeID); err != nil { - log.Error("index builder update index version failed", zap.Int64("build", buildID), zap.Error(err)) - return + log.Ctx(ib.ctx).RatedWarn(10, "index builder update index version failed", zap.Int64("build", buildID), zap.Error(err)) + return false } // acquire lock if err := ib.ic.tryAcquireSegmentReferLock(ib.ctx, buildID, nodeID, []UniqueID{meta.SegmentID}); err != nil { - log.Error("index builder acquire segment reference lock failed", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).RatedWarn(10, "index builder acquire segment reference lock failed", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID), zap.Error(err)) updateStateFunc(buildID, indexTaskRetry) - return + return false } info, err := ib.ic.pullSegmentInfo(ib.ctx, meta.SegmentID) if err != nil { - log.Error("IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID), + log.Ctx(ib.ctx).RatedWarn(10, "IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID), zap.Int64("buildID", buildID), zap.Error(err)) if errors.Is(err, ErrSegmentNotFound) { updateStateFunc(buildID, indexTaskDeleted) - return + return true } updateStateFunc(buildID, indexTaskRetry) - return + return false } binLogs := make([]string, 0) fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID) @@ -298,89 +303,91 @@ func (ib *indexBuilder) process(buildID UniqueID) { TypeParams: typeParams, NumRows: meta.NumRows, } - log.Debug("assign task to indexNode", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) + log.Ctx(ib.ctx).RatedDebug(10, "assign task to indexNode", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) if err := ib.ic.assignTask(client, req); err != nil { // need to release lock then reassign, so set task state to retry - 
log.Error("index builder assign task to IndexNode failed", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).RatedWarn(10, "index builder assign task to IndexNode failed", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID), zap.Error(err)) updateStateFunc(buildID, indexTaskRetry) - return + return false } // update index meta state to InProgress if err := ib.meta.BuildIndex(buildID); err != nil { // need to release lock then reassign, so set task state to retry - log.Error("index builder update index meta to InProgress failed", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).RatedWarn(10, "index builder update index meta to InProgress failed", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID), zap.Error(err)) updateStateFunc(buildID, indexTaskRetry) - return + return false } - log.Debug("index task assigned success", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) + log.Ctx(ib.ctx).RatedDebug(10, "index task assigned success", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) updateStateFunc(buildID, indexTaskInProgress) case indexTaskDone: - log.Debug("index task has done", zap.Int64("buildID", buildID)) + log.Ctx(ib.ctx).RatedDebug(10, "index task has done", zap.Int64("buildID", buildID)) if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) { + log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID)) updateStateFunc(buildID, indexTaskDeleted) - return + return true } if !ib.dropIndexTask(buildID, meta.NodeID) { - return + return true } if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil { // release lock failed, no need to modify state, wait to retry - log.Error("index builder try to release reference lock failed", zap.Error(err)) - return + log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err)) + return false } deleteFunc(buildID) case indexTaskRetry: - log.Debug("index task state is retry, try to release reference lock", zap.Int64("buildID", buildID)) if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) { + log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID)) updateStateFunc(buildID, indexTaskDeleted) - return + return true } if err := ib.releaseLockAndResetTask(buildID, meta.NodeID); err != nil { // release lock failed, no need to modify state, wait to retry - log.Error("index builder try to release reference lock failed", zap.Error(err)) - return + log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err)) + return false } - updateStateFunc(buildID, indexTaskInit) case indexTaskDeleted: - log.Debug("index task state is deleted, try to release reference lock", zap.Int64("buildID", buildID)) + log.Ctx(ib.ctx).Debug("index task state is deleted, try to release reference lock", zap.Int64("buildID", buildID)) // TODO: delete after QueryCoordV2 if err := ib.meta.MarkSegmentsIndexAsDeletedByBuildID([]int64{buildID}); err != nil { - return + return false } if meta.NodeID != 0 { if !ib.dropIndexTask(buildID, meta.NodeID) { log.Ctx(ib.ctx).Warn("index task state is deleted and drop index job for node fail", zap.Int64("build", buildID), zap.Int64("nodeID", meta.NodeID)) - return + return true } if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil { // release lock failed, no need to modify state, wait to retry - log.Error("index builder try to release reference lock failed", zap.Error(err)) - return + log.Ctx(ib.ctx).RatedWarn(10, "index builder 
try to release reference lock failed", zap.Error(err)) + return false } } // reset nodeID success, remove task. deleteFunc(buildID) default: - log.Debug("index task is in progress", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).Debug("index task is in progress", zap.Int64("buildID", buildID), zap.String("state", meta.IndexState.String())) if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) { + log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID)) updateStateFunc(buildID, indexTaskDeleted) - return + return true } updateStateFunc(buildID, ib.getTaskState(buildID, meta.NodeID)) } + return true } func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState { - log.Info("IndexCoord indexBuilder get index task state", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) + log.Ctx(ib.ctx).Info("IndexCoord indexBuilder get index task state", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) client, exist := ib.ic.nodeManager.GetClientByID(nodeID) if exist { response, err := client.QueryJobs(ib.ctx, &indexpb.QueryJobsRequest{ @@ -388,12 +395,12 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState { BuildIDs: []int64{buildID}, }) if err != nil { - log.Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID), + log.Ctx(ib.ctx).Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID), zap.Error(err)) return indexTaskInProgress } if response.Status.ErrorCode != commonpb.ErrorCode_Success { - log.Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID), + log.Ctx(ib.ctx).Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID), zap.Int64("buildID", buildID), zap.String("fail reason", response.Status.Reason)) return indexTaskInProgress } @@ -402,22 +409,22 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState { for _, info := range response.IndexInfos { if info.BuildID == buildID { if info.State == commonpb.IndexState_Failed || info.State == commonpb.IndexState_Finished { - log.Info("this task has been finished", zap.Int64("buildID", info.BuildID), + log.Ctx(ib.ctx).Info("this task has been finished", zap.Int64("buildID", info.BuildID), zap.String("index state", info.State.String())) if err := ib.meta.FinishTask(info); err != nil { - log.Error("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID), + log.Ctx(ib.ctx).Error("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID), zap.String("index state", info.State.String()), zap.Error(err)) return indexTaskInProgress } return indexTaskDone } else if info.State == commonpb.IndexState_Retry || info.State == commonpb.IndexState_IndexStateNone { - log.Info("this task should be retry", zap.Int64("buildID", buildID)) + log.Ctx(ib.ctx).Info("this task should be retry", zap.Int64("buildID", buildID), zap.String("fail reason", info.FailReason)) return indexTaskRetry } return indexTaskInProgress } } - log.Info("this task should be retry, indexNode does not have this task", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).Info("this task should be retry, indexNode does not have this task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) return indexTaskRetry } @@ -426,7 +433,7 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState { } func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool { - log.Info("IndexCoord notify IndexNode drop the index 
task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) + log.Ctx(ib.ctx).Info("IndexCoord notify IndexNode drop the index task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) client, exist := ib.ic.nodeManager.GetClientByID(nodeID) if exist { status, err := client.DropJobs(ib.ctx, &indexpb.DropJobsRequest{ @@ -434,12 +441,12 @@ func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool { BuildIDs: []UniqueID{buildID}, }) if err != nil { - log.Warn("IndexCoord notify IndexNode drop the index task fail", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).Warn("IndexCoord notify IndexNode drop the index task fail", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID), zap.Error(err)) return false } if status.ErrorCode != commonpb.ErrorCode_Success { - log.Warn("IndexCoord notify IndexNode drop the index task fail", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).Warn("IndexCoord notify IndexNode drop the index task fail", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID), zap.String("fail reason", status.Reason)) return false } @@ -449,37 +456,37 @@ func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool { } func (ib *indexBuilder) releaseLockAndResetNode(buildID UniqueID, nodeID UniqueID) error { - log.Info("release segment reference lock and reset nodeID", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil { // release lock failed, no need to modify state, wait to retry - log.Error("index builder try to release reference lock failed", zap.Error(err)) + log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err)) return err } if err := ib.meta.ResetNodeID(buildID); err != nil { - log.Error("index builder try to reset nodeID failed", zap.Error(err)) + log.Ctx(ib.ctx).Error("index builder try to reset nodeID failed", zap.Error(err)) return err } - log.Info("release segment reference lock and reset nodeID success", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID success", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) return nil } func (ib *indexBuilder) releaseLockAndResetTask(buildID UniqueID, nodeID UniqueID) error { - log.Info("release segment reference lock and reset task", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).Info("release segment reference lock and reset task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) if nodeID != 0 { if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil { // release lock failed, no need to modify state, wait to retry - log.Error("index builder try to release reference lock failed", zap.Error(err)) + log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err)) return err } } if err := ib.meta.ResetMeta(buildID); err != nil { - log.Error("index builder try to reset task failed", zap.Error(err)) + log.Ctx(ib.ctx).Error("index builder try to reset task failed", zap.Error(err)) return err } - log.Info("release segment reference lock and reset task success", zap.Int64("buildID", buildID), + log.Ctx(ib.ctx).Info("release segment reference lock and reset task success", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) return nil } diff --git a/internal/indexcoord/node_manager.go b/internal/indexcoord/node_manager.go index 
d5cfe7806f..01af51b9bf 100644 --- a/internal/indexcoord/node_manager.go +++ b/internal/indexcoord/node_manager.go @@ -104,7 +104,6 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error { // PeekClient peeks the client with the least load. func (nm *NodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, types.IndexNode) { - log.Info("IndexCoord peek client") allClients := nm.GetAllClients() if len(allClients) == 0 { log.Error("there is no IndexNode online") @@ -155,7 +154,7 @@ func (nm *NodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, types.Ind return peekNodeID, allClients[peekNodeID] } - log.Warn("IndexCoord peek client fail") + log.RatedDebug(30, "IndexCoord peek client fail") return 0, nil } diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go index c882cbc7da..8edcb255d2 100644 --- a/internal/indexnode/indexnode_service.go +++ b/internal/indexnode/indexnode_service.go @@ -5,8 +5,6 @@ import ( "fmt" "strconv" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -16,7 +14,6 @@ import ( "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/util/logutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/trace" @@ -25,13 +22,13 @@ import ( func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) { stateCode := i.stateCode.Load().(internalpb.StateCode) if stateCode != internalpb.StateCode_Healthy { - log.Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID)) + log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "state code is not healthy", }, nil } - log.Info("IndexNode building index ...", + log.Ctx(ctx).Info("IndexNode building index ...", zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID), zap.Int64("IndexID", req.IndexID), @@ -46,13 +43,12 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest sp.SetTag("IndexBuildID", strconv.FormatInt(req.BuildID, 10)) sp.SetTag("ClusterID", req.ClusterID) metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10), metrics.TotalLabel).Inc() - taskCtx := logutil.WithModule(i.loopCtx, typeutil.IndexNodeRole) - taskCtx, taskCancel := context.WithCancel(taskCtx) + taskCtx, taskCancel := context.WithCancel(i.loopCtx) if oldInfo := i.loadOrStoreTask(req.ClusterID, req.BuildID, &taskInfo{ cancel: taskCancel, state: commonpb.IndexState_InProgress}); oldInfo != nil { - log.Warn("duplicated index build task", zap.String("ClusterID", req.ClusterID), zap.Int64("BuildID", req.BuildID)) + log.Ctx(ctx).Warn("duplicated index build task", zap.String("ClusterID", req.ClusterID), zap.Int64("BuildID", req.BuildID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_BuildIndexError, Reason: "duplicated index build task", @@ -60,7 +56,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest } cm, err := 
i.storageFactory.NewChunkManager(i.loopCtx, req.StorageConfig) if err != nil { - log.Error("create chunk manager failed", zap.String("Bucket", req.StorageConfig.BucketName), + log.Ctx(ctx).Error("create chunk manager failed", zap.String("Bucket", req.StorageConfig.BucketName), zap.String("AccessKey", req.StorageConfig.AccessKeyID), zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID)) return &commonpb.Status{ @@ -86,20 +82,20 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest Reason: "", } if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil { - log.Warn("IndexNode failed to schedule", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.Error(err)) + log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.Error(err)) ret.ErrorCode = commonpb.ErrorCode_UnexpectedError ret.Reason = err.Error() metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10), metrics.FailLabel).Inc() return ret, nil } - log.Info("IndexNode successfully scheduled", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.String("indexName", req.IndexName)) + log.Ctx(ctx).Info("IndexNode successfully scheduled", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.String("indexName", req.IndexName)) return ret, nil } func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) { stateCode := i.stateCode.Load().(internalpb.StateCode) if stateCode != internalpb.StateCode_Healthy { - log.Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID)) + log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID)) return &indexpb.QueryJobsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -107,7 +103,6 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest }, }, nil } - log.Debug("querying index build task", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs)) infos := make(map[UniqueID]*taskInfo) i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) { if ClusterID == req.ClusterID { @@ -139,16 +134,19 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest ret.IndexInfos[i].IndexFiles = info.indexFiles ret.IndexInfos[i].SerializedSize = info.serializedSize ret.IndexInfos[i].FailReason = info.failReason + log.Ctx(ctx).Debug("querying index build task", zap.String("ClusterID", req.ClusterID), + zap.Int64("IndexBuildID", buildID), zap.String("state", info.state.String()), + zap.String("fail reason", info.failReason)) } } return ret, nil } func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) (*commonpb.Status, error) { - log.Debug("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs)) + log.Ctx(ctx).Debug("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs)) stateCode := i.stateCode.Load().(internalpb.StateCode) if stateCode != internalpb.StateCode_Healthy { - log.Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID)) + log.Ctx(ctx).Warn("index node not ready", 
zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "state code is not healthy", @@ -164,6 +162,8 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) info.cancel() } } + log.Ctx(ctx).Debug("drop index build jobs success", zap.String("ClusterID", req.ClusterID), + zap.Int64s("IndexBuildIDs", req.BuildIDs)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -173,7 +173,7 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { stateCode := i.stateCode.Load().(internalpb.StateCode) if stateCode != internalpb.StateCode_Healthy { - log.Warn("index node not ready", zap.Int32("state", int32(stateCode))) + log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode))) return &indexpb.GetJobStatsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -192,7 +192,7 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq if i.sched.buildParallel > unissued+active { slots = i.sched.buildParallel - unissued - active } - logutil.Logger(ctx).Info("Get Index Job Stats", zap.Int("Unissued", unissued), zap.Int("Active", active), zap.Int("Slot", slots)) + log.Ctx(ctx).Info("Get Index Job Stats", zap.Int("Unissued", unissued), zap.Int("Active", active), zap.Int("Slot", slots)) return &indexpb.GetJobStatsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -210,7 +210,7 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq // TODO(dragondriver): cache the Metrics and set a retention to the cache func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { if !i.isHealthy() { - log.Warn("IndexNode.GetMetrics failed", + log.Ctx(ctx).Warn("IndexNode.GetMetrics failed", zap.Int64("node_id", Params.IndexNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.Error(errIndexNodeIsUnhealthy(Params.IndexNodeCfg.GetNodeID()))) @@ -226,7 +226,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ metricType, err := metricsinfo.ParseMetricType(req.Request) if err != nil { - log.Warn("IndexNode.GetMetrics failed to parse metric type", + log.Ctx(ctx).Warn("IndexNode.GetMetrics failed to parse metric type", zap.Int64("node_id", Params.IndexNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.Error(err)) @@ -243,7 +243,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ if metricType == metricsinfo.SystemInfoMetrics { metrics, err := getSystemInfoMetrics(ctx, req, i) - log.Debug("IndexNode.GetMetrics", + log.Ctx(ctx).Debug("IndexNode.GetMetrics", zap.Int64("node_id", Params.IndexNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.String("metric_type", metricType), @@ -252,7 +252,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ return metrics, nil } - log.Warn("IndexNode.GetMetrics failed, request metric type is not implemented yet", + log.Ctx(ctx).Warn("IndexNode.GetMetrics failed, request metric type is not implemented yet", zap.Int64("node_id", Params.IndexNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.String("metric_type", metricType)) diff --git a/internal/indexnode/task.go 
b/internal/indexnode/task.go index 90cf16dcee..3486ccfc98 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/schemapb" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" @@ -136,7 +137,7 @@ func (it *indexBuildTask) GetState() commonpb.IndexState { func (it *indexBuildTask) OnEnqueue(ctx context.Context) error { it.statistic.StartTime = time.Now().UnixMicro() it.statistic.PodID = it.node.GetNodeID() - logutil.Logger(ctx).Debug("IndexNode IndexBuilderTask Enqueue") + log.Ctx(ctx).Debug("IndexNode IndexBuilderTask Enqueue") return nil } @@ -193,7 +194,7 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error { var err error it.statistic.Dim, err = strconv.ParseInt(dimStr, 10, 64) if err != nil { - logutil.Logger(ctx).Error("parse dimesion failed", zap.Error(err)) + log.Ctx(ctx).Error("parse dimesion failed", zap.Error(err)) // ignore error } } @@ -207,7 +208,7 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error { // var err error // it.cm, err = factory.NewVectorStorageChunkManager(ctx) // if err != nil { - // logutil.Logger(ctx).Error("init chunk manager failed", zap.Error(err), zap.String("BucketName", it.req.BucketName), zap.String("StorageAccessKey", it.req.StorageAccessKey)) + // log.Ctx(ctx).Error("init chunk manager failed", zap.Error(err), zap.String("BucketName", it.req.BucketName), zap.String("StorageAccessKey", it.req.StorageAccessKey)) // return err // } return nil @@ -253,11 +254,11 @@ func (it *indexBuildTask) LoadData(ctx context.Context) error { // gomaxproc will be set by `automaxproc`, passing 0 will just retrieve the value err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.GOMAXPROCS(0), loadKey, "loadKey") if err != nil { - logutil.Logger(it.ctx).Warn("loadKey failed", zap.Error(err)) + log.Ctx(ctx).Warn("loadKey failed", zap.Error(err)) return err } loadVectorDuration := it.tr.RecordSpan().Milliseconds() - logutil.Logger(ctx).Debug("indexnode load data success") + log.Ctx(ctx).Debug("indexnode load data success") it.tr.Record("load field data done") metrics.IndexNodeLoadFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(loadVectorDuration)) @@ -283,13 +284,13 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { if dType != schemapb.DataType_None { it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams) if err != nil { - logutil.Logger(ctx).Error("failed to create index", zap.Error(err)) + log.Ctx(ctx).Error("failed to create index", zap.Error(err)) return err } err = it.index.Build(dataset) if err != nil { - logutil.Logger(ctx).Error("failed to build index", zap.Error(err)) + log.Ctx(ctx).Error("failed to build index", zap.Error(err)) return err } } @@ -298,7 +299,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { it.tr.Record("build index done") indexBlobs, err := it.index.Serialize() if err != nil { - logutil.Logger(ctx).Error("IndexNode index Serialize failed", zap.Error(err)) + log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err)) return err } it.tr.Record("index serialize done") @@ -311,7 +312,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { // early release index for gc, and we can ensure that Delete is idempotent. 
if err := it.index.Delete(); err != nil { - logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) + log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) } var serializedIndexBlobs []*storage.Blob @@ -341,7 +342,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { // check index node support disk index if !Params.IndexNodeCfg.EnableDisk { - logutil.Logger(ctx).Error("IndexNode don't support build disk index", + log.Ctx(ctx).Error("IndexNode don't support build disk index", zap.String("index type", it.newIndexParams["index_type"]), zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk)) return errors.New("index node don't support build disk index") @@ -350,7 +351,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { // check load size and size of field data localUsedSize, err := indexcgowrapper.GetLocalUsedSize() if err != nil { - logutil.Logger(ctx).Error("IndexNode get local used size failed") + log.Ctx(ctx).Error("IndexNode get local used size failed") return errors.New("index node get local used size failed") } @@ -358,7 +359,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { maxUsedLocalSize := int64(float64(Params.IndexNodeCfg.DiskCapacityLimit) * Params.IndexNodeCfg.MaxDiskUsagePercentage) if usedLocalSizeWhenBuild > maxUsedLocalSize { - logutil.Logger(ctx).Error("IndexNode don't has enough disk size to build disk ann index", + log.Ctx(ctx).Error("IndexNode don't has enough disk size to build disk ann index", zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild), zap.Int64("maxUsedLocalSize", maxUsedLocalSize)) return errors.New("index node don't has enough disk size to build disk ann index") @@ -378,18 +379,18 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams) if err != nil { - logutil.Logger(ctx).Error("failed to create index", zap.Error(err)) + log.Ctx(ctx).Error("failed to create index", zap.Error(err)) return err } err = it.index.Build(dataset) if err != nil { if it.index.CleanLocalData() != nil { - logutil.Logger(ctx).Error("failed to clean cached data on disk after build index failed", + log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed", zap.Int64("buildID", it.BuildID), zap.Int64("index version", it.req.GetIndexVersion())) } - logutil.Logger(ctx).Error("failed to build index", zap.Error(err)) + log.Ctx(ctx).Error("failed to build index", zap.Error(err)) return err } } @@ -400,7 +401,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { indexBlobs, err := it.index.SerializeDiskIndex() if err != nil { - logutil.Logger(ctx).Error("IndexNode index Serialize failed", zap.Error(err)) + log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err)) return err } it.tr.Record("index serialize done") @@ -413,7 +414,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { // early release index for gc, and we can ensure that Delete is idempotent. 
if err := it.index.Delete(); err != nil { - logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) + log.Ctx(it.ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) } encodeIndexFileDur := it.tr.Record("index codec serialize done") @@ -443,7 +444,7 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error { return it.cm.Write(savePath, blob.Value) } if err := retry.Do(ctx, saveFn, retry.Attempts(5)); err != nil { - logutil.Logger(ctx).Warn("index node save index file failed", zap.Error(err), zap.String("savePath", savePath)) + log.Ctx(ctx).Warn("index node save index file failed", zap.Error(err), zap.String("savePath", savePath)) return err } savePaths[idx] = savePath @@ -452,17 +453,17 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error { // If an error occurs, return the error that the task state will be set to retry. if err := funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile"); err != nil { - logutil.Logger(it.ctx).Error("saveIndexFile fail") + log.Ctx(ctx).Error("saveIndexFile fail") return err } it.savePaths = savePaths it.statistic.EndTime = time.Now().UnixMicro() it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, savePaths, it.serializedSize, &it.statistic) - logutil.Logger(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths)) + log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths)) saveIndexFileDur := it.tr.Record("index file save done") metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds())) it.tr.Elapse("index building all done") - logutil.Logger(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), + log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("partition", it.partitionID), zap.Int64("SegmentId", it.segmentID)) return nil } @@ -501,7 +502,7 @@ func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error { return it.cm.Write(indexParamPath, indexParamBlob.Value) } if err := retry.Do(ctx, saveFn, retry.Attempts(5)); err != nil { - logutil.Logger(ctx).Warn("index node save index param file failed", zap.Error(err), zap.String("savePath", indexParamPath)) + log.Ctx(ctx).Warn("index node save index param file failed", zap.Error(err), zap.String("savePath", indexParamPath)) return err } @@ -510,11 +511,11 @@ func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error { it.statistic.EndTime = time.Now().UnixMicro() it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, savePaths, it.serializedSize, &it.statistic) - logutil.Logger(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths)) + log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths)) saveIndexFileDur := it.tr.Record("index file save done") metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds())) it.tr.Elapse("index building all done") - logutil.Logger(ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID), + log.Ctx(ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID), zap.Int64("partition", it.partitionID), zap.Int64("segment", 
it.segmentID)) return nil } @@ -535,7 +536,7 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob it.partitionID = partitionID it.segmentID = segmentID - logutil.Logger(ctx).Debug("indexnode deserialize data success", + log.Ctx(ctx).Debug("indexnode deserialize data success", zap.Int64("index id", it.req.IndexID), zap.String("index name", it.req.IndexName), zap.Int64("collectionID", it.collectionID), @@ -557,45 +558,3 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob it.fieldData = data return nil } - -// Execute actually performs the task of building an index. -//func (it *indexBuildTask) Execute(ctx context.Context) error { -// logutil.Logger(it.ctx).Debug("IndexNode indexBuildTask Execute ...") -// sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute") -// defer sp.Finish() -// select { -// case <-ctx.Done(): -// logutil.Logger(it.ctx).Warn("build task was canceled") -// return errCancel -// default: -// if err := it.prepareParams(ctx); err != nil { -// it.SetState(commonpb.IndexState_Failed) -// logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute prepareParams failed", zap.Error(err)) -// return err -// } -// defer it.releaseMemory() -// blobs, err := it.buildIndex(ctx) -// if err != nil { -// if errors.Is(err, ErrNoSuchKey) { -// it.SetState(commonpb.IndexState_Failed) -// logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute buildIndex failed", zap.Error(err)) -// return err -// } -// it.SetState(commonpb.IndexState_Unissued) -// logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute buildIndex failed, need to retry", zap.Error(err)) -// return err -// } -// if err = it.saveIndex(ctx, blobs); err != nil { -// logutil.Logger(it.ctx).Warn("save index file failed", zap.Error(err)) -// it.SetState(commonpb.IndexState_Unissued) -// return err -// } -// it.SetState(commonpb.IndexState_Finished) -// saveIndexFileDur := it.tr.Record("index file save done") -// metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds())) -// it.tr.Elapse("index building all done") -// logutil.Logger(it.ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID), -// zap.Int64("partition", it.partitionID), zap.Int64("segment", it.segmentID)) -// return nil -// } -//} diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index 57ed39fd9a..429adfb50a 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/logutil" ) // TaskQueue is a queue used to store tasks. 
@@ -217,7 +216,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
 	for _, fn := range pipelines {
 		if err := wrap(fn); err != nil {
 			if err == errCancel {
-				logutil.Logger(t.Ctx()).Warn("index build task canceled", zap.String("task", t.Name()))
+				log.Ctx(t.Ctx()).Warn("index build task canceled", zap.String("task", t.Name()))
 				t.SetState(commonpb.IndexState_Failed, err.Error())
 			} else if errors.Is(err, ErrNoSuchKey) {
 				t.SetState(commonpb.IndexState_Failed, err.Error())
diff --git a/internal/indexnode/taskinfo_ops.go b/internal/indexnode/taskinfo_ops.go
index 747b3c9018..2be24af71d 100644
--- a/internal/indexnode/taskinfo_ops.go
+++ b/internal/indexnode/taskinfo_ops.go
@@ -2,10 +2,10 @@ package indexnode
 
 import (
 	"github.com/golang/protobuf/proto"
-	"github.com/milvus-io/milvus/internal/log"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus/api/commonpb"
+	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 )
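The changes in this patch follow one pattern: calls to the global logger (`log.Info`, `log.Debug`) and to `logutil.Logger(ctx)` are replaced with the context-aware `log.Ctx(ctx)` logger, so trace and module fields carried in the context reach IndexCoord/IndexNode log entries, and messages on hot scheduling paths are downgraded to the rate-limited `RatedDebug`/`RatedWarn` variants. The sketch below only illustrates that pattern and is not code from the patch: `scheduleLoop`, its parameters, and its messages are hypothetical, and the `RatedDebug`/`RatedWarn` signatures are assumed from how the patch itself calls them.

```go
package example

import (
	"context"
	"time"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
)

// scheduleLoop is a hypothetical scheduler loop used only to illustrate the
// logging style adopted by this patch: log.Ctx(ctx) attaches the fields carried
// in ctx to every entry, and RatedDebug/RatedWarn limit how often a noisy
// per-task message is emitted (roughly once per the given number of seconds).
func scheduleLoop(ctx context.Context, interval time.Duration, pending func() []int64, process func(int64) bool) {
	log.Ctx(ctx).Info("schedule loop start", zap.Duration("interval", interval))

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Ctx(ctx).Warn("schedule loop context done")
			return
		case <-ticker.C:
			for _, id := range pending() {
				// Rate-limited: at most about one entry every 10 seconds from this call site.
				log.Ctx(ctx).RatedDebug(10, "processing task", zap.Int64("id", id))
				if !process(id) {
					// Stop this round instead of hammering an unavailable worker.
					log.Ctx(ctx).RatedWarn(10, "no worker available, stop this round", zap.Int64("id", id))
					break
				}
			}
		}
	}
}
```

The `process` callback returning `false` mirrors the new `indexBuilder.process` signature in this patch, which lets `run` break out of a scheduling round when no IndexNode (or etcd) is available instead of immediately retrying every remaining task.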