From 950203aba03acf9aacb198702edd2a8f65e6477f Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 11 Dec 2024 10:12:43 +0800 Subject: [PATCH] enhance: Optimize save collection target latency (#38345) issue: #38237 This PR uses the better compression level only for proto messages larger than 1MB, and uses a lighter compression level for smaller proto messages, which gives better latency in most cases. This PR reduces the latency from 22.7s to 4.7s with 10000 collections, each of which has 1000 segments. before this PR: BenchmarkTargetManager-8 1 22781536357 ns/op 566407275088 B/op 11188282 allocs/op after this PR: BenchmarkTargetManager-8 1 4729566944 ns/op 36713248864 B/op 10963615 allocs/op Signed-off-by: Wei Liu --- .../metastore/kv/querycoord/kv_catalog.go | 8 +++- .../querycoordv2/meta/target_manager_test.go | 47 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index fd0c1a8476..026bf2b01d 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -279,8 +279,14 @@ func (s Catalog) SaveCollectionTargets(ctx context.Context, targets ...*querypb. 
if err != nil { return err } + + // data is always compressed; use the better (slower) compression level only when size is larger than 1MB, otherwise the fastest level + compressLevel := zstd.SpeedFastest + if len(v) > 1024*1024 { + compressLevel = zstd.SpeedBetterCompression + } var compressed bytes.Buffer - compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) + compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(compressLevel)) kvs[k] = compressed.String() } diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index e7ca040ecb..c78b7207a8 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -675,6 +675,53 @@ func (suite *TargetManagerSuite) TestGetTargetJSON() { assert.Len(suite.T(), currentTarget[0].Segments, 2) } +func BenchmarkTargetManager(b *testing.B) { + paramtable.Init() + config := GenerateEtcdConfig() + cli, _ := etcd.GetEtcdClient( + config.UseEmbedEtcd.GetAsBool(), + config.EtcdUseSSL.GetAsBool(), + config.Endpoints.GetAsStrings(), + config.EtcdTLSCert.GetValue(), + config.EtcdTLSKey.GetValue(), + config.EtcdTLSCACert.GetValue(), + config.EtcdTLSMinVersion.GetValue()) + + kv := etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) + + catalog := querycoord.NewCatalog(kv) + idAllocator := RandomIncrementIDAllocator() + meta := NewMeta(idAllocator, catalog, session.NewNodeManager()) + mgr := NewTargetManager(nil, meta) + + segmentNum := 1000 + segments := make(map[int64]*datapb.SegmentInfo) + for i := 0; i < segmentNum; i++ { + segments[int64(i)] = &datapb.SegmentInfo{ + ID: int64(i), + InsertChannel: "channel-1", + } + } + + channels := map[string]*DmChannel{ + "channel-1": { + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: int64(1), + ChannelName: "channel-1", + }, + }, + } + + collectionNum := 10000 + for i := 0; i < collectionNum; i++ { + mgr.current.collectionTargetMap[int64(i)] = 
NewCollectionTarget(segments, channels, nil) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + mgr.SaveCurrentTarget(context.TODO(), catalog) + } +} + func TestTargetManager(t *testing.T) { suite.Run(t, new(TargetManagerSuite)) }