enhance: add delegator catching up streaming data state tracking (#46551)

issue: #46550
- Add CatchUpStreamingDataTsLag parameter to control tolerable lag
  threshold for delegator to be considered caught up
- Add catchingUpStreamingData field in delegator to track whether
  delegator has caught up with streaming data
- Add catching_up_streaming_data field in LeaderViewStatus proto
- Check catching up status in CheckDelegatorDataReady, return not
  ready when delegator is still catching up streaming data
- Add unit tests for the new functionality

When tsafe lag exceeds the threshold, the distribution will not be
considered serviceable, preventing queries from timing out in waitTSafe.
This is useful when streaming message queue consumption is slow.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant: a delegator must not be considered serviceable while
its tsafe lags behind the current wall-clock time beyond a
configurable tolerance; a delegator becomes "caught-up" once
time.Since(tsoutil.PhysicalTime(tsafe)) <= CatchUpStreamingDataTsLag
(configured by queryNode.delegator.catchUpStreamingDataTsLag, default
1s).
- New capability and where it takes effect: adds streaming-catchup
tracking to QueryNode/QueryCoord — an atomic catchingUpStreamingData
flag on shardDelegator (internal/querynodev2/delegator/delegator.go), a
new param CatchUpStreamingDataTsLag
(pkg/util/paramtable/component_param.go), and a
LeaderViewStatus.catching_up_streaming_data field in the proto
(pkg/proto/query_coord.proto). The flag is exposed in
GetDataDistribution (internal/querynodev2/services.go) and used by
QueryCoord readiness checks
(internal/querycoordv2/utils/util.go::CheckDelegatorDataReady) to reject
leaders that are still catching up.
- What logic is simplified/added (not removed): instead of relying
solely on segment distribution/worker heartbeats, the PR adds an
explicit readiness gate that returns "not available" when the delegator
reports catching-up-streaming-data. This is strictly additive — no
existing checks are removed; the new precondition runs before segment
availability validation to prevent premature routing to slow-consuming
delegators.
- Why this does NOT cause data loss or regress behavior: the change only
controls serviceability visibility and routing — it never drops or
mutates data. Concretely: shardDelegator starts with
catchingUpStreamingData=true and flips to false in UpdateTSafe once the
sampled lag falls below the configured threshold
(internal/querynodev2/delegator/delegator.go::UpdateTSafe). QueryCoord
will short-circuit in CheckDelegatorDataReady when
leader.Status.GetCatchingUpStreamingData() is true
(internal/querycoordv2/utils/util.go), returning a channel-not-available
error before any segment checks; when the flag clears, existing
segment-distribution checks (same code paths) resume. Tests added cover
both catching-up and caught-up paths
(internal/querynodev2/delegator/delegator_test.go,
internal/querycoordv2/utils/util_test.go,
internal/querynodev2/services_test.go), demonstrating convergence
without changed data flows or deletion of data.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-12-29 17:15:21 +08:00 committed by GitHub
parent 4230a5beaa
commit 293838bb67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1472 additions and 1205 deletions

View File

@ -44,6 +44,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
// 2. All QueryNodes in the distribution are online
// 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution
// 4. All segments of the shard in target should be in the distribution
// 5. The delegator has caught up with streaming data
func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error {
log := log.Ctx(context.TODO()).
WithRateGroup(fmt.Sprintf("util.CheckDelegatorDataReady-%d", leader.CollectionID), 1, 60).
@ -57,6 +58,13 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target
return fmt.Errorf("leader not available: %w", err)
}
// Check if delegator is still catching up with streaming data
if leader.Status != nil && leader.Status.GetCatchingUpStreamingData() {
log.RatedInfo(10, "leader is not available due to still catching up streaming data",
zap.String("channel", leader.Channel))
return merr.WrapErrChannelNotAvailable(leader.Channel, "still catching up streaming data")
}
segmentDist := targetMgr.GetSealedSegmentsByChannel(context.TODO(), leader.CollectionID, leader.Channel, scope)
// Check whether segments are fully loaded
for segmentID := range segmentDist {

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/blang/semver/v4"
"github.com/bytedance/mockey"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -157,6 +158,52 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget)
suite.Error(err)
})
suite.Run("catching up streaming data", func() {
leadview := &meta.LeaderView{
ID: 1,
Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
TargetVersion: 1011,
Status: &querypb.LeaderViewStatus{
Serviceable: true,
CatchingUpStreamingData: true, // still catching up
},
}
// When catching up, function returns early without calling targetMgr
// so we can pass nil as targetMgr
suite.setNodeAvailable(1, 2)
err := CheckDelegatorDataReady(suite.nodeMgr, nil, leadview, meta.CurrentTarget)
suite.Error(err)
suite.Contains(err.Error(), "catching up streaming data")
})
suite.Run("caught up streaming data", func() {
leadview := &meta.LeaderView{
ID: 1,
Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
TargetVersion: 1011,
Status: &querypb.LeaderViewStatus{
Serviceable: true,
CatchingUpStreamingData: false, // already caught up
},
}
// Use mockey to mock TargetManager.GetSealedSegmentsByChannel
targetMgr := &meta.TargetManager{}
mockGetSealedSegments := mockey.Mock(mockey.GetMethod(targetMgr, "GetSealedSegmentsByChannel")).
Return(map[int64]*datapb.SegmentInfo{
2: {
ID: 2,
InsertChannel: "test",
},
}).Build()
defer mockGetSealedSegments.UnPatch()
suite.setNodeAvailable(1, 2)
err := CheckDelegatorDataReady(suite.nodeMgr, targetMgr, leadview, meta.CurrentTarget)
suite.NoError(err)
})
}
func (suite *UtilTestSuite) TestGetChannelRWAndRONodesFor260() {

View File

@ -108,6 +108,7 @@ type ShardDelegator interface {
// control
Serviceable() bool
CatchingUpStreamingData() bool
Start()
Close()
}
@ -165,6 +166,9 @@ type shardDelegator struct {
// schema version
schemaChangeMutex sync.RWMutex
schemaVersion uint64
// streaming data catch-up state
catchingUpStreamingData *atomic.Bool
}
// getLogger returns the zap logger with pre-defined shard attributes.
@ -996,10 +1000,29 @@ func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, err
// UpdateTSafe advances the delegator's latest tsafe to the given value and
// wakes every goroutine waiting on tsCond. A value that is not greater than
// the current latestTsafe is ignored.
//
// While the delegator is still flagged as catching up with streaming data,
// each accepted update also measures how far the time returned by
// tsoutil.PhysicalTime(tsafe) lies behind the current time. Once that lag is
// within QueryNodeCfg.CatchUpStreamingDataTsLag the flag is cleared; nothing
// in this function sets it again.
//
// NOTE(review): a non-positive threshold skips the check entirely, so the
// flag is never cleared here and the delegator keeps reporting
// "catching up". Confirm this is the intended meaning of a zero threshold.
func (sd *shardDelegator) UpdateTSafe(tsafe uint64) {
	log := sd.getLogger(context.Background()).WithRateGroup(fmt.Sprintf("UpdateTSafe-%s", sd.vchannelName), 1, 60)

	sd.tsCond.L.Lock()
	// Deferred so that a panic in the config lookup or logging below cannot
	// leave the condition's lock held.
	defer sd.tsCond.L.Unlock()

	if tsafe <= sd.latestTsafe.Load() {
		return
	}
	sd.latestTsafe.Store(tsafe)
	sd.tsCond.Broadcast()

	// The catch-up evaluation only matters until the flag has been cleared.
	if !sd.catchingUpStreamingData.Load() {
		return
	}
	lagThreshold := paramtable.Get().QueryNodeCfg.CatchUpStreamingDataTsLag.GetAsDurationByParse()
	if lagThreshold <= 0 {
		return
	}

	lag := time.Since(tsoutil.PhysicalTime(tsafe))
	caughtUp := lag <= lagThreshold
	log.RatedInfo(10, "delegator catching up streaming data progress",
		zap.String("channel", sd.vchannelName),
		zap.Duration("lag", lag),
		zap.Duration("threshold", lagThreshold),
		zap.Bool("caughtUp", caughtUp))
	if caughtUp {
		sd.catchingUpStreamingData.Store(false)
	}
}
@ -1008,6 +1031,11 @@ func (sd *shardDelegator) GetTSafe() uint64 {
return sd.latestTsafe.Load()
}
// CatchingUpStreamingData reports the current value of the delegator's
// catch-up flag: true while the delegator has not yet been marked as caught
// up with streaming data, false afterwards.
func (sd *shardDelegator) CatchingUpStreamingData() bool {
	stillCatchingUp := sd.catchingUpStreamingData.Load()
	return stillCatchingUp
}
func (sd *shardDelegator) UpdateSchema(ctx context.Context, schema *schemapb.CollectionSchema, schVersion uint64) error {
log := sd.getLogger(ctx)
if err := sd.lifetime.Add(sd.IsWorking); err != nil {
@ -1179,17 +1207,18 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
distribution: NewDistribution(channel, queryView),
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock,
[]string{fmt.Sprint(paramtable.GetNodeID()), channel}),
pkOracle: pkoracle.NewPkOracle(),
latestTsafe: atomic.NewUint64(startTs),
loader: loader,
queryHook: queryHook,
chunkManager: chunkManager,
partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot),
excludedSegments: excludedSegments,
functionRunners: make(map[int64]function.FunctionRunner),
analyzerRunners: make(map[UniqueID]function.Analyzer),
isBM25Field: make(map[int64]bool),
l0ForwardPolicy: policy,
pkOracle: pkoracle.NewPkOracle(),
latestTsafe: atomic.NewUint64(startTs),
loader: loader,
queryHook: queryHook,
chunkManager: chunkManager,
partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot),
excludedSegments: excludedSegments,
functionRunners: make(map[int64]function.FunctionRunner),
analyzerRunners: make(map[UniqueID]function.Analyzer),
isBM25Field: make(map[int64]bool),
l0ForwardPolicy: policy,
catchingUpStreamingData: atomic.NewBool(true),
}
for _, tf := range collection.Schema().GetFunctions() {

View File

@ -20,6 +20,7 @@ import (
"context"
"io"
"os"
"sync"
"testing"
"time"
@ -30,6 +31,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@ -2040,3 +2042,84 @@ func TestNewRowCountBasedEvaluator_PartialResultAcceptance(t *testing.T) {
assert.Equal(t, 0.9, accessedRatio)
})
}
// TestDelegatorCatchingUpStreamingData covers the catch-up flag of
// shardDelegator: its initial value and how UpdateTSafe changes it for
// different lag thresholds and tsafe ages.
func TestDelegatorCatchingUpStreamingData(t *testing.T) {
	paramtable.Init()

	t.Run("initial state is catching up", func(t *testing.T) {
		// Only the flag is needed to exercise the accessor.
		delegator := &shardDelegator{
			catchingUpStreamingData: atomic.NewBool(true),
		}
		assert.True(t, delegator.CatchingUpStreamingData())
	})

	cases := []struct {
		name string
		// threshold is the value the patched GetAsDurationByParse returns.
		threshold time.Duration
		// tsafeAge is how far in the past the tsafe passed to UpdateTSafe lies.
		tsafeAge time.Duration
		// wantCatchingUp is the expected flag value after UpdateTSafe.
		wantCatchingUp bool
	}{
		{
			name:           "state changes to caught up when lag is small",
			threshold:      5 * time.Second,
			tsafeAge:       0,
			wantCatchingUp: false,
		},
		{
			name:           "state remains catching up when lag is large",
			threshold:      5 * time.Second,
			tsafeAge:       10 * time.Second,
			wantCatchingUp: true,
		},
		{
			// NOTE(review): with a zero threshold the flag is never cleared,
			// even for a fresh tsafe; confirm that is the intended meaning
			// of "disabled".
			name:           "threshold disabled when set to 0",
			threshold:      0 * time.Second,
			tsafeAge:       0,
			wantCatchingUp: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Patch the config getter so it yields this case's threshold.
			patch := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsDurationByParse")).Return(tc.threshold).Build()
			defer patch.UnPatch()

			delegator := &shardDelegator{
				vchannelName:            "test-channel",
				latestTsafe:             atomic.NewUint64(0),
				catchingUpStreamingData: atomic.NewBool(true),
				tsCond:                  sync.NewCond(&sync.Mutex{}),
			}

			// Every delegator starts out catching up.
			assert.True(t, delegator.CatchingUpStreamingData())

			ts := tsoutil.ComposeTSByTime(time.Now().Add(-tc.tsafeAge), 0)
			delegator.UpdateTSafe(ts)

			if tc.wantCatchingUp {
				assert.True(t, delegator.CatchingUpStreamingData())
			} else {
				assert.False(t, delegator.CatchingUpStreamingData())
			}
		})
	}
}

