diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index ba4b2f94ad..225cf044a8 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -173,13 +173,27 @@ func (sd *shardDelegator) getLogger(ctx context.Context) *log.MLogger { ) } +func (sd *shardDelegator) NotStopped(state lifetime.State) error { + if state != lifetime.Stopped { + return nil + } + return merr.WrapErrChannelNotAvailable(sd.vchannelName, fmt.Sprintf("delegator is not ready, state: %s", state.String())) +} + +func (sd *shardDelegator) IsWorking(state lifetime.State) error { + if state == lifetime.Working { + return nil + } + return merr.WrapErrChannelNotAvailable(sd.vchannelName, fmt.Sprintf("delegator is not ready, state: %s", state.String())) +} + // Serviceable returns whether delegator is serviceable now. func (sd *shardDelegator) Serviceable() bool { - return lifetime.IsWorking(sd.lifetime.GetState()) == nil + return sd.IsWorking(sd.lifetime.GetState()) == nil } func (sd *shardDelegator) Stopped() bool { - return lifetime.NotStopped(sd.lifetime.GetState()) != nil + return sd.NotStopped(sd.lifetime.GetState()) != nil } // Start sets delegator to working state. @@ -357,7 +371,7 @@ func (sd *shardDelegator) search(ctx context.Context, req *querypb.SearchRequest // Search preforms search operation on shard. func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest) ([]*internalpb.SearchResults, error) { log := sd.getLogger(ctx) - if err := sd.lifetime.Add(lifetime.IsWorking); err != nil { + if err := sd.lifetime.Add(sd.IsWorking); err != nil { return nil, err } defer sd.lifetime.Done() @@ -563,7 +577,7 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq // Query performs query operation on shard. func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) ([]*internalpb.RetrieveResults, error) { log := sd.getLogger(ctx) - if err := sd.lifetime.Add(lifetime.IsWorking); err != nil { + if err := sd.lifetime.Add(sd.IsWorking); err != nil { return nil, err } defer sd.lifetime.Done() @@ -658,7 +672,7 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) // GetStatistics returns statistics aggregated by delegator. func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) ([]*internalpb.GetStatisticsResponse, error) { log := sd.getLogger(ctx) - if err := sd.lifetime.Add(lifetime.IsWorking); err != nil { + if err := sd.lifetime.Add(sd.IsWorking); err != nil { return nil, err } defer sd.lifetime.Done() @@ -976,7 +990,7 @@ func (sd *shardDelegator) GetTSafe() uint64 { func (sd *shardDelegator) UpdateSchema(ctx context.Context, schema *schemapb.CollectionSchema, schVersion uint64) error { log := sd.getLogger(ctx) - if err := sd.lifetime.Add(lifetime.IsWorking); err != nil { + if err := sd.lifetime.Add(sd.IsWorking); err != nil { return err } defer sd.lifetime.Done() diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index fedb057a11..92b3b3ae27 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/proto/segcorepb" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/v2/util/lifetime" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metric" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -258,6 +259,113 @@ func (s *DelegatorSuite) TestBasicInfo() { s.True(s.delegator.Serviceable()) } +// TestDelegatorStateChecks tests the state checking methods added/modified in the delegator +func (s *DelegatorSuite) TestDelegatorStateChecks() { + sd := s.delegator.(*shardDelegator) + + s.Run("test_state_methods_with_different_states", func() { + // Test Initializing state + sd.lifetime.SetState(lifetime.Initializing) + + // NotStopped should return nil for non-stopped states + err := sd.NotStopped(sd.lifetime.GetState()) + s.NoError(err) + + // IsWorking should return error for non-working states + err = sd.IsWorking(sd.lifetime.GetState()) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + s.Contains(err.Error(), "Initializing") + + // Serviceable should return false for non-working states + s.False(sd.Serviceable()) + + // Stopped should return false for non-stopped states + s.False(sd.Stopped()) + + // Test Working state + sd.lifetime.SetState(lifetime.Working) + + // NotStopped should return nil for non-stopped states + err = sd.NotStopped(sd.lifetime.GetState()) + s.NoError(err) + + // IsWorking should return nil for working state + err = sd.IsWorking(sd.lifetime.GetState()) + s.NoError(err) + + // Serviceable should return true for working state + s.True(sd.Serviceable()) + + // Stopped should return false for non-stopped states + s.False(sd.Stopped()) + + // Test Stopped state + sd.lifetime.SetState(lifetime.Stopped) + + // NotStopped should return error for stopped state + err = sd.NotStopped(sd.lifetime.GetState()) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + s.Contains(err.Error(), "Stopped") + + // IsWorking should return error for stopped state + err = sd.IsWorking(sd.lifetime.GetState()) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + s.Contains(err.Error(), "Stopped") + + // Serviceable should return false for stopped state + s.False(sd.Serviceable()) + + // Stopped should return true for stopped state + s.True(sd.Stopped()) + }) + + s.Run("test_state_methods_with_direct_state_parameter", func() { + // Test NotStopped with different states + err := sd.NotStopped(lifetime.Initializing) + s.NoError(err) + + err = sd.NotStopped(lifetime.Working) + s.NoError(err) + + err = sd.NotStopped(lifetime.Stopped) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + s.Contains(err.Error(), sd.vchannelName) + + // Test IsWorking with different states + err = sd.IsWorking(lifetime.Initializing) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + s.Contains(err.Error(), sd.vchannelName) + + err = sd.IsWorking(lifetime.Working) + s.NoError(err) + + err = sd.IsWorking(lifetime.Stopped) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + s.Contains(err.Error(), sd.vchannelName) + }) + + s.Run("test_error_messages_contain_channel_name", func() { + // Verify error messages contain the channel name for better debugging + err := sd.NotStopped(lifetime.Stopped) + s.Error(err) + s.Contains(err.Error(), sd.vchannelName) + + err = sd.IsWorking(lifetime.Initializing) + s.Error(err) + s.Contains(err.Error(), sd.vchannelName) + + err = sd.IsWorking(lifetime.Stopped) + s.Error(err) + s.Contains(err.Error(), sd.vchannelName) + }) +} + func (s *DelegatorSuite) TestGetSegmentInfo() { sealed, growing := s.delegator.GetSegmentInfo(false) s.Equal(0, len(sealed)) @@ -1436,6 +1544,173 @@ func (s *DelegatorSuite) TestRunAnalyzer() { }) } +// TestDelegatorLifetimeIntegration tests the integration of lifetime state checks with main delegator methods +func (s *DelegatorSuite) TestDelegatorLifetimeIntegration() { + sd := s.delegator.(*shardDelegator) + ctx := context.Background() + + s.Run("test_methods_fail_when_not_working", func() { + // Set delegator to Initializing state (not ready) + sd.lifetime.SetState(lifetime.Initializing) + + // Search should fail when not ready + _, err := sd.Search(ctx, &querypb.SearchRequest{ + Req: &internalpb.SearchRequest{Base: commonpbutil.NewMsgBase()}, + DmlChannels: []string{s.vchannelName}, + }) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + + // Query should fail when not ready + _, err = sd.Query(ctx, &querypb.QueryRequest{ + Req: &internalpb.RetrieveRequest{Base: commonpbutil.NewMsgBase()}, + DmlChannels: []string{s.vchannelName}, + }) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + + // GetStatistics should fail when not ready + _, err = sd.GetStatistics(ctx, &querypb.GetStatisticsRequest{ + Req: &internalpb.GetStatisticsRequest{Base: commonpbutil.NewMsgBase()}, + DmlChannels: []string{s.vchannelName}, + }) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + + // UpdateSchema should fail when not ready + err = sd.UpdateSchema(ctx, &schemapb.CollectionSchema{Name: "test"}, 1) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + }) + + s.Run("test_methods_fail_when_stopped", func() { + // Set delegator to Stopped state + sd.lifetime.SetState(lifetime.Stopped) + + // Search should fail when stopped + _, err := sd.Search(ctx, &querypb.SearchRequest{ + Req: &internalpb.SearchRequest{Base: commonpbutil.NewMsgBase()}, + DmlChannels: []string{s.vchannelName}, + }) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + + // Query should fail when stopped + _, err = sd.Query(ctx, &querypb.QueryRequest{ + Req: &internalpb.RetrieveRequest{Base: commonpbutil.NewMsgBase()}, + DmlChannels: []string{s.vchannelName}, + }) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + + // GetStatistics should fail when stopped + _, err = sd.GetStatistics(ctx, &querypb.GetStatisticsRequest{ + Req: &internalpb.GetStatisticsRequest{Base: commonpbutil.NewMsgBase()}, + DmlChannels: []string{s.vchannelName}, + }) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + + // UpdateSchema should fail when stopped + err = sd.UpdateSchema(ctx, &schemapb.CollectionSchema{Name: "test"}, 1) + s.Error(err) + s.Contains(err.Error(), "delegator is not ready") + }) +} + +// TestDelegatorStateTransitions tests state transitions and edge cases +func (s *DelegatorSuite) TestDelegatorStateTransitions() { + sd := s.delegator.(*shardDelegator) + + s.Run("test_state_transition_sequence", func() { + // Test normal state transition sequence + + // Start from Initializing + sd.lifetime.SetState(lifetime.Initializing) + s.False(sd.Serviceable()) + s.False(sd.Stopped()) + + // Transition to Working + sd.Start() // This calls lifetime.SetState(lifetime.Working) + s.True(sd.Serviceable()) + s.False(sd.Stopped()) + + // Transition to Stopped + sd.lifetime.SetState(lifetime.Stopped) + s.False(sd.Serviceable()) + s.True(sd.Stopped()) + }) + + s.Run("test_multiple_start_calls", func() { + // Test that multiple Start() calls don't cause issues + sd.lifetime.SetState(lifetime.Initializing) + s.False(sd.Serviceable()) + + // Call Start multiple times + sd.Start() + s.True(sd.Serviceable()) + + sd.Start() + sd.Start() + s.True(sd.Serviceable()) // Should remain serviceable + }) + + s.Run("test_start_after_stopped", func() { + // Test starting after being stopped + sd.lifetime.SetState(lifetime.Stopped) + s.True(sd.Stopped()) + s.False(sd.Serviceable()) + + // Start again + sd.Start() + s.False(sd.Stopped()) + s.True(sd.Serviceable()) + }) + + s.Run("test_consistency_between_methods", func() { + // Test consistency between Serviceable() and Stopped() methods + + // In Initializing state + sd.lifetime.SetState(lifetime.Initializing) + serviceable := sd.Serviceable() + stopped := sd.Stopped() + s.False(serviceable) + s.False(stopped) + + // In Working state + sd.lifetime.SetState(lifetime.Working) + serviceable = sd.Serviceable() + stopped = sd.Stopped() + s.True(serviceable) + s.False(stopped) + + // In Stopped state + sd.lifetime.SetState(lifetime.Stopped) + serviceable = sd.Serviceable() + stopped = sd.Stopped() + s.False(serviceable) + s.True(stopped) + }) + + s.Run("test_error_types_and_wrapping", func() { + // Test that errors are properly wrapped with channel information + + // Test NotStopped error + err := sd.NotStopped(lifetime.Stopped) + s.Error(err) + s.True(errors.Is(err, merr.ErrChannelNotAvailable)) + + // Test IsWorking error + err = sd.IsWorking(lifetime.Initializing) + s.Error(err) + s.True(errors.Is(err, merr.ErrChannelNotAvailable)) + + err = sd.IsWorking(lifetime.Stopped) + s.Error(err) + s.True(errors.Is(err, merr.ErrChannelNotAvailable)) + }) +} + func TestDelegatorSuite(t *testing.T) { suite.Run(t, new(DelegatorSuite)) }