diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 17a0893c0b..73271184fa 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -83,8 +83,11 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar channelNodes[ch.Name] = c.NodeID } } + // collectionID shall be the same in single Flush call + var collectionID int64 // find node on which segment exists for _, segment := range segments { + collectionID = segment.CollectionID nodeID, ok := channelNodes[segment.GetInsertChannel()] if !ok { log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel())) @@ -94,6 +97,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar targetNodes[nodeID] = struct{}{} } for _, segment := range markSegments { + collectionID = segment.CollectionID nodeID, ok := channelNodes[segment.GetInsertChannel()] if !ok { log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel())) @@ -114,6 +118,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar MsgType: commonpb.MsgType_Flush, SourceID: Params.DataCoordCfg.NodeID, }, + CollectionID: collectionID, SegmentIDs: segments, MarkSegmentIDs: marks, }