From fc65f01ddd65cc638ab95f85eb38e6904f67bd81 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 4 Jan 2024 23:04:47 +0800 Subject: [PATCH] enhance: [Cherry-pick] Cache segment row num, size, and insert count to reduce CGO calls (#28007) (#29679) Cherry pick from master pr: #28007 See also #29650 Signed-off-by: yah01 Co-authored-by: yah01 --- internal/querynodev2/segments/segment.go | 62 ++++++++++++++++-------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 2c574a140c..e2d1ac5ab9 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -144,8 +144,11 @@ type LocalSegment struct { ptrLock sync.RWMutex // protects segmentPtr ptr C.CSegmentInterface - size int64 - row int64 + // cached results, to avoid too many CGO calls + memSize *atomic.Int64 + rowNum *atomic.Int64 + insertCount *atomic.Int64 + lastDeltaTimestamp *atomic.Uint64 fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] } @@ -197,6 +200,10 @@ func NewSegment(ctx context.Context, ptr: newPtr, lastDeltaTimestamp: atomic.NewUint64(0), fieldIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](), + + memSize: atomic.NewInt64(-1), + rowNum: atomic.NewInt64(-1), + insertCount: atomic.NewInt64(0), } return segment, nil @@ -229,13 +236,8 @@ func (s *LocalSegment) InsertCount() int64 { if !s.isValid() { return 0 } - var rowCount C.int64_t - GetDynamicPool().Submit(func() (any, error) { - rowCount = C.GetRowCount(s.ptr) - return nil, nil - }).Await() - return int64(rowCount) + return s.insertCount.Load() } func (s *LocalSegment) RowNum() int64 { @@ -245,13 +247,19 @@ func (s *LocalSegment) RowNum() int64 { if !s.isValid() { return 0 } - var rowCount C.int64_t - GetDynamicPool().Submit(func() (any, error) { - rowCount = C.GetRealCount(s.ptr) - return nil, nil - }).Await() - return int64(rowCount) + rowNum := s.rowNum.Load() + if rowNum < 0 { + var rowCount C.int64_t + GetDynamicPool().Submit(func() (any, error) { + rowCount = C.GetRealCount(s.ptr) + s.rowNum.Store(int64(rowCount)) + return nil, nil + }).Await() + rowNum = int64(rowCount) + } + + return rowNum } func (s *LocalSegment) MemSize() int64 { @@ -261,13 +269,19 @@ func (s *LocalSegment) MemSize() int64 { if !s.isValid() { return 0 } - var memoryUsageInBytes C.int64_t - GetDynamicPool().Submit(func() (any, error) { - memoryUsageInBytes = C.GetMemoryUsageInBytes(s.ptr) - return nil, nil - }).Await() - return int64(memoryUsageInBytes) + memSize := s.memSize.Load() + if memSize < 0 { + var cMemSize C.int64_t + GetDynamicPool().Submit(func() (any, error) { + cMemSize = C.GetMemoryUsageInBytes(s.ptr) + s.memSize.Store(int64(cMemSize)) + return nil, nil + }).Await() + + memSize = int64(cMemSize) + } + return memSize } func (s *LocalSegment) LastDeltaTimestamp() uint64 { @@ -518,6 +532,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps [] if err := HandleCStatus(ctx, &status, "Insert failed"); err != nil { return err } + + s.insertCount.Add(int64(numOfRow)) + s.rowNum.Store(-1) + s.memSize.Store(-1) metrics.QueryNodeNumEntities.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.collectionID), @@ -596,6 +614,7 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary return err } + s.rowNum.Store(-1) s.lastDeltaTimestamp.Store(timestamps[len(timestamps)-1]) return nil @@ -651,6 +670,7 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f return err } + s.insertCount.Store(rowCount) log.Info("load mutil field done", zap.Int64("row count", rowCount), zap.Int64("segmentID", s.ID())) @@ -708,6 +728,7 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun return err } + s.insertCount.Store(rowCount) log.Info("load field done") return nil @@ -837,6 +858,7 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del return err } + s.rowNum.Store(-1) s.lastDeltaTimestamp.Store(tss[len(tss)-1]) log.Info("load deleted record done",