From 245874a97092a7e68ba1e155d829e71552d02158 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Fri, 29 Oct 2021 18:04:49 +0800 Subject: [PATCH] Fix too many operations in txn request when save segmentInfo (#10909) Signed-off-by: xige-16 --- internal/querycoord/meta.go | 38 +++++++++++++++++++++----------- internal/querycoord/task_test.go | 5 +++-- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index de0d302b01..6784d64009 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -455,7 +455,8 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal globalSeekPositionTmp = queryChannelInfo.SeekPosition } - saveKvs := make(map[string]string) + // save segmentInfo to etcd + segmentInfoKvs := make(map[string]string) for _, infos := range saves { for _, info := range infos { segmentInfoBytes, err := proto.Marshal(info) @@ -463,10 +464,18 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal return col2SegmentChangeInfos, err } segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID) - saveKvs[segmentKey] = string(segmentInfoBytes) + segmentInfoKvs[segmentKey] = string(segmentInfoBytes) + } + } + for key, value := range segmentInfoKvs { + err := m.client.Save(key, value) + if err != nil { + panic(err) } } + // save queryChannelInfo and sealedSegmentsChangeInfo to etcd + saveKvs := make(map[string]string) for collectionID, queryChannelInfo := range queryChannelInfosMap { channelInfoBytes, err := proto.Marshal(queryChannelInfo) if err != nil { @@ -496,9 +505,9 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal err = m.client.MultiSave(saveKvs) if err != nil { - log.Error("updateGlobalSealedSegmentInfos: save info to etcd error", zap.Error(err)) - return col2SegmentChangeInfos, err + panic(err) } + m.segmentMu.Lock() for _, segmentInfos := range saves { for _, info := range segmentInfos { @@ -576,6 +585,15 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio } queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos + // remove meta from etcd + for _, info := range removes { + segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID) + err = m.client.Remove(segmentKey) + if err != nil { + panic(err) + } + } + // save meta to etcd saveKvs := make(map[string]string) channelInfoBytes, err := proto.Marshal(queryChannelInfo) @@ -601,17 +619,11 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, segmentChangeInfos.Base.MsgID) saveKvs[changeInfoKey] = string(changeInfoBytes) - removeKeys := make([]string, 0) - for _, info := range removes { - segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID) - removeKeys = append(removeKeys, segmentKey) + err = m.client.MultiSave(saveKvs) + if err != nil { + panic(err) } - err = m.client.MultiSaveAndRemove(saveKvs, removeKeys) - if err != nil { - log.Error("updateGlobalSealedSegmentInfos: save info to etcd error", zap.Error(err)) - return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err - } m.segmentMu.Lock() for _, info := range removes { delete(m.segmentInfos, info.SegmentID) diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go index 8cfe46ac15..4de48f508a 100644 --- a/internal/querycoord/task_test.go +++ b/internal/querycoord/task_test.go @@ -730,8 +730,9 @@ func Test_reverseSealedSegmentChangeInfo(t *testing.T) { } queryCoord.meta.setKvClient(kv) - err = updateSegmentInfoFromTask(ctx, parentTask, queryCoord.meta) - assert.NotNil(t, err) + assert.Panics(t, func() { + updateSegmentInfoFromTask(ctx, parentTask, queryCoord.meta) + }) queryCoord.Stop() err = removeAllSession()