diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go index 0e1ba86511..2dcd96b6a0 100644 --- a/internal/querycoord/cluster.go +++ b/internal/querycoord/cluster.go @@ -159,8 +159,8 @@ func (c *queryNodeCluster) LoadSegments(ctx context.Context, nodeID int64, in *q status, err := node.client.LoadSegments(ctx, in) if err == nil && status.ErrorCode == commonpb.ErrorCode_Success { for _, info := range in.Infos { - c.clusterMeta.addCollection(info.CollectionID, in.Schema) - c.clusterMeta.addPartition(info.CollectionID, info.PartitionID) + //c.clusterMeta.addCollection(info.CollectionID, in.Schema) + //c.clusterMeta.addPartition(info.CollectionID, info.PartitionID) node.addCollection(info.CollectionID, in.Schema) node.addPartition(info.CollectionID, info.PartitionID) @@ -225,8 +225,8 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in log.Debug("queryNode watch dm channel done") if err == nil && status.ErrorCode == commonpb.ErrorCode_Success { collectionID := in.CollectionID - c.clusterMeta.addCollection(collectionID, in.Schema) - c.clusterMeta.addDmChannel(collectionID, nodeID, channels) + //c.clusterMeta.addCollection(collectionID, in.Schema) + //c.clusterMeta.addDmChannel(collectionID, nodeID, channels) node.addCollection(collectionID, in.Schema) node.addDmChannel(collectionID, channels) diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index 66edd5970c..883cf779b0 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -108,15 +108,6 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle return status, err } - hasCollection := qc.meta.hasCollection(collectionID) - if hasCollection { - loadCollection, _ := qc.meta.getLoadCollection(collectionID) - if loadCollection { - status.Reason = "collection has been loaded" - return status, nil - } - } - loadCollectionTask := &LoadCollectionTask{ BaseTask: BaseTask{ ctx: qc.loopCtx, diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 15c87a4875..3266b96390 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -137,6 +137,7 @@ func (m *meta) showPartitions(collectionID UniqueID) ([]UniqueID, error) { m.RLock() defer m.RUnlock() + //TODO::should update after load collection if info, ok := m.collectionInfos[collectionID]; ok { return info.PartitionIDs, nil } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 58511d8bd2..81e42a16aa 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -164,8 +164,6 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error { ErrorCode: commonpb.ErrorCode_UnexpectedError, } - lct.meta.addCollection(collectionID, lct.Schema) - lct.meta.setLoadCollection(collectionID, true) showPartitionRequest := &milvuspb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ShowPartitions, @@ -178,14 +176,38 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error { lct.result = status return err } - log.Debug("loadCollectionTask: get recovery info", zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs)) + log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs)) + + partitionIDs := showPartitionResponse.PartitionIDs toLoadPartitionIDs := make([]UniqueID, 0) - for _, id := range showPartitionResponse.PartitionIDs { - if !lct.meta.hasPartition(collectionID, id) { - toLoadPartitionIDs = append(toLoadPartitionIDs, id) + hasCollection := lct.meta.hasCollection(collectionID) + if hasCollection { + loadCollection, _ := lct.meta.getLoadCollection(collectionID) + if loadCollection { + for _, partitionID := range partitionIDs { + hasReleasePartition := lct.meta.hasReleasePartition(collectionID, partitionID) + if hasReleasePartition { + toLoadPartitionIDs = append(toLoadPartitionIDs, partitionID) + } + } + } else { + for _, partitionID := range partitionIDs { + hasPartition := lct.meta.hasPartition(collectionID, partitionID) + if !hasPartition { + toLoadPartitionIDs = append(toLoadPartitionIDs, partitionID) + } + } } + } else { + toLoadPartitionIDs = partitionIDs } + log.Debug("loadCollectionTask: toLoadPartitionIDs", zap.Int64s("partitionIDs", toLoadPartitionIDs)) + lct.meta.addCollection(collectionID, lct.Schema) + lct.meta.setLoadCollection(collectionID, true) + for _, id := range toLoadPartitionIDs { + lct.meta.addPartition(collectionID, id) + } segment2Binlog := make(map[UniqueID]*querypb.SegmentLoadInfo) watchRequests := make(map[string]*querypb.WatchDmChannelsRequest)