diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go
index c25911bdc4..deeca0d91e 100644
--- a/internal/datanode/channel_meta.go
+++ b/internal/datanode/channel_meta.go
@@ -20,11 +20,12 @@ import (
     "context"
     "fmt"
     "math"
+    "runtime"
     "sync"
     "time"
 
     "github.com/cockroachdb/errors"
-    "github.com/milvus-io/milvus/pkg/util/tsoutil"
+    "github.com/panjf2000/ants/v2"
     "github.com/samber/lo"
     "go.uber.org/atomic"
     "go.uber.org/zap"
@@ -37,7 +38,10 @@ import (
     "github.com/milvus-io/milvus/internal/types"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/util/conc"
     "github.com/milvus-io/milvus/pkg/util/merr"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
+    "github.com/milvus-io/milvus/pkg/util/tsoutil"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -96,6 +100,8 @@ type Channel interface {
     // getTotalMemorySize returns the sum of memory sizes of segments.
     getTotalMemorySize() int64
     forceToSync()
+
+    close()
 }
 
 // ChannelMeta contains channel meta and the latest segments infos of the channel.
@@ -113,6 +119,9 @@ type ChannelMeta struct {
 
     metaService  *metaService
     chunkManager storage.ChunkManager
+    workerPool   *conc.Pool[struct{}]
+
+    closed *atomic.Bool
 }
 
 type addSegmentReq struct {
@@ -130,6 +139,8 @@ var _ Channel = &ChannelMeta{}
 
 func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoord, cm storage.ChunkManager) *ChannelMeta {
     metaService := newMetaService(rc, collID)
+    pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0), ants.WithPreAlloc(false), ants.WithNonblocking(false))
+
     channel := ChannelMeta{
         collectionID: collID,
         collSchema:   schema,
@@ -145,6 +156,8 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
         metaService:  metaService,
         chunkManager: cm,
 
+        workerPool: pool,
+        closed:     atomic.NewBool(false),
     }
 
     return &channel
@@ -306,13 +319,72 @@ func (c *ChannelMeta) filterSegments(partitionID UniqueID) []*Segment {
 }
 
 func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
+    if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
+        // mark segment lazy loading
+        s.setLoadingLazy(true)
+        c.submitLoadStatsTask(s, statsBinlogs, ts)
+        return nil
+    }
+    return c.initPKstats(ctx, s, statsBinlogs, ts)
+}
+
+func (c *ChannelMeta) submitLoadStatsTask(s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) {
+    log := log.Ctx(context.TODO()).With(
+        zap.Int64("segmentID", s.segmentID),
+        zap.Int64("collectionID", s.collectionID),
+    )
+    if c.closed.Load() {
+        // stop retry and resubmit if channel meta closed
+        return
+    }
+    // do submitting in a goroutine in case of task pool is full
+    go func() {
+        c.workerPool.Submit(func() (struct{}, error) {
+            stats, err := c.loadStats(context.Background(), s.segmentID, s.collectionID, statsBinlogs, ts)
+            if err != nil {
+                // TODO if not retryable, add rebuild statslog logic
+                log.Warn("failed to lazy load statslog for segment", zap.Error(err))
+                if c.retryableLoadError(err) {
+                    log.Info("retry load statslog")
+                    c.submitLoadStatsTask(s, statsBinlogs, ts)
+                }
+                return struct{}{}, err
+            }
+            // get segment lock here
+            // it's ok that segment is dropped here
+            c.segMu.Lock()
+            defer c.segMu.Unlock()
+            s.historyStats = append(s.historyStats, stats...)
+            s.setLoadingLazy(false)
+
+            log.Info("lazy loading segment statslog complete")
+
+            return struct{}{}, nil
+        })
+    }()
+}
+
+func (c *ChannelMeta) retryableLoadError(err error) bool {
+    switch {
+    case errors.Is(err, merr.ErrParameterInvalid):
+        // statslog corrupted
+        return false
+    case errors.Is(err, storage.ErrNoSuchKey):
+        // statslog not found
+        return false
+    default:
+        return true
+    }
+}
+
+func (c *ChannelMeta) loadStats(ctx context.Context, segmentID int64, collectionID int64, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) ([]*storage.PkStatistics, error) {
     startTs := time.Now()
-    log := log.With(zap.Int64("segmentID", s.segmentID))
-    log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs)))
-    schema, err := c.getCollectionSchema(s.collectionID, ts)
+    log := log.With(zap.Int64("segmentID", segmentID))
+    log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))
+    schema, err := c.getCollectionSchema(collectionID, ts)
     if err != nil {
         log.Warn("failed to initPKBloomFilter, get schema return error", zap.Error(err))
-        return err
+        return nil, err
     }
 
     // get pkfield id
@@ -338,14 +410,14 @@ func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs
     // no stats log to parse, initialize a new BF
     if len(bloomFilterFiles) == 0 {
         log.Warn("no stats files to load")
-        return nil
+        return nil, nil
     }
 
     // read historical PK filter
     values, err := c.chunkManager.MultiRead(ctx, bloomFilterFiles)
     if err != nil {
         log.Warn("failed to load bloom filter files", zap.Error(err))
-        return err
+        return nil, err
     }
     blobs := make([]*Blob, 0)
     for i := 0; i < len(values); i++ {
@@ -355,9 +427,10 @@ func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs
     stats, err := storage.DeserializeStats(blobs)
     if err != nil {
         log.Warn("failed to deserialize bloom filter files", zap.Error(err))
-        return err
+        return nil, merr.WrapErrParameterInvalid("valid statslog", "corrupted statslog", err.Error())
     }
     var size uint
+    result := make([]*storage.PkStatistics, 0, len(stats))
     for _, stat := range stats {
         pkStat := &storage.PkStatistics{
             PkFilter: stat.BF,
@@ -365,9 +438,19 @@ func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs
             MaxPK:    stat.MaxPk,
         }
         size += stat.BF.Cap()
-        s.historyStats = append(s.historyStats, pkStat)
+        result = append(result, pkStat)
     }
+
     log.Info("Successfully load pk stats", zap.Any("time", time.Since(startTs)), zap.Uint("size", size))
+    return result, nil
+}
+
+func (c *ChannelMeta) initPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
+    stats, err := c.loadStats(ctx, s.segmentID, s.collectionID, statsBinlogs, ts)
+    if err != nil {
+        return err
+    }
+    s.historyStats = stats
     return nil
 }
 
@@ -829,3 +912,7 @@ func (c *ChannelMeta) getTotalMemorySize() int64 {
     }
     return res
 }
+
+func (c *ChannelMeta) close() {
+    c.closed.Store(true)
+}
diff --git a/internal/datanode/channel_meta_test.go b/internal/datanode/channel_meta_test.go
index 5b09eaa98a..b0996c59e0 100644
--- a/internal/datanode/channel_meta_test.go
+++ b/internal/datanode/channel_meta_test.go
@@ -22,16 +22,19 @@ import (
     "fmt"
     "math/rand"
     "testing"
+    "time"
 
     bloom "github.com/bits-and-blooms/bloom/v3"
     "github.com/cockroachdb/errors"
     "github.com/samber/lo"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
     "github.com/stretchr/testify/suite"
 
     "github.com/milvus-io/milvus-proto/go-api/msgpb"
     "github.com/milvus-io/milvus-proto/go-api/schemapb"
+    "github.com/milvus-io/milvus/internal/mocks"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/pkg/common"
@@ -990,3 +993,218 @@ func (s *ChannelMetaSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
 func TestChannelMetaSuite(t *testing.T) {
     suite.Run(t, new(ChannelMetaSuite))
 }
+
+type ChannelMetaMockSuite struct {
+    suite.Suite
+    channel *ChannelMeta
+
+    collID    UniqueID
+    partID    UniqueID
+    vchanName string
+    cm        *mocks.ChunkManager
+}
+
+func (s *ChannelMetaMockSuite) SetupTest() {
+    rc := &RootCoordFactory{
+        pkType: schemapb.DataType_Int64,
+    }
+
+    s.cm = mocks.NewChunkManager(s.T())
+    s.collID = 1
+    s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
+    s.vchanName = "channel"
+}
+
+func (s *ChannelMetaMockSuite) TestAddSegment_SkipBFLoad() {
+    Params.Save(Params.DataNodeCfg.SkipBFStatsLoad.Key, "true")
+    defer func() {
+        Params.Save(Params.DataNodeCfg.SkipBFStatsLoad.Key, "false")
+    }()
+
+    s.Run("success_load", func() {
+        defer s.SetupTest()
+        ch := make(chan struct{})
+        s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).
+            Run(func(_ context.Context, _ []string) {
+                <-ch
+            }).Return([][]byte{}, nil)
+
+        err := s.channel.addSegment(addSegmentReq{
+            segType:     datapb.SegmentType_Flushed,
+            segID:       100,
+            collID:      s.collID,
+            partitionID: s.partID,
+            statsBinLogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 106,
+                    Binlogs: []*datapb.Binlog{
+                        {LogPath: "rootPath/stats/1/0/100/10001"},
+                    },
+                },
+            },
+        })
+
+        s.NoError(err)
+
+        seg, ok := s.getSegmentByID(100)
+        s.Require().True(ok)
+        s.True(seg.isLoadingLazy())
+        s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
+
+        close(ch)
+        s.Eventually(func() bool {
+            return !seg.isLoadingLazy()
+        }, time.Second, time.Millisecond*100)
+    })
+
+    s.Run("fail_nosuchkey", func() {
+        defer s.SetupTest()
+        ch := make(chan struct{})
+        s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).
+            Run(func(_ context.Context, _ []string) {
+                <-ch
+            }).Return(nil, storage.WrapErrNoSuchKey("rootPath/stats/1/0/100/10001"))
+
+        err := s.channel.addSegment(addSegmentReq{
+            segType:     datapb.SegmentType_Flushed,
+            segID:       100,
+            collID:      s.collID,
+            partitionID: s.partID,
+            statsBinLogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 106,
+                    Binlogs: []*datapb.Binlog{
+                        {LogPath: "rootPath/stats/1/0/100/10001"},
+                    },
+                },
+            },
+        })
+
+        s.NoError(err)
+
+        seg, ok := s.getSegmentByID(100)
+        s.Require().True(ok)
+        s.True(seg.isLoadingLazy())
+        s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
+
+        close(ch)
+
+        // cannot wait here, since running will not reduce immediately
+        /*
+            s.Eventually(func() bool {
+                // s.T().Log(s.channel.workerpool)
+                return s.channel.workerPool.Running() == 0
+            }, time.Second, time.Millisecond*100)*/
+
+        // still return false positive
+        s.True(seg.isLoadingLazy())
+        s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
+    })
+
+    s.Run("fail_corrupted", func() {
+        defer s.SetupTest()
+        ch := make(chan struct{})
+        s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).
+            Run(func(_ context.Context, _ []string) {
+                <-ch
+            }).Return([][]byte{
+            []byte("ABC"),
+        }, nil)
+
+        err := s.channel.addSegment(addSegmentReq{
+            segType:     datapb.SegmentType_Flushed,
+            segID:       100,
+            collID:      s.collID,
+            partitionID: s.partID,
+            statsBinLogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 106,
+                    Binlogs: []*datapb.Binlog{
+                        {LogPath: "rootPath/stats/1/0/100/10001"},
+                    },
+                },
+            },
+        })
+
+        s.NoError(err)
+
+        seg, ok := s.getSegmentByID(100)
+        s.Require().True(ok)
+        s.True(seg.isLoadingLazy())
+        s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
+
+        close(ch)
+        // cannot wait here, since running will not reduce immediately
+        /*
+            s.Eventually(func() bool {
+                // not retryable
+                return s.channel.workerPool.Running() == 0
+            }, time.Second, time.Millisecond*100)*/
+
+        // still return false positive
+        s.True(seg.isLoadingLazy())
+        s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
+    })
+
+    s.Run("transient_error", func() {
+        defer s.SetupTest()
+        ch := make(chan struct{})
+        counter := 0
+        s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).Call.
+            Return(func(_ context.Context, arg []string) [][]byte {
+                if counter == 0 {
+                    return nil
+                }
+                return [][]byte{}
+            }, func(_ context.Context, arg []string) error {
+                if counter == 0 {
+                    counter++
+                    return errors.New("transient error")
+                }
+                return nil
+            })
+
+        err := s.channel.addSegment(addSegmentReq{
+            segType:     datapb.SegmentType_Flushed,
+            segID:       100,
+            collID:      s.collID,
+            partitionID: s.partID,
+            statsBinLogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 106,
+                    Binlogs: []*datapb.Binlog{
+                        {LogPath: "rootPath/stats/1/0/100/10001"},
+                    },
+                },
+            },
+        })
+
+        s.NoError(err)
+
+        seg, ok := s.getSegmentByID(100)
+        s.Require().True(ok)
+        s.True(seg.isLoadingLazy())
+        s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
+
+        close(ch)
+        s.Eventually(func() bool {
+            return !seg.isLoadingLazy()
+        }, time.Second, time.Millisecond*100)
+    })
+}
+
+func (s *ChannelMetaMockSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
+    s.channel.segMu.RLock()
+    defer s.channel.segMu.RUnlock()
+
+    seg, ok := s.channel.segments[id]
+    if ok && seg.isValid() {
+        return seg, true
+    }
+
+    return nil, false
+}
+
+func TestChannelMetaMock(t *testing.T) {
+    suite.Run(t, new(ChannelMetaMockSuite))
+}
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 588a032274..419ebadfd9 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -168,6 +168,7 @@ func (dsService *dataSyncService) close() {
         close(dsService.flushCh)
         dsService.flushManager.close()
         dsService.cancelFn()
+        dsService.channel.close()
     })
 }
 
diff --git a/internal/datanode/segment.go b/internal/datanode/segment.go
index 9f22a9f60f..c48c15eede 100644
--- a/internal/datanode/segment.go
+++ b/internal/datanode/segment.go
@@ -50,8 +50,21 @@ type Segment struct {
     currentStat  *storage.PkStatistics
     historyStats []*storage.PkStatistics
 
-    lastSyncTs Timestamp
-    startPos   *msgpb.MsgPosition // TODO readonly
+    lastSyncTs  Timestamp
+    startPos    *msgpb.MsgPosition // TODO readonly
+    lazyLoading atomic.Value
+}
+
+func (s *Segment) isLoadingLazy() bool {
+    b, ok := s.lazyLoading.Load().(bool)
+    if !ok {
+        return false
+    }
+    return b
+}
+
+func (s *Segment) setLoadingLazy(b bool) {
+    s.lazyLoading.Store(b)
 }
 
 func (s *Segment) isValid() bool {
@@ -90,6 +103,10 @@ func (s *Segment) InitCurrentStat() {
 
 // check if PK exists is current
 func (s *Segment) isPKExist(pk primaryKey) bool {
+    // for integrity, report false positive while lazy loading
+    if s.isLoadingLazy() {
+        return true
+    }
     s.statLock.Lock()
     defer s.statLock.Unlock()
     if s.currentStat != nil && s.currentStat.PkExist(pk) {
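
Aside for reviewers: the sketch below is not part of the patch; it is a minimal, standard-library-only illustration of the submit/classify/resubmit pattern that submitLoadStatsTask follows (a worker re-enqueues itself on a retryable error and stops once its owner is closed). The names loader, retryable, and errTransient are invented for the example; the real code uses conc.Pool and the merr/storage error sentinels instead.

// Minimal sketch, assuming a task that fails once with a transient error
// and is re-submitted until it succeeds or the owner is closed.
package main

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
)

var errTransient = errors.New("transient error")

type loader struct {
    wg       sync.WaitGroup
    closed   atomic.Bool
    attempts atomic.Int32
}

// submit schedules one load attempt; on a retryable failure the task
// re-enqueues itself, mirroring submitLoadStatsTask's retry loop.
func (l *loader) submit() {
    if l.closed.Load() {
        return // stop retrying once the owner is closed
    }
    l.wg.Add(1)
    go func() {
        defer l.wg.Done()
        if err := l.load(); err != nil {
            if retryable(err) {
                l.submit()
            }
            return
        }
        fmt.Println("load complete after", l.attempts.Load(), "attempt(s)")
    }()
}

// load fails on the first attempt and succeeds afterwards.
func (l *loader) load() error {
    if l.attempts.Add(1) == 1 {
        return errTransient
    }
    return nil
}

// retryable plays the role of retryableLoadError: only transient errors retry.
func retryable(err error) bool {
    return errors.Is(err, errTransient)
}

func main() {
    l := &loader{}
    l.submit()
    l.wg.Wait() // waits for the retry chain to finish
}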