From 8041fc3c75a565aaf4773fc146e3da6a4e809fbd Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 26 Oct 2023 10:10:10 +0800 Subject: [PATCH] avoid add empty growing segment to delegator distribution (#27930) Signed-off-by: Wei Liu --- .../querynodev2/delegator/delegator_data.go | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index b538f61a9e..ebce727bce 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -76,28 +76,6 @@ func (d *DeleteData) Append(ad DeleteData) { d.RowCount += ad.RowCount } -func (sd *shardDelegator) newGrowing(segmentID int64, insertData *InsertData) segments.Segment { - log := sd.getLogger(context.Background()).With(zap.Int64("segmentID", segmentID)) - - segment, err := segments.NewSegment(sd.collection, segmentID, insertData.PartitionID, sd.collectionID, sd.vchannelName, segments.SegmentTypeGrowing, 0, insertData.StartPosition, insertData.StartPosition) - if err != nil { - log.Error("failed to create new segment", zap.Error(err)) - panic(err) - } - - sd.pkOracle.Register(segment, paramtable.GetNodeID()) - - sd.segmentManager.Put(segments.SegmentTypeGrowing, segment) - sd.addGrowing(SegmentEntry{ - NodeID: paramtable.GetNodeID(), - SegmentID: segmentID, - PartitionID: insertData.PartitionID, - Version: 0, - TargetVersion: initialTargetVersion, - }) - return segment -} - // ProcessInsert handles insert data in delegator. func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { method := "ProcessInsert" @@ -106,7 +84,15 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { for segmentID, insertData := range insertRecords { growing := sd.segmentManager.GetGrowing(segmentID) if growing == nil { - growing = sd.newGrowing(segmentID, insertData) + var err error + growing, err = segments.NewSegment(sd.collection, segmentID, insertData.PartitionID, sd.collectionID, sd.vchannelName, + segments.SegmentTypeGrowing, 0, insertData.StartPosition, insertData.StartPosition) + if err != nil { + log.Error("failed to create new segment", + zap.Int64("segmentID", segmentID), + zap.Error(err)) + panic(err) + } } err := growing.Insert(insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord) @@ -124,6 +110,19 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { } growing.UpdateBloomFilter(insertData.PrimaryKeys) + if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) { + // register created growing segment after insert, avoid to add empty growing to delegator + sd.pkOracle.Register(growing, paramtable.GetNodeID()) + sd.segmentManager.Put(segments.SegmentTypeGrowing, growing) + sd.addGrowing(SegmentEntry{ + NodeID: paramtable.GetNodeID(), + SegmentID: segmentID, + PartitionID: insertData.PartitionID, + Version: 0, + TargetVersion: initialTargetVersion, + }) + } + log.Debug("insert into growing segment", zap.Int64("collectionID", growing.Collection()), zap.Int64("segmentID", segmentID),