diff --git a/internal/querynode/task.go b/internal/querynode/task.go index babd8e3612..bb076681d9 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -186,7 +186,7 @@ func (w *watchDmChannelsTask) PreExecute(ctx context.Context) error { return nil } -func (w *watchDmChannelsTask) Execute(ctx context.Context) error { +func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) { collectionID := w.req.CollectionID partitionIDs := w.req.GetPartitionIDs() @@ -201,8 +201,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } // get all vChannels - vChannels := make([]Channel, 0) - pChannels := make([]Channel, 0) + var vChannels, pChannels []Channel VPChannels := make(map[string]string) // map[vChannel]pChannel for _, info := range w.req.Infos { v := info.ChannelName @@ -234,6 +233,14 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { w.node.ShardClusterService.addShardCluster(w.req.GetCollectionID(), w.req.GetReplicaID(), vchannel) } + defer func() { + if err != nil { + for _, vchannel := range vChannels { + w.node.ShardClusterService.releaseShardCluster(vchannel) + } + } + }() + // load growing segments unFlushedSegments := make([]*queryPb.SegmentLoadInfo, 0) unFlushedSegmentIDs := make([]UniqueID, 0) @@ -279,7 +286,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { zap.Int64("collectionID", collectionID), zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs), ) - err := w.node.loader.loadSegment(req, segmentTypeGrowing) + err = w.node.loader.loadSegment(req, segmentTypeGrowing) if err != nil { log.Warn(err.Error()) return err