diff --git a/internal/dataservice/meta.go b/internal/dataservice/meta.go index db6adf13ce..6ba7e58f96 100644 --- a/internal/dataservice/meta.go +++ b/internal/dataservice/meta.go @@ -218,22 +218,6 @@ func (meta *meta) GetSegment(segID UniqueID) (*datapb.SegmentInfo, error) { return proto.Clone(segment).(*datapb.SegmentInfo), nil } -func (meta *meta) OpenSegment(segmentID UniqueID, timetick Timestamp) error { - meta.Lock() - defer meta.Unlock() - - segInfo, ok := meta.segments[segmentID] - if !ok { - return newErrSegmentNotFound(segmentID) - } - - segInfo.OpenTime = timetick - if err := meta.saveSegmentInfo(segInfo); err != nil { - return err - } - return nil -} - func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error { meta.Lock() defer meta.Unlock() @@ -244,6 +228,7 @@ func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error { } segInfo.SealedTime = timetick + segInfo.State = commonpb.SegmentState_Sealed if err := meta.saveSegmentInfo(segInfo); err != nil { return err } @@ -266,21 +251,6 @@ func (meta *meta) FlushSegment(segID UniqueID, timetick Timestamp) error { return nil } -func (meta *meta) SetSegmentState(segmentID UniqueID, state commonpb.SegmentState) error { - meta.Lock() - defer meta.Unlock() - - segInfo, ok := meta.segments[segmentID] - if !ok { - return newErrSegmentNotFound(segmentID) - } - segInfo.State = state - if err := meta.saveSegmentInfo(segInfo); err != nil { - return err - } - return nil -} - func (meta *meta) GetSegmentsOfCollection(collectionID UniqueID) []UniqueID { meta.RLock() defer meta.RUnlock() diff --git a/internal/dataservice/meta_test.go b/internal/dataservice/meta_test.go index 88a445b75c..44276109d9 100644 --- a/internal/dataservice/meta_test.go +++ b/internal/dataservice/meta_test.go @@ -173,9 +173,6 @@ func TestMeta_Basic(t *testing.T) { assert.EqualValues(t, 1, len(segIDs)) assert.Contains(t, segIDs, segID1_1) - // check OpenSegment/SealSegment/FlushSegment - err = meta.OpenSegment(segID0_0, 100) - assert.Nil(t, err) err = meta.SealSegment(segID0_0, 200) assert.Nil(t, err) err = meta.FlushSegment(segID0_0, 300) @@ -183,7 +180,6 @@ func TestMeta_Basic(t *testing.T) { info0_0, err = meta.GetSegment(segID0_0) assert.Nil(t, err) - assert.NotZero(t, info0_0.OpenTime) assert.NotZero(t, info0_0.SealedTime) assert.NotZero(t, info0_0.FlushedTime) @@ -288,10 +284,6 @@ func TestMeta_Basic(t *testing.T) { err = meta.DropSegment(segIDInvalid) assert.NotNil(t, err) - // check open non-exist segment - err = meta.OpenSegment(segIDInvalid, 100) - assert.NotNil(t, err) - // check seal non-exist segment err = meta.SealSegment(segIDInvalid, 200) assert.NotNil(t, err) diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go index 93c63738e3..626e59d11a 100644 --- a/internal/dataservice/segment_allocator.go +++ b/internal/dataservice/segment_allocator.go @@ -113,19 +113,19 @@ func newSegmentAllocator(meta *meta, allocator allocatorInterface, opts ...Optio return alloc } -func (allocator *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error { +func (s *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - if _, ok := allocator.segments[segmentInfo.ID]; ok { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.segments[segmentInfo.ID]; ok { return fmt.Errorf("segment %d already exist", segmentInfo.ID) } - return allocator.open(segmentInfo) + return s.open(segmentInfo) } -func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error { - totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID) +func (s *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error { + totalRows, err := s.estimateTotalRows(segmentInfo.CollectionID) if err != nil { return err } @@ -133,7 +133,7 @@ func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error { zap.Int64("CollectionID", segmentInfo.CollectionID), zap.Int64("SegmentID", segmentInfo.ID), zap.Int("Rows", totalRows)) - allocator.segments[segmentInfo.ID] = &segmentStatus{ + s.segments[segmentInfo.ID] = &segmentStatus{ id: segmentInfo.ID, collectionID: segmentInfo.CollectionID, partitionID: segmentInfo.PartitionID, @@ -145,20 +145,20 @@ func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error { return nil } -func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID, +func (s *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() - for _, segStatus := range allocator.segments { + for _, segStatus := range s.segments { if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID || segStatus.insertChannel != channelName { continue } var success bool - success, err = allocator.alloc(segStatus, requestRows) + success, err = s.alloc(segStatus, requestRows) if err != nil { return } @@ -172,12 +172,12 @@ func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionI } var segStatus *segmentStatus - segStatus, err = allocator.openNewSegment(ctx, collectionID, partitionID, channelName) + segStatus, err = s.openNewSegment(ctx, collectionID, partitionID, channelName) if err != nil { return } var success bool - success, err = allocator.alloc(segStatus, requestRows) + success, err = s.alloc(segStatus, requestRows) if err != nil { return } @@ -192,12 +192,12 @@ func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionI return } -func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) { +func (s *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) { totalOfAllocations := 0 for _, allocation := range segStatus.allocations { totalOfAllocations += allocation.rowNums } - segMeta, err := allocator.mt.GetSegment(segStatus.id) + segMeta, err := s.mt.GetSegment(segStatus.id) if err != nil { return false, err } @@ -209,12 +209,12 @@ func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) return false, nil } - ts, err := allocator.allocator.allocTimestamp() + ts, err := s.allocator.allocTimestamp() if err != nil { return false, err } physicalTs, logicalTs := tsoutil.ParseTS(ts) - expirePhysicalTs := physicalTs.Add(time.Duration(allocator.segmentExpireDuration) * time.Millisecond) + expirePhysicalTs := physicalTs.Add(time.Duration(s.segmentExpireDuration) * time.Millisecond) expireTs := tsoutil.ComposeTS(expirePhysicalTs.UnixNano()/int64(time.Millisecond), int64(logicalTs)) segStatus.lastExpireTime = expireTs segStatus.allocations = append(segStatus.allocations, &allocation{ @@ -225,10 +225,10 @@ func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) return true, nil } -func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) { +func (s *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - id, err := allocator.allocator.allocID() + id, err := s.allocator.allocID() if err != nil { return nil, err } @@ -236,10 +236,10 @@ func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectio if err != nil { return nil, err } - if err = allocator.mt.AddSegment(segmentInfo); err != nil { + if err = s.mt.AddSegment(segmentInfo); err != nil { return nil, err } - if err = allocator.open(segmentInfo); err != nil { + if err = s.open(segmentInfo); err != nil { return nil, err } infoMsg := &msgstream.SegmentInfoMsg{ @@ -259,16 +259,16 @@ func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectio msgPack := &msgstream.MsgPack{ Msgs: []msgstream.TsMsg{infoMsg}, } - if allocator.segmentInfoStream != nil { - if err = allocator.segmentInfoStream.Produce(msgPack); err != nil { + if s.segmentInfoStream != nil { + if err = s.segmentInfoStream.Produce(msgPack); err != nil { return nil, err } } - return allocator.segments[segmentInfo.ID], nil + return s.segments[segmentInfo.ID], nil } -func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) { - collMeta, err := allocator.mt.GetCollection(collectionID) +func (s *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) { + collMeta, err := s.mt.GetCollection(collectionID) if err != nil { return -1, err } @@ -276,21 +276,27 @@ func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int if err != nil { return -1, err } - return int(allocator.segmentThreshold / float64(sizePerRecord)), nil + return int(s.segmentThreshold / float64(sizePerRecord)), nil } -func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) { +func (s *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() keys := make([]UniqueID, 0) - for _, segStatus := range allocator.segments { + for _, segStatus := range s.segments { if !segStatus.sealed { - sealed, err := allocator.checkSegmentSealed(segStatus) + sealed, err := s.checkSegmentSealed(segStatus) if err != nil { return nil, err } + if !sealed { + continue + } + if err := s.sealSegmentInMeta(segStatus.id); err != nil { + return nil, err + } segStatus.sealed = sealed } if segStatus.sealed { @@ -300,50 +306,62 @@ func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]Uni return keys, nil } -func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) { - segMeta, err := allocator.mt.GetSegment(segStatus.id) +func (s *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) { + segMeta, err := s.mt.GetSegment(segStatus.id) if err != nil { return false, err } - return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil + return float64(segMeta.NumRows) >= s.segmentThresholdFactor*float64(segStatus.total), nil } -func (allocator *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error { +func (s *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - status, ok := allocator.segments[segmentID] + s.mu.Lock() + defer s.mu.Unlock() + status, ok := s.segments[segmentID] if !ok { return nil } + + if err := s.sealSegmentInMeta(segmentID); err != nil { + return err + } status.sealed = true return nil } -func (allocator *segmentAllocator) HasSegment(ctx context.Context, segmentID UniqueID) bool { +func (s *segmentAllocator) HasSegment(ctx context.Context, segmentID UniqueID) bool { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - _, ok := allocator.segments[segmentID] + s.mu.Lock() + defer s.mu.Unlock() + _, ok := s.segments[segmentID] return ok } -func (allocator *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) { - sp, _ := trace.StartSpanFromContext(ctx) - defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - delete(allocator.segments, segmentID) +func (s *segmentAllocator) sealSegmentInMeta(id UniqueID) error { + ts, err := s.allocator.allocTimestamp() + if err != nil { + return err + } + return s.mt.SealSegment(id, ts) } -func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error { +func (s *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - for _, segStatus := range allocator.segments { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.segments, segmentID) +} + +func (s *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error { + sp, _ := trace.StartSpanFromContext(ctx) + defer sp.Finish() + s.mu.Lock() + defer s.mu.Unlock() + for _, segStatus := range s.segments { for i := 0; i < len(segStatus.allocations); i++ { if timeTick < segStatus.allocations[i].expireTime { continue @@ -358,24 +376,24 @@ func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTi return nil } -func (allocator *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) { +func (s *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.RLock() - defer allocator.mu.RUnlock() - status, ok := allocator.segments[segmentID] + s.mu.RLock() + defer s.mu.RUnlock() + status, ok := s.segments[segmentID] if !ok { return false, fmt.Errorf("segment %d not found", segmentID) } return status.lastExpireTime <= ts, nil } -func (allocator *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) { +func (s *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() - allocator.mu.Lock() - defer allocator.mu.Unlock() - for _, status := range allocator.segments { + s.mu.Lock() + defer s.mu.Unlock() + for _, status := range s.segments { if status.collectionID == collectionID { if status.sealed { continue diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go index 25802d3ced..8ba133b9a7 100644 --- a/internal/dataservice/watcher.go +++ b/internal/dataservice/watcher.go @@ -107,10 +107,6 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic log.Error("get segment from meta error", zap.Int64("segmentID", id), zap.Error(err)) continue } - if err = watcher.meta.SetSegmentState(id, commonpb.SegmentState_Sealed); err != nil { - log.Error("set segment state error", zap.Int64("segmentID", id), zap.Error(err)) - continue - } collID, segID := sInfo.CollectionID, sInfo.ID coll2Segs[collID] = append(coll2Segs[collID], segID) watcher.allocator.DropSegment(ctx, id)