Remove SetSegmentState and seal segment in segment allocator (#4994)

Segment should be sealed once it exceeds the limited size. This operation
should be done in segment allocator. Watcher only check the status to
decide whether the segment will be flushed.

Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
sunby 2021-04-25 09:51:57 +08:00 committed by GitHub
parent ada58b5bde
commit 4d03da9c4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 106 deletions

View File

@ -218,22 +218,6 @@ func (meta *meta) GetSegment(segID UniqueID) (*datapb.SegmentInfo, error) {
return proto.Clone(segment).(*datapb.SegmentInfo), nil
}
func (meta *meta) OpenSegment(segmentID UniqueID, timetick Timestamp) error {
meta.Lock()
defer meta.Unlock()
segInfo, ok := meta.segments[segmentID]
if !ok {
return newErrSegmentNotFound(segmentID)
}
segInfo.OpenTime = timetick
if err := meta.saveSegmentInfo(segInfo); err != nil {
return err
}
return nil
}
func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error {
meta.Lock()
defer meta.Unlock()
@ -244,6 +228,7 @@ func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error {
}
segInfo.SealedTime = timetick
segInfo.State = commonpb.SegmentState_Sealed
if err := meta.saveSegmentInfo(segInfo); err != nil {
return err
}
@ -266,21 +251,6 @@ func (meta *meta) FlushSegment(segID UniqueID, timetick Timestamp) error {
return nil
}
func (meta *meta) SetSegmentState(segmentID UniqueID, state commonpb.SegmentState) error {
meta.Lock()
defer meta.Unlock()
segInfo, ok := meta.segments[segmentID]
if !ok {
return newErrSegmentNotFound(segmentID)
}
segInfo.State = state
if err := meta.saveSegmentInfo(segInfo); err != nil {
return err
}
return nil
}
func (meta *meta) GetSegmentsOfCollection(collectionID UniqueID) []UniqueID {
meta.RLock()
defer meta.RUnlock()

View File

@ -173,9 +173,6 @@ func TestMeta_Basic(t *testing.T) {
assert.EqualValues(t, 1, len(segIDs))
assert.Contains(t, segIDs, segID1_1)
// check OpenSegment/SealSegment/FlushSegment
err = meta.OpenSegment(segID0_0, 100)
assert.Nil(t, err)
err = meta.SealSegment(segID0_0, 200)
assert.Nil(t, err)
err = meta.FlushSegment(segID0_0, 300)
@ -183,7 +180,6 @@ func TestMeta_Basic(t *testing.T) {
info0_0, err = meta.GetSegment(segID0_0)
assert.Nil(t, err)
assert.NotZero(t, info0_0.OpenTime)
assert.NotZero(t, info0_0.SealedTime)
assert.NotZero(t, info0_0.FlushedTime)
@ -288,10 +284,6 @@ func TestMeta_Basic(t *testing.T) {
err = meta.DropSegment(segIDInvalid)
assert.NotNil(t, err)
// check open non-exist segment
err = meta.OpenSegment(segIDInvalid, 100)
assert.NotNil(t, err)
// check seal non-exist segment
err = meta.SealSegment(segIDInvalid, 200)
assert.NotNil(t, err)

View File

@ -113,19 +113,19 @@ func newSegmentAllocator(meta *meta, allocator allocatorInterface, opts ...Optio
return alloc
}
func (allocator *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error {
func (s *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
if _, ok := allocator.segments[segmentInfo.ID]; ok {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.segments[segmentInfo.ID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.ID)
}
return allocator.open(segmentInfo)
return s.open(segmentInfo)
}
func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error {
totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID)
func (s *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error {
totalRows, err := s.estimateTotalRows(segmentInfo.CollectionID)
if err != nil {
return err
}
@ -133,7 +133,7 @@ func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error {
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
zap.Int("Rows", totalRows))
allocator.segments[segmentInfo.ID] = &segmentStatus{
s.segments[segmentInfo.ID] = &segmentStatus{
id: segmentInfo.ID,
collectionID: segmentInfo.CollectionID,
partitionID: segmentInfo.PartitionID,
@ -145,20 +145,20 @@ func (allocator *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error {
return nil
}
func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
func (s *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
for _, segStatus := range allocator.segments {
for _, segStatus := range s.segments {
if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID ||
segStatus.insertChannel != channelName {
continue
}
var success bool
success, err = allocator.alloc(segStatus, requestRows)
success, err = s.alloc(segStatus, requestRows)
if err != nil {
return
}
@ -172,12 +172,12 @@ func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionI
}
var segStatus *segmentStatus
segStatus, err = allocator.openNewSegment(ctx, collectionID, partitionID, channelName)
segStatus, err = s.openNewSegment(ctx, collectionID, partitionID, channelName)
if err != nil {
return
}
var success bool
success, err = allocator.alloc(segStatus, requestRows)
success, err = s.alloc(segStatus, requestRows)
if err != nil {
return
}
@ -192,12 +192,12 @@ func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionI
return
}
func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
func (s *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
totalOfAllocations := 0
for _, allocation := range segStatus.allocations {
totalOfAllocations += allocation.rowNums
}
segMeta, err := allocator.mt.GetSegment(segStatus.id)
segMeta, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
}
@ -209,12 +209,12 @@ func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int)
return false, nil
}
ts, err := allocator.allocator.allocTimestamp()
ts, err := s.allocator.allocTimestamp()
if err != nil {
return false, err
}
physicalTs, logicalTs := tsoutil.ParseTS(ts)
expirePhysicalTs := physicalTs.Add(time.Duration(allocator.segmentExpireDuration) * time.Millisecond)
expirePhysicalTs := physicalTs.Add(time.Duration(s.segmentExpireDuration) * time.Millisecond)
expireTs := tsoutil.ComposeTS(expirePhysicalTs.UnixNano()/int64(time.Millisecond), int64(logicalTs))
segStatus.lastExpireTime = expireTs
segStatus.allocations = append(segStatus.allocations, &allocation{
@ -225,10 +225,10 @@ func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int)
return true, nil
}
func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) {
func (s *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
id, err := allocator.allocator.allocID()
id, err := s.allocator.allocID()
if err != nil {
return nil, err
}
@ -236,10 +236,10 @@ func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectio
if err != nil {
return nil, err
}
if err = allocator.mt.AddSegment(segmentInfo); err != nil {
if err = s.mt.AddSegment(segmentInfo); err != nil {
return nil, err
}
if err = allocator.open(segmentInfo); err != nil {
if err = s.open(segmentInfo); err != nil {
return nil, err
}
infoMsg := &msgstream.SegmentInfoMsg{
@ -259,16 +259,16 @@ func (allocator *segmentAllocator) openNewSegment(ctx context.Context, collectio
msgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{infoMsg},
}
if allocator.segmentInfoStream != nil {
if err = allocator.segmentInfoStream.Produce(msgPack); err != nil {
if s.segmentInfoStream != nil {
if err = s.segmentInfoStream.Produce(msgPack); err != nil {
return nil, err
}
}
return allocator.segments[segmentInfo.ID], nil
return s.segments[segmentInfo.ID], nil
}
func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := allocator.mt.GetCollection(collectionID)
func (s *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := s.mt.GetCollection(collectionID)
if err != nil {
return -1, err
}
@ -276,21 +276,27 @@ func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int
if err != nil {
return -1, err
}
return int(allocator.segmentThreshold / float64(sizePerRecord)), nil
return int(s.segmentThreshold / float64(sizePerRecord)), nil
}
func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) {
func (s *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
keys := make([]UniqueID, 0)
for _, segStatus := range allocator.segments {
for _, segStatus := range s.segments {
if !segStatus.sealed {
sealed, err := allocator.checkSegmentSealed(segStatus)
sealed, err := s.checkSegmentSealed(segStatus)
if err != nil {
return nil, err
}
if !sealed {
continue
}
if err := s.sealSegmentInMeta(segStatus.id); err != nil {
return nil, err
}
segStatus.sealed = sealed
}
if segStatus.sealed {
@ -300,50 +306,62 @@ func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]Uni
return keys, nil
}
func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
segMeta, err := allocator.mt.GetSegment(segStatus.id)
func (s *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
segMeta, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
}
return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil
return float64(segMeta.NumRows) >= s.segmentThresholdFactor*float64(segStatus.total), nil
}
func (allocator *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
func (s *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
status, ok := allocator.segments[segmentID]
s.mu.Lock()
defer s.mu.Unlock()
status, ok := s.segments[segmentID]
if !ok {
return nil
}
if err := s.sealSegmentInMeta(segmentID); err != nil {
return err
}
status.sealed = true
return nil
}
func (allocator *segmentAllocator) HasSegment(ctx context.Context, segmentID UniqueID) bool {
func (s *segmentAllocator) HasSegment(ctx context.Context, segmentID UniqueID) bool {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
_, ok := allocator.segments[segmentID]
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.segments[segmentID]
return ok
}
func (allocator *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
delete(allocator.segments, segmentID)
func (s *segmentAllocator) sealSegmentInMeta(id UniqueID) error {
ts, err := s.allocator.allocTimestamp()
if err != nil {
return err
}
return s.mt.SealSegment(id, ts)
}
func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error {
func (s *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
for _, segStatus := range allocator.segments {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.segments, segmentID)
}
func (s *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
for _, segStatus := range s.segments {
for i := 0; i < len(segStatus.allocations); i++ {
if timeTick < segStatus.allocations[i].expireTime {
continue
@ -358,24 +376,24 @@ func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTi
return nil
}
func (allocator *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) {
func (s *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.RLock()
defer allocator.mu.RUnlock()
status, ok := allocator.segments[segmentID]
s.mu.RLock()
defer s.mu.RUnlock()
status, ok := s.segments[segmentID]
if !ok {
return false, fmt.Errorf("segment %d not found", segmentID)
}
return status.lastExpireTime <= ts, nil
}
func (allocator *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) {
func (s *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
for _, status := range allocator.segments {
s.mu.Lock()
defer s.mu.Unlock()
for _, status := range s.segments {
if status.collectionID == collectionID {
if status.sealed {
continue

View File

@ -107,10 +107,6 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic
log.Error("get segment from meta error", zap.Int64("segmentID", id), zap.Error(err))
continue
}
if err = watcher.meta.SetSegmentState(id, commonpb.SegmentState_Sealed); err != nil {
log.Error("set segment state error", zap.Int64("segmentID", id), zap.Error(err))
continue
}
collID, segID := sInfo.CollectionID, sInfo.ID
coll2Segs[collID] = append(coll2Segs[collID], segID)
watcher.allocator.DropSegment(ctx, id)