diff --git a/go.mod b/go.mod index 82505dd1d2..2606d7e6a0 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect golang.org/x/arch v0.3.0 // indirect google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 // indirect ) require ( diff --git a/go.sum b/go.sum index c391f3d02a..13aee59333 100644 --- a/go.sum +++ b/go.sum @@ -1391,6 +1391,8 @@ google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ5 google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= google.golang.org/grpc/examples v0.0.0-20220617181431-3e7b97febc7f h1:rqzndB2lIQGivcXdTuY3Y9NBvr70X+y77woofSRluec= google.golang.org/grpc/examples v0.0.0-20220617181431-3e7b97febc7f/go.mod h1:gxndsbNG1n4TZcHGgsYEfVGnTxqfEdfiDv6/DADXX9o= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/internal/http/router.go b/internal/http/router.go index d09a8e410b..99fca2526e 100644 --- a/internal/http/router.go +++ b/internal/http/router.go @@ -21,3 +21,6 @@ const HealthzRouterPath = "/healthz" // LogLevelRouterPath is path for Get and Update log level at runtime. const LogLevelRouterPath = "/log/level" + +// EventLogRouterPath is path for eventlog control. +const EventLogRouterPath = "/eventlog" diff --git a/internal/http/server.go b/internal/http/server.go index 5387561df0..cbd483f3ac 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -26,7 +26,9 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/http/healthz" + "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) const ( @@ -51,6 +53,11 @@ func registerDefaults() { Path: HealthzRouterPath, Handler: healthz.Handler(), }) + + Register(&Handler{ + Path: EventLogRouterPath, + Handler: eventlog.Handler(), + }) } func Register(h *Handler) { @@ -81,6 +88,7 @@ func getHTTPAddr() string { if err != nil { return fmt.Sprintf(":%s", DefaultListenPort) } + paramtable.Get().Save(paramtable.Get().CommonCfg.MetricsPort.Key, port) return fmt.Sprintf(":%s", port) } diff --git a/internal/http/server_test.go b/internal/http/server_test.go index cd1b2f2244..a87ac937a1 100644 --- a/internal/http/server_test.go +++ b/internal/http/server_test.go @@ -32,8 +32,13 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) +func TestMain(t *testing.M) { + paramtable.Init() +} + func TestGetHTTPAddr(t *testing.T) { assert.Equal(t, getHTTPAddr(), ":"+DefaultListenPort) testPort := "9092" diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 89b198c55c..88d3cb5fe4 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" @@ -204,6 +205,8 @@ func (job *LoadCollectionJob) Execute() error { return errors.Wrap(err, msg) } + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Start load collection %d", collection.CollectionID))) + metrics.QueryCoordNumPartitions.WithLabelValues().Add(float64(len(partitions))) return nil } diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 4709d73a86..a32a7cefa0 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -18,6 +18,7 @@ package meta import ( "context" + "fmt" "sync" "time" @@ -27,6 +28,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" @@ -492,6 +494,7 @@ func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int newPartition.Status = querypb.LoadStatus_Loaded elapsed := time.Since(newPartition.CreatedAt) metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds())) + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Partition %d loaded", partitionID))) } err := m.putPartition([]*Partition{newPartition}, savePartition) if err != nil { @@ -520,6 +523,7 @@ func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int metrics.QueryCoordNumCollections.WithLabelValues().Inc() metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds())) + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Collection %d loaded", newCollection.CollectionID))) } return collectionPercent, m.putCollection(saveCollection, newCollection) } diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 1fb8dd7c6f..74acb7eec1 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -18,6 +18,7 @@ package observers import ( "context" + "fmt" "sync" "time" @@ -28,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" ) @@ -224,4 +226,5 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti zap.Int32("partitionLoadPercentage", loadPercentage), zap.Int32("collectionLoadPercentage", collectionPercentage), ) + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("collection %d load percentage update: %d", partition.CollectionID, loadPercentage))) } diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 149278fe13..f1e4a8dd6c 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -29,6 +29,7 @@ import ( "sync" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/paramtable" . "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -134,7 +135,7 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) { if _, ok := targetMap[segment.ID()]; ok { continue } - + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection()))) targetMap[segment.ID()] = segment metrics.QueryNodeNumSegments.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 88335fc75e..2963c537cd 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -592,6 +592,7 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context, segment *Local } func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalSegment, indexInfo *querypb.FieldIndexInfo) error { + log.Warn("CQX load segment index", zap.Int64("segmentID", segment.ID())) filteredPaths := make([]string, 0, len(indexInfo.IndexFilePaths)) for _, indexPath := range indexInfo.IndexFilePaths { diff --git a/pkg/config/manager.go b/pkg/config/manager.go index a853abdb3a..be806de951 100644 --- a/pkg/config/manager.go +++ b/pkg/config/manager.go @@ -129,6 +129,9 @@ func (m *Manager) GetConfigs() map[string]string { } config[key] = sValue } + for key, value := range m.overlays { + config[key] = value + } return config } diff --git a/pkg/eventlog/event_log.pb.go b/pkg/eventlog/event_log.pb.go new file mode 100644 index 0000000000..b5f84d92b7 --- /dev/null +++ b/pkg/eventlog/event_log.pb.go @@ -0,0 +1,290 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: event_log.proto + +package eventlog + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Level int32 + +const ( + Level_Undefined Level = 0 + Level_Debug Level = 1 + Level_Info Level = 2 + Level_Warn Level = 3 + Level_Error Level = 4 +) + +var Level_name = map[int32]string{ + 0: "Undefined", + 1: "Debug", + 2: "Info", + 3: "Warn", + 4: "Error", +} + +var Level_value = map[string]int32{ + "Undefined": 0, + "Debug": 1, + "Info": 2, + "Warn": 3, + "Error": 4, +} + +func (x Level) String() string { + return proto.EnumName(Level_name, int32(x)) +} + +func (Level) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_443313318a2fd90c, []int{0} +} + +type ListenRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListenRequest) Reset() { *m = ListenRequest{} } +func (m *ListenRequest) String() string { return proto.CompactTextString(m) } +func (*ListenRequest) ProtoMessage() {} +func (*ListenRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_443313318a2fd90c, []int{0} +} + +func (m *ListenRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListenRequest.Unmarshal(m, b) +} +func (m *ListenRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListenRequest.Marshal(b, m, deterministic) +} +func (m *ListenRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListenRequest.Merge(m, src) +} +func (m *ListenRequest) XXX_Size() int { + return xxx_messageInfo_ListenRequest.Size(m) +} +func (m *ListenRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListenRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListenRequest proto.InternalMessageInfo + +type Event struct { + Level Level `protobuf:"varint,1,opt,name=level,proto3,enum=milvus.proto.eventlog.Level" json:"level,omitempty"` + Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + Ts int64 `protobuf:"varint,4,opt,name=ts,proto3" json:"ts,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Event) Reset() { *m = Event{} } +func (m *Event) String() string { return proto.CompactTextString(m) } +func (*Event) ProtoMessage() {} +func (*Event) Descriptor() ([]byte, []int) { + return fileDescriptor_443313318a2fd90c, []int{1} +} + +func (m *Event) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Event.Unmarshal(m, b) +} +func (m *Event) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Event.Marshal(b, m, deterministic) +} +func (m *Event) XXX_Merge(src proto.Message) { + xxx_messageInfo_Event.Merge(m, src) +} +func (m *Event) XXX_Size() int { + return xxx_messageInfo_Event.Size(m) +} +func (m *Event) XXX_DiscardUnknown() { + xxx_messageInfo_Event.DiscardUnknown(m) +} + +var xxx_messageInfo_Event proto.InternalMessageInfo + +func (m *Event) GetLevel() Level { + if m != nil { + return m.Level + } + return Level_Undefined +} + +func (m *Event) GetType() int32 { + if m != nil { + return m.Type + } + return 0 +} + +func (m *Event) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *Event) GetTs() int64 { + if m != nil { + return m.Ts + } + return 0 +} + +func init() { + proto.RegisterEnum("milvus.proto.eventlog.Level", Level_name, Level_value) + proto.RegisterType((*ListenRequest)(nil), "milvus.proto.eventlog.ListenRequest") + proto.RegisterType((*Event)(nil), "milvus.proto.eventlog.Event") +} + +func init() { proto.RegisterFile("event_log.proto", fileDescriptor_443313318a2fd90c) } + +var fileDescriptor_443313318a2fd90c = []byte{ + // 276 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x8f, 0x5f, 0x4b, 0xc3, 0x30, + 0x14, 0xc5, 0x97, 0xae, 0x1d, 0xee, 0xe2, 0xb6, 0x12, 0x10, 0x8a, 0xf8, 0x50, 0x8a, 0x0f, 0x65, + 0x60, 0x2b, 0xf5, 0x0b, 0x88, 0xb8, 0x07, 0xa1, 0x0f, 0x52, 0x11, 0xc1, 0x17, 0xe9, 0x9f, 0xbb, + 0x18, 0xec, 0x92, 0x9a, 0xa4, 0x05, 0xbf, 0xbd, 0x34, 0x55, 0xf0, 0xc1, 0xbd, 0x9d, 0xe4, 0x9c, + 0xfb, 0xe3, 0x1c, 0xd8, 0xe0, 0x80, 0xc2, 0xbc, 0xb5, 0x92, 0x25, 0x9d, 0x92, 0x46, 0xd2, 0xb3, + 0x03, 0x6f, 0x87, 0x5e, 0x4f, 0xaf, 0xc4, 0xba, 0xad, 0x64, 0xd1, 0x06, 0x56, 0x39, 0xd7, 0x06, + 0x45, 0x81, 0x9f, 0x3d, 0x6a, 0x13, 0x69, 0xf0, 0x76, 0xa3, 0x49, 0x33, 0xf0, 0x5a, 0x1c, 0xb0, + 0x0d, 0x48, 0x48, 0xe2, 0x75, 0x76, 0x91, 0xfc, 0x0b, 0x48, 0xf2, 0x31, 0x53, 0x4c, 0x51, 0x4a, + 0xc1, 0x35, 0x5f, 0x1d, 0x06, 0x4e, 0x48, 0x62, 0xaf, 0xb0, 0x7a, 0xfc, 0x6b, 0x4a, 0x53, 0x06, + 0xf3, 0x90, 0xc4, 0xa7, 0x85, 0xd5, 0x74, 0x0d, 0x8e, 0xd1, 0x81, 0x1b, 0x92, 0x78, 0x5e, 0x38, + 0x46, 0x6f, 0x6f, 0xc1, 0xb3, 0x1c, 0xba, 0x82, 0xe5, 0xb3, 0x68, 0x70, 0xcf, 0x05, 0x36, 0xfe, + 0x8c, 0x2e, 0xc1, 0xbb, 0xc7, 0xaa, 0x67, 0x3e, 0xa1, 0x27, 0xe0, 0x3e, 0x88, 0xbd, 0xf4, 0x9d, + 0x51, 0xbd, 0x94, 0x4a, 0xf8, 0xf3, 0xd1, 0xde, 0x29, 0x25, 0x95, 0xef, 0x66, 0x35, 0x6c, 0x6c, + 0xed, 0x5c, 0xb2, 0x27, 0x54, 0x03, 0xaf, 0x91, 0x3e, 0xc2, 0x62, 0x9a, 0x46, 0x2f, 0x8f, 0x75, + 0xff, 0xbb, 0xfc, 0xfc, 0xd8, 0x42, 0xcb, 0x8d, 0x66, 0xd7, 0xe4, 0x6e, 0xfb, 0x1a, 0x33, 0x6e, + 0xde, 0xfb, 0x2a, 0xa9, 0xe5, 0x21, 0x9d, 0xd2, 0x57, 0x5c, 0xfe, 0xa8, 0xb4, 0xfb, 0x60, 0xe9, + 0xef, 0x55, 0xb5, 0xb0, 0x94, 0x9b, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0xcb, 0x46, 0x60, 0x11, + 0x89, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// EventLogServiceClient is the client API for EventLogService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type EventLogServiceClient interface { + Listen(ctx context.Context, in *ListenRequest, opts ...grpc.CallOption) (EventLogService_ListenClient, error) +} + +type eventLogServiceClient struct { + cc *grpc.ClientConn +} + +func NewEventLogServiceClient(cc *grpc.ClientConn) EventLogServiceClient { + return &eventLogServiceClient{cc} +} + +func (c *eventLogServiceClient) Listen(ctx context.Context, in *ListenRequest, opts ...grpc.CallOption) (EventLogService_ListenClient, error) { + stream, err := c.cc.NewStream(ctx, &_EventLogService_serviceDesc.Streams[0], "/milvus.proto.eventlog.EventLogService/Listen", opts...) + if err != nil { + return nil, err + } + x := &eventLogServiceListenClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type EventLogService_ListenClient interface { + Recv() (*Event, error) + grpc.ClientStream +} + +type eventLogServiceListenClient struct { + grpc.ClientStream +} + +func (x *eventLogServiceListenClient) Recv() (*Event, error) { + m := new(Event) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// EventLogServiceServer is the server API for EventLogService service. +type EventLogServiceServer interface { + Listen(*ListenRequest, EventLogService_ListenServer) error +} + +// UnimplementedEventLogServiceServer can be embedded to have forward compatible implementations. +type UnimplementedEventLogServiceServer struct { +} + +func (*UnimplementedEventLogServiceServer) Listen(req *ListenRequest, srv EventLogService_ListenServer) error { + return status.Errorf(codes.Unimplemented, "method Listen not implemented") +} + +func RegisterEventLogServiceServer(s *grpc.Server, srv EventLogServiceServer) { + s.RegisterService(&_EventLogService_serviceDesc, srv) +} + +func _EventLogService_Listen_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ListenRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(EventLogServiceServer).Listen(m, &eventLogServiceListenServer{stream}) +} + +type EventLogService_ListenServer interface { + Send(*Event) error + grpc.ServerStream +} + +type eventLogServiceListenServer struct { + grpc.ServerStream +} + +func (x *eventLogServiceListenServer) Send(m *Event) error { + return x.ServerStream.SendMsg(m) +} + +var _EventLogService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "milvus.proto.eventlog.EventLogService", + HandlerType: (*EventLogServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Listen", + Handler: _EventLogService_Listen_Handler, + ServerStreams: true, + }, + }, + Metadata: "event_log.proto", +} diff --git a/pkg/eventlog/event_log.proto b/pkg/eventlog/event_log.proto new file mode 100644 index 0000000000..4f4d57967f --- /dev/null +++ b/pkg/eventlog/event_log.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; +package milvus.proto.eventlog; + +option go_package = "github.com/milvus-io/milvus/pkg/eventlog"; + +service EventLogService { + rpc Listen(ListenRequest) returns(stream Event) {} +} + +message ListenRequest { +} + +message Event { + Level level = 1; + int32 type = 2; + bytes data = 3; + int64 ts = 4; +} + +enum Level { + Undefined = 0; + Debug = 1; + Info = 2; + Warn = 3; + Error = 4; +} diff --git a/pkg/eventlog/evt_raw.go b/pkg/eventlog/evt_raw.go new file mode 100644 index 0000000000..7c870e6f66 --- /dev/null +++ b/pkg/eventlog/evt_raw.go @@ -0,0 +1,43 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +// rawEvt implement `Evt` interface with plain event msg. +type rawEvt struct { + level Level + tp int32 + data []byte +} + +func (l *rawEvt) Level() Level { + return l.level +} + +func (l *rawEvt) Type() int32 { + return l.tp +} + +func (l *rawEvt) Raw() []byte { + return l.data +} + +func NewRawEvt(level Level, data string) Evt { + return &rawEvt{ + level: level, + data: []byte(data), + } +} diff --git a/pkg/eventlog/global.go b/pkg/eventlog/global.go new file mode 100644 index 0000000000..bd363d5096 --- /dev/null +++ b/pkg/eventlog/global.go @@ -0,0 +1,89 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +import ( + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/typeutil" + "go.uber.org/atomic" +) + +var ( + global atomic.Pointer[globalLogger] + sfGlobal conc.Singleflight[*globalLogger] +) + +func getGlobalLogger() *globalLogger { + if l := global.Load(); l != nil { + return l + } + l, _, _ := sfGlobal.Do("evt_global_logger", func() (*globalLogger, error) { + if l := global.Load(); l != nil { + return l, nil + } + + l := &globalLogger{ + level: Level_Info, + loggers: typeutil.NewConcurrentMap[string, Logger](), + } + global.Store(l) + return l, nil + }) + return l +} + +// globalLogger implement `Logger` interface for package level util functions +type globalLogger struct { + level Level + loggers *typeutil.ConcurrentMap[string, Logger] +} + +// Records implements `Logger`, dispatches evt to all registered Loggers. +func (l *globalLogger) Record(evt Evt) { + if evt.Level() < l.level { + return + } + l.loggers.Range(func(_ string, subL Logger) bool { + subL.Record(evt) + return true + }) +} + +// RecordFunc implements `Logger`, dispatches evtFn to all registered Loggers. +func (l *globalLogger) RecordFunc(lvl Level, fn func() Evt) { + if lvl < l.level { + return + } + l.loggers.Range(func(_ string, subL Logger) bool { + subL.RecordFunc(lvl, fn) + return true + }) +} + +// Flush implements `Logger`, dispatch Flush to all registered Loggers. +func (l *globalLogger) Flush() error { + l.loggers.Range(func(_ string, subL Logger) bool { + subL.Flush() + return true + }) + return nil +} + +// Register adds a logger into global loggers with provided key. +func (l *globalLogger) Register(key string, logger Logger) { + l.loggers.GetOrInsert(key, logger) +} diff --git a/pkg/eventlog/global_test.go b/pkg/eventlog/global_test.go new file mode 100644 index 0000000000..dc978cdcdc --- /dev/null +++ b/pkg/eventlog/global_test.go @@ -0,0 +1,108 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +import ( + "testing" + + mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type GlobalLoggerSuite struct { + suite.Suite +} + +func (s *GlobalLoggerSuite) TearDownTest() { + global.Store(nil) +} + +func (s *GlobalLoggerSuite) TestGetGlobalLogger() { + l := getGlobalLogger() + s.NotNil(l) + + s.Equal(Level_Info, l.level) + + s.Equal(global.Load(), l) + + la := getGlobalLogger() + s.Equal(l, la) +} + +func (s *GlobalLoggerSuite) TestRecord() { + mock1 := NewMockLogger(s.T()) + mock2 := NewMockLogger(s.T()) + + getGlobalLogger().Register("mock1", mock1) + getGlobalLogger().Register("mock2", mock2) + + rawEvt := NewRawEvt(Level_Info, "test") + + mock1.EXPECT().Record(rawEvt) + mock2.EXPECT().Record(rawEvt) + + getGlobalLogger().Record(rawEvt) + + mock3 := NewMockLogger(s.T()) + + getGlobalLogger().Register("mock3", mock3) // register logger without expectations + + rawEvt = NewRawEvt(Level_Debug, "test") + + getGlobalLogger().Record(rawEvt) +} + +func (s *GlobalLoggerSuite) TestRecordFunc() { + mock1 := NewMockLogger(s.T()) + mock2 := NewMockLogger(s.T()) + + getGlobalLogger().Register("mock1", mock1) + getGlobalLogger().Register("mock2", mock2) + + rawEvt := NewRawEvt(Level_Info, "test") + + mock1.EXPECT().RecordFunc(mock.Anything, mock.Anything) + mock2.EXPECT().RecordFunc(mock.Anything, mock.Anything) + + getGlobalLogger().RecordFunc(Level_Info, func() Evt { return rawEvt }) + + mock3 := NewMockLogger(s.T()) + + getGlobalLogger().Register("mock3", mock3) // register logger without expectations + + rawEvt = NewRawEvt(Level_Debug, "test") + + getGlobalLogger().RecordFunc(Level_Debug, func() Evt { return rawEvt }) +} + +func (s *GlobalLoggerSuite) TestFlush() { + mock1 := NewMockLogger(s.T()) + mock2 := NewMockLogger(s.T()) + + getGlobalLogger().Register("mock1", mock1) + getGlobalLogger().Register("mock2", mock2) + + mock1.EXPECT().Flush().Return(nil) + mock2.EXPECT().Flush().Return(nil) + + err := getGlobalLogger().Flush() + s.NoError(err) +} + +func TestGlobalLogger(t *testing.T) { + suite.Run(t, new(GlobalLoggerSuite)) +} diff --git a/pkg/eventlog/grpc.go b/pkg/eventlog/grpc.go new file mode 100644 index 0000000000..ddb515869b --- /dev/null +++ b/pkg/eventlog/grpc.go @@ -0,0 +1,172 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +import ( + "net" + "sync" + "time" + + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" + "go.uber.org/atomic" + "google.golang.org/grpc" +) + +var ( + grpcLog atomic.Pointer[grpcLogger] + sf conc.Singleflight[*grpcLogger] +) + +// grpcLogger is a Logger with dispatches streaming Evt to client listeners. +type grpcLogger struct { + level atomic.Int32 + lis net.Listener + port int + + clients *typeutil.ConcurrentMap[string, *listenerClient] +} + +func (l *grpcLogger) SetLevel(lvl Level) { + l.level.Store(int32(lvl)) +} + +func (l *grpcLogger) GetLevel() Level { + return Level(l.level.Load()) +} + +func (l *grpcLogger) Record(evt Evt) { + if evt.Level() < Level(l.level.Load()) { + return + } + + l.clients.Range(func(key string, client *listenerClient) bool { + client.Notify(evt) + return true + }) +} + +func (l *grpcLogger) RecordFunc(lvl Level, fn func() Evt) { + if lvl < l.GetLevel() { + return + } + + l.Record(fn()) +} + +func (l *grpcLogger) Flush() error { + return nil +} + +func (l *grpcLogger) Listen(req *ListenRequest, svr EventLogService_ListenServer) error { + client := newListenerClient() + key := funcutil.RandomString(8) + l.clients.Insert(key, client) + defer func() { + client, ok := l.clients.GetAndRemove(key) + if ok { + client.Stop() + } + }() + for { + select { + case evt := <-client.ch: + err := svr.Send(&Event{ + Level: evt.Level(), + Type: evt.Type(), + Data: evt.Raw(), + Ts: time.Now().UnixNano(), + }) + if err != nil { + return nil + } + case <-svr.Context().Done(): + return nil + case <-client.closed: + } + } +} + +func (l *grpcLogger) Close() { + if l.lis != nil { + l.lis.Close() + } +} + +// getGrpcLogger starts or returns the singleton grpcLogger listening port. +func getGrpcLogger() (int, error) { + if l := grpcLog.Load(); l != nil { + return l.port, nil + } + l, err, _ := sf.Do("grpc_evt_log", func() (*grpcLogger, error) { + if grpcLog.Load() != nil { + return grpcLog.Load(), nil + } + lis, err := net.Listen("tcp", ":0") + if err != nil { + return nil, err + } + + port := lis.Addr().(*net.TCPAddr).Port + + svr := grpc.NewServer() + l := &grpcLogger{ + lis: lis, + port: port, + clients: typeutil.NewConcurrentMap[string, *listenerClient](), + } + l.SetLevel(Level_Debug) + RegisterEventLogServiceServer(svr, l) + go svr.Serve(lis) + + grpcLog.Store(l) + getGlobalLogger().Register("grpc_logger", l) + return l, nil + }) + if err != nil { + return -1, err + } + + return l.port, nil +} + +type listenerClient struct { + ch chan Evt + closed chan struct{} + once sync.Once +} + +func newListenerClient() *listenerClient { + return &listenerClient{ + ch: make(chan Evt, 100), + closed: make(chan struct{}), + } +} + +func (c *listenerClient) Notify(l Evt) { + select { + case c.ch <- l: + default: + } +} + +func (c *listenerClient) Stop() { + c.once.Do(func() { + close(c.ch) + }) +} diff --git a/pkg/eventlog/grpc_test.go b/pkg/eventlog/grpc_test.go new file mode 100644 index 0000000000..d4d9c3e963 --- /dev/null +++ b/pkg/eventlog/grpc_test.go @@ -0,0 +1,203 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +import ( + context "context" + fmt "fmt" + "testing" + "time" + + "github.com/stretchr/testify/suite" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type GrpcLoggerSuite struct { + suite.Suite + l *grpcLogger + port int +} + +type localListenerClient struct { + conn *grpc.ClientConn + client EventLogServiceClient + listenClient EventLogService_ListenClient + result chan *Event +} + +func (c *localListenerClient) listen(t *testing.T) { + for { + evt, err := c.listenClient.Recv() + if err != nil { + return + } + + select { + case c.result <- evt: + default: + } + } +} + +func (c *localListenerClient) close() { + if c.conn != nil { + c.conn.Close() + } + if c.result != nil { + close(c.result) + } +} + +func (s *GrpcLoggerSuite) SetupTest() { + port, err := getGrpcLogger() + s.Require().NoError(err) + s.port = port + + s.l = grpcLog.Load() + s.Require().NotNil(s.l) +} + +func (s *GrpcLoggerSuite) registerClient() *localListenerClient { + ctx := context.Background() + addr := fmt.Sprintf("127.0.0.1:%d", s.port) + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithTimeout(time.Second), + } + + conn, err := grpc.DialContext(ctx, addr, opts...) + + s.Require().NoError(err) + + client := NewEventLogServiceClient(conn) + + listenClient, err := client.Listen(ctx, &ListenRequest{}) + s.Require().NoError(err) + + c := &localListenerClient{ + conn: conn, + client: client, + listenClient: listenClient, + result: make(chan *Event, 100), + } + + go c.listen(s.T()) + + return c +} + +func (s *GrpcLoggerSuite) TestRecord() { + s.Run("normal_case", func() { + c := s.registerClient() + + s.Eventually(func() bool { + return s.l.clients.Len() == 1 + }, time.Second, time.Millisecond*100) + + s.l.Record(NewRawEvt(Level_Info, "test")) + + evt := <-c.result + s.Equal(Level_Info, evt.GetLevel()) + s.EqualValues("test", evt.GetData()) + + c.close() + + s.Eventually(func() bool { + return s.l.clients.Len() == 0 + }, time.Second, time.Millisecond*100) + }) + + s.Run("skip_level", func() { + s.l.SetLevel(Level_Warn) + defer s.l.SetLevel(Level_Debug) + c := s.registerClient() + + s.Eventually(func() bool { + return s.l.clients.Len() == 1 + }, time.Second, time.Millisecond*100) + + s.l.Record(NewRawEvt(Level_Info, "test")) + + c.close() + + s.Eventually(func() bool { + return s.l.clients.Len() == 0 + }, time.Second, time.Millisecond*100) + + var result []*Event + for evt := range c.result { + result = append(result, evt) + } + s.Equal(0, len(result)) + }) +} + +func (s *GrpcLoggerSuite) TestRecordFunc() { + s.Run("normal_case", func() { + c := s.registerClient() + + s.Eventually(func() bool { + return s.l.clients.Len() == 1 + }, time.Second, time.Millisecond*100) + + s.l.RecordFunc(Level_Info, func() Evt { return NewRawEvt(Level_Info, "test") }) + + evt := <-c.result + s.Equal(Level_Info, evt.GetLevel()) + s.EqualValues("test", evt.GetData()) + + c.close() + + s.Eventually(func() bool { + return s.l.clients.Len() == 0 + }, time.Second, time.Millisecond*100) + }) + + s.Run("skip_level", func() { + s.l.SetLevel(Level_Warn) + defer s.l.SetLevel(Level_Debug) + c := s.registerClient() + + s.Eventually(func() bool { + return s.l.clients.Len() == 1 + }, time.Second, time.Millisecond*100) + + s.l.RecordFunc(Level_Info, func() Evt { return NewRawEvt(Level_Info, "test") }) + + c.close() + + s.Eventually(func() bool { + return s.l.clients.Len() == 0 + }, time.Second, time.Millisecond*100) + + var result []*Event + for evt := range c.result { + result = append(result, evt) + } + s.Equal(0, len(result)) + }) +} + +func (s *GrpcLoggerSuite) TestFlush() { + s.NoError(s.l.Flush()) +} + +func TestGrpcLogger(t *testing.T) { + suite.Run(t, new(GrpcLoggerSuite)) +} diff --git a/pkg/eventlog/handler.go b/pkg/eventlog/handler.go new file mode 100644 index 0000000000..4ba5ea496d --- /dev/null +++ b/pkg/eventlog/handler.go @@ -0,0 +1,74 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +import ( + "encoding/json" + "net/http" + + "github.com/milvus-io/milvus/pkg/log" + "go.uber.org/zap" +) + +const ( + // ContentTypeHeader is the health check request type header. + ContentTypeHeader = "Content-Type" + // ContentTypeText is the health check request type text. + ContentTypeText = "text/plain" + // ContentTypeJSON is another health check request type text, which response contains more info. + ContentTypeJSON = "application/json" +) + +// eventLogHandler is the event log http handler +type eventLogHandler struct { +} + +func Handler() http.Handler { + return &eventLogHandler{} +} + +type eventLogResponse struct { + Status int `json:"status"` + Port int `json:"port"` +} + +// ServeHTTP handler which start a grpc server listen on random available port. +func (h *eventLogHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + resp := &eventLogResponse{ + Status: http.StatusOK, + } + + port, err := getGrpcLogger() + if err != nil { + resp.Status = http.StatusInternalServerError + writeJSON(w, r, resp) + return + } + + resp.Port = port + + writeJSON(w, r, resp) +} + +func writeJSON(w http.ResponseWriter, r *http.Request, resp *eventLogResponse) { + w.Header().Set(ContentTypeHeader, ContentTypeJSON) + bs, err := json.Marshal(resp) + if err != nil { + log.Warn("faild to send response", zap.Error(err)) + } + w.Write(bs) +} diff --git a/pkg/eventlog/handler_test.go b/pkg/eventlog/handler_test.go new file mode 100644 index 0000000000..54c4487bbf --- /dev/null +++ b/pkg/eventlog/handler_test.go @@ -0,0 +1,75 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/suite" +) + +type HandlerSuite struct { + suite.Suite +} + +func (s *HandlerSuite) SetupTest() { + s.cleanGrpcLog() +} + +func (s *HandlerSuite) TearDownTest() { + s.cleanGrpcLog() +} + +func (s *HandlerSuite) cleanGrpcLog() { + l := grpcLog.Load() + if l != nil { + l.Close() + } +} + +func (s *HandlerSuite) TestServerHTTP() { + req := httptest.NewRequest(http.MethodGet, "/eventlog", nil) + w := httptest.NewRecorder() + + handler := Handler() + + handler.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + data, err := ioutil.ReadAll(res.Body) + s.Require().NoError(err) + + resp := eventLogResponse{} + + err = json.Unmarshal(data, &resp) + s.Require().NoError(err) + + s.Equal(http.StatusOK, resp.Status) + + l := grpcLog.Load() + s.NotNil(l) + s.Equal(l.port, resp.Port) +} + +func TestHandler(t *testing.T) { + suite.Run(t, new(HandlerSuite)) +} diff --git a/pkg/eventlog/logger.go b/pkg/eventlog/logger.go new file mode 100644 index 0000000000..ef0c31839a --- /dev/null +++ b/pkg/eventlog/logger.go @@ -0,0 +1,54 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +// Logger is the interface for event loggers. +type Logger interface { + // Record append log into logger directly. + Record(Evt) + // RecordFunc performs log level check & other implementation related check before composing the log entity + // preferred to use this when in performance related path + RecordFunc(Level, func() Evt) + // Flush is the API to invoke flush operation for logger (if any). + Flush() error +} + +// Evt is event log interface. +type Evt interface { + Level() Level + Type() int32 + Raw() []byte +} + +// Record is the global helper function to `globalLogger.Record`. +func Record(evt Evt) { + getGlobalLogger().Record(evt) +} + +// RecordFunc is the global helper function to `globalLogger.RecordFunc`. +func RecordFunc(lvl Level, fn func() Evt) { + getGlobalLogger().RecordFunc(lvl, fn) +} + +// Flush is the global helper function to `globalLogger.Flush`. +func Flush() error { + return getGlobalLogger().Flush() +} + +func Register(key string, logger Logger) { + getGlobalLogger().Register(key, logger) +} diff --git a/pkg/eventlog/logger_test.go b/pkg/eventlog/logger_test.go new file mode 100644 index 0000000000..9a94b81116 --- /dev/null +++ b/pkg/eventlog/logger_test.go @@ -0,0 +1,96 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventlog + +import ( + "testing" + + mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type LoggerSuite struct { + suite.Suite +} + +func (s *LoggerSuite) TearDownTest() { + global.Store(nil) +} + +func (s *LoggerSuite) TestRecord() { + mock1 := NewMockLogger(s.T()) + mock2 := NewMockLogger(s.T()) + + Register("mock1", mock1) + Register("mock2", mock2) + + rawEvt := NewRawEvt(Level_Info, "test") + + mock1.EXPECT().Record(rawEvt) + mock2.EXPECT().Record(rawEvt) + + Record(rawEvt) + + mock3 := NewMockLogger(s.T()) + + Register("mock3", mock3) // register logger without expectations + + rawEvt = NewRawEvt(Level_Debug, "test") + + Record(rawEvt) +} + +func (s *LoggerSuite) TestRecordFunc() { + mock1 := NewMockLogger(s.T()) + mock2 := NewMockLogger(s.T()) + + Register("mock1", mock1) + Register("mock2", mock2) + + rawEvt := NewRawEvt(Level_Info, "test") + + mock1.EXPECT().RecordFunc(mock.Anything, mock.Anything) + mock2.EXPECT().RecordFunc(mock.Anything, mock.Anything) + + RecordFunc(Level_Info, func() Evt { return rawEvt }) + + mock3 := NewMockLogger(s.T()) + + Register("mock3", mock3) // register logger without expectations + + rawEvt = NewRawEvt(Level_Debug, "test") + + RecordFunc(Level_Debug, func() Evt { return rawEvt }) +} + +func (s *LoggerSuite) TestFlush() { + mock1 := NewMockLogger(s.T()) + mock2 := NewMockLogger(s.T()) + + Register("mock1", mock1) + Register("mock2", mock2) + + mock1.EXPECT().Flush().Return(nil) + mock2.EXPECT().Flush().Return(nil) + + err := Flush() + s.NoError(err) +} + +func TestLogger(t *testing.T) { + suite.Run(t, new(LoggerSuite)) +} diff --git a/pkg/eventlog/mock_logger.go b/pkg/eventlog/mock_logger.go new file mode 100644 index 0000000000..b534f014e6 --- /dev/null +++ b/pkg/eventlog/mock_logger.go @@ -0,0 +1,126 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package eventlog + +import mock "github.com/stretchr/testify/mock" + +// MockLogger is an autogenerated mock type for the Logger type +type MockLogger struct { + mock.Mock +} + +type MockLogger_Expecter struct { + mock *mock.Mock +} + +func (_m *MockLogger) EXPECT() *MockLogger_Expecter { + return &MockLogger_Expecter{mock: &_m.Mock} +} + +// Flush provides a mock function with given fields: +func (_m *MockLogger) Flush() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockLogger_Flush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Flush' +type MockLogger_Flush_Call struct { + *mock.Call +} + +// Flush is a helper method to define mock.On call +func (_e *MockLogger_Expecter) Flush() *MockLogger_Flush_Call { + return &MockLogger_Flush_Call{Call: _e.mock.On("Flush")} +} + +func (_c *MockLogger_Flush_Call) Run(run func()) *MockLogger_Flush_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockLogger_Flush_Call) Return(_a0 error) *MockLogger_Flush_Call { + _c.Call.Return(_a0) + return _c +} + +// Record provides a mock function with given fields: _a0 +func (_m *MockLogger) Record(_a0 Evt) { + _m.Called(_a0) +} + +// MockLogger_Record_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Record' +type MockLogger_Record_Call struct { + *mock.Call +} + +// Record is a helper method to define mock.On call +// - _a0 Evt +func (_e *MockLogger_Expecter) Record(_a0 interface{}) *MockLogger_Record_Call { + return &MockLogger_Record_Call{Call: _e.mock.On("Record", _a0)} +} + +func (_c *MockLogger_Record_Call) Run(run func(_a0 Evt)) *MockLogger_Record_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(Evt)) + }) + return _c +} + +func (_c *MockLogger_Record_Call) Return() *MockLogger_Record_Call { + _c.Call.Return() + return _c +} + +// RecordFunc provides a mock function with given fields: _a0, _a1 +func (_m *MockLogger) RecordFunc(_a0 Level, _a1 func() Evt) { + _m.Called(_a0, _a1) +} + +// MockLogger_RecordFunc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordFunc' +type MockLogger_RecordFunc_Call struct { + *mock.Call +} + +// RecordFunc is a helper method to define mock.On call +// - _a0 Level +// - _a1 func() Evt +func (_e *MockLogger_Expecter) RecordFunc(_a0 interface{}, _a1 interface{}) *MockLogger_RecordFunc_Call { + return &MockLogger_RecordFunc_Call{Call: _e.mock.On("RecordFunc", _a0, _a1)} +} + +func (_c *MockLogger_RecordFunc_Call) Run(run func(_a0 Level, _a1 func() Evt)) *MockLogger_RecordFunc_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(Level), args[1].(func() Evt)) + }) + return _c +} + +func (_c *MockLogger_RecordFunc_Call) Return() *MockLogger_RecordFunc_Call { + _c.Call.Return() + return _c +} + +type mockConstructorTestingTNewMockLogger interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockLogger creates a new instance of MockLogger. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockLogger(t mockConstructorTestingTNewMockLogger) *MockLogger { + mock := &MockLogger{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/util/paramtable/base_table.go b/pkg/util/paramtable/base_table.go index 7ab4dadb13..054a4bf55b 100644 --- a/pkg/util/paramtable/base_table.go +++ b/pkg/util/paramtable/base_table.go @@ -52,7 +52,7 @@ const ( // Const of Global Config List func globalConfigPrefixs() []string { - return []string{"metastore.", "localStorage.", "etcd.", "mysql.", "minio.", "pulsar.", "kafka.", "rocksmq.", "log.", "grpc.", "common.", "quotaAndLimits."} + return []string{"metastore", "localStorage", "etcd", "mysql", "minio", "pulsar", "kafka", "rocksmq", "log", "grpc", "common", "quotaAndLimits"} } var defaultYaml = []string{"milvus.yaml", "default.yaml", "user.yaml"} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 68b5e90707..4c2424eae2 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -222,6 +222,8 @@ type commonConfig struct { JSONMaxLength ParamItem `refreshable:"false"` ImportMaxFileSize ParamItem `refreshable:"true"` + + MetricsPort ParamItem `refreshable:"false"` } func (p *commonConfig) init(base *BaseTable) { @@ -645,6 +647,13 @@ like the old password verification when updating the credential`, DefaultValue: fmt.Sprint(16 << 30), } p.ImportMaxFileSize.Init(base.mgr) + + p.MetricsPort = ParamItem{ + Key: "common.MetricsPort", + Version: "2.3.0", + DefaultValue: "9091", + } + p.MetricsPort.Init(base.mgr) } type traceConfig struct {