From c7ec882033f156b28f64afe9cf373bb71c841efe Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 21 Nov 2023 18:08:29 +0800 Subject: [PATCH] enhance: Remove rpc during querycoord start (#28396) (#28604) issue: #28332 pr: #28396 During querycoord's recovery, it tries to call `DescribeCollection` and `ShowPartitions` on rootcoord to check whether a collection or partition has been released in rootcoord. But if rootcoord isn't ready yet, the RPC will fail and querycoord panics. To fix this, we remove the RPC calls during querycoord's start. Signed-off-by: Wei Liu --- .../querycoordv2/meta/collection_manager.go | 38 -------- .../meta/collection_manager_test.go | 87 ------------------- .../querycoordv2/meta/coordinator_broker.go | 7 -- 3 files changed, 132 deletions(-) diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 43f4dcaf3e..8ddebc9c62 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/samber/lo" "go.uber.org/zap" @@ -129,18 +128,6 @@ func (m *CollectionManager) Recover(broker Broker) error { ctxLog.Info("recover collections and partitions from kv store") for _, collection := range collections { - // Dropped collection should be deprecated - _, err = broker.GetCollectionSchema(ctx, collection.GetCollectionID()) - if errors.Is(err, merr.ErrCollectionNotFound) { - ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID())) - m.catalog.ReleaseCollection(collection.GetCollectionID()) - continue - } - if err != nil { - ctxLog.Warn("failed to get collection schema", zap.Error(err)) - return err - } - if collection.GetReplicaNumber() <= 0 { ctxLog.Info("skip recovery and release collection due to invalid replica number", zap.Int64("collectionID", 
collection.GetCollectionID()), @@ -169,31 +156,6 @@ func (m *CollectionManager) Recover(broker Broker) error { } for collection, partitions := range partitions { - existPartitions, err := broker.GetPartitions(ctx, collection) - if errors.Is(err, merr.ErrCollectionNotFound) { - ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection)) - m.catalog.ReleaseCollection(collection) - continue - } - if err != nil { - ctxLog.Warn("failed to get partitions", zap.Error(err)) - return err - } - omitPartitions := make([]int64, 0) - partitions = lo.Filter(partitions, func(partition *querypb.PartitionLoadInfo, _ int) bool { - if !lo.Contains(existPartitions, partition.GetPartitionID()) { - omitPartitions = append(omitPartitions, partition.GetPartitionID()) - return false - } - return true - }) - if len(omitPartitions) > 0 { - ctxLog.Info("skip dropped partitions during recovery", - zap.Int64("collection", collection), - zap.Int64s("partitions", omitPartitions)) - m.catalog.ReleasePartition(collection, omitPartitions...) - } - for _, partition := range partitions { // Partitions not loaded done should be deprecated if partition.GetStatus() != querypb.LoadStatus_Loaded { diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index 311bbbe6af..e3ce6df67f 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -17,7 +17,6 @@ package meta import ( - "fmt" "sort" "testing" "time" @@ -35,7 +34,6 @@ import ( . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -178,13 +176,6 @@ func (suite *CollectionManagerSuite) TestGet() { func (suite *CollectionManagerSuite) TestUpdate() { mgr := suite.mgr - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil) - for _, collection := range suite.collections { - if len(suite.partitions[collection]) > 0 { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil) - } - } - collections := mgr.GetAllCollections() partitions := mgr.GetAllPartitions() for _, collection := range collections { @@ -251,11 +242,6 @@ func (suite *CollectionManagerSuite) TestGetFieldIndex() { func (suite *CollectionManagerSuite) TestRemove() { mgr := suite.mgr - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil) - for _, collection := range suite.collections { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe() - } - // Remove collections/partitions for i, collectionID := range suite.collections { if suite.loadTypes[i] == querypb.LoadType_LoadCollection { @@ -320,13 +306,6 @@ func (suite *CollectionManagerSuite) TestRemove() { func (suite *CollectionManagerSuite) TestRecover_normal() { mgr := suite.mgr - // recover successfully - for _, collection := range suite.collections { - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, collection).Return(nil, nil) - if len(suite.partitions[collection]) > 0 { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil) - } - } suite.clearMemory() err := mgr.Recover(suite.broker) suite.NoError(err) @@ -342,7 +321,6 @@ func (suite *CollectionManagerSuite) TestRecover_normal() { func (suite 
*CollectionManagerSuite) TestRecoverLoadingCollection() { mgr := suite.mgr suite.releaseAll() - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil) // test put collection with partitions for i, collection := range suite.collections { suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe() @@ -424,64 +402,6 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() { } } -func (suite *CollectionManagerSuite) TestRecover_with_dropped() { - mgr := suite.mgr - - droppedCollection := int64(101) - droppedPartition := int64(13) - - for _, collection := range suite.collections { - if collection == droppedCollection { - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, collection).Return(nil, merr.ErrCollectionNotFound) - } else { - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, collection).Return(nil, nil) - } - if len(suite.partitions[collection]) != 0 { - if collection == droppedCollection { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(nil, merr.ErrCollectionNotFound) - } else { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection). 
- Return(lo.Filter(suite.partitions[collection], func(partition int64, _ int) bool { - return partition != droppedPartition - }), nil) - } - } - } - suite.clearMemory() - err := mgr.Recover(suite.broker) - suite.NoError(err) - for _, collection := range suite.collections { - exist := collection != droppedCollection - suite.Equal(exist, mgr.Exist(collection)) - if !exist { - continue - } - for _, partitionID := range suite.partitions[collection] { - partition := mgr.GetPartition(partitionID) - exist = partitionID != droppedPartition - suite.Equal(exist, partition != nil) - } - } -} - -func (suite *CollectionManagerSuite) TestRecover_Failed() { - mockErr1 := fmt.Errorf("mock GetCollectionSchema err") - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, mockErr1) - suite.clearMemory() - err := suite.mgr.Recover(suite.broker) - suite.Error(err) - suite.ErrorIs(err, mockErr1) - - mockErr2 := fmt.Errorf("mock GetPartitions err") - suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0] - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil) - suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return(nil, mockErr2) - suite.clearMemory() - err = suite.mgr.Recover(suite.broker) - suite.Error(err) - suite.ErrorIs(err, mockErr2) -} - func (suite *CollectionManagerSuite) TestUpdateLoadPercentage() { mgr := suite.mgr mgr.PutCollection(&Collection{ @@ -539,13 +459,6 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() { suite.releaseAll() mgr := suite.mgr - suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil) - for _, collection := range suite.collections { - if len(suite.partitions[collection]) > 0 { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil) - } - } - // put old version of collections and partitions for i, collection := range suite.collections { status := querypb.LoadStatus_Loaded diff 
--git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index d85b2a0224..1eaa0babe4 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -26,11 +26,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -149,11 +147,6 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti return nil, nil, err } - path := params.Params.MinioCfg.RootPath.GetValue() - // refill log ID with log path - for _, segmentInfo := range recoveryInfo.Segments { - datacoord.DecompressBinLog(path, segmentInfo) - } return recoveryInfo.Channels, recoveryInfo.Segments, nil }