diff --git a/Makefile b/Makefile index 1349c91e2c..d30a0a7336 100644 --- a/Makefile +++ b/Makefile @@ -441,6 +441,7 @@ generate-mockery-utils: getdeps $(INSTALL_PATH)/mockery --name=Factory --dir=internal/util/dependency --output=internal/util/dependency --filename=mock_factory.go --with-expecter --structname=MockFactory --inpackage # tso.Allocator $(INSTALL_PATH)/mockery --name=Allocator --dir=internal/tso --output=internal/tso/mocks --filename=allocator.go --with-expecter --structname=Allocator --outpkg=mocktso + $(INSTALL_PATH)/mockery --name=SessionInterface --dir=$(PWD)/internal/util/sessionutil --output=$(PWD)/internal/util/sessionutil --filename=mock_session.go --with-expecter --structname=MockSession --inpackage generate-mockery-kv: getdeps $(INSTALL_PATH)/mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=txn_kv.go --with-expecter diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 4cbedf96fc..57f9763155 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -262,7 +262,10 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker PrimaryKeys: storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys), Timestamps: delRecord.Timestamps, }) - if errors.Is(err, merr.ErrSegmentNotFound) { + if errors.Is(err, merr.ErrNodeNotFound) { + log.Warn("try to delete data on non-exist node") + return retry.Unrecoverable(err) + } else if errors.Is(err, merr.ErrSegmentNotFound) { log.Warn("try to delete data of released segment") return nil } else if err != nil { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index fb8da49979..2d93cac83b 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -327,6 +328,18 @@ func (s *DelegatorDataSuite) TestProcessDelete() { RowCount: 1, }, }, 10) + + // test worker offline + worker1.ExpectedCalls = nil + worker1.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.ErrNodeNotFound) + s.delegator.ProcessDelete([]*DeleteData{ + { + PartitionID: 500, + PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)}, + Timestamps: []uint64{10}, + RowCount: 1, + }, + }, 10) } func (s *DelegatorDataSuite) TestLoadSegments() { diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index 058d6cf0d3..d407d428bc 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -48,6 +48,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // GrpcClient abstracts client of grpc @@ -105,7 +106,7 @@ type ClientBase[T interface { maxCancelError int32 NodeID atomic.Int64 - sess *sessionutil.Session + sess sessionutil.SessionInterface } func NewClientBase[T interface { @@ -321,9 +322,10 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { } func (c *ClientBase[T]) verifySession(ctx context.Context) error { - if funcutil.CheckCtxValid(ctx) { + if !funcutil.CheckCtxValid(ctx) { return nil } + log := log.Ctx(ctx).With(zap.String("clientRole", c.GetRole())) if time.Since(c.lastSessionCheck.Load()) < c.minSessionCheckInterval { log.Debug("skip session check, verify too frequent") @@ -410,6 +412,17 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er defer cancel() err := retry.Do(ctx, func() error { if generic.IsZero(client) { + switch c.GetRole() { + case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole: + // if session doesn't exist, no need to reset connection for datanode/indexnode/querynode + err := c.verifySession(ctx) + if err != nil && errors.Is(err, merr.ErrNodeNotFound) { + log.Warn("failed to verify node session", zap.Error(err)) + // stop retry + return retry.Unrecoverable(err) + } + } + err := errors.Wrap(clientErr, "empty grpc client") log.Warn("grpc client is nil, maybe fail to get client in the retry state", zap.Error(err)) resetClientFunc() @@ -428,6 +441,7 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er log.Warn("start to reset connection because of specific reasons", zap.Error(err)) resetClientFunc() } else { + // err occurs but no need to reset connection, try to verify session err := c.verifySession(ctx) if err != nil { log.Warn("failed to verify session, reset connection", zap.Error(err)) diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index dd66b57bd7..e567f71b26 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/examples/helloworld/helloworld" @@ -37,6 +38,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -92,6 +94,29 @@ func TestClientBase_connect(t *testing.T) { }) } +func TestClientBase_NodeSessionNotExist(t *testing.T) { + base := ClientBase[*mockClient]{ + maxCancelError: 10, + MaxAttempts: 3, + } + base.SetGetAddrFunc(func() (string, error) { + return "", errors.New("mocked address error") + }) + base.role = typeutil.QueryNodeRole + mockSession := sessionutil.NewMockSession(t) + mockSession.EXPECT().GetSessions(mock.Anything).Return(nil, 0, nil) + base.sess = mockSession + base.grpcClientMtx.Lock() + base.grpcClient = nil + base.grpcClientMtx.Unlock() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, err := base.Call(ctx, func(client *mockClient) (any, error) { + return struct{}{}, nil + }) + assert.True(t, errors.Is(err, merr.ErrNodeNotFound)) +} + func TestClientBase_Call(t *testing.T) { testCall(t, false) } diff --git a/internal/util/sessionutil/mock_session.go b/internal/util/sessionutil/mock_session.go new file mode 100644 index 0000000000..0e771d3ab0 --- /dev/null +++ b/internal/util/sessionutil/mock_session.go @@ -0,0 +1,864 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package sessionutil + +import ( + context "context" + + semver "github.com/blang/semver/v4" + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// MockSession is an autogenerated mock type for the SessionInterface type +type MockSession struct { + mock.Mock +} + +type MockSession_Expecter struct { + mock *mock.Mock +} + +func (_m *MockSession) EXPECT() *MockSession_Expecter { + return &MockSession_Expecter{mock: &_m.Mock} +} + +// Disconnected provides a mock function with given fields: +func (_m *MockSession) Disconnected() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockSession_Disconnected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Disconnected' +type MockSession_Disconnected_Call struct { + *mock.Call +} + +// Disconnected is a helper method to define mock.On call +func (_e *MockSession_Expecter) Disconnected() *MockSession_Disconnected_Call { + return &MockSession_Disconnected_Call{Call: _e.mock.On("Disconnected")} +} + +func (_c *MockSession_Disconnected_Call) Run(run func()) *MockSession_Disconnected_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_Disconnected_Call) Return(_a0 bool) *MockSession_Disconnected_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_Disconnected_Call) RunAndReturn(run func() bool) *MockSession_Disconnected_Call { + _c.Call.Return(run) + return _c +} + +// ForceActiveStandby provides a mock function with given fields: activateFunc +func (_m *MockSession) ForceActiveStandby(activateFunc func() error) error { + ret := _m.Called(activateFunc) + + var r0 error + if rf, ok := ret.Get(0).(func(func() error) error); ok { + r0 = rf(activateFunc) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSession_ForceActiveStandby_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceActiveStandby' +type MockSession_ForceActiveStandby_Call struct { + *mock.Call +} + +// ForceActiveStandby is a helper method to define mock.On call +// - activateFunc func() error +func (_e *MockSession_Expecter) ForceActiveStandby(activateFunc interface{}) *MockSession_ForceActiveStandby_Call { + return &MockSession_ForceActiveStandby_Call{Call: _e.mock.On("ForceActiveStandby", activateFunc)} +} + +func (_c *MockSession_ForceActiveStandby_Call) Run(run func(activateFunc func() error)) *MockSession_ForceActiveStandby_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(func() error)) + }) + return _c +} + +func (_c *MockSession_ForceActiveStandby_Call) Return(_a0 error) *MockSession_ForceActiveStandby_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_ForceActiveStandby_Call) RunAndReturn(run func(func() error) error) *MockSession_ForceActiveStandby_Call { + _c.Call.Return(run) + return _c +} + +// GetSessions provides a mock function with given fields: prefix +func (_m *MockSession) GetSessions(prefix string) (map[string]*Session, int64, error) { + ret := _m.Called(prefix) + + var r0 map[string]*Session + var r1 int64 + var r2 error + if rf, ok := ret.Get(0).(func(string) (map[string]*Session, int64, error)); ok { + return rf(prefix) + } + if rf, ok := ret.Get(0).(func(string) map[string]*Session); ok { + r0 = rf(prefix) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]*Session) + } + } + + if rf, ok := ret.Get(1).(func(string) int64); ok { + r1 = rf(prefix) + } else { + r1 = ret.Get(1).(int64) + } + + if rf, ok := ret.Get(2).(func(string) error); ok { + r2 = rf(prefix) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockSession_GetSessions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSessions' +type MockSession_GetSessions_Call struct { + *mock.Call +} + +// GetSessions is a helper method to define mock.On call +// - prefix string +func (_e *MockSession_Expecter) GetSessions(prefix interface{}) *MockSession_GetSessions_Call { + return &MockSession_GetSessions_Call{Call: _e.mock.On("GetSessions", prefix)} +} + +func (_c *MockSession_GetSessions_Call) Run(run func(prefix string)) *MockSession_GetSessions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockSession_GetSessions_Call) Return(_a0 map[string]*Session, _a1 int64, _a2 error) *MockSession_GetSessions_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockSession_GetSessions_Call) RunAndReturn(run func(string) (map[string]*Session, int64, error)) *MockSession_GetSessions_Call { + _c.Call.Return(run) + return _c +} + +// GetSessionsWithVersionRange provides a mock function with given fields: prefix, r +func (_m *MockSession) GetSessionsWithVersionRange(prefix string, r semver.Range) (map[string]*Session, int64, error) { + ret := _m.Called(prefix, r) + + var r0 map[string]*Session + var r1 int64 + var r2 error + if rf, ok := ret.Get(0).(func(string, semver.Range) (map[string]*Session, int64, error)); ok { + return rf(prefix, r) + } + if rf, ok := ret.Get(0).(func(string, semver.Range) map[string]*Session); ok { + r0 = rf(prefix, r) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]*Session) + } + } + + if rf, ok := ret.Get(1).(func(string, semver.Range) int64); ok { + r1 = rf(prefix, r) + } else { + r1 = ret.Get(1).(int64) + } + + if rf, ok := ret.Get(2).(func(string, semver.Range) error); ok { + r2 = rf(prefix, r) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockSession_GetSessionsWithVersionRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSessionsWithVersionRange' +type MockSession_GetSessionsWithVersionRange_Call struct { + *mock.Call +} + +// GetSessionsWithVersionRange is a helper method to define mock.On call +// - prefix string +// - r semver.Range +func (_e *MockSession_Expecter) GetSessionsWithVersionRange(prefix interface{}, r interface{}) *MockSession_GetSessionsWithVersionRange_Call { + return &MockSession_GetSessionsWithVersionRange_Call{Call: _e.mock.On("GetSessionsWithVersionRange", prefix, r)} +} + +func (_c *MockSession_GetSessionsWithVersionRange_Call) Run(run func(prefix string, r semver.Range)) *MockSession_GetSessionsWithVersionRange_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(semver.Range)) + }) + return _c +} + +func (_c *MockSession_GetSessionsWithVersionRange_Call) Return(_a0 map[string]*Session, _a1 int64, _a2 error) *MockSession_GetSessionsWithVersionRange_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockSession_GetSessionsWithVersionRange_Call) RunAndReturn(run func(string, semver.Range) (map[string]*Session, int64, error)) *MockSession_GetSessionsWithVersionRange_Call { + _c.Call.Return(run) + return _c +} + +// GoingStop provides a mock function with given fields: +func (_m *MockSession) GoingStop() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSession_GoingStop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GoingStop' +type MockSession_GoingStop_Call struct { + *mock.Call +} + +// GoingStop is a helper method to define mock.On call +func (_e *MockSession_Expecter) GoingStop() *MockSession_GoingStop_Call { + return &MockSession_GoingStop_Call{Call: _e.mock.On("GoingStop")} +} + +func (_c *MockSession_GoingStop_Call) Run(run func()) *MockSession_GoingStop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_GoingStop_Call) Return(_a0 error) *MockSession_GoingStop_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_GoingStop_Call) RunAndReturn(run func() error) *MockSession_GoingStop_Call { + _c.Call.Return(run) + return _c +} + +// Init provides a mock function with given fields: serverName, address, exclusive, triggerKill +func (_m *MockSession) Init(serverName string, address string, exclusive bool, triggerKill bool) { + _m.Called(serverName, address, exclusive, triggerKill) +} + +// MockSession_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Init' +type MockSession_Init_Call struct { + *mock.Call +} + +// Init is a helper method to define mock.On call +// - serverName string +// - address string +// - exclusive bool +// - triggerKill bool +func (_e *MockSession_Expecter) Init(serverName interface{}, address interface{}, exclusive interface{}, triggerKill interface{}) *MockSession_Init_Call { + return &MockSession_Init_Call{Call: _e.mock.On("Init", serverName, address, exclusive, triggerKill)} +} + +func (_c *MockSession_Init_Call) Run(run func(serverName string, address string, exclusive bool, triggerKill bool)) *MockSession_Init_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string), args[2].(bool), args[3].(bool)) + }) + return _c +} + +func (_c *MockSession_Init_Call) Return() *MockSession_Init_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSession_Init_Call) RunAndReturn(run func(string, string, bool, bool)) *MockSession_Init_Call { + _c.Call.Return(run) + return _c +} + +// LivenessCheck provides a mock function with given fields: ctx, callback +func (_m *MockSession) LivenessCheck(ctx context.Context, callback func()) { + _m.Called(ctx, callback) +} + +// MockSession_LivenessCheck_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LivenessCheck' +type MockSession_LivenessCheck_Call struct { + *mock.Call +} + +// LivenessCheck is a helper method to define mock.On call +// - ctx context.Context +// - callback func() +func (_e *MockSession_Expecter) LivenessCheck(ctx interface{}, callback interface{}) *MockSession_LivenessCheck_Call { + return &MockSession_LivenessCheck_Call{Call: _e.mock.On("LivenessCheck", ctx, callback)} +} + +func (_c *MockSession_LivenessCheck_Call) Run(run func(ctx context.Context, callback func())) *MockSession_LivenessCheck_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(func())) + }) + return _c +} + +func (_c *MockSession_LivenessCheck_Call) Return() *MockSession_LivenessCheck_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSession_LivenessCheck_Call) RunAndReturn(run func(context.Context, func())) *MockSession_LivenessCheck_Call { + _c.Call.Return(run) + return _c +} + +// MarshalJSON provides a mock function with given fields: +func (_m *MockSession) MarshalJSON() ([]byte, error) { + ret := _m.Called() + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func() ([]byte, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockSession_MarshalJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarshalJSON' +type MockSession_MarshalJSON_Call struct { + *mock.Call +} + +// MarshalJSON is a helper method to define mock.On call +func (_e *MockSession_Expecter) MarshalJSON() *MockSession_MarshalJSON_Call { + return &MockSession_MarshalJSON_Call{Call: _e.mock.On("MarshalJSON")} +} + +func (_c *MockSession_MarshalJSON_Call) Run(run func()) *MockSession_MarshalJSON_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_MarshalJSON_Call) Return(_a0 []byte, _a1 error) *MockSession_MarshalJSON_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockSession_MarshalJSON_Call) RunAndReturn(run func() ([]byte, error)) *MockSession_MarshalJSON_Call { + _c.Call.Return(run) + return _c +} + +// ProcessActiveStandBy provides a mock function with given fields: activateFunc +func (_m *MockSession) ProcessActiveStandBy(activateFunc func() error) error { + ret := _m.Called(activateFunc) + + var r0 error + if rf, ok := ret.Get(0).(func(func() error) error); ok { + r0 = rf(activateFunc) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSession_ProcessActiveStandBy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessActiveStandBy' +type MockSession_ProcessActiveStandBy_Call struct { + *mock.Call +} + +// ProcessActiveStandBy is a helper method to define mock.On call +// - activateFunc func() error +func (_e *MockSession_Expecter) ProcessActiveStandBy(activateFunc interface{}) *MockSession_ProcessActiveStandBy_Call { + return &MockSession_ProcessActiveStandBy_Call{Call: _e.mock.On("ProcessActiveStandBy", activateFunc)} +} + +func (_c *MockSession_ProcessActiveStandBy_Call) Run(run func(activateFunc func() error)) *MockSession_ProcessActiveStandBy_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(func() error)) + }) + return _c +} + +func (_c *MockSession_ProcessActiveStandBy_Call) Return(_a0 error) *MockSession_ProcessActiveStandBy_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_ProcessActiveStandBy_Call) RunAndReturn(run func(func() error) error) *MockSession_ProcessActiveStandBy_Call { + _c.Call.Return(run) + return _c +} + +// Register provides a mock function with given fields: +func (_m *MockSession) Register() { + _m.Called() +} + +// MockSession_Register_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Register' +type MockSession_Register_Call struct { + *mock.Call +} + +// Register is a helper method to define mock.On call +func (_e *MockSession_Expecter) Register() *MockSession_Register_Call { + return &MockSession_Register_Call{Call: _e.mock.On("Register")} +} + +func (_c *MockSession_Register_Call) Run(run func()) *MockSession_Register_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_Register_Call) Return() *MockSession_Register_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSession_Register_Call) RunAndReturn(run func()) *MockSession_Register_Call { + _c.Call.Return(run) + return _c +} + +// Registered provides a mock function with given fields: +func (_m *MockSession) Registered() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockSession_Registered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Registered' +type MockSession_Registered_Call struct { + *mock.Call +} + +// Registered is a helper method to define mock.On call +func (_e *MockSession_Expecter) Registered() *MockSession_Registered_Call { + return &MockSession_Registered_Call{Call: _e.mock.On("Registered")} +} + +func (_c *MockSession_Registered_Call) Run(run func()) *MockSession_Registered_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_Registered_Call) Return(_a0 bool) *MockSession_Registered_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_Registered_Call) RunAndReturn(run func() bool) *MockSession_Registered_Call { + _c.Call.Return(run) + return _c +} + +// Revoke provides a mock function with given fields: timeout +func (_m *MockSession) Revoke(timeout time.Duration) { + _m.Called(timeout) +} + +// MockSession_Revoke_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Revoke' +type MockSession_Revoke_Call struct { + *mock.Call +} + +// Revoke is a helper method to define mock.On call +// - timeout time.Duration +func (_e *MockSession_Expecter) Revoke(timeout interface{}) *MockSession_Revoke_Call { + return &MockSession_Revoke_Call{Call: _e.mock.On("Revoke", timeout)} +} + +func (_c *MockSession_Revoke_Call) Run(run func(timeout time.Duration)) *MockSession_Revoke_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Duration)) + }) + return _c +} + +func (_c *MockSession_Revoke_Call) Return() *MockSession_Revoke_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSession_Revoke_Call) RunAndReturn(run func(time.Duration)) *MockSession_Revoke_Call { + _c.Call.Return(run) + return _c +} + +// SetDisconnected provides a mock function with given fields: b +func (_m *MockSession) SetDisconnected(b bool) { + _m.Called(b) +} + +// MockSession_SetDisconnected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetDisconnected' +type MockSession_SetDisconnected_Call struct { + *mock.Call +} + +// SetDisconnected is a helper method to define mock.On call +// - b bool +func (_e *MockSession_Expecter) SetDisconnected(b interface{}) *MockSession_SetDisconnected_Call { + return &MockSession_SetDisconnected_Call{Call: _e.mock.On("SetDisconnected", b)} +} + +func (_c *MockSession_SetDisconnected_Call) Run(run func(b bool)) *MockSession_SetDisconnected_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(bool)) + }) + return _c +} + +func (_c *MockSession_SetDisconnected_Call) Return() *MockSession_SetDisconnected_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSession_SetDisconnected_Call) RunAndReturn(run func(bool)) *MockSession_SetDisconnected_Call { + _c.Call.Return(run) + return _c +} + +// SetEnableActiveStandBy provides a mock function with given fields: enable +func (_m *MockSession) SetEnableActiveStandBy(enable bool) { + _m.Called(enable) +} + +// MockSession_SetEnableActiveStandBy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetEnableActiveStandBy' +type MockSession_SetEnableActiveStandBy_Call struct { + *mock.Call +} + +// SetEnableActiveStandBy is a helper method to define mock.On call +// - enable bool +func (_e *MockSession_Expecter) SetEnableActiveStandBy(enable interface{}) *MockSession_SetEnableActiveStandBy_Call { + return &MockSession_SetEnableActiveStandBy_Call{Call: _e.mock.On("SetEnableActiveStandBy", enable)} +} + +func (_c *MockSession_SetEnableActiveStandBy_Call) Run(run func(enable bool)) *MockSession_SetEnableActiveStandBy_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(bool)) + }) + return _c +} + +func (_c *MockSession_SetEnableActiveStandBy_Call) Return() *MockSession_SetEnableActiveStandBy_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSession_SetEnableActiveStandBy_Call) RunAndReturn(run func(bool)) *MockSession_SetEnableActiveStandBy_Call { + _c.Call.Return(run) + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockSession) Stop() { + _m.Called() +} + +// MockSession_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockSession_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockSession_Expecter) Stop() *MockSession_Stop_Call { + return &MockSession_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockSession_Stop_Call) Run(run func()) *MockSession_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_Stop_Call) Return() *MockSession_Stop_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSession_Stop_Call) RunAndReturn(run func()) *MockSession_Stop_Call { + _c.Call.Return(run) + return _c +} + +// String provides a mock function with given fields: +func (_m *MockSession) String() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockSession_String_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'String' +type MockSession_String_Call struct { + *mock.Call +} + +// String is a helper method to define mock.On call +func (_e *MockSession_Expecter) String() *MockSession_String_Call { + return &MockSession_String_Call{Call: _e.mock.On("String")} +} + +func (_c *MockSession_String_Call) Run(run func()) *MockSession_String_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_String_Call) Return(_a0 string) *MockSession_String_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_String_Call) RunAndReturn(run func() string) *MockSession_String_Call { + _c.Call.Return(run) + return _c +} + +// UnmarshalJSON provides a mock function with given fields: data +func (_m *MockSession) UnmarshalJSON(data []byte) error { + ret := _m.Called(data) + + var r0 error + if rf, ok := ret.Get(0).(func([]byte) error); ok { + r0 = rf(data) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSession_UnmarshalJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnmarshalJSON' +type MockSession_UnmarshalJSON_Call struct { + *mock.Call +} + +// UnmarshalJSON is a helper method to define mock.On call +// - data []byte +func (_e *MockSession_Expecter) UnmarshalJSON(data interface{}) *MockSession_UnmarshalJSON_Call { + return &MockSession_UnmarshalJSON_Call{Call: _e.mock.On("UnmarshalJSON", data)} +} + +func (_c *MockSession_UnmarshalJSON_Call) Run(run func(data []byte)) *MockSession_UnmarshalJSON_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]byte)) + }) + return _c +} + +func (_c *MockSession_UnmarshalJSON_Call) Return(_a0 error) *MockSession_UnmarshalJSON_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_UnmarshalJSON_Call) RunAndReturn(run func([]byte) error) *MockSession_UnmarshalJSON_Call { + _c.Call.Return(run) + return _c +} + +// UpdateRegistered provides a mock function with given fields: b +func (_m *MockSession) UpdateRegistered(b bool) { + _m.Called(b) +} + +// MockSession_UpdateRegistered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateRegistered' +type MockSession_UpdateRegistered_Call struct { + *mock.Call +} + +// UpdateRegistered is a helper method to define mock.On call +// - b bool +func (_e *MockSession_Expecter) UpdateRegistered(b interface{}) *MockSession_UpdateRegistered_Call { + return &MockSession_UpdateRegistered_Call{Call: _e.mock.On("UpdateRegistered", b)} +} + +func (_c *MockSession_UpdateRegistered_Call) Run(run func(b bool)) *MockSession_UpdateRegistered_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(bool)) + }) + return _c +} + +func (_c *MockSession_UpdateRegistered_Call) Return() *MockSession_UpdateRegistered_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSession_UpdateRegistered_Call) RunAndReturn(run func(bool)) *MockSession_UpdateRegistered_Call { + _c.Call.Return(run) + return _c +} + +// WatchServices provides a mock function with given fields: prefix, revision, rewatch +func (_m *MockSession) WatchServices(prefix string, revision int64, rewatch Rewatch) <-chan *SessionEvent { + ret := _m.Called(prefix, revision, rewatch) + + var r0 <-chan *SessionEvent + if rf, ok := ret.Get(0).(func(string, int64, Rewatch) <-chan *SessionEvent); ok { + r0 = rf(prefix, revision, rewatch) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *SessionEvent) + } + } + + return r0 +} + +// MockSession_WatchServices_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchServices' +type MockSession_WatchServices_Call struct { + *mock.Call +} + +// WatchServices is a helper method to define mock.On call +// - prefix string +// - revision int64 +// - rewatch Rewatch +func (_e *MockSession_Expecter) WatchServices(prefix interface{}, revision interface{}, rewatch interface{}) *MockSession_WatchServices_Call { + return &MockSession_WatchServices_Call{Call: _e.mock.On("WatchServices", prefix, revision, rewatch)} +} + +func (_c *MockSession_WatchServices_Call) Run(run func(prefix string, revision int64, rewatch Rewatch)) *MockSession_WatchServices_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(int64), args[2].(Rewatch)) + }) + return _c +} + +func (_c *MockSession_WatchServices_Call) Return(eventChannel <-chan *SessionEvent) *MockSession_WatchServices_Call { + _c.Call.Return(eventChannel) + return _c +} + +func (_c *MockSession_WatchServices_Call) RunAndReturn(run func(string, int64, Rewatch) <-chan *SessionEvent) *MockSession_WatchServices_Call { + _c.Call.Return(run) + return _c +} + +// WatchServicesWithVersionRange provides a mock function with given fields: prefix, r, revision, rewatch +func (_m *MockSession) WatchServicesWithVersionRange(prefix string, r semver.Range, revision int64, rewatch Rewatch) <-chan *SessionEvent { + ret := _m.Called(prefix, r, revision, rewatch) + + var r0 <-chan *SessionEvent + if rf, ok := ret.Get(0).(func(string, semver.Range, int64, Rewatch) <-chan *SessionEvent); ok { + r0 = rf(prefix, r, revision, rewatch) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *SessionEvent) + } + } + + return r0 +} + +// MockSession_WatchServicesWithVersionRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchServicesWithVersionRange' +type MockSession_WatchServicesWithVersionRange_Call struct { + *mock.Call +} + +// WatchServicesWithVersionRange is a helper method to define mock.On call +// - prefix string +// - r semver.Range +// - revision int64 +// - rewatch Rewatch +func (_e *MockSession_Expecter) WatchServicesWithVersionRange(prefix interface{}, r interface{}, revision interface{}, rewatch interface{}) *MockSession_WatchServicesWithVersionRange_Call { + return &MockSession_WatchServicesWithVersionRange_Call{Call: _e.mock.On("WatchServicesWithVersionRange", prefix, r, revision, rewatch)} +} + +func (_c *MockSession_WatchServicesWithVersionRange_Call) Run(run func(prefix string, r semver.Range, revision int64, rewatch Rewatch)) *MockSession_WatchServicesWithVersionRange_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(semver.Range), args[2].(int64), args[3].(Rewatch)) + }) + return _c +} + +func (_c *MockSession_WatchServicesWithVersionRange_Call) Return(eventChannel <-chan *SessionEvent) *MockSession_WatchServicesWithVersionRange_Call { + _c.Call.Return(eventChannel) + return _c +} + +func (_c *MockSession_WatchServicesWithVersionRange_Call) RunAndReturn(run func(string, semver.Range, int64, Rewatch) <-chan *SessionEvent) *MockSession_WatchServicesWithVersionRange_Call { + _c.Call.Return(run) + return _c +} + +// NewMockSession creates a new instance of MockSession. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockSession(t interface { + mock.TestingT + Cleanup(func()) +}) *MockSession { + mock := &MockSession{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/util/sessionutil/session.go b/internal/util/sessionutil/session.go new file mode 100644 index 0000000000..4d676b673f --- /dev/null +++ b/internal/util/sessionutil/session.go @@ -0,0 +1,49 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package sessionutil + +import ( + "context" + "time" + + "github.com/blang/semver/v4" +) + +type SessionInterface interface { + UnmarshalJSON(data []byte) error + MarshalJSON() ([]byte, error) + + Init(serverName, address string, exclusive bool, triggerKill bool) + String() string + Register() + + GetSessions(prefix string) (map[string]*Session, int64, error) + GetSessionsWithVersionRange(prefix string, r semver.Range) (map[string]*Session, int64, error) + + GoingStop() error + WatchServices(prefix string, revision int64, rewatch Rewatch) (eventChannel <-chan *SessionEvent) + WatchServicesWithVersionRange(prefix string, r semver.Range, revision int64, rewatch Rewatch) (eventChannel <-chan *SessionEvent) + LivenessCheck(ctx context.Context, callback func()) + Stop() + Revoke(timeout time.Duration) + UpdateRegistered(b bool) + Registered() bool + SetDisconnected(b bool) + Disconnected() bool + SetEnableActiveStandBy(enable bool) + ProcessActiveStandBy(activateFunc func() error) error + ForceActiveStandby(activateFunc func() error) error +}