diff --git a/internal/datanode/broker/datacoord.go b/internal/datanode/broker/datacoord.go index dc7a4f2feb..98bbc99660 100644 --- a/internal/datanode/broker/datacoord.go +++ b/internal/datanode/broker/datacoord.go @@ -2,6 +2,7 @@ package broker import ( "context" + "math" "time" "github.com/samber/lo" @@ -15,6 +16,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -62,30 +64,48 @@ func (dc *dataCoordBroker) ReportTimeTick(ctx context.Context, msgs []*msgpb.Dat return nil } -func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error) { - log := log.Ctx(ctx).With( - zap.Int64s("segmentIDs", segmentIDs), - ) +func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*datapb.SegmentInfo, error) { + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) + defer cancel() - infoResp, err := dc.client.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo), - commonpbutil.WithSourceID(dc.serverID), - ), - SegmentIDs: segmentIDs, - IncludeUnHealthy: true, - }) - if err := merr.CheckRPCCall(infoResp, err); err != nil { - log.Warn("Fail to get SegmentInfo by ids from datacoord", zap.Error(err)) - return nil, err - } - err = binlog.DecompressMultiBinLogs(infoResp.GetInfos()) - if err != nil { - log.Warn("Fail to DecompressMultiBinLogs", zap.Error(err)) - return nil, err + log := log.Ctx(ctx).With(zap.Int64s("segments", ids)) + + getSegmentInfo := func(ids []int64) (*datapb.GetSegmentInfoResponse, error) { + infoResp, err := dc.client.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo), + commonpbutil.WithSourceID(dc.serverID), + ), + SegmentIDs: ids, + IncludeUnHealthy: true, + }) + if err := merr.CheckRPCCall(infoResp, err); err != nil { + log.Warn("Fail to get SegmentInfo by ids from datacoord", zap.Error(err)) + return nil, err + } + err = binlog.DecompressMultiBinLogs(infoResp.GetInfos()) + if err != nil { + log.Warn("Fail to DecompressMultiBinLogs", zap.Error(err)) + return nil, err + } + return infoResp, nil } - return infoResp.Infos, nil + ret := make([]*datapb.SegmentInfo, 0, len(ids)) + batchSize := 1000 + startIdx := 0 + for startIdx < len(ids) { + endIdx := int(math.Min(float64(startIdx+batchSize), float64(len(ids)))) + + resp, err := getSegmentInfo(ids[startIdx:endIdx]) + if err != nil { + return nil, err + } + ret = append(ret, resp.GetInfos()...) + startIdx += batchSize + } + + return ret, nil } func (dc *dataCoordBroker) UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error { diff --git a/internal/datanode/channel_manager.go b/internal/datanode/channel_manager.go index c3036cc9d6..c114cbcef2 100644 --- a/internal/datanode/channel_manager.go +++ b/internal/datanode/channel_manager.go @@ -408,6 +408,7 @@ func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState { defer finishWaiter.Done() fg, err := r.watchFunc(ctx, r.dn, info, tickler) if err != nil { + log.Warn("failed to watch channel", zap.Error(err)) opState.state = datapb.ChannelWatchState_WatchFailure } else { opState.state = datapb.ChannelWatchState_WatchSuccess diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index dc237bd7ac..b0ef69c389 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -19,6 +19,7 @@ package meta import ( "context" "fmt" + "math" "time" "github.com/cockroachdb/errors" @@ -46,7 +47,7 @@ type Broker interface { GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) - GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) (*datapb.GetSegmentInfoResponse, error) + GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) ([]*datapb.SegmentInfo, error) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) @@ -255,35 +256,54 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti return recoveryInfo.Channels, recoveryInfo.Segments, nil } -func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...UniqueID) (*datapb.GetSegmentInfoResponse, error) { +func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...UniqueID) ([]*datapb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() + log := log.Ctx(ctx).With( zap.Int64s("segments", ids), ) - req := &datapb.GetSegmentInfoRequest{ - SegmentIDs: ids, - IncludeUnHealthy: true, - } - resp, err := broker.dataCoord.GetSegmentInfo(ctx, req) - if err := merr.CheckRPCCall(resp, err); err != nil { - log.Warn("failed to get segment info from DataCoord", zap.Error(err)) - return nil, err + getSegmentInfo := func(ids []UniqueID) (*datapb.GetSegmentInfoResponse, error) { + req := &datapb.GetSegmentInfoRequest{ + SegmentIDs: ids, + IncludeUnHealthy: true, + } + resp, err := broker.dataCoord.GetSegmentInfo(ctx, req) + if err := merr.CheckRPCCall(resp, err); err != nil { + log.Warn("failed to get segment info from DataCoord", zap.Error(err)) + return nil, err + } + + if len(resp.Infos) == 0 { + log.Warn("No such segment in DataCoord") + return nil, fmt.Errorf("no such segment in DataCoord") + } + + err = binlog.DecompressMultiBinLogs(resp.GetInfos()) + if err != nil { + log.Warn("failed to DecompressMultiBinLogs", zap.Error(err)) + return nil, err + } + + return resp, nil } - if len(resp.Infos) == 0 { - log.Warn("No such segment in DataCoord") - return nil, fmt.Errorf("no such segment in DataCoord") + ret := make([]*datapb.SegmentInfo, 0, len(ids)) + batchSize := 1000 + startIdx := 0 + for startIdx < len(ids) { + endIdx := int(math.Min(float64(startIdx+batchSize), float64(len(ids)))) + + resp, err := getSegmentInfo(ids[startIdx:endIdx]) + if err != nil { + return nil, err + } + ret = append(ret, resp.GetInfos()...) + startIdx += batchSize } - err = binlog.DecompressMultiBinLogs(resp.GetInfos()) - if err != nil { - log.Warn("failed to DecompressMultiBinLogs", zap.Error(err)) - return nil, err - } - - return resp, nil + return ret, nil } func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error) { diff --git a/internal/querycoordv2/meta/coordinator_broker_test.go b/internal/querycoordv2/meta/coordinator_broker_test.go index dbecfc20a2..728b430cc6 100644 --- a/internal/querycoordv2/meta/coordinator_broker_test.go +++ b/internal/querycoordv2/meta/coordinator_broker_test.go @@ -392,9 +392,9 @@ func (s *CoordinatorBrokerDataCoordSuite) TestSegmentInfo() { }), }, nil) - resp, err := s.broker.GetSegmentInfo(ctx, segmentIDs...) + infos, err := s.broker.GetSegmentInfo(ctx, segmentIDs...) s.NoError(err) - s.ElementsMatch(segmentIDs, lo.Map(resp.GetInfos(), func(info *datapb.SegmentInfo, _ int) int64 { + s.ElementsMatch(segmentIDs, lo.Map(infos, func(info *datapb.SegmentInfo, _ int) int64 { return info.GetID() })) s.resetMock() diff --git a/internal/querycoordv2/meta/mock_broker.go b/internal/querycoordv2/meta/mock_broker.go index a940aff58b..88389fced0 100644 --- a/internal/querycoordv2/meta/mock_broker.go +++ b/internal/querycoordv2/meta/mock_broker.go @@ -458,7 +458,7 @@ func (_c *MockBroker_GetRecoveryInfoV2_Call) RunAndReturn(run func(context.Conte } // GetSegmentInfo provides a mock function with given fields: ctx, segmentID -func (_m *MockBroker) GetSegmentInfo(ctx context.Context, segmentID ...int64) (*datapb.GetSegmentInfoResponse, error) { +func (_m *MockBroker) GetSegmentInfo(ctx context.Context, segmentID ...int64) ([]*datapb.SegmentInfo, error) { _va := make([]interface{}, len(segmentID)) for _i := range segmentID { _va[_i] = segmentID[_i] @@ -468,16 +468,16 @@ func (_m *MockBroker) GetSegmentInfo(ctx context.Context, segmentID ...int64) (* _ca = append(_ca, _va...) ret := _m.Called(_ca...) - var r0 *datapb.GetSegmentInfoResponse + var r0 []*datapb.SegmentInfo var r1 error - if rf, ok := ret.Get(0).(func(context.Context, ...int64) (*datapb.GetSegmentInfoResponse, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, ...int64) ([]*datapb.SegmentInfo, error)); ok { return rf(ctx, segmentID...) } - if rf, ok := ret.Get(0).(func(context.Context, ...int64) *datapb.GetSegmentInfoResponse); ok { + if rf, ok := ret.Get(0).(func(context.Context, ...int64) []*datapb.SegmentInfo); ok { r0 = rf(ctx, segmentID...) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*datapb.GetSegmentInfoResponse) + r0 = ret.Get(0).([]*datapb.SegmentInfo) } } @@ -516,12 +516,12 @@ func (_c *MockBroker_GetSegmentInfo_Call) Run(run func(ctx context.Context, segm return _c } -func (_c *MockBroker_GetSegmentInfo_Call) Return(_a0 *datapb.GetSegmentInfoResponse, _a1 error) *MockBroker_GetSegmentInfo_Call { +func (_c *MockBroker_GetSegmentInfo_Call) Return(_a0 []*datapb.SegmentInfo, _a1 error) *MockBroker_GetSegmentInfo_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockBroker_GetSegmentInfo_Call) RunAndReturn(run func(context.Context, ...int64) (*datapb.GetSegmentInfoResponse, error)) *MockBroker_GetSegmentInfo_Call { +func (_c *MockBroker_GetSegmentInfo_Call) RunAndReturn(run func(context.Context, ...int64) ([]*datapb.SegmentInfo, error)) *MockBroker_GetSegmentInfo_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 1ee4df6a9c..76cbea92c1 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -674,12 +674,12 @@ func (ex *Executor) getMetaInfo(ctx context.Context, task Task) (*milvuspb.Descr func (ex *Executor) getLoadInfo(ctx context.Context, collectionID, segmentID int64, channel *meta.DmChannel) (*querypb.SegmentLoadInfo, []*indexpb.IndexInfo, error) { log := log.Ctx(ctx) - resp, err := ex.broker.GetSegmentInfo(ctx, segmentID) - if err != nil || len(resp.GetInfos()) == 0 { + segmentInfos, err := ex.broker.GetSegmentInfo(ctx, segmentID) + if err != nil || len(segmentInfos) == 0 { log.Warn("failed to get segment info from DataCoord", zap.Error(err)) return nil, nil, err } - segment := resp.GetInfos()[0] + segment := segmentInfos[0] log = log.With(zap.String("level", segment.GetLevel().String())) indexes, err := ex.broker.GetIndexInfo(ctx, collectionID, segment.GetID()) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 5585a82602..58b2f59fc2 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -228,14 +228,12 @@ func (suite *TaskSuite) TestSubscribeChannelTask() { }) for channel, segment := range suite.growingSegments { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment). - Return(&datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{ - { - ID: segment, - CollectionID: suite.collection, - PartitionID: partitions[0], - InsertChannel: channel, - }, + Return([]*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partitions[0], + InsertChannel: channel, }, }, nil) } @@ -423,14 +421,12 @@ func (suite *TaskSuite) TestLoadSegmentTask() { }, }, nil) for _, segment := range suite.loadSegments { - suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{ - { - ID: segment, - CollectionID: suite.collection, - PartitionID: partition, - InsertChannel: channel.ChannelName, - }, + suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partition, + InsertChannel: channel.ChannelName, }, }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) @@ -525,14 +521,12 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() { }, }, nil) for _, segment := range suite.loadSegments { - suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{ - { - ID: segment, - CollectionID: suite.collection, - PartitionID: partition, - InsertChannel: channel.ChannelName, - }, + suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partition, + InsertChannel: channel.ChannelName, }, }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFoundForSegment(segment)) @@ -621,14 +615,12 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() { }, nil }) for _, segment := range suite.loadSegments { - suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{ - { - ID: segment, - CollectionID: suite.collection, - PartitionID: partition, - InsertChannel: channel.ChannelName, - }, + suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partition, + InsertChannel: channel.ChannelName, }, }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, errors.New("index not ready")) @@ -829,14 +821,12 @@ func (suite *TaskSuite) TestMoveSegmentTask() { }, }, nil) for _, segment := range suite.moveSegments { - suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{ - { - ID: segment, - CollectionID: suite.collection, - PartitionID: partition, - InsertChannel: channel.ChannelName, - }, + suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partition, + InsertChannel: channel.ChannelName, }, }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) @@ -1003,14 +993,12 @@ func (suite *TaskSuite) TestTaskCanceled() { }, }, nil) for _, segment := range suite.loadSegments { - suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{ - { - ID: segment, - CollectionID: suite.collection, - PartitionID: partition, - InsertChannel: channel.ChannelName, - }, + suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partition, + InsertChannel: channel.ChannelName, }, }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) @@ -1095,14 +1083,12 @@ func (suite *TaskSuite) TestSegmentTaskStale() { }, }, nil) for _, segment := range suite.loadSegments { - suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{ - { - ID: segment, - CollectionID: suite.collection, - PartitionID: partition, - InsertChannel: channel.ChannelName, - }, + suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partition, + InsertChannel: channel.ChannelName, }, }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) @@ -1277,14 +1263,12 @@ func (suite *TaskSuite) TestLeaderTaskSet() { }, }, nil) for _, segment := range suite.loadSegments { - suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{ - { - ID: segment, - CollectionID: suite.collection, - PartitionID: partition, - InsertChannel: channel.ChannelName, - }, + suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partition, + InsertChannel: channel.ChannelName, }, }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index 799f95e29b..6bf9a289ce 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -21,6 +21,8 @@ import ( "fmt" "time" + "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -233,15 +235,14 @@ func fillSubChannelRequest( return nil } - resp, err := broker.GetSegmentInfo(ctx, segmentIDs.Collect()...) + segmentInfos, err := broker.GetSegmentInfo(ctx, segmentIDs.Collect()...) if err != nil { return err } - segmentInfos := make(map[int64]*datapb.SegmentInfo) - for _, info := range resp.GetInfos() { - segmentInfos[info.GetID()] = info - } - req.SegmentInfos = segmentInfos + + req.SegmentInfos = lo.SliceToMap(segmentInfos, func(info *datapb.SegmentInfo) (int64, *datapb.SegmentInfo) { + return info.GetID(), info + }) return nil }