From dad060af2e082043de56c60a5cdec107d3dad860 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Tue, 23 Aug 2022 15:14:52 +0800 Subject: [PATCH] Fix func checkIfLoaded in proxy (#18769) Signed-off-by: aoiasd Signed-off-by: aoiasd --- internal/proxy/task_search.go | 38 +++++++----------------------- internal/proxy/task_search_test.go | 23 ++++++++++++++---- 2 files changed, 28 insertions(+), 33 deletions(-) diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index da6fd1bab4..7a0eeee3f5 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -444,52 +444,32 @@ func checkIfLoaded(ctx context.Context, qc types.QueryCoord, collectionName stri if info.isLoaded { return true, nil } - - // If request to search partitions - if len(searchPartitionIDs) > 0 { - resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_ShowPartitions, - SourceID: Params.ProxyCfg.GetNodeID(), - }, - CollectionID: info.collID, - PartitionIDs: searchPartitionIDs, - }) - if err != nil { - return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %s", collectionName, searchPartitionIDs, err) - } - - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason()) - } - // Current logic: show partitions won't return error if the given partitions are all loaded - return true, nil + if len(searchPartitionIDs) == 0 { + return false, nil } - // If request to search collection and collection is not fully loaded + // If request to search partitions resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ShowPartitions, SourceID: Params.ProxyCfg.GetNodeID(), }, CollectionID: info.collID, + PartitionIDs: searchPartitionIDs, }) if err != nil { return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %s", collectionName, searchPartitionIDs, err) } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason()) } - if len(resp.GetPartitionIDs()) > 0 { - log.Ctx(ctx).Warn("collection not fully loaded, search on these partitions", - zap.String("collection", collectionName), - zap.Int64("collectionID", info.collID), zap.Int64s("partitionIDs", resp.GetPartitionIDs())) - return true, nil + for _, persent := range resp.InMemoryPercentages { + if persent < 100 { + return false, nil + } } - - return false, nil + return true, nil } func decodeSearchResults(ctx context.Context, searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) { diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 03ae3b6ab7..460ba33f8a 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -1460,13 +1460,28 @@ func Test_checkIfLoaded(t *testing.T) { globalMetaCache = cache qc := NewQueryCoordMock() qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { - return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil + return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, InMemoryPercentages: []int64{100, 100}}, nil }) loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) assert.NoError(t, err) assert.True(t, loaded) }) + t.Run("partitions loaded, some patitions not fully loaded", func(t *testing.T) { + cache := newMockCache() + cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { + return &collectionInfo{isLoaded: false}, nil + }) + globalMetaCache = cache + qc := NewQueryCoordMock() + qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { + return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, InMemoryPercentages: []int64{100, 50}}, nil + }) + loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) + assert.NoError(t, err) + assert.False(t, loaded) + }) + t.Run("no specified partitions, show partitions failed", func(t *testing.T) { cache := newMockCache() cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) { @@ -1477,7 +1492,7 @@ func Test_checkIfLoaded(t *testing.T) { qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { return nil, errors.New("mock") }) - _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{}) + _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) assert.Error(t, err) }) @@ -1491,7 +1506,7 @@ func Test_checkIfLoaded(t *testing.T) { qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}}, nil }) - _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{}) + _, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2}) assert.Error(t, err) }) @@ -1507,7 +1522,7 @@ func Test_checkIfLoaded(t *testing.T) { }) loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{}) assert.NoError(t, err) - assert.True(t, loaded) + assert.False(t, loaded) }) t.Run("not loaded", func(t *testing.T) {