From a18d6b46a4e2489c5de4d66dd323e9f7caa1d421 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Thu, 10 Feb 2022 13:19:46 +0800 Subject: [PATCH] Fix watch dml channel fail because of no collection meta (#15436) Signed-off-by: xiaofan-luan --- internal/querynode/collection_replica.go | 2 +- internal/querynode/task.go | 38 ++++++++---------------- 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 1f54022d81..dd077c71b0 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -209,7 +209,7 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema var newCollection = newCollection(collectionID, schema) colReplica.collections[collectionID] = newCollection - + log.Debug("Successfully add collection ", zap.Int64("collectionID", collectionID)) return newCollection } diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 5547148d6f..e13b4fd41d 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -254,6 +254,10 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { zap.Strings("pChannels", pChannels), ) + // init collection meta + sCol := w.node.streaming.replica.addCollection(collectionID, w.req.Schema) + hCol := w.node.historical.replica.addCollection(collectionID, w.req.Schema) + // load growing segments unFlushedSegments := make([]*queryPb.SegmentLoadInfo, 0) unFlushedSegmentIDs := make([]UniqueID, 0) @@ -287,7 +291,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { if err != nil { return err } - log.Debug("load growing segments done in WatchDmChannels", + log.Debug("successfully load growing segments done in WatchDmChannels", zap.Int64("collectionID", collectionID), zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs), ) @@ -407,10 +411,6 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { log.Debug("watchDMChannel, add flowGraph for dmChannels success", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels)) - // init collection - sCol := w.node.streaming.replica.addCollection(collectionID, w.req.Schema) - hCol := w.node.historical.replica.addCollection(collectionID, w.req.Schema) - sCol.addVChannels(vChannels) sCol.addPChannels(pChannels) sCol.setLoadType(lType) @@ -620,27 +620,15 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { for _, info := range l.req.Infos { collectionID := info.CollectionID partitionID := info.PartitionID - hasCollectionInHistorical := l.node.historical.replica.hasCollection(collectionID) - hasPartitionInHistorical := l.node.historical.replica.hasPartition(partitionID) - if !hasCollectionInHistorical { - l.node.historical.replica.addCollection(collectionID, l.req.Schema) + l.node.historical.replica.addCollection(collectionID, l.req.Schema) + err = l.node.historical.replica.addPartition(collectionID, partitionID) + if err != nil { + return err } - if !hasPartitionInHistorical { - err = l.node.historical.replica.addPartition(collectionID, partitionID) - if err != nil { - return err - } - } - hasCollectionInStreaming := l.node.streaming.replica.hasCollection(collectionID) - hasPartitionInStreaming := l.node.streaming.replica.hasPartition(partitionID) - if !hasCollectionInStreaming { - l.node.streaming.replica.addCollection(collectionID, l.req.Schema) - } - if !hasPartitionInStreaming { - err = l.node.streaming.replica.addPartition(collectionID, partitionID) - if err != nil { - return err - } + l.node.streaming.replica.addCollection(collectionID, l.req.Schema) + err = l.node.streaming.replica.addPartition(collectionID, partitionID) + if err != nil { + return err } }