mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Merge get worker process for same nodeID (#25185)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
63b6a4a639
commit
1357ef7043
@ -18,10 +18,12 @@ package cluster
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -36,6 +38,7 @@ type WorkerBuilder func(nodeID int64) (Worker, error)
|
|||||||
type grpcWorkerManager struct {
|
type grpcWorkerManager struct {
|
||||||
workers *typeutil.ConcurrentMap[int64, Worker]
|
workers *typeutil.ConcurrentMap[int64, Worker]
|
||||||
builder WorkerBuilder
|
builder WorkerBuilder
|
||||||
|
sf conc.Singleflight[Worker] //singleflight.Group
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetWorker returns worker with specified nodeID.
|
// GetWorker returns worker with specified nodeID.
|
||||||
@ -43,7 +46,7 @@ func (m *grpcWorkerManager) GetWorker(nodeID int64) (Worker, error) {
|
|||||||
worker, ok := m.workers.Get(nodeID)
|
worker, ok := m.workers.Get(nodeID)
|
||||||
var err error
|
var err error
|
||||||
if !ok {
|
if !ok {
|
||||||
//TODO merge request?
|
worker, err, _ = m.sf.Do(strconv.FormatInt(nodeID, 10), func() (Worker, error) {
|
||||||
worker, err = m.builder(nodeID)
|
worker, err = m.builder(nodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("failed to build worker",
|
log.Warn("failed to build worker",
|
||||||
@ -57,6 +60,11 @@ func (m *grpcWorkerManager) GetWorker(nodeID int64) (Worker, error) {
|
|||||||
worker.Stop()
|
worker.Stop()
|
||||||
worker = old
|
worker = old
|
||||||
}
|
}
|
||||||
|
return worker, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if !worker.IsHealthy() {
|
if !worker.IsHealthy() {
|
||||||
// TODO wrap error
|
// TODO wrap error
|
||||||
|
|||||||
43
pkg/util/conc/singleflight.go
Normal file
43
pkg/util/conc/singleflight.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
package conc
|
||||||
|
|
||||||
|
import "golang.org/x/sync/singleflight"
|
||||||
|
|
||||||
|
// Singleflight wraps golang.org/x/sync/singleflight.Group into generic one.
|
||||||
|
type Singleflight[T any] struct {
|
||||||
|
internal singleflight.Group
|
||||||
|
}
|
||||||
|
|
||||||
|
// SingleflightResult is a generic Result wrapper for DoChan.
|
||||||
|
type SingleflightResult[T any] struct {
|
||||||
|
Val T
|
||||||
|
Err error
|
||||||
|
Shared bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sf *Singleflight[T]) Do(key string, fn func() (T, error)) (T, error, bool) {
|
||||||
|
raw, err, shared := sf.internal.Do(key, func() (any, error) {
|
||||||
|
return fn()
|
||||||
|
})
|
||||||
|
var t T
|
||||||
|
if raw != nil {
|
||||||
|
t = raw.(T)
|
||||||
|
}
|
||||||
|
return t, err, shared
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sf *Singleflight[T]) DoChan(key string, fn func() (T, error)) <-chan SingleflightResult[T] {
|
||||||
|
ch := make(chan SingleflightResult[T], 1)
|
||||||
|
go func() {
|
||||||
|
val, err, shared := sf.Do(key, fn)
|
||||||
|
ch <- SingleflightResult[T]{
|
||||||
|
Val: val,
|
||||||
|
Err: err,
|
||||||
|
Shared: shared,
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sf *Singleflight[T]) Forget(key string) {
|
||||||
|
sf.internal.Forget(key)
|
||||||
|
}
|
||||||
106
pkg/util/conc/singleflight_test.go
Normal file
106
pkg/util/conc/singleflight_test.go
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
package conc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SingleflightSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SingleflightSuite) TestDo() {
|
||||||
|
counter, hasShared := atomic.Int32{}, atomic.Bool{}
|
||||||
|
|
||||||
|
sf := Singleflight[any]{}
|
||||||
|
ch := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
_, _, shared := sf.Do("test_do", func() (any, error) {
|
||||||
|
<-ch
|
||||||
|
counter.Add(1)
|
||||||
|
return struct{}{}, nil
|
||||||
|
})
|
||||||
|
if shared {
|
||||||
|
hasShared.Store(true)
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
wg.Wait()
|
||||||
|
if hasShared.Load() {
|
||||||
|
s.Less(counter.Load(), int32(10))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SingleflightSuite) TestDoChan() {
|
||||||
|
counter, hasShared := atomic.Int32{}, atomic.Bool{}
|
||||||
|
|
||||||
|
sf := Singleflight[any]{}
|
||||||
|
ch := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
ch := sf.DoChan("test_dochan", func() (any, error) {
|
||||||
|
<-ch
|
||||||
|
counter.Add(1)
|
||||||
|
return struct{}{}, nil
|
||||||
|
})
|
||||||
|
result := <-ch
|
||||||
|
if result.Shared {
|
||||||
|
hasShared.Store(true)
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
wg.Wait()
|
||||||
|
if hasShared.Load() {
|
||||||
|
s.Less(counter.Load(), int32(10))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SingleflightSuite) TestForget() {
|
||||||
|
counter, hasShared := atomic.Int32{}, atomic.Bool{}
|
||||||
|
|
||||||
|
sf := Singleflight[any]{}
|
||||||
|
ch := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
_, _, shared := sf.Do("test_forget", func() (any, error) {
|
||||||
|
<-ch
|
||||||
|
counter.Add(1)
|
||||||
|
return struct{}{}, nil
|
||||||
|
})
|
||||||
|
if shared {
|
||||||
|
hasShared.Store(true)
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
flag := false
|
||||||
|
sf.Forget("test_forget")
|
||||||
|
sf.Do("test_forget", func() (any, error) {
|
||||||
|
flag = true
|
||||||
|
return struct{}{}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
close(ch)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
s.True(flag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSingleFlight(t *testing.T) {
|
||||||
|
suite.Run(t, new(SingleflightSuite))
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user