From 31ddff205665807c94e31e2086b2c2431678e4de Mon Sep 17 00:00:00 2001 From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> Date: Fri, 6 May 2022 21:35:51 +0800 Subject: [PATCH] Some minor fixes and improvements (#16814) /kind improvement Signed-off-by: Yuchen Gao --- internal/datanode/data_node.go | 6 ++++-- internal/rootcoord/root_coord.go | 18 ++++++++++-------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index e9586c0be2..50924bb040 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -582,14 +582,16 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen continue } - // Cache the segment and send it to its flush channel. - flushedSeg = append(flushedSeg, segID) // Double check that the segment is still not cached. + // Skip this flush if segment ID is cached, otherwise cache the segment ID and proceed. exist := node.segmentCache.checkOrCache(segID) if exist { logDupFlush(req.GetCollectionID(), segID) continue } + // flushedSeg is only for logging purpose. + flushedSeg = append(flushedSeg, segID) + // Send the segment to its flush channel. flushCh <- flushMsg{ msgID: req.GetBase().GetMsgID(), timestamp: req.GetBase().GetTimestamp(), diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 278bea1b7c..84e21a959a 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -451,7 +451,10 @@ func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[t segID2PartID[s] = partID } } else { - log.Debug("failed to get flushed segments from data coord", zap.Int64("collection_id", collID), zap.Int64("partition_id", partID), zap.Error(err)) + log.Error("failed to get flushed segments from dataCoord", + zap.Int64("collection ID", collID), + zap.Int64("partition ID", partID), + zap.Error(err)) return nil, err } } @@ -2514,16 +2517,16 @@ func (c *Core) checkSegmentLoadedLoop(ctx context.Context, taskID int64, colID i log.Info("(in check segment loaded loop) context done, exiting checkSegmentLoadedLoop") return case <-ticker.C: - log.Info("(in check segment loaded loop) check segments' loading states", - zap.Int64("task ID", taskID)) resp, err := c.CallGetSegmentInfoService(ctx, colID, segIDs) if err != nil { - log.Warn("failed to call get segment info on queryCoord", + log.Warn("(in check segment loaded loop) failed to call get segment info on queryCoord", + zap.Int64("task ID", taskID), zap.Int64("collection ID", colID), zap.Int64s("segment IDs", segIDs)) } else if len(resp.GetInfos()) == len(segIDs) { // Check if all segment info are loaded in queryNodes. - log.Info("all import data segments loaded in queryNodes", + log.Info("(in check segment loaded loop) all import data segments loaded in queryNodes", + zap.Int64("task ID", taskID), zap.Int64("collection ID", colID), zap.Int64s("segment IDs", segIDs)) c.importManager.updateTaskStateCode(taskID, commonpb.ImportState_DataQueryable) @@ -2552,10 +2555,9 @@ func (c *Core) checkCompleteIndexLoop(ctx context.Context, taskID int64, colID i log.Info("(in check complete index loop) context done, exiting checkCompleteIndexLoop") return case <-ticker.C: - log.Info("(in check complete index loop) check segments' index states", - zap.Int64("task ID", taskID)) if ct, err := c.CountCompleteIndex(ctx, colName, colID, segIDs); err == nil && ct == len(segIDs) { - log.Info("all segment indices are ready!") + log.Info("(in check complete index loop) all segment indices are ready!", + zap.Int64("task ID", taskID)) c.importManager.updateTaskStateCode(taskID, commonpb.ImportState_DataIndexed) return }