milvus/internal/querycoordv2/observers/leader_cache_observer_test.go
congqixia ef6d9c25c2
fix: check final result only in LeaderCacheObserver flaky test (#46601)
Related to #46600

The test previously checked if all 3 collection IDs were batched
together in a single InvalidateShardLeaderCache call. This caused
flakiness because the observer may split events across multiple calls.

Fix by accumulating all collection IDs across multiple calls and
verifying that eventually all expected IDs (1, 2, 3) are processed.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant: the test asserts that all registered collection IDs
{1,2,3} are eventually processed by InvalidateShardLeaderCache across
any number of calls — i.e., the observer must invalidate every
registered collection ID, not necessarily in a single batched RPC (fixes
flaky assumption from issue #46600).
- Logic removed/simplified: the strict expectation that all three IDs
arrive in one InvalidateShardLeaderCache call was replaced by
accumulating IDs into a ConcurrentSet (collectionIDs.Upsert in the mock)
and asserting eventual containment of 1,2,3. This removes the brittle
per-call batching assertion and uses a set-based accumulation (lines
where the mock calls Upsert and final Eventually checks
collectionIDs.Contain(...)).
- Why this is safe (no data loss or behavior regression): only test
assertions changed — production code (LeaderCacheObserver calling
InvalidateShardLeaderCache) is unchanged. The mock intercepts
InvalidateShardLeaderCache and accumulates req.GetCollectionIDs(); the
test still verifies single-ID handling via the existing len==1 &&
lo.Contains(... ) check (first mock block) and verifies that all IDs
were invalidated over time in the batch scenario (second mock block). No
production code paths were modified, so invalidation behavior and RPC
usage remain identical.
- Bug-fix note: this is a targeted test-only fix for issue #46600 — it
tolerates legitimate splitting of events across multiple
InvalidateShardLeaderCache invocations by aggregating IDs across calls
in the test mock, eliminating flakiness without altering runtime
behavior.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2025-12-26 10:17:19 +08:00

90 lines
3.0 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package observers
import (
"context"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/v2/proto/proxypb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type LeaderCacheObserverTestSuite struct {
suite.Suite
mockProxyManager *proxyutil.MockProxyClientManager
observer *LeaderCacheObserver
}
func (suite *LeaderCacheObserverTestSuite) SetupSuite() {
paramtable.Init()
suite.mockProxyManager = proxyutil.NewMockProxyClientManager(suite.T())
suite.observer = NewLeaderCacheObserver(suite.mockProxyManager)
}
func (suite *LeaderCacheObserverTestSuite) TestInvalidateShardLeaderCache() {
suite.observer.Start(context.TODO())
defer suite.observer.Stop()
ret := atomic.NewBool(false)
collectionIDs := typeutil.NewConcurrentSet[int64]()
suite.mockProxyManager.EXPECT().InvalidateShardLeaderCache(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, req *proxypb.InvalidateShardLeaderCacheRequest) error {
collectionIDs.Upsert(req.GetCollectionIDs()...)
collectionIDs := req.GetCollectionIDs()
if len(collectionIDs) == 1 && lo.Contains(collectionIDs, 1) {
ret.Store(true)
}
return nil
})
suite.observer.RegisterEvent(1)
suite.Eventually(func() bool {
return ret.Load()
}, 3*time.Second, 1*time.Second)
// test batch submit events
collectionIDs = typeutil.NewConcurrentSet[int64]()
suite.mockProxyManager.ExpectedCalls = nil
suite.mockProxyManager.EXPECT().InvalidateShardLeaderCache(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, req *proxypb.InvalidateShardLeaderCacheRequest) error {
collectionIDs.Upsert(req.GetCollectionIDs()...)
return nil
})
suite.observer.RegisterEvent(1)
suite.observer.RegisterEvent(2)
suite.observer.RegisterEvent(3)
suite.Eventually(func() bool {
return collectionIDs.Contain(1) && collectionIDs.Contain(2) && collectionIDs.Contain(3)
}, 3*time.Second, 1*time.Second)
}
func TestLeaderCacheObserverTestSuite(t *testing.T) {
suite.Run(t, new(LeaderCacheObserverTestSuite))
}