From 2ea7579dbb105b465cdada601a0a7716075784a7 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Mon, 23 Oct 2023 21:44:09 +0800 Subject: [PATCH] Reduce rpc size for GetRecoveryInfoV2 (#27483) Signed-off-by: xiaofan-luan --- configs/milvus.yaml | 2 +- internal/datacoord/channel_manager.go | 9 +- internal/datacoord/cluster_test.go | 16 +- internal/datacoord/server_test.go | 1 + internal/datacoord/services.go | 33 ++- internal/datacoord/services_test.go | 216 +++++++++++++++++- internal/metastore/kv/datacoord/kv_catalog.go | 106 ++++++--- .../metastore/kv/datacoord/kv_catalog_test.go | 113 ++++++++- .../querycoordv2/meta/coordinator_broker.go | 7 + pkg/util/paramtable/grpc_param.go | 2 +- 10 files changed, 441 insertions(+), 64 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 79539fbec3..22f87a2adc 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -443,7 +443,7 @@ grpc: serverMaxSendSize: 536870912 serverMaxRecvSize: 536870912 client: - compressionEnabled: false + compressionEnabled: true dialTimeout: 200 keepAliveTime: 10000 keepAliveTimeout: 20000 diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index a70d5056b1..0d77425c08 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -491,8 +491,8 @@ func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state data return channelsWithTimer } -// GetChannels gets channels info of registered nodes. -func (c *ChannelManager) GetChannels() []*NodeChannelInfo { +// GetAssignedChannels gets channels info of registered nodes. +func (c *ChannelManager) GetAssignedChannels() []*NodeChannelInfo { c.mu.RLock() defer c.mu.RUnlock() @@ -501,13 +501,14 @@ func (c *ChannelManager) GetChannels() []*NodeChannelInfo { func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []*channel { channels := make([]*channel, 0) - for _, nodeChannels := range c.store.GetNodesChannels() { + for _, nodeChannels := range c.store.GetChannels() { for _, channelInfo := range nodeChannels.Channels { if collectionID == channelInfo.CollectionID { channels = append(channels, channelInfo) } } } + log.Info("get channel", zap.Any("collection", collectionID), zap.Any("channel", channels)) return channels } @@ -899,7 +900,7 @@ func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName st } func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID) { - for _, nodeChannel := range c.GetChannels() { + for _, nodeChannel := range c.GetAssignedChannels() { for _, ch := range nodeChannel.Channels { if ch.Name == chName { return true, nodeChannel.NodeID diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go index 2b1a4f0401..2df57c6081 100644 --- a/internal/datacoord/cluster_test.go +++ b/internal/datacoord/cluster_test.go @@ -132,7 +132,7 @@ func (suite *ClusterSuite) TestCreate() { err = cluster.Startup(ctx, []*NodeInfo{{NodeID: 1, Address: "localhost:9999"}}) suite.NoError(err) - channels := channelManager.GetChannels() + channels := channelManager.GetAssignedChannels() suite.EqualValues([]*NodeChannelInfo{{1, []*channel{{Name: "channel1", CollectionID: 1}}}}, channels) }) @@ -181,7 +181,7 @@ func (suite *ClusterSuite) TestCreate() { suite.EqualValues(1, len(sessions)) suite.EqualValues(2, sessions[0].info.NodeID) suite.EqualValues(addr, sessions[0].info.Address) - channels := channelManager2.GetChannels() + channels := channelManager2.GetAssignedChannels() suite.EqualValues(1, len(channels)) suite.EqualValues(2, channels[0].NodeID) }) @@ -253,7 +253,7 @@ func (suite *ClusterSuite) TestRegister() { suite.NoError(err) bufferChannels := channelManager.GetBufferChannels() suite.Empty(bufferChannels.Channels) - nodeChannels := channelManager.GetChannels() + nodeChannels := channelManager.GetAssignedChannels() suite.EqualValues(1, len(nodeChannels)) suite.EqualValues(1, nodeChannels[0].NodeID) suite.EqualValues("ch1", nodeChannels[0].Channels[0].Name) @@ -287,7 +287,7 @@ func (suite *ClusterSuite) TestRegister() { suite.NoError(err) restartCluster := NewCluster(sessionManager2, channelManager2) defer restartCluster.Close() - channels := channelManager2.GetChannels() + channels := channelManager2.GetAssignedChannels() suite.Empty(channels) suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1) @@ -352,7 +352,7 @@ func (suite *ClusterSuite) TestUnregister() { err = cluster.UnRegister(nodeInfo1) suite.NoError(err) - channels := channelManager.GetChannels() + channels := channelManager.GetAssignedChannels() suite.EqualValues(1, len(channels)) suite.EqualValues(2, channels[0].NodeID) suite.EqualValues(1, len(channels[0].Channels)) @@ -386,7 +386,7 @@ func (suite *ClusterSuite) TestUnregister() { suite.NoError(err) err = cluster.UnRegister(nodeInfo) suite.NoError(err) - channels := channelManager.GetChannels() + channels := channelManager.GetAssignedChannels() suite.Empty(channels) channel := channelManager.GetBufferChannels() suite.NotNil(channel) @@ -433,7 +433,7 @@ func TestWatchIfNeeded(t *testing.T) { assert.NoError(t, err) err = cluster.Watch(ctx, "ch1", 1) assert.NoError(t, err) - channels := channelManager.GetChannels() + channels := channelManager.GetAssignedChannels() assert.EqualValues(t, 1, len(channels)) assert.EqualValues(t, "ch1", channels[0].Channels[0].Name) }) @@ -452,7 +452,7 @@ func TestWatchIfNeeded(t *testing.T) { err = cluster.Watch(ctx, "ch1", 1) assert.NoError(t, err) - channels := channelManager.GetChannels() + channels := channelManager.GetAssignedChannels() assert.Empty(t, channels) channel := channelManager.GetBufferChannels() assert.NotNil(t, channel) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index fe20cc51d9..233fc45cb3 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -4209,6 +4209,7 @@ var globalTestTikv = tikv.SetupLocalTxn() func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { var err error paramtable.Get().Save(Params.CommonCfg.DataCoordTimeTick.Key, Params.CommonCfg.DataCoordTimeTick.GetValue()+strconv.Itoa(rand.Int())) + paramtable.Get().Save(Params.RocksmqCfg.CompressionTypes.Key, "0,0,0,0,0") factory := dependency.NewDefaultFactory(true) etcdCli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 2312856210..0985b03145 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/segmentutil" @@ -115,7 +116,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F var isUnimplemented bool err = retry.Do(ctx, func() error { - for _, channelInfo := range s.channelManager.GetChannels() { + for _, channelInfo := range s.channelManager.GetAssignedChannels() { nodeID := channelInfo.NodeID channels := lo.Filter(channelInfo.Channels, func(channel *channel, _ int) bool { return channel.CollectionID == req.GetCollectionID() @@ -817,15 +818,37 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI rowCount = segment.NumOfRows } + // save the traffic of sending + binLogs, err := datacoord.CompressBinLog(segment.Binlogs) + if err != nil { + log.Warn("failed to compress segment", zap.Int64("segmentID", id), zap.Error(err)) + resp.Status = merr.Status(err) + return resp, nil + } + + deltaLogs, err := datacoord.CompressBinLog(segment.Deltalogs) + if err != nil { + log.Warn("failed to compress segment", zap.Int64("segmentID", id), zap.Error(err)) + resp.Status = merr.Status(err) + return resp, nil + } + + statLogs, err := datacoord.CompressBinLog(segment.Statslogs) + if err != nil { + log.Warn("failed to compress segment", zap.Int64("segmentID", id), zap.Error(err)) + resp.Status = merr.Status(err) + return resp, nil + } + segmentInfos = append(segmentInfos, &datapb.SegmentInfo{ ID: segment.ID, PartitionID: segment.PartitionID, CollectionID: segment.CollectionID, InsertChannel: segment.InsertChannel, NumOfRows: rowCount, - Binlogs: segment.Binlogs, - Statslogs: segment.Statslogs, - Deltalogs: segment.Deltalogs, + Binlogs: binLogs, + Statslogs: statLogs, + Deltalogs: deltaLogs, }) } @@ -1223,7 +1246,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq } channels := make([]string, 0) - for _, channelInfo := range s.channelManager.GetChannels() { + for _, channelInfo := range s.channelManager.GetAssignedChannels() { filtered := lo.Filter(channelInfo.Channels, func(channel *channel, _ int) bool { return channel.CollectionID == req.GetCollectionID() }) diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 5ac473e8ec..faaf503dda 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -2,7 +2,6 @@ package datacoord import ( "context" - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -346,10 +345,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { FieldID: 1, Binlogs: []*datapb.Binlog{ { - LogPath: "/binlog/file1", + LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), }, { - LogPath: "/binlog/file2", + LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), }, }, }, @@ -359,10 +358,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { FieldID: 1, Binlogs: []*datapb.Binlog{ { - LogPath: "/stats_log/file1", + LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), }, { - LogPath: "/stats_log/file2", + LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), }, }, }, @@ -373,7 +372,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { { TimestampFrom: 0, TimestampTo: 1, - LogPath: "/stats_log/file1", + LogPath: metautil.BuildDeltaLogPath("a", 0, 100, 0, 100000), LogSize: 1, }, }, @@ -418,13 +417,23 @@ func TestGetRecoveryInfoV2(t *testing.T) { } resp, err := svr.GetRecoveryInfoV2(context.TODO(), req) assert.NoError(t, err) + assert.NoError(t, merr.Error(resp.Status)) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) assert.EqualValues(t, 1, len(resp.GetSegments())) assert.EqualValues(t, 0, resp.GetSegments()[0].GetID()) assert.EqualValues(t, 1, len(resp.GetSegments()[0].GetBinlogs())) assert.EqualValues(t, 1, resp.GetSegments()[0].GetBinlogs()[0].GetFieldID()) - for i, binlog := range resp.GetSegments()[0].GetBinlogs()[0].GetBinlogs() { - assert.Equal(t, fmt.Sprintf("/binlog/file%d", i+1), binlog.GetLogPath()) + for _, binlog := range resp.GetSegments()[0].GetBinlogs()[0].GetBinlogs() { + assert.Equal(t, "", binlog.GetLogPath()) + assert.Equal(t, int64(801), binlog.GetLogID()) + } + for _, binlog := range resp.GetSegments()[0].GetStatslogs()[0].GetBinlogs() { + assert.Equal(t, "", binlog.GetLogPath()) + assert.Equal(t, int64(10000), binlog.GetLogID()) + } + for _, binlog := range resp.GetSegments()[0].GetDeltalogs()[0].GetBinlogs() { + assert.Equal(t, "", binlog.GetLogPath()) + assert.Equal(t, int64(100000), binlog.GetLogID()) } }) t.Run("with dropped segments", func(t *testing.T) { @@ -516,6 +525,197 @@ func TestGetRecoveryInfoV2(t *testing.T) { assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) }) + t.Run("with failed compress", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + return newMockRootCoordClient(), nil + } + + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: newTestSchema(), + }) + + err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ + ChannelName: "vchan1", + Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + }) + assert.NoError(t, err) + + svr.meta.AddCollection(&collectionInfo{ + ID: 1, + Schema: newTestSchema(), + }) + + err = svr.meta.UpdateChannelCheckpoint("vchan2", &msgpb.MsgPosition{ + ChannelName: "vchan2", + Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + }) + assert.NoError(t, err) + + svr.meta.AddCollection(&collectionInfo{ + ID: 2, + Schema: newTestSchema(), + }) + + err = svr.meta.UpdateChannelCheckpoint("vchan3", &msgpb.MsgPosition{ + ChannelName: "vchan3", + Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + }) + assert.NoError(t, err) + + svr.channelManager.AddNode(0) + ch := &channel{ + Name: "vchan1", + CollectionID: 0, + } + err = svr.channelManager.Watch(context.TODO(), ch) + assert.NoError(t, err) + + ch = &channel{ + Name: "vchan2", + CollectionID: 1, + } + err = svr.channelManager.Watch(context.TODO(), ch) + assert.NoError(t, err) + + ch = &channel{ + Name: "vchan3", + CollectionID: 2, + } + err = svr.channelManager.Watch(context.TODO(), ch) + assert.NoError(t, err) + + seg := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) + binLogPaths := make([]*datapb.Binlog, 1) + // miss one field + path := metautil.JoinIDPath(0, 0, 8, fieldID) + path = path + "/mock" + binLogPaths[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: path, + } + + seg.Statslogs = append(seg.Statslogs, &datapb.FieldBinlog{ + FieldID: fieldID, + Binlogs: binLogPaths, + }) + + binLogPaths2 := make([]*datapb.Binlog, 1) + pathCorrect := metautil.JoinIDPath(0, 0, 8, fieldID, 1) + binLogPaths2[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: pathCorrect, + } + + seg.Binlogs = append(seg.Binlogs, &datapb.FieldBinlog{ + FieldID: fieldID, + Binlogs: binLogPaths2, + }) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg)) + assert.NoError(t, err) + + // make sure collection is indexed + err = svr.meta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 0, + IndexName: "_default_idx_1", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }) + assert.NoError(t, err) + + svr.meta.segments.SetSegmentIndex(seg.ID, &model.SegmentIndex{ + SegmentID: seg.ID, + CollectionID: 0, + PartitionID: 0, + NumRows: 100, + IndexID: 0, + BuildID: 0, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }) + + req := &datapb.GetRecoveryInfoRequestV2{ + CollectionID: 0, + } + resp, err := svr.GetRecoveryInfoV2(context.TODO(), req) + assert.NoError(t, err) + assert.True(t, resp.Status.ErrorCode == commonpb.ErrorCode_UnexpectedError) + + // test bin log + path = metautil.JoinIDPath(0, 0, 9, fieldID) + path = path + "/mock" + binLogPaths[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: path, + } + + seg2 := createSegment(9, 1, 0, 100, 40, "vchan2", commonpb.SegmentState_Flushed) + seg2.Binlogs = append(seg2.Binlogs, &datapb.FieldBinlog{ + FieldID: fieldID, + Binlogs: binLogPaths, + }) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) + assert.NoError(t, err) + + // make sure collection is indexed + err = svr.meta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 1, + FieldID: 2, + IndexID: 1, + IndexName: "_default_idx_2", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }) + assert.NoError(t, err) + + svr.meta.segments.SetSegmentIndex(seg2.ID, &model.SegmentIndex{ + SegmentID: seg2.ID, + CollectionID: 1, + PartitionID: 0, + NumRows: 100, + IndexID: 1, + BuildID: 0, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }) + req = &datapb.GetRecoveryInfoRequestV2{ + CollectionID: 1, + } + resp, err = svr.GetRecoveryInfoV2(context.TODO(), req) + assert.NoError(t, err) + assert.True(t, resp.Status.ErrorCode == commonpb.ErrorCode_UnexpectedError) + }) + t.Run("with continuous compaction", func(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 4cdd076807..b1864d43dc 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -730,18 +730,12 @@ func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID type func fillLogPathByLogID(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID, segmentID typeutil.UniqueID, fieldBinlog *datapb.FieldBinlog, -) error { +) { for _, binlog := range fieldBinlog.Binlogs { - path, err := buildLogPath(chunkManagerRootPath, binlogType, collectionID, partitionID, + path := buildLogPath(chunkManagerRootPath, binlogType, collectionID, partitionID, segmentID, fieldBinlog.GetFieldID(), binlog.GetLogID()) - if err != nil { - return err - } - binlog.LogPath = path } - - return nil } func fillLogIDByLogPath(multiFieldBinlogs ...[]*datapb.FieldBinlog) error { @@ -768,42 +762,85 @@ func fillLogIDByLogPath(multiFieldBinlogs ...[]*datapb.FieldBinlog) error { return nil } -// build a binlog path on the storage by metadata -func buildLogPath(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID, segmentID, filedID, logID typeutil.UniqueID) (string, error) { - switch binlogType { - case storage.InsertBinlog: - path := metautil.BuildInsertLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID) - return path, nil - case storage.DeleteBinlog: - path := metautil.BuildDeltaLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, logID) - return path, nil - case storage.StatsBinlog: - path := metautil.BuildStatsLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID) - return path, nil - default: - return "", fmt.Errorf("invalid binlog type: %d", binlogType) +func CompressBinLog(fieldBinLogs []*datapb.FieldBinlog) ([]*datapb.FieldBinlog, error) { + compressedFieldBinLogs := make([]*datapb.FieldBinlog, 0) + for _, fieldBinLog := range fieldBinLogs { + compressedFieldBinLog := &datapb.FieldBinlog{} + compressedFieldBinLog.FieldID = fieldBinLog.FieldID + for _, binlog := range fieldBinLog.Binlogs { + logPath := binlog.LogPath + idx := strings.LastIndex(logPath, "/") + if idx == -1 { + return nil, fmt.Errorf("invailed binlog path: %s", logPath) + } + logPathStr := logPath[(idx + 1):] + logID, err := strconv.ParseInt(logPathStr, 10, 64) + if err != nil { + return nil, err + } + binlog := &datapb.Binlog{ + EntriesNum: binlog.EntriesNum, + // remove timestamp since it's not necessary + LogSize: binlog.LogSize, + LogID: logID, + } + compressedFieldBinLog.Binlogs = append(compressedFieldBinLog.Binlogs, binlog) + } + compressedFieldBinLogs = append(compressedFieldBinLogs, compressedFieldBinLog) } + return compressedFieldBinLogs, nil } -func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, logs []*datapb.FieldBinlog) { - check := func(getSegmentID func(logPath string) typeutil.UniqueID) { +func DecompressBinLog(path string, info *datapb.SegmentInfo) error { + for _, fieldBinLogs := range info.GetBinlogs() { + fillLogPathByLogID(path, storage.InsertBinlog, info.CollectionID, info.PartitionID, info.ID, fieldBinLogs) + } + + for _, deltaLogs := range info.GetDeltalogs() { + fillLogPathByLogID(path, storage.DeleteBinlog, info.CollectionID, info.PartitionID, info.ID, deltaLogs) + } + + for _, statsLogs := range info.GetStatslogs() { + fillLogPathByLogID(path, storage.StatsBinlog, info.CollectionID, info.PartitionID, info.ID, statsLogs) + } + return nil +} + +// build a binlog path on the storage by metadata +func buildLogPath(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID, segmentID, filedID, logID typeutil.UniqueID) string { + switch binlogType { + case storage.InsertBinlog: + return metautil.BuildInsertLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID) + case storage.DeleteBinlog: + return metautil.BuildDeltaLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, logID) + case storage.StatsBinlog: + return metautil.BuildStatsLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID) + } + // should not happen + log.Panic("invalid binlog type", zap.Any("type", binlogType)) + return "" +} + +func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, logs []*datapb.FieldBinlog) error { + check := func(getSegmentID func(logPath string) typeutil.UniqueID) error { for _, fieldBinlog := range logs { for _, binlog := range fieldBinlog.Binlogs { if segmentID != getSegmentID(binlog.LogPath) { - log.Panic("the segment path doesn't match the segmentID", zap.Int64("segmentID", segmentID), zap.String("path", binlog.LogPath)) + return fmt.Errorf("the segment path doesn't match the segmentID, segmentID %d, path %s", segmentID, binlog.LogPath) } } } + return nil } switch binlogType { case storage.InsertBinlog: - check(metautil.GetSegmentIDFromInsertLogPath) + return check(metautil.GetSegmentIDFromInsertLogPath) case storage.DeleteBinlog: - check(metautil.GetSegmentIDFromDeltaLogPath) + return check(metautil.GetSegmentIDFromDeltaLogPath) case storage.StatsBinlog: - check(metautil.GetSegmentIDFromStatsLogPath) + return check(metautil.GetSegmentIDFromStatsLogPath) default: - log.Panic("invalid binlog type") + return fmt.Errorf("the segment path doesn't match the segmentID, segmentID %d, type %d", segmentID, binlogType) } } @@ -820,9 +857,18 @@ func hasSepcialStatslog(logs *datapb.FieldBinlog) bool { func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.UniqueID, binlogs, deltalogs, statslogs []*datapb.FieldBinlog, ignoreNumberCheck bool, ) (map[string]string, error) { - checkBinlogs(storage.InsertBinlog, segmentID, binlogs) + err := checkBinlogs(storage.InsertBinlog, segmentID, binlogs) + if err != nil { + return nil, err + } checkBinlogs(storage.DeleteBinlog, segmentID, deltalogs) + if err != nil { + return nil, err + } checkBinlogs(storage.StatsBinlog, segmentID, statslogs) + if err != nil { + return nil, err + } // check stats log and bin log size match // num of stats log may one more than num of binlogs if segment flushed and merged stats log if !ignoreNumberCheck && len(binlogs) != 0 && len(statslogs) != 0 && !hasSepcialStatslog(statslogs[0]) { diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 6943461875..a87c2507ee 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -277,9 +277,9 @@ func Test_AddSegments(t *testing.T) { metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("error")).Maybe() catalog := NewCatalog(metakv, rootPath, "") - assert.Panics(t, func() { - catalog.AddSegment(context.TODO(), invalidSegment) - }) + + err := catalog.AddSegment(context.TODO(), invalidSegment) + assert.Error(t, err) }) t.Run("save error", func(t *testing.T) { @@ -327,11 +327,10 @@ func Test_AlterSegments(t *testing.T) { metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("error")).Maybe() catalog := NewCatalog(metakv, rootPath, "") - assert.Panics(t, func() { - catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}, metastore.BinlogsIncrement{ - Segment: invalidSegment, - }) + err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}, metastore.BinlogsIncrement{ + Segment: invalidSegment, }) + assert.Error(t, err) }) t.Run("save error", func(t *testing.T) { @@ -1059,6 +1058,54 @@ func TestCatalog_DropSegmentIndex(t *testing.T) { }) } +func TestCatalog_Compress(t *testing.T) { + segmentInfo := getSegment(rootPath, 0, 1, 2, 3, 10000) + val, err := proto.Marshal(segmentInfo) + assert.NoError(t, err) + + compressedSegmentInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo) + compressedSegmentInfo.Binlogs, err = CompressBinLog(compressedSegmentInfo.Binlogs) + assert.NoError(t, err) + compressedSegmentInfo.Deltalogs, err = CompressBinLog(compressedSegmentInfo.Deltalogs) + assert.NoError(t, err) + compressedSegmentInfo.Statslogs, err = CompressBinLog(compressedSegmentInfo.Statslogs) + assert.NoError(t, err) + + valCompressed, err := proto.Marshal(compressedSegmentInfo) + assert.NoError(t, err) + + assert.True(t, len(valCompressed) < len(val)) + + // make sure the compact + unmarshaledSegmentInfo := &datapb.SegmentInfo{} + proto.Unmarshal(val, unmarshaledSegmentInfo) + + unmarshaledSegmentInfoCompressed := &datapb.SegmentInfo{} + proto.Unmarshal(valCompressed, unmarshaledSegmentInfoCompressed) + DecompressBinLog(rootPath, unmarshaledSegmentInfoCompressed) + + assert.Equal(t, len(unmarshaledSegmentInfo.GetBinlogs()), len(unmarshaledSegmentInfoCompressed.GetBinlogs())) + for i := 0; i < 1000; i++ { + assert.Equal(t, unmarshaledSegmentInfo.GetBinlogs()[0].Binlogs[i].LogPath, unmarshaledSegmentInfoCompressed.GetBinlogs()[0].Binlogs[i].LogPath) + } + + // test compress erorr path + fakeBinlogs := make([]*datapb.Binlog, 1) + fakeBinlogs[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: "test", + } + fieldBinLogs := make([]*datapb.FieldBinlog, 1) + fieldBinLogs[0] = &datapb.FieldBinlog{ + FieldID: 106, + Binlogs: fakeBinlogs, + } + compressedSegmentInfo.Binlogs, err = CompressBinLog(fieldBinLogs) + assert.Error(t, err) + + // test decompress error path +} + func BenchmarkCatalog_List1000Segments(b *testing.B) { paramtable.Init() etcdCli, err := etcd.GetEtcdClient( @@ -1140,6 +1187,58 @@ func addSegment(rootPath string, collectionID, partitionID, segmentID, fieldID i }, }, } + + statslogs = []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 5, + LogPath: metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(rand.Int())), + }, + }, + }, + } + + return &datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + Binlogs: binlogs, + Deltalogs: deltalogs, + Statslogs: statslogs, + } +} + +func getSegment(rootPath string, collectionID, partitionID, segmentID, fieldID int64, binlogNum int) *datapb.SegmentInfo { + binLogPaths := make([]*datapb.Binlog, binlogNum) + for i := 0; i < binlogNum; i++ { + binLogPaths[i] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(i)), + } + } + binlogs = []*datapb.FieldBinlog{ + { + FieldID: fieldID, + Binlogs: binLogPaths, + }, + } + + deltalogs = []*datapb.FieldBinlog{ + { + FieldID: fieldID, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 5, + LogPath: metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID, int64(rand.Int())), + }, + }, + }, + } + statslogs = []*datapb.FieldBinlog{ { FieldID: 1, diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 1eaa0babe4..d85b2a0224 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -26,9 +26,11 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -147,6 +149,11 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti return nil, nil, err } + path := params.Params.MinioCfg.RootPath.GetValue() + // refill log ID with log path + for _, segmentInfo := range recoveryInfo.Segments { + datacoord.DecompressBinLog(path, segmentInfo) + } return recoveryInfo.Channels, recoveryInfo.Segments, nil } diff --git a/pkg/util/paramtable/grpc_param.go b/pkg/util/paramtable/grpc_param.go index bd4a91b629..6264e4a71b 100644 --- a/pkg/util/paramtable/grpc_param.go +++ b/pkg/util/paramtable/grpc_param.go @@ -46,7 +46,7 @@ const ( DefaultMaxAttempts = 10 DefaultInitialBackoff float64 = 0.2 DefaultMaxBackoff float64 = 10 - DefaultCompressionEnabled bool = false + DefaultCompressionEnabled bool = true ProxyInternalPort = 19529 ProxyExternalPort = 19530