diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 1f1de75f9f..8ddebc9c62 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/samber/lo" "go.uber.org/zap" @@ -129,18 +128,6 @@ func (m *CollectionManager) Recover(broker Broker) error { ctxLog.Info("recover collections and partitions from kv store") for _, collection := range collections { - // Dropped collection should be deprecated - _, err = broker.DescribeCollection(ctx, collection.GetCollectionID()) - if errors.Is(err, merr.ErrCollectionNotFound) { - ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID())) - m.catalog.ReleaseCollection(collection.GetCollectionID()) - continue - } - if err != nil { - ctxLog.Warn("failed to get collection schema", zap.Error(err)) - return err - } - if collection.GetReplicaNumber() <= 0 { ctxLog.Info("skip recovery and release collection due to invalid replica number", zap.Int64("collectionID", collection.GetCollectionID()), @@ -169,31 +156,6 @@ func (m *CollectionManager) Recover(broker Broker) error { } for collection, partitions := range partitions { - existPartitions, err := broker.GetPartitions(ctx, collection) - if errors.Is(err, merr.ErrCollectionNotFound) { - ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection)) - m.catalog.ReleaseCollection(collection) - continue - } - if err != nil { - ctxLog.Warn("failed to get partitions", zap.Error(err)) - return err - } - omitPartitions := make([]int64, 0) - partitions = lo.Filter(partitions, func(partition *querypb.PartitionLoadInfo, _ int) bool { - if !lo.Contains(existPartitions, partition.GetPartitionID()) { - omitPartitions = append(omitPartitions, partition.GetPartitionID()) - return false - } - return true - }) - if len(omitPartitions) > 0 { - ctxLog.Info("skip dropped partitions during recovery", - zap.Int64("collection", collection), - zap.Int64s("partitions", omitPartitions)) - m.catalog.ReleasePartition(collection, omitPartitions...) - } - for _, partition := range partitions { // Partitions not loaded done should be deprecated if partition.GetStatus() != querypb.LoadStatus_Loaded { diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index 2adc7b758d..e3ce6df67f 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -17,7 +17,6 @@ package meta import ( - "fmt" "sort" "testing" "time" @@ -35,7 +34,6 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -178,13 +176,6 @@ func (suite *CollectionManagerSuite) TestGet() { func (suite *CollectionManagerSuite) TestUpdate() { mgr := suite.mgr - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil) - for _, collection := range suite.collections { - if len(suite.partitions[collection]) > 0 { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil) - } - } - collections := mgr.GetAllCollections() partitions := mgr.GetAllPartitions() for _, collection := range collections { @@ -251,11 +242,6 @@ func (suite *CollectionManagerSuite) TestGetFieldIndex() { func (suite *CollectionManagerSuite) TestRemove() { mgr := suite.mgr - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil) - for _, collection := range suite.collections { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe() - } - // Remove collections/partitions for i, collectionID := range suite.collections { if suite.loadTypes[i] == querypb.LoadType_LoadCollection { @@ -320,13 +306,6 @@ func (suite *CollectionManagerSuite) TestRemove() { func (suite *CollectionManagerSuite) TestRecover_normal() { mgr := suite.mgr - // recover successfully - for _, collection := range suite.collections { - suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, nil) - if len(suite.partitions[collection]) > 0 { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil) - } - } suite.clearMemory() err := mgr.Recover(suite.broker) suite.NoError(err) @@ -342,7 +321,6 @@ func (suite *CollectionManagerSuite) TestRecover_normal() { func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() { mgr := suite.mgr suite.releaseAll() - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil) // test put collection with partitions for i, collection := range suite.collections { suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe() @@ -424,64 +402,6 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() { } } -func (suite *CollectionManagerSuite) TestRecover_with_dropped() { - mgr := suite.mgr - - droppedCollection := int64(101) - droppedPartition := int64(13) - - for _, collection := range suite.collections { - if collection == droppedCollection { - suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, merr.ErrCollectionNotFound) - } else { - suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, nil) - } - if len(suite.partitions[collection]) != 0 { - if collection == droppedCollection { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(nil, merr.ErrCollectionNotFound) - } else { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection). - Return(lo.Filter(suite.partitions[collection], func(partition int64, _ int) bool { - return partition != droppedPartition - }), nil) - } - } - } - suite.clearMemory() - err := mgr.Recover(suite.broker) - suite.NoError(err) - for _, collection := range suite.collections { - exist := collection != droppedCollection - suite.Equal(exist, mgr.Exist(collection)) - if !exist { - continue - } - for _, partitionID := range suite.partitions[collection] { - partition := mgr.GetPartition(partitionID) - exist = partitionID != droppedPartition - suite.Equal(exist, partition != nil) - } - } -} - -func (suite *CollectionManagerSuite) TestRecover_Failed() { - mockErr1 := fmt.Errorf("mock.DescribeCollection err") - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, mockErr1) - suite.clearMemory() - err := suite.mgr.Recover(suite.broker) - suite.Error(err) - suite.ErrorIs(err, mockErr1) - - mockErr2 := fmt.Errorf("mock GetPartitions err") - suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0] - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil) - suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return(nil, mockErr2) - suite.clearMemory() - err = suite.mgr.Recover(suite.broker) - suite.Error(err) - suite.ErrorIs(err, mockErr2) -} - func (suite *CollectionManagerSuite) TestUpdateLoadPercentage() { mgr := suite.mgr mgr.PutCollection(&Collection{ @@ -539,13 +459,6 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() { suite.releaseAll() mgr := suite.mgr - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil) - for _, collection := range suite.collections { - if len(suite.partitions[collection]) > 0 { - suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil) - } - } - // put old version of collections and partitions for i, collection := range suite.collections { status := querypb.LoadStatus_Loaded diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 9d1709199b..cd18da3e13 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -25,11 +25,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -148,11 +146,6 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti return nil, nil, err } - path := params.Params.MinioCfg.RootPath.GetValue() - // refill log ID with log path - for _, segmentInfo := range recoveryInfo.Segments { - datacoord.DecompressBinLog(path, segmentInfo) - } return recoveryInfo.Channels, recoveryInfo.Segments, nil }