Set collectionID in FlushSegmentsRequest (#14563)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-12-30 19:11:30 +08:00 committed by GitHub
parent b063bfda8f
commit 9e036af714
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -83,8 +83,11 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar
channelNodes[ch.Name] = c.NodeID
}
}
// collectionID shall be the same in single Flush call
var collectionID int64
// find node on which segment exists
for _, segment := range segments {
collectionID = segment.CollectionID
nodeID, ok := channelNodes[segment.GetInsertChannel()]
if !ok {
log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel()))
@ -94,6 +97,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar
targetNodes[nodeID] = struct{}{}
}
for _, segment := range markSegments {
collectionID = segment.CollectionID
nodeID, ok := channelNodes[segment.GetInsertChannel()]
if !ok {
log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel()))
@ -114,6 +118,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar
MsgType: commonpb.MsgType_Flush,
SourceID: Params.DataCoordCfg.NodeID,
},
CollectionID: collectionID,
SegmentIDs: segments,
MarkSegmentIDs: marks,
}