milvus/internal/dataservice/segment_allocator.go
sunby 4d03da9c4f
Remove SetSegmentState and seal segment in segment allocator (#4994)
Segment should be sealed once it exceeds the limited size. This operation
should be done in segment allocator. Watcher only check the status to
decide whether the segment will be flushed.

Signed-off-by: sunby <bingyi.sun@zilliz.com>
2021-04-25 01:51:57 +00:00

405 lines
12 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package dataservice
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
type errRemainInSufficient struct {
requestRows int
}
func newErrRemainInSufficient(requestRows int) errRemainInSufficient {
return errRemainInSufficient{requestRows: requestRows}
}
func (err errRemainInSufficient) Error() string {
return "segment remaining is insufficient for" + strconv.Itoa(err.requestRows)
}
// segmentAllocator is used to allocate rows for segments and record the allocations.
type segmentAllocatorInterface interface {
// OpenSegment add the segment to allocator and set it allocatable
OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error
// AllocSegment allocate rows and record the allocation.
AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
// GetSealedSegments get all sealed segment.
GetSealedSegments(ctx context.Context) ([]UniqueID, error)
// SealSegment set segment sealed, the segment will not be allocated anymore.
SealSegment(ctx context.Context, segmentID UniqueID) error
// DropSegment drop the segment from allocator.
DropSegment(ctx context.Context, segmentID UniqueID)
// ExpireAllocations check all allocations' expire time and remove the expired allocation.
ExpireAllocations(ctx context.Context, timeTick Timestamp) error
// SealAllSegments get all opened segment ids of collection. return success and failed segment ids
SealAllSegments(ctx context.Context, collectionID UniqueID)
// IsAllocationsExpired check all allocations of segment expired.
IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error)
}
type segmentStatus struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
total int
sealed bool
lastExpireTime Timestamp
allocations []*allocation
insertChannel string
}
type allocation struct {
rowNums int
expireTime Timestamp
}
type segmentAllocator struct {
mt *meta
segments map[UniqueID]*segmentStatus //segment id -> status
segmentExpireDuration int64
segmentThreshold float64
segmentThresholdFactor float64
mu sync.RWMutex
allocator allocatorInterface
segmentInfoStream msgstream.MsgStream
}
type Option struct {
apply func(alloc *segmentAllocator)
}
func WithSegmentStream(stream msgstream.MsgStream) Option {
return Option{
apply: func(alloc *segmentAllocator) {
alloc.segmentInfoStream = stream
},
}
}
func newSegmentAllocator(meta *meta, allocator allocatorInterface, opts ...Option) *segmentAllocator {
alloc := &segmentAllocator{
mt: meta,
segments: make(map[UniqueID]*segmentStatus),
segmentExpireDuration: Params.SegIDAssignExpiration,
segmentThreshold: Params.SegmentSize * 1024 * 1024,
segmentThresholdFactor: Params.SegmentSizeFactor,
allocator: allocator,
}
for _, opt := range opts {
opt.apply(alloc)
}
return alloc
}
func (s *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.segments[segmentInfo.ID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.ID)
}
return s.open(segmentInfo)
}
func (s *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error {
totalRows, err := s.estimateTotalRows(segmentInfo.CollectionID)
if err != nil {
return err
}
log.Debug("dataservice: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
zap.Int("Rows", totalRows))
s.segments[segmentInfo.ID] = &segmentStatus{
id: segmentInfo.ID,
collectionID: segmentInfo.CollectionID,
partitionID: segmentInfo.PartitionID,
total: totalRows,
sealed: false,
lastExpireTime: 0,
insertChannel: segmentInfo.InsertChannel,
}
return nil
}
func (s *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
for _, segStatus := range s.segments {
if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID ||
segStatus.insertChannel != channelName {
continue
}
var success bool
success, err = s.alloc(segStatus, requestRows)
if err != nil {
return
}
if !success {
continue
}
segID = segStatus.id
retCount = requestRows
expireTime = segStatus.lastExpireTime
return
}
var segStatus *segmentStatus
segStatus, err = s.openNewSegment(ctx, collectionID, partitionID, channelName)
if err != nil {
return
}
var success bool
success, err = s.alloc(segStatus, requestRows)
if err != nil {
return
}
if !success {
err = newErrRemainInSufficient(requestRows)
return
}
segID = segStatus.id
retCount = requestRows
expireTime = segStatus.lastExpireTime
return
}
func (s *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
totalOfAllocations := 0
for _, allocation := range segStatus.allocations {
totalOfAllocations += allocation.rowNums
}
segMeta, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
}
free := segStatus.total - int(segMeta.NumRows) - totalOfAllocations
log.Debug("dataservice::alloc: ",
zap.Any("segMeta.NumRows", int(segMeta.NumRows)),
zap.Any("totalOfAllocations", totalOfAllocations))
if numRows > free {
return false, nil
}
ts, err := s.allocator.allocTimestamp()
if err != nil {
return false, err
}
physicalTs, logicalTs := tsoutil.ParseTS(ts)
expirePhysicalTs := physicalTs.Add(time.Duration(s.segmentExpireDuration) * time.Millisecond)
expireTs := tsoutil.ComposeTS(expirePhysicalTs.UnixNano()/int64(time.Millisecond), int64(logicalTs))
segStatus.lastExpireTime = expireTs
segStatus.allocations = append(segStatus.allocations, &allocation{
numRows,
expireTs,
})
return true, nil
}
func (s *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
id, err := s.allocator.allocID()
if err != nil {
return nil, err
}
segmentInfo, err := BuildSegment(collectionID, partitionID, id, channelName)
if err != nil {
return nil, err
}
if err = s.mt.AddSegment(segmentInfo); err != nil {
return nil, err
}
if err = s.open(segmentInfo); err != nil {
return nil, err
}
infoMsg := &msgstream.SegmentInfoMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentMsg: datapb.SegmentMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
Segment: segmentInfo,
},
}
msgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{infoMsg},
}
if s.segmentInfoStream != nil {
if err = s.segmentInfoStream.Produce(msgPack); err != nil {
return nil, err
}
}
return s.segments[segmentInfo.ID], nil
}
func (s *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := s.mt.GetCollection(collectionID)
if err != nil {
return -1, err
}
sizePerRecord, err := typeutil.EstimateSizePerRecord(collMeta.Schema)
if err != nil {
return -1, err
}
return int(s.segmentThreshold / float64(sizePerRecord)), nil
}
func (s *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
keys := make([]UniqueID, 0)
for _, segStatus := range s.segments {
if !segStatus.sealed {
sealed, err := s.checkSegmentSealed(segStatus)
if err != nil {
return nil, err
}
if !sealed {
continue
}
if err := s.sealSegmentInMeta(segStatus.id); err != nil {
return nil, err
}
segStatus.sealed = sealed
}
if segStatus.sealed {
keys = append(keys, segStatus.id)
}
}
return keys, nil
}
func (s *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
segMeta, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
}
return float64(segMeta.NumRows) >= s.segmentThresholdFactor*float64(segStatus.total), nil
}
func (s *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
status, ok := s.segments[segmentID]
if !ok {
return nil
}
if err := s.sealSegmentInMeta(segmentID); err != nil {
return err
}
status.sealed = true
return nil
}
func (s *segmentAllocator) HasSegment(ctx context.Context, segmentID UniqueID) bool {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.segments[segmentID]
return ok
}
func (s *segmentAllocator) sealSegmentInMeta(id UniqueID) error {
ts, err := s.allocator.allocTimestamp()
if err != nil {
return err
}
return s.mt.SealSegment(id, ts)
}
func (s *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
delete(s.segments, segmentID)
}
func (s *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
for _, segStatus := range s.segments {
for i := 0; i < len(segStatus.allocations); i++ {
if timeTick < segStatus.allocations[i].expireTime {
continue
}
log.Debug("dataservice::ExpireAllocations: ",
zap.Any("segStatus.id", segStatus.id),
zap.Any("segStatus.allocations.rowNums", segStatus.allocations[i].rowNums))
segStatus.allocations = append(segStatus.allocations[:i], segStatus.allocations[i+1:]...)
i--
}
}
return nil
}
func (s *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.RLock()
defer s.mu.RUnlock()
status, ok := s.segments[segmentID]
if !ok {
return false, fmt.Errorf("segment %d not found", segmentID)
}
return status.lastExpireTime <= ts, nil
}
func (s *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
for _, status := range s.segments {
if status.collectionID == collectionID {
if status.sealed {
continue
}
status.sealed = true
}
}
}