mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-04 11:18:44 +08:00
Fix CollectionNotExists when search and retrieve vector (#26532)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
parent
1ea477151e
commit
b1e4142929
@ -2008,7 +2008,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get
|
||||
}
|
||||
} else {
|
||||
if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
|
||||
request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
|
||||
request.GetPartitionNames(), request.GetCollectionName(), collectionID, request.GetDbName()); err != nil {
|
||||
return getErrResponse(err), nil
|
||||
}
|
||||
}
|
||||
@ -2113,7 +2113,7 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt
|
||||
}
|
||||
} else {
|
||||
if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
|
||||
request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
|
||||
request.GetPartitionNames(), request.GetCollectionName(), collectionID, request.GetDbName()); err != nil {
|
||||
if errors.Is(err, ErrInsufficientMemory) {
|
||||
return &milvuspb.GetLoadStateResponse{
|
||||
Status: InSufficientMemoryStatus(request.GetCollectionName()),
|
||||
|
||||
@ -243,7 +243,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
|
||||
t.queryParams = queryParams
|
||||
t.RetrieveRequest.Limit = queryParams.limit + queryParams.offset
|
||||
|
||||
loaded, err := checkIfLoaded(ctx, t.qc, collectionName, t.RetrieveRequest.GetPartitionIDs())
|
||||
loaded, err := checkIfLoaded(ctx, t.qc, t.request.GetDbName(), collectionName, t.RetrieveRequest.GetPartitionIDs())
|
||||
if err != nil {
|
||||
return fmt.Errorf("checkIfLoaded failed when query, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err)
|
||||
}
|
||||
@ -287,7 +287,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
|
||||
|
||||
partitionNames = append(partitionNames, hashedPartitionNames...)
|
||||
}
|
||||
t.RetrieveRequest.PartitionIDs, err = getPartitionIDs(ctx, collectionName, partitionNames)
|
||||
t.RetrieveRequest.PartitionIDs, err = getPartitionIDs(ctx, t.request.GetDbName(), collectionName, partitionNames)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("failed to get partitions in collection.", zap.String("collection name", collectionName),
|
||||
zap.Error(err),
|
||||
|
||||
@ -71,14 +71,14 @@ type searchTask struct {
|
||||
node types.ProxyComponent
|
||||
}
|
||||
|
||||
func getPartitionIDs(ctx context.Context, collectionName string, partitionNames []string) (partitionIDs []UniqueID, err error) {
|
||||
func getPartitionIDs(ctx context.Context, dbName string, collectionName string, partitionNames []string) (partitionIDs []UniqueID, err error) {
|
||||
for _, tag := range partitionNames {
|
||||
if err := validatePartitionTag(tag, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
partitionsMap, err := globalMetaCache.GetPartitions(ctx, GetCurDBNameFromContextOrDefault(ctx), collectionName)
|
||||
partitionsMap, err := globalMetaCache.GetPartitions(ctx, dbName, collectionName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -250,7 +250,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// check if collection/partitions are loaded into query node
|
||||
loaded, err := checkIfLoaded(ctx, t.qc, collectionName, t.SearchRequest.GetPartitionIDs())
|
||||
loaded, err := checkIfLoaded(ctx, t.qc, t.request.GetDbName(), collectionName, t.SearchRequest.GetPartitionIDs())
|
||||
if err != nil {
|
||||
return fmt.Errorf("checkIfLoaded failed when search, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err)
|
||||
}
|
||||
@ -372,7 +372,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// translate partition name to partition ids. Use regex-pattern to match partition name.
|
||||
t.SearchRequest.PartitionIDs, err = getPartitionIDs(ctx, collectionName, partitionNames)
|
||||
t.SearchRequest.PartitionIDs, err = getPartitionIDs(ctx, t.request.GetDbName(), collectionName, partitionNames)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -623,6 +623,7 @@ func (t *searchTask) Requery() error {
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Retrieve,
|
||||
},
|
||||
DbName: t.request.GetDbName(),
|
||||
CollectionName: t.request.GetCollectionName(),
|
||||
Expr: expr,
|
||||
OutputFields: t.request.GetOutputFields(),
|
||||
@ -728,10 +729,10 @@ func (t *searchTask) collectSearchResults(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// checkIfLoaded check if collection was loaded into QueryNode
|
||||
func checkIfLoaded(ctx context.Context, qc types.QueryCoord, collectionName string, searchPartitionIDs []UniqueID) (bool, error) {
|
||||
info, err := globalMetaCache.GetCollectionInfo(ctx, GetCurDBNameFromContextOrDefault(ctx), collectionName)
|
||||
func checkIfLoaded(ctx context.Context, qc types.QueryCoord, dbName string, collectionName string, searchPartitionIDs []UniqueID) (bool, error) {
|
||||
info, err := globalMetaCache.GetCollectionInfo(ctx, dbName, collectionName)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("GetCollectionInfo failed, collection = %s, err = %s", collectionName, err)
|
||||
return false, fmt.Errorf("GetCollectionInfo failed, dbName = %s, collection = %s, err = %s", dbName, collectionName, err)
|
||||
}
|
||||
if info.isLoaded {
|
||||
return true, nil
|
||||
@ -750,10 +751,10 @@ func checkIfLoaded(ctx context.Context, qc types.QueryCoord, collectionName stri
|
||||
PartitionIDs: searchPartitionIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %s", collectionName, searchPartitionIDs, err)
|
||||
return false, fmt.Errorf("showPartitions failed, dbName = %s, collection = %s, partitionIDs = %v, err = %s", dbName, collectionName, searchPartitionIDs, err)
|
||||
}
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason())
|
||||
return false, fmt.Errorf("showPartitions failed, dbName = %s, collection = %s, partitionIDs = %v, reason = %s", dbName, collectionName, searchPartitionIDs, resp.GetStatus().GetReason())
|
||||
}
|
||||
|
||||
for _, persent := range resp.InMemoryPercentages {
|
||||
|
||||
@ -1616,7 +1616,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
).Return(nil, errors.New("error mock GetCollectionInfo"))
|
||||
globalMetaCache = cache
|
||||
var qc types.QueryCoord
|
||||
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1629,7 +1629,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
).Return(&collectionInfo{isLoaded: true}, nil)
|
||||
globalMetaCache = cache
|
||||
var qc types.QueryCoord
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{})
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, loaded)
|
||||
})
|
||||
@ -1646,7 +1646,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
|
||||
return nil, errors.New("mock")
|
||||
})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{1, 2})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1662,7 +1662,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
|
||||
return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}}, nil
|
||||
})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{1, 2})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1678,7 +1678,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
|
||||
return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, InMemoryPercentages: []int64{100, 100}}, nil
|
||||
})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{1, 2})
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, loaded)
|
||||
})
|
||||
@ -1695,7 +1695,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
|
||||
return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, InMemoryPercentages: []int64{100, 50}}, nil
|
||||
})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{1, 2})
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, loaded)
|
||||
})
|
||||
@ -1712,7 +1712,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
|
||||
return nil, errors.New("mock")
|
||||
})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{1, 2})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1728,7 +1728,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
|
||||
return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}}, nil
|
||||
})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
|
||||
_, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{1, 2})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1744,7 +1744,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
|
||||
return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, PartitionIDs: []UniqueID{1, 2}}, nil
|
||||
})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{})
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, loaded)
|
||||
})
|
||||
@ -1761,7 +1761,7 @@ func Test_checkIfLoaded(t *testing.T) {
|
||||
qc.SetShowPartitionsFunc(func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
|
||||
return &querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, PartitionIDs: []UniqueID{}}, nil
|
||||
})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{})
|
||||
loaded, err := checkIfLoaded(context.Background(), qc, "", "test", []UniqueID{})
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, loaded)
|
||||
})
|
||||
|
||||
@ -116,7 +116,7 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error {
|
||||
if err != nil { // err is not nil if collection not exists
|
||||
return err
|
||||
}
|
||||
partIDs, err := getPartitionIDs(ctx, g.collectionName, g.partitionNames)
|
||||
partIDs, err := getPartitionIDs(ctx, g.request.GetDbName(), g.collectionName, g.partitionNames)
|
||||
if err != nil { // err is not nil if partition not exists
|
||||
return err
|
||||
}
|
||||
@ -141,7 +141,7 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// check if collection/partitions are loaded into query node
|
||||
loaded, unloaded, err := checkFullLoaded(ctx, g.qc, g.collectionName, partIDs)
|
||||
loaded, unloaded, err := checkFullLoaded(ctx, g.qc, g.request.GetDbName(), g.collectionName, partIDs)
|
||||
if err != nil {
|
||||
g.fromDataCoord = true
|
||||
g.unloadedPartitionIDs = partIDs
|
||||
@ -319,14 +319,14 @@ func (g *getStatisticsTask) getStatisticsShard(ctx context.Context, nodeID int64
|
||||
|
||||
// checkFullLoaded check if collection / partition was fully loaded into QueryNode
|
||||
// return loaded partitions, unloaded partitions and error
|
||||
func checkFullLoaded(ctx context.Context, qc types.QueryCoord, collectionName string, searchPartitionIDs []UniqueID) ([]UniqueID, []UniqueID, error) {
|
||||
func checkFullLoaded(ctx context.Context, qc types.QueryCoord, dbName string, collectionName string, searchPartitionIDs []UniqueID) ([]UniqueID, []UniqueID, error) {
|
||||
var loadedPartitionIDs []UniqueID
|
||||
var unloadPartitionIDs []UniqueID
|
||||
|
||||
// TODO: Consider to check if partition loaded from cache to save rpc.
|
||||
info, err := globalMetaCache.GetCollectionInfo(ctx, GetCurDBNameFromContextOrDefault(ctx), collectionName)
|
||||
info, err := globalMetaCache.GetCollectionInfo(ctx, dbName, collectionName)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("GetCollectionInfo failed, collection = %s, err = %s", collectionName, err)
|
||||
return nil, nil, fmt.Errorf("GetCollectionInfo failed, dbName = %s, collection = %s, err = %s", dbName, collectionName, err)
|
||||
}
|
||||
|
||||
// If request to search partitions
|
||||
@ -340,10 +340,10 @@ func checkFullLoaded(ctx context.Context, qc types.QueryCoord, collectionName st
|
||||
PartitionIDs: searchPartitionIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %s", collectionName, searchPartitionIDs, err)
|
||||
return nil, nil, fmt.Errorf("showPartitions failed, dbName = %s, collection = %s, partitionIDs = %v, err = %s", dbName, collectionName, searchPartitionIDs, err)
|
||||
}
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return nil, nil, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason())
|
||||
return nil, nil, fmt.Errorf("showPartitions failed, dbName = %s, collection = %s, partitionIDs = %v, reason = %s", dbName, collectionName, searchPartitionIDs, resp.GetStatus().GetReason())
|
||||
}
|
||||
|
||||
for i, percentage := range resp.GetInMemoryPercentages() {
|
||||
@ -365,10 +365,10 @@ func checkFullLoaded(ctx context.Context, qc types.QueryCoord, collectionName st
|
||||
CollectionID: info.collID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %s", collectionName, searchPartitionIDs, err)
|
||||
return nil, nil, fmt.Errorf("showPartitions failed, dbName = %s, collection = %s, partitionIDs = %v, err = %s", dbName, collectionName, searchPartitionIDs, err)
|
||||
}
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return nil, nil, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason())
|
||||
return nil, nil, fmt.Errorf("showPartitions failed, dbName = %s, collection = %s, partitionIDs = %v, reason = %s", dbName, collectionName, searchPartitionIDs, resp.GetStatus().GetReason())
|
||||
}
|
||||
|
||||
loadedMap := make(map[UniqueID]bool)
|
||||
|
||||
@ -1072,13 +1072,18 @@ func getCollectionProgress(ctx context.Context, queryCoord types.QueryCoord,
|
||||
return resp.InMemoryPercentages[0], nil
|
||||
}
|
||||
|
||||
func getPartitionProgress(ctx context.Context, queryCoord types.QueryCoord,
|
||||
msgBase *commonpb.MsgBase, partitionNames []string, collectionName string, collectionID int64,
|
||||
func getPartitionProgress(ctx context.Context,
|
||||
queryCoord types.QueryCoord,
|
||||
msgBase *commonpb.MsgBase,
|
||||
partitionNames []string,
|
||||
collectionName string,
|
||||
collectionID int64,
|
||||
dbname string,
|
||||
) (int64, error) {
|
||||
IDs2Names := make(map[int64]string)
|
||||
partitionIDs := make([]int64, 0)
|
||||
for _, partitionName := range partitionNames {
|
||||
partitionID, err := globalMetaCache.GetPartitionID(ctx, GetCurDBNameFromContextOrDefault(ctx), collectionName, partitionName)
|
||||
partitionID, err := globalMetaCache.GetPartitionID(ctx, dbname, collectionName, partitionName)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user