From 9e036af714e76e76fb064eec993858e443e36a83 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 30 Dec 2021 19:11:30 +0800 Subject: [PATCH] Set collectionID in FlushSegmentsRequest (#14563) Signed-off-by: Congqi Xia --- internal/datacoord/cluster.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 17a0893c0b..73271184fa 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -83,8 +83,11 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar channelNodes[ch.Name] = c.NodeID } } + // collectionID shall be the same in single Flush call + var collectionID int64 // find node on which segment exists for _, segment := range segments { + collectionID = segment.CollectionID nodeID, ok := channelNodes[segment.GetInsertChannel()] if !ok { log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel())) @@ -94,6 +97,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar targetNodes[nodeID] = struct{}{} } for _, segment := range markSegments { + collectionID = segment.CollectionID nodeID, ok := channelNodes[segment.GetInsertChannel()] if !ok { log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel())) @@ -114,6 +118,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar MsgType: commonpb.MsgType_Flush, SourceID: Params.DataCoordCfg.NodeID, }, + CollectionID: collectionID, SegmentIDs: segments, MarkSegmentIDs: marks, }