enhance: [Cherry-pick] Cache segment row num, size, and insert count to reduce CGO calls (#28007) (#29679)

Cherry-picked from master
pr: #28007
See also #29650

Signed-off-by: yah01 <yah2er0ne@outlook.com>
Co-authored-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
congqixia 2024-01-04 23:04:47 +08:00 committed by GitHub
parent 2d64cd780c
commit fc65f01ddd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -144,8 +144,11 @@ type LocalSegment struct {
ptrLock sync.RWMutex // protects segmentPtr
ptr C.CSegmentInterface
size int64
row int64
// cached results, to avoid too many CGO calls
memSize *atomic.Int64
rowNum *atomic.Int64
insertCount *atomic.Int64
lastDeltaTimestamp *atomic.Uint64
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo]
}
@ -197,6 +200,10 @@ func NewSegment(ctx context.Context,
ptr: newPtr,
lastDeltaTimestamp: atomic.NewUint64(0),
fieldIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](),
memSize: atomic.NewInt64(-1),
rowNum: atomic.NewInt64(-1),
insertCount: atomic.NewInt64(0),
}
return segment, nil
@ -229,13 +236,8 @@ func (s *LocalSegment) InsertCount() int64 {
if !s.isValid() {
return 0
}
var rowCount C.int64_t
GetDynamicPool().Submit(func() (any, error) {
rowCount = C.GetRowCount(s.ptr)
return nil, nil
}).Await()
return int64(rowCount)
return s.insertCount.Load()
}
func (s *LocalSegment) RowNum() int64 {
@ -245,13 +247,19 @@ func (s *LocalSegment) RowNum() int64 {
if !s.isValid() {
return 0
}
var rowCount C.int64_t
GetDynamicPool().Submit(func() (any, error) {
rowCount = C.GetRealCount(s.ptr)
return nil, nil
}).Await()
return int64(rowCount)
rowNum := s.rowNum.Load()
if rowNum < 0 {
var rowCount C.int64_t
GetDynamicPool().Submit(func() (any, error) {
rowCount = C.GetRealCount(s.ptr)
s.rowNum.Store(int64(rowCount))
return nil, nil
}).Await()
rowNum = int64(rowCount)
}
return rowNum
}
func (s *LocalSegment) MemSize() int64 {
@ -261,13 +269,19 @@ func (s *LocalSegment) MemSize() int64 {
if !s.isValid() {
return 0
}
var memoryUsageInBytes C.int64_t
GetDynamicPool().Submit(func() (any, error) {
memoryUsageInBytes = C.GetMemoryUsageInBytes(s.ptr)
return nil, nil
}).Await()
return int64(memoryUsageInBytes)
memSize := s.memSize.Load()
if memSize < 0 {
var cMemSize C.int64_t
GetDynamicPool().Submit(func() (any, error) {
cMemSize = C.GetMemoryUsageInBytes(s.ptr)
s.memSize.Store(int64(cMemSize))
return nil, nil
}).Await()
memSize = int64(cMemSize)
}
return memSize
}
func (s *LocalSegment) LastDeltaTimestamp() uint64 {
@ -518,6 +532,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
if err := HandleCStatus(ctx, &status, "Insert failed"); err != nil {
return err
}
s.insertCount.Add(int64(numOfRow))
s.rowNum.Store(-1)
s.memSize.Store(-1)
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(s.collectionID),
@ -596,6 +614,7 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
return err
}
s.rowNum.Store(-1)
s.lastDeltaTimestamp.Store(timestamps[len(timestamps)-1])
return nil
@ -651,6 +670,7 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f
return err
}
s.insertCount.Store(rowCount)
log.Info("load mutil field done",
zap.Int64("row count", rowCount),
zap.Int64("segmentID", s.ID()))
@ -708,6 +728,7 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
return err
}
s.insertCount.Store(rowCount)
log.Info("load field done")
return nil
@ -837,6 +858,7 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
return err
}
s.rowNum.Store(-1)
s.lastDeltaTimestamp.Store(tss[len(tss)-1])
log.Info("load deleted record done",