From 46e0e2658b6b776f16139645c78c768e5c0d195a Mon Sep 17 00:00:00 2001 From: wayblink Date: Mon, 27 Jun 2022 21:04:17 +0800 Subject: [PATCH] Move datacoord related methods from meta to globalMetaBroker (#17812) Signed-off-by: wayblink --- internal/querycoord/channel_allocator_test.go | 2 +- internal/querycoord/cluster_test.go | 6 ++-- internal/querycoord/global_meta_broker.go | 26 ++++++++++++++++ .../querycoord/global_meta_broker_test.go | 28 +++++++++++++++++ internal/querycoord/index_checker_test.go | 8 ++--- internal/querycoord/meta.go | 31 +------------------ internal/querycoord/meta_test.go | 24 +------------- internal/querycoord/query_coord.go | 2 +- internal/querycoord/segment_allocator_test.go | 2 +- internal/querycoord/task.go | 6 ++-- internal/querycoord/task_scheduler.go | 2 +- internal/querycoord/task_scheduler_test.go | 8 ++--- internal/querycoord/task_util.go | 4 +-- internal/querycoord/task_util_test.go | 13 +++++--- 14 files changed, 84 insertions(+), 78 deletions(-) diff --git a/internal/querycoord/channel_allocator_test.go b/internal/querycoord/channel_allocator_test.go index 1904947701..5cdd8955a8 100644 --- a/internal/querycoord/channel_allocator_test.go +++ b/internal/querycoord/channel_allocator_test.go @@ -49,7 +49,7 @@ func TestShuffleChannelsToQueryNode(t *testing.T) { newID := atomic.AddInt64(&id, 1) return newID, nil } - meta, err := newMeta(baseCtx, kv, nil, idAllocator, nil) + meta, err := newMeta(baseCtx, kv, nil, idAllocator) assert.Nil(t, err) var cluster Cluster = &queryNodeCluster{ ctx: baseCtx, diff --git a/internal/querycoord/cluster_test.go b/internal/querycoord/cluster_test.go index 1a532df435..b3e09a4890 100644 --- a/internal/querycoord/cluster_test.go +++ b/internal/querycoord/cluster_test.go @@ -376,7 +376,7 @@ func TestReloadClusterFromKV(t *testing.T) { newID := atomic.AddInt64(&id, 1) return newID, nil } - meta, err := newMeta(ctx, kv, factory, idAllocator, nil) + meta, err := newMeta(ctx, kv, factory, idAllocator) assert.Nil(t, err) cluster := &queryNodeCluster{ @@ -425,7 +425,7 @@ func TestGrpcRequest(t *testing.T) { idAllocator := func() (UniqueID, error) { return 0, nil } - meta, err := newMeta(baseCtx, kv, factory, idAllocator, nil) + meta, err := newMeta(baseCtx, kv, factory, idAllocator) assert.Nil(t, err) deltaChannelInfo := []*datapb.VchannelInfo{ { @@ -631,7 +631,7 @@ func TestSetNodeState(t *testing.T) { idAllocator := func() (UniqueID, error) { return 0, nil } - meta, err := newMeta(baseCtx, kv, factory, idAllocator, nil) + meta, err := newMeta(baseCtx, kv, factory, idAllocator) assert.Nil(t, err) handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) diff --git a/internal/querycoord/global_meta_broker.go b/internal/querycoord/global_meta_broker.go index bbaa5e2a6f..e43b1a8f0f 100644 --- a/internal/querycoord/global_meta_broker.go +++ b/internal/querycoord/global_meta_broker.go @@ -531,3 +531,29 @@ func (broker *globalMetaBroker) releaseSegmentReferLock(ctx context.Context, tas return nil } + +// getDataSegmentInfosByIDs return the SegmentInfo details according to the given ids through RPC to datacoord +func (broker *globalMetaBroker) getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) { + var segmentInfos []*datapb.SegmentInfo + infoResp, err := broker.dataCoord.GetSegmentInfo(broker.ctx, &datapb.GetSegmentInfoRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SegmentInfo, + MsgID: 0, + Timestamp: 0, + SourceID: Params.ProxyCfg.GetNodeID(), + }, + SegmentIDs: segmentIds, + IncludeUnHealthy: true, + }) + if err != nil { + log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) + return nil, err + } + if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success { + err = errors.New(infoResp.GetStatus().Reason) + log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) + return nil, err + } + segmentInfos = infoResp.Infos + return segmentInfos, nil +} diff --git a/internal/querycoord/global_meta_broker_test.go b/internal/querycoord/global_meta_broker_test.go index a3fdffe1b3..d3cb850f2f 100644 --- a/internal/querycoord/global_meta_broker_test.go +++ b/internal/querycoord/global_meta_broker_test.go @@ -159,3 +159,31 @@ func TestGlobalMetaBroker_IndexCoord(t *testing.T) { cancel() } + +func TestGetDataSegmentInfosByIDs(t *testing.T) { + refreshParams() + ctx, cancel := context.WithCancel(context.Background()) + dataCoord := newDataCoordMock(ctx) + + cm := storage.NewLocalChunkManager(storage.RootPath(globalMetaTestDir)) + defer cm.RemoveWithPrefix("") + handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, cm) + assert.Nil(t, err) + + segmentInfos, err := handler.getDataSegmentInfosByIDs([]int64{1}) + assert.Nil(t, err) + assert.Equal(t, 1, len(segmentInfos)) + + dataCoord.returnError = true + segmentInfos2, err := handler.getDataSegmentInfosByIDs([]int64{1}) + assert.Error(t, err) + assert.Empty(t, segmentInfos2) + + dataCoord.returnError = false + dataCoord.returnGrpcError = true + segmentInfos3, err := handler.getDataSegmentInfosByIDs([]int64{1}) + assert.Error(t, err) + assert.Empty(t, segmentInfos3) + + cancel() +} diff --git a/internal/querycoord/index_checker_test.go b/internal/querycoord/index_checker_test.go index 9aef638f79..015d131ebe 100644 --- a/internal/querycoord/index_checker_test.go +++ b/internal/querycoord/index_checker_test.go @@ -47,7 +47,7 @@ func TestReloadFromKV(t *testing.T) { newID := atomic.AddInt64(&id, 1) return newID, nil } - meta, err := newMeta(baseCtx, kv, nil, idAllocator, nil) + meta, err := newMeta(baseCtx, kv, nil, idAllocator) assert.Nil(t, err) segmentInfo := &querypb.SegmentInfo{ @@ -109,7 +109,7 @@ func TestCheckIndexLoop(t *testing.T) { return newID, nil } - meta, err := newMeta(ctx, kv, nil, idAllocator, nil) + meta, err := newMeta(ctx, kv, nil, idAllocator) assert.Nil(t, err) rootCoord := newRootCoordMock(ctx) @@ -184,7 +184,7 @@ func TestHandoffNotExistSegment(t *testing.T) { newID := atomic.AddInt64(&id, 1) return newID, nil } - meta, err := newMeta(ctx, kv, nil, idAllocator, nil) + meta, err := newMeta(ctx, kv, nil, idAllocator) assert.Nil(t, err) rootCoord := newRootCoordMock(ctx) @@ -243,7 +243,7 @@ func TestProcessHandoffAfterIndexDone(t *testing.T) { newID := atomic.AddInt64(&id, 1) return newID, nil } - meta, err := newMeta(ctx, kv, nil, idAllocator, nil) + meta, err := newMeta(ctx, kv, nil, idAllocator) assert.Nil(t, err) taskScheduler := &TaskScheduler{ ctx: ctx, diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 943c433e47..2972a2c49e 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -106,8 +106,6 @@ type Meta interface { getReplicasByNodeID(nodeID int64) ([]*milvuspb.ReplicaInfo, error) applyReplicaBalancePlan(p *balancePlan) error updateShardLeader(replicaID UniqueID, dmChannel string, leaderID UniqueID, leaderAddr string) error - - getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) } // MetaReplica records the current load information on all querynodes @@ -137,7 +135,7 @@ type MetaReplica struct { dataCoord types.DataCoord } -func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAllocator func() (UniqueID, error), dataCoord types.DataCoord) (Meta, error) { +func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAllocator func() (UniqueID, error)) (Meta, error) { childCtx, cancel := context.WithCancel(ctx) collectionInfos := make(map[UniqueID]*querypb.CollectionInfo) queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo) @@ -157,7 +155,6 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAl segmentsInfo: newSegmentsInfo(kv), replicas: NewReplicaInfos(), - dataCoord: dataCoord, } m.setKvClient(kv) @@ -1338,29 +1335,3 @@ func addNode2Segment(meta Meta, node UniqueID, replicas []*milvuspb.ReplicaInfo, segment.NodeIds = append(segment.NodeIds, node) } - -// getDataSegmentInfosByIDs return the SegmentInfo details according to the given ids through RPC to datacoord -func (m *MetaReplica) getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) { - var segmentInfos []*datapb.SegmentInfo - infoResp, err := m.dataCoord.GetSegmentInfo(m.ctx, &datapb.GetSegmentInfoRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SegmentInfo, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyCfg.GetNodeID(), - }, - SegmentIDs: segmentIds, - IncludeUnHealthy: true, - }) - if err != nil { - log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) - return nil, err - } - if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success { - err = errors.New(infoResp.GetStatus().Reason) - log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) - return nil, err - } - segmentInfos = infoResp.Infos - return segmentInfos, nil -} diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index c699a8a263..1f09b8db13 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -75,7 +75,7 @@ func TestReplica_Release(t *testing.T) { newID := atomic.AddInt64(&id, 1) return newID, nil } - meta, err := newMeta(context.Background(), etcdKV, nil, idAllocator, nil) + meta, err := newMeta(context.Background(), etcdKV, nil, idAllocator) assert.Nil(t, err) err = meta.addCollection(1, querypb.LoadType_LoadCollection, nil) require.NoError(t, err) @@ -424,25 +424,3 @@ func TestCreateQueryChannel(t *testing.T) { }) } } - -func TestGetDataSegmentInfosByIDs(t *testing.T) { - dataCoord := &dataCoordMock{} - meta := &MetaReplica{ - dataCoord: dataCoord, - } - - segmentInfos, err := meta.getDataSegmentInfosByIDs([]int64{1}) - assert.Nil(t, err) - assert.Equal(t, 1, len(segmentInfos)) - - dataCoord.returnError = true - segmentInfos2, err := meta.getDataSegmentInfosByIDs([]int64{1}) - assert.Error(t, err) - assert.Empty(t, segmentInfos2) - - dataCoord.returnError = false - dataCoord.returnGrpcError = true - segmentInfos3, err := meta.getDataSegmentInfosByIDs([]int64{1}) - assert.Error(t, err) - assert.Empty(t, segmentInfos3) -} diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 9df5b87d3b..0de9fc530a 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -159,7 +159,7 @@ func (qc *QueryCoord) Init() error { qc.factory.Init(&Params) // init meta - qc.meta, initError = newMeta(qc.loopCtx, qc.kvClient, qc.factory, qc.idAllocator, qc.dataCoordClient) + qc.meta, initError = newMeta(qc.loopCtx, qc.kvClient, qc.factory, qc.idAllocator) if initError != nil { log.Error("query coordinator init meta failed", zap.Error(initError)) return diff --git a/internal/querycoord/segment_allocator_test.go b/internal/querycoord/segment_allocator_test.go index b84ab80253..587e35217a 100644 --- a/internal/querycoord/segment_allocator_test.go +++ b/internal/querycoord/segment_allocator_test.go @@ -50,7 +50,7 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) { newID := atomic.AddInt64(&id, 1) return newID, nil } - meta, err := newMeta(baseCtx, kv, factory, idAllocator, nil) + meta, err := newMeta(baseCtx, kv, factory, idAllocator) assert.Nil(t, err) handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) assert.Nil(t, err) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index efb0ae1427..37fc519a04 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -483,7 +483,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { ReplicaID: replica.GetReplicaID(), } - fullWatchRequest, err := generateFullWatchDmChannelsRequest(lct.meta, watchRequest) + fullWatchRequest, err := generateFullWatchDmChannelsRequest(lct.broker, watchRequest) if err != nil { lct.setResultInfo(err) return err @@ -915,7 +915,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { ReplicaID: replica.GetReplicaID(), } - fullWatchRequest, err := generateFullWatchDmChannelsRequest(lpt.meta, watchRequest) + fullWatchRequest, err := generateFullWatchDmChannelsRequest(lpt.broker, watchRequest) if err != nil { lpt.setResultInfo(err) return err @@ -2035,7 +2035,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro watchRequest.PartitionIDs = toRecoverPartitionIDs } - fullWatchRequest, err := generateFullWatchDmChannelsRequest(lbt.meta, watchRequest) + fullWatchRequest, err := generateFullWatchDmChannelsRequest(lbt.broker, watchRequest) if err != nil { lbt.setResultInfo(err) return err diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 9127896ba1..8ab8102a29 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -359,7 +359,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - fullReq, err := generateFullWatchDmChannelsRequest(scheduler.meta, &req) + fullReq, err := generateFullWatchDmChannelsRequest(scheduler.broker, &req) if err != nil { return nil, err } diff --git a/internal/querycoord/task_scheduler_test.go b/internal/querycoord/task_scheduler_test.go index 049b3d708c..e16c0974a1 100644 --- a/internal/querycoord/task_scheduler_test.go +++ b/internal/querycoord/task_scheduler_test.go @@ -200,13 +200,13 @@ func TestUnMarshalTask(t *testing.T) { kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) baseCtx, cancel := context.WithCancel(context.Background()) dataCoord := &dataCoordMock{} - meta := &MetaReplica{ - dataCoord: dataCoord, - } + broker, err := newGlobalMetaBroker(baseCtx, nil, dataCoord, nil, nil) + assert.Nil(t, err) + taskScheduler := &TaskScheduler{ ctx: baseCtx, cancel: cancel, - meta: meta, + broker: broker, } t.Run("Test loadCollectionTask", func(t *testing.T) { diff --git a/internal/querycoord/task_util.go b/internal/querycoord/task_util.go index 0ca67f53be..bbb712b2c7 100644 --- a/internal/querycoord/task_util.go +++ b/internal/querycoord/task_util.go @@ -25,7 +25,7 @@ import ( ) // generateFullWatchDmChannelsRequest fill the WatchDmChannelsRequest by get segment infos from Meta -func generateFullWatchDmChannelsRequest(m Meta, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) { +func generateFullWatchDmChannelsRequest(broker *globalMetaBroker, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) { cloned := proto.Clone(request).(*querypb.WatchDmChannelsRequest) vChannels := cloned.GetInfos() @@ -36,7 +36,7 @@ func generateFullWatchDmChannelsRequest(m Meta, request *querypb.WatchDmChannels segmentIds = append(segmentIds, vChannel.UnflushedSegmentIds...) segmentIds = append(segmentIds, vChannel.DroppedSegmentIds...) } - segmentInfos, err := m.getDataSegmentInfosByIDs(segmentIds) + segmentInfos, err := broker.getDataSegmentInfosByIDs(segmentIds) if err != nil { log.Error("Get Vchannel SegmentInfos failed", zap.Error(err)) return nil, err diff --git a/internal/querycoord/task_util_test.go b/internal/querycoord/task_util_test.go index cf0de21a52..9de47fb074 100644 --- a/internal/querycoord/task_util_test.go +++ b/internal/querycoord/task_util_test.go @@ -17,6 +17,7 @@ package querycoord import ( + "context" "testing" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -27,9 +28,9 @@ import ( func TestGenerateFullWatchDmChannelsRequest(t *testing.T) { dataCoord := &dataCoordMock{} - meta := &MetaReplica{ - dataCoord: dataCoord, - } + ctx, cancel := context.WithCancel(context.Background()) + handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil) + assert.Nil(t, err) deltaChannel := &datapb.VchannelInfo{ CollectionID: defaultCollectionID, @@ -45,14 +46,16 @@ func TestGenerateFullWatchDmChannelsRequest(t *testing.T) { NodeID: 1, } - fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(meta, watchDmChannelsRequest) + fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest) assert.Nil(t, err) assert.NotEmpty(t, fullWatchDmChannelsRequest.GetSegmentInfos()) dataCoord.returnError = true - fullWatchDmChannelsRequest2, err := generateFullWatchDmChannelsRequest(meta, watchDmChannelsRequest) + fullWatchDmChannelsRequest2, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest) assert.Error(t, err) assert.Empty(t, fullWatchDmChannelsRequest2.GetSegmentInfos()) + + cancel() } func TestThinWatchDmChannelsRequest(t *testing.T) {