Make datanode load statslog lazily if SkipBFStatsLoad is true (#23877)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2023-05-06 15:12:39 +08:00 committed by GitHub
parent 884f7904a1
commit 345855c984
4 changed files with 334 additions and 11 deletions

View File

@@ -20,11 +20,12 @@ import (
"context"
"fmt"
"math"
"runtime"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/panjf2000/ants/v2"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -37,7 +38,10 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -96,6 +100,8 @@ type Channel interface {
// getTotalMemorySize returns the sum of memory sizes of segments.
getTotalMemorySize() int64
forceToSync()
close()
}
// ChannelMeta contains channel meta and the latest segment infos of the channel.
@@ -113,6 +119,9 @@ type ChannelMeta struct {
metaService *metaService
chunkManager storage.ChunkManager
workerPool *conc.Pool[struct{}]
closed *atomic.Bool
}
type addSegmentReq struct {
@@ -130,6 +139,8 @@ var _ Channel = &ChannelMeta{}
func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoord, cm storage.ChunkManager) *ChannelMeta {
metaService := newMetaService(rc, collID)
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0), ants.WithPreAlloc(false), ants.WithNonblocking(false))
channel := ChannelMeta{
collectionID: collID,
collSchema: schema,
@@ -145,6 +156,8 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
metaService: metaService,
chunkManager: cm,
workerPool: pool,
closed: atomic.NewBool(false),
}
return &channel
@@ -306,13 +319,72 @@ func (c *ChannelMeta) filterSegments(partitionID UniqueID) []*Segment {
}
func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
// mark the segment as lazy-loading
s.setLoadingLazy(true)
c.submitLoadStatsTask(s, statsBinlogs, ts)
return nil
}
return c.initPKstats(ctx, s, statsBinlogs, ts)
}
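
Below is a minimal, self-contained sketch of the gating pattern this hunk introduces (hypothetical names, standard library only): when the skip flag is set, the expensive statslog load is deferred to a goroutine and the segment stays flagged as lazy until it completes; otherwise the original synchronous path runs.

package main

import "fmt"

// initStats mirrors the shape of InitPKstats above: either load synchronously,
// or mark the segment lazy and defer the load to a background goroutine.
func initStats(skipSyncLoad bool, load func(), markLazy func(bool)) <-chan struct{} {
	done := make(chan struct{})
	if !skipSyncLoad {
		load() // original, synchronous path
		close(done)
		return done
	}
	markLazy(true) // lookups must answer conservatively until the load finishes
	go func() {
		defer close(done)
		load()
		markLazy(false)
	}()
	return done
}

func main() {
	done := initStats(true,
		func() { fmt.Println("loading statslog in the background") },
		func(lazy bool) { fmt.Println("lazyLoading =", lazy) },
	)
	<-done // wait so the example output is deterministic
}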
func (c *ChannelMeta) submitLoadStatsTask(s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) {
log := log.Ctx(context.TODO()).With(
zap.Int64("segmentID", s.segmentID),
zap.Int64("collectionID", s.collectionID),
)
if c.closed.Load() {
// stop retrying and do not resubmit once the channel meta is closed
return
}
// submit in a separate goroutine in case the task pool is full (Submit may block)
go func() {
c.workerPool.Submit(func() (struct{}, error) {
stats, err := c.loadStats(context.Background(), s.segmentID, s.collectionID, statsBinlogs, ts)
if err != nil {
// TODO: if the error is not retryable, add logic to rebuild the statslog
log.Warn("failed to lazy load statslog for segment", zap.Error(err))
if c.retryableLoadError(err) {
log.Info("retry load statslog")
c.submitLoadStatsTask(s, statsBinlogs, ts)
}
return struct{}{}, err
}
// acquire the segment lock here
// it is ok if the segment has already been dropped at this point
c.segMu.Lock()
defer c.segMu.Unlock()
s.historyStats = append(s.historyStats, stats...)
s.setLoadingLazy(false)
log.Info("lazy loading segment statslog complete")
return struct{}{}, nil
})
}()
}
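
For clarity, a hedged sketch of why Submit is wrapped in a goroutine (hypothetical blockingPool type, standard library only): with a blocking pool, as configured by ants.WithNonblocking(false) above, a saturated pool would otherwise stall the caller on the addSegment path.

package main

import (
	"fmt"
	"sync"
)

// blockingPool approximates a worker pool whose Submit blocks while all slots
// are busy, similar in spirit to the conc.Pool configured above.
type blockingPool struct {
	slots chan struct{}
}

func newBlockingPool(size int) *blockingPool {
	return &blockingPool{slots: make(chan struct{}, size)}
}

func (p *blockingPool) Submit(task func()) {
	p.slots <- struct{}{} // blocks while the pool is saturated
	go func() {
		defer func() { <-p.slots }()
		task()
	}()
}

// submitLazyLoad wraps Submit in its own goroutine so the caller never blocks
// on a full pool, at the cost of one extra goroutine per submission.
func submitLazyLoad(p *blockingPool, load func()) {
	go p.Submit(load)
}

func main() {
	p := newBlockingPool(1)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		id := i
		wg.Add(1)
		submitLazyLoad(p, func() {
			defer wg.Done()
			fmt.Println("loaded statslog for segment", id)
		})
	}
	wg.Wait()
}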
func (c *ChannelMeta) retryableLoadError(err error) bool {
switch {
case errors.Is(err, merr.ErrParameterInvalid):
// statslog corrupted
return false
case errors.Is(err, storage.ErrNoSuchKey):
// statslog not found
return false
default:
return true
}
}
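
As a reference, a small hedged sketch of the classify-then-retry idea behind retryableLoadError, using only the standard errors package. The sentinel values here are illustrative stand-ins for merr.ErrParameterInvalid and storage.ErrNoSuchKey, and the fixed attempt cap is an assumption: the real code resubmits through the worker pool instead of looping.

package main

import (
	"errors"
	"fmt"
)

var (
	errCorrupted = errors.New("statslog corrupted") // stand-in for merr.ErrParameterInvalid
	errNotFound  = errors.New("statslog not found") // stand-in for storage.ErrNoSuchKey
)

// retryable reports whether a load error is transient; permanent failures
// (corrupted or missing statslog) stop the retry loop.
func retryable(err error) bool {
	switch {
	case errors.Is(err, errCorrupted), errors.Is(err, errNotFound):
		return false
	default:
		return true
	}
}

func loadWithRetry(load func() error, maxAttempts int) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = load(); err == nil || !retryable(err) {
			return err
		}
	}
	return err
}

func main() {
	attempts := 0
	err := loadWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient error") // retried
		}
		return nil
	}, 5)
	fmt.Println(err, attempts) // <nil> 3
}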
func (c *ChannelMeta) loadStats(ctx context.Context, segmentID int64, collectionID int64, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) ([]*storage.PkStatistics, error) {
startTs := time.Now()
log := log.With(zap.Int64("segmentID", s.segmentID))
log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs)))
schema, err := c.getCollectionSchema(s.collectionID, ts)
log := log.With(zap.Int64("segmentID", segmentID))
log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))
schema, err := c.getCollectionSchema(collectionID, ts)
if err != nil {
log.Warn("failed to initPKBloomFilter, get schema return error", zap.Error(err))
return err
return nil, err
}
// get pkfield id
@@ -338,14 +410,14 @@ func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs
// no stats log to parse, initialize a new BF
if len(bloomFilterFiles) == 0 {
log.Warn("no stats files to load")
return nil
return nil, nil
}
// read historical PK filter
values, err := c.chunkManager.MultiRead(ctx, bloomFilterFiles)
if err != nil {
log.Warn("failed to load bloom filter files", zap.Error(err))
return err
return nil, err
}
blobs := make([]*Blob, 0)
for i := 0; i < len(values); i++ {
@@ -355,9 +427,10 @@ func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs
stats, err := storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize bloom filter files", zap.Error(err))
return err
return nil, merr.WrapErrParameterInvalid("valid statslog", "corrupted statslog", err.Error())
}
var size uint
result := make([]*storage.PkStatistics, 0, len(stats))
for _, stat := range stats {
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
@@ -365,9 +438,19 @@ func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs
MaxPK: stat.MaxPk,
}
size += stat.BF.Cap()
s.historyStats = append(s.historyStats, pkStat)
result = append(result, pkStat)
}
log.Info("Successfully load pk stats", zap.Any("time", time.Since(startTs)), zap.Uint("size", size))
return result, nil
}
func (c *ChannelMeta) initPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
stats, err := c.loadStats(ctx, s.segmentID, s.collectionID, statsBinlogs, ts)
if err != nil {
return err
}
s.historyStats = stats
return nil
}
@@ -829,3 +912,7 @@ func (c *ChannelMeta) getTotalMemorySize() int64 {
}
return res
}
func (c *ChannelMeta) close() {
c.closed.Store(true)
}
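
A brief sketch of the shutdown guard added with closed and close() (hypothetical channelMeta type, standard sync/atomic instead of go.uber.org/atomic): once the flag flips, retrying loaders stop resubmitting work, which lets the pool drain after the data sync service closes the channel. The task runs inline here to keep the example deterministic.

package main

import (
	"fmt"
	"sync/atomic"
)

type channelMeta struct {
	closed atomic.Bool
}

// submitTask drops work once the channel meta is closed, mirroring the check
// at the top of submitLoadStatsTask above.
func (c *channelMeta) submitTask(task func()) {
	if c.closed.Load() {
		return
	}
	task()
}

func (c *channelMeta) close() {
	c.closed.Store(true)
}

func main() {
	c := &channelMeta{}
	c.submitTask(func() { fmt.Println("accepted before close") })
	c.close()
	c.submitTask(func() { fmt.Println("never printed after close") })
}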

View File

@@ -22,16 +22,19 @@ import (
"fmt"
"math/rand"
"testing"
"time"
bloom "github.com/bits-and-blooms/bloom/v3"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
@@ -990,3 +993,218 @@ func (s *ChannelMetaSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
func TestChannelMetaSuite(t *testing.T) {
suite.Run(t, new(ChannelMetaSuite))
}
type ChannelMetaMockSuite struct {
suite.Suite
channel *ChannelMeta
collID UniqueID
partID UniqueID
vchanName string
cm *mocks.ChunkManager
}
func (s *ChannelMetaMockSuite) SetupTest() {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
s.cm = mocks.NewChunkManager(s.T())
s.collID = 1
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.vchanName = "channel"
}
func (s *ChannelMetaMockSuite) TestAddSegment_SkipBFLoad() {
Params.Save(Params.DataNodeCfg.SkipBFStatsLoad.Key, "true")
defer func() {
Params.Save(Params.DataNodeCfg.SkipBFStatsLoad.Key, "false")
}()
s.Run("success_load", func() {
defer s.SetupTest()
ch := make(chan struct{})
s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).
Run(func(_ context.Context, _ []string) {
<-ch
}).Return([][]byte{}, nil)
err := s.channel.addSegment(addSegmentReq{
segType: datapb.SegmentType_Flushed,
segID: 100,
collID: s.collID,
partitionID: s.partID,
statsBinLogs: []*datapb.FieldBinlog{
{
FieldID: 106,
Binlogs: []*datapb.Binlog{
{LogPath: "rootPath/stats/1/0/100/10001"},
},
},
},
})
s.NoError(err)
seg, ok := s.getSegmentByID(100)
s.Require().True(ok)
s.True(seg.isLoadingLazy())
s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
close(ch)
s.Eventually(func() bool {
return !seg.isLoadingLazy()
}, time.Second, time.Millisecond*100)
})
s.Run("fail_nosuchkey", func() {
defer s.SetupTest()
ch := make(chan struct{})
s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).
Run(func(_ context.Context, _ []string) {
<-ch
}).Return(nil, storage.WrapErrNoSuchKey("rootPath/stats/1/0/100/10001"))
err := s.channel.addSegment(addSegmentReq{
segType: datapb.SegmentType_Flushed,
segID: 100,
collID: s.collID,
partitionID: s.partID,
statsBinLogs: []*datapb.FieldBinlog{
{
FieldID: 106,
Binlogs: []*datapb.Binlog{
{LogPath: "rootPath/stats/1/0/100/10001"},
},
},
},
})
s.NoError(err)
seg, ok := s.getSegmentByID(100)
s.Require().True(ok)
s.True(seg.isLoadingLazy())
s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
close(ch)
// cannot wait here, since the pool's Running() count does not decrease immediately
/*
s.Eventually(func() bool {
// s.T().Log(s.channel.workerpool)
return s.channel.workerPool.Running() == 0
}, time.Second, time.Millisecond*100)*/
// still reports false positives
s.True(seg.isLoadingLazy())
s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
})
s.Run("fail_corrupted", func() {
defer s.SetupTest()
ch := make(chan struct{})
s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).
Run(func(_ context.Context, _ []string) {
<-ch
}).Return([][]byte{
[]byte("ABC"),
}, nil)
err := s.channel.addSegment(addSegmentReq{
segType: datapb.SegmentType_Flushed,
segID: 100,
collID: s.collID,
partitionID: s.partID,
statsBinLogs: []*datapb.FieldBinlog{
{
FieldID: 106,
Binlogs: []*datapb.Binlog{
{LogPath: "rootPath/stats/1/0/100/10001"},
},
},
},
})
s.NoError(err)
seg, ok := s.getSegmentByID(100)
s.Require().True(ok)
s.True(seg.isLoadingLazy())
s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
close(ch)
// cannot wait here, since the pool's Running() count does not decrease immediately
/*
s.Eventually(func() bool {
// not retryable
return s.channel.workerPool.Running() == 0
}, time.Second, time.Millisecond*100)*/
// still reports false positives
s.True(seg.isLoadingLazy())
s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
})
s.Run("transient_error", func() {
defer s.SetupTest()
ch := make(chan struct{})
counter := 0
s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).Call.
Return(func(_ context.Context, arg []string) [][]byte {
if counter == 0 {
return nil
}
return [][]byte{}
}, func(_ context.Context, arg []string) error {
if counter == 0 {
counter++
return errors.New("transient error")
}
return nil
})
err := s.channel.addSegment(addSegmentReq{
segType: datapb.SegmentType_Flushed,
segID: 100,
collID: s.collID,
partitionID: s.partID,
statsBinLogs: []*datapb.FieldBinlog{
{
FieldID: 106,
Binlogs: []*datapb.Binlog{
{LogPath: "rootPath/stats/1/0/100/10001"},
},
},
},
})
s.NoError(err)
seg, ok := s.getSegmentByID(100)
s.Require().True(ok)
s.True(seg.isLoadingLazy())
s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
close(ch)
s.Eventually(func() bool {
return !seg.isLoadingLazy()
}, time.Second, time.Millisecond*100)
})
}
func (s *ChannelMetaMockSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
s.channel.segMu.RLock()
defer s.channel.segMu.RUnlock()
seg, ok := s.channel.segments[id]
if ok && seg.isValid() {
return seg, true
}
return nil, false
}
func TestChannelMetaMock(t *testing.T) {
suite.Run(t, new(ChannelMetaMockSuite))
}

View File

@@ -168,6 +168,7 @@ func (dsService *dataSyncService) close() {
close(dsService.flushCh)
dsService.flushManager.close()
dsService.cancelFn()
dsService.channel.close()
})
}

View File

@@ -50,8 +50,21 @@ type Segment struct {
currentStat *storage.PkStatistics
historyStats []*storage.PkStatistics
lastSyncTs Timestamp
startPos *msgpb.MsgPosition // TODO readonly
lastSyncTs Timestamp
startPos *msgpb.MsgPosition // TODO readonly
lazyLoading atomic.Value
}
func (s *Segment) isLoadingLazy() bool {
b, ok := s.lazyLoading.Load().(bool)
if !ok {
return false
}
return b
}
func (s *Segment) setLoadingLazy(b bool) {
s.lazyLoading.Store(b)
}
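
For reference, a compact sketch of the lazyLoading accessor pattern (standard sync/atomic assumed here; the diff may use the go.uber.org/atomic flavor, which behaves the same): atomic.Value returns nil before the first Store, so the type assertion's ok result doubles as a "never set" check and the getter defaults to false.

package main

import (
	"fmt"
	"sync/atomic"
)

type segment struct {
	lazyLoading atomic.Value // stores bool
}

func (s *segment) isLoadingLazy() bool {
	b, ok := s.lazyLoading.Load().(bool)
	return ok && b // nil (never stored) reads as false
}

func (s *segment) setLoadingLazy(b bool) {
	s.lazyLoading.Store(b)
}

func main() {
	s := &segment{}
	fmt.Println(s.isLoadingLazy()) // false before any Store
	s.setLoadingLazy(true)
	fmt.Println(s.isLoadingLazy()) // true
}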
func (s *Segment) isValid() bool {
@@ -90,6 +103,10 @@ func (s *Segment) InitCurrentStat() {
// check if the PK exists in the current segment
func (s *Segment) isPKExist(pk primaryKey) bool {
// for integrity, report false positives while lazy loading
if s.isLoadingLazy() {
return true
}
s.statLock.Lock()
defer s.statLock.Unlock()
if s.currentStat != nil && s.currentStat.PkExist(pk) {