enhance: refine delegator state checking error msg (#42673)

issue: #42661
Add NotStopped() and IsWorking() methods to shardDelegator for better
state management and error handling.

Changes include:
- Add instance state checking methods with proper error messages
- Replace lifetime package calls with delegator instance methods
- Add comprehensive unit tests for state transitions and error cases
- Improve error reporting with channel name for better debugging

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-06-17 10:40:38 +08:00 committed by GitHub
parent 880915e08b
commit 679930bb93
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 295 additions and 6 deletions

View File

@ -173,13 +173,27 @@ func (sd *shardDelegator) getLogger(ctx context.Context) *log.MLogger {
)
}
func (sd *shardDelegator) NotStopped(state lifetime.State) error {
if state != lifetime.Stopped {
return nil
}
return merr.WrapErrChannelNotAvailable(sd.vchannelName, fmt.Sprintf("delegator is not ready, state: %s", state.String()))
}
func (sd *shardDelegator) IsWorking(state lifetime.State) error {
if state == lifetime.Working {
return nil
}
return merr.WrapErrChannelNotAvailable(sd.vchannelName, fmt.Sprintf("delegator is not ready, state: %s", state.String()))
}
// Serviceable returns whether delegator is serviceable now.
func (sd *shardDelegator) Serviceable() bool {
return lifetime.IsWorking(sd.lifetime.GetState()) == nil
return sd.IsWorking(sd.lifetime.GetState()) == nil
}
func (sd *shardDelegator) Stopped() bool {
return lifetime.NotStopped(sd.lifetime.GetState()) != nil
return sd.NotStopped(sd.lifetime.GetState()) != nil
}
// Start sets delegator to working state.
@ -357,7 +371,7 @@ func (sd *shardDelegator) search(ctx context.Context, req *querypb.SearchRequest
// Search preforms search operation on shard.
func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest) ([]*internalpb.SearchResults, error) {
log := sd.getLogger(ctx)
if err := sd.lifetime.Add(lifetime.IsWorking); err != nil {
if err := sd.lifetime.Add(sd.IsWorking); err != nil {
return nil, err
}
defer sd.lifetime.Done()
@ -563,7 +577,7 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq
// Query performs query operation on shard.
func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) ([]*internalpb.RetrieveResults, error) {
log := sd.getLogger(ctx)
if err := sd.lifetime.Add(lifetime.IsWorking); err != nil {
if err := sd.lifetime.Add(sd.IsWorking); err != nil {
return nil, err
}
defer sd.lifetime.Done()
@ -658,7 +672,7 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)
// GetStatistics returns statistics aggregated by delegator.
func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) ([]*internalpb.GetStatisticsResponse, error) {
log := sd.getLogger(ctx)
if err := sd.lifetime.Add(lifetime.IsWorking); err != nil {
if err := sd.lifetime.Add(sd.IsWorking); err != nil {
return nil, err
}
defer sd.lifetime.Done()
@ -976,7 +990,7 @@ func (sd *shardDelegator) GetTSafe() uint64 {
func (sd *shardDelegator) UpdateSchema(ctx context.Context, schema *schemapb.CollectionSchema, schVersion uint64) error {
log := sd.getLogger(ctx)
if err := sd.lifetime.Add(lifetime.IsWorking); err != nil {
if err := sd.lifetime.Add(sd.IsWorking); err != nil {
return err
}
defer sd.lifetime.Done()

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/proto/segcorepb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metric"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -258,6 +259,113 @@ func (s *DelegatorSuite) TestBasicInfo() {
s.True(s.delegator.Serviceable())
}
// TestDelegatorStateChecks tests the state checking methods added/modified in the delegator
func (s *DelegatorSuite) TestDelegatorStateChecks() {
sd := s.delegator.(*shardDelegator)
s.Run("test_state_methods_with_different_states", func() {
// Test Initializing state
sd.lifetime.SetState(lifetime.Initializing)
// NotStopped should return nil for non-stopped states
err := sd.NotStopped(sd.lifetime.GetState())
s.NoError(err)
// IsWorking should return error for non-working states
err = sd.IsWorking(sd.lifetime.GetState())
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
s.Contains(err.Error(), "Initializing")
// Serviceable should return false for non-working states
s.False(sd.Serviceable())
// Stopped should return false for non-stopped states
s.False(sd.Stopped())
// Test Working state
sd.lifetime.SetState(lifetime.Working)
// NotStopped should return nil for non-stopped states
err = sd.NotStopped(sd.lifetime.GetState())
s.NoError(err)
// IsWorking should return nil for working state
err = sd.IsWorking(sd.lifetime.GetState())
s.NoError(err)
// Serviceable should return true for working state
s.True(sd.Serviceable())
// Stopped should return false for non-stopped states
s.False(sd.Stopped())
// Test Stopped state
sd.lifetime.SetState(lifetime.Stopped)
// NotStopped should return error for stopped state
err = sd.NotStopped(sd.lifetime.GetState())
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
s.Contains(err.Error(), "Stopped")
// IsWorking should return error for stopped state
err = sd.IsWorking(sd.lifetime.GetState())
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
s.Contains(err.Error(), "Stopped")
// Serviceable should return false for stopped state
s.False(sd.Serviceable())
// Stopped should return true for stopped state
s.True(sd.Stopped())
})
s.Run("test_state_methods_with_direct_state_parameter", func() {
// Test NotStopped with different states
err := sd.NotStopped(lifetime.Initializing)
s.NoError(err)
err = sd.NotStopped(lifetime.Working)
s.NoError(err)
err = sd.NotStopped(lifetime.Stopped)
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
s.Contains(err.Error(), sd.vchannelName)
// Test IsWorking with different states
err = sd.IsWorking(lifetime.Initializing)
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
s.Contains(err.Error(), sd.vchannelName)
err = sd.IsWorking(lifetime.Working)
s.NoError(err)
err = sd.IsWorking(lifetime.Stopped)
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
s.Contains(err.Error(), sd.vchannelName)
})
s.Run("test_error_messages_contain_channel_name", func() {
// Verify error messages contain the channel name for better debugging
err := sd.NotStopped(lifetime.Stopped)
s.Error(err)
s.Contains(err.Error(), sd.vchannelName)
err = sd.IsWorking(lifetime.Initializing)
s.Error(err)
s.Contains(err.Error(), sd.vchannelName)
err = sd.IsWorking(lifetime.Stopped)
s.Error(err)
s.Contains(err.Error(), sd.vchannelName)
})
}
func (s *DelegatorSuite) TestGetSegmentInfo() {
sealed, growing := s.delegator.GetSegmentInfo(false)
s.Equal(0, len(sealed))
@ -1436,6 +1544,173 @@ func (s *DelegatorSuite) TestRunAnalyzer() {
})
}
// TestDelegatorLifetimeIntegration tests the integration of lifetime state checks with main delegator methods
func (s *DelegatorSuite) TestDelegatorLifetimeIntegration() {
sd := s.delegator.(*shardDelegator)
ctx := context.Background()
s.Run("test_methods_fail_when_not_working", func() {
// Set delegator to Initializing state (not ready)
sd.lifetime.SetState(lifetime.Initializing)
// Search should fail when not ready
_, err := sd.Search(ctx, &querypb.SearchRequest{
Req: &internalpb.SearchRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
// Query should fail when not ready
_, err = sd.Query(ctx, &querypb.QueryRequest{
Req: &internalpb.RetrieveRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
// GetStatistics should fail when not ready
_, err = sd.GetStatistics(ctx, &querypb.GetStatisticsRequest{
Req: &internalpb.GetStatisticsRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
// UpdateSchema should fail when not ready
err = sd.UpdateSchema(ctx, &schemapb.CollectionSchema{Name: "test"}, 1)
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
})
s.Run("test_methods_fail_when_stopped", func() {
// Set delegator to Stopped state
sd.lifetime.SetState(lifetime.Stopped)
// Search should fail when stopped
_, err := sd.Search(ctx, &querypb.SearchRequest{
Req: &internalpb.SearchRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
// Query should fail when stopped
_, err = sd.Query(ctx, &querypb.QueryRequest{
Req: &internalpb.RetrieveRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
// GetStatistics should fail when stopped
_, err = sd.GetStatistics(ctx, &querypb.GetStatisticsRequest{
Req: &internalpb.GetStatisticsRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
// UpdateSchema should fail when stopped
err = sd.UpdateSchema(ctx, &schemapb.CollectionSchema{Name: "test"}, 1)
s.Error(err)
s.Contains(err.Error(), "delegator is not ready")
})
}
// TestDelegatorStateTransitions tests state transitions and edge cases
func (s *DelegatorSuite) TestDelegatorStateTransitions() {
sd := s.delegator.(*shardDelegator)
s.Run("test_state_transition_sequence", func() {
// Test normal state transition sequence
// Start from Initializing
sd.lifetime.SetState(lifetime.Initializing)
s.False(sd.Serviceable())
s.False(sd.Stopped())
// Transition to Working
sd.Start() // This calls lifetime.SetState(lifetime.Working)
s.True(sd.Serviceable())
s.False(sd.Stopped())
// Transition to Stopped
sd.lifetime.SetState(lifetime.Stopped)
s.False(sd.Serviceable())
s.True(sd.Stopped())
})
s.Run("test_multiple_start_calls", func() {
// Test that multiple Start() calls don't cause issues
sd.lifetime.SetState(lifetime.Initializing)
s.False(sd.Serviceable())
// Call Start multiple times
sd.Start()
s.True(sd.Serviceable())
sd.Start()
sd.Start()
s.True(sd.Serviceable()) // Should remain serviceable
})
s.Run("test_start_after_stopped", func() {
// Test starting after being stopped
sd.lifetime.SetState(lifetime.Stopped)
s.True(sd.Stopped())
s.False(sd.Serviceable())
// Start again
sd.Start()
s.False(sd.Stopped())
s.True(sd.Serviceable())
})
s.Run("test_consistency_between_methods", func() {
// Test consistency between Serviceable() and Stopped() methods
// In Initializing state
sd.lifetime.SetState(lifetime.Initializing)
serviceable := sd.Serviceable()
stopped := sd.Stopped()
s.False(serviceable)
s.False(stopped)
// In Working state
sd.lifetime.SetState(lifetime.Working)
serviceable = sd.Serviceable()
stopped = sd.Stopped()
s.True(serviceable)
s.False(stopped)
// In Stopped state
sd.lifetime.SetState(lifetime.Stopped)
serviceable = sd.Serviceable()
stopped = sd.Stopped()
s.False(serviceable)
s.True(stopped)
})
s.Run("test_error_types_and_wrapping", func() {
// Test that errors are properly wrapped with channel information
// Test NotStopped error
err := sd.NotStopped(lifetime.Stopped)
s.Error(err)
s.True(errors.Is(err, merr.ErrChannelNotAvailable))
// Test IsWorking error
err = sd.IsWorking(lifetime.Initializing)
s.Error(err)
s.True(errors.Is(err, merr.ErrChannelNotAvailable))
err = sd.IsWorking(lifetime.Stopped)
s.Error(err)
s.True(errors.Is(err, merr.ErrChannelNotAvailable))
})
}
func TestDelegatorSuite(t *testing.T) {
suite.Run(t, new(DelegatorSuite))
}