mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: prevent data race in querycoord collection notifier update (#45037)
Fixes #45035 This commit addresses a data race issue where refreshCollection was updating the collection notifier without proper lock protection. Changes: - Add UpdateCollection method to CollectionManager with proper locking - Introduce CollectionOperator pattern for thread-safe collection updates - Make setRefreshNotifier private and use it through the operator pattern - Update refreshCollection to use the new UpdateCollection method - Handle collection not found error gracefully in refreshCollection The CollectionOperator pattern ensures all collection modifications go through the CollectionManager's lock, preventing concurrent access issues. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
b6261f0282
commit
20dcb45b3d
@ -53,7 +53,7 @@ type Collection struct {
|
|||||||
Schema *schemapb.CollectionSchema
|
Schema *schemapb.CollectionSchema
|
||||||
}
|
}
|
||||||
|
|
||||||
func (collection *Collection) SetRefreshNotifier(notifier chan struct{}) {
|
func (collection *Collection) setRefreshNotifier(notifier chan struct{}) {
|
||||||
collection.mut.Lock()
|
collection.mut.Lock()
|
||||||
defer collection.mut.Unlock()
|
defer collection.mut.Unlock()
|
||||||
|
|
||||||
@ -576,6 +576,22 @@ func (m *CollectionManager) UpdateCollectionLoadPercent(ctx context.Context, col
|
|||||||
return collectionPercent, m.putCollection(ctx, saveCollection, newCollection)
|
return collectionPercent, m.putCollection(ctx, saveCollection, newCollection)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *CollectionManager) UpdateCollection(ctx context.Context, collectionID int64, ops ...CollectionOperator) error {
|
||||||
|
m.rwmutex.Lock()
|
||||||
|
defer m.rwmutex.Unlock()
|
||||||
|
|
||||||
|
collection, ok := m.collections[collectionID]
|
||||||
|
if !ok {
|
||||||
|
return merr.WrapErrCollectionNotLoaded(collectionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, op := range ops {
|
||||||
|
op(collection)
|
||||||
|
}
|
||||||
|
m.collections[collectionID] = collection
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// RemoveCollection removes collection and its partitions.
|
// RemoveCollection removes collection and its partitions.
|
||||||
func (m *CollectionManager) RemoveCollection(ctx context.Context, collectionID typeutil.UniqueID) error {
|
func (m *CollectionManager) RemoveCollection(ctx context.Context, collectionID typeutil.UniqueID) error {
|
||||||
m.rwmutex.Lock()
|
m.rwmutex.Lock()
|
||||||
|
|||||||
27
internal/querycoordv2/meta/collection_operator.go
Normal file
27
internal/querycoordv2/meta/collection_operator.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package meta
|
||||||
|
|
||||||
|
// CollectionOperator is a function that can be used to modify a collection
|
||||||
|
type CollectionOperator func(collection *Collection) (changed bool)
|
||||||
|
|
||||||
|
func SetNotifierCollectionOp(notifier chan struct{}) CollectionOperator {
|
||||||
|
return func(collection *Collection) (changed bool) {
|
||||||
|
collection.setRefreshNotifier(notifier)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -631,8 +631,12 @@ func (s *Server) refreshCollection(ctx context.Context, collectionID int64) erro
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
collection.SetRefreshNotifier(readyCh)
|
err = s.meta.CollectionManager.UpdateCollection(ctx, collectionID, meta.SetNotifierCollectionOp(readyCh))
|
||||||
return nil
|
// if collection already released, treat as success
|
||||||
|
if errors.Is(err, merr.ErrCollectionNotFound) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is totally same to refreshCollection, remove it for now
|
// This is totally same to refreshCollection, remove it for now
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user