Fix partition released but can be searched (#23872)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2023-05-06 17:42:39 +08:00 committed by GitHub
parent 086f3bd748
commit f0eb5e8563
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 5 deletions

View File

@ -393,6 +393,8 @@ func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPart
PartitionIDs: req.GetPartitionIDs(), PartitionIDs: req.GetPartitionIDs(),
LoadType: querypb.LoadType_LoadCollection, // TODO: dyh, remove loadType in querynode LoadType: querypb.LoadType_LoadCollection, // TODO: dyh, remove loadType in querynode
}) })
log.Info("load partitions done")
return merr.Status(nil), nil return merr.Status(nil), nil
} }
@ -483,13 +485,33 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.Releas
} }
// ReleasePartitions clears all data related to this partition on the querynode // ReleasePartitions clears all data related to this partition on the querynode
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) { func (node *QueryNode) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { log := log.Ctx(ctx).With(
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) zap.Int64("collection", req.GetCollectionID()),
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil zap.Int64s("partitions", req.GetPartitionIDs()),
)
log.Info("received release partitions request")
// check node healthy
if !node.lifetime.Add(commonpbutil.IsHealthy) {
err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID())
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
return status, nil
} }
defer node.lifetime.Done() defer node.lifetime.Done()
collection := node.manager.Collection.Get(req.GetCollectionID())
if collection != nil {
for _, partition := range req.GetPartitionIDs() {
collection.RemovePartition(partition)
}
}
log.Info("release partitions done")
return util.SuccessStatus(), nil return util.SuccessStatus(), nil
} }

View File

@ -689,10 +689,18 @@ func (suite *ServiceSuite) TestReleaseCollection_Failed() {
func (suite *ServiceSuite) TestReleasePartitions_Normal() { func (suite *ServiceSuite) TestReleasePartitions_Normal() {
ctx := context.Background() ctx := context.Background()
req := &querypb.ReleasePartitionsRequest{} suite.TestLoadPartition()
req := &querypb.ReleasePartitionsRequest{
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
}
status, err := suite.node.ReleasePartitions(ctx, req) status, err := suite.node.ReleasePartitions(ctx, req)
suite.NoError(err) suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
collection := suite.node.manager.Collection.Get(suite.collectionID)
for _, partition := range suite.partitionIDs {
suite.False(collection.ExistPartition(partition))
}
} }
func (suite *ServiceSuite) TestReleasePartitions_Failed() { func (suite *ServiceSuite) TestReleasePartitions_Failed() {