diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go
index 051d9cbb01..b11ccb36b2 100644
--- a/internal/datacoord/session_manager.go
+++ b/internal/datacoord/session_manager.go
@@ -112,7 +112,7 @@ func (c *SessionManager) Flush(ctx context.Context, nodeID int64, req *datapb.Fl
 func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) {
 	cli, err := c.getClient(ctx, nodeID)
 	if err != nil {
-		log.Warn("failed to get client", zap.Int64("nodeID", nodeID), zap.Error(err))
+		log.Warn("failed to get dataNode client", zap.Int64("dataNode ID", nodeID), zap.Error(err))
 		return
 	}
 	ctx, cancel := context.WithTimeout(ctx, flushTimeout)
@@ -120,11 +120,10 @@ func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datap
 
 	resp, err := cli.FlushSegments(ctx, req)
 	if err := VerifyResponse(resp, err); err != nil {
-		log.Warn("failed to flush", zap.Int64("node", nodeID), zap.Error(err))
-		return
+		log.Error("flush call (perhaps partially) failed", zap.Int64("dataNode ID", nodeID), zap.Error(err))
+	} else {
+		log.Info("flush call succeeded", zap.Int64("dataNode ID", nodeID))
 	}
-
-	log.Info("success to flush", zap.Int64("node", nodeID), zap.Any("segments", req))
 }
 
 // Compaction is a grpc interface. It will send request to DataNode with provided `nodeID` asynchronously.
diff --git a/internal/datanode/cache.go b/internal/datanode/cache.go
index 4bfd148f1c..26ea020939 100644
--- a/internal/datanode/cache.go
+++ b/internal/datanode/cache.go
@@ -48,6 +48,13 @@ func (c *Cache) Cache(ID UniqueID) {
 	c.cacheMap.Store(ID, struct{}{})
 }
 
+// checkOrCache returns true if `key` is present.
+// Otherwise, it returns false and stores `key` into cache.
+func (c *Cache) checkOrCache(key UniqueID) bool {
+	_, exist := c.cacheMap.LoadOrStore(key, struct{}{})
+	return exist
+}
+
 // Remove removes a set of IDs from the cache
 func (c *Cache) Remove(IDs ...UniqueID) {
 	for _, id := range IDs {
diff --git a/internal/datanode/cache_test.go b/internal/datanode/cache_test.go
index 9ef3aaeda3..01776fee48 100644
--- a/internal/datanode/cache_test.go
+++ b/internal/datanode/cache_test.go
@@ -30,6 +30,10 @@ func TestSegmentCache(t *testing.T) {
 	segCache.Cache(UniqueID(0))
 	assert.True(t, segCache.checkIfCached(0))
 
+	assert.False(t, segCache.checkOrCache(UniqueID(1)))
+	assert.True(t, segCache.checkIfCached(1))
+	assert.True(t, segCache.checkOrCache(UniqueID(1)))
+
 	segCache.Remove(UniqueID(0))
 	assert.False(t, segCache.checkIfCached(0))
 }
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 193dd6315f..e9586c0be2 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -537,7 +537,7 @@ func (node *DataNode) ReadyToFlush() error {
 	return nil
 }
 
-// FlushSegments packs flush messages into flowgraph through flushChan.
+// FlushSegments packs flush messages into flowGraph through flushChan.
 // If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
 // So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
 //
@@ -547,48 +547,54 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 		fmt.Sprint(Params.DataNodeCfg.GetNodeID()),
 		MetricRequestsTotal).Inc()
 
-	status := &commonpb.Status{
+	errStatus := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
 	}
 
 	if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
-		status.Reason = "DataNode not in HEALTHY state"
-		return status, nil
+		errStatus.Reason = "dataNode not in HEALTHY state"
+		return errStatus, nil
 	}
 
-	log.Info("Receive FlushSegments req",
-		zap.Int64("collectionID", req.GetCollectionID()),
+	log.Info("receiving FlushSegments request",
+		zap.Int64("collection ID", req.GetCollectionID()),
 		zap.Int64s("segments", req.GetSegmentIDs()),
 		zap.Int64s("stale segments", req.GetMarkSegmentIDs()),
 	)
 
-	processSegments := func(segmentIDs []UniqueID, flushed bool) bool {
+	// TODO: Here and in other places, replace `flushed` param with a more meaningful name.
+	processSegments := func(segmentIDs []UniqueID, flushed bool) ([]UniqueID, bool) {
 		noErr := true
-		for _, id := range segmentIDs {
-			if node.segmentCache.checkIfCached(id) {
-				// Segment in flushing, ignore
-				log.Info("segment flushing, ignore the flush request until flush is done.",
-					zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", id))
-				status.Reason = "segment is flushing, nothing is done"
-				noErr = false
+		var flushedSeg []UniqueID
+		for _, segID := range segmentIDs {
+			// If the segment is already being flushed, skip it.
+			if node.segmentCache.checkIfCached(segID) {
+				logDupFlush(req.GetCollectionID(), segID)
 				continue
 			}
-
-			node.segmentCache.Cache(id)
-
-			flushCh, err := node.flowgraphManager.getFlushCh(id)
+			// Get the flush channel for the given segment ID.
+			// If no flush channel is found, report an error.
+			flushCh, err := node.flowgraphManager.getFlushCh(segID)
 			if err != nil {
-				status.Reason = "no flush channel found for v-channel"
-				log.Error("no flush channel found for v-channel", zap.Error(err))
+				errStatus.Reason = "no flush channel found for the segment, unable to flush"
+				log.Error(errStatus.Reason, zap.Int64("segment ID", segID), zap.Error(err))
 				noErr = false
 				continue
 			}
+			// Cache the segment and send it to its flush channel.
+			flushedSeg = append(flushedSeg, segID)
+			// Double check that the segment is still not cached.
+			exist := node.segmentCache.checkOrCache(segID)
+			if exist {
+				logDupFlush(req.GetCollectionID(), segID)
+				continue
+			}
 			flushCh <- flushMsg{
-				msgID:        req.Base.MsgID,
-				timestamp:    req.Base.Timestamp,
-				segmentID:    id,
-				collectionID: req.CollectionID,
+				msgID:        req.GetBase().GetMsgID(),
+				timestamp:    req.GetBase().GetTimestamp(),
+				segmentID:    segID,
+				collectionID: req.GetCollectionID(),
 				flushed:      flushed,
 			}
 		}
@@ -597,24 +603,28 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 			zap.Int64("collection ID", req.GetCollectionID()),
 			zap.Int64s("segments", segmentIDs),
 			zap.Int64s("mark segments", req.GetMarkSegmentIDs()))
-		return noErr
+		return flushedSeg, noErr
 	}
 
-	ok := processSegments(req.GetSegmentIDs(), true)
-	if !ok {
-		return status, nil
+	seg, noErr1 := processSegments(req.GetSegmentIDs(), true)
+	staleSeg, noErr2 := processSegments(req.GetMarkSegmentIDs(), false)
+	// Log successfully flushed segments.
+	if len(seg)+len(staleSeg) > 0 {
+		log.Info("sending segments to flush channel",
+			zap.Any("newly sealed segment IDs", seg),
+			zap.Any("stale segment IDs", staleSeg))
 	}
-	ok = processSegments(req.GetMarkSegmentIDs(), false)
-	if !ok {
-		return status, nil
+	// Fail the FlushSegments call if at least one segment (stale or not) fails to get flushed.
+	if !noErr1 || !noErr2 {
+		return errStatus, nil
 	}
 
-	status.ErrorCode = commonpb.ErrorCode_Success
 	metrics.DataNodeFlushReqCounter.WithLabelValues(
 		fmt.Sprint(Params.DataNodeCfg.GetNodeID()),
 		MetricRequestsSuccess).Inc()
-
-	return status, nil
+	return &commonpb.Status{
+		ErrorCode: commonpb.ErrorCode_Success,
+	}, nil
 }
 
 // Stop will release DataNode resources and shutdown datanode
@@ -1098,3 +1108,9 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
 		return nil
 	}
 }
+
+func logDupFlush(cID, segID int64) {
+	log.Info("segment is already being flushed, ignoring flush request",
+		zap.Int64("collection ID", cID),
+		zap.Int64("segment ID", segID))
+}
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index 9a2e003355..17aa5f2926 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -236,7 +236,7 @@ func TestDataNode(t *testing.T) {
 		// dup call
 		status, err := node1.FlushSegments(node1.ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
+		assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
 
 		// failure call
 		req = &datapb.FlushSegmentsRequest{
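
A note on the new `Cache.checkOrCache`: `sync.Map.LoadOrStore` folds the membership check and the insert into one atomic operation, so two concurrent `FlushSegments` calls racing on the same segment cannot both observe it as uncached (which was possible with separate `checkIfCached` + `Cache` calls). Below is a minimal, self-contained sketch of the pattern; the `UniqueID` alias and the trimmed `Cache` type are stand-ins for the datanode types, and the concurrent driver is purely illustrative.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// UniqueID stands in for the datanode's segment ID alias (assumed int64).
type UniqueID = int64

// Cache is a trimmed-down stand-in for datanode.Cache.
type Cache struct {
	cacheMap sync.Map
}

// checkOrCache returns true if key is already present; otherwise it stores key
// and returns false. LoadOrStore makes check-and-insert a single atomic step.
func (c *Cache) checkOrCache(key UniqueID) bool {
	_, exist := c.cacheMap.LoadOrStore(key, struct{}{})
	return exist
}

func main() {
	c := &Cache{}
	var wg sync.WaitGroup
	var winners int64

	// Eight goroutines race to flush the same segment; exactly one observes
	// "not cached" and would get to enqueue the flush message.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if !c.checkOrCache(UniqueID(100)) {
				atomic.AddInt64(&winners, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("goroutines that won the race:", winners) // always 1
}
```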
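
The reworked `processSegments` closure also changes the failure semantics of `FlushSegments`: a duplicate request for a segment already in the cache is logged and skipped, and only a missing flush channel fails the call, which is why the updated test now expects `ErrorCode_Success` on a duplicate call. Here is a condensed, single-threaded sketch of that control flow; `getFlushCh`, the map-based cache, and the trimmed `flushMsg` are simplified stand-ins, not the real datanode types.

```go
package main

import (
	"errors"
	"fmt"
)

type UniqueID = int64

type flushMsg struct {
	segmentID UniqueID
	flushed   bool
}

// getFlushCh is a stand-in for flowgraphManager.getFlushCh: it fails for
// segments that have no registered flow graph.
func getFlushCh(segID UniqueID, known map[UniqueID]chan flushMsg) (chan flushMsg, error) {
	ch, ok := known[segID]
	if !ok {
		return nil, errors.New("no flush channel found for the segment")
	}
	return ch, nil
}

// processSegments mirrors the shape of the new closure: it returns the IDs it
// actually enqueued plus an ok flag; duplicates are skipped, not treated as errors.
func processSegments(segIDs []UniqueID, flushed bool, cached map[UniqueID]bool,
	known map[UniqueID]chan flushMsg) ([]UniqueID, bool) {
	ok := true
	var sent []UniqueID
	for _, segID := range segIDs {
		if cached[segID] { // duplicate request: log-and-skip in the real code
			continue
		}
		ch, err := getFlushCh(segID, known)
		if err != nil { // only this path fails the call
			ok = false
			continue
		}
		cached[segID] = true
		sent = append(sent, segID)
		ch <- flushMsg{segmentID: segID, flushed: flushed}
	}
	return sent, ok
}

func main() {
	known := map[UniqueID]chan flushMsg{7: make(chan flushMsg, 1)}
	cached := map[UniqueID]bool{}

	sent, ok := processSegments([]UniqueID{7}, true, cached, known)
	fmt.Println(sent, ok) // [7] true

	// A duplicate call is a no-op but still reports success.
	sent, ok = processSegments([]UniqueID{7}, true, cached, known)
	fmt.Println(sent, ok) // [] true
}
```

In the real diff the cache update additionally goes through `checkOrCache` right before the channel send, so the concurrency-safe pattern from the previous sketch is what actually guards the enqueue.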