From 4845e4d67908c9e9bf9de9d980a05d0d66dbfe01 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 21 Nov 2024 21:35:29 +0800 Subject: [PATCH] enhance: [10kcp] Revert "enhance: remove the rpc level of coordinator (#37914) Signed-off-by: bigsheeper --- cmd/milvus/util.go | 8 +- go.mod | 2 - go.sum | 3 - internal/coordinator/coordclient/registry.go | 164 ------------------ .../coordinator/coordclient/registry_test.go | 74 -------- internal/datacoord/server.go | 4 +- internal/distributed/datacoord/service.go | 2 - internal/distributed/querycoord/service.go | 16 +- internal/distributed/rootcoord/service.go | 29 +++- .../distributed/rootcoord/service_test.go | 12 +- pkg/util/syncutil/future.go | 56 ------ pkg/util/syncutil/future_test.go | 51 ------ 12 files changed, 42 insertions(+), 379 deletions(-) delete mode 100644 internal/coordinator/coordclient/registry.go delete mode 100644 internal/coordinator/coordclient/registry_test.go delete mode 100644 pkg/util/syncutil/future.go delete mode 100644 pkg/util/syncutil/future_test.go diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go index baa94db368..35068a6d32 100644 --- a/cmd/milvus/util.go +++ b/cmd/milvus/util.go @@ -20,7 +20,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/cmd/roles" - "github.com/milvus-io/milvus/internal/coordinator/coordclient" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -172,12 +171,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp()) os.Exit(-1) } - coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{ - ServerType: serverType, - EnableQueryCoord: role.EnableQueryCoord, - EnableDataCoord: role.EnableDataCoord, - EnableRootCoord: role.EnableRootCoord, - }) + return role } diff --git a/go.mod b/go.mod index 0425af6ff4..12891387bb 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,6 @@ require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d7 require ( github.com/bits-and-blooms/bitset v1.10.0 github.com/bytedance/sonic v1.9.1 - github.com/fullstorydev/grpchan v1.1.1 github.com/greatroar/blobloom v0.8.0 github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 @@ -140,7 +139,6 @@ require ( github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect - github.com/jhump/protoreflect v1.12.0 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect diff --git a/go.sum b/go.sum index 814fe0c955..4fc1b2a749 100644 --- a/go.sum +++ b/go.sum @@ -265,8 +265,6 @@ github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fullstorydev/grpchan v1.1.1 h1:heQqIJlAv5Cnks9a70GRL2EJke6QQoUB25VGR6TZQas= -github.com/fullstorydev/grpchan v1.1.1/go.mod h1:f4HpiV8V6htfY/K44GWV1ESQzHBTq7DinhzqQ95lpgc= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc= @@ -499,7 +497,6 @@ github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSl github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= -github.com/jhump/protoreflect v1.12.0 h1:1NQ4FpWMgn3by/n1X0fbeKEUxP1wBt7+Oitpv01HR10= github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM= diff --git a/internal/coordinator/coordclient/registry.go b/internal/coordinator/coordclient/registry.go deleted file mode 100644 index 98737424c2..0000000000 --- a/internal/coordinator/coordclient/registry.go +++ /dev/null @@ -1,164 +0,0 @@ -package coordclient - -import ( - "context" - "fmt" - - "go.uber.org/zap" - - "github.com/fullstorydev/grpchan/inprocgrpc" - dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" - qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client" - rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" - "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/syncutil" - "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -// localClient is a client that can access local server directly -type localClient struct { - queryCoordClient *syncutil.Future[types.QueryCoordClient] - dataCoordClient *syncutil.Future[types.DataCoordClient] - rootCoordClient *syncutil.Future[types.RootCoordClient] -} - -var ( - enableLocal *LocalClientRoleConfig // a global map to store all can be local accessible roles. - glocalClient *localClient // !!! WARNING: local client will ignore all interceptor of grpc client and server. -) - -func init() { - enableLocal = &LocalClientRoleConfig{} - glocalClient = &localClient{ - queryCoordClient: syncutil.NewFuture[types.QueryCoordClient](), - dataCoordClient: syncutil.NewFuture[types.DataCoordClient](), - rootCoordClient: syncutil.NewFuture[types.RootCoordClient](), - } -} - -type LocalClientRoleConfig struct { - ServerType string - EnableQueryCoord bool - EnableDataCoord bool - EnableRootCoord bool -} - -// EnableLocalClientRole init localable roles -func EnableLocalClientRole(cfg *LocalClientRoleConfig) { - if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole { - return - } - enableLocal = cfg -} - -// RegisterQueryCoordServer register query coord server -func RegisterQueryCoordServer(server querypb.QueryCoordServer) { - if !enableLocal.EnableQueryCoord { - return - } - channel := &inprocgrpc.Channel{} - channel.RegisterService(&querypb.QueryCoord_ServiceDesc, server) - newLocalClient := querypb.NewQueryCoordClient(channel) - glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient}) - log.Info("register query coord server", zap.Any("enableLocalClient", enableLocal)) -} - -// RegsterDataCoordServer register data coord server -func RegisterDataCoordServer(server datapb.DataCoordServer) { - if !enableLocal.EnableDataCoord { - return - } - channel := &inprocgrpc.Channel{} - channel.RegisterService(&datapb.DataCoord_ServiceDesc, server) - newLocalClient := datapb.NewDataCoordClient(channel) - glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient}) - log.Info("register data coord server", zap.Any("enableLocalClient", enableLocal)) -} - -// RegisterRootCoordServer register root coord server -func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) { - if !enableLocal.EnableRootCoord { - return - } - channel := &inprocgrpc.Channel{} - channel.RegisterService(&rootcoordpb.RootCoord_ServiceDesc, server) - newLocalClient := rootcoordpb.NewRootCoordClient(channel) - glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient}) - log.Info("register root coord server", zap.Any("enableLocalClient", enableLocal)) -} - -// GetQueryCoordClient return query coord client -func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient { - var client types.QueryCoordClient - var err error - if enableLocal.EnableQueryCoord { - client, err = glocalClient.queryCoordClient.GetWithContext(ctx) - } else { - // TODO: we should make a singleton here. but most unittest rely on a dedicated client. - client, err = qcc.NewClient(ctx) - } - if err != nil { - panic(fmt.Sprintf("get query coord client failed: %v", err)) - } - return client -} - -// GetDataCoordClient return data coord client -func GetDataCoordClient(ctx context.Context) types.DataCoordClient { - var client types.DataCoordClient - var err error - if enableLocal.EnableDataCoord { - client, err = glocalClient.dataCoordClient.GetWithContext(ctx) - } else { - // TODO: we should make a singleton here. but most unittest rely on a dedicated client. - client, err = dcc.NewClient(ctx) - } - if err != nil { - panic(fmt.Sprintf("get data coord client failed: %v", err)) - } - return client -} - -// GetRootCoordClient return root coord client -func GetRootCoordClient(ctx context.Context) types.RootCoordClient { - var client types.RootCoordClient - var err error - if enableLocal.EnableRootCoord { - client, err = glocalClient.rootCoordClient.GetWithContext(ctx) - } else { - // TODO: we should make a singleton here. but most unittest rely on a dedicated client. - client, err = rcc.NewClient(ctx) - } - if err != nil { - panic(fmt.Sprintf("get root coord client failed: %v", err)) - } - return client -} - -type nopCloseQueryCoordClient struct { - querypb.QueryCoordClient -} - -func (n *nopCloseQueryCoordClient) Close() error { - return nil -} - -type nopCloseDataCoordClient struct { - datapb.DataCoordClient -} - -func (n *nopCloseDataCoordClient) Close() error { - return nil -} - -type nopCloseRootCoordClient struct { - rootcoordpb.RootCoordClient -} - -func (n *nopCloseRootCoordClient) Close() error { - return nil -} diff --git a/internal/coordinator/coordclient/registry_test.go b/internal/coordinator/coordclient/registry_test.go deleted file mode 100644 index 8ed97ac3d5..0000000000 --- a/internal/coordinator/coordclient/registry_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package coordclient - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" - "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -func TestRegistry(t *testing.T) { - assert.False(t, enableLocal.EnableQueryCoord) - assert.False(t, enableLocal.EnableDataCoord) - assert.False(t, enableLocal.EnableRootCoord) - - EnableLocalClientRole(&LocalClientRoleConfig{ - ServerType: typeutil.RootCoordRole, - EnableQueryCoord: true, - EnableDataCoord: true, - EnableRootCoord: true, - }) - assert.False(t, enableLocal.EnableQueryCoord) - assert.False(t, enableLocal.EnableDataCoord) - assert.False(t, enableLocal.EnableRootCoord) - - RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{}) - RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{}) - RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{}) - assert.False(t, glocalClient.dataCoordClient.Ready()) - assert.False(t, glocalClient.queryCoordClient.Ready()) - assert.False(t, glocalClient.rootCoordClient.Ready()) - - enableLocal = &LocalClientRoleConfig{} - - EnableLocalClientRole(&LocalClientRoleConfig{ - ServerType: typeutil.StandaloneRole, - EnableQueryCoord: true, - EnableDataCoord: true, - EnableRootCoord: true, - }) - assert.True(t, enableLocal.EnableDataCoord) - assert.True(t, enableLocal.EnableQueryCoord) - assert.True(t, enableLocal.EnableRootCoord) - - RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{}) - RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{}) - RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{}) - assert.True(t, glocalClient.dataCoordClient.Ready()) - assert.True(t, glocalClient.queryCoordClient.Ready()) - assert.True(t, glocalClient.rootCoordClient.Ready()) - - enableLocal = &LocalClientRoleConfig{} - - EnableLocalClientRole(&LocalClientRoleConfig{ - ServerType: typeutil.MixtureRole, - EnableQueryCoord: true, - EnableDataCoord: true, - EnableRootCoord: true, - }) - assert.True(t, enableLocal.EnableDataCoord) - assert.True(t, enableLocal.EnableQueryCoord) - assert.True(t, enableLocal.EnableRootCoord) - - assert.NotNil(t, GetQueryCoordClient(context.Background())) - assert.NotNil(t, GetDataCoordClient(context.Background())) - assert.NotNil(t, GetRootCoordClient(context.Background())) - GetQueryCoordClient(context.Background()).Close() - GetDataCoordClient(context.Background()).Close() - GetRootCoordClient(context.Background()).Close() -} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index fa59e01486..ff3b235edc 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -35,11 +35,11 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" globalIDAllocator "github.com/milvus-io/milvus/internal/allocator" - "github.com/milvus-io/milvus/internal/coordinator/coordclient" "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/datacoord/dataview" datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" + rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/tikv" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" @@ -254,7 +254,7 @@ func defaultIndexNodeCreatorFunc(ctx context.Context, addr string, nodeID int64) } func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoordClient, error) { - return coordclient.GetRootCoordClient(ctx), nil + return rootcoordclient.NewClient(ctx) } // QuitSignal returns signal when server quits diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index a3801c8d73..2cd78db79d 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/coordinator/coordclient" "github.com/milvus-io/milvus/internal/datacoord" "github.com/milvus-io/milvus/internal/distributed/utils" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -201,7 +200,6 @@ func (s *Server) startGrpcLoop() { grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler())) indexpb.RegisterIndexCoordServer(s.grpcServer, s) datapb.RegisterDataCoordServer(s.grpcServer, s) - coordclient.RegisterDataCoordServer(s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(s.listener); err != nil { s.grpcErrChan <- err diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 77d4cc2596..25b903c4ed 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -31,7 +31,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/coordinator/coordclient" + dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" + rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" "github.com/milvus-io/milvus/internal/distributed/utils" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -168,7 +169,11 @@ func (s *Server) init() error { // --- Master Server Client --- if s.rootCoord == nil { - s.rootCoord = coordclient.GetRootCoordClient(s.loopCtx) + s.rootCoord, err = rcc.NewClient(s.loopCtx) + if err != nil { + log.Error("QueryCoord try to new RootCoord client failed", zap.Error(err)) + panic(err) + } } // wait for master init or healthy @@ -186,7 +191,11 @@ func (s *Server) init() error { // --- Data service client --- if s.dataCoord == nil { - s.dataCoord = coordclient.GetDataCoordClient(s.loopCtx) + s.dataCoord, err = dcc.NewClient(s.loopCtx) + if err != nil { + log.Error("QueryCoord try to new DataCoord client failed", zap.Error(err)) + panic(err) + } } log.Info("QueryCoord try to wait for DataCoord ready") @@ -249,7 +258,6 @@ func (s *Server) startGrpcLoop() { grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), ) querypb.RegisterQueryCoordServer(s.grpcServer, s) - coordclient.RegisterQueryCoordServer(s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(s.listener); err != nil { diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index e99a81dd50..4f245d1b07 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -31,7 +31,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/coordinator/coordclient" + dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" + qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client" "github.com/milvus-io/milvus/internal/distributed/utils" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" @@ -71,8 +72,8 @@ type Server struct { dataCoord types.DataCoordClient queryCoord types.QueryCoordClient - newDataCoordClient func(ctx context.Context) types.DataCoordClient - newQueryCoordClient func(ctx context.Context) types.QueryCoordClient + newDataCoordClient func() types.DataCoordClient + newQueryCoordClient func() types.QueryCoordClient } func (s *Server) DescribeDatabase(ctx context.Context, request *rootcoordpb.DescribeDatabaseRequest) (*rootcoordpb.DescribeDatabaseResponse, error) { @@ -156,8 +157,21 @@ func (s *Server) Prepare() error { } func (s *Server) setClient() { - s.newDataCoordClient = coordclient.GetDataCoordClient - s.newQueryCoordClient = coordclient.GetQueryCoordClient + s.newDataCoordClient = func() types.DataCoordClient { + dsClient, err := dcc.NewClient(s.ctx) + if err != nil { + panic(err) + } + return dsClient + } + + s.newQueryCoordClient = func() types.QueryCoordClient { + qsClient, err := qcc.NewClient(s.ctx) + if err != nil { + panic(err) + } + return qsClient + } } // Run initializes and starts RootCoord's grpc service. @@ -220,7 +234,7 @@ func (s *Server) init() error { if s.newDataCoordClient != nil { log.Info("RootCoord start to create DataCoord client") - dataCoord := s.newDataCoordClient(s.ctx) + dataCoord := s.newDataCoordClient() s.dataCoord = dataCoord if err := s.rootCoord.SetDataCoordClient(dataCoord); err != nil { panic(err) @@ -229,7 +243,7 @@ func (s *Server) init() error { if s.newQueryCoordClient != nil { log.Info("RootCoord start to create QueryCoord client") - queryCoord := s.newQueryCoordClient(s.ctx) + queryCoord := s.newQueryCoordClient() s.queryCoord = queryCoord if err := s.rootCoord.SetQueryCoordClient(queryCoord); err != nil { panic(err) @@ -291,7 +305,6 @@ func (s *Server) startGrpcLoop() { )), grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler())) rootcoordpb.RegisterRootCoordServer(s.grpcServer, s) - coordclient.RegisterRootCoordServer(s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(s.listener); err != nil { diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index 5965b47883..43c9ba6ba0 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -142,13 +142,13 @@ func TestRun(t *testing.T) { mockDataCoord := mocks.NewMockDataCoordClient(t) mockDataCoord.EXPECT().Close().Return(nil) - svr.newDataCoordClient = func(_ context.Context) types.DataCoordClient { + svr.newDataCoordClient = func() types.DataCoordClient { return mockDataCoord } mockQueryCoord := mocks.NewMockQueryCoordClient(t) mockQueryCoord.EXPECT().Close().Return(nil) - svr.newQueryCoordClient = func(_ context.Context) types.QueryCoordClient { + svr.newQueryCoordClient = func() types.QueryCoordClient { return mockQueryCoord } @@ -238,7 +238,7 @@ func TestServerRun_DataCoordClientInitErr(t *testing.T) { mockDataCoord := mocks.NewMockDataCoordClient(t) mockDataCoord.EXPECT().Close().Return(nil) - server.newDataCoordClient = func(_ context.Context) types.DataCoordClient { + server.newDataCoordClient = func() types.DataCoordClient { return mockDataCoord } err = server.Prepare() @@ -268,7 +268,7 @@ func TestServerRun_DataCoordClientStartErr(t *testing.T) { mockDataCoord := mocks.NewMockDataCoordClient(t) mockDataCoord.EXPECT().Close().Return(nil) - server.newDataCoordClient = func(_ context.Context) types.DataCoordClient { + server.newDataCoordClient = func() types.DataCoordClient { return mockDataCoord } err = server.Prepare() @@ -298,7 +298,7 @@ func TestServerRun_QueryCoordClientInitErr(t *testing.T) { mockQueryCoord := mocks.NewMockQueryCoordClient(t) mockQueryCoord.EXPECT().Close().Return(nil) - server.newQueryCoordClient = func(_ context.Context) types.QueryCoordClient { + server.newQueryCoordClient = func() types.QueryCoordClient { return mockQueryCoord } err = server.Prepare() @@ -328,7 +328,7 @@ func TestServer_QueryCoordClientStartErr(t *testing.T) { mockQueryCoord := mocks.NewMockQueryCoordClient(t) mockQueryCoord.EXPECT().Close().Return(nil) - server.newQueryCoordClient = func(_ context.Context) types.QueryCoordClient { + server.newQueryCoordClient = func() types.QueryCoordClient { return mockQueryCoord } err = server.Prepare() diff --git a/pkg/util/syncutil/future.go b/pkg/util/syncutil/future.go deleted file mode 100644 index f13c40f58e..0000000000 --- a/pkg/util/syncutil/future.go +++ /dev/null @@ -1,56 +0,0 @@ -package syncutil - -import ( - "context" -) - -// Future is a future value that can be set and retrieved. -type Future[T any] struct { - ch chan struct{} - value T -} - -// NewFuture creates a new future. -func NewFuture[T any]() *Future[T] { - return &Future[T]{ - ch: make(chan struct{}), - } -} - -// Set sets the value of the future. -func (f *Future[T]) Set(value T) { - f.value = value - close(f.ch) -} - -// GetWithContext retrieves the value of the future if set, otherwise block until set or the context is done. -func (f *Future[T]) GetWithContext(ctx context.Context) (T, error) { - select { - case <-ctx.Done(): - var val T - return val, ctx.Err() - case <-f.ch: - return f.value, nil - } -} - -// Get retrieves the value of the future if set, otherwise block until set. -func (f *Future[T]) Get() T { - <-f.ch - return f.value -} - -// Done returns a channel that is closed when the future is set. -func (f *Future[T]) Done() <-chan struct{} { - return f.ch -} - -// Ready returns true if the future is set. -func (f *Future[T]) Ready() bool { - select { - case <-f.ch: - return true - default: - return false - } -} diff --git a/pkg/util/syncutil/future_test.go b/pkg/util/syncutil/future_test.go deleted file mode 100644 index 3e0c567789..0000000000 --- a/pkg/util/syncutil/future_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package syncutil - -import ( - "testing" - "time" -) - -func TestFuture_SetAndGet(t *testing.T) { - f := NewFuture[int]() - go func() { - time.Sleep(1 * time.Second) // Simulate some work - f.Set(42) - }() - - val := f.Get() - if val != 42 { - t.Errorf("Expected value 42, got %d", val) - } -} - -func TestFuture_Done(t *testing.T) { - f := NewFuture[string]() - go func() { - f.Set("done") - }() - - select { - case <-f.Done(): - // Success - case <-time.After(20 * time.Millisecond): - t.Error("Expected future to be done within 2 seconds") - } -} - -func TestFuture_Ready(t *testing.T) { - f := NewFuture[float64]() - go func() { - time.Sleep(20 * time.Millisecond) // Simulate some work - f.Set(3.14) - }() - - if f.Ready() { - t.Error("Expected future not to be ready immediately") - } - - <-f.Done() // Wait for the future to be set - - if !f.Ready() { - t.Error("Expected future to be ready after being set") - } -}