From 6426e3d0372e2a3b3fc81d008d3c9fda52dfbce3 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Wed, 8 Jun 2022 15:08:07 +0800 Subject: [PATCH] Split Etcd save operation into small txns (#17386) Signed-off-by: xiaofan-luan --- internal/datacoord/channel_store.go | 3 ++- internal/datacoord/meta.go | 27 +++++++++++++++++---------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 725ad00dff..7f6108bccc 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -32,7 +32,8 @@ import ( const ( bufferID = math.MinInt64 delimiter = "/" - maxOperationsPerTxn = 128 + maxOperationsPerTxn = 64 + maxBytesPerTxn = 1024 * 1024 ) var errUnknownOpType = errors.New("unknown operation type") diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 3225722395..c92fa1bdae 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -250,6 +250,13 @@ func (m *meta) UpdateFlushSegmentsInfo( m.Lock() defer m.Unlock() + log.Info("update flush segments info", zap.Int64("segmentId", segmentID), + zap.Int("binlog", len(binlogs)), + zap.Int("statslog", len(statslogs)), + zap.Int("deltalogs", len(deltalogs)), + zap.Bool("flushed", flushed), + zap.Bool("dropped", dropped), + zap.Bool("importing", importing)) segment := m.segments.GetSegment(segmentID) if importing { m.segments.SetRowCount(segmentID, segment.currRows) @@ -273,7 +280,7 @@ func (m *meta) UpdateFlushSegmentsInfo( clonedSegment.DroppedAt = uint64(time.Now().UnixNano()) modSegments[segmentID] = clonedSegment } - + // TODO add diff encoding and compression currBinlogs := clonedSegment.GetBinlogs() var getFieldBinlogs = func(id UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog { @@ -390,6 +397,7 @@ func (m *meta) UpdateFlushSegmentsInfo( for id, s := range modSegments { m.segments.SetSegment(id, s) } + log.Info("update flush segments info successfully", zap.Int64("segmentId", segmentID)) return nil } @@ -502,7 +510,7 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) *SegmentInfo { // ** the last batch must contains at least one segment // 1. when failure occurs between batches, failover mechanism will continue with the earlist checkpoint of this channel // since the flag is not marked so DataNode can re-consume the drop collection msg -// 2. when failure occurs between save meta and unwatch channel, the removal flag shall be check before let datanode watch this channel +// 2. when failure occurs between save meta and unwatch channel, the removal flag shall be check before let datanode watch this channel func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*SegmentInfo) error { // the limitation of etcd operations number per transaction is 128, since segment number might be enormous so we shall split @@ -510,23 +518,21 @@ func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*Segm // since the removal flag shall always be with the last batch, so the last batch shall be maxOperationNumber - 1 for len(modSegments) > maxOperationsPerTxn-1 { - err := m.saveDropSegmentAndRemove(channel, modSegments, false, func(kv map[string]string, modSegments map[int64]*SegmentInfo) bool { - // batch filled or only one segment left - // since the last batch must contains at least on segment - return len(kv) == maxOperationsPerTxn || len(modSegments) == 1 - }) + err := m.saveDropSegmentAndRemove(channel, modSegments, false) if err != nil { return err } } // removal flag should be saved with last batch - return m.saveDropSegmentAndRemove(channel, modSegments, true, func(_ map[string]string, _ map[int64]*SegmentInfo) bool { return false }) + return m.saveDropSegmentAndRemove(channel, modSegments, true) } -func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*SegmentInfo, withFlag bool, stopper func(kv map[string]string, modSegment map[int64]*SegmentInfo) bool) error { +func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*SegmentInfo, withFlag bool) error { kv := make(map[string]string) update := make([]*SegmentInfo, 0, maxOperationsPerTxn) + + size := 0 for id, s := range modSegments { key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID()) delete(modSegments, id) @@ -536,7 +542,8 @@ func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*S } kv[key] = string(segBytes) update = append(update, s) - if stopper(kv, modSegments) { + size += len(key) + len(segBytes) + if len(kv) == maxOperationsPerTxn || len(modSegments) == 1 || size >= maxBytesPerTxn { break } }