Make duplicate flush calls succeed and update related logic (#16755)

Latest logic:
1) Duplicate flush calls on same segments will not result in errors (same as the original design)
2) `FlushSegments` now still flushes stale segments even if non-stale segments fail to be flushed

issue: #16749

/kind enhancement

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
This commit is contained in:
Ten Thousand Leaves 2022-05-06 17:49:51 +08:00 committed by GitHub
parent 98ceb162aa
commit a8e1c8fa9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 40 deletions

View File

@ -112,7 +112,7 @@ func (c *SessionManager) Flush(ctx context.Context, nodeID int64, req *datapb.Fl
func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) { func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) {
cli, err := c.getClient(ctx, nodeID) cli, err := c.getClient(ctx, nodeID)
if err != nil { if err != nil {
log.Warn("failed to get client", zap.Int64("nodeID", nodeID), zap.Error(err)) log.Warn("failed to get dataNode client", zap.Int64("dataNode ID", nodeID), zap.Error(err))
return return
} }
ctx, cancel := context.WithTimeout(ctx, flushTimeout) ctx, cancel := context.WithTimeout(ctx, flushTimeout)
@ -120,11 +120,10 @@ func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datap
resp, err := cli.FlushSegments(ctx, req) resp, err := cli.FlushSegments(ctx, req)
if err := VerifyResponse(resp, err); err != nil { if err := VerifyResponse(resp, err); err != nil {
log.Warn("failed to flush", zap.Int64("node", nodeID), zap.Error(err)) log.Error("flush call (perhaps partially) failed", zap.Int64("dataNode ID", nodeID), zap.Error(err))
return } else {
log.Info("flush call succeeded", zap.Int64("dataNode ID", nodeID))
} }
log.Info("success to flush", zap.Int64("node", nodeID), zap.Any("segments", req))
} }
// Compaction is a grpc interface. It will send request to DataNode with provided `nodeID` asynchronously. // Compaction is a grpc interface. It will send request to DataNode with provided `nodeID` asynchronously.

View File

