diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e01759a0a6..0b9c081b8c 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -70,6 +70,10 @@ indexServer: address: localhost port: 31000 +dataService: + address: localhost + port: 13333 + dataNode: address: localhost port: 21124 diff --git a/internal/distributed/proxyservice/client.go b/internal/distributed/proxyservice/client.go index d53b9c83f2..94c29241ad 100644 --- a/internal/distributed/proxyservice/client.go +++ b/internal/distributed/proxyservice/client.go @@ -3,10 +3,6 @@ package grpcproxyservice import ( "context" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" @@ -48,18 +44,6 @@ func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMe return err } -func (c *Client) GetTimeTickChannel() (string, error) { - response, err := c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{}) - if err != nil { - return "", err - } - return response.Value, nil -} - -func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { - return c.proxyServiceClient.GetComponentStates(c.ctx, &commonpb.Empty{}) -} - func NewClient(address string) *Client { return &Client{ address: address, diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index dee90840b5..69fe84766a 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -6,11 +6,15 @@ import ( "net" "strconv" "sync" + "time" "google.golang.org/grpc" + "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + masterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/queryservice" @@ -28,21 +32,20 @@ type Server struct { } func (s *Server) Init() error { - log.Println() - initParams := queryservice.InitParams{ - Distributed: true, - } - s.InitParams(&initParams) + log.Println("query service init") s.queryService.Init() + s.queryService.SetEnableGrpc(true) return nil } -func (s *Server) InitParams(params *queryservice.InitParams) { - s.queryService.InitParams(params) -} - func (s *Server) Start() error { - s.Init() + masterServiceClient, err := masterservice.NewGrpcClient(queryservice.Params.MasterServiceAddress, 30*time.Second) + if err != nil { + return err + } + s.queryService.SetMasterService(masterServiceClient) + dataServiceClient := dataservice.NewClient(queryservice.Params.DataServiceAddress) + s.queryService.SetDataService(dataServiceClient) log.Println("start query service ...") s.loopWg.Add(1) go s.grpcLoop() @@ -60,26 +63,10 @@ func (s *Server) Stop() error { return nil } -//func (s *Server) SetDataService(p querynode.DataServiceInterface) error { -// c, ok := s.queryService -// if !ok { -// return errors.Errorf("set data service failed") -// } -// return c.SetDataService(p) -//} -// -//func (s *Server) SetIndexService(p querynode.IndexServiceInterface) error { -// c, ok := s.core.(*cms.Core) -// if !ok { -// return errors.Errorf("set index service failed") -// } -// return c.SetIndexService(p) -//} - -func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*querypb.ComponentStatesResponse, error) { +func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*internalpb2.ComponentStates, error) { componentStates, err := s.queryService.GetComponentStates() if err != nil { - return &querypb.ComponentStatesResponse{ + return &internalpb2.ComponentStates{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), @@ -87,13 +74,7 @@ func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (* }, err } - return &querypb.ComponentStatesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - Reason: "", - }, - States: componentStates, - }, nil + return componentStates, nil } func (s *Server) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) { @@ -174,13 +155,13 @@ func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (* func (s *Server) NewServer(ctx context.Context) *Server { ctx1, cancel := context.WithCancel(ctx) - serviceInterface, err := queryservice.NewQueryService(ctx) + service, err := queryservice.NewQueryService(ctx) if err != nil { log.Fatal(errors.New("create QueryService failed")) } return &Server{ - queryService: serviceInterface.(*QueryService), + queryService: service, loopCtx: ctx1, loopCancel: cancel, } diff --git a/internal/proto/query_service.proto b/internal/proto/query_service.proto index fe81bb44c9..285b1665c4 100644 --- a/internal/proto/query_service.proto +++ b/internal/proto/query_service.proto @@ -156,7 +156,7 @@ service QueryService { rpc GetTimeTickChannel(common.Empty) returns (milvus.StringResponse) {} rpc GetStatisticsChannel(common.Empty) returns (milvus.StringResponse) {} rpc GetPartitionStates(PartitionStatesRequest) returns (PartitionStatesResponse) {} - rpc GetComponentStates(common.Empty) returns (ComponentStatesResponse) {} + rpc GetComponentStates(common.Empty) returns (internal.ComponentStates) {} } service QueryNode { diff --git a/internal/proto/querypb/query_service.pb.go b/internal/proto/querypb/query_service.pb.go index 11df73707c..60ab104e76 100644 --- a/internal/proto/querypb/query_service.pb.go +++ b/internal/proto/querypb/query_service.pb.go @@ -1211,81 +1211,81 @@ func init() { func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) } var fileDescriptor_5fcb6756dc1afb8d = []byte{ - // 1169 bytes of a gzipped FileDescriptorProto + // 1174 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xdd, 0x6e, 0x1b, 0x45, - 0x14, 0xce, 0xc6, 0x4e, 0x5a, 0x9f, 0xb8, 0xb6, 0x3b, 0xf9, 0xd5, 0x82, 0xaa, 0x32, 0x40, 0x9b, - 0x1f, 0x70, 0x50, 0x2a, 0x21, 0xae, 0x40, 0x49, 0x5c, 0x45, 0x96, 0x68, 0x08, 0xeb, 0x94, 0x8a, + 0x14, 0xf6, 0xc6, 0x4e, 0x5a, 0x9f, 0xb8, 0xb6, 0x3b, 0xf9, 0xd5, 0x82, 0xaa, 0x32, 0x40, 0x9b, + 0x1f, 0x70, 0x50, 0x2a, 0x21, 0xae, 0x40, 0x49, 0x5c, 0x45, 0x96, 0x68, 0x08, 0x9b, 0x94, 0x8a, 0x40, 0x65, 0xd6, 0xbb, 0x83, 0x3d, 0xed, 0xfe, 0xb8, 0x3b, 0xe3, 0x40, 0x72, 0x03, 0x48, 0x5c, - 0xc3, 0x33, 0x20, 0x10, 0x48, 0x88, 0x4b, 0xde, 0x86, 0x2b, 0x1e, 0x81, 0x37, 0x40, 0x3b, 0xbb, - 0xde, 0xec, 0xac, 0xc7, 0xb1, 0x1b, 0x37, 0x4d, 0xef, 0x76, 0x66, 0xcf, 0x39, 0xdf, 0x77, 0xce, - 0xcc, 0x9c, 0xf9, 0x06, 0xe6, 0x9f, 0xf5, 0x48, 0x70, 0xd2, 0x64, 0x24, 0x38, 0xa6, 0x16, 0xa9, - 0x76, 0x03, 0x9f, 0xfb, 0x08, 0xb9, 0xd4, 0x39, 0xee, 0xb1, 0x68, 0x54, 0x15, 0x16, 0x7a, 0xd1, - 0xf2, 0x5d, 0xd7, 0xf7, 0xa2, 0x39, 0xbd, 0x98, 0xb6, 0xd0, 0x4b, 0xd4, 0xe3, 0x24, 0xf0, 0x4c, - 0x27, 0x1e, 0x23, 0xdb, 0xe4, 0xa6, 0x1c, 0x13, 0x7f, 0x07, 0xf3, 0x06, 0x69, 0x53, 0xc6, 0x49, - 0xb0, 0xef, 0xdb, 0xc4, 0x20, 0xcf, 0x7a, 0x84, 0x71, 0xf4, 0x1e, 0xe4, 0x5b, 0x26, 0x23, 0x2b, - 0xda, 0x6d, 0x6d, 0x75, 0x6e, 0xeb, 0xf5, 0xaa, 0x84, 0x1c, 0x43, 0x3e, 0x60, 0xed, 0x1d, 0x93, - 0x11, 0x43, 0x58, 0xa2, 0xf7, 0xe1, 0x9a, 0x69, 0xdb, 0x01, 0x61, 0x6c, 0x65, 0xfa, 0x1c, 0xa7, - 0xed, 0xc8, 0xc6, 0xe8, 0x1b, 0xe3, 0x9f, 0x35, 0x58, 0x90, 0x19, 0xb0, 0xae, 0xef, 0x31, 0x82, - 0xee, 0xc1, 0x2c, 0xe3, 0x26, 0xef, 0xb1, 0x98, 0xc4, 0x6b, 0xca, 0x78, 0x0d, 0x61, 0x62, 0xc4, - 0xa6, 0x68, 0x07, 0xe6, 0xa8, 0x47, 0x79, 0xb3, 0x6b, 0x06, 0xa6, 0xdb, 0x67, 0xf2, 0x86, 0xec, - 0x99, 0x54, 0xa5, 0xee, 0x51, 0x7e, 0x20, 0x0c, 0x0d, 0xa0, 0xc9, 0x37, 0x7e, 0x0c, 0x8b, 0x8d, - 0x8e, 0xff, 0xcd, 0xae, 0xef, 0x38, 0xc4, 0xe2, 0xd4, 0xf7, 0x2e, 0x5e, 0x14, 0x04, 0x79, 0xbb, - 0x55, 0xaf, 0x09, 0x1e, 0x39, 0x43, 0x7c, 0x63, 0x06, 0x4b, 0xd9, 0xf0, 0x93, 0x64, 0xfc, 0x16, - 0xdc, 0xb0, 0x92, 0x50, 0xf5, 0x5a, 0x98, 0x73, 0x6e, 0x35, 0x67, 0xc8, 0x93, 0xf8, 0x07, 0x0d, - 0x16, 0x3f, 0xf6, 0x4d, 0xfb, 0x92, 0x92, 0x42, 0x18, 0x8a, 0x69, 0xc0, 0x95, 0x9c, 0xf8, 0x27, - 0xcd, 0xe1, 0x1f, 0x35, 0x58, 0x31, 0x88, 0x43, 0x4c, 0x46, 0xae, 0x92, 0xc6, 0xf7, 0x1a, 0x2c, - 0x84, 0x0b, 0x70, 0x60, 0x06, 0x9c, 0x5e, 0x0d, 0x85, 0x6e, 0xb4, 0xc3, 0x52, 0x0c, 0x26, 0xd9, - 0x01, 0x18, 0x8a, 0xdd, 0x7e, 0xa4, 0xb3, 0x0d, 0x20, 0xcd, 0x61, 0x17, 0xca, 0x09, 0x5a, 0xe8, - 0x4e, 0x18, 0xba, 0x0d, 0x73, 0x29, 0x13, 0x01, 0x98, 0x33, 0xd2, 0x53, 0xe8, 0x03, 0x98, 0x09, - 0x21, 0x88, 0xc8, 0xaf, 0xb4, 0x85, 0xab, 0x83, 0xfd, 0xa7, 0x2a, 0x47, 0x35, 0x22, 0x07, 0xfc, - 0xbb, 0x06, 0x4b, 0x19, 0xbc, 0x97, 0x5e, 0xe5, 0x81, 0xba, 0xe4, 0x15, 0x75, 0xf9, 0x53, 0x83, - 0xe5, 0x01, 0xa2, 0x93, 0x2c, 0xc6, 0x11, 0x2c, 0x25, 0x00, 0x4d, 0x9b, 0x30, 0x2b, 0xa0, 0xdd, - 0xf0, 0x3b, 0x5a, 0x96, 0xb9, 0xad, 0x37, 0x47, 0x17, 0x91, 0x19, 0x8b, 0x49, 0x88, 0x5a, 0x2a, - 0x02, 0xfe, 0x4d, 0x83, 0x85, 0xf0, 0x10, 0x5f, 0xdd, 0xce, 0x1d, 0xab, 0xa6, 0x7f, 0x68, 0xb0, - 0x1c, 0x9f, 0xf3, 0x57, 0x9c, 0xe9, 0x2f, 0x1a, 0xe8, 0xbb, 0x01, 0x31, 0x39, 0xf9, 0x34, 0x5c, - 0x87, 0xdd, 0x8e, 0xe9, 0x79, 0xc4, 0x99, 0x6c, 0x03, 0xdc, 0x85, 0x72, 0x10, 0x25, 0xdb, 0xb4, - 0xa2, 0x78, 0x82, 0x7a, 0xc1, 0x28, 0xc5, 0xd3, 0x31, 0x0a, 0x7a, 0x1b, 0x4a, 0x01, 0x61, 0x3d, - 0xe7, 0xcc, 0x2e, 0x27, 0xec, 0x6e, 0x44, 0xb3, 0xb1, 0x19, 0xfe, 0x55, 0x83, 0xe5, 0x6d, 0xdb, - 0x4e, 0x13, 0x9c, 0xe0, 0x2c, 0x6d, 0xc0, 0xcd, 0x0c, 0xbb, 0xb8, 0xb4, 0x05, 0xa3, 0x22, 0xf3, - 0xab, 0xd7, 0xd0, 0x1a, 0x54, 0x64, 0x86, 0x71, 0xa9, 0x0b, 0x46, 0x59, 0xe2, 0x58, 0xaf, 0xe1, - 0x7f, 0x34, 0xd0, 0x0d, 0xe2, 0xfa, 0xc7, 0x44, 0x49, 0xf4, 0x42, 0x95, 0xec, 0x67, 0x37, 0x3d, - 0x59, 0x76, 0xb9, 0xe7, 0xc8, 0x2e, 0xaf, 0xce, 0xee, 0x09, 0x2c, 0x3d, 0x32, 0xb9, 0xd5, 0xa9, - 0xb9, 0x93, 0xaf, 0xc0, 0x2d, 0x80, 0x04, 0x2f, 0x6a, 0x0a, 0x05, 0x23, 0x35, 0x83, 0xff, 0x9e, - 0x06, 0x14, 0x1e, 0xf2, 0x06, 0x69, 0xbb, 0xc4, 0xe3, 0x2f, 0xff, 0xe0, 0x64, 0xee, 0x85, 0xfc, - 0xe0, 0xbd, 0x70, 0x0b, 0x80, 0x45, 0xec, 0xc2, 0x14, 0x66, 0xc4, 0xc1, 0x4a, 0xcd, 0x20, 0x1d, - 0xae, 0x7f, 0x4d, 0x89, 0x63, 0x87, 0x7f, 0x67, 0xc5, 0xdf, 0x64, 0x8c, 0x3e, 0x03, 0xe4, 0x98, - 0x8c, 0x37, 0x63, 0xf3, 0x66, 0x74, 0xc1, 0x5c, 0x13, 0x59, 0xad, 0xca, 0x59, 0x85, 0x6a, 0xb5, - 0x1a, 0x97, 0x41, 0x6e, 0xcd, 0x46, 0x25, 0x8c, 0x91, 0xfe, 0x85, 0xff, 0xd5, 0x60, 0x31, 0x6e, - 0x3a, 0x57, 0x56, 0xb9, 0x31, 0x5a, 0xce, 0x24, 0xb5, 0xc3, 0x3f, 0x69, 0xb0, 0xbc, 0xeb, 0xbb, - 0x5d, 0xdf, 0x1b, 0xa8, 0xc8, 0xc5, 0x4e, 0xd8, 0x87, 0x91, 0x13, 0xe9, 0x0b, 0xe5, 0x3b, 0x43, - 0x84, 0x72, 0x16, 0x34, 0xf6, 0x5a, 0x3f, 0x85, 0x92, 0x7c, 0x75, 0xa1, 0x22, 0x5c, 0xdf, 0xf7, - 0xf9, 0xfd, 0x6f, 0x29, 0xe3, 0x95, 0x29, 0x54, 0x02, 0xd8, 0xf7, 0xf9, 0x41, 0x40, 0x18, 0xf1, - 0x78, 0x45, 0x43, 0x00, 0xb3, 0x9f, 0x78, 0x35, 0xca, 0x9e, 0x56, 0xa6, 0xd1, 0x7c, 0xac, 0x48, - 0x4c, 0xa7, 0xee, 0x3d, 0x20, 0xae, 0x1f, 0x9c, 0x54, 0x72, 0xa1, 0x7b, 0x32, 0xca, 0xa3, 0x0a, - 0x14, 0x13, 0x93, 0xbd, 0x83, 0x87, 0x95, 0x19, 0x54, 0x80, 0x99, 0xe8, 0x73, 0x76, 0xeb, 0xaf, - 0x02, 0x14, 0x45, 0xaf, 0x69, 0x44, 0xef, 0x19, 0x64, 0x41, 0x31, 0xfd, 0x8e, 0x40, 0x77, 0x55, - 0x37, 0xad, 0xe2, 0xad, 0xa3, 0xaf, 0x8e, 0x36, 0x8c, 0x8a, 0x8c, 0xa7, 0xd0, 0x13, 0x28, 0xcb, - 0xe2, 0x9d, 0xa1, 0x35, 0x95, 0xbb, 0xf2, 0x01, 0xa1, 0xaf, 0x8f, 0x63, 0x9a, 0x60, 0xb5, 0xa1, - 0x24, 0xa9, 0x44, 0x86, 0x56, 0x87, 0xf9, 0x67, 0xef, 0x59, 0x7d, 0x6d, 0x0c, 0xcb, 0x04, 0xe8, - 0x73, 0x28, 0x49, 0xb2, 0x62, 0x08, 0x90, 0x4a, 0x7a, 0xe8, 0xe7, 0xed, 0x33, 0x3c, 0x85, 0x9a, - 0x70, 0x33, 0x2b, 0x05, 0x18, 0xda, 0x50, 0x17, 0x5c, 0xa9, 0x18, 0x46, 0x01, 0x1c, 0x45, 0xdc, - 0xcf, 0x0a, 0xa8, 0x5e, 0x0f, 0xe5, 0xdb, 0x67, 0x54, 0xec, 0xaf, 0x12, 0xf2, 0xa9, 0xf0, 0xef, - 0x9c, 0x43, 0xfe, 0xb9, 0x11, 0x5a, 0x80, 0x06, 0xf5, 0x07, 0xd2, 0x95, 0x4e, 0xf7, 0xdd, 0x2e, - 0x3f, 0xd1, 0xab, 0x2a, 0xf8, 0xe1, 0x1a, 0x06, 0x4f, 0xa1, 0x47, 0x80, 0xf6, 0x08, 0x3f, 0xa4, - 0x2e, 0x39, 0xa4, 0xd6, 0xd3, 0x71, 0x30, 0x32, 0x1a, 0x35, 0x1e, 0x34, 0x78, 0x40, 0xbd, 0xb6, - 0xb4, 0x6d, 0x16, 0xf6, 0x88, 0x68, 0x09, 0x94, 0x71, 0x6a, 0xb1, 0x17, 0x18, 0xda, 0x17, 0x9c, - 0xb3, 0x2f, 0x96, 0xf5, 0x71, 0xb4, 0x73, 0x5c, 0xf8, 0x8d, 0xb1, 0x6c, 0x13, 0xc0, 0xa6, 0x00, - 0xcc, 0xf4, 0xb9, 0x73, 0x33, 0x51, 0x02, 0x0c, 0xe9, 0xce, 0x78, 0x6a, 0xeb, 0xbf, 0x19, 0x28, - 0x88, 0x05, 0x12, 0xbd, 0xe9, 0xd2, 0xd6, 0xe4, 0x10, 0xca, 0xf1, 0x9a, 0xbc, 0xc8, 0xe5, 0xb8, - 0xec, 0xea, 0xa0, 0xc7, 0x50, 0xce, 0x68, 0x5c, 0x75, 0x93, 0x18, 0x22, 0x84, 0x47, 0x1d, 0x33, - 0x0b, 0xd0, 0xa0, 0x38, 0x45, 0x55, 0xf5, 0x49, 0x1e, 0x26, 0x62, 0x47, 0x81, 0x7c, 0x09, 0xe5, - 0x8c, 0x48, 0x54, 0x6f, 0x58, 0xb5, 0x92, 0x1c, 0x15, 0xfd, 0x21, 0x14, 0x53, 0xaa, 0x90, 0xa1, - 0x3b, 0xc3, 0xba, 0x9c, 0xac, 0x7e, 0x46, 0x85, 0xfd, 0x02, 0xca, 0xb2, 0x6a, 0x1a, 0x72, 0x9f, - 0x29, 0xa5, 0xd5, 0x88, 0xe0, 0x3b, 0xdb, 0x47, 0x1f, 0xb5, 0x29, 0xef, 0xf4, 0x5a, 0xe1, 0x9f, - 0xcd, 0x53, 0xea, 0x38, 0xf4, 0x94, 0x13, 0xab, 0xb3, 0x19, 0x79, 0xbd, 0x6b, 0x53, 0xc6, 0x03, - 0xda, 0xea, 0x71, 0x62, 0x6f, 0xf6, 0xb5, 0xc6, 0xa6, 0x08, 0xb5, 0x29, 0x50, 0xbb, 0xad, 0xd6, - 0xac, 0x18, 0xde, 0xfb, 0x3f, 0x00, 0x00, 0xff, 0xff, 0x6d, 0xcd, 0x13, 0x8d, 0x10, 0x15, 0x00, - 0x00, + 0xc3, 0x33, 0x20, 0x10, 0x95, 0xb8, 0xe6, 0x6d, 0xb8, 0xe2, 0x11, 0x78, 0x03, 0xb4, 0xb3, 0xeb, + 0xcd, 0xce, 0x7a, 0x1c, 0x3b, 0x71, 0xdb, 0xf4, 0x6e, 0x67, 0xf6, 0x9c, 0xf3, 0x7d, 0xe7, 0xcc, + 0xcc, 0x99, 0x6f, 0x60, 0xee, 0x59, 0x8f, 0x04, 0x27, 0x4d, 0x46, 0x82, 0x63, 0x6a, 0x91, 0x5a, + 0x37, 0xf0, 0xb9, 0x8f, 0x90, 0x4b, 0x9d, 0xe3, 0x1e, 0x8b, 0x46, 0x35, 0x61, 0xa1, 0x97, 0x2c, + 0xdf, 0x75, 0x7d, 0x2f, 0x9a, 0xd3, 0x4b, 0x69, 0x0b, 0xbd, 0x4c, 0x3d, 0x4e, 0x02, 0xcf, 0x74, + 0xe2, 0x31, 0xb2, 0x4d, 0x6e, 0xca, 0x31, 0xf1, 0x0f, 0x30, 0x67, 0x90, 0x36, 0x65, 0x9c, 0x04, + 0x7b, 0xbe, 0x4d, 0x0c, 0xf2, 0xac, 0x47, 0x18, 0x47, 0x1f, 0x40, 0xa1, 0x65, 0x32, 0xb2, 0xac, + 0xdd, 0xd6, 0x56, 0x66, 0x37, 0xdf, 0xac, 0x49, 0xc8, 0x31, 0xe4, 0x03, 0xd6, 0xde, 0x36, 0x19, + 0x31, 0x84, 0x25, 0xfa, 0x10, 0xae, 0x99, 0xb6, 0x1d, 0x10, 0xc6, 0x96, 0xa7, 0xce, 0x71, 0xda, + 0x8a, 0x6c, 0x8c, 0xbe, 0x31, 0xfe, 0x55, 0x83, 0x79, 0x99, 0x01, 0xeb, 0xfa, 0x1e, 0x23, 0xe8, + 0x1e, 0xcc, 0x30, 0x6e, 0xf2, 0x1e, 0x8b, 0x49, 0xbc, 0xa1, 0x8c, 0x77, 0x20, 0x4c, 0x8c, 0xd8, + 0x14, 0x6d, 0xc3, 0x2c, 0xf5, 0x28, 0x6f, 0x76, 0xcd, 0xc0, 0x74, 0xfb, 0x4c, 0xde, 0x92, 0x3d, + 0x93, 0xaa, 0x34, 0x3c, 0xca, 0xf7, 0x85, 0xa1, 0x01, 0x34, 0xf9, 0xc6, 0x8f, 0x61, 0xe1, 0xa0, + 0xe3, 0x7f, 0xb7, 0xe3, 0x3b, 0x0e, 0xb1, 0x38, 0xf5, 0xbd, 0xcb, 0x17, 0x05, 0x41, 0xc1, 0x6e, + 0x35, 0xea, 0x82, 0x47, 0xde, 0x10, 0xdf, 0x98, 0xc1, 0x62, 0x36, 0xfc, 0x24, 0x19, 0xbf, 0x03, + 0x37, 0xac, 0x24, 0x54, 0xa3, 0x1e, 0xe6, 0x9c, 0x5f, 0xc9, 0x1b, 0xf2, 0x24, 0xfe, 0x49, 0x83, + 0x85, 0x4f, 0x7d, 0xd3, 0x7e, 0x49, 0x49, 0x21, 0x0c, 0xa5, 0x34, 0xe0, 0x72, 0x5e, 0xfc, 0x93, + 0xe6, 0xf0, 0xcf, 0x1a, 0x2c, 0x1b, 0xc4, 0x21, 0x26, 0x23, 0x57, 0x49, 0xe3, 0x47, 0x0d, 0xe6, + 0xc3, 0x05, 0xd8, 0x37, 0x03, 0x4e, 0xaf, 0x86, 0x42, 0x37, 0xda, 0x61, 0x29, 0x06, 0x93, 0xec, + 0x00, 0x0c, 0xa5, 0x6e, 0x3f, 0xd2, 0xd9, 0x06, 0x90, 0xe6, 0xb0, 0x0b, 0x95, 0x04, 0x2d, 0x74, + 0x27, 0x0c, 0xdd, 0x86, 0xd9, 0x94, 0x89, 0x00, 0xcc, 0x1b, 0xe9, 0x29, 0xf4, 0x11, 0x4c, 0x87, + 0x10, 0x44, 0xe4, 0x57, 0xde, 0xc4, 0xb5, 0xc1, 0xfe, 0x53, 0x93, 0xa3, 0x1a, 0x91, 0x03, 0xfe, + 0x53, 0x83, 0xc5, 0x0c, 0xde, 0x2b, 0xaf, 0xf2, 0x40, 0x5d, 0x0a, 0x8a, 0xba, 0xfc, 0xa5, 0xc1, + 0xd2, 0x00, 0xd1, 0x49, 0x16, 0xe3, 0x08, 0x16, 0x13, 0x80, 0xa6, 0x4d, 0x98, 0x15, 0xd0, 0x6e, + 0xf8, 0x1d, 0x2d, 0xcb, 0xec, 0xe6, 0xdb, 0xa3, 0x8b, 0xc8, 0x8c, 0x85, 0x24, 0x44, 0x3d, 0x15, + 0x01, 0xff, 0xa1, 0xc1, 0x7c, 0x78, 0x88, 0xaf, 0x6e, 0xe7, 0x8e, 0x55, 0xd3, 0xe7, 0x1a, 0x2c, + 0xc5, 0xe7, 0xfc, 0x35, 0x67, 0xfa, 0x9b, 0x06, 0xfa, 0x4e, 0x40, 0x4c, 0x4e, 0x3e, 0x0f, 0xd7, + 0x61, 0xa7, 0x63, 0x7a, 0x1e, 0x71, 0x26, 0xdb, 0x00, 0x77, 0xa1, 0x12, 0x44, 0xc9, 0x36, 0xad, + 0x28, 0x9e, 0xa0, 0x5e, 0x34, 0xca, 0xf1, 0x74, 0x8c, 0x82, 0xde, 0x85, 0x72, 0x40, 0x58, 0xcf, + 0x39, 0xb3, 0xcb, 0x0b, 0xbb, 0x1b, 0xd1, 0x6c, 0x6c, 0x86, 0x7f, 0xd7, 0x60, 0x69, 0xcb, 0xb6, + 0xd3, 0x04, 0x27, 0x38, 0x4b, 0xeb, 0x70, 0x33, 0xc3, 0x2e, 0x2e, 0x6d, 0xd1, 0xa8, 0xca, 0xfc, + 0x1a, 0x75, 0xb4, 0x0a, 0x55, 0x99, 0x61, 0x5c, 0xea, 0xa2, 0x51, 0x91, 0x38, 0x36, 0xea, 0xf8, + 0x1f, 0x0d, 0x74, 0x83, 0xb8, 0xfe, 0x31, 0x51, 0x12, 0xbd, 0x54, 0x25, 0xfb, 0xd9, 0x4d, 0x4d, + 0x96, 0x5d, 0xfe, 0x02, 0xd9, 0x15, 0xd4, 0xd9, 0x3d, 0x81, 0xc5, 0x47, 0x26, 0xb7, 0x3a, 0x75, + 0x77, 0xf2, 0x15, 0xb8, 0x05, 0x90, 0xe0, 0x45, 0x4d, 0xa1, 0x68, 0xa4, 0x66, 0xf0, 0xdf, 0x53, + 0x80, 0xc2, 0x43, 0x7e, 0x40, 0xda, 0x2e, 0xf1, 0xf8, 0xab, 0x3f, 0x38, 0x99, 0x7b, 0xa1, 0x30, + 0x78, 0x2f, 0xdc, 0x02, 0x60, 0x11, 0xbb, 0x30, 0x85, 0x69, 0x71, 0xb0, 0x52, 0x33, 0x48, 0x87, + 0xeb, 0xdf, 0x52, 0xe2, 0xd8, 0xe1, 0xdf, 0x19, 0xf1, 0x37, 0x19, 0xa3, 0x2f, 0x00, 0x39, 0x26, + 0xe3, 0xcd, 0xd8, 0xbc, 0x19, 0x5d, 0x30, 0xd7, 0x44, 0x56, 0x2b, 0x72, 0x56, 0xa1, 0x5a, 0xad, + 0xc5, 0x65, 0x90, 0x5b, 0xb3, 0x51, 0x0d, 0x63, 0xa4, 0x7f, 0xe1, 0x7f, 0x35, 0x58, 0x88, 0x9b, + 0xce, 0x95, 0x55, 0x6e, 0x8c, 0x96, 0x33, 0x49, 0xed, 0xf0, 0x2f, 0x1a, 0x2c, 0xed, 0xf8, 0x6e, + 0xd7, 0xf7, 0x06, 0x2a, 0x72, 0xb9, 0x13, 0xf6, 0x71, 0xe4, 0x44, 0xfa, 0x42, 0xf9, 0xce, 0x10, + 0xa1, 0x9c, 0x05, 0x8d, 0xbd, 0xd6, 0x4e, 0xa1, 0x2c, 0x5f, 0x5d, 0xa8, 0x04, 0xd7, 0xf7, 0x7c, + 0x7e, 0xff, 0x7b, 0xca, 0x78, 0x35, 0x87, 0xca, 0x00, 0x7b, 0x3e, 0xdf, 0x0f, 0x08, 0x23, 0x1e, + 0xaf, 0x6a, 0x08, 0x60, 0xe6, 0x33, 0xaf, 0x4e, 0xd9, 0xd3, 0xea, 0x14, 0x9a, 0x8b, 0x15, 0x89, + 0xe9, 0x34, 0xbc, 0x07, 0xc4, 0xf5, 0x83, 0x93, 0x6a, 0x3e, 0x74, 0x4f, 0x46, 0x05, 0x54, 0x85, + 0x52, 0x62, 0xb2, 0xbb, 0xff, 0xb0, 0x3a, 0x8d, 0x8a, 0x30, 0x1d, 0x7d, 0xce, 0x6c, 0x3e, 0x2f, + 0x42, 0x49, 0xf4, 0x9a, 0x83, 0xe8, 0x3d, 0x83, 0x2c, 0x28, 0xa5, 0xdf, 0x11, 0xe8, 0xae, 0xea, + 0xa6, 0x55, 0xbc, 0x75, 0xf4, 0x95, 0xd1, 0x86, 0x51, 0x91, 0x71, 0x0e, 0x3d, 0x81, 0x8a, 0x2c, + 0xde, 0x19, 0x5a, 0x55, 0xb9, 0x2b, 0x1f, 0x10, 0xfa, 0xda, 0x38, 0xa6, 0x09, 0x56, 0x1b, 0xca, + 0x92, 0x4a, 0x64, 0x68, 0x65, 0x98, 0x7f, 0xf6, 0x9e, 0xd5, 0x57, 0xc7, 0xb0, 0x4c, 0x80, 0xbe, + 0x84, 0xb2, 0x24, 0x2b, 0x86, 0x00, 0xa9, 0xa4, 0x87, 0x7e, 0xde, 0x3e, 0xc3, 0x39, 0xd4, 0x84, + 0x9b, 0x59, 0x29, 0xc0, 0xd0, 0xba, 0xba, 0xe0, 0x4a, 0xc5, 0x30, 0x0a, 0xe0, 0x28, 0xe2, 0x7e, + 0x56, 0x40, 0xf5, 0x7a, 0x28, 0xdf, 0x3e, 0xa3, 0x62, 0x7f, 0x93, 0x90, 0x4f, 0x85, 0x7f, 0xef, + 0x1c, 0xf2, 0x17, 0x46, 0x68, 0x01, 0x1a, 0xd4, 0x1f, 0x48, 0x57, 0x3a, 0xdd, 0x77, 0xbb, 0xfc, + 0x44, 0xaf, 0xa9, 0xe0, 0x87, 0x6b, 0x18, 0x9c, 0x43, 0x8f, 0x00, 0xed, 0x12, 0x7e, 0x48, 0x5d, + 0x72, 0x48, 0xad, 0xa7, 0xe3, 0x60, 0x64, 0x34, 0x6a, 0x3c, 0x38, 0xe0, 0x01, 0xf5, 0xda, 0xd2, + 0xb6, 0x99, 0xdf, 0x25, 0xa2, 0x25, 0x50, 0xc6, 0xa9, 0xc5, 0x5e, 0x60, 0x68, 0x5f, 0x70, 0xce, + 0xbe, 0x58, 0xd6, 0xc6, 0xd1, 0xce, 0x71, 0xe1, 0xd7, 0xc7, 0xb2, 0x4d, 0x00, 0x8f, 0x04, 0x60, + 0xa6, 0xcf, 0x9d, 0x9b, 0xc9, 0x98, 0xbd, 0x12, 0xe7, 0x36, 0xff, 0x9b, 0x86, 0xa2, 0x58, 0x1b, + 0xd1, 0x96, 0x5e, 0xda, 0x72, 0x1c, 0x42, 0x25, 0x5e, 0x8e, 0x17, 0xb9, 0x12, 0xcd, 0x0b, 0x17, + 0x46, 0x59, 0xf9, 0x21, 0xd7, 0x16, 0xce, 0xa1, 0xc7, 0x50, 0xc9, 0xc8, 0x5b, 0x75, 0x7f, 0x18, + 0xa2, 0x81, 0x47, 0x9d, 0x30, 0x0b, 0xd0, 0xa0, 0x2e, 0x45, 0x35, 0xf5, 0x21, 0x1e, 0xa6, 0x5f, + 0x47, 0x81, 0x7c, 0x0d, 0x95, 0x8c, 0x3e, 0x54, 0xef, 0x55, 0xb5, 0x88, 0x1c, 0x15, 0xfd, 0x21, + 0x94, 0x52, 0x82, 0x90, 0xa1, 0x3b, 0xc3, 0x1a, 0x9c, 0x2c, 0x7c, 0x46, 0x85, 0xfd, 0x0a, 0x2a, + 0xb2, 0x60, 0x1a, 0x72, 0x95, 0x29, 0x55, 0xd5, 0x88, 0xe0, 0xdb, 0x5b, 0x47, 0x9f, 0xb4, 0x29, + 0xef, 0xf4, 0x5a, 0xe1, 0x9f, 0x8d, 0x53, 0xea, 0x38, 0xf4, 0x94, 0x13, 0xab, 0xb3, 0x11, 0x79, + 0xbd, 0x6f, 0x53, 0xc6, 0x03, 0xda, 0xea, 0x71, 0x62, 0x6f, 0xf4, 0x8f, 0xce, 0x86, 0x08, 0xb5, + 0x21, 0x50, 0xbb, 0xad, 0xd6, 0x8c, 0x18, 0xde, 0xfb, 0x3f, 0x00, 0x00, 0xff, 0xff, 0x3a, 0x16, + 0x3b, 0xb4, 0x0b, 0x15, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1311,7 +1311,7 @@ type QueryServiceClient interface { GetTimeTickChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) GetStatisticsChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) GetPartitionStates(ctx context.Context, in *PartitionStatesRequest, opts ...grpc.CallOption) (*PartitionStatesResponse, error) - GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*ComponentStatesResponse, error) + GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*internalpb2.ComponentStates, error) } type queryServiceClient struct { @@ -1421,8 +1421,8 @@ func (c *queryServiceClient) GetPartitionStates(ctx context.Context, in *Partiti return out, nil } -func (c *queryServiceClient) GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*ComponentStatesResponse, error) { - out := new(ComponentStatesResponse) +func (c *queryServiceClient) GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*internalpb2.ComponentStates, error) { + out := new(internalpb2.ComponentStates) err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/GetComponentStates", in, out, opts...) if err != nil { return nil, err @@ -1443,7 +1443,7 @@ type QueryServiceServer interface { GetTimeTickChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error) GetStatisticsChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error) GetPartitionStates(context.Context, *PartitionStatesRequest) (*PartitionStatesResponse, error) - GetComponentStates(context.Context, *commonpb.Empty) (*ComponentStatesResponse, error) + GetComponentStates(context.Context, *commonpb.Empty) (*internalpb2.ComponentStates, error) } // UnimplementedQueryServiceServer can be embedded to have forward compatible implementations. @@ -1483,7 +1483,7 @@ func (*UnimplementedQueryServiceServer) GetStatisticsChannel(ctx context.Context func (*UnimplementedQueryServiceServer) GetPartitionStates(ctx context.Context, req *PartitionStatesRequest) (*PartitionStatesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetPartitionStates not implemented") } -func (*UnimplementedQueryServiceServer) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*ComponentStatesResponse, error) { +func (*UnimplementedQueryServiceServer) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*internalpb2.ComponentStates, error) { return nil, status.Errorf(codes.Unimplemented, "method GetComponentStates not implemented") } diff --git a/internal/queryservice/collection.go b/internal/queryservice/collection.go deleted file mode 100644 index a61d7fa842..0000000000 --- a/internal/queryservice/collection.go +++ /dev/null @@ -1,6 +0,0 @@ -package queryservice - -type collection struct { - id UniqueID - partitions []*partition -} diff --git a/internal/queryservice/collection_replica.go b/internal/queryservice/collection_replica.go deleted file mode 100644 index e479992897..0000000000 --- a/internal/queryservice/collection_replica.go +++ /dev/null @@ -1,43 +0,0 @@ -package queryservice - -import "github.com/zilliztech/milvus-distributed/internal/errors" - -type metaReplica interface { - getCollectionIDs(id UniqueID) ([]UniqueID, error) - getPartitionIDs(dbID UniqueID, collectionID UniqueID) ([]UniqueID, error) -} - -type metaReplicaImpl struct { - dbID []UniqueID - db2collections map[UniqueID][]*collection -} - -func (mp *metaReplicaImpl) getCollectionIDs(dbID UniqueID) ([]UniqueID, error) { - if collections, ok := mp.db2collections[dbID]; ok { - collectionIDs := make([]UniqueID, 0) - for _, collection := range collections { - collectionIDs = append(collectionIDs, collection.id) - } - return collectionIDs, nil - } - - return nil, errors.New("can't find collection in queryService") -} - -func (mp *metaReplicaImpl) getPartitionIDs(dbID UniqueID, collectionID UniqueID) ([]UniqueID, error) { - if collections, ok := mp.db2collections[dbID]; ok { - - for _, collection := range collections { - if collectionID == collection.id { - partitions := collection.partitions - partitionIDs := make([]UniqueID, 0) - for _, partition := range partitions { - partitionIDs = append(partitionIDs, partition.id) - } - return partitionIDs, nil - } - } - } - - return nil, errors.New("can't find partitions in queryService") -} diff --git a/internal/queryservice/meta_replica.go b/internal/queryservice/meta_replica.go new file mode 100644 index 0000000000..0e2cb9fa72 --- /dev/null +++ b/internal/queryservice/meta_replica.go @@ -0,0 +1,221 @@ +package queryservice + +import ( + "log" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" +) + +type metaReplica interface { + getCollectionIDs(dbID UniqueID) ([]UniqueID, error) + getPartitionIDs(dbID UniqueID, collectionID UniqueID) ([]UniqueID, error) + getCollection(dbID UniqueID, collectionID UniqueID) *collection + getSegmentIDs(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]UniqueID, error) + loadCollection(dbID UniqueID, collectionID UniqueID) + loadPartitions(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) + updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) + getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) +} + +type segment struct { + id UniqueID +} + +type partition struct { + id UniqueID + segments map[UniqueID]*segment + state querypb.PartitionState +} + +type collection struct { + id UniqueID + partitions map[UniqueID]*partition + node2channel map[int][]string +} + +type metaReplicaImpl struct { + dbID []UniqueID + db2collections map[UniqueID][]*collection +} + +func newMetaReplica() metaReplica { + db2collections := make(map[UniqueID][]*collection) + db2collections[0] = make([]*collection, 0) + dbIDs := make([]UniqueID, 0) + dbIDs = append(dbIDs, UniqueID(0)) + return &metaReplicaImpl{ + dbID: dbIDs, + db2collections: db2collections, + } +} + +func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) { + partitions := make(map[UniqueID]*partition) + node2channel := make(map[int][]string) + newCollection := &collection{ + id: collectionID, + partitions: partitions, + node2channel: node2channel, + } + mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection) +} + +func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) { + collections := mp.db2collections[dbID] + for _, collection := range collections { + if collection.id == collectionID { + partitions := collection.partitions + segments := make(map[UniqueID]*segment) + partitions[partitionID] = &partition{ + id: partitionID, + state: querypb.PartitionState_NotPresent, + segments: segments, + } + return + } + } + log.Fatal("can't find collection when add partition") +} + +func (mp *metaReplicaImpl) getCollection(dbID UniqueID, collectionID UniqueID) *collection { + for _, id := range mp.dbID { + if id == dbID { + collections := mp.db2collections[id] + for _, collection := range collections { + if collection.id == collectionID { + return collection + } + } + return nil + } + } + return nil +} + +func (mp *metaReplicaImpl) getCollectionIDs(dbID UniqueID) ([]UniqueID, error) { + if collections, ok := mp.db2collections[dbID]; ok { + collectionIDs := make([]UniqueID, 0) + for _, collection := range collections { + collectionIDs = append(collectionIDs, collection.id) + } + return collectionIDs, nil + } + + return nil, errors.New("can't find collection in queryService") +} + +func (mp *metaReplicaImpl) getPartitionIDs(dbID UniqueID, collectionID UniqueID) ([]UniqueID, error) { + if collections, ok := mp.db2collections[dbID]; ok { + for _, collection := range collections { + if collectionID == collection.id { + partitions := collection.partitions + partitionIDs := make([]UniqueID, 0) + for _, partition := range partitions { + partitionIDs = append(partitionIDs, partition.id) + } + return partitionIDs, nil + } + } + } + + return nil, errors.New("can't find partitions in queryService") +} + +func (mp *metaReplicaImpl) getSegmentIDs(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]UniqueID, error) { + segmentIDs := make([]UniqueID, 0) + if collections, ok := mp.db2collections[dbID]; ok { + for _, collection := range collections { + if collectionID == collection.id { + if partition, ok := collection.partitions[partitionID]; ok { + for _, segment := range partition.segments { + segmentIDs = append(segmentIDs, segment.id) + } + } + } + } + } + return segmentIDs, nil +} + +func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) { + collectionIDs, err := mp.getCollectionIDs(dbID) + if err != nil { + mp.addCollection(dbID, collectionID) + return + } + for _, id := range collectionIDs { + if collectionID == id { + return + } + } + mp.addCollection(dbID, collectionID) +} + +func (mp *metaReplicaImpl) loadPartitions(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) { + var collection *collection = nil + for _, col := range mp.db2collections[dbID] { + if col.id == collectionID { + collection = col + } + } + if collection == nil { + mp.addCollection(dbID, collectionID) + for _, col := range mp.db2collections[dbID] { + if col.id == collectionID { + collection = col + } + } + } + for _, partitionID := range partitionIDs { + match := false + for _, partition := range collection.partitions { + if partition.id == partitionID { + match = true + continue + } + } + if !match { + mp.addPartition(dbID, collectionID, partitionID) + } + } +} + +func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID, + collectionID UniqueID, + partitionID UniqueID, + state querypb.PartitionState) { + for _, collection := range mp.db2collections[dbID] { + if collection.id == collectionID { + if partition, ok := collection.partitions[partitionID]; ok { + partition.state = state + return + } + } + } + log.Fatal("update partition state fail") +} + +func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID, + collectionID UniqueID, + partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) { + partitionStates := make([]*querypb.PartitionStates, 0) + for _, collection := range mp.db2collections[dbID] { + if collection.id == collectionID { + for _, partitionID := range partitionIDs { + if partition, ok := collection.partitions[partitionID]; ok { + partitionStates = append(partitionStates, &querypb.PartitionStates{ + PartitionID: partitionID, + State: partition.state, + }) + } else { + partitionStates = append(partitionStates, &querypb.PartitionStates{ + PartitionID: partitionID, + State: querypb.PartitionState_NotPresent, + }) + } + } + } + } + return partitionStates, nil +} diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index fc9bd9aef9..6238f5430e 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -14,11 +14,11 @@ type UniqueID = typeutil.UniqueID type ParamTable struct { paramtable.BaseTable - Distributed bool - Address string - Port int + Address string + Port int MasterServiceAddress string + DataServiceAddress string PulsarAddress string ETCDAddress string @@ -27,7 +27,7 @@ type ParamTable struct { QueryServiceID UniqueID QueryNodeID UniqueID - QueryNodeNum int + QueryNodeNum uint64 FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 @@ -160,7 +160,6 @@ func (p *ParamTable) Init() { p.initAddress() p.initPort() p.initMasterServiceAddress() - p.initIsDistributed() } func (p *ParamTable) initMinioEndPoint() { @@ -223,6 +222,14 @@ func (p *ParamTable) initMasterServiceAddress() { p.MasterServiceAddress = url } +func (p *ParamTable) initDataServiceAddress() { + url, err := p.Load("_DataServiceAddress") + if err != nil { + panic(err) + } + p.DataServiceAddress = url +} + func (p *ParamTable) initQueryNodeID() { queryNodeID, err := p.Load("_queryNodeID") if err != nil { @@ -348,7 +355,7 @@ func (p *ParamTable) initInsertChannelNames() { for _, ID := range channelIDs { ret = append(ret, prefix+strconv.Itoa(ID)) } - sep := len(channelIDs) / p.QueryNodeNum + sep := len(channelIDs) / int(p.QueryNodeNum) index := p.SliceIndex if index == -1 { panic("queryNodeID not Match with Config") @@ -458,7 +465,7 @@ func (p *ParamTable) initSliceIndex() { } func (p *ParamTable) initQueryNodeNum() { - p.QueryNodeNum = len(p.QueryNodeIDList()) + p.QueryNodeNum = uint64(len(p.QueryNodeIDList())) } func (p *ParamTable) initLoadIndexChannelNames() { @@ -490,10 +497,6 @@ func (p *ParamTable) initTimeTickReceiveBufSize() { p.TimeTickReceiveBufferSize = p.ParseInt64("queryNode.msgStream.timeTick.recvBufSize") } -func (p *ParamTable) initIsDistributed() { - p.Distributed = false -} - func (p *ParamTable) initAddress() { url, err := p.Load("_QueryServiceAddress") if err != nil { diff --git a/internal/queryservice/partition.go b/internal/queryservice/partition.go deleted file mode 100644 index de86698ee6..0000000000 --- a/internal/queryservice/partition.go +++ /dev/null @@ -1,6 +0,0 @@ -package queryservice - -type partition struct { - id UniqueID - segments []*segment -} diff --git a/internal/queryservice/querynode.go b/internal/queryservice/querynode.go new file mode 100644 index 0000000000..776636ee63 --- /dev/null +++ b/internal/queryservice/querynode.go @@ -0,0 +1,22 @@ +package queryservice + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" +) + +type queryNode struct { + client QueryNodeInterface + insertChannels string + nodeID uint64 + segments []UniqueID +} + +func (qn *queryNode) GetComponentStates() (*internalpb2.ComponentStates, error) { + return qn.client.GetComponentStates() +} + +func (qn *queryNode) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) { + return qn.client.LoadSegments(in) +} diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 04ec0a4242..a57a186627 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -3,67 +3,64 @@ package queryservice import ( "context" "log" + "sort" "strconv" "sync/atomic" - "time" - "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" - grpcquerynodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client" + nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/querynode" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) -type Interface = typeutil.QueryServiceInterface +type MasterServiceInterface interface { + ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) + ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) +} + +type DataServiceInterface interface { + GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) +} + +type QueryNodeInterface interface { + GetComponentStates() (*internalpb2.ComponentStates, error) + + AddQueryChannel(in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) + RemoveQueryChannel(in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) + WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) + LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) + ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) +} type QueryService struct { loopCtx context.Context loopCancel context.CancelFunc - QueryServiceID uint64 + queryServiceID uint64 replica metaReplica - // type(masterServiceClient) should be interface - // masterServiceClient masterService.Service - masterServiceClient *masterservice.GrpcClient - queryNodeClient map[int]querynode.Node - numQueryNode int - numQueryChannel int + dataServiceClient DataServiceInterface + masterServiceClient MasterServiceInterface + queryNodes []*queryNode + //TODO:: nodeID use UniqueID + numRegisterNode uint64 + numQueryChannel uint64 - stateCode atomic.Value - isInit atomic.Value + stateCode atomic.Value + isInit atomic.Value + enableGrpc bool } -type InitParams struct { - Distributed bool -} - -//serverBase interface func (qs *QueryService) Init() error { - if Params.Distributed { - var err error - //TODO:: alter 2*second - qs.masterServiceClient, err = masterservice.NewGrpcClient(Params.MasterServiceAddress, 2*time.Second) - if err != nil { - return err - - } - } else { - //TODO:: create masterService.Core{} - log.Fatal(errors.New("should not use grpc client")) - } - + Params.Init() qs.isInit.Store(true) return nil } -func (qs *QueryService) InitParams(params *InitParams) { - Params.Distributed = params.Distributed -} - func (qs *QueryService) Start() error { isInit := qs.isInit.Load().(bool) if !isInit { @@ -79,29 +76,14 @@ func (qs *QueryService) Stop() error { return nil } -//func (qs *QueryService) SetDataService(p querynode.DataServiceInterface) error { -// for k, v := range qs.queryNodeClient { -// v.Set -// } -// return c.SetDataService(p) -//} -// -//func (qs *QueryService) SetIndexService(p querynode.IndexServiceInterface) error { -// c, ok := s.core.(*cms.Core) -// if !ok { -// return errors.Errorf("set index service failed") -// } -// return c.SetIndexService(p) -//} - func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) { serviceComponentInfo := &internalpb2.ComponentInfo{ NodeID: Params.QueryServiceID, StateCode: qs.stateCode.Load().(internalpb2.StateCode), } subComponentInfos := make([]*internalpb2.ComponentInfo, 0) - for nodeID, nodeClient := range qs.queryNodeClient { - componentStates, err := nodeClient.GetComponentStates() + for nodeID, node := range qs.queryNodes { + componentStates, err := node.GetComponentStates() if err != nil { subComponentInfos = append(subComponentInfos, &internalpb2.ComponentInfo{ NodeID: int64(nodeID), @@ -112,6 +94,9 @@ func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, erro subComponentInfos = append(subComponentInfos, componentStates.State) } return &internalpb2.ComponentStates{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, State: serviceComponentInfo, SubcomponentStates: subComponentInfos, }, nil @@ -125,18 +110,31 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { return Params.StatsChannelName, nil } +// TODO:: do addWatchDmChannel to query node after registerNode func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { - allocatedID := qs.numQueryNode - qs.numQueryNode++ + allocatedID := qs.numRegisterNode + qs.numRegisterNode++ + + if allocatedID > Params.QueryNodeNum { + log.Fatal("allocated queryNodeID should lower than Params.QueryNodeNum") + } registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10) - var client querynode.Node - if Params.Distributed { - client = grpcquerynodeclient.NewClient(registerNodeAddress) + var node *queryNode + if qs.enableGrpc { + client := nodeclient.NewClient(registerNodeAddress) + node = &queryNode{ + client: client, + nodeID: allocatedID, + } } else { - log.Fatal(errors.New("should be queryNodeImpl.QueryNode")) + client := querynode.NewQueryNode(qs.loopCtx, allocatedID) + node = &queryNode{ + client: client, + nodeID: allocatedID, + } } - qs.queryNodeClient[allocatedID] = client + qs.queryNodes[allocatedID] = node return &querypb.RegisterNodeResponse{ Status: &commonpb.Status{ @@ -163,11 +161,63 @@ func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*qu } func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*commonpb.Status, error) { - panic("implement me") + dbID := req.DbID + collectionID := req.CollectionID + qs.replica.loadCollection(dbID, collectionID) + + fn := func(err error) *commonpb.Status { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + } + + // get partitionIDs + showPartitionRequest := &milvuspb.ShowPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kShowPartitions, + }, + CollectionID: collectionID, + } + + showPartitionResponse, err := qs.masterServiceClient.ShowPartitions(showPartitionRequest) + if err != nil { + return fn(err), err + } + if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + log.Fatal("show partition fail, v%", showPartitionResponse.Status.Reason) + } + partitionIDs := showPartitionResponse.PartitionIDs + + loadPartitionsRequest := &querypb.LoadPartitionRequest{ + Base: req.Base, + DbID: dbID, + CollectionID: collectionID, + PartitionIDs: partitionIDs, + } + + status, err := qs.LoadPartitions(loadPartitionsRequest) + + return status, err } func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { - panic("implement me") + dbID := req.DbID + collectionID := req.CollectionID + partitionsIDs, err := qs.replica.getPartitionIDs(dbID, collectionID) + if err != nil { + log.Fatal("get partition ids error") + } + releasePartitionRequest := &querypb.ReleasePartitionRequest{ + Base: req.Base, + DbID: dbID, + CollectionID: collectionID, + PartitionIDs: partitionsIDs, + } + + status, err := qs.ReleasePartitions(releasePartitionRequest) + + return status, err } func (qs *QueryService) ShowPartitions(req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error) { @@ -187,11 +237,142 @@ func (qs *QueryService) ShowPartitions(req *querypb.ShowPartitionRequest) (*quer } func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*commonpb.Status, error) { - panic("implement me") + dbID := req.DbID + collectionID := req.CollectionID + partitionIDs := req.PartitionIDs + qs.replica.loadPartitions(dbID, collectionID, partitionIDs) + fn := func(err error) *commonpb.Status { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + } + + // get segments and load segment to query node + for _, partitionID := range partitionIDs { + showSegmentRequest := &milvuspb.ShowSegmentRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kShowSegment, + }, + CollectionID: collectionID, + PartitionID: partitionID, + } + showSegmentResponse, err := qs.masterServiceClient.ShowSegments(showSegmentRequest) + if err != nil { + return fn(err), err + } + if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + log.Fatal("showSegment fail, v%", showSegmentResponse.Status.Reason) + } + segmentIDs := showSegmentResponse.SegmentIDs + segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) + channel2id := make(map[string]int) + id2channels := make(map[int][]string) + id2segs := make(map[int][]UniqueID) + offset := 0 + + for _, segmentID := range segmentIDs { + state, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{ + SegmentID: segmentID, + }) + if err != nil { + log.Fatal("get segment states fail") + } + segmentStates[segmentID] = state + var flatChannelName string + channelNames := make([]string, 0) + for i, str := range state.StartPositions { + flatChannelName += str.ChannelName + channelNames = append(channelNames, str.ChannelName) + if i < len(state.StartPositions) { + flatChannelName += "/" + } + } + if _, ok := channel2id[flatChannelName]; !ok { + channel2id[flatChannelName] = offset + id2channels[offset] = channelNames + id2segs[offset] = make([]UniqueID, 0) + id2segs[offset] = append(id2segs[offset], segmentID) + offset++ + } else { + //TODO::check channel name + id := channel2id[flatChannelName] + id2segs[id] = append(id2segs[id], segmentID) + } + } + for key, value := range id2segs { + sort.Slice(value, func(i, j int) bool { return segmentStates[value[i]].CreateTime < segmentStates[value[j]].CreateTime }) + selectedSegs := make([]UniqueID, 0) + for i, v := range value { + if segmentStates[v].State == datapb.SegmentState_SegmentFlushed { + selectedSegs = append(selectedSegs, v) + } else { + if i > 0 && segmentStates[v-1].State != datapb.SegmentState_SegmentFlushed { + break + } + selectedSegs = append(selectedSegs, v) + } + } + id2segs[key] = selectedSegs + } + + qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory) + + // TODO:: filter channel for query node + for channels, i := range channel2id { + for key, node := range qs.queryNodes { + if channels == node.insertChannels { + statesID := id2segs[i][len(id2segs[i])-1] + loadSegmentRequest := &querypb.LoadSegmentRequest{ + CollectionID: collectionID, + PartitionID: partitionID, + SegmentIDs: id2segs[i], + LastSegmentState: segmentStates[statesID], + } + status, err := qs.queryNodes[key].LoadSegments(loadSegmentRequest) + if err != nil { + return status, err + } + } + } + } + qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory) + } + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil } func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest) (*commonpb.Status, error) { - panic("implement me") + dbID := req.DbID + collectionID := req.CollectionID + partitionIDs := req.PartitionIDs + segmentIDs := make([]UniqueID, 0) + for _, partitionID := range partitionIDs { + res, err := qs.replica.getSegmentIDs(dbID, collectionID, partitionID) + if err != nil { + log.Fatal("get segment ids error") + } + segmentIDs = append(segmentIDs, res...) + } + releaseSegmentRequest := &querypb.ReleaseSegmentRequest{ + Base: req.Base, + DbID: dbID, + CollectionID: collectionID, + PartitionIDs: partitionIDs, + SegmentIDs: segmentIDs, + } + + for _, node := range qs.queryNodes { + status, err := node.client.ReleaseSegments(releaseSegmentRequest) + if err != nil { + return status, err + } + } + + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil } func (qs *QueryService) CreateQueryChannel() (*querypb.CreateQueryChannelResponse, error) { @@ -200,6 +381,7 @@ func (qs *QueryService) CreateQueryChannel() (*querypb.CreateQueryChannelRespons allocatedQueryChannel := "query-" + strconv.FormatInt(int64(channelID), 10) allocatedQueryResultChannel := "queryResult-" + strconv.FormatInt(int64(channelID), 10) + //TODO:: query node watch query channels return &querypb.CreateQueryChannelResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -210,21 +392,50 @@ func (qs *QueryService) CreateQueryChannel() (*querypb.CreateQueryChannelRespons } func (qs *QueryService) GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) { - panic("implement me") + states, err := qs.replica.getPartitionStates(req.DbID, req.CollectionID, req.PartitionIDs) + if err != nil { + return &querypb.PartitionStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + PartitionDescriptions: states, + }, err + } + return &querypb.PartitionStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + PartitionDescriptions: states, + }, nil } -func NewQueryService(ctx context.Context) (Interface, error) { - Params.Init() - nodeClients := make(map[int]querynode.Node) +func NewQueryService(ctx context.Context) (*QueryService, error) { + nodes := make([]*queryNode, 0) ctx1, cancel := context.WithCancel(ctx) + replica := newMetaReplica() service := &QueryService{ loopCtx: ctx1, loopCancel: cancel, - numQueryNode: 0, - queryNodeClient: nodeClients, + queryNodes: nodes, + replica: replica, + numRegisterNode: 0, numQueryChannel: 0, + enableGrpc: false, } service.stateCode.Store(internalpb2.StateCode_INITIALIZING) service.isInit.Store(false) return service, nil } + +func (qs *QueryService) SetMasterService(masterService MasterServiceInterface) { + qs.masterServiceClient = masterService +} + +func (qs *QueryService) SetDataService(dataService DataServiceInterface) { + qs.dataServiceClient = dataService +} + +func (qs *QueryService) SetEnableGrpc(en bool) { + qs.enableGrpc = en +} diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go new file mode 100644 index 0000000000..067d308aca --- /dev/null +++ b/internal/queryservice/queryservice_test.go @@ -0,0 +1,40 @@ +package queryservice + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueryService_Init(t *testing.T) { + service, err := NewQueryService(context.Background()) + assert.Nil(t, err) + service.Init() + service.Start() + + t.Run("Test create channel", func(t *testing.T) { + response, err := service.CreateQueryChannel() + assert.Nil(t, err) + assert.Equal(t, response.RequestChannel, "query-0") + assert.Equal(t, response.ResultChannel, "queryResult-0") + }) + + t.Run("Test Get statistics channel", func(t *testing.T) { + response, err := service.GetStatisticsChannel() + assert.Nil(t, err) + assert.Equal(t, response, "query-node-stats") + }) + + t.Run("Test Get timeTick channel", func(t *testing.T) { + response, err := service.GetTimeTickChannel() + assert.Nil(t, err) + assert.Equal(t, response, "queryTimeTick") + }) + + service.Stop() +} + +//func TestQueryService_Load(t *testing.T) { +// +//} diff --git a/internal/queryservice/segment.go b/internal/queryservice/segment.go deleted file mode 100644 index 8a18e50277..0000000000 --- a/internal/queryservice/segment.go +++ /dev/null @@ -1,5 +0,0 @@ -package queryservice - -type segment struct { - id UniqueID -} diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go index 5b6fa7eeb2..64a201ca25 100644 --- a/internal/util/paramtable/paramtable.go +++ b/internal/util/paramtable/paramtable.go @@ -178,6 +178,23 @@ func (gp *BaseTable) tryloadFromEnv() { if err != nil { panic(err) } + + dataServiceAddress := os.Getenv("DATA_SERVICE_ADDRESS") + if dataServiceAddress == "" { + serviceHost, err := gp.Load("dataService.address") + if err != nil { + panic(err) + } + port, err := gp.Load("dataService.port") + if err != nil { + panic(err) + } + dataServiceAddress = serviceHost + ":" + port + } + err = gp.Save("_DataServiceAddress", dataServiceAddress) + if err != nil { + panic(err) + } } func (gp *BaseTable) Load(key string) (string, error) {