From dfc70108c45a209b19d661ba6c98ee80da67eeff Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 18 Nov 2021 22:31:34 +0800 Subject: [PATCH] Add load collection for AssignSegmentID (#12077) Signed-off-by: Congqi Xia --- internal/datacoord/server_test.go | 21 +++++++++++++++++++++ internal/datacoord/services.go | 8 ++++++++ 2 files changed, 29 insertions(+) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 8591f3fcab..639dd2dccc 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -122,6 +122,10 @@ func TestAssignSegmentID(t *testing.T) { t.Run("assign segment with invalid collection", func(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) + svr.rootCoordClient = &mockDescribeCollRoot{ + RootCoord: svr.rootCoordClient, + collID: collID, + } schema := newTestSchema() svr.meta.AddCollection(&datapb.CollectionInfo{ ID: collID, @@ -145,6 +149,23 @@ func TestAssignSegmentID(t *testing.T) { }) } +type mockDescribeCollRoot struct { + types.RootCoord + collID UniqueID +} + +func (r *mockDescribeCollRoot) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + if req.CollectionID != r.collID { + return &milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "Collection not found", + }, + }, nil + } + return r.RootCoord.DescribeCollection(ctx, req) +} + func TestFlush(t *testing.T) { req := &datapb.FlushRequest{ Base: &commonpb.MsgBase{ diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b783e9ec4e..3224e9ad27 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -114,6 +114,14 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI zap.String("channelName", r.GetChannelName()), zap.Uint32("count", r.GetCount())) + if s.meta.GetCollection(r.GetCollectionID()) == nil { + err := s.loadCollectionFromRootCoord(ctx, r.GetCollectionID()) + if err != nil { + log.Warn("failed to load collection in alloc segment", zap.Any("request", r), zap.Error(err)) + continue + } + } + s.cluster.Watch(r.ChannelName, r.CollectionID) allocations, err := s.segmentManager.AllocSegment(ctx,