fix: Block warmup submit if pool full in sync mode (#38690)

https://github.com/milvus-io/milvus/issues/38692

The warmup cgo pool is switched to blocking submission (conc.WithNonBlocking(false)), so a sync-mode warmup waits for a free worker instead of failing when the pool is full. Async-mode warmup no longer submits to that pool directly: tasks are queued through a new AsyncWarmupDispatcher and drained onto the dynamic pool by a background goroutine, so callers are never blocked.

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
Author: Bingyi Sun <sunbingyi1992@gmail.com>
Date: 2025-01-02 15:04:58 +08:00 (committed by GitHub)
Commit: aa0a87eda7
Parent: d3a5282eaa
12 changed files with 106 additions and 10 deletions


@@ -102,6 +102,7 @@ MmapChunkTarget::get() {
     write(padding, padding_size);
     flush();
+    file_.FFlush();

     auto m = mmap(
         nullptr, size_, PROT_READ, MAP_SHARED, file_.Descriptor(), offset_);
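Aside: the new file_.FFlush() matters because bytes still sitting in a user-space write buffer are invisible to a fresh mapping; mmap only reflects data the kernel has been handed. A minimal Go sketch of the same pitfall (Unix-only, illustrative; not Milvus code — bufio's Flush plays the role of FFlush here):

package main

import (
	"bufio"
	"fmt"
	"os"
	"syscall"
)

func main() {
	f, err := os.CreateTemp("", "mmap-demo-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// Buffered writes live in user space until flushed; without the Flush,
	// the file still looks empty to the kernel, and the mapping would fail
	// or show none of the data.
	w := bufio.NewWriter(f)
	w.WriteString("hello, mmap")
	if err := w.Flush(); err != nil { // the FFlush() analogue
		panic(err)
	}

	fi, _ := f.Stat()
	data, err := syscall.Mmap(int(f.Fd()), 0, int(fi.Size()),
		syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	defer syscall.Munmap(data)

	fmt.Println(string(data)) // prints "hello, mmap"
}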


@@ -116,6 +116,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
 			DeltaPosition: insertData.StartPosition,
 			Level:         datapb.SegmentLevel_L1,
 		},
+		nil,
 	)
 	if err != nil {
 		log.Error("failed to create new segment",


@@ -66,6 +66,7 @@ func (s *ManagerSuite) SetupTest() {
 			InsertChannel: s.channels[i],
 			Level:         s.levels[i],
 		},
+		nil,
 	)
 	s.Require().NoError(err)
 	s.segments = append(s.segments, segment)


@@ -142,7 +142,7 @@ func initWarmupPool() {
 			runtime.LockOSThread()
 			C.SetThreadName(cgoTagWarmup)
 		}), // lock os thread for cgo thread disposal
-		conc.WithNonBlocking(true), // make warming up non blocking
+		conc.WithNonBlocking(false),
 	)

 	warmupPool.Store(pool)
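The one-line change above is the core of the fix: a non-blocking pool rejects Submit when every worker is busy, so warmup work could be dropped under load; with conc.WithNonBlocking(false), a full pool makes the submitter wait. A toy illustration of the two submit semantics (a hand-rolled pool, not the conc package's implementation):

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errPoolFull = errors.New("pool is full")

// toyPool models just the submit semantics at issue: a fixed number of
// worker slots guarded by a buffered channel.
type toyPool struct {
	slots       chan struct{}
	nonBlocking bool
	wg          sync.WaitGroup
}

func (p *toyPool) Submit(task func()) error {
	if p.nonBlocking {
		select {
		case p.slots <- struct{}{}: // a slot was free
		default:
			return errPoolFull // full pool: fail fast (the old behavior)
		}
	} else {
		p.slots <- struct{}{} // full pool: block the caller (the new behavior)
	}
	p.wg.Add(1)
	go func() {
		defer func() { <-p.slots; p.wg.Done() }()
		task()
	}()
	return nil
}

func main() {
	p := &toyPool{slots: make(chan struct{}, 2), nonBlocking: true}
	for i := 0; i < 4; i++ {
		if err := p.Submit(func() { time.Sleep(50 * time.Millisecond) }); err != nil {
			fmt.Println("submit", i, "->", err) // later submits rejected while workers are busy
		}
	}
	p.wg.Wait()
}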


@@ -98,6 +98,7 @@ func (suite *RetrieveSuite) SetupTest() {
 			InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
 			Level:         datapb.SegmentLevel_Legacy,
 		},
+		nil,
 	)
 	suite.Require().NoError(err)
@@ -126,6 +127,7 @@ func (suite *RetrieveSuite) SetupTest() {
 			InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
 			Level:         datapb.SegmentLevel_Legacy,
 		},
+		nil,
 	)
 	suite.Require().NoError(err)


@@ -88,6 +88,7 @@ func (suite *SearchSuite) SetupTest() {
 			InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
 			Level:         datapb.SegmentLevel_Legacy,
 		},
+		nil,
 	)
 	suite.Require().NoError(err)
@@ -116,6 +117,7 @@ func (suite *SearchSuite) SetupTest() {
 			InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
 			Level:         datapb.SegmentLevel_Legacy,
 		},
+		nil,
 	)
 	suite.Require().NoError(err)


@@ -30,6 +30,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 	"unsafe"
@@ -279,6 +280,7 @@ type LocalSegment struct {
 	lastDeltaTimestamp *atomic.Uint64
 	fields             *typeutil.ConcurrentMap[int64, *FieldInfo]
 	fieldIndexes       *typeutil.ConcurrentMap[int64, *IndexedFieldInfo]
+	warmupDispatcher   *AsyncWarmupDispatcher
 }

 func NewSegment(ctx context.Context,
@@ -286,6 +288,7 @@ func NewSegment(ctx context.Context,
 	segmentType SegmentType,
 	version int64,
 	loadInfo *querypb.SegmentLoadInfo,
+	warmupDispatcher *AsyncWarmupDispatcher,
 ) (Segment, error) {
 	log := log.Ctx(ctx)
 	/*
@@ -345,9 +348,10 @@ func NewSegment(ctx context.Context,
 		fields:           typeutil.NewConcurrentMap[int64, *FieldInfo](),
 		fieldIndexes:     typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](),
 		memSize:          atomic.NewInt64(-1),
 		rowNum:           atomic.NewInt64(-1),
 		insertCount:      atomic.NewInt64(0),
+		warmupDispatcher: warmupDispatcher,
 	}

 	if err := segment.initializeSegment(); err != nil {
@@ -1157,7 +1161,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
 			return nil, nil
 		}).Await()
 	case "async":
-		GetWarmupPool().Submit(func() (any, error) {
+		task := func() (any, error) {
 			// bad implementation, warmup is async at another goroutine and holds the rlock.
 			// the state transition of segment in segment loader will be blocked.
 			// add a waiter to avoid it.
@@ -1176,7 +1180,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
 			}

 			log.Info("warming up chunk cache asynchronously done")
 			return nil, nil
-		})
+		}
+		s.warmupDispatcher.AddTask(task)
 	default:
 		// no warming up
 	}
@@ -1347,3 +1352,55 @@ func (s *LocalSegment) indexNeedLoadRawData(schema *schemapb.CollectionSchema, i
 	}
 	return !typeutil.IsVectorType(fieldSchema.DataType) && s.HasRawData(indexInfo.IndexInfo.FieldID), nil
 }
+
+type (
+	WarmupTask = func() (any, error)
+
+	AsyncWarmupDispatcher struct {
+		mu     sync.RWMutex
+		tasks  []WarmupTask
+		notify chan struct{}
+	}
+)
+
+func NewWarmupDispatcher() *AsyncWarmupDispatcher {
+	return &AsyncWarmupDispatcher{
+		notify: make(chan struct{}, 1),
+	}
+}
+
+func (d *AsyncWarmupDispatcher) AddTask(task func() (any, error)) {
+	d.mu.Lock()
+	d.tasks = append(d.tasks, task)
+	d.mu.Unlock()
+	select {
+	case d.notify <- struct{}{}:
+	default:
+	}
+}
+
+func (d *AsyncWarmupDispatcher) Run(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-d.notify:
+			d.mu.RLock()
+			tasks := make([]WarmupTask, len(d.tasks))
+			copy(tasks, d.tasks)
+			d.mu.RUnlock()
+			for _, task := range tasks {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+					GetDynamicPool().Submit(task)
+				}
+			}
+			d.mu.Lock()
+			d.tasks = d.tasks[len(tasks):]
+			d.mu.Unlock()
+		}
+	}
+}
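Taken together, AddTask and Run form an unbounded queue with a coalesced wake-up: AddTask appends under the mutex and does a non-blocking send on the capacity-1 notify channel, while Run snapshots the queue, submits the snapshot to the dynamic pool, and trims exactly the entries it dispatched, so tasks appended mid-drain are picked up on the next wake-up. A sketch of the intended lifecycle, assuming the types above (same package; not code from this commit):

func runWarmupExample(parent context.Context) {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // cancelling the context is what stops the Run loop

	d := NewWarmupDispatcher()
	go d.Run(ctx) // a single background goroutine drains the queue

	// AddTask never blocks the caller, however busy the pool is.
	d.AddTask(func() (any, error) {
		return nil, nil // warm up one field's chunk cache here
	})
}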


@@ -147,6 +147,7 @@ type resourceEstimateFactor struct {
 }

 func NewLoader(
+	ctx context.Context,
 	manager *Manager,
 	cm storage.ChunkManager,
 ) *segmentLoader {
@@ -167,11 +168,14 @@ func NewLoader(
 	log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))

+	warmupDispatcher := NewWarmupDispatcher()
+	go warmupDispatcher.Run(ctx)
 	loader := &segmentLoader{
 		manager:                   manager,
 		cm:                        cm,
 		loadingSegments:           typeutil.NewConcurrentMap[int64, *loadResult](),
 		committedResourceNotifier: syncutil.NewVersionedNotifier(),
+		warmupDispatcher:          warmupDispatcher,
 	}

 	return loader
@@ -212,6 +216,8 @@ type segmentLoader struct {
 	loadingSegments           *typeutil.ConcurrentMap[int64, *loadResult]
 	committedResource         LoadResource
 	committedResourceNotifier *syncutil.VersionedNotifier
+
+	warmupDispatcher *AsyncWarmupDispatcher
 }

 var _ Loader = (*segmentLoader)(nil)
@@ -294,6 +300,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
 		segmentType,
 		version,
 		loadInfo,
+		loader.warmupDispatcher,
 	)
 	if err != nil {
 		log.Warn("load segment failed when create new segment",


@@ -81,7 +81,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
 	// Dependencies
 	suite.manager = NewManager()
-	suite.loader = NewLoader(suite.manager, suite.chunkManager)
+	suite.loader = NewLoader(ctx, suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())

 	// Data
@@ -98,7 +98,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
 func (suite *SegmentLoaderSuite) SetupBM25() {
 	// Dependencies
 	suite.manager = NewManager()
-	suite.loader = NewLoader(suite.manager, suite.chunkManager)
+	suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())

 	suite.schema = mock_segcore.GenTestBM25CollectionSchema("test")
@@ -798,7 +798,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
 	ctx := context.Background()
 	chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
 	suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
-	suite.loader = NewLoader(suite.manager, suite.chunkManager)
+	suite.loader = NewLoader(ctx, suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())

 	// Data


@@ -5,8 +5,11 @@ import (
 	"fmt"
 	"path/filepath"
 	"testing"
+	"time"

+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
+	"go.uber.org/atomic"

 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/mocks/util/mock_segcore"
@@ -91,6 +94,7 @@ func (suite *SegmentSuite) SetupTest() {
 				},
 			},
 		},
+		nil,
 	)
 	suite.Require().NoError(err)
@@ -122,6 +126,7 @@ func (suite *SegmentSuite) SetupTest() {
 			InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
 			Level:         datapb.SegmentLevel_Legacy,
 		},
+		nil,
 	)
 	suite.Require().NoError(err)
@@ -222,3 +227,22 @@ func (suite *SegmentSuite) TestSegmentReleased() {
 func TestSegment(t *testing.T) {
 	suite.Run(t, new(SegmentSuite))
 }
+
+func TestWarmupDispatcher(t *testing.T) {
+	d := NewWarmupDispatcher()
+	ctx := context.Background()
+	go d.Run(ctx)
+
+	completed := atomic.NewInt64(0)
+	taskCnt := 10000
+	for i := 0; i < taskCnt; i++ {
+		d.AddTask(func() (any, error) {
+			completed.Inc()
+			return nil, nil
+		})
+	}
+
+	assert.Eventually(t, func() bool {
+		return completed.Load() == int64(taskCnt)
+	}, 10*time.Second, time.Second)
+}


@@ -369,7 +369,7 @@ func (node *QueryNode) Init() error {
 		node.subscribingChannels = typeutil.NewConcurrentSet[string]()
 		node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()
 		node.manager = segments.NewManager()
-		node.loader = segments.NewLoader(node.manager, node.chunkManager)
+		node.loader = segments.NewLoader(node.ctx, node.manager, node.chunkManager)
 		node.manager.SetLoader(node.loader)
 		node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID())

 		// init pipeline manager


@@ -236,6 +236,7 @@ func (suite *QueryNodeSuite) TestStop() {
 			Level:         datapb.SegmentLevel_Legacy,
 			InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", 1),
 		},
+		nil,
 	)
 	suite.NoError(err)
 	suite.node.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, segment)