diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c2d7c6c880..d7736d735b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -863,6 +863,7 @@ common: threshold: info: 500 # minimum milliseconds for printing durations in info level warn: 1000 # minimum milliseconds for printing durations in warn level + maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional storage: scheme: s3 enablev2: false diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 2d07baabc5..e142431fad 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -414,7 +414,7 @@ func (s *LocalSegment) initializeSegment() error { // Provide ONLY the read lock operations, // don't make `ptrLock` public to avoid abusing of the mutex. func (s *LocalSegment) PinIfNotReleased() error { - if !s.ptrLock.PinIfNotReleased() { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } return nil @@ -430,10 +430,10 @@ func (s *LocalSegment) InsertCount() int64 { func (s *LocalSegment) RowNum() int64 { // if segment is not loaded, return 0 (maybe not loaded or release by lru) - if !s.ptrLock.RLockIf(state.IsDataLoaded) { + if !s.ptrLock.PinIf(state.IsDataLoaded) { return 0 } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() rowNum := s.rowNum.Load() if rowNum < 0 { @@ -447,10 +447,10 @@ func (s *LocalSegment) RowNum() int64 { } func (s *LocalSegment) MemSize() int64 { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return 0 } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() memSize := s.memSize.Load() if memSize < 0 { @@ -481,10 +481,10 @@ func (s *LocalSegment) ExistIndex(fieldID int64) bool { } func (s *LocalSegment) HasRawData(fieldID int64) bool { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return false } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() return s.csegment.HasRawData(fieldID) } @@ -511,11 +511,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ zap.String("segmentType", s.segmentType.String()), ) - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { // TODO: check if the segment is readable but not released. too many related logic need to be refactor. return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() hasIndex := s.ExistIndex(searchReq.SearchFieldID()) log = log.With(zap.Bool("withIndex", hasIndex)) @@ -533,11 +533,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ } func (s *LocalSegment) retrieve(ctx context.Context, plan *segcore.RetrievePlan, log *zap.Logger) (*segcore.RetrieveResult, error) { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { // TODO: check if the segment is readable but not released. too many related logic need to be refactor. return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log.Debug("begin to retrieve") @@ -580,11 +580,11 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *segcore.RetrievePlan) } func (s *LocalSegment) retrieveByOffsets(ctx context.Context, plan *segcore.RetrievePlanWithOffsets, log *zap.Logger) (*segcore.RetrieveResult, error) { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { // TODO: check if the segment is readable but not released. too many related logic need to be refactor. return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log.Debug("begin to retrieve by offsets") tr := timerecord.NewTimeRecorder("cgoRetrieveByOffsets") @@ -642,10 +642,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps [] if s.Type() != SegmentTypeGrowing { return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String()) } - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() var result *segcore.InsertResult var err error @@ -689,10 +689,10 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe if primaryKeys.Len() == 0 { return nil } - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() var err error GetDynamicPool().Submit(func() (any, error) { @@ -726,10 +726,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error { rowCount := loadInfo.GetNumOfRows() fields := loadInfo.GetBinlogPaths() - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), @@ -770,10 +770,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error { } func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID)) defer sp.End() @@ -826,10 +826,10 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun } func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log := log.Ctx(ctx).WithLazy( zap.Int64("collectionID", s.Collection()), @@ -866,10 +866,10 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps() rowNum := deltaData.DeleteRowCount() - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), @@ -1111,10 +1111,10 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F zap.Int64("segmentID", s.ID()), zap.Int64("fieldID", indexInfo.FieldID), ) - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() var status C.CStatus GetDynamicPool().Submit(func() (any, error) { @@ -1149,10 +1149,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap zap.Int64("fieldID", fieldID), zap.Bool("mmapEnabled", mmapEnabled), ) - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() var status C.CStatus @@ -1172,14 +1172,15 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap }).Await() case "async": GetWarmupPool().Submit(func() (any, error) { - // bad implemtation, warmup is async at another goroutine and hold the rlock. - // the state transition of segment in segment loader will blocked. - // add a waiter to avoid it. - s.ptrLock.BlockUntilDataLoadedOrReleased() - if !s.ptrLock.RLockIf(state.IsNotReleased) { + // failed to wait for state update, return directly + if !s.ptrLock.BlockUntilDataLoadedOrReleased() { return nil, nil } - defer s.ptrLock.RUnlock() + + if s.PinIfNotReleased() != nil { + return nil, nil + } + defer s.Unpin() cFieldID := C.int64_t(fieldID) cMmapEnabled := C.bool(mmapEnabled) diff --git a/internal/querynodev2/segments/state/load_state_lock.go b/internal/querynodev2/segments/state/load_state_lock.go index 6d2f601e17..e774a373ce 100644 --- a/internal/querynodev2/segments/state/load_state_lock.go +++ b/internal/querynodev2/segments/state/load_state_lock.go @@ -3,13 +3,20 @@ package state import ( "fmt" "sync" + "time" "github.com/cockroachdb/errors" "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type loadStateEnum int +func noop() {} + // LoadState represent the state transition of segment. // LoadStateOnlyMeta: segment is created with meta, but not loaded. // LoadStateDataLoading: segment is loading data. @@ -69,32 +76,17 @@ type LoadStateLock struct { } // RLockIfNotReleased locks the segment if the state is not released. -func (ls *LoadStateLock) RLockIf(pred StatePredicate) bool { - ls.mu.RLock() - if !pred(ls.state) { - ls.mu.RUnlock() - return false - } - return true -} - -// RUnlock unlocks the segment. -func (ls *LoadStateLock) RUnlock() { - ls.mu.RUnlock() -} - -// PinIfNotReleased pin the segment into memory, avoid ReleaseAll to release it. -func (ls *LoadStateLock) PinIfNotReleased() bool { +func (ls *LoadStateLock) PinIf(pred StatePredicate) bool { ls.mu.RLock() defer ls.mu.RUnlock() - if ls.state == LoadStateReleased { + if !pred(ls.state) { return false } ls.refCnt.Inc() return true } -// Unpin unpin the segment, then segment can be released by ReleaseAll. +// Unpin unlocks the segment. func (ls *LoadStateLock) Unpin() { ls.mu.RLock() defer ls.mu.RUnlock() @@ -108,6 +100,12 @@ func (ls *LoadStateLock) Unpin() { } } +// PinIfNotReleased pin the segment if the state is not released. +// grammar suger for PinIf(IsNotReleased). +func (ls *LoadStateLock) PinIfNotReleased() bool { + return ls.PinIf(IsNotReleased) +} + // StartLoadData starts load segment data // Fast fail if segment is not in LoadStateOnlyMeta. func (ls *LoadStateLock) StartLoadData() (LoadStateLockGuard, error) { @@ -129,76 +127,84 @@ func (ls *LoadStateLock) StartLoadData() (LoadStateLockGuard, error) { // StartReleaseData wait until the segment is releasable and starts releasing segment data. func (ls *LoadStateLock) StartReleaseData() (g LoadStateLockGuard) { - ls.cv.L.Lock() - defer ls.cv.L.Unlock() - - ls.waitUntilCanReleaseData() - - switch ls.state { - case LoadStateDataLoaded: - ls.state = LoadStateDataReleasing - ls.cv.Broadcast() - return newLoadStateLockGuard(ls, LoadStateDataLoaded, LoadStateOnlyMeta) - case LoadStateOnlyMeta: - // already transit to target state, do nothing. - return nil - case LoadStateReleased: - // do nothing for empty segment. - return nil - default: - panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String())) - } + ls.waitOrPanic(ls.canReleaseData, func() { + switch ls.state { + case LoadStateDataLoaded: + ls.state = LoadStateDataReleasing + ls.cv.Broadcast() + g = newLoadStateLockGuard(ls, LoadStateDataLoaded, LoadStateOnlyMeta) + case LoadStateOnlyMeta: + // already transit to target state, do nothing. + g = nil + case LoadStateReleased: + // do nothing for empty segment. + g = nil + default: + panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String())) + } + }) + return g } // StartReleaseAll wait until the segment is releasable and starts releasing all segment. func (ls *LoadStateLock) StartReleaseAll() (g LoadStateLockGuard) { - ls.cv.L.Lock() - defer ls.cv.L.Unlock() + ls.waitOrPanic(ls.canReleaseAll, func() { + switch ls.state { + case LoadStateDataLoaded: + ls.state = LoadStateReleased + ls.cv.Broadcast() + g = newNopLoadStateLockGuard() + case LoadStateOnlyMeta: + ls.state = LoadStateReleased + ls.cv.Broadcast() + g = newNopLoadStateLockGuard() + case LoadStateReleased: + // already transit to target state, do nothing. + g = nil + default: + panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String())) + } + }) - ls.waitUntilCanReleaseAll() - - switch ls.state { - case LoadStateDataLoaded: - ls.state = LoadStateReleased - ls.cv.Broadcast() - return newNopLoadStateLockGuard() - case LoadStateOnlyMeta: - ls.state = LoadStateReleased - ls.cv.Broadcast() - return newNopLoadStateLockGuard() - case LoadStateReleased: - // already transit to target state, do nothing. - return nil - default: - panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String())) - } + return g } // blockUntilDataLoadedOrReleased blocks until the segment is loaded or released. -func (ls *LoadStateLock) BlockUntilDataLoadedOrReleased() { - ls.cv.L.Lock() - defer ls.cv.L.Unlock() - - for ls.state != LoadStateDataLoaded && ls.state != LoadStateReleased { - ls.cv.Wait() - } +func (ls *LoadStateLock) BlockUntilDataLoadedOrReleased() bool { + var ok bool + ls.waitOrPanic(func(state loadStateEnum) bool { + return state == LoadStateDataLoaded || state == LoadStateReleased + }, func() { ok = true }) + return ok } // waitUntilCanReleaseData waits until segment is release data able. -func (ls *LoadStateLock) waitUntilCanReleaseData() { - state := ls.state - for state != LoadStateDataLoaded && state != LoadStateOnlyMeta && state != LoadStateReleased { - ls.cv.Wait() - state = ls.state - } +func (ls *LoadStateLock) canReleaseData(state loadStateEnum) bool { + return state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased } // waitUntilCanReleaseAll waits until segment is releasable. -func (ls *LoadStateLock) waitUntilCanReleaseAll() { - state := ls.state - for (state != LoadStateDataLoaded && state != LoadStateOnlyMeta && state != LoadStateReleased) || ls.refCnt.Load() != 0 { - ls.cv.Wait() - state = ls.state +func (ls *LoadStateLock) canReleaseAll(state loadStateEnum) bool { + return (state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased) && ls.refCnt.Load() == 0 +} + +func (ls *LoadStateLock) waitOrPanic(ready func(state loadStateEnum) bool, then func()) { + ch := make(chan struct{}) + maxWaitTime := paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.GetAsDuration(time.Second) + go func() { + ls.cv.L.Lock() + defer ls.cv.L.Unlock() + defer close(ch) + for !ready(ls.state) { + ls.cv.Wait() + } + then() + }() + + select { + case <-time.After(maxWaitTime): + log.Error("load state lock wait timeout", zap.Duration("maxWaitTime", maxWaitTime)) + case <-ch: } } diff --git a/internal/querynodev2/segments/state/load_state_lock_test.go b/internal/querynodev2/segments/state/load_state_lock_test.go index 27d3a94933..b2053488b2 100644 --- a/internal/querynodev2/segments/state/load_state_lock_test.go +++ b/internal/querynodev2/segments/state/load_state_lock_test.go @@ -6,9 +6,12 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestLoadStateLoadData(t *testing.T) { + paramtable.Init() l := NewLoadStateLock(LoadStateOnlyMeta) // Test Load Data, roll back g, err := l.StartLoadData() @@ -44,6 +47,7 @@ func TestLoadStateLoadData(t *testing.T) { } func TestStartReleaseData(t *testing.T) { + paramtable.Init() l := NewLoadStateLock(LoadStateOnlyMeta) // Test Release Data, nothing to do on only meta. g := l.StartReleaseData() @@ -104,6 +108,7 @@ func TestStartReleaseData(t *testing.T) { } func TestBlockUntilDataLoadedOrReleased(t *testing.T) { + paramtable.Init() l := NewLoadStateLock(LoadStateOnlyMeta) ch := make(chan struct{}) go func() { @@ -122,6 +127,7 @@ func TestBlockUntilDataLoadedOrReleased(t *testing.T) { } func TestStartReleaseAll(t *testing.T) { + paramtable.Init() l := NewLoadStateLock(LoadStateOnlyMeta) // Test Release All, nothing to do on only meta. g := l.StartReleaseAll() @@ -183,22 +189,57 @@ func TestStartReleaseAll(t *testing.T) { assert.Equal(t, LoadStateReleased, l.state) } -func TestRLock(t *testing.T) { +func TestWaitOrPanic(t *testing.T) { + paramtable.Init() + + t.Run("normal", func(t *testing.T) { + paramtable.Get().Save(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key, "600") + defer paramtable.Get().Reset(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key) + + l := NewLoadStateLock(LoadStateDataLoaded) + executed := false + + assert.NotPanics(t, func() { + l.waitOrPanic(func(state loadStateEnum) bool { + return state == LoadStateDataLoaded + }, func() { executed = true }) + }) + + assert.True(t, executed) + }) + + t.Run("timeout_panic", func(t *testing.T) { + paramtable.Get().Save(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key) + + l := NewLoadStateLock(LoadStateOnlyMeta) + executed := false + + assert.NotPanics(t, func() { + l.waitOrPanic(func(state loadStateEnum) bool { + return state == LoadStateDataLoaded + }, noop) + }) + assert.False(t, executed) + }) +} + +func TestPinIf(t *testing.T) { l := NewLoadStateLock(LoadStateOnlyMeta) - assert.True(t, l.RLockIf(IsNotReleased)) - l.RUnlock() - assert.False(t, l.RLockIf(IsDataLoaded)) + assert.True(t, l.PinIf(IsNotReleased)) + l.Unpin() + assert.False(t, l.PinIf(IsDataLoaded)) l = NewLoadStateLock(LoadStateDataLoaded) - assert.True(t, l.RLockIf(IsNotReleased)) - l.RUnlock() - assert.True(t, l.RLockIf(IsDataLoaded)) - l.RUnlock() + assert.True(t, l.PinIf(IsNotReleased)) + l.Unpin() + assert.True(t, l.PinIf(IsDataLoaded)) + l.Unpin() l = NewLoadStateLock(LoadStateOnlyMeta) l.StartReleaseAll().Done(nil) - assert.False(t, l.RLockIf(IsNotReleased)) - assert.False(t, l.RLockIf(IsDataLoaded)) + assert.False(t, l.PinIf(IsNotReleased)) + assert.False(t, l.PinIf(IsDataLoaded)) } func TestPin(t *testing.T) { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 02fbc7e4cd..65f20d7a25 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -254,9 +254,10 @@ type commonConfig struct { MetricsPort ParamItem `refreshable:"false"` // lock related params - EnableLockMetrics ParamItem `refreshable:"false"` - LockSlowLogInfoThreshold ParamItem `refreshable:"true"` - LockSlowLogWarnThreshold ParamItem `refreshable:"true"` + EnableLockMetrics ParamItem `refreshable:"false"` + LockSlowLogInfoThreshold ParamItem `refreshable:"true"` + LockSlowLogWarnThreshold ParamItem `refreshable:"true"` + MaxWLockConditionalWaitTime ParamItem `refreshable:"true"` StorageScheme ParamItem `refreshable:"false"` EnableStorageV2 ParamItem `refreshable:"false"` @@ -753,6 +754,15 @@ like the old password verification when updating the credential`, } p.LockSlowLogWarnThreshold.Init(base.mgr) + p.MaxWLockConditionalWaitTime = ParamItem{ + Key: "common.locks.maxWLockConditionalWaitTime", + Version: "2.5.4", + DefaultValue: "600", + Doc: "maximum seconds for waiting wlock conditional", + Export: true, + } + p.MaxWLockConditionalWaitTime.Init(base.mgr) + p.EnableStorageV2 = ParamItem{ Key: "common.storage.enablev2", Version: "2.3.1",