enhance: optimize describe collection and index (#37490)

fix #37489
combine multiple describe collection and list index into one call

Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
This commit is contained in:
Xiaofan 2024-11-07 18:18:34 -08:00 committed by GitHub
parent 49657c4690
commit e073906a19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -26,6 +26,8 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -390,6 +392,10 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
replicas := ob.meta.ReplicaManager.GetByCollection(collectionID)
actions := make([]*querypb.SyncAction, 0, 1)
var collectionInfo *milvuspb.DescribeCollectionResponse
var partitions []int64
var indexInfo []*indexpb.IndexInfo
var err error
for _, replica := range replicas {
leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
@ -406,7 +412,34 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
if updateVersionAction != nil {
actions = append(actions, updateVersionAction)
}
if !ob.sync(ctx, replica, leaderView, actions) {
if len(actions) == 0 {
continue
}
// init all the meta information
if collectionInfo == nil {
collectionInfo, err = ob.broker.DescribeCollection(ctx, collectionID)
if err != nil {
log.Warn("failed to get collection info", zap.Error(err))
return false
}
partitions, err = utils.GetPartitions(ob.meta.CollectionManager, collectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return false
}
// Get collection index info
indexInfo, err = ob.broker.ListIndexes(ctx, collectionID)
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}
}
if !ob.sync(ctx, replica, leaderView, actions, collectionInfo, partitions, indexInfo) {
return false
}
}
@ -415,7 +448,9 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
return true
}
func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool {
func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction,
collectionInfo *milvuspb.DescribeCollectionResponse, partitions []int64, indexInfo []*indexpb.IndexInfo,
) bool {
if len(diffs) == 0 {
return true
}
@ -427,24 +462,6 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
zap.String("channel", leaderView.Channel),
)
collectionInfo, err := ob.broker.DescribeCollection(ctx, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get collection info", zap.Error(err))
return false
}
partitions, err := utils.GetPartitions(ob.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return false
}
// Get collection index info
indexInfo, err := ob.broker.ListIndexes(ctx, collectionInfo.GetCollectionID())
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}
req := &querypb.SyncDistributionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),