From f054fc9be25fe6dde7815a0403677ffc7219730d Mon Sep 17 00:00:00 2001 From: sunby Date: Mon, 7 Jun 2021 09:47:36 +0800 Subject: [PATCH] Add GetRecoveryInfo in dataservice (#5629) When loading a partition, QueryNode fetches the binlogs need to be loaded and the channels needed to be watched by GetRecoveryInfo Signed-off-by: sunby --- internal/dataservice/binlog_helper.go | 2 +- internal/dataservice/cluster.go | 1 - internal/dataservice/datanode_helper.go | 1 - internal/dataservice/grpc_services.go | 81 +++++++ internal/dataservice/mock_test.go | 3 +- internal/dataservice/param.go | 22 +- internal/dataservice/server.go | 3 - internal/dataservice/server_test.go | 126 +++++++++- internal/proto/data_service.proto | 26 +-- internal/proto/datapb/data_service.pb.go | 282 +++++++++++------------ 10 files changed, 362 insertions(+), 185 deletions(-) diff --git a/internal/dataservice/binlog_helper.go b/internal/dataservice/binlog_helper.go index 9c8c74276b..5df6b93558 100644 --- a/internal/dataservice/binlog_helper.go +++ b/internal/dataservice/binlog_helper.go @@ -206,7 +206,7 @@ func (s *Server) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelInfo, e } unflushedCheckpoints = append(unflushedCheckpoints, cp) - if seekPosition == nil || (useUnflushedPosition && s.DmlPosition.Timestamp < seekPosition.Timestamp) { + if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp { useUnflushedPosition = true seekPosition = s.DmlPosition } diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index 7042113141..ec67b64aa6 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -113,7 +113,6 @@ func (c *cluster) watch(nodes []*datapb.DataNodeInfo) []*datapb.DataNodeInfo { uncompletes = append(uncompletes, vchannel{ CollectionID: ch.CollectionID, DmlChannel: ch.Name, - DdlChannel: c.posProvider.GetDdlChannel(), }) } } diff --git a/internal/dataservice/datanode_helper.go b/internal/dataservice/datanode_helper.go index 25ae0444c1..ff74012d89 100644 --- a/internal/dataservice/datanode_helper.go +++ b/internal/dataservice/datanode_helper.go @@ -18,7 +18,6 @@ import ( type vchannel struct { CollectionID UniqueID DmlChannel string - DdlChannel string } // positionProvider provides vchannel pair related position pairs diff --git a/internal/dataservice/grpc_services.go b/internal/dataservice/grpc_services.go index 2be2d11f3e..3b89943dc1 100644 --- a/internal/dataservice/grpc_services.go +++ b/internal/dataservice/grpc_services.go @@ -353,6 +353,87 @@ func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentS return resp, nil } +func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { + collectionID := req.GetCollectionID() + partitionID := req.GetPartitionID() + log.Info("Receive get recovery info request", zap.Int64("collectionID", collectionID), + zap.Int64("partitionID", partitionID)) + resp := &datapb.GetRecoveryInfoResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + } + segmentIDs := s.meta.GetSegmentsOfPartition(collectionID, partitionID) + segment2Binlogs := make(map[UniqueID][]*datapb.FieldBinlog) + for _, id := range segmentIDs { + meta, err := s.getSegmentBinlogMeta(id) + if err != nil { + log.Error("Get segment binlog meta failed", zap.Int64("segmentID", id)) + resp.Status.Reason = err.Error() + return resp, nil + } + field2Binlog := make(map[UniqueID][]string) + for _, m := range meta { + field2Binlog[m.FieldID] = append(field2Binlog[m.FieldID], m.BinlogPath) + } + + for f, paths := range field2Binlog { + fieldBinlogs := &datapb.FieldBinlog{ + FieldID: f, + Binlogs: paths, + } + segment2Binlogs[id] = append(segment2Binlogs[id], fieldBinlogs) + } + } + + binlogs := make([]*datapb.SegmentBinlogs, 0, len(segment2Binlogs)) + for segmentID, fieldBinlogs := range segment2Binlogs { + sbl := &datapb.SegmentBinlogs{ + SegmentID: segmentID, + FieldBinlogs: fieldBinlogs, + } + binlogs = append(binlogs, sbl) + } + + dresp, err := s.masterClient.DescribeCollection(s.ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + SourceID: Params.NodeID, + }, + CollectionID: collectionID, + }) + if err = VerifyResponse(dresp, err); err != nil { + log.Error("Get collection info from master failed", + zap.Int64("collectionID", collectionID), + zap.Error(err)) + + resp.Status.Reason = err.Error() + return resp, nil + } + + channels := dresp.GetVirtualChannelNames() + vchans := make([]vchannel, 0, len(channels)) + for _, c := range channels { + vchans = append(vchans, vchannel{ + CollectionID: collectionID, + DmlChannel: c, + }) + } + + channelInfos, err := s.GetVChanPositions(vchans) + if err != nil { + log.Error("Get channel positions failed", zap.Strings("channels", channels), + zap.Error(err)) + resp.Status.Reason = err.Error() + return resp, nil + } + + resp.Binlogs = binlogs + resp.Channels = channelInfos + resp.Status.ErrorCode = commonpb.ErrorCode_Success + return resp, nil +} + func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { return &datapb.RegisterNodeResponse{ Status: &commonpb.Status{ diff --git a/internal/dataservice/mock_test.go b/internal/dataservice/mock_test.go index b489fa1679..c2b712235f 100644 --- a/internal/dataservice/mock_test.go +++ b/internal/dataservice/mock_test.go @@ -190,7 +190,8 @@ func (m *mockMasterService) DescribeCollection(ctx context.Context, req *milvusp Schema: &schemapb.CollectionSchema{ Name: "test", }, - CollectionID: 1314, + CollectionID: 1314, + VirtualChannelNames: []string{"vchan1"}, }, nil } diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index b49500d29e..2a2b1928f0 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -1,5 +1,4 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// +// Copyright (C) 2019-2020 Zilliz. All rights reserved.// // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // @@ -37,7 +36,6 @@ type ParamTable struct { SegmentDmlPosSubPath string SegmentDdlPosSubPath string DmlChannelPosSubPath string - DdlChannelPosSubPath string // --- Pulsar --- PulsarAddress string @@ -105,9 +103,7 @@ func (p *ParamTable) Init() { p.initFlushStreamPosSubPath() p.initStatsStreamPosSubPath() p.initSegmentDmlPosSubPath() - p.initSegmentDdlPosSubPath() p.initDmlChannelPosSubPath() - p.initDdlChannelPosSubPath() }) } @@ -324,14 +320,6 @@ func (p *ParamTable) initSegmentDmlPosSubPath() { p.SegmentDmlPosSubPath = subPath } -func (p *ParamTable) initSegmentDdlPosSubPath() { - subPath, err := p.Load("etcd.segmentDdlPosSubPath") - if err != nil { - panic(err) - } - p.SegmentDdlPosSubPath = subPath -} - func (p *ParamTable) initDmlChannelPosSubPath() { subPath, err := p.Load("etcd.dmlChanPosSubPath") if err != nil { @@ -339,11 +327,3 @@ func (p *ParamTable) initDmlChannelPosSubPath() { } p.DmlChannelPosSubPath = subPath } - -func (p *ParamTable) initDdlChannelPosSubPath() { - subPath, err := p.Load("etcd.ddlChanPosSubPath") - if err != nil { - panic(err) - } - p.DdlChannelPosSubPath = subPath -} diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 7545d658e1..ea1baee8ab 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -538,9 +538,6 @@ func (s *Server) prepareBinlog(req *datapb.SaveBinlogPathsRequest) (map[string]s return meta, nil } -func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { - panic("implement me") -} func composeSegmentFlushMsgPack(segmentID UniqueID) msgstream.MsgPack { msgPack := msgstream.MsgPack{ diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index d54f8fef8d..804954316c 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -722,7 +722,6 @@ func TestGetVChannelPos(t *testing.T) { { CollectionID: 0, DmlChannel: "ch1", - DdlChannel: "ch2", }, }) assert.Nil(t, err) @@ -736,6 +735,131 @@ func TestGetVChannelPos(t *testing.T) { }) } +func TestGetRecoveryInfo(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.masterClientCreator = func(addr string) (types.MasterService, error) { + return newMockMasterService(), nil + } + + t.Run("test get recovery info with no segments", func(t *testing.T) { + req := &datapb.GetRecoveryInfoRequest{ + CollectionID: 0, + PartitionID: 0, + } + resp, err := svr.GetRecoveryInfo(context.TODO(), req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.EqualValues(t, 0, len(resp.GetBinlogs())) + assert.EqualValues(t, 1, len(resp.GetChannels())) + assert.Nil(t, resp.GetChannels()[0].SeekPosition) + }) + + createSegment := func(id, collectionID, partitionID, numOfRows int64, posTs uint64, + channel string, state commonpb.SegmentState) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: id, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channel, + NumOfRows: numOfRows, + State: state, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: channel, + MsgID: []byte{}, + Timestamp: posTs, + }, + } + } + + t.Run("test get largest position of flushed segments as seek position", func(t *testing.T) { + seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed) + seg2 := createSegment(1, 0, 0, 100, 20, "vchan1", commonpb.SegmentState_Flushed) + err := svr.meta.AddSegment(seg1) + assert.Nil(t, err) + err = svr.meta.AddSegment(seg2) + assert.Nil(t, err) + + req := &datapb.GetRecoveryInfoRequest{ + CollectionID: 0, + PartitionID: 0, + } + resp, err := svr.GetRecoveryInfo(context.TODO(), req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.EqualValues(t, 1, len(resp.GetChannels())) + assert.EqualValues(t, 0, len(resp.GetChannels()[0].GetCheckPoints())) + assert.EqualValues(t, []UniqueID{0, 1}, resp.GetChannels()[0].GetFlushedSegments()) + assert.EqualValues(t, 20, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) + }) + + t.Run("test get smallest position of unflushed segments as seek position", func(t *testing.T) { + seg1 := createSegment(3, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) + seg2 := createSegment(4, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Growing) + err := svr.meta.AddSegment(seg1) + assert.Nil(t, err) + err = svr.meta.AddSegment(seg2) + assert.Nil(t, err) + expectedCps := make(map[UniqueID]*datapb.SegmentInfo) + expectedCps[3] = seg1 + expectedCps[4] = seg2 + + req := &datapb.GetRecoveryInfoRequest{ + CollectionID: 0, + PartitionID: 0, + } + resp, err := svr.GetRecoveryInfo(context.TODO(), req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.EqualValues(t, 1, len(resp.GetChannels())) + assert.EqualValues(t, 2, len(resp.GetChannels()[0].GetCheckPoints())) + assert.EqualValues(t, []UniqueID{0, 1}, resp.GetChannels()[0].GetFlushedSegments()) + assert.EqualValues(t, 30, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) + cps := resp.GetChannels()[0].GetCheckPoints() + for _, cp := range cps { + seg, ok := expectedCps[cp.GetSegmentID()] + assert.True(t, ok) + assert.EqualValues(t, seg.GetDmlPosition().GetTimestamp(), cp.GetPosition().GetTimestamp()) + assert.EqualValues(t, seg.GetNumOfRows(), cp.GetNumOfRows()) + } + }) + + t.Run("test get binlogs", func(t *testing.T) { + binlogReq := &datapb.SaveBinlogPathsRequest{ + SegmentID: 0, + CollectionID: 0, + Field2BinlogPaths: []*datapb.ID2PathList{ + { + ID: 1, + Paths: []string{ + "/binlog/file1", + "/binlog/file2", + }, + }, + }, + DdlBinlogPaths: []*datapb.DDLBinlogMeta{}, + } + meta, err := svr.prepareBinlog(binlogReq) + assert.Nil(t, err) + err = svr.kvClient.MultiSave(meta) + assert.Nil(t, err) + + req := &datapb.GetRecoveryInfoRequest{ + CollectionID: 0, + PartitionID: 0, + } + resp, err := svr.GetRecoveryInfo(context.TODO(), req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.EqualValues(t, 1, len(resp.GetBinlogs())) + assert.EqualValues(t, 0, resp.GetBinlogs()[0].GetSegmentID()) + assert.EqualValues(t, 1, len(resp.GetBinlogs()[0].GetFieldBinlogs())) + assert.EqualValues(t, 1, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetFieldID()) + assert.EqualValues(t, []string{"/binlog/file1", "/binlog/file2"}, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs()) + }) +} + func newTestServer(t *testing.T, receiveCh chan interface{}) *Server { Params.Init() var err error diff --git a/internal/proto/data_service.proto b/internal/proto/data_service.proto index 5c0d2b5e67..dff2577f51 100644 --- a/internal/proto/data_service.proto +++ b/internal/proto/data_service.proto @@ -272,18 +272,6 @@ message DataNodeTtMsg { uint64 timestamp = 3; } - - -message SegmentBinlogs { - int64 segmentID = 1; - repeated FieldBinlog fieldBinlogs = 2; -} - -message FieldBinlog{ - int64 fieldID = 1; - repeated string binlogs = 2; -} - enum ChannelWatchState { Uncomplete = 0; Complete = 1; @@ -301,8 +289,18 @@ message DataNodeInfo { repeated ChannelStatus channels = 3; } +message SegmentBinlogs { + int64 segmentID = 1; + repeated FieldBinlog fieldBinlogs = 2; +} + +message FieldBinlog{ + int64 fieldID = 1; + repeated string binlogs = 2; +} + message GetRecoveryInfoResponse { - common.MsgBase base = 1; + common.Status status = 1; repeated VchannelInfo channels = 2; repeated SegmentBinlogs binlogs = 3; } @@ -313,5 +311,3 @@ message GetRecoveryInfoRequest { int64 partitionID = 3; } - - diff --git a/internal/proto/datapb/data_service.pb.go b/internal/proto/datapb/data_service.pb.go index ca6e746be7..aabd10dc5c 100644 --- a/internal/proto/datapb/data_service.pb.go +++ b/internal/proto/datapb/data_service.pb.go @@ -2145,100 +2145,6 @@ func (m *DataNodeTtMsg) GetTimestamp() uint64 { return 0 } -type SegmentBinlogs struct { - SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"` - FieldBinlogs []*FieldBinlog `protobuf:"bytes,2,rep,name=fieldBinlogs,proto3" json:"fieldBinlogs,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *SegmentBinlogs) Reset() { *m = SegmentBinlogs{} } -func (m *SegmentBinlogs) String() string { return proto.CompactTextString(m) } -func (*SegmentBinlogs) ProtoMessage() {} -func (*SegmentBinlogs) Descriptor() ([]byte, []int) { - return fileDescriptor_3385cd32ad6cfe64, []int{38} -} - -func (m *SegmentBinlogs) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_SegmentBinlogs.Unmarshal(m, b) -} -func (m *SegmentBinlogs) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_SegmentBinlogs.Marshal(b, m, deterministic) -} -func (m *SegmentBinlogs) XXX_Merge(src proto.Message) { - xxx_messageInfo_SegmentBinlogs.Merge(m, src) -} -func (m *SegmentBinlogs) XXX_Size() int { - return xxx_messageInfo_SegmentBinlogs.Size(m) -} -func (m *SegmentBinlogs) XXX_DiscardUnknown() { - xxx_messageInfo_SegmentBinlogs.DiscardUnknown(m) -} - -var xxx_messageInfo_SegmentBinlogs proto.InternalMessageInfo - -func (m *SegmentBinlogs) GetSegmentID() int64 { - if m != nil { - return m.SegmentID - } - return 0 -} - -func (m *SegmentBinlogs) GetFieldBinlogs() []*FieldBinlog { - if m != nil { - return m.FieldBinlogs - } - return nil -} - -type FieldBinlog struct { - FieldID int64 `protobuf:"varint,1,opt,name=fieldID,proto3" json:"fieldID,omitempty"` - Binlogs []string `protobuf:"bytes,2,rep,name=binlogs,proto3" json:"binlogs,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *FieldBinlog) Reset() { *m = FieldBinlog{} } -func (m *FieldBinlog) String() string { return proto.CompactTextString(m) } -func (*FieldBinlog) ProtoMessage() {} -func (*FieldBinlog) Descriptor() ([]byte, []int) { - return fileDescriptor_3385cd32ad6cfe64, []int{39} -} - -func (m *FieldBinlog) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_FieldBinlog.Unmarshal(m, b) -} -func (m *FieldBinlog) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_FieldBinlog.Marshal(b, m, deterministic) -} -func (m *FieldBinlog) XXX_Merge(src proto.Message) { - xxx_messageInfo_FieldBinlog.Merge(m, src) -} -func (m *FieldBinlog) XXX_Size() int { - return xxx_messageInfo_FieldBinlog.Size(m) -} -func (m *FieldBinlog) XXX_DiscardUnknown() { - xxx_messageInfo_FieldBinlog.DiscardUnknown(m) -} - -var xxx_messageInfo_FieldBinlog proto.InternalMessageInfo - -func (m *FieldBinlog) GetFieldID() int64 { - if m != nil { - return m.FieldID - } - return 0 -} - -func (m *FieldBinlog) GetBinlogs() []string { - if m != nil { - return m.Binlogs - } - return nil -} - type ChannelStatus struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` State ChannelWatchState `protobuf:"varint,2,opt,name=state,proto3,enum=milvus.proto.data.ChannelWatchState" json:"state,omitempty"` @@ -2252,7 +2158,7 @@ func (m *ChannelStatus) Reset() { *m = ChannelStatus{} } func (m *ChannelStatus) String() string { return proto.CompactTextString(m) } func (*ChannelStatus) ProtoMessage() {} func (*ChannelStatus) Descriptor() ([]byte, []int) { - return fileDescriptor_3385cd32ad6cfe64, []int{40} + return fileDescriptor_3385cd32ad6cfe64, []int{38} } func (m *ChannelStatus) XXX_Unmarshal(b []byte) error { @@ -2307,7 +2213,7 @@ func (m *DataNodeInfo) Reset() { *m = DataNodeInfo{} } func (m *DataNodeInfo) String() string { return proto.CompactTextString(m) } func (*DataNodeInfo) ProtoMessage() {} func (*DataNodeInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_3385cd32ad6cfe64, []int{41} + return fileDescriptor_3385cd32ad6cfe64, []int{39} } func (m *DataNodeInfo) XXX_Unmarshal(b []byte) error { @@ -2349,8 +2255,102 @@ func (m *DataNodeInfo) GetChannels() []*ChannelStatus { return nil } +type SegmentBinlogs struct { + SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"` + FieldBinlogs []*FieldBinlog `protobuf:"bytes,2,rep,name=fieldBinlogs,proto3" json:"fieldBinlogs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SegmentBinlogs) Reset() { *m = SegmentBinlogs{} } +func (m *SegmentBinlogs) String() string { return proto.CompactTextString(m) } +func (*SegmentBinlogs) ProtoMessage() {} +func (*SegmentBinlogs) Descriptor() ([]byte, []int) { + return fileDescriptor_3385cd32ad6cfe64, []int{40} +} + +func (m *SegmentBinlogs) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SegmentBinlogs.Unmarshal(m, b) +} +func (m *SegmentBinlogs) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SegmentBinlogs.Marshal(b, m, deterministic) +} +func (m *SegmentBinlogs) XXX_Merge(src proto.Message) { + xxx_messageInfo_SegmentBinlogs.Merge(m, src) +} +func (m *SegmentBinlogs) XXX_Size() int { + return xxx_messageInfo_SegmentBinlogs.Size(m) +} +func (m *SegmentBinlogs) XXX_DiscardUnknown() { + xxx_messageInfo_SegmentBinlogs.DiscardUnknown(m) +} + +var xxx_messageInfo_SegmentBinlogs proto.InternalMessageInfo + +func (m *SegmentBinlogs) GetSegmentID() int64 { + if m != nil { + return m.SegmentID + } + return 0 +} + +func (m *SegmentBinlogs) GetFieldBinlogs() []*FieldBinlog { + if m != nil { + return m.FieldBinlogs + } + return nil +} + +type FieldBinlog struct { + FieldID int64 `protobuf:"varint,1,opt,name=fieldID,proto3" json:"fieldID,omitempty"` + Binlogs []string `protobuf:"bytes,2,rep,name=binlogs,proto3" json:"binlogs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FieldBinlog) Reset() { *m = FieldBinlog{} } +func (m *FieldBinlog) String() string { return proto.CompactTextString(m) } +func (*FieldBinlog) ProtoMessage() {} +func (*FieldBinlog) Descriptor() ([]byte, []int) { + return fileDescriptor_3385cd32ad6cfe64, []int{41} +} + +func (m *FieldBinlog) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FieldBinlog.Unmarshal(m, b) +} +func (m *FieldBinlog) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FieldBinlog.Marshal(b, m, deterministic) +} +func (m *FieldBinlog) XXX_Merge(src proto.Message) { + xxx_messageInfo_FieldBinlog.Merge(m, src) +} +func (m *FieldBinlog) XXX_Size() int { + return xxx_messageInfo_FieldBinlog.Size(m) +} +func (m *FieldBinlog) XXX_DiscardUnknown() { + xxx_messageInfo_FieldBinlog.DiscardUnknown(m) +} + +var xxx_messageInfo_FieldBinlog proto.InternalMessageInfo + +func (m *FieldBinlog) GetFieldID() int64 { + if m != nil { + return m.FieldID + } + return 0 +} + +func (m *FieldBinlog) GetBinlogs() []string { + if m != nil { + return m.Binlogs + } + return nil +} + type GetRecoveryInfoResponse struct { - Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` Channels []*VchannelInfo `protobuf:"bytes,2,rep,name=channels,proto3" json:"channels,omitempty"` Binlogs []*SegmentBinlogs `protobuf:"bytes,3,rep,name=binlogs,proto3" json:"binlogs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -2383,9 +2383,9 @@ func (m *GetRecoveryInfoResponse) XXX_DiscardUnknown() { var xxx_messageInfo_GetRecoveryInfoResponse proto.InternalMessageInfo -func (m *GetRecoveryInfoResponse) GetBase() *commonpb.MsgBase { +func (m *GetRecoveryInfoResponse) GetStatus() *commonpb.Status { if m != nil { - return m.Base + return m.Status } return nil } @@ -2499,10 +2499,10 @@ func init() { proto.RegisterType((*SaveBinlogPathsRequest)(nil), "milvus.proto.data.SaveBinlogPathsRequest") proto.RegisterType((*CheckPoint)(nil), "milvus.proto.data.CheckPoint") proto.RegisterType((*DataNodeTtMsg)(nil), "milvus.proto.data.DataNodeTtMsg") - proto.RegisterType((*SegmentBinlogs)(nil), "milvus.proto.data.SegmentBinlogs") - proto.RegisterType((*FieldBinlog)(nil), "milvus.proto.data.FieldBinlog") proto.RegisterType((*ChannelStatus)(nil), "milvus.proto.data.ChannelStatus") proto.RegisterType((*DataNodeInfo)(nil), "milvus.proto.data.DataNodeInfo") + proto.RegisterType((*SegmentBinlogs)(nil), "milvus.proto.data.SegmentBinlogs") + proto.RegisterType((*FieldBinlog)(nil), "milvus.proto.data.FieldBinlog") proto.RegisterType((*GetRecoveryInfoResponse)(nil), "milvus.proto.data.GetRecoveryInfoResponse") proto.RegisterType((*GetRecoveryInfoRequest)(nil), "milvus.proto.data.GetRecoveryInfoRequest") } @@ -2510,13 +2510,13 @@ func init() { func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) } var fileDescriptor_3385cd32ad6cfe64 = []byte{ - // 2086 bytes of a gzipped FileDescriptorProto + // 2083 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x59, 0xdb, 0x6f, 0x1b, 0x59, 0x19, 0xcf, 0xd8, 0xce, 0xc5, 0x9f, 0xc7, 0x4e, 0x72, 0x1a, 0xb2, 0x96, 0xdb, 0xa6, 0xe9, 0xb0, 0xdb, 0xcd, 0x76, 0x45, 0xb2, 0x75, 0x11, 0xb7, 0xb2, 0xa0, 0xa6, 0x6e, 0x83, 0x45, 0x53, 0xc2, 0x49, 0x77, 0x57, 0x62, 0x85, 0xac, 0x89, 0xe7, 0xc4, 0x19, 0x32, 0x17, 0xef, 0x9c, 0x71, 0x92, - 0x3e, 0x75, 0xb5, 0xf0, 0x00, 0x08, 0xb1, 0xf0, 0x1f, 0x00, 0x12, 0x12, 0x12, 0x2f, 0xfc, 0x13, - 0xfc, 0x0b, 0xfc, 0x25, 0xbc, 0x83, 0xce, 0x65, 0xee, 0x63, 0x7b, 0xe2, 0xb4, 0xca, 0x9b, 0xcf, + 0x3e, 0x75, 0xb5, 0xf0, 0x00, 0x08, 0xb1, 0xf0, 0x1f, 0x00, 0x12, 0x12, 0x12, 0x2f, 0xfc, 0x19, + 0xfc, 0x05, 0xfc, 0x25, 0xbc, 0x83, 0xce, 0x65, 0xee, 0x63, 0x7b, 0xe2, 0xb4, 0xca, 0x9b, 0xcf, 0xf1, 0x77, 0x3b, 0xdf, 0xf9, 0xce, 0xef, 0xbb, 0x0c, 0x20, 0x43, 0xf7, 0xf5, 0x1e, 0x25, 0xde, 0x99, 0xd9, 0x27, 0xdb, 0x43, 0xcf, 0xf5, 0x5d, 0xb4, 0x6a, 0x9b, 0xd6, 0xd9, 0x88, 0x8a, 0xd5, 0x36, 0x23, 0x68, 0xa9, 0x7d, 0xd7, 0xb6, 0x5d, 0x47, 0x6c, 0xb5, 0x1a, 0xa6, 0xe3, 0x13, 0xcf, @@ -2605,43 +2605,43 @@ var fileDescriptor_3385cd32ad6cfe64 = []byte{ 0xa8, 0x09, 0x11, 0xd7, 0x14, 0xbc, 0xfc, 0x11, 0x2c, 0xcd, 0x70, 0xe3, 0x21, 0x4f, 0xfa, 0xc1, 0x96, 0x53, 0x0f, 0x56, 0xfb, 0x4a, 0x81, 0x7a, 0x47, 0xf7, 0xf5, 0x17, 0xae, 0x41, 0x5e, 0xce, 0x98, 0x43, 0x0b, 0xcc, 0xe1, 0x6e, 0x41, 0x95, 0xbd, 0x35, 0xea, 0xeb, 0xf6, 0x90, 0x1b, 0x51, - 0xc1, 0xd1, 0x86, 0xe6, 0x41, 0x43, 0x62, 0x82, 0xb8, 0x02, 0x3a, 0xc5, 0x29, 0xbb, 0xa0, 0x1e, - 0x47, 0x39, 0x77, 0x52, 0x2b, 0x1a, 0x4b, 0xcd, 0x38, 0xc1, 0xa3, 0x3d, 0x86, 0x5a, 0xec, 0xcf, - 0x09, 0x79, 0xb0, 0x09, 0x8b, 0x47, 0x31, 0x3d, 0x55, 0x1c, 0x2c, 0x59, 0x6b, 0x5d, 0x97, 0xc0, - 0x28, 0xea, 0x5d, 0x56, 0xe5, 0x70, 0x0f, 0x88, 0x14, 0xcd, 0x7f, 0xa3, 0x1f, 0x24, 0x47, 0x2f, - 0xef, 0xe6, 0xc6, 0x10, 0x17, 0xc2, 0xcb, 0xbe, 0x04, 0x2a, 0x16, 0x69, 0x96, 0xbe, 0x54, 0x40, - 0x0d, 0x6e, 0x90, 0x27, 0x88, 0x66, 0x34, 0xbf, 0x17, 0x76, 0x04, 0x4b, 0xf6, 0xcf, 0x19, 0xf1, - 0x68, 0x10, 0x4b, 0x65, 0x1c, 0x2c, 0xd1, 0x0f, 0x61, 0x29, 0xac, 0x13, 0xcb, 0x63, 0x9f, 0x4c, - 0xe2, 0xb0, 0x38, 0xe4, 0xd0, 0xfe, 0xad, 0xf0, 0xf1, 0x17, 0x26, 0x7d, 0xf7, 0x8c, 0x78, 0xaf, - 0x12, 0x43, 0x86, 0xcb, 0x87, 0xd3, 0xa3, 0x98, 0x2d, 0x05, 0x6b, 0xd6, 0x90, 0x01, 0x3d, 0x8a, - 0x6e, 0xab, 0x3c, 0xb6, 0x82, 0x48, 0x06, 0x5b, 0x74, 0xa1, 0x7f, 0x12, 0xb3, 0x92, 0xe4, 0x39, - 0xae, 0x75, 0xb2, 0x79, 0xff, 0x01, 0xac, 0x66, 0xa2, 0x03, 0x35, 0x00, 0x3e, 0x71, 0xfa, 0xae, - 0x3d, 0xb4, 0x88, 0x4f, 0x56, 0xe6, 0x90, 0x0a, 0x4b, 0x4f, 0x82, 0x95, 0xd2, 0xfe, 0x4f, 0x1d, - 0x6a, 0x2c, 0x20, 0x0e, 0xc5, 0xb7, 0x26, 0x34, 0x04, 0xc4, 0x9b, 0x69, 0x7b, 0xe8, 0x3a, 0xe1, - 0xf0, 0x0b, 0x7d, 0x34, 0x06, 0x46, 0xb2, 0xa4, 0xd2, 0x05, 0xad, 0x7b, 0x63, 0x38, 0x52, 0xe4, - 0xda, 0x1c, 0xb2, 0xb9, 0x46, 0x96, 0x46, 0x5f, 0x9a, 0xfd, 0xd3, 0xa0, 0x76, 0x98, 0xa0, 0x31, - 0x45, 0x1a, 0x68, 0x4c, 0xcd, 0xd4, 0xe4, 0x42, 0x0c, 0x5e, 0x82, 0x00, 0xd3, 0xe6, 0xd0, 0x17, - 0xb0, 0xc6, 0xba, 0xcb, 0xb0, 0xc9, 0x0d, 0x14, 0xb6, 0xc7, 0x2b, 0xcc, 0x10, 0x5f, 0x52, 0xa5, - 0x0e, 0x6a, 0xfc, 0x53, 0x17, 0xca, 0x9b, 0xbf, 0xe7, 0x7c, 0x8d, 0x6b, 0xbd, 0x3f, 0x95, 0x2e, - 0x54, 0xb1, 0x07, 0xf3, 0xbc, 0x98, 0x45, 0x79, 0xd1, 0x1f, 0xff, 0xac, 0xd5, 0x9a, 0xd4, 0x82, - 0x6b, 0x73, 0xe8, 0x57, 0xb0, 0x9c, 0xfa, 0xa0, 0x80, 0x3e, 0xc8, 0x11, 0x99, 0xff, 0x69, 0xa8, - 0x75, 0xbf, 0x08, 0x69, 0xdc, 0x2f, 0xf1, 0xa1, 0x7b, 0xae, 0x5f, 0x72, 0x3e, 0x1c, 0xe4, 0xfa, - 0x25, 0x6f, 0x7a, 0xaf, 0xcd, 0xa1, 0x01, 0x34, 0x92, 0xb3, 0x04, 0xb4, 0x95, 0xc3, 0x9c, 0x3b, - 0x5e, 0x6d, 0x7d, 0x50, 0x80, 0x32, 0x54, 0x64, 0xc3, 0x4a, 0x7a, 0x64, 0x8c, 0xee, 0x4f, 0x14, - 0x90, 0x7c, 0x2f, 0x1f, 0x16, 0xa2, 0x0d, 0xd5, 0xbd, 0xe2, 0x51, 0x9c, 0x19, 0x59, 0xa2, 0xed, - 0x7c, 0x31, 0xe3, 0x66, 0xa9, 0xad, 0x9d, 0xc2, 0xf4, 0xa1, 0x6a, 0x02, 0xab, 0x99, 0x11, 0x24, - 0xfa, 0x70, 0x92, 0x9c, 0xd4, 0x68, 0xa2, 0x35, 0x7d, 0x48, 0xaa, 0xcd, 0xa1, 0xaf, 0x44, 0x9a, - 0xc8, 0x1b, 0xeb, 0xa1, 0x07, 0xf9, 0xda, 0x26, 0xcc, 0x23, 0x5b, 0xed, 0xcb, 0xb0, 0x84, 0x67, - 0x7d, 0xcd, 0x21, 0x3e, 0x67, 0x34, 0x96, 0xc6, 0xa7, 0x40, 0xde, 0xf8, 0x99, 0x5f, 0xeb, 0xc1, - 0x25, 0x38, 0x42, 0x03, 0xdc, 0xf4, 0xec, 0x3f, 0x80, 0xab, 0x9d, 0xa9, 0xc1, 0x39, 0x1b, 0x56, - 0x7d, 0x0e, 0xcb, 0xa9, 0x82, 0x3f, 0xf7, 0xfd, 0xe7, 0x37, 0x05, 0x05, 0xc0, 0x25, 0x95, 0x31, - 0xd1, 0x98, 0x47, 0x96, 0x93, 0x55, 0x5b, 0xf7, 0x8b, 0x90, 0x06, 0x07, 0x69, 0xff, 0xbd, 0x0c, - 0x4b, 0x41, 0xa5, 0x73, 0x0d, 0x59, 0xed, 0x1a, 0xd2, 0xcc, 0xe7, 0xb0, 0x9c, 0x9a, 0x05, 0xe6, - 0x7a, 0x37, 0x7f, 0x5e, 0x38, 0xed, 0xea, 0x3e, 0x83, 0x7a, 0x62, 0xb8, 0x87, 0xde, 0x1f, 0x97, - 0x68, 0xd2, 0x68, 0x3d, 0x59, 0xf0, 0xee, 0xc3, 0x5f, 0x3c, 0x18, 0x98, 0xfe, 0xc9, 0xe8, 0x88, - 0xfd, 0xb3, 0x23, 0x48, 0xbf, 0x65, 0xba, 0xf2, 0xd7, 0x4e, 0xe0, 0xa0, 0x1d, 0xce, 0xbd, 0xc3, - 0xd4, 0x0c, 0x8f, 0x8e, 0x16, 0xf8, 0xea, 0xe1, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0xfe, 0x39, - 0x43, 0x7c, 0x28, 0x23, 0x00, 0x00, + 0xc1, 0xd1, 0x06, 0xeb, 0x51, 0xeb, 0x12, 0x61, 0x44, 0xe1, 0xc8, 0xca, 0x05, 0x2e, 0x4a, 0xe4, + 0x3a, 0xfe, 0x1b, 0xfd, 0x20, 0x39, 0xc3, 0x78, 0x37, 0xf7, 0x32, 0xb8, 0x10, 0x5e, 0x3f, 0x25, + 0xe0, 0xa5, 0x48, 0xd7, 0xf1, 0xa5, 0x02, 0x6a, 0xe0, 0x0a, 0x8e, 0xb4, 0xcd, 0x68, 0x10, 0x2e, + 0xec, 0x08, 0x96, 0xec, 0x9f, 0x33, 0xe2, 0xd1, 0xe0, 0x52, 0xca, 0x38, 0x58, 0xa2, 0x1f, 0xc2, + 0x52, 0x58, 0x70, 0x95, 0xc7, 0xc6, 0x5e, 0xe2, 0xb0, 0x38, 0xe4, 0xd0, 0x3c, 0x68, 0x48, 0x70, + 0x14, 0xb1, 0x48, 0xa7, 0x44, 0xc7, 0x2e, 0xa8, 0xc7, 0x51, 0xf1, 0x31, 0xa9, 0x27, 0x8f, 0xd5, + 0x28, 0x38, 0xc1, 0xa3, 0x3d, 0x86, 0x5a, 0xec, 0xcf, 0x09, 0x05, 0x41, 0x13, 0x16, 0x8f, 0x62, + 0x7a, 0xaa, 0x38, 0x58, 0x6a, 0xff, 0x56, 0xf8, 0xf8, 0x0b, 0x93, 0xbe, 0x7b, 0x46, 0xbc, 0x57, + 0x57, 0x1f, 0x32, 0x3c, 0x8a, 0x79, 0xb1, 0x60, 0xd9, 0x1a, 0x32, 0xa0, 0x47, 0x91, 0x9d, 0xe5, + 0xb1, 0x45, 0x44, 0xd2, 0xcd, 0xd1, 0x51, 0xfe, 0x24, 0xc6, 0x25, 0xc9, 0xa3, 0x5c, 0xeb, 0x70, + 0xf3, 0xfe, 0x03, 0x58, 0xcd, 0xc4, 0x35, 0x6a, 0x00, 0x7c, 0xe2, 0xf4, 0x5d, 0x7b, 0x68, 0x11, + 0x9f, 0xac, 0xcc, 0x21, 0x15, 0x96, 0x9e, 0x04, 0x2b, 0xa5, 0xfd, 0x9f, 0x3a, 0xd4, 0x58, 0x28, + 0x1f, 0x8a, 0xcf, 0x4d, 0x68, 0x08, 0x88, 0xf7, 0xd3, 0xf6, 0xd0, 0x75, 0xc2, 0xf9, 0x17, 0xfa, + 0x68, 0x0c, 0x92, 0x64, 0x49, 0xa5, 0x0b, 0x5a, 0xf7, 0xc6, 0x70, 0xa4, 0xc8, 0xb5, 0x39, 0x64, + 0x73, 0x8d, 0x2c, 0x93, 0xbe, 0x34, 0xfb, 0xa7, 0x41, 0xf9, 0x30, 0x41, 0x63, 0x8a, 0x34, 0xd0, + 0x98, 0x1a, 0xab, 0xc9, 0x85, 0x98, 0xbd, 0x04, 0x31, 0xa6, 0xcd, 0xa1, 0x2f, 0x60, 0x8d, 0x35, + 0x98, 0x61, 0x9f, 0x1b, 0x28, 0x6c, 0x8f, 0x57, 0x98, 0x21, 0xbe, 0xa4, 0x4a, 0x1d, 0xd4, 0xf8, + 0xd7, 0x2e, 0x94, 0x37, 0x82, 0xcf, 0xf9, 0x20, 0xd7, 0x7a, 0x7f, 0x2a, 0x5d, 0xa8, 0x62, 0x0f, + 0xe6, 0x79, 0x3d, 0x8b, 0xf2, 0xa2, 0x3f, 0xfe, 0x65, 0xab, 0x35, 0xe9, 0x55, 0x69, 0x73, 0xe8, + 0x57, 0xb0, 0x9c, 0xfa, 0xa6, 0x80, 0x3e, 0xc8, 0x11, 0x99, 0xff, 0x75, 0xa8, 0x75, 0xbf, 0x08, + 0x69, 0xdc, 0x2f, 0xf1, 0xb9, 0x7b, 0xae, 0x5f, 0x72, 0xbe, 0x1d, 0xe4, 0xfa, 0x25, 0x6f, 0x80, + 0xaf, 0xcd, 0xa1, 0x01, 0x34, 0x92, 0xe3, 0x04, 0xb4, 0x95, 0xc3, 0x9c, 0x3b, 0x61, 0x6d, 0x7d, + 0x50, 0x80, 0x32, 0x54, 0x64, 0xc3, 0x4a, 0x7a, 0x6a, 0x8c, 0xee, 0x4f, 0x14, 0x90, 0x7c, 0x2f, + 0x1f, 0x16, 0xa2, 0x0d, 0xd5, 0xbd, 0xe2, 0x51, 0x9c, 0x99, 0x5a, 0xa2, 0xed, 0x7c, 0x31, 0xe3, + 0xc6, 0xa9, 0xad, 0x9d, 0xc2, 0xf4, 0xa1, 0x6a, 0x02, 0xab, 0x99, 0x29, 0x24, 0xfa, 0x70, 0x92, + 0x9c, 0xd4, 0x74, 0xa2, 0x35, 0x7d, 0x4e, 0xaa, 0xcd, 0xa1, 0xaf, 0x44, 0xa6, 0xc8, 0x9b, 0xec, + 0xa1, 0x07, 0xf9, 0xda, 0x26, 0x8c, 0x24, 0x5b, 0xed, 0xcb, 0xb0, 0x84, 0x67, 0x7d, 0xcd, 0x21, + 0x3e, 0x67, 0x3a, 0x96, 0xc6, 0xa7, 0x40, 0xde, 0xf8, 0xb1, 0x5f, 0xeb, 0xc1, 0x25, 0x38, 0x42, + 0x03, 0xdc, 0xf4, 0xf8, 0x3f, 0x80, 0xab, 0x9d, 0xa9, 0xc1, 0x39, 0x1b, 0x56, 0x7d, 0x0e, 0xcb, + 0xa9, 0x9a, 0x3f, 0xf7, 0xfd, 0xe7, 0xf7, 0x05, 0x05, 0xc0, 0x25, 0x95, 0x31, 0xd1, 0x98, 0x47, + 0x96, 0x93, 0x55, 0x5b, 0xf7, 0x8b, 0x90, 0x06, 0x07, 0x69, 0xff, 0xbd, 0x0c, 0x4b, 0x41, 0x8d, + 0x76, 0x0d, 0x59, 0xed, 0x1a, 0xd2, 0xcc, 0xe7, 0xb0, 0x9c, 0x1a, 0x07, 0xe6, 0x7a, 0x37, 0x7f, + 0x64, 0x38, 0xed, 0xea, 0x3e, 0x83, 0x7a, 0x62, 0xbe, 0x87, 0xde, 0x1f, 0x97, 0x68, 0xd2, 0x68, + 0x3d, 0x59, 0xf0, 0xee, 0xc3, 0x5f, 0x3c, 0x18, 0x98, 0xfe, 0xc9, 0xe8, 0x88, 0xfd, 0xb3, 0x23, + 0x48, 0xbf, 0x65, 0xba, 0xf2, 0xd7, 0x4e, 0xe0, 0xa0, 0x1d, 0xce, 0xbd, 0xc3, 0xd4, 0x0c, 0x8f, + 0x8e, 0x16, 0xf8, 0xea, 0xe1, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0x11, 0xb6, 0xf8, 0x23, 0x2b, + 0x23, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used.