From 4d03da9c4fb280dfd4888d950513fe05e886d561 Mon Sep 17 00:00:00 2001 From: sunby Date: Sun, 25 Apr 2021 09:51:57 +0800 Subject: [PATCH] Remove `SetSegmentState` and seal segment in segment allocator (#4994) A segment should be sealed once it exceeds the size limit. This operation should be done in the segment allocator. The watcher only checks the status to decide whether the segment will be flushed. Signed-off-by: sunby --- internal/dataservice/meta.go | 32 +---- internal/dataservice/meta_test.go | 8 -- internal/dataservice/segment_allocator.go | 144 ++++++++++++---------- internal/dataservice/watcher.go | 4 - 4 files changed, 82 insertions(+), 106 deletions(-) diff --git a/internal/dataservice/meta.go b/internal/dataservice/meta.go index db6adf13ce..6ba7e58f96 100644 --- a/internal/dataservice/meta.go +++ b/internal/dataservice/meta.go @@ -218,22 +218,6 @@ func (meta *meta) GetSegment(segID UniqueID) (*datapb.SegmentInfo, error) { return proto.Clone(segment).(*datapb.SegmentInfo), nil } -func (meta *meta) OpenSegment(segmentID UniqueID, timetick Timestamp) error { - meta.Lock() - defer meta.Unlock() - - segInfo, ok := meta.segments[segmentID] - if !ok { - return newErrSegmentNotFound(segmentID) - } - - segInfo.OpenTime = timetick - if err := meta.saveSegmentInfo(segInfo); err != nil { - return err - } - return nil -} - func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error { meta.Lock() defer meta.Unlock() @@ -244,6 +228,7 @@ func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error { } segInfo.SealedTime = timetick + segInfo.State = commonpb.SegmentState_Sealed if err := meta.saveSegmentInfo(segInfo); err != nil { return err } @@ -266,21 +251,6 @@ func (meta *meta) FlushSegment(segID UniqueID, timetick Timestamp) error { return nil } -func (meta *meta) SetSegmentState(segmentID UniqueID, state commonpb.SegmentState) error { - meta.Lock() - defer meta.Unlock() - - segInfo, ok := meta.segments[segmentID] - if !ok { - return 
newErrSegmentNotFound(segmentID) - } - segInfo.State = state - if err := meta.saveSegmentInfo(segInfo); err != nil { - return err - } - return nil -} - func (meta *meta) GetSegmentsOfCollection(collectionID UniqueID) []UniqueID { meta.RLock() defer meta.RUnlock() diff --git a/internal/dataservice/meta_test.go b/internal/dataservice/meta_test.go index 88a445b75c..44276109d9 100644 --- a/internal/dataservice/meta_test.go +++ b/internal/dataservice/meta_test.go @@ -173,9 +173,6 @@ func TestMeta_Basic(t *testing.T) { assert.EqualValues(t, 1, len(segIDs)) assert.Contains(t, segIDs, segID1_1) - // check OpenSegment/SealSegment/FlushSegment - err = meta.OpenSegment(segID0_0, 100) - assert.Nil(t, err) err = meta.SealSegment(segID0_0, 200) assert.Nil(t, err) err = meta.FlushSegment(segID0_0, 300) @@ -183,7 +180,6 @@ func TestMeta_Basic(t *testing.T) { info0_0, err = meta.GetSegment(segID0_0) assert.Nil(t, err) - assert.NotZero(t, info0_0.OpenTime) assert.NotZero(t, info0_0.SealedTime) assert.NotZero(t, info0_0.FlushedTime) @@ -288,10 +284,6 @@ func TestMeta_Basic(t *testing.T) { err = meta.DropSegment(segIDInvalid) assert.NotNil(t, err) - // check open non-exist segment - err = meta.OpenSegment(segIDInvalid, 100) - assert.NotNil(t, err) - // check seal non-exist segment err = meta.SealSegment(segIDInvalid, 200) assert.NotNil(t, err) diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go index 93c63738e3..626e59d11a 100644 --- a/internal/dataservice/segment_allocator.go +++ b/internal/dataservice/segment_allocator.go @@ -113,19 +113,19 @@ func newSegmentAllocator(meta *meta, allocator allocatorInterface, opts ...Optio return alloc } -func (allocator *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error { +func (s *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer 
allocator.mu.Unlock() - if _, ok := allocator.segments[segmentInfo.ID]; ok { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.segments[segmentInfo.ID]; ok { return fmt.Errorf("segment %d already exist", segmentInfo.ID) } - return allocator.open(segmentInfo) + return s.open(segmentInfo) } -func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error { - totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID) +func (s *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error { + totalRows, err := s.estimateTotalRows(segmentInfo.CollectionID) if err != nil { return err } @@ -133,7 +133,7 @@ func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error { zap.Int64("CollectionID", segmentInfo.CollectionID), zap.Int64("SegmentID", segmentInfo.ID), zap.Int("Rows", totalRows)) - allocator.segments[segmentInfo.ID] = &segmentStatus{ + s.segments[segmentInfo.ID] = &segmentStatus{ id: segmentInfo.ID, collectionID: segmentInfo.CollectionID, partitionID: segmentInfo.PartitionID, @@ -145,20 +145,20 @@ func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error { return nil } -func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID, +func (s *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() - for _, segStatus := range allocator.segments { + for _, segStatus := range s.segments { if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID || segStatus.insertChannel != channelName { continue } var success bool - success, err = allocator.alloc(segStatus, requestRows) + success, err = s.alloc(segStatus, requestRows) if err != nil { return } 
@@ -172,12 +172,12 @@ func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionI } var segStatus *segmentStatus - segStatus, err = allocator.openNewSegment(ctx, collectionID, partitionID, channelName) + segStatus, err = s.openNewSegment(ctx, collectionID, partitionID, channelName) if err != nil { return } var success bool - success, err = allocator.alloc(segStatus, requestRows) + success, err = s.alloc(segStatus, requestRows) if err != nil { return } @@ -192,12 +192,12 @@ func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionI return } -func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) { +func (s *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) { totalOfAllocations := 0 for _, allocation := range segStatus.allocations { totalOfAllocations += allocation.rowNums } - segMeta, err := allocator.mt.GetSegment(segStatus.id) + segMeta, err := s.mt.GetSegment(segStatus.id) if err != nil { return false, err } @@ -209,12 +209,12 @@ func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) return false, nil } - ts, err := allocator.allocator.allocTimestamp() + ts, err := s.allocator.allocTimestamp() if err != nil { return false, err } physicalTs, logicalTs := tsoutil.ParseTS(ts) - expirePhysicalTs := physicalTs.Add(time.Duration(allocator.segmentExpireDuration) * time.Millisecond) + expirePhysicalTs := physicalTs.Add(time.Duration(s.segmentExpireDuration) * time.Millisecond) expireTs := tsoutil.ComposeTS(expirePhysicalTs.UnixNano()/int64(time.Millisecond), int64(logicalTs)) segStatus.lastExpireTime = expireTs segStatus.allocations = append(segStatus.allocations, &allocation{ @@ -225,10 +225,10 @@ func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) return true, nil } -func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) 
(*segmentStatus, error) { +func (s *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - id, err := allocator.allocator.allocID() + id, err := s.allocator.allocID() if err != nil { return nil, err } @@ -236,10 +236,10 @@ func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectio if err != nil { return nil, err } - if err = allocator.mt.AddSegment(segmentInfo); err != nil { + if err = s.mt.AddSegment(segmentInfo); err != nil { return nil, err } - if err = allocator.open(segmentInfo); err != nil { + if err = s.open(segmentInfo); err != nil { return nil, err } infoMsg := &msgstream.SegmentInfoMsg{ @@ -259,16 +259,16 @@ func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectio msgPack := &msgstream.MsgPack{ Msgs: []msgstream.TsMsg{infoMsg}, } - if allocator.segmentInfoStream != nil { - if err = allocator.segmentInfoStream.Produce(msgPack); err != nil { + if s.segmentInfoStream != nil { + if err = s.segmentInfoStream.Produce(msgPack); err != nil { return nil, err } } - return allocator.segments[segmentInfo.ID], nil + return s.segments[segmentInfo.ID], nil } -func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) { - collMeta, err := allocator.mt.GetCollection(collectionID) +func (s *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) { + collMeta, err := s.mt.GetCollection(collectionID) if err != nil { return -1, err } @@ -276,21 +276,27 @@ func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int if err != nil { return -1, err } - return int(allocator.segmentThreshold / float64(sizePerRecord)), nil + return int(s.segmentThreshold / float64(sizePerRecord)), nil } -func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) { +func (s *segmentAllocator) 
GetSealedSegments(ctx context.Context) ([]UniqueID, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() keys := make([]UniqueID, 0) - for _, segStatus := range allocator.segments { + for _, segStatus := range s.segments { if !segStatus.sealed { - sealed, err := allocator.checkSegmentSealed(segStatus) + sealed, err := s.checkSegmentSealed(segStatus) if err != nil { return nil, err } + if !sealed { + continue + } + if err := s.sealSegmentInMeta(segStatus.id); err != nil { + return nil, err + } segStatus.sealed = sealed } if segStatus.sealed { @@ -300,50 +306,62 @@ func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]Uni return keys, nil } -func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) { - segMeta, err := allocator.mt.GetSegment(segStatus.id) +func (s *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) { + segMeta, err := s.mt.GetSegment(segStatus.id) if err != nil { return false, err } - return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil + return float64(segMeta.NumRows) >= s.segmentThresholdFactor*float64(segStatus.total), nil } -func (allocator *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error { +func (s *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - status, ok := allocator.segments[segmentID] + s.mu.Lock() + defer s.mu.Unlock() + status, ok := s.segments[segmentID] if !ok { return nil } + + if err := s.sealSegmentInMeta(segmentID); err != nil { + return err + } status.sealed = true return nil } -func (allocator *segmentAllocator) HasSegment(ctx context.Context, segmentID UniqueID) bool { +func (s *segmentAllocator) HasSegment(ctx context.Context, 
segmentID UniqueID) bool { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - _, ok := allocator.segments[segmentID] + s.mu.Lock() + defer s.mu.Unlock() + _, ok := s.segments[segmentID] return ok } -func (allocator *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) { - sp, _ := trace.StartSpanFromContext(ctx) - defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - delete(allocator.segments, segmentID) +func (s *segmentAllocator) sealSegmentInMeta(id UniqueID) error { + ts, err := s.allocator.allocTimestamp() + if err != nil { + return err + } + return s.mt.SealSegment(id, ts) } -func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error { +func (s *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - for _, segStatus := range allocator.segments { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.segments, segmentID) +} + +func (s *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error { + sp, _ := trace.StartSpanFromContext(ctx) + defer sp.Finish() + s.mu.Lock() + defer s.mu.Unlock() + for _, segStatus := range s.segments { for i := 0; i < len(segStatus.allocations); i++ { if timeTick < segStatus.allocations[i].expireTime { continue @@ -358,24 +376,24 @@ func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTi return nil } -func (allocator *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) { +func (s *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.RLock() - defer allocator.mu.RUnlock() - status, ok := allocator.segments[segmentID] + s.mu.RLock() + defer 
s.mu.RUnlock() + status, ok := s.segments[segmentID] if !ok { return false, fmt.Errorf("segment %d not found", segmentID) } return status.lastExpireTime <= ts, nil } -func (allocator *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) { +func (s *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - for _, status := range allocator.segments { + s.mu.Lock() + defer s.mu.Unlock() + for _, status := range s.segments { if status.collectionID == collectionID { if status.sealed { continue diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go index 25802d3ced..8ba133b9a7 100644 --- a/internal/dataservice/watcher.go +++ b/internal/dataservice/watcher.go @@ -107,10 +107,6 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic log.Error("get segment from meta error", zap.Int64("segmentID", id), zap.Error(err)) continue } - if err = watcher.meta.SetSegmentState(id, commonpb.SegmentState_Sealed); err != nil { - log.Error("set segment state error", zap.Int64("segmentID", id), zap.Error(err)) - continue - } collID, segID := sInfo.CollectionID, sInfo.ID coll2Segs[collID] = append(coll2Segs[collID], segID) watcher.allocator.DropSegment(ctx, id)