diff --git a/internal/allocator/allocator.go b/internal/allocator/allocator.go
index 9cd7e935d7..a1aec5d8a2 100644
--- a/internal/allocator/allocator.go
+++ b/internal/allocator/allocator.go
@@ -137,7 +137,8 @@ type Allocator struct {
 }
 
 func (ta *Allocator) Start() error {
-	err := ta.connectMaster()
+	// Retry the master connection with backoff before giving up.
+	err := Retry(10, time.Millisecond*200, ta.connectMaster)
 	if err != nil {
 		panic("connect to master failed")
 	}
diff --git a/internal/allocator/retry.go b/internal/allocator/retry.go
new file mode 100644
index 0000000000..89ab43cd00
--- /dev/null
+++ b/internal/allocator/retry.go
@@ -0,0 +1,60 @@
+package allocator
+
+import (
+	"log"
+	"time"
+)
+
+// Reference: https://blog.cyeam.com/golang/2018/08/27/retry
+
+// RetryImpl calls fn until it succeeds, returns an InterruptError, or the
+// attempt budget is used up. fn is always called at least once; attempts is
+// the total number of calls allowed. After a failed call it waits sleep, then
+// doubles the wait for the next round without letting it exceed maxSleepTime.
+// It returns nil on success, the error wrapped in an InterruptError when fn
+// asks to stop, or the last error returned by fn otherwise.
+func RetryImpl(attempts int, sleep time.Duration, fn func() error, maxSleepTime time.Duration) error {
+	for {
+		err := fn()
+		if err == nil {
+			return nil
+		}
+		// An InterruptError means the failure is permanent: stop immediately.
+		if s, ok := err.(InterruptError); ok {
+			return s.error
+		}
+
+		attempts--
+		if attempts <= 0 {
+			return err
+		}
+		log.Printf("retry func error: %s. attempts #%d after %s.", err.Error(), attempts, sleep)
+		time.Sleep(sleep)
+
+		// Exponential backoff, clamped so a doubling step cannot overshoot
+		// the cap (e.g. 800ms -> 1600ms with a 1s limit).
+		sleep *= 2
+		if sleep > maxSleepTime {
+			sleep = maxSleepTime
+		}
+	}
+}
+
+// Retry runs fn through RetryImpl with the wait between calls capped at one
+// second.
+func Retry(attempts int, sleep time.Duration, fn func() error) error {
+	maxSleepTime := time.Millisecond * 1000
+	return RetryImpl(attempts, sleep, fn, maxSleepTime)
+}
+
+// InterruptError wraps an error that must not be retried.
+// RetryImpl returns the wrapped error as soon as fn produces one.
+type InterruptError struct {
+	error
+}
+
+// NoRetryError wraps err in an InterruptError so that RetryImpl gives up
+// instead of calling fn again.
+func NoRetryError(err error) InterruptError {
+	return InterruptError{err}
+}
diff --git a/internal/indexbuilder/indexbuilder.go b/internal/indexbuilder/indexbuilder.go
index 4acfffc3d2..712d17ef80 100644
--- a/internal/indexbuilder/indexbuilder.go
+++ b/internal/indexbuilder/indexbuilder.go
@@ -54,34 +54,55 @@ func CreateBuilder(ctx context.Context) (*Builder, error) {
 		loopCancel: cancel,
 	}
 
-	etcdAddress := Params.EtcdAddress
-	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
+	// connectEtcdFn creates the etcd client and the meta table in one retryable step.
+	connectEtcdFn := func() error {
+		etcdAddress := Params.EtcdAddress
+		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
+		if err != nil {
+			return err
+		}
+		etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
+		metakv, err := NewMetaTable(etcdKV)
+		if err != nil {
+			// Close the client so failed attempts do not pile up open connections.
+			etcdClient.Close()
+			return err
+		}
+		b.metaTable = metakv
+		return nil
+	}
+	err := Retry(10, time.Millisecond*200, connectEtcdFn)
 	if err != nil {
 		return nil, err
 	}
-	etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
-	metakv, err := NewMetaTable(etcdKV)
-	if err != nil {
-		return nil, err
-	}
-	b.metaTable = metakv
 
 	idAllocator, err := allocator.NewIDAllocator(b.loopCtx, Params.MasterAddress)
+	if err != nil {
+		return nil, err
+	}
+	b.idAllocator = idAllocator
 
-	option := &miniokv.Option{
-		Address:           Params.MinIOAddress,
-		AccessKeyID:       Params.MinIOAccessKeyID,
-		SecretAccessKeyID: Params.MinIOSecretAccessKey,
-		UseSSL:            Params.MinIOUseSSL,
-		BucketName:        Params.MinioBucketName,
-		CreateBucket:      true,
+	// connectMinIOFn sets b.kv via miniokv.NewMinIOKV; it is retried below.
+	connectMinIOFn := func() error {
+		option := &miniokv.Option{
+			Address:           Params.MinIOAddress,
+			AccessKeyID:       Params.MinIOAccessKeyID,
+			SecretAccessKeyID: Params.MinIOSecretAccessKey,
+			UseSSL:            Params.MinIOUseSSL,
+			BucketName:        Params.MinioBucketName,
+			CreateBucket:      true,
+		}
+
+		b.kv, err = miniokv.NewMinIOKV(b.loopCtx, option)
+		if err != nil {
+			return err
+		}
+		return nil
 	}
-
-	b.kv, err = miniokv.NewMinIOKV(b.loopCtx, option)
+	err = Retry(10, time.Millisecond*200, connectMinIOFn)
 	if err != nil {
 		return nil, err
 	}
-	b.idAllocator = idAllocator
 
 	b.sched, err = NewTaskScheduler(b.loopCtx, b.idAllocator, b.kv, b.metaTable)
 	if err != nil {
diff --git a/internal/indexbuilder/retry.go b/internal/indexbuilder/retry.go
new file mode 100644
index 0000000000..2cf4c6ecf5
--- /dev/null
+++ b/internal/indexbuilder/retry.go
@@ -0,0 +1,60 @@
+package indexbuilder
+
+import (
+	"log"
+	"time"
+)
+
+// Reference: https://blog.cyeam.com/golang/2018/08/27/retry
+
+// RetryImpl calls fn until it succeeds, returns an InterruptError, or the
+// attempt budget is used up. fn is always called at least once; attempts is
+// the total number of calls allowed. After a failed call it waits sleep, then
+// doubles the wait for the next round without letting it exceed maxSleepTime.
+// It returns nil on success, the error wrapped in an InterruptError when fn
+// asks to stop, or the last error returned by fn otherwise.
+func RetryImpl(attempts int, sleep time.Duration, fn func() error, maxSleepTime time.Duration) error {
+	for {
+		err := fn()
+		if err == nil {
+			return nil
+		}
+		// An InterruptError means the failure is permanent: stop immediately.
+		if s, ok := err.(InterruptError); ok {
+			return s.error
+		}
+
+		attempts--
+		if attempts <= 0 {
+			return err
+		}
+		log.Printf("retry func error: %s. attempts #%d after %s.", err.Error(), attempts, sleep)
+		time.Sleep(sleep)
+
+		// Exponential backoff, clamped so a doubling step cannot overshoot
+		// the cap (e.g. 800ms -> 1600ms with a 1s limit).
+		sleep *= 2
+		if sleep > maxSleepTime {
+			sleep = maxSleepTime
+		}
+	}
+}
+
+// Retry runs fn through RetryImpl with the wait between calls capped at one
+// second.
+func Retry(attempts int, sleep time.Duration, fn func() error) error {
+	maxSleepTime := time.Millisecond * 1000
+	return RetryImpl(attempts, sleep, fn, maxSleepTime)
+}
+
+// InterruptError wraps an error that must not be retried.
+// RetryImpl returns the wrapped error as soon as fn produces one.
+type InterruptError struct {
+	error
+}
+
+// NoRetryError wraps err in an InterruptError so that RetryImpl gives up
+// instead of calling fn again.
+func NoRetryError(err error) InterruptError {
+	return InterruptError{err}
+}