View File

@ -1088,6 +1088,51 @@ func (_c *MockShardDelegator_Serviceable_Call) RunAndReturn(run func() bool) *Mo
return _c
}
// CatchingUpStreamingData provides a mock function with no fields
//
// It returns the bool configured as the first return value of the recorded
// call. If that value is a func() bool, the function is invoked and its
// result is returned instead. It panics when no return value was configured.
// NOTE(review): this looks like mockery-generated code; comments added by
// hand would presumably be lost on regeneration.
func (_m *MockShardDelegator) CatchingUpStreamingData() bool {
ret := _m.Called()
// A mocked call with no configured return value cannot produce a bool.
if len(ret) == 0 {
panic("no return value specified for CatchingUpStreamingData")
}
var r0 bool
// A func() bool return value is called to compute the result.
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockShardDelegator_CatchingUpStreamingData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CatchingUpStreamingData'
//
// It embeds *mock.Call, so the remaining mock.Call methods stay reachable on it.
type MockShardDelegator_CatchingUpStreamingData_Call struct {
*mock.Call
}
// CatchingUpStreamingData is a helper method to define mock.On call
//
// It registers an expectation for "CatchingUpStreamingData" on the underlying
// mock and returns it wrapped in the typed call struct.
func (_e *MockShardDelegator_Expecter) CatchingUpStreamingData() *MockShardDelegator_CatchingUpStreamingData_Call {
return &MockShardDelegator_CatchingUpStreamingData_Call{Call: _e.mock.On("CatchingUpStreamingData")}
}
// Run attaches run as the handler of the underlying mock.Call and returns the
// receiver for chaining. The mock arguments are not forwarded to run, since
// the mocked method takes no parameters.
func (_c *MockShardDelegator_CatchingUpStreamingData_Call) Run(run func()) *MockShardDelegator_CatchingUpStreamingData_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
// Return sets _a0 as the return value of the underlying mock.Call and returns
// the receiver for chaining.
func (_c *MockShardDelegator_CatchingUpStreamingData_Call) Return(_a0 bool) *MockShardDelegator_CatchingUpStreamingData_Call {
_c.Call.Return(_a0)
return _c
}
// RunAndReturn stores the function run itself as the return value of the
// underlying mock.Call and returns the receiver for chaining.
func (_c *MockShardDelegator_CatchingUpStreamingData_Call) RunAndReturn(run func() bool) *MockShardDelegator_CatchingUpStreamingData_Call {
_c.Call.Return(run)
return _c
}
// Start provides a mock function with no fields
func (_m *MockShardDelegator) Start() {
_m.Called()

View File

@ -1304,7 +1304,8 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
PartitionStatsVersions: delegator.GetPartitionStatsVersions(ctx),
TargetVersion: queryView.GetVersion(),
Status: &querypb.LeaderViewStatus{
Serviceable: queryView.Serviceable(),
Serviceable: queryView.Serviceable(),
CatchingUpStreamingData: delegator.CatchingUpStreamingData(),
},
})
return true

