diff --git a/configs/milvus.yaml b/configs/milvus.yaml index db4bb1dcae..f7d33c24dd 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -230,6 +230,10 @@ queryNode: # Max read concurrency must greater than or equal to 1, and less than or equal to runtime.NumCPU * 100. maxReadConcurrentRatio: 2.0 # (0, 100] cpuRatio: 10.0 # ratio used to estimate read task cpu usage. + # maxTimestampLag is the max allowed lag, in seconds, between the serviceable and guarantee timestamps. + # If the lag exceeds this value, the scheduler returns an error instead of waiting. + # The valid range is [3600, infinity). + maxTimestampLag: 86400 grouping: enabled: true diff --git a/internal/querynode/errors.go b/internal/querynode/errors.go index 7949a299c5..210d90d152 100644 --- a/internal/querynode/errors.go +++ b/internal/querynode/errors.go @@ -19,16 +19,26 @@ package querynode import ( "errors" "fmt" + "time" ) var ( + // ErrShardNotAvailable is the base error reported when a shard is not available. ErrShardNotAvailable = errors.New("ShardNotAvailable") + // ErrTsLagTooLarge is the base error reported when the lag between the serviceable and guarantee timestamps is too large. + ErrTsLagTooLarge = errors.New("Timestamp lag too large") ) +// WrapErrShardNotAvailable wraps ErrShardNotAvailable with the replica ID and channel name. func WrapErrShardNotAvailable(replicaID int64, shard string) error { return fmt.Errorf("%w(replica=%d, shard=%s)", ErrShardNotAvailable, replicaID, shard) } +// WrapErrTsLagTooLarge wraps ErrTsLagTooLarge with the observed lag and the maximum allowed lag. 
+func WrapErrTsLagTooLarge(duration time.Duration, maxLag time.Duration) error { + return fmt.Errorf("%w lag(%s) max(%s)", ErrTsLagTooLarge, duration, maxLag) +} + // msgQueryNodeIsUnhealthy is the error msg of unhealthy query node func msgQueryNodeIsUnhealthy(nodeID UniqueID) string { return fmt.Sprintf("query node %d is not ready", nodeID) diff --git a/internal/querynode/task_read.go b/internal/querynode/task_read.go index 26a7e66c1a..ec20de99f8 100644 --- a/internal/querynode/task_read.go +++ b/internal/querynode/task_read.go @@ -168,15 +168,27 @@ func (b *baseReadTask) Ready() (bool, error) { gt, _ := tsoutil.ParseTS(guaranteeTs) st, _ := tsoutil.ParseTS(serviceTime) if guaranteeTs > serviceTime { + lag := gt.Sub(st) + maxLag := Params.QueryNodeCfg.MaxTimestampLag.GetAsDuration(time.Second) + // return an error right away, rather than keep waiting, once the lag exceeds the configured maximum + if lag > maxLag { + log.Warn("lag between guarantee and serviceable ts is larger than MaxLag", + zap.Time("guaranteeTime", gt), + zap.Time("serviceableTime", st), + zap.Duration("lag", lag), + zap.Duration("maxTsLag", maxLag), + ) + return false, WrapErrTsLagTooLarge(lag, maxLag) + } return false, nil } log.Debug("query msg can do", - zap.Any("collectionID", b.CollectionID), - zap.Any("sm.GuaranteeTimestamp", gt), - zap.Any("serviceTime", st), - zap.Any("delta milliseconds", gt.Sub(st).Milliseconds()), - zap.Any("channel", channel), - zap.Any("msgID", b.ID())) + zap.Int64("collectionID", b.CollectionID), + zap.Time("sm.GuaranteeTimestamp", gt), + zap.Time("serviceTime", st), + zap.Int64("delta milliseconds", gt.Sub(st).Milliseconds()), + zap.String("channel", channel), + zap.Int64("msgID", b.ID())) b.waitTsDur = b.waitTSafeTr.Elapse("wait for tsafe done") return true, nil } diff --git a/internal/querynode/task_read_test.go b/internal/querynode/task_read_test.go index 72ade4777b..877faccfad 100644 --- a/internal/querynode/task_read_test.go +++ b/internal/querynode/task_read_test.go @@ -6,7 +6,10 @@ import ( "time" "github.com/milvus-io/milvus/internal/mocks" + 
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/timerecord" + "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -30,6 +33,7 @@ func (s *baseReadTaskSuite) SetupSuite() { lcm := &mocks.ChunkManager{} tsafe := &MockTSafeReplicaInterface{} + s.tsafe = tsafe qs, err := newQueryShard(context.Background(), defaultCollectionID, defaultDMLChannel, defaultReplicaID, nil, meta, tsafe, lcm, rcm, false) s.Require().NoError(err) @@ -107,7 +111,50 @@ func (s *baseReadTaskSuite) TestTimeoutError() { s.Assert().ErrorIs(s.task.TimeoutError(), context.DeadlineExceeded) }) +} +// TestReady covers Ready when the lag exceeds the limit, when it is within the limit, and when the guarantee timestamp is already serviceable. +func (s *baseReadTaskSuite) TestReady() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.task.ctx = ctx + baseTime := time.Now() + serviceable := tsoutil.ComposeTSByTime(baseTime, 0) + s.tsafe.EXPECT().getTSafe(mock.AnythingOfType("string")).Return(serviceable, nil) + s.Run("lag too large", func() { + tooLargeGuarantee := baseTime.Add(Params.QueryNodeCfg.MaxTimestampLag.GetAsDuration(time.Second)).Add(time.Second) + guaranteeTs := tsoutil.ComposeTSByTime(tooLargeGuarantee, 0) + s.task.GuaranteeTimestamp = guaranteeTs + s.task.DataScope = querypb.DataScope_Historical + + ready, err := s.task.Ready() + s.False(ready) + s.Error(err) + s.ErrorIs(err, ErrTsLagTooLarge) + }) + + s.Run("not ready", func() { + guarantee := baseTime.Add(Params.QueryNodeCfg.MaxTimestampLag.GetAsDuration(time.Second)).Add(-time.Second) + guaranteeTs := tsoutil.ComposeTSByTime(guarantee, 0) + s.task.GuaranteeTimestamp = guaranteeTs + s.task.DataScope = querypb.DataScope_Historical + + ready, err := s.task.Ready() + s.False(ready) + s.NoError(err) + }) + + s.Run("ready", func() { + guarantee := baseTime.Add(-time.Second) + guaranteeTs := tsoutil.ComposeTSByTime(guarantee, 0) + s.task.GuaranteeTimestamp = guaranteeTs + s.task.DataScope = querypb.DataScope_Historical + + ready, err 
:= s.task.Ready() + s.True(ready) + s.NoError(err) + }) } func TestBaseReadTask(t *testing.T) { diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 18ff262248..79ee5dc814 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -1041,6 +1041,7 @@ type queryNodeConfig struct { MaxGroupNQ ParamItem `refreshable:"true"` TopKMergeRatio ParamItem `refreshable:"true"` CPURatio ParamItem `refreshable:"true"` + MaxTimestampLag ParamItem `refreshable:"true"` GCHelperEnabled ParamItem `refreshable:"false"` MinimumGOGCConfig ParamItem `refreshable:"false"` @@ -1254,6 +1255,13 @@ func (p *queryNodeConfig) init(base *BaseTable) { } p.MaxDiskUsagePercentage.Init(base.mgr) + p.MaxTimestampLag = ParamItem{ + Key: "queryNode.scheduler.maxTimestampLag", + Version: "2.2.3", + DefaultValue: "86400", + } + p.MaxTimestampLag.Init(base.mgr) + p.GCHelperEnabled = ParamItem{ Key: "queryNode.gchelper.enabled", Version: "2.0.0",