From 6fd83464cd0a71c3a98e294dbae91c2efde42f83 Mon Sep 17 00:00:00 2001
From: wayblink
Date: Tue, 23 Aug 2022 17:44:56 +0800
Subject: [PATCH] Fix watchDmChannel possibly being out-of-date after
 compaction, and add context (#18681)

Signed-off-by: wayblink
---
 internal/datacoord/services.go            |  7 ++-
 internal/querycoord/global_meta_broker.go |  4 +-
 .../querycoord/global_meta_broker_test.go  |  6 +-
 .../querycoord/mock_3rd_component_test.go  | 36 +++++++++--
 internal/querycoord/task.go               |  6 +-
 internal/querycoord/task_scheduler.go     |  2 +-
 internal/querycoord/task_util.go          | 60 +++++++++++++++++--
 internal/querycoord/task_util_test.go     | 42 +++++++++++--
 internal/querynode/task.go                |  9 ++-
 9 files changed, 143 insertions(+), 29 deletions(-)

diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index da739fde36..ddee89bad9 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -318,9 +318,12 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
 		var info *SegmentInfo
 		if req.IncludeUnHealthy {
 			info = s.meta.GetAllSegment(id)
-			if info != nil {
-				infos = append(infos, info.SegmentInfo)
+			if info == nil {
+				log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
+				resp.Status.Reason = fmt.Sprintf("failed to get segment %d", id)
+				return resp, nil
 			}
+			infos = append(infos, info.SegmentInfo)
 		} else {
 			info = s.meta.GetSegment(id)
 			if info == nil {
diff --git a/internal/querycoord/global_meta_broker.go b/internal/querycoord/global_meta_broker.go
index e31fe3a518..7718e5fda0 100644
--- a/internal/querycoord/global_meta_broker.go
+++ b/internal/querycoord/global_meta_broker.go
@@ -534,9 +534,9 @@ func (broker *globalMetaBroker) releaseSegmentReferLock(ctx context.Context, tas
 }
 
 // getDataSegmentInfosByIDs returns the SegmentInfo details according to the given ids through RPC to datacoord
-func (broker *globalMetaBroker) getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) {
+func (broker *globalMetaBroker) getDataSegmentInfosByIDs(ctx context.Context, segmentIds []int64) ([]*datapb.SegmentInfo, error) {
 	var segmentInfos []*datapb.SegmentInfo
-	infoResp, err := broker.dataCoord.GetSegmentInfo(broker.ctx, &datapb.GetSegmentInfoRequest{
+	infoResp, err := broker.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_SegmentInfo,
 			MsgID:   0,
diff --git a/internal/querycoord/global_meta_broker_test.go b/internal/querycoord/global_meta_broker_test.go
index d3cb850f2f..3e62f3b437 100644
--- a/internal/querycoord/global_meta_broker_test.go
+++ b/internal/querycoord/global_meta_broker_test.go
@@ -170,18 +170,18 @@ func TestGetDataSegmentInfosByIDs(t *testing.T) {
 	handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, cm)
 	assert.Nil(t, err)
 
-	segmentInfos, err := handler.getDataSegmentInfosByIDs([]int64{1})
+	segmentInfos, err := handler.getDataSegmentInfosByIDs(ctx, []int64{1})
 	assert.Nil(t, err)
 	assert.Equal(t, 1, len(segmentInfos))
 
 	dataCoord.returnError = true
-	segmentInfos2, err := handler.getDataSegmentInfosByIDs([]int64{1})
+	segmentInfos2, err := handler.getDataSegmentInfosByIDs(ctx, []int64{1})
 	assert.Error(t, err)
 	assert.Empty(t, segmentInfos2)
 
 	dataCoord.returnError = false
 	dataCoord.returnGrpcError = true
-	segmentInfos3, err := handler.getDataSegmentInfosByIDs([]int64{1})
+	segmentInfos3, err := handler.getDataSegmentInfosByIDs(ctx, []int64{1})
 	assert.Error(t, err)
 	assert.Empty(t, segmentInfos3)
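The datacoord change above makes GetSegmentInfo fail fast when a requested segment no longer exists (for example, because compaction cleaned it up) instead of silently skipping it, and getDataSegmentInfosByIDs now takes the caller's context instead of reusing the broker's long-lived broker.ctx, so cancellation and tracing follow the request. A minimal sketch of that context-passing pattern; the segmentClient interface, broker struct, and fetchSegment below are illustrative stand-ins, not names from the patch:

    package brokersketch

    import (
    	"context"
    	"fmt"
    )

    // segmentClient stands in for the datacoord RPC client.
    type segmentClient interface {
    	GetSegmentInfo(ctx context.Context, id int64) (string, error)
    }

    type broker struct {
    	ctx    context.Context // long-lived component context, kept for shutdown only
    	client segmentClient
    }

    // fetchSegment threads the caller's ctx into the RPC: cancelling the request
    // cancels the RPC, whereas broker.ctx would only end at component shutdown.
    func (b *broker) fetchSegment(ctx context.Context, id int64) (string, error) {
    	info, err := b.client.GetSegmentInfo(ctx, id)
    	if err != nil {
    		return "", fmt.Errorf("get segment %d: %w", id, err)
    	}
    	return info, nil
    }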
diff --git a/internal/querycoord/mock_3rd_component_test.go b/internal/querycoord/mock_3rd_component_test.go
index 1d557ffe10..0b45767c6d 100644
--- a/internal/querycoord/mock_3rd_component_test.go
+++ b/internal/querycoord/mock_3rd_component_test.go
@@ -22,19 +22,19 @@ import (
 	"fmt"
 	"sync"
 
-	"github.com/milvus-io/milvus/internal/log"
-	"github.com/milvus-io/milvus/internal/proto/etcdpb"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 
-	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
-
 	"github.com/milvus-io/milvus/internal/common"
+	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 	"github.com/milvus-io/milvus/internal/proto/proxypb"
+	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/types"
@@ -334,6 +334,7 @@ type dataCoordMock struct {
 	channelNumPerCol int
 	returnError      bool
 	returnGrpcError  bool
+	returnErrorCount atomic.Int32
 	segmentState     commonpb.SegmentState
 	errLevel         int
@@ -376,14 +377,27 @@ func (data *dataCoordMock) GetRecoveryInfo(ctx context.Context, req *datapb.GetR
 		}, nil
 	}
+	if data.returnErrorCount.Load() > 0 {
+		data.returnErrorCount.Dec()
+		return &datapb.GetRecoveryInfoResponse{
+			Status: &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_UnexpectedError,
+				Reason:    "limited get recovery info failed",
+			},
+		}, nil
+	}
+
 	if _, ok := data.col2DmChannels[collectionID]; !ok {
 		channelInfos := make([]*datapb.VchannelInfo, 0)
 		data.collections = append(data.collections, collectionID)
 		for i := int32(0); i < common.DefaultShardsNum; i++ {
 			vChannel := fmt.Sprintf("%s_%d_%dv%d", Params.CommonCfg.RootCoordDml, i, collectionID, i)
 			channelInfo := &datapb.VchannelInfo{
-				CollectionID: collectionID,
-				ChannelName:  vChannel,
+				CollectionID:        collectionID,
+				ChannelName:         vChannel,
+				UnflushedSegmentIds: []int64{int64(i*1000 + 1)},
+				FlushedSegmentIds:   []int64{int64(i*1000 + 2)},
+				DroppedSegmentIds:   []int64{int64(i*1000 + 3)},
 				SeekPosition: &internalpb.MsgPosition{
 					ChannelName: vChannel,
 				},
 			}
@@ -528,6 +542,16 @@ func (data *dataCoordMock) GetSegmentInfo(ctx context.Context, req *datapb.GetSe
 		}, nil
 	}
 
+	if data.returnErrorCount.Load() > 0 {
+		data.returnErrorCount.Dec()
+		return &datapb.GetSegmentInfoResponse{
+			Status: &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_UnexpectedError,
+				Reason:    "limited mock get segmentInfo failed",
+			},
+		}, nil
+	}
+
 	var segmentInfos []*datapb.SegmentInfo
 	for _, segmentID := range req.SegmentIDs {
 		segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go
index 43d86ec5f4..ea35808b86 100644
--- a/internal/querycoord/task.go
+++ b/internal/querycoord/task.go
@@ -514,7 +514,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
 				ReplicaID:    replica.GetReplicaID(),
 			}
 
-			fullWatchRequest, err := generateFullWatchDmChannelsRequest(lct.broker, watchRequest)
+			fullWatchRequest, err := generateFullWatchDmChannelsRequest(ctx, lct.broker, watchRequest)
 			if err != nil {
 				lct.setResultInfo(err)
 				return err
@@ -1008,7 +1008,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
 				ReplicaID:    replica.GetReplicaID(),
 			}
 
-			fullWatchRequest, err := generateFullWatchDmChannelsRequest(lpt.broker, watchRequest)
+			fullWatchRequest, err := generateFullWatchDmChannelsRequest(ctx, lpt.broker, watchRequest)
 			if err != nil {
 				lpt.setResultInfo(err)
 				return err
@@ -2059,7 +2059,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
 					watchRequest.PartitionIDs = toRecoverPartitionIDs
 				}
 
-				fullWatchRequest, err := generateFullWatchDmChannelsRequest(lbt.broker, watchRequest)
+				fullWatchRequest, err := generateFullWatchDmChannelsRequest(ctx, lbt.broker, watchRequest)
 				if err != nil {
 					lbt.setResultInfo(err)
 					return err
diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go
index 3e377609cc..b6354a06fe 100644
--- a/internal/querycoord/task_scheduler.go
+++ b/internal/querycoord/task_scheduler.go
@@ -393,7 +393,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
 		if err != nil {
 			return nil, err
 		}
-		fullReq, err := generateFullWatchDmChannelsRequest(scheduler.broker, &req)
+		fullReq, err := generateFullWatchDmChannelsRequest(baseTask.traceCtx(), scheduler.broker, &req)
 		if err != nil {
 			return nil, err
 		}
diff --git a/internal/querycoord/task_util.go b/internal/querycoord/task_util.go
index 4971a3063e..172374cf39 100644
--- a/internal/querycoord/task_util.go
+++ b/internal/querycoord/task_util.go
@@ -17,15 +17,18 @@
 package querycoord
 
 import (
+	"context"
+
 	"github.com/golang/protobuf/proto"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/util/retry"
 	"go.uber.org/zap"
 )
 
 // generateFullWatchDmChannelsRequest fills the WatchDmChannelsRequest by getting segment infos from the meta broker
-func generateFullWatchDmChannelsRequest(broker *globalMetaBroker, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) {
+func generateFullWatchDmChannelsRequest(ctx context.Context, broker *globalMetaBroker, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) {
 	cloned := proto.Clone(request).(*querypb.WatchDmChannelsRequest)
 
 	vChannels := cloned.GetInfos()
@@ -34,6 +37,49 @@ func generateFullWatchDmChannelsRequest(broker *query
 		reviseVChannelInfo(vChannel)
 	}
 
+	vChannelDict := make(map[string]bool, len(vChannels))
+	for _, info := range vChannels {
+		vChannelDict[info.ChannelName] = true
+	}
+	var segmentInfos []*datapb.SegmentInfo
+
+	// if fewer segmentInfos are returned than required, the segments may have been compacted;
+	// refresh the vchannels and the segmentInfos that are needed.
+	retryFunc := func() error {
+		newVChannels := make([]*datapb.VchannelInfo, 0)
+		newSegmentIds := make([]int64, 0)
+
+		newVChannelDict := make(map[string]bool)
+		for _, partitionID := range request.GetLoadMeta().GetPartitionIDs() {
+			partitionVChannels, _, err := broker.getRecoveryInfo(ctx, request.GetCollectionID(), partitionID)
+			if err != nil {
+				log.Error("GetRecoveryInfo failed, retrying...", zap.Error(err))
+				return err
+			}
+			for _, vchannel := range partitionVChannels {
+				if vChannelDict[vchannel.GetChannelName()] && !newVChannelDict[vchannel.GetChannelName()] {
+					newVChannels = append(newVChannels, vchannel)
+					newVChannelDict[vchannel.GetChannelName()] = true
+				}
+			}
+		}
+
+		for _, vChannel := range newVChannels {
+			newSegmentIds = append(newSegmentIds, vChannel.FlushedSegmentIds...)
+			newSegmentIds = append(newSegmentIds, vChannel.UnflushedSegmentIds...)
+			newSegmentIds = append(newSegmentIds, vChannel.DroppedSegmentIds...)
+		}
+		newSegmentInfos, err := broker.getDataSegmentInfosByIDs(ctx, newSegmentIds)
+		if err != nil {
+			log.Error("Get Vchannel SegmentInfos failed, retrying...", zap.Error(err))
+			return err
+		}
+
+		cloned.Infos = newVChannels
+		segmentInfos = newSegmentInfos
+		return nil
+	}
+
 	// fill segmentInfos
 	segmentIds := make([]int64, 0)
 	for _, vChannel := range vChannels {
@@ -41,18 +87,24 @@ func generateFullWatchDmChannelsRequest(broker *query
 		segmentIds = append(segmentIds, vChannel.UnflushedSegmentIds...)
 		segmentIds = append(segmentIds, vChannel.DroppedSegmentIds...)
 	}
-	segmentInfos, err := broker.getDataSegmentInfosByIDs(segmentIds)
+	segmentInfos, err := broker.getDataSegmentInfosByIDs(ctx, segmentIds)
+
 	if err != nil {
 		log.Error("Get Vchannel SegmentInfos failed", zap.Error(err))
-		return nil, err
+		retryErr := retry.Do(ctx, retryFunc, retry.Attempts(20))
+		if retryErr != nil {
+			log.Error("Get Vchannel SegmentInfos failed after retry", zap.Error(retryErr))
+			return nil, retryErr
+		}
 	}
+
 	segmentDict := make(map[int64]*datapb.SegmentInfo)
 	for _, info := range segmentInfos {
 		segmentDict[info.ID] = info
 	}
 	cloned.SegmentInfos = segmentDict
-	return cloned, err
+	return cloned, nil
 }
 
 // thinWatchDmChannelsRequest will return a thin version of WatchDmChannelsRequest
diff --git a/internal/querycoord/task_util_test.go b/internal/querycoord/task_util_test.go
index edd8a986d3..ac92f024d7 100644
--- a/internal/querycoord/task_util_test.go
+++ b/internal/querycoord/task_util_test.go
@@ -27,8 +27,8 @@ import (
 )
 
 func TestGenerateFullWatchDmChannelsRequest(t *testing.T) {
-	dataCoord := &dataCoordMock{}
 	ctx, cancel := context.WithCancel(context.Background())
+	dataCoord := newDataCoordMock(ctx)
 	handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil)
 	assert.Nil(t, err)
 
@@ -46,12 +46,12 @@ func TestGenerateFullWatchDmChannelsRequest(t *testing.T) {
 		NodeID: 1,
 	}
 
-	fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest)
+	fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(ctx, handler, watchDmChannelsRequest)
 	assert.Nil(t, err)
 	assert.NotEmpty(t, fullWatchDmChannelsRequest.GetSegmentInfos())
 
 	dataCoord.returnError = true
-	fullWatchDmChannelsRequest2, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest)
+	fullWatchDmChannelsRequest2, err := generateFullWatchDmChannelsRequest(ctx, handler, watchDmChannelsRequest)
 	assert.Error(t, err)
 	assert.Empty(t, fullWatchDmChannelsRequest2.GetSegmentInfos())
 
@@ -88,8 +88,8 @@ func TestThinWatchDmChannelsRequest(t *testing.T) {
 }
 
 func TestUpgradeCompatibility(t *testing.T) {
-	dataCoord := &dataCoordMock{}
 	ctx, cancel := context.WithCancel(context.Background())
+	dataCoord := newDataCoordMock(ctx)
 	handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil)
 	assert.Nil(t, err)
 
@@ -110,7 +110,7 @@ func TestUpgradeCompatibility(t *testing.T) {
 		NodeID: 1,
 	}
 
-	fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest)
+	fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(ctx, handler, watchDmChannelsRequest)
 	assert.Nil(t, err)
 	assert.NotEmpty(t, fullWatchDmChannelsRequest.GetSegmentInfos())
 	vChannel := fullWatchDmChannelsRequest.GetInfos()[0]
@@ -124,3 +124,35 @@ func TestUpgradeCompatibility(t *testing.T) {
 	assert.Equal(t, 1, len(vChannel.GetUnflushedSegmentIds()))
 	cancel()
 }
+
+func TestGetMissSegment(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	dataCoord := newDataCoordMock(ctx)
+	broker, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil)
+	assert.Nil(t, err)
+
+	vChannels, _, err := broker.getRecoveryInfo(ctx, defaultCollectionID, 0)
+	assert.Nil(t, err)
+
+	watchDmChannelsRequest := &querypb.WatchDmChannelsRequest{
+		Base: &commonpb.MsgBase{
+			MsgType: commonpb.MsgType_WatchDmChannels,
+		},
+		CollectionID: defaultCollectionID,
+		PartitionIDs: []int64{1},
+		Infos:        vChannels,
+		NodeID:       1,
+		LoadMeta: &querypb.LoadMetaInfo{
+			LoadType:     querypb.LoadType_LoadCollection,
+			CollectionID: defaultCollectionID,
+			PartitionIDs: []int64{1},
+		},
+	}
+
+	// inject a certain number of errors
+	dataCoord.returnErrorCount.Store(3)
+
+	_, err = generateFullWatchDmChannelsRequest(ctx, broker, watchDmChannelsRequest)
+	assert.NoError(t, err)
+	cancel()
+}
diff --git a/internal/querynode/task.go b/internal/querynode/task.go
index 9e2117512d..dd2e36cefb 100644
--- a/internal/querynode/task.go
+++ b/internal/querynode/task.go
@@ -170,8 +170,11 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
 	for _, info := range w.req.Infos {
 		for _, ufInfoID := range info.GetUnflushedSegmentIds() {
 			// unFlushed segment may not have binLogs, skip loading
-			ufInfo := w.req.SegmentInfos[ufInfoID]
-			if len(ufInfo.Binlogs) > 0 {
+			ufInfo := w.req.GetSegmentInfos()[ufInfoID]
+			if ufInfo == nil {
+				log.Warn("an unflushed segment is not found in segment infos", zap.Int64("segment ID", ufInfoID))
+			}
+			if len(ufInfo.GetBinlogs()) > 0 {
 				unFlushedSegments = append(unFlushedSegments, &queryPb.SegmentLoadInfo{
 					SegmentID:   ufInfo.ID,
 					PartitionID: ufInfo.PartitionID,
@@ -182,7 +185,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
 					Deltalogs:     ufInfo.Deltalogs,
 					InsertChannel: ufInfo.InsertChannel,
 				})
-				unFlushedSegmentIDs = append(unFlushedSegmentIDs, ufInfo.ID)
+				unFlushedSegmentIDs = append(unFlushedSegmentIDs, ufInfo.GetID())
 			}
 		}
 	}
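Taken together, the patch makes querycoord tolerate segment metadata going stale between getRecoveryInfo and getDataSegmentInfosByIDs: when the segment-info fetch fails (typically because compaction replaced the segments), it re-pulls recovery info for the channels being watched and retries, up to 20 attempts, while querynode additionally guards against unflushed segments missing from the request. A simplified, self-contained sketch of that refresh-and-retry flow; the broker interface, the vchannel and segmentInfo types, and fillSegmentInfos are illustrative stand-ins for the patched generateFullWatchDmChannelsRequest, not Milvus APIs:

    package querycoordsketch

    import (
    	"context"
    	"fmt"
    )

    // Stand-ins for the broker RPCs used by the patched request builder.
    type vchannel struct {
    	Name       string
    	SegmentIDs []int64 // flushed + unflushed + dropped, collapsed for brevity
    }

    type segmentInfo struct{ ID int64 }

    type broker interface {
    	// GetRecoveryInfo returns the current channel/segment view for a partition.
    	GetRecoveryInfo(ctx context.Context, collectionID, partitionID int64) ([]vchannel, error)
    	// GetSegmentInfos resolves segment IDs to infos; it fails if any ID is gone.
    	GetSegmentInfos(ctx context.Context, ids []int64) ([]segmentInfo, error)
    }

    // fillSegmentInfos mirrors the patched flow: try the segment IDs we already
    // have; on failure (e.g. the segments were compacted away), refresh the
    // channel view from recovery info, keep only the channels this request
    // watches, and retry with the fresh IDs.
    func fillSegmentInfos(ctx context.Context, b broker, collectionID, partitionID int64,
    	channels []vchannel) ([]vchannel, []segmentInfo, error) {

    	infos, err := b.GetSegmentInfos(ctx, collectIDs(channels))
    	if err == nil {
    		return channels, infos, nil
    	}

    	watched := make(map[string]bool, len(channels))
    	for _, ch := range channels {
    		watched[ch.Name] = true
    	}

    	// Bounded retries, analogous to retry.Do(ctx, retryFunc, retry.Attempts(20)).
    	lastErr := err
    	for attempt := 0; attempt < 20; attempt++ {
    		fresh, err := b.GetRecoveryInfo(ctx, collectionID, partitionID)
    		if err != nil {
    			lastErr = err
    			continue
    		}
    		refreshed := make([]vchannel, 0, len(channels))
    		for _, ch := range fresh {
    			if watched[ch.Name] {
    				refreshed = append(refreshed, ch)
    			}
    		}
    		infos, err := b.GetSegmentInfos(ctx, collectIDs(refreshed))
    		if err != nil {
    			lastErr = err
    			continue
    		}
    		return refreshed, infos, nil
    	}
    	return nil, nil, fmt.Errorf("segment infos unavailable after retries: %w", lastErr)
    }

    func collectIDs(channels []vchannel) []int64 {
    	ids := make([]int64, 0)
    	for _, ch := range channels {
    		ids = append(ids, ch.SegmentIDs...)
    	}
    	return ids
    }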