diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index b1a4eacf83..e7d8c1a851 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -53,7 +53,7 @@ type Collection struct { Schema *schemapb.CollectionSchema } -func (collection *Collection) SetRefreshNotifier(notifier chan struct{}) { +func (collection *Collection) setRefreshNotifier(notifier chan struct{}) { collection.mut.Lock() defer collection.mut.Unlock() @@ -576,6 +576,22 @@ func (m *CollectionManager) UpdateCollectionLoadPercent(ctx context.Context, col return collectionPercent, m.putCollection(ctx, saveCollection, newCollection) } +func (m *CollectionManager) UpdateCollection(ctx context.Context, collectionID int64, ops ...CollectionOperator) error { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() + + collection, ok := m.collections[collectionID] + if !ok { + return merr.WrapErrCollectionNotLoaded(collectionID) + } + + for _, op := range ops { + op(collection) + } + m.collections[collectionID] = collection + return nil +} + // RemoveCollection removes collection and its partitions. func (m *CollectionManager) RemoveCollection(ctx context.Context, collectionID typeutil.UniqueID) error { m.rwmutex.Lock() diff --git a/internal/querycoordv2/meta/collection_operator.go b/internal/querycoordv2/meta/collection_operator.go new file mode 100644 index 0000000000..61e460e383 --- /dev/null +++ b/internal/querycoordv2/meta/collection_operator.go @@ -0,0 +1,27 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meta + +// CollectionOperator is a function that can be used to modify a collection +type CollectionOperator func(collection *Collection) (changed bool) + +func SetNotifierCollectionOp(notifier chan struct{}) CollectionOperator { + return func(collection *Collection) (changed bool) { + collection.setRefreshNotifier(notifier) + return true + } +} diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index d0d10078f8..566f360bd6 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -631,8 +631,12 @@ func (s *Server) refreshCollection(ctx context.Context, collectionID int64) erro return err } - collection.SetRefreshNotifier(readyCh) - return nil + err = s.meta.CollectionManager.UpdateCollection(ctx, collectionID, meta.SetNotifierCollectionOp(readyCh)) + // if collection already released, treat as success + if errors.Is(err, merr.ErrCollectionNotFound) { + return nil + } + return err } // This is totally same to refreshCollection, remove it for now