diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 77e76f5874..8bf2d9f5c0 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -94,7 +94,6 @@ type collectionInfo struct { shardLeaders *shardLeaders createdTimestamp uint64 createdUtcTimestamp uint64 - isLoaded bool } func (info *collectionInfo) isCollectionCached() bool { @@ -294,40 +293,6 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) } - if !collInfo.isLoaded { - // check if collection was loaded - showResp, err := m.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - CollectionIDs: []int64{collInfo.collID}, - }) - if err != nil { - return nil, err - } - if showResp.Status.ErrorCode != commonpb.ErrorCode_Success { - return nil, errors.New(showResp.Status.Reason) - } - log.Debug("QueryCoord show collections", - zap.Int64("collID", collInfo.collID), - zap.Int64s("collections", showResp.GetCollectionIDs()), - zap.Int64s("collectionsInMemoryPercentages", showResp.GetInMemoryPercentages()), - ) - loaded := false - for index, collID := range showResp.CollectionIDs { - if collID == collInfo.collID && showResp.GetInMemoryPercentages()[index] >= int64(100) { - loaded = true - break - } - } - if loaded { - m.mu.Lock() - m.collInfo[collectionName].isLoaded = true - m.mu.Unlock() - } - } - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc() return collInfo, nil } diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index 5d5a336f7f..8db3e71726 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -685,60 +685,6 @@ func TestMetaCache_PolicyInfo(t *testing.T) { }) } -func TestMetaCache_LoadCache(t *testing.T) { - ctx := context.Background() - rootCoord := &MockRootCoordClientInterface{} - queryCoord := &types.MockQueryCoord{} - mgr := newShardClientMgr() - err := InitMetaCache(ctx, rootCoord, queryCoord, mgr) - assert.Nil(t, err) - - qcCounter := 0 - queryCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - CollectionIDs: []UniqueID{1, 2}, - InMemoryPercentages: []int64{100, 50}, - }, nil).Run(func(ctx context.Context, req *querypb.ShowCollectionsRequest) { - qcCounter++ - }) - t.Run("test IsCollectionLoaded", func(t *testing.T) { - info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1") - assert.NoError(t, err) - assert.True(t, info.isLoaded) - // no collectionInfo of collection1, should access RootCoord - assert.Equal(t, rootCoord.GetAccessCount(), 1) - // not loaded, should access QueryCoord - assert.Equal(t, qcCounter, 1) - - info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") - assert.NoError(t, err) - assert.True(t, info.isLoaded) - // shouldn't access QueryCoord or RootCoord again - assert.Equal(t, rootCoord.GetAccessCount(), 1) - assert.Equal(t, qcCounter, 1) - - // test collection2 not fully loaded - info, err = globalMetaCache.GetCollectionInfo(ctx, "collection2") - assert.NoError(t, err) - assert.False(t, info.isLoaded) - // no collectionInfo of collection2, should access RootCoord - assert.Equal(t, rootCoord.GetAccessCount(), 2) - // not loaded, should access QueryCoord - assert.Equal(t, qcCounter, 2) - }) - - t.Run("test RemoveCollectionLoadCache", func(t *testing.T) { - globalMetaCache.RemoveCollection(ctx, "collection1") - info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1") - assert.NoError(t, err) - assert.True(t, info.isLoaded) - // should access QueryCoord - assert.Equal(t, qcCounter, 3) - }) -} - func TestMetaCache_RemoveCollection(t *testing.T) { ctx := context.Background() rootCoord := &MockRootCoordClientInterface{} @@ -755,31 +701,27 @@ func TestMetaCache_RemoveCollection(t *testing.T) { InMemoryPercentages: []int64{100, 50}, }, nil) - info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1") + _, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") assert.NoError(t, err) - assert.True(t, info.isLoaded) // no collectionInfo of collection1, should access RootCoord assert.Equal(t, rootCoord.GetAccessCount(), 1) - info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") + _, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") assert.NoError(t, err) - assert.True(t, info.isLoaded) // shouldn't access RootCoord again assert.Equal(t, rootCoord.GetAccessCount(), 1) globalMetaCache.RemoveCollection(ctx, "collection1") // no collectionInfo of collection2, should access RootCoord - info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") + _, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") assert.NoError(t, err) - assert.True(t, info.isLoaded) // shouldn't access RootCoord again assert.Equal(t, rootCoord.GetAccessCount(), 2) globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1)) // no collectionInfo of collection2, should access RootCoord - info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") + _, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") assert.NoError(t, err) - assert.True(t, info.isLoaded) // shouldn't access RootCoord again assert.Equal(t, rootCoord.GetAccessCount(), 3) } diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 72a90c7199..cbddb8b31c 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -308,14 +308,6 @@ func (t *queryTask) PreExecute(ctx context.Context) error { t.queryParams = queryParams t.RetrieveRequest.Limit = queryParams.limit + queryParams.offset - loaded, err := checkIfLoaded(ctx, t.qc, collectionName, t.RetrieveRequest.GetPartitionIDs()) - if err != nil { - return fmt.Errorf("checkIfLoaded failed when query, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err) - } - if !loaded { - return fmt.Errorf("collection:%v or partition:%v not loaded into memory when query", collectionName, t.request.GetPartitionNames()) - } - schema, _ := globalMetaCache.GetCollectionSchema(ctx, collectionName) t.schema = schema diff --git a/internal/proxy/task_query_test.go b/internal/proxy/task_query_test.go index 3c7f670a7b..acc06d9f20 100644 --- a/internal/proxy/task_query_test.go +++ b/internal/proxy/task_query_test.go @@ -111,12 +111,6 @@ func TestQueryTask_all(t *testing.T) { collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) - qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{ - Status: &successStatus, - CollectionIDs: []int64{collectionID}, - InMemoryPercentages: []int64{100}, - }, nil) - status, err := qc.LoadCollection(ctx, &querypb.LoadCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadCollection, diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index bc103116cf..1848960be2 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -234,15 +234,6 @@ func (t *searchTask) PreExecute(ctx context.Context) error { return err } - // check if collection/partitions are loaded into query node - loaded, err := checkIfLoaded(ctx, t.qc, collectionName, t.SearchRequest.GetPartitionIDs()) - if err != nil { - return fmt.Errorf("checkIfLoaded failed when search, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err) - } - if !loaded { - return fmt.Errorf("collection:%v or partition:%v not loaded into memory when search", collectionName, t.request.GetPartitionNames()) - } - t.request.OutputFields, err = translateOutputFields(t.request.OutputFields, t.schema, false) if err != nil { return err @@ -639,43 +630,6 @@ func (t *searchTask) collectSearchResults(ctx context.Context) error { return nil } -// checkIfLoaded check if collection was loaded into QueryNode -func checkIfLoaded(ctx context.Context, qc types.QueryCoord, collectionName string, searchPartitionIDs []UniqueID) (bool, error) { - info, err := globalMetaCache.GetCollectionInfo(ctx, collectionName) - if err != nil { - return false, fmt.Errorf("GetCollectionInfo failed, collection = %s, err = %s", collectionName, err) - } - if info.isLoaded { - return true, nil - } - if len(searchPartitionIDs) == 0 { - return false, nil - } - - // If request to search partitions - resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - CollectionID: info.collID, - PartitionIDs: searchPartitionIDs, - }) - if err != nil { - return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %s", collectionName, searchPartitionIDs, err) - } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason()) - } - - for _, persent := range resp.InMemoryPercentages { - if persent < 100 { - return false, nil - } - } - return true, nil -} - func decodeSearchResults(ctx context.Context, searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) { tr := timerecord.NewTimeRecorder("decodeSearchResults") results := make([]*schemapb.SearchResultData, 0) diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 8a249e09df..7e800bc039 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -128,7 +127,6 @@ func TestSearchTask_PreExecute(t *testing.T) { qc = types.NewMockQueryCoord(t) ctx = context.TODO() ) - successStatus := commonpb.Status{ErrorCode: commonpb.ErrorCode_Success} err = rc.Start() defer rc.Stop() @@ -169,30 +167,7 @@ func TestSearchTask_PreExecute(t *testing.T) { return task } - mockShowCollectionSuccess := func() *mock.Call { - return qc.On("ShowCollections", mock.Anything, mock.Anything).Return( - func(ctx context.Context, req *querypb.ShowCollectionsRequest) *querypb.ShowCollectionsResponse { - return &querypb.ShowCollectionsResponse{ - Status: &successStatus, - CollectionIDs: req.CollectionIDs, - InMemoryPercentages: []int64{100}, - } - }, nil) - } - - mockShowCollectionFail := func() *mock.Call { - return qc.On("ShowCollections", mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "mock", - }, - }, nil) - } - t.Run("bad nq 0", func(t *testing.T) { - call := mockShowCollectionSuccess() - defer call.Unset() - collName := "test_bad_nq0_error" + funcutil.GenRandomStr() createColl(t, collName, rc) // Nq must be in range [1, 16384]. @@ -202,8 +177,6 @@ func TestSearchTask_PreExecute(t *testing.T) { }) t.Run("bad nq 16385", func(t *testing.T) { - call := mockShowCollectionSuccess() - defer call.Unset() collName := "test_bad_nq16385_error" + funcutil.GenRandomStr() createColl(t, collName, rc) @@ -221,8 +194,6 @@ func TestSearchTask_PreExecute(t *testing.T) { }) t.Run("invalid IgnoreGrowing param", func(t *testing.T) { - call := mockShowCollectionSuccess() - defer call.Unset() collName := "test_invalid_param" + funcutil.GenRandomStr() createColl(t, collName, rc) @@ -232,21 +203,7 @@ func TestSearchTask_PreExecute(t *testing.T) { assert.Error(t, err) }) - t.Run("test checkIfLoaded error", func(t *testing.T) { - collName := "test_checkIfLoaded_error" + funcutil.GenRandomStr() - createColl(t, collName, rc) - task := getSearchTask(t, collName) - - t.Run("show collection status unexpected error", func(t *testing.T) { - call := mockShowCollectionFail() - defer call.Unset() - assert.Error(t, task.PreExecute(ctx)) - }) - }) - t.Run("search with timeout", func(t *testing.T) { - call := mockShowCollectionSuccess() - defer call.Unset() collName := "search_with_timeout" + funcutil.GenRandomStr() createColl(t, collName, rc) @@ -1551,135 +1508,6 @@ func TestTaskSearch_reduceSearchResultData(t *testing.T) { }) } -func Test_checkIfLoaded(t *testing.T) { - t.Run("failed to get collection info", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return nil, errors.New("mock") - }) - globalMetaCache = cache - var qc types.QueryCoord - _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{}) - assert.Error(t, err) - }) - - t.Run("collection loaded", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: true}, nil - }) - globalMetaCache = cache - var qc types.QueryCoord - loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{}) - assert.NoError(t, err) - assert.True(t, loaded) - }) - - t.Run("show partitions failed", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: false}, nil - }) - globalMetaCache = cache - qc := getQueryCoord() - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Times(1) - _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) - assert.Error(t, err) - }) - - t.Run("show partitions but didn't success", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: false}, nil - }) - globalMetaCache = cache - qc := getQueryCoord() - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}}, nil).Times(1) - _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) - assert.Error(t, err) - }) - - t.Run("partitions loaded", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: false}, nil - }) - globalMetaCache = cache - qc := getQueryCoord() - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return( - &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, InMemoryPercentages: []int64{100, 100}}, nil).Times(1) - loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) - assert.NoError(t, err) - assert.True(t, loaded) - }) - - t.Run("partitions loaded, some patitions not fully loaded", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: false}, nil - }) - globalMetaCache = cache - qc := getQueryCoord() - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return( - &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, InMemoryPercentages: []int64{100, 50}}, nil).Times(1) - loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) - assert.NoError(t, err) - assert.False(t, loaded) - }) - - t.Run("no specified partitions, show partitions failed", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: false}, nil - }) - globalMetaCache = cache - qc := getQueryCoord() - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Times(1) - _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) - assert.Error(t, err) - }) - - t.Run("no specified partitions, show partitions but didn't succeed", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: false}, nil - }) - globalMetaCache = cache - qc := getQueryCoord() - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}}, nil).Times(1) - _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) - assert.Error(t, err) - }) - - t.Run("not fully loaded", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: false}, nil - }) - globalMetaCache = cache - qc := getQueryCoord() - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return( - &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, PartitionIDs: []UniqueID{1, 2}}, nil).Times(1) - loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{}) - assert.NoError(t, err) - assert.False(t, loaded) - }) - - t.Run("not loaded", func(t *testing.T) { - cache := newMockCache() - cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { - return &collectionInfo{isLoaded: false}, nil - }) - globalMetaCache = cache - qc := getQueryCoord() - qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return( - &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, PartitionIDs: []UniqueID{}}, nil).Times(1) - loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{}) - assert.NoError(t, err) - assert.False(t, loaded) - }) -} - func TestSearchTask_ErrExecute(t *testing.T) { var ( diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 18d1b4176e..4bb92b5bdb 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -839,7 +839,14 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade Status: merr.Status(nil), } - if s.meta.CollectionManager.CalculateLoadPercentage(req.GetCollectionID()) < 100 { + percentage := s.meta.CollectionManager.CalculateLoadPercentage(req.GetCollectionID()) + if percentage < 0 { + err := merr.WrapErrCollectionNotLoaded(req.GetCollectionID()) + log.Warn("failed to GetShardLeaders", zap.Error(err)) + resp.Status = merr.Status(err) + return resp, nil + } + if percentage < 100 { msg := fmt.Sprintf("collection %v is not fully loaded", req.GetCollectionID()) log.Warn(msg) resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg) diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 7af73e5b03..7dca34010b 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1516,6 +1516,15 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() { suite.NoError(err) suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode()) } + + // collection not loaded + req := &querypb.GetShardLeadersRequest{ + CollectionID: -1, + } + resp, err := server.GetShardLeaders(ctx, req) + suite.NoError(err) + suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + suite.True(errors.Is(merr.Error(resp.GetStatus()), merr.ErrCollectionNotLoaded)) } func (suite *ServiceSuite) TestHandleNodeUp() { diff --git a/tests/python_client/testcases/test_partition.py b/tests/python_client/testcases/test_partition.py index f5971dcc1f..81a62bad73 100644 --- a/tests/python_client/testcases/test_partition.py +++ b/tests/python_client/testcases/test_partition.py @@ -894,7 +894,7 @@ class TestPartitionOperations(TestcaseBase): params={"nprobe": 32}, limit=1, check_task=ct.CheckTasks.err_res, check_items={ct.err_code: 0, - ct.err_msg: "not been loaded"}) + ct.err_msg: "not loaded"}) # release partition partition_w.release()