View File

@ -1958,6 +1958,32 @@ func (suite *ServiceSuite) TestGetDataDistribution_Failed() {
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode())
}
// TestGetDataDistribution_LeaderViewStatus verifies that every leader view
// returned by GetDataDistribution carries a LeaderViewStatus and that the
// status reports the delegator as still catching up with streaming data.
func (suite *ServiceSuite) TestGetDataDistribution_LeaderViewStatus() {
	ctx := context.Background()

	// Reuse existing suite cases as fixtures; presumably they leave the node
	// with a watched channel and loaded segments, which NotEmpty checks below.
	suite.TestWatchDmChannelsInt64()
	suite.TestLoadSegments_Int64()

	req := &querypb.GetDataDistributionRequest{
		Base: &commonpb.MsgBase{
			MsgID:    rand.Int63(),
			TargetID: suite.node.session.ServerID,
		},
	}

	resp, err := suite.node.GetDataDistribution(ctx, req)
	suite.NoError(err)
	suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())

	// Generated getters are nil-safe, so a missing response or status shows up
	// as an assertion failure rather than a nil-pointer panic.
	leaderViews := resp.GetLeaderViews()
	suite.NotEmpty(leaderViews)
	for _, leaderView := range leaderViews {
		status := leaderView.GetStatus()
		suite.NotNil(status, "LeaderView should have Status field")
		// NOTE(review): assumes the delegator has not yet been marked caught
		// up by the time the distribution is read; confirm this cannot race
		// with tsafe updates in the test environment.
		suite.True(status.GetCatchingUpStreamingData(),
			"New delegator should be catching up streaming data")
	}
}
func (suite *ServiceSuite) TestSyncDistribution_Normal() {
ctx := context.Background()
// prepare

View File

@ -675,6 +675,7 @@ message LeaderView {
// LeaderViewStatus carries per-leader status flags reported alongside a
// LeaderView.
message LeaderViewStatus {
// Whether the leader's query view is serviceable.
bool serviceable = 1;
// Absent values read as false, i.e. "not catching up".
bool catching_up_streaming_data = 2; // true = still catching up, not ready
}
message SegmentDist {

File diff suppressed because it is too large Load Diff

View File

@ -3332,8 +3332,9 @@ type queryNodeConfig struct {
GracefulStopTimeout ParamItem `refreshable:"false"`
// tsafe
MaxTimestampLag ParamItem `refreshable:"true"`
DowngradeTsafe ParamItem `refreshable:"true"`
MaxTimestampLag ParamItem `refreshable:"true"`
DowngradeTsafe ParamItem `refreshable:"true"`
CatchUpStreamingDataTsLag ParamItem `refreshable:"true"`
// delete buffer
MaxSegmentDeleteBuffer ParamItem `refreshable:"false"`
@ -4299,6 +4300,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
}
p.DowngradeTsafe.Init(base.mgr)
p.CatchUpStreamingDataTsLag = ParamItem{
Key: "queryNode.delegator.catchUpStreamingDataTsLag",
Version: "2.6.8",
DefaultValue: "1s",
Doc: "Tolerable lag for delegator to be considered caught up with streaming data",
}
p.CatchUpStreamingDataTsLag.Init(base.mgr)
p.GracefulStopTimeout = ParamItem{
Key: "queryNode.gracefulStopTimeout",
Version: "2.2.1",

View File

@ -525,6 +525,13 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 1.0, Params.PartialResultRequiredDataRatio.GetAsFloat())
params.Save(Params.PartialResultRequiredDataRatio.Key, "0.8")
assert.Equal(t, 0.8, Params.PartialResultRequiredDataRatio.GetAsFloat())
// test CatchUpStreamingDataTsLag parameter
assert.Equal(t, 1*time.Second, Params.CatchUpStreamingDataTsLag.GetAsDurationByParse())
params.Save(Params.CatchUpStreamingDataTsLag.Key, "5s")
assert.Equal(t, 5*time.Second, Params.CatchUpStreamingDataTsLag.GetAsDurationByParse())
params.Save(Params.CatchUpStreamingDataTsLag.Key, "0s")
assert.Equal(t, time.Duration(0), Params.CatchUpStreamingDataTsLag.GetAsDurationByParse())
})
t.Run("test dataCoordConfig", func(t *testing.T) {