@ -48,6 +48,13 @@ func (c *Cache) Cache(ID UniqueID) {
c.cacheMap.Store(ID, struct{}{}) c.cacheMap.Store(ID, struct{}{})
} }
// checkOrCache reports whether key was already present in the cache.
// When the key is absent, it is inserted atomically and false is returned,
// so exactly one caller per key ever observes false.
func (c *Cache) checkOrCache(key UniqueID) bool {
	_, loaded := c.cacheMap.LoadOrStore(key, struct{}{})
	return loaded
}
// Remove removes a set of IDs from the cache // Remove removes a set of IDs from the cache
func (c *Cache) Remove(IDs ...UniqueID) { func (c *Cache) Remove(IDs ...UniqueID) {
for _, id := range IDs { for _, id := range IDs {

View File

@ -30,6 +30,10 @@ func TestSegmentCache(t *testing.T) {
segCache.Cache(UniqueID(0)) segCache.Cache(UniqueID(0))
assert.True(t, segCache.checkIfCached(0)) assert.True(t, segCache.checkIfCached(0))
assert.False(t, segCache.checkOrCache(UniqueID(1)))
assert.True(t, segCache.checkIfCached(1))
assert.True(t, segCache.checkOrCache(UniqueID(1)))
segCache.Remove(UniqueID(0)) segCache.Remove(UniqueID(0))
assert.False(t, segCache.checkIfCached(0)) assert.False(t, segCache.checkIfCached(0))
} }

View File

@ -537,7 +537,7 @@ func (node *DataNode) ReadyToFlush() error {
return nil return nil
} }
// FlushSegments packs flush messages into flowgraph through flushChan. // FlushSegments packs flush messages into flowGraph through flushChan.
// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored. // If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed. // So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
// //
@ -547,48 +547,54 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
fmt.Sprint(Params.DataNodeCfg.GetNodeID()), fmt.Sprint(Params.DataNodeCfg.GetNodeID()),
MetricRequestsTotal).Inc() MetricRequestsTotal).Inc()
status := &commonpb.Status{ errStatus := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError, ErrorCode: commonpb.ErrorCode_UnexpectedError,
} }
if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy { if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
status.Reason = "DataNode not in HEALTHY state" errStatus.Reason = "dataNode not in HEALTHY state"
return status, nil return errStatus, nil
} }
log.Info("Receive FlushSegments req", log.Info("receiving FlushSegments request",
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("collection ID", req.GetCollectionID()),
zap.Int64s("segments", req.GetSegmentIDs()), zap.Int64s("segments", req.GetSegmentIDs()),
zap.Int64s("stale segments", req.GetMarkSegmentIDs()), zap.Int64s("stale segments", req.GetMarkSegmentIDs()),
) )
processSegments := func(segmentIDs []UniqueID, flushed bool) bool { // TODO: Here and in other places, replace `flushed` param with a more meaningful name.
processSegments := func(segmentIDs []UniqueID, flushed bool) ([]UniqueID, bool) {
noErr := true noErr := true
for _, id := range segmentIDs { var flushedSeg []UniqueID
if node.segmentCache.checkIfCached(id) { for _, segID := range segmentIDs {
// Segment in flushing, ignore // if the segment in already being flushed, skip it.
log.Info("segment flushing, ignore the flush request until flush is done.", if node.segmentCache.checkIfCached(segID) {
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", id)) logDupFlush(req.GetCollectionID(), segID)
status.Reason = "segment is flushing, nothing is done"
noErr = false
continue continue
} }
// Get the flush channel for the given segment ID.
node.segmentCache.Cache(id) // If no flush channel is found, report an error.
flushCh, err := node.flowgraphManager.getFlushCh(segID)
flushCh, err := node.flowgraphManager.getFlushCh(id)
if err != nil { if err != nil {
status.Reason = "no flush channel found for v-channel" errStatus.Reason = "no flush channel found for the segment, unable to flush"
log.Error("no flush channel found for v-channel", zap.Error(err)) log.Error(errStatus.Reason, zap.Int64("segment ID", segID), zap.Error(err))
noErr = false noErr = false
continue continue
} }
// Cache the segment and send it to its flush channel.
flushedSeg = append(flushedSeg, segID)
// Double check that the segment is still not cached.
exist := node.segmentCache.checkOrCache(segID)
if exist {
logDupFlush(req.GetCollectionID(), segID)
continue
}
flushCh <- flushMsg{ flushCh <- flushMsg{
msgID: req.Base.MsgID, msgID: req.GetBase().GetMsgID(),
timestamp: req.Base.Timestamp, timestamp: req.GetBase().GetTimestamp(),
segmentID: id, segmentID: segID,
collectionID: req.CollectionID, collectionID: req.GetCollectionID(),
flushed: flushed, flushed: flushed,
} }
} }
@ -597,24 +603,28 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
zap.Int64("collection ID", req.GetCollectionID()), zap.Int64("collection ID", req.GetCollectionID()),
zap.Int64s("segments", segmentIDs), zap.Int64s("segments", segmentIDs),
zap.Int64s("mark segments", req.GetMarkSegmentIDs())) zap.Int64s("mark segments", req.GetMarkSegmentIDs()))
return noErr return flushedSeg, noErr
} }
ok := processSegments(req.GetSegmentIDs(), true) seg, noErr1 := processSegments(req.GetSegmentIDs(), true)
if !ok { staleSeg, noErr2 := processSegments(req.GetMarkSegmentIDs(), false)
return status, nil // Log success flushed segments.
if len(seg)+len(staleSeg) > 0 {
log.Info("sending segments to flush channel",
zap.Any("newly sealed segment IDs", seg),
zap.Any("stale segment IDs", staleSeg))
} }
ok = processSegments(req.GetMarkSegmentIDs(), false) // Fail FlushSegments call if at least one segment (no matter stale or not) fails to get flushed.
if !ok { if !noErr1 || !noErr2 {
return status, nil return errStatus, nil
} }
status.ErrorCode = commonpb.ErrorCode_Success
metrics.DataNodeFlushReqCounter.WithLabelValues( metrics.DataNodeFlushReqCounter.WithLabelValues(
fmt.Sprint(Params.DataNodeCfg.GetNodeID()), fmt.Sprint(Params.DataNodeCfg.GetNodeID()),
MetricRequestsSuccess).Inc() MetricRequestsSuccess).Inc()
return &commonpb.Status{
return status, nil ErrorCode: commonpb.ErrorCode_Success,
}, nil
} }
// Stop will release DataNode resources and shutdown datanode // Stop will release DataNode resources and shutdown datanode
@ -1098,3 +1108,9 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
return nil return nil
} }
} }
// logDupFlush logs, at Info level, that a flush request for segment segID of
// collection cID is being ignored because the segment is already being flushed.
// NOTE(review): callers treat a duplicate flush as success (see FlushSegments) — confirm that remains intended.
func logDupFlush(cID, segID int64) {
log.Info("segment is already being flushed, ignoring flush request",
zap.Int64("collection ID", cID),
zap.Int64("segment ID", segID))
}

View File

@ -236,7 +236,7 @@ func TestDataNode(t *testing.T) {
// dup call // dup call
status, err := node1.FlushSegments(node1.ctx, req) status, err := node1.FlushSegments(node1.ctx, req)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
// failure call // failure call
req = &datapb.FlushSegmentsRequest{ req = &datapb.FlushSegmentsRequest{