From 9b1a7287d1076366fa6ff540824f1f46c9ccd85a Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 11 Dec 2024 19:24:44 +0800 Subject: [PATCH] enhance: Optimize save collection target latency (#38345) (#38370) issue: #38237 pr: #38345 this PR only use better compression level for proto msg which is larger than 1MB, and use a lighter compression level for smaller proto msg, which could get a better latency in most case. this PR could reduce the latency from 22.7s to 4.7s with 10000 collctions and each collections has 1000 segments. before this PR: BenchmarkTargetManager-8 1 22781536357 ns/op 566407275088 B/op 11188282 allocs/op after this PR: BenchmarkTargetManager-8 1 4729566944 ns/op 36713248864 B/op 10963615 allocs/op Signed-off-by: Wei Liu --- .../metastore/kv/querycoord/kv_catalog.go | 8 +++- .../querycoordv2/meta/target_manager_test.go | 47 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index ce546d5234..3c5dcce661 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -270,8 +270,14 @@ func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) err if err != nil { return err } + + // only compress data when size is larger than 1MB + compressLevel := zstd.SpeedFastest + if len(v) > 1024*1024 { + compressLevel = zstd.SpeedBetterCompression + } var compressed bytes.Buffer - compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) + compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(compressLevel)) kvs[k] = compressed.String() } diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 3e3954b24d..65d0a8d745 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -602,6 +602,53 @@ func (suite *TargetManagerSuite) TestRecover() { suite.Len(targets, 0) } +func BenchmarkTargetManager(b *testing.B) { + paramtable.Init() + config := GenerateEtcdConfig() + cli, _ := etcd.GetEtcdClient( + config.UseEmbedEtcd.GetAsBool(), + config.EtcdUseSSL.GetAsBool(), + config.Endpoints.GetAsStrings(), + config.EtcdTLSCert.GetValue(), + config.EtcdTLSKey.GetValue(), + config.EtcdTLSCACert.GetValue(), + config.EtcdTLSMinVersion.GetValue()) + + kv := etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) + + catalog := querycoord.NewCatalog(kv) + idAllocator := RandomIncrementIDAllocator() + meta := NewMeta(idAllocator, catalog, session.NewNodeManager()) + mgr := NewTargetManager(nil, meta) + + segmentNum := 1000 + segments := make(map[int64]*datapb.SegmentInfo) + for i := 0; i < segmentNum; i++ { + segments[int64(i)] = &datapb.SegmentInfo{ + ID: int64(i), + InsertChannel: "channel-1", + } + } + + channels := map[string]*DmChannel{ + "channel-1": { + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: int64(1), + ChannelName: "channel-1", + }, + }, + } + + collectionNum := 10000 + for i := 0; i < collectionNum; i++ { + mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + mgr.SaveCurrentTarget(catalog) + } +} + func TestTargetManager(t *testing.T) { suite.Run(t, new(TargetManagerSuite)) }