From fe8432016da41598752732f837465ed4977d53a4 Mon Sep 17 00:00:00 2001 From: sunby Date: Wed, 23 Jun 2021 12:10:12 +0800 Subject: [PATCH] Refactor data coordinator (#6008) Signed-off-by: sunby --- internal/datacoord/OWNERS | 2 +- internal/datacoord/allocator.go | 8 ++++---- internal/datacoord/cluster.go | 4 +++- internal/datacoord/server.go | 8 ++++---- internal/datacoord/util.go | 7 +++++-- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/internal/datacoord/OWNERS b/internal/datacoord/OWNERS index 0e77925e3a..6c42718959 100644 --- a/internal/datacoord/OWNERS +++ b/internal/datacoord/OWNERS @@ -11,5 +11,5 @@ approvers: - scsven labels: -- component/dataservice +- component/datacoord diff --git a/internal/datacoord/allocator.go b/internal/datacoord/allocator.go index 8502cd845f..8e8742de44 100644 --- a/internal/datacoord/allocator.go +++ b/internal/datacoord/allocator.go @@ -40,8 +40,8 @@ func (alloc *rootCoordAllocator) allocTimestamp() (Timestamp, error) { resp, err := alloc.rootCoordClient.AllocTimestamp(alloc.ctx, &rootcoordpb.AllocTimestampRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_RequestTSO, - MsgID: -1, // todo add msg id - Timestamp: 0, // todo + MsgID: 0, + Timestamp: 0, SourceID: Params.NodeID, }, Count: 1, @@ -56,8 +56,8 @@ func (alloc *rootCoordAllocator) allocID() (UniqueID, error) { resp, err := alloc.rootCoordClient.AllocID(alloc.ctx, &rootcoordpb.AllocIDRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_RequestID, - MsgID: -1, // todo add msg id - Timestamp: 0, // todo + MsgID: 0, + Timestamp: 0, SourceID: Params.NodeID, }, Count: 1, diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 52df6a5790..3c8af986f4 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -78,7 +78,9 @@ func defaultAssignPolicy() channelAssignPolicy { return newBalancedAssignPolicy() } -func newCluster(ctx context.Context, dataManager *clusterNodeManager, sessionManager sessionManager, posProvider positionProvider, opts ...clusterOption) *cluster { +func newCluster(ctx context.Context, dataManager *clusterNodeManager, + sessionManager sessionManager, posProvider positionProvider, + opts ...clusterOption) *cluster { c := &cluster{ ctx: ctx, sessionManager: sessionManager, diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 07863468b8..f8edb3d23c 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -512,8 +512,8 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ShowPartitions, - MsgID: -1, // todo - Timestamp: 0, // todo + MsgID: 0, + Timestamp: 0, SourceID: Params.NodeID, }, DbName: "", @@ -556,8 +556,8 @@ func composeSegmentFlushMsgPack(segmentID UniqueID) msgstream.MsgPack { completeFlushMsg := internalpb.SegmentFlushCompletedMsg{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_SegmentFlushDone, - MsgID: 0, // TODO - Timestamp: 0, // TODO + MsgID: 0, + Timestamp: 0, SourceID: Params.NodeID, }, SegmentID: segmentID, diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index be3e581331..0e95270589 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -20,12 +20,15 @@ type Response interface { GetStatus() *commonpb.Status } +var errNilResponse = errors.New("response is nil") +var errUnknownResponseType = errors.New("unknown response type") + func VerifyResponse(response interface{}, err error) error { if err != nil { return err } if response == nil { - return errors.New("response is nil") + return errNilResponse } switch resp := response.(type) { case Response: @@ -37,7 +40,7 @@ func VerifyResponse(response interface{}, err error) error { return errors.New(resp.Reason) } default: - return errors.New("unknown response type") + return errUnknownResponseType } return nil }