diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go
index 277fc46770..6166aec09c 100644
--- a/cmd/roles/roles.go
+++ b/cmd/roles/roles.go
@@ -283,6 +283,13 @@ func (mr *MilvusRoles) setupLogger() {
 			MaxDays:    params.LogCfg.MaxAge.GetAsInt(),
 			MaxBackups: params.LogCfg.MaxBackups.GetAsInt(),
 		},
+		AsyncWriteEnable:         params.LogCfg.AsyncWriteEnable.GetAsBool(),
+		AsyncWriteFlushInterval:  params.LogCfg.AsyncWriteFlushInterval.GetAsDurationByParse(),
+		AsyncWriteDroppedTimeout: params.LogCfg.AsyncWriteDroppedTimeout.GetAsDurationByParse(),
+		AsyncWriteStopTimeout:    params.LogCfg.AsyncWriteStopTimeout.GetAsDurationByParse(),
+		AsyncWritePendingLength:  params.LogCfg.AsyncWritePendingLength.GetAsInt(),
+		AsyncWriteBufferSize:     int(params.LogCfg.AsyncWriteBufferSize.GetAsSize()),
+		AsyncWriteMaxBytesPerLog: int(params.LogCfg.AsyncWriteMaxBytesPerLog.GetAsSize()),
 	}
 	id := paramtable.GetNodeID()
 	roleName := paramtable.GetRole()
@@ -294,6 +301,7 @@ func (mr *MilvusRoles) setupLogger() {
 	}
 
 	logutil.SetupLogger(&logConfig)
+
 	params.Watch(params.LogCfg.Level.Key, config.NewHandler("log.level", func(event *config.Event) {
 		if !event.HasUpdated || event.EventType == config.DeleteType {
 			return
@@ -417,14 +425,6 @@ func (mr *MilvusRoles) Run() {
 	// init tracer before run any component
 	tracer.Init()
 
-	// Initialize streaming service if enabled.
-
-	if mr.ServerType == typeutil.StandaloneRole || !mr.EnableDataNode {
-		// only datanode does not init streaming service
-		streaming.Init()
-		defer streaming.Release()
-	}
-
 	enableComponents := []bool{
 		mr.EnableProxy,
 		mr.EnableQueryNode,
@@ -444,6 +444,8 @@ func (mr *MilvusRoles) Run() {
 	expr.Init()
 	expr.Register("param", paramtable.Get())
 	mr.setupLogger()
+	defer log.Cleanup()
+
 	http.ServeHTTP()
 	setupPrometheusHTTPServer(Registry)
@@ -459,6 +461,13 @@
 		}
 	}
 
+	// Initialize streaming service if enabled.
+	if mr.ServerType == typeutil.StandaloneRole || !mr.EnableDataNode {
+		// only datanode does not init streaming service
+		streaming.Init()
+		defer streaming.Release()
+	}
+
 	var wg sync.WaitGroup
 	local := mr.Local
diff --git a/internal/streamingcoord/server/balancer/metrics.go b/internal/streamingcoord/server/balancer/metrics.go
deleted file mode 100644
index db2249d36a..0000000000
--- a/internal/streamingcoord/server/balancer/metrics.go
+++ /dev/null
@@ -1 +0,0 @@
-package balancer
diff --git a/internal/util/streamingutil/service/discoverer/discoverer.go b/internal/util/streamingutil/service/discoverer/discoverer.go
index 86ec8db446..b708cb61a9 100644
--- a/internal/util/streamingutil/service/discoverer/discoverer.go
+++ b/internal/util/streamingutil/service/discoverer/discoverer.go
@@ -2,9 +2,12 @@ package discoverer
 
 import (
 	"context"
+	"fmt"
+	"strings"
 
 	"google.golang.org/grpc/resolver"
 
+	"github.com/milvus-io/milvus/internal/util/streamingutil/service/attributes"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
@@ -24,3 +27,15 @@ type VersionedState struct {
 	Version typeutil.Version
 	State   resolver.State
 }
+
+func (v VersionedState) String() string {
+	as := make([]string, 0, len(v.State.Addresses))
+	for _, addr := range v.State.Addresses {
+		if nodeID := attributes.GetServerID(addr.Attributes); nodeID != nil {
+			as = append(as, fmt.Sprintf("%d@%s", *nodeID, addr.Addr))
+			continue
+		}
+		as = append(as, addr.Addr)
+	}
+	return fmt.Sprintf("Version: %s, Addrs: %s", v.Version, strings.Join(as, ","))
+}
diff --git a/internal/util/streamingutil/service/resolver/resolver_with_discoverer.go b/internal/util/streamingutil/service/resolver/resolver_with_discoverer.go
index c91e369315..92d60ae44a 100644
--- a/internal/util/streamingutil/service/resolver/resolver_with_discoverer.go
+++ b/internal/util/streamingutil/service/resolver/resolver_with_discoverer.go
@@ -182,11 +182,11 @@ func (r *resolverWithDiscoverer) doDiscover() {
 			latestState := r.getLatestState()
 			if latestState != nil && !state.Version.GT(latestState.Version) {
 				// Ignore the old version.
-				r.Logger().Info("service discover update, ignore old version", zap.Any("state", state))
+				r.Logger().Info("service discover update, ignore old version", zap.Stringer("state", state))
 				continue
 			}
 			// Update all grpc resolver.
-			r.Logger().Info("service discover update, update resolver", zap.Any("state", state), zap.Int("resolver_count", len(grpcResolvers)))
+			r.Logger().Info("service discover update, update resolver", zap.Stringer("state", state), zap.Int("resolver_count", len(grpcResolvers)))
 			for watcher := range grpcResolvers {
 				// Update operation do not block.
 				// Only return error if the resolver is closed, so just print a info log and delete the resolver.
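// Aside (illustrative, not part of the patch): the new VersionedState.String
// method is what lets the call sites above and below switch from zap.Any to
// zap.Stringer. zap.Stringer defers formatting until the entry is actually
// encoded, while zap.Any reflects over the whole resolver.State on every log
// call. A minimal, self-contained sketch of the resulting output format,
// using hypothetical addresses and server IDs (the server-ID attribute is
// stubbed by a plain *int64 here; formatState and addr are made-up names):

package main

import (
	"fmt"
	"strings"
)

// addr stands in for resolver.Address plus its server-ID attribute.
type addr struct {
	serverID *int64 // nil when the attribute is absent
	address  string
}

// formatState mirrors VersionedState.String: "id@host" when the server ID
// is known, the bare address otherwise.
func formatState(version string, addrs []addr) string {
	as := make([]string, 0, len(addrs))
	for _, a := range addrs {
		if a.serverID != nil {
			as = append(as, fmt.Sprintf("%d@%s", *a.serverID, a.address))
			continue
		}
		as = append(as, a.address)
	}
	return fmt.Sprintf("Version: %s, Addrs: %s", version, strings.Join(as, ","))
}

func main() {
	id := int64(7)
	fmt.Println(formatState("5/12", []addr{
		{serverID: &id, address: "10.0.0.1:19530"},
		{address: "10.0.0.2:19530"},
	}))
	// Output: Version: 5/12, Addrs: 7@10.0.0.1:19530,10.0.0.2:19530
}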
diff --git a/internal/util/streamingutil/service/resolver/watch_based_grpc_resolver.go b/internal/util/streamingutil/service/resolver/watch_based_grpc_resolver.go
index 82407b144b..39a2b8755b 100644
--- a/internal/util/streamingutil/service/resolver/watch_based_grpc_resolver.go
+++ b/internal/util/streamingutil/service/resolver/watch_based_grpc_resolver.go
@@ -51,10 +51,10 @@ func (r *watchBasedGRPCResolver) Update(state VersionedState) error {
 
 	if err := r.cc.UpdateState(state.State); err != nil {
 		// watch based resolver could ignore the error, just log and return nil
-		r.Logger().Warn("fail to update resolver state", zap.Any("state", state.State), zap.Error(err))
+		r.Logger().Warn("fail to update resolver state", zap.Stringer("state", state), zap.Error(err))
 		return nil
 	}
-	r.Logger().Info("update resolver state success", zap.Any("state", state.State))
+	r.Logger().Info("update resolver state success", zap.Stringer("state", state))
 	return nil
 }
diff --git a/pkg/common/common.go b/pkg/common/common.go
index 5c48cd375a..21616dab8c 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -28,11 +28,9 @@ import (
 	"github.com/twpayne/go-geom/encoding/wkb"
 	"github.com/twpayne/go-geom/encoding/wkbcommon"
 	"github.com/twpayne/go-geom/encoding/wkt"
-	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/pkg/v2/log"
 )
 
 // system field id:
@@ -300,7 +298,6 @@ func GetIndexType(indexParams []*commonpb.KeyValuePair) string {
 			return param.Value
 		}
 	}
-	log.Warn("IndexType not found in indexParams")
 	return ""
 }
 
@@ -485,7 +482,6 @@ func GetCollectionLoadFields(schema *schemapb.CollectionSchema, skipDynamicField
 
 		v, err := ShouldFieldBeLoaded(field.GetTypeParams())
 		if err != nil {
-			log.Warn("type param parse skip load failed", zap.Error(err))
 			// if configuration cannot be parsed, ignore it and load field
 			return field.GetFieldID(), true
 		}
@@ -535,7 +531,6 @@ func GetReplicateEndTS(kvs []*commonpb.KeyValuePair) (uint64, bool) {
 		if kv.GetKey() == ReplicateEndTSKey {
 			ts, err := strconv.ParseUint(kv.GetValue(), 10, 64)
 			if err != nil {
-				log.Warn("parse replicate end ts failed", zap.Error(err), zap.Stack("stack"))
 				return 0, false
 			}
 			return ts, true
diff --git a/pkg/common/mock_testonly.go b/pkg/common/mock_testonly.go
index b139174970..f5d5c26689 100644
--- a/pkg/common/mock_testonly.go
+++ b/pkg/common/mock_testonly.go
@@ -22,9 +22,7 @@ package common
 
 import (
 	"context"
-	"fmt"
-
-	"github.com/milvus-io/milvus/pkg/v2/log"
+	"log"
 )
 
 type MockTestingT struct {
@@ -38,15 +36,15 @@ func NewEmptyMockT() *MockTestingT {
 }
 
 func (m *MockTestingT) Logf(format string, args ...interface{}) {
-	log.Ctx(m.ctx).Info(fmt.Sprintf(format, args...))
+	log.Printf(format, args...)
 }
 
 func (m *MockTestingT) Errorf(format string, args ...interface{}) {
-	log.Ctx(m.ctx).Error(fmt.Sprintf(format, args...))
+	log.Printf(format, args...)
 }
 
 func (m *MockTestingT) FailNow() {
-	log.Ctx(m.ctx).Panic("FailNow called")
+	log.Println("FailNow called")
 }
 
 func (m *MockTestingT) Cleanup(func()) {}
diff --git a/pkg/log/config.go b/pkg/log/config.go
index bae90c58f1..b6cf37945d 100644
--- a/pkg/log/config.go
+++ b/pkg/log/config.go
@@ -71,6 +71,30 @@ type Config struct {
 	//
 	// Values configured here are per-second. See zapcore.NewSampler for details.
 	Sampling *zap.SamplingConfig `toml:"sampling" json:"sampling"`
+
+	// AsyncWriteEnable enables async write for the logger.
+	AsyncWriteEnable bool `toml:"async-write-enable" json:"async-write-enable"`
+
+	// AsyncWriteFlushInterval is the interval at which buffered logs are flushed.
+	AsyncWriteFlushInterval time.Duration `toml:"async-write-flush-interval" json:"async-write-flush-interval"`
+
+	// AsyncWriteDroppedTimeout is the timeout after which a write request is dropped when the buffer is full.
+	AsyncWriteDroppedTimeout time.Duration `toml:"async-write-dropped-timeout" json:"async-write-dropped-timeout"`
+
+	// AsyncWriteNonDroppableLevel is the level at or above which entries are never dropped, even when the buffer is full.
+	AsyncWriteNonDroppableLevel string `toml:"async-write-non-droppable-level" json:"async-write-non-droppable-level"`
+
+	// AsyncWriteStopTimeout is the timeout for stopping the async writer.
+	AsyncWriteStopTimeout time.Duration `toml:"async-write-stop-timeout" json:"async-write-stop-timeout"`
+
+	// AsyncWritePendingLength is the maximum number of pending write requests; writes beyond this limit may be dropped.
+	AsyncWritePendingLength int `toml:"async-write-pending-length" json:"async-write-pending-length"`
+
+	// AsyncWriteBufferSize is the size of the write buffer.
+	AsyncWriteBufferSize int `toml:"async-write-buffer-size" json:"async-write-buffer-size"`
+
+	// AsyncWriteMaxBytesPerLog is the maximum number of bytes per log entry.
+	AsyncWriteMaxBytesPerLog int `toml:"async-write-max-bytes-per-log" json:"async-write-max-bytes-per-log"`
 }
 
 // ZapProperties records some information about zap.
@@ -110,3 +134,28 @@ func (cfg *Config) buildOptions(errSink zapcore.WriteSyncer) []zap.Option {
 	}
 	return opts
 }
+
+// initialize fills in defaults for unset or invalid async-write options.
+func (cfg *Config) initialize() {
+	if cfg.AsyncWriteFlushInterval <= 0 {
+		cfg.AsyncWriteFlushInterval = 10 * time.Second
+	}
+	if cfg.AsyncWriteDroppedTimeout <= 0 {
+		cfg.AsyncWriteDroppedTimeout = 100 * time.Millisecond
+	}
+	if _, err := zapcore.ParseLevel(cfg.AsyncWriteNonDroppableLevel); cfg.AsyncWriteNonDroppableLevel == "" || err != nil {
+		cfg.AsyncWriteNonDroppableLevel = zapcore.ErrorLevel.String()
+	}
+	if cfg.AsyncWriteStopTimeout <= 0 {
+		cfg.AsyncWriteStopTimeout = 1 * time.Second
+	}
+	if cfg.AsyncWritePendingLength <= 0 {
+		cfg.AsyncWritePendingLength = 1024
+	}
+	if cfg.AsyncWriteBufferSize <= 0 {
+		cfg.AsyncWriteBufferSize = 1024 * 1024
+	}
+	if cfg.AsyncWriteMaxBytesPerLog <= 0 {
+		cfg.AsyncWriteMaxBytesPerLog = 1024 * 1024
+	}
+}
diff --git a/pkg/log/log.go b/pkg/log/log.go
index 946904189b..83cc478c59 100644
--- a/pkg/log/log.go
+++ b/pkg/log/log.go
@@ -45,7 +45,7 @@ import (
 	"gopkg.in/natefinch/lumberjack.v2"
 )
 
-var _globalL, _globalP, _globalS, _globalR atomic.Value
+var _globalL, _globalP, _globalS, _globalR, _globalCleanup atomic.Value
 
 var (
 	_globalLevelLogger sync.Map
@@ -122,7 +122,13 @@ func InitLoggerWithWriteSyncer(cfg *Config, output zapcore.WriteSyncer, opts ...
 	if err != nil {
 		return nil, nil, fmt.Errorf("initLoggerWithWriteSyncer UnmarshalText cfg.Level err:%w", err)
 	}
-	core := NewTextCore(newZapTextEncoder(cfg), output, level)
+	var core zapcore.Core
+	if cfg.AsyncWriteEnable {
+		core = NewAsyncTextIOCore(cfg, output, level)
+		registerCleanup(core.(*asyncTextIOCore).Stop)
+	} else {
+		core = NewTextCore(newZapTextEncoder(cfg), output, level)
+	}
 	opts = append(cfg.buildOptions(output), opts...)
 	lg := zap.New(core, opts...)
 	r := &ZapProperties{
@@ -212,6 +218,14 @@ func fatalL() *zap.Logger {
 	return v.(*zap.Logger)
 }
 
+// Cleanup runs the cleanup function registered by the logger, flushing any pending async writes.
+func Cleanup() {
+	cleanup := _globalCleanup.Load()
+	if cleanup != nil {
+		cleanup.(func())()
+	}
+}
+
 // ReplaceGlobals replaces the global Logger and SugaredLogger.
 // It's safe for concurrent use.
 func ReplaceGlobals(logger *zap.Logger, props *ZapProperties) {
@@ -220,6 +234,14 @@ func ReplaceGlobals(logger *zap.Logger, props *ZapProperties) {
 	_globalP.Store(props)
 }
 
+// registerCleanup registers a cleanup function to run when the global logger is cleaned up; any previously registered function is invoked immediately.
+func registerCleanup(cleanup func()) {
+	oldCleanup := _globalCleanup.Swap(cleanup)
+	if oldCleanup != nil {
+		oldCleanup.(func())()
+	}
+}
+
 func replaceLeveledLoggers(debugLogger *zap.Logger) {
 	levels := []zapcore.Level{
 		zapcore.DebugLevel, zapcore.InfoLevel, zapcore.WarnLevel, zapcore.ErrorLevel,
diff --git a/pkg/log/zap_async_buffered_write_core.go b/pkg/log/zap_async_buffered_write_core.go
new file mode 100644
index 0000000000..f725859850
--- /dev/null
+++ b/pkg/log/zap_async_buffered_write_core.go
@@ -0,0 +1,232 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package log
+
+import (
+	"fmt"
+	"time"
+
+	"go.uber.org/zap/buffer"
+	"go.uber.org/zap/zapcore"
+
+	"github.com/milvus-io/milvus/pkg/v2/metrics"
+	"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
+)
+
+// asyncTextIOCore must satisfy zapcore.Core.
+var _ zapcore.Core = (*asyncTextIOCore)(nil)
+
+// NewAsyncTextIOCore creates a new async text IO core.
+func NewAsyncTextIOCore(cfg *Config, ws zapcore.WriteSyncer, enab zapcore.LevelEnabler) *asyncTextIOCore {
+	enc := newZapTextEncoder(cfg)
+	bws := &zapcore.BufferedWriteSyncer{
+		WS:            ws,
+		Size:          cfg.AsyncWriteBufferSize,
+		FlushInterval: cfg.AsyncWriteFlushInterval,
+	}
+	nonDroppableLevel, _ := zapcore.ParseLevel(cfg.AsyncWriteNonDroppableLevel)
+	asyncTextIOCore := &asyncTextIOCore{
+		LevelEnabler:        enab,
+		notifier:            syncutil.NewAsyncTaskNotifier[struct{}](),
+		enc:                 enc,
+		bws:                 bws,
+		pending:             make(chan *entryItem, cfg.AsyncWritePendingLength),
+		writeDroppedTimeout: cfg.AsyncWriteDroppedTimeout,
+		nonDroppableLevel:   nonDroppableLevel,
+		stopTimeout:         cfg.AsyncWriteStopTimeout,
+		maxBytesPerLog:      cfg.AsyncWriteMaxBytesPerLog,
+	}
+	go asyncTextIOCore.background()
+	return asyncTextIOCore
+}
+
+// asyncTextIOCore is a wrapper around the textIOCore that writes the logs to the underlying buffered write syncer.
+type asyncTextIOCore struct {
+	zapcore.LevelEnabler
+
+	notifier            *syncutil.AsyncTaskNotifier[struct{}]
+	enc                 zapcore.Encoder
+	bws                 *zapcore.BufferedWriteSyncer
+	pending             chan *entryItem // the incoming new write requests
+	writeDroppedTimeout time.Duration
+	nonDroppableLevel   zapcore.Level
+	stopTimeout         time.Duration
+	maxBytesPerLog      int
+}
+
+// entryItem is the item to write to the underlying buffered write syncer.
+type entryItem struct {
+	buf   *buffer.Buffer
+	level zapcore.Level
+}
+
+// With returns a copy of the Core with the given fields added.
+func (s *asyncTextIOCore) With(fields []zapcore.Field) zapcore.Core {
+	enc := s.enc.Clone()
+	switch e := enc.(type) {
+	case *textEncoder:
+		e.addFields(fields)
+	case zapcore.ObjectEncoder:
+		for _, field := range fields {
+			field.AddTo(e)
+		}
+	default:
+		panic(fmt.Sprintf("unsupported encode type: %T for With operation", enc))
+	}
+	return &asyncTextIOCore{
+		LevelEnabler:        s.LevelEnabler,
+		notifier:            s.notifier,
+		enc:                 enc, // the clone that carries the added fields
+		bws:                 s.bws,
+		pending:             s.pending,
+		writeDroppedTimeout: s.writeDroppedTimeout,
+		nonDroppableLevel:   s.nonDroppableLevel,
+		stopTimeout:         s.stopTimeout,
+		maxBytesPerLog:      s.maxBytesPerLog,
+	}
+}
+
+// Check checks if the entry is enabled by the level enabler.
+func (s *asyncTextIOCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
+	if s.Enabled(ent.Level) {
+		return ce.AddCore(ent, s)
+	}
+	return ce
+}
+
+// Write encodes the entry and enqueues it for the background writer.
+// asyncTextIOCore doesn't promise the write is durable when Write returns:
+// an entry below the non-droppable level is dropped if the queue stays full
+// past the dropped timeout.
+func (s *asyncTextIOCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
+	buf, err := s.enc.EncodeEntry(ent, fields)
+	if err != nil {
+		return err
+	}
+	entry := &entryItem{
+		buf:   buf,
+		level: ent.Level,
+	}
+
+	length := buf.Len()
+	if length == 0 {
+		buf.Free()
+		return nil
+	}
+	var writeDroppedTimeout <-chan time.Time
+	if ent.Level < s.nonDroppableLevel {
+		writeDroppedTimeout = time.After(s.writeDroppedTimeout)
+	}
+	select {
+	case s.pending <- entry:
+		metrics.LoggingPendingWriteLength.Inc()
+		metrics.LoggingPendingWriteBytes.Add(float64(length))
+	case <-writeDroppedTimeout:
+		metrics.LoggingDroppedWrites.Inc()
+		// drop the entry if the write is dropped due to timeout
+		buf.Free()
+	}
+	return nil
+}
+
+// Sync is a no-op: flushing is handled by the background goroutine and the
+// buffered write syncer's flush interval.
+func (s *asyncTextIOCore) Sync() error {
+	return nil
+}
+
+// background is the background goroutine that writes the logs to the underlying buffered write syncer.
+func (s *asyncTextIOCore) background() {
+	defer func() {
+		s.flushPendingWriteWithTimeout()
+		s.notifier.Finish(struct{}{})
+	}()
+
+	for {
+		select {
+		case <-s.notifier.Context().Done():
+			return
+		case ent := <-s.pending:
+			s.consumeEntry(ent)
+		}
+	}
+}
+
+// consumeEntry writes the entry to the underlying buffered write syncer and frees the buffer.
+func (s *asyncTextIOCore) consumeEntry(ent *entryItem) {
+	length := ent.buf.Len()
+	metrics.LoggingPendingWriteLength.Dec()
+	metrics.LoggingPendingWriteBytes.Sub(float64(length))
+	writes := s.getWriteBytes(ent)
+	if _, err := s.bws.Write(writes); err != nil {
+		metrics.LoggingIOFailure.Inc()
+	}
+	ent.buf.Free()
+	if ent.level > zapcore.ErrorLevel {
+		s.bws.Sync()
+	}
+}
+
+// getWriteBytes gets the bytes to write to the underlying buffered write syncer.
+// If the length of the write exceeds the max bytes per log, it truncates the write and returns the truncated bytes;
+// otherwise, it returns the original bytes.
+func (s *asyncTextIOCore) getWriteBytes(ent *entryItem) []byte {
+	length := ent.buf.Len()
+	writes := ent.buf.Bytes()
+
+	if length > s.maxBytesPerLog {
+		// truncate the write if it exceeds the max bytes per log,
+		// keeping the final byte (the trailing newline) in place
+		metrics.LoggingTruncatedWrites.Inc()
+		metrics.LoggingTruncatedWriteBytes.Add(float64(length - s.maxBytesPerLog))
+
+		end := writes[length-1]
+		writes = writes[:s.maxBytesPerLog]
+		writes[len(writes)-1] = end
+	}
+	return writes
+}
+
+// flushPendingWriteWithTimeout flushes the pending writes, giving up after the stop timeout.
+func (s *asyncTextIOCore) flushPendingWriteWithTimeout() {
+	done := make(chan struct{})
+	go s.flushAllPendingWrites(done)
+
+	select {
+	case <-time.After(s.stopTimeout):
+	case <-done:
+	}
+}
+
+// flushAllPendingWrites flushes all the pending writes to the underlying buffered write syncer.
+func (s *asyncTextIOCore) flushAllPendingWrites(done chan struct{}) {
+	defer func() {
+		if err := s.bws.Stop(); err != nil {
+			metrics.LoggingIOFailure.Inc()
+		}
+		close(done)
+	}()
+
+	for {
+		select {
+		case ent := <-s.pending:
+			s.consumeEntry(ent)
+		default:
+			return
+		}
+	}
+}
+
+// Stop cancels the background goroutine and blocks until pending writes are flushed or the stop timeout expires.
+func (s *asyncTextIOCore) Stop() {
+	s.notifier.Cancel()
+	s.notifier.BlockUntilFinish()
+}
diff --git a/pkg/log/zap_async_buffered_write_core_test.go b/pkg/log/zap_async_buffered_write_core_test.go
new file mode 100644
index 0000000000..a5e42158be
--- /dev/null
+++ b/pkg/log/zap_async_buffered_write_core_test.go
@@ -0,0 +1,86 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package log
+
+import (
+	"io"
+	"math/rand"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/errors"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+func TestAsyncBufferedWriteSyncer(t *testing.T) {
+	blockWriter := &blockWriter{
+		Writer: os.Stdout,
+	}
+	syncer := NewAsyncTextIOCore(
+		&Config{
+			AsyncWriteEnable:         true,
+			AsyncWriteFlushInterval:  1 * time.Second,
+			AsyncWriteDroppedTimeout: 100 * time.Millisecond,
+			AsyncWriteStopTimeout:    1 * time.Second,
+			AsyncWritePendingLength:  100,
+			AsyncWriteBufferSize:     5,
+			AsyncWriteMaxBytesPerLog: 2,
+		},
+		zapcore.AddSync(blockWriter),
+		zap.DebugLevel,
+	)
+	wg := &sync.WaitGroup{}
+	for i := 0; i < 1000; i++ {
+		wg.Add(1)
+		go func() {
+			syncer.Write(zapcore.Entry{
+				Level:   zap.DebugLevel,
+				Message: "test",
+			}, []zapcore.Field{
+				zap.String("test", "test"),
+				zap.Int("test", 1),
+			})
+			wg.Done()
+		}()
+	}
+	syncer.Sync()
+	wg.Wait()
+	syncer.Stop()
+}
+
+type blockWriter struct {
+	io.Writer
+}
+
+func (s *blockWriter) Write(p []byte) (n int, err error) {
+	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
+	if rand.Intn(10) == 0 {
+		return 0, errors.New("write error")
+	}
+	return s.Writer.Write(p)
+}
+
+func (s *blockWriter) Sync() error {
+	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
+	if rand.Intn(10) == 0 {
+		return errors.New("sync error")
+	}
+	return nil
+}
diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go
index e3e78b0538..cbcf164912 100644
--- a/pkg/metrics/datanode_metrics.go
+++ b/pkg/metrics/datanode_metrics.go
@@ -385,6 +385,7 @@ func registerDataNodeOnce(registry *prometheus.Registry) {
 	registry.MustRegister(DataNodeBuildIndexLatency)
 	registry.MustRegister(DataNodeBuildJSONStatsLatency)
 	registry.MustRegister(DataNodeSlot)
+	RegisterLoggingMetrics(registry)
 }
 
 func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) {
diff --git a/pkg/metrics/logging_metrics.go b/pkg/metrics/logging_metrics.go
new file mode 100644
index 0000000000..db5e3ff4cd
--- /dev/null
+++ b/pkg/metrics/logging_metrics.go
@@ -0,0 +1,85 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	loggingMetricSubsystem = "logging"
+)
+
+var (
+	LoggingMetricsRegisterOnce sync.Once
+
+	LoggingPendingWriteLength = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: milvusNamespace,
+		Subsystem: loggingMetricSubsystem,
+		Name:      "pending_write_length",
+		Help:      "The length of pending writes in the logging buffer",
+	})
+
+	LoggingPendingWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: milvusNamespace,
+		Subsystem: loggingMetricSubsystem,
+		Name:      "pending_write_bytes",
+		Help:      "The total bytes of pending writes in the logging buffer",
+	})
+
+	LoggingTruncatedWrites = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: milvusNamespace,
+		Subsystem: loggingMetricSubsystem,
+		Name:      "truncated_writes",
+		Help:      "The number of writes truncated for exceeding the max bytes per log",
+	})
+
+	LoggingTruncatedWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: milvusNamespace,
+		Subsystem: loggingMetricSubsystem,
+		Name:      "truncated_write_bytes",
+		Help:      "The total bytes truncated from writes exceeding the max bytes per log",
+	})
+
+	LoggingDroppedWrites = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: milvusNamespace,
+		Subsystem: loggingMetricSubsystem,
+		Name:      "dropped_writes",
+		Help:      "The number of writes dropped due to a full buffer or write timeout",
+	})
+
+	LoggingIOFailure = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: milvusNamespace,
+		Subsystem: loggingMetricSubsystem,
+		Name:      "io_failures",
+		Help:      "The number of IO failures when writing to the underlying write syncer",
+	})
+)
+
+// RegisterLoggingMetrics registers logging metrics
+func RegisterLoggingMetrics(registry *prometheus.Registry) {
+	LoggingMetricsRegisterOnce.Do(func() {
+		registry.MustRegister(LoggingPendingWriteLength)
+		registry.MustRegister(LoggingPendingWriteBytes)
+		registry.MustRegister(LoggingTruncatedWrites)
+		registry.MustRegister(LoggingTruncatedWriteBytes)
+		registry.MustRegister(LoggingDroppedWrites)
+		registry.MustRegister(LoggingIOFailure)
+	})
+}
diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go
index c38caa6cb1..138ce5d680 100644
--- a/pkg/metrics/metrics_test.go
+++ b/pkg/metrics/metrics_test.go
@@ -38,6 +38,7 @@ func TestRegisterMetrics(t *testing.T) {
 		RegisterCGOMetrics(r)
 		RegisterStreamingServiceClient(r)
 		RegisterStreamingNode(r)
+		RegisterLoggingMetrics(r)
 	})
 }
diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go
index a85a24e66a..02042c831c 100644
--- a/pkg/metrics/proxy_metrics.go
+++ b/pkg/metrics/proxy_metrics.go
@@ -553,6 +553,7 @@ func RegisterProxy(registry *prometheus.Registry) {
 	registry.MustRegister(ProxyScannedRemoteMB)
 	registry.MustRegister(ProxyScannedTotalMB)
 	RegisterStreamingServiceClient(registry)
+	RegisterLoggingMetrics(registry)
 }
 
 func CleanupProxyDBMetrics(nodeID int64, dbName string) {
diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go
index c2311e4f4c..e2d1426b8d 100644
--- a/pkg/metrics/querynode_metrics.go
+++ b/pkg/metrics/querynode_metrics.go
@@ -902,6 +902,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
 
 	RegisterCGOMetrics(registry)
 	RegisterStreamingServiceClient(registry)
+	RegisterLoggingMetrics(registry)
 }
 
 func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
diff --git a/pkg/metrics/rootcoord_metrics.go b/pkg/metrics/rootcoord_metrics.go
index 8da84234fc..b9bdf14f8e 100644
--- a/pkg/metrics/rootcoord_metrics.go
+++ b/pkg/metrics/rootcoord_metrics.go
@@ -290,6 +290,7 @@ func RegisterMixCoord(registry *prometheus.Registry) {
 	RegisterStreamingServiceClient(registry)
 	RegisterQueryCoord(registry)
 	RegisterDataCoord(registry)
+	RegisterLoggingMetrics(registry)
 }
 
 func CleanupRootCoordDBMetrics(dbName string) {
diff --git a/pkg/metrics/streaming_service_metrics.go b/pkg/metrics/streaming_service_metrics.go
index d376b38d07..c380dc5e4d 100644
--- a/pkg/metrics/streaming_service_metrics.go
+++ b/pkg/metrics/streaming_service_metrics.go
@@ -500,6 +500,7 @@ func RegisterStreamingNode(registry *prometheus.Registry) {
 	registry.MustRegister(StreamingNodeConsumeBytes)
 
 	registerWAL(registry)
+	RegisterLoggingMetrics(registry)
 
 	// TODO: after remove the implementation of old data node
 	// Such as flowgraph and writebuffer, we can remove these metrics from streaming node.
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 987c8d7b7c..ec9cbbdade 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -1513,14 +1513,22 @@ func (t *holmesConfig) init(base *BaseTable) {
 }
 
 type logConfig struct {
-	Level        ParamItem `refreshable:"false"`
-	RootPath     ParamItem `refreshable:"false"`
-	MaxSize      ParamItem `refreshable:"false"`
-	MaxAge       ParamItem `refreshable:"false"`
-	MaxBackups   ParamItem `refreshable:"false"`
-	Format       ParamItem `refreshable:"false"`
-	Stdout       ParamItem `refreshable:"false"`
-	GrpcLogLevel ParamItem `refreshable:"false"`
+	Level                       ParamItem `refreshable:"false"`
+	RootPath                    ParamItem `refreshable:"false"`
+	MaxSize                     ParamItem `refreshable:"false"`
+	MaxAge                      ParamItem `refreshable:"false"`
+	MaxBackups                  ParamItem `refreshable:"false"`
+	Format                      ParamItem `refreshable:"false"`
+	Stdout                      ParamItem `refreshable:"false"`
+	GrpcLogLevel                ParamItem `refreshable:"false"`
+	AsyncWriteEnable            ParamItem `refreshable:"false"`
+	AsyncWriteFlushInterval     ParamItem `refreshable:"false"`
+	AsyncWriteDroppedTimeout    ParamItem `refreshable:"false"`
+	AsyncWriteNonDroppableLevel ParamItem `refreshable:"false"`
+	AsyncWriteStopTimeout       ParamItem `refreshable:"false"`
+	AsyncWritePendingLength     ParamItem `refreshable:"false"`
+	AsyncWriteBufferSize        ParamItem `refreshable:"false"`
+	AsyncWriteMaxBytesPerLog    ParamItem `refreshable:"false"`
 }
 
 func (l *logConfig) init(base *BaseTable) {
@@ -1597,6 +1605,94 @@ Set this parameter as the path that you have permission to write.`,
 		Export: true,
 	}
 	l.GrpcLogLevel.Init(base.mgr)
+
+	l.AsyncWriteEnable = ParamItem{
+		Key:          "log.asyncWrite.enable",
+		DefaultValue: "true",
+		Version:      "2.6.7",
+		Doc:          "Enable async write for the logger.",
+		Export:       false,
+	}
+	l.AsyncWriteEnable.Init(base.mgr)
+
+	l.AsyncWriteFlushInterval = ParamItem{
+		Key:          "log.asyncWrite.flushInterval",
+		DefaultValue: "10s",
+		Version:      "2.6.7",
+		Doc: `The interval at which buffered logs are flushed.
+The lower the interval, the more frequently flushes happen
+and the sooner log writes become visible to the underlying file system.`,
+		Export: false,
+	}
+	l.AsyncWriteFlushInterval.Init(base.mgr)
+
+	l.AsyncWriteDroppedTimeout = ParamItem{
+		Key:          "log.asyncWrite.droppedTimeout",
+		DefaultValue: "100ms",
+		Version:      "2.6.7",
+		Doc: `The timeout to drop the write operation if the buffer is full.
+Once the underlying buffered writer is blocked or too slow and
+the number of pending writes exceeds the pending length threshold,
+a new incoming write operation is dropped after this timeout expires.`,
+		Export: false,
+	}
+	l.AsyncWriteDroppedTimeout.Init(base.mgr)
+
+	l.AsyncWriteNonDroppableLevel = ParamItem{
+		Key:          "log.asyncWrite.nonDroppableLevel",
+		DefaultValue: "error",
+		Version:      "2.6.7",
+		Doc: `The level that will not be dropped when the buffer is full.
+Once an entry's level is greater than or equal to the non-droppable level,
+the write operation is never dropped because the buffer is full.`,
+		Export: false,
+	}
+	l.AsyncWriteNonDroppableLevel.Init(base.mgr)
+
+	l.AsyncWriteStopTimeout = ParamItem{
+		Key:          "log.asyncWrite.stopTimeout",
+		DefaultValue: "1s",
+		Version:      "2.6.7",
+		Doc: `The timeout to stop the async writer.
+When Milvus shuts down, the logger's async writer tries to flush
+all pending write operations to the underlying file system
+until this timeout is reached.`,
+		Export: false,
+	}
+	l.AsyncWriteStopTimeout.Init(base.mgr)
+
+	l.AsyncWritePendingLength = ParamItem{
+		Key:          "log.asyncWrite.pendingLength",
+		DefaultValue: "1024",
+		Version:      "2.6.7",
+		Doc: `The maximum number of pending write operations.
+Once the underlying buffered writer is blocked or too slow,
+pending write operations are queued in this buffer.
+The larger the pending length, the more memory is used and the fewer log writes are dropped.`,
+		Export: false,
+	}
+	l.AsyncWritePendingLength.Init(base.mgr)
+
+	l.AsyncWriteBufferSize = ParamItem{
+		Key:          "log.asyncWrite.bufferSize",
+		DefaultValue: "1m",
+		Version:      "2.6.7",
+		Doc: `The buffer size of the underlying bufio writer.
+The larger the buffer size, the more memory is used,
+but the fewer writes hit the underlying file system.`,
+		Export: false,
+	}
+	l.AsyncWriteBufferSize.Init(base.mgr)
+
+	l.AsyncWriteMaxBytesPerLog = ParamItem{
+		Key:          "log.asyncWrite.maxBytesPerLog",
+		DefaultValue: "1m",
+		Version:      "2.6.7",
+		Doc: `The max bytes per log.
+Once a log message exceeds the max bytes per log, it is truncated.`,
+		Export: false,
+	}
+	l.AsyncWriteMaxBytesPerLog.Init(base.mgr)
 }
 
 // /////////////////////////////////////////////////////////////////////////////
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 43bda2d58b..c63de9ef66 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -155,6 +155,18 @@ func TestComponentParam(t *testing.T) {
 		params.Save("common.clusterID", "0")
 	})
 
+	t.Run("test logConfig", func(t *testing.T) {
+		Params := &params.LogCfg
+		assert.True(t, Params.AsyncWriteEnable.GetAsBool())
+		assert.Equal(t, 10*time.Second, Params.AsyncWriteFlushInterval.GetAsDurationByParse())
+		assert.Equal(t, 100*time.Millisecond, Params.AsyncWriteDroppedTimeout.GetAsDurationByParse())
+		assert.Equal(t, "error", Params.AsyncWriteNonDroppableLevel.GetValue())
+		assert.Equal(t, 1*time.Second, Params.AsyncWriteStopTimeout.GetAsDurationByParse())
+		assert.Equal(t, 1024, Params.AsyncWritePendingLength.GetAsInt())
+		assert.Equal(t, int64(1024*1024), Params.AsyncWriteBufferSize.GetAsSize())
+		assert.Equal(t, int64(1024*1024), Params.AsyncWriteMaxBytesPerLog.GetAsSize())
+	})
+
 	t.Run("test rootCoordConfig", func(t *testing.T) {
 		Params := &params.RootCoordCfg
diff --git a/pkg/util/typeutil/convension.go b/pkg/util/typeutil/convension.go
index 0488090f40..a023ef1bb3 100644
--- a/pkg/util/typeutil/convension.go
+++ b/pkg/util/typeutil/convension.go
@@ -23,11 +23,9 @@ import (
 	"reflect"
 
 	"github.com/x448/float16"
-	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus/pkg/v2/common"
-	"github.com/milvus-io/milvus/pkg/v2/log"
 )
 
 // Generic Clone for proto message
@@ -102,7 +100,6 @@ func Uint64ToBytes(v uint64) []byte {
 // SliceRemoveDuplicate is used to dedup a Slice
 func SliceRemoveDuplicate(a interface{}) (ret []interface{}) {
 	if reflect.TypeOf(a).Kind() != reflect.Slice {
-		log.Error("input is not slice", zap.String("inputType", fmt.Sprintf("%T", a)))
 		return ret
 	}
diff --git a/pkg/util/typeutil/conversion_test.go b/pkg/util/typeutil/conversion_test.go
index ca47b525cb..05201b25be 100644
--- a/pkg/util/typeutil/conversion_test.go
+++ b/pkg/util/typeutil/conversion_test.go
@@ -17,14 +17,12 @@
 package typeutil
 
 import (
+	"fmt"
 	"math"
 	"math/rand"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/pkg/v2/log"
 )
 
 func TestConversion(t *testing.T) {
@@ -104,7 +102,7 @@ func TestConversion(t *testing.T) {
 			v := (rand.Float32() - 0.5) * 100
 			b := Float32ToFloat16Bytes(v)
 			v2 := Float16BytesToFloat32(b)
-			log.Info("float16", zap.Float32("v", v), zap.Float32("v2", v2))
+			fmt.Printf("float16 v: %f, v2: %f\n", v, v2)
 			assert.Less(t, math.Abs(float64(v2/v-1)), 0.001)
 		}
 	})
@@ -114,7 +112,7 @@ func TestConversion(t *testing.T) {
 			v := (rand.Float32() - 0.5) * 100
 			b := Float32ToBFloat16Bytes(v)
 			v2 := BFloat16BytesToFloat32(b)
-			log.Info("bfloat16", zap.Float32("v", v), zap.Float32("v2", v2))
+			fmt.Printf("bfloat16 v: %f, v2: %f\n", v, v2)
 			assert.Less(t, math.Abs(float64(v2/v-1)), 0.01)
 		}
 	})
diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go
index 097acfa2b2..0b85ea553d 100644
--- a/pkg/util/typeutil/schema.go
+++ b/pkg/util/typeutil/schema.go
@@ -31,12 +31,10 @@ import (
 
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
-	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common" - "github.com/milvus-io/milvus/pkg/v2/log" ) const DynamicFieldMaxLength = 512 @@ -501,7 +499,6 @@ func (helper *SchemaHelper) getDefaultJSONField(fieldName string) (*schemapb.Fie } } errMsg := fmt.Sprintf("field %s not exist", fieldName) - log.Warn(errMsg) return nil, fmt.Errorf("%s", errMsg) } @@ -990,8 +987,6 @@ func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int6 } else { dstScalar.GetGeometryWktData().Data = append(dstScalar.GetGeometryWktData().Data, srcScalar.GeometryWktData.Data[idx]) } - default: - log.Error("Not supported field type", zap.String("field type", fieldData.Type.String())) } case *schemapb.FieldData_Vectors: dim := fieldType.Vectors.Dim @@ -1094,8 +1089,6 @@ func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int6 } else { dstVector.GetVectorArray().Data = append(dstVector.GetVectorArray().Data, srcVector.VectorArray.Data[idx]) } - default: - log.Error("Not supported field type", zap.String("field type", fieldData.Type.String())) } } } @@ -1109,7 +1102,6 @@ func DeleteFieldData(dst []*schemapb.FieldData) { switch fieldType := fieldData.Field.(type) { case *schemapb.FieldData_Scalars: if dst[i] == nil || dst[i].GetScalars() == nil { - log.Info("empty field data can't be deleted") return } dstScalar := dst[i].GetScalars() @@ -1132,12 +1124,9 @@ func DeleteFieldData(dst []*schemapb.FieldData) { dstScalar.GetJsonData().Data = dstScalar.GetJsonData().Data[:len(dstScalar.GetJsonData().Data)-1] case *schemapb.ScalarField_GeometryData: dstScalar.GetGeometryData().Data = dstScalar.GetGeometryData().Data[:len(dstScalar.GetGeometryData().Data)-1] - default: - log.Error("wrong field type added", zap.String("field type", fieldData.Type.String())) } case *schemapb.FieldData_Vectors: if dst[i] == nil || dst[i].GetVectors() == nil { - log.Info("empty field data can't be deleted") return } dim := fieldType.Vectors.Dim @@ -1159,8 +1148,6 @@ func DeleteFieldData(dst []*schemapb.FieldData) { case *schemapb.VectorField_Int8Vector: dstInt8Vector := dstVector.Data.(*schemapb.VectorField_Int8Vector) dstInt8Vector.Int8Vector = dstInt8Vector.Int8Vector[:len(dstInt8Vector.Int8Vector)-int(dim)] - default: - log.Error("wrong field type added", zap.String("field type", fieldData.Type.String())) } } } @@ -1297,7 +1284,6 @@ func UpdateFieldData(base, update []*schemapb.FieldData, baseIdx, updateIdx int6 baseData.Data[baseIdx] = updateData.Data[updateIdx] } default: - log.Error("Not supported scalar field type", zap.String("field type", baseFieldData.Type.String())) return fmt.Errorf("unsupported scalar field type: %s", baseFieldData.Type.String()) } @@ -1379,11 +1365,9 @@ func UpdateFieldData(base, update []*schemapb.FieldData, baseIdx, updateIdx int6 } } default: - log.Error("Not supported vector field type", zap.String("field type", baseFieldData.Type.String())) return fmt.Errorf("unsupported vector field type: %s", baseFieldData.Type.String()) } default: - log.Error("Not supported field type", zap.String("field type", baseFieldData.Type.String())) return fmt.Errorf("unsupported field type: %s", baseFieldData.Type.String()) } } @@ -1523,7 +1507,6 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error dstScalar.GetBytesData().Data = append(dstScalar.GetBytesData().Data, srcScalar.BytesData.Data...) 
 			}
 		default:
-			log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String()))
 			return errors.New("unsupported data type: " + srcFieldData.Type.String())
 		}
 	case *schemapb.FieldData_Vectors:
@@ -1599,7 +1582,6 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error
 				dstVector.GetVectorArray().Data = append(dstVector.GetVectorArray().Data, srcVector.VectorArray.Data...)
 			}
 		default:
-			log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String()))
 			return errors.New("unsupported data type: " + srcFieldData.Type.String())
 		}
 	}
@@ -2011,8 +1993,6 @@ func AppendPKs(pks *schemapb.IDs, pk interface{}) {
 			}
 		}
 		pks.GetStrId().Data = append(pks.GetStrId().GetData(), realPK)
-	default:
-		log.Warn("got unexpected data type of pk when append pks", zap.Any("pk", pk))
 	}
 }
diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go
index 418a9b2b7f..f9de5f2430 100644
--- a/pkg/util/typeutil/schema_test.go
+++ b/pkg/util/typeutil/schema_test.go
@@ -28,13 +28,11 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
-	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/pkg/v2/common"
-	"github.com/milvus-io/milvus/pkg/v2/log"
 )
 
 func TestSchema(t *testing.T) {
@@ -1071,7 +1069,7 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType,
 			FieldId: fieldID,
 		}
 	default:
-		log.Error("not supported field type", zap.String("field type", fieldType.String()))
+		fmt.Printf("not supported field type: %s\n", fieldType.String())
 	}
 
 	return fieldData
diff --git a/pkg/util/typeutil/version.go b/pkg/util/typeutil/version.go
index 31733fcbaa..8b28f54818 100644
--- a/pkg/util/typeutil/version.go
+++ b/pkg/util/typeutil/version.go
@@ -1,5 +1,10 @@
 package typeutil
 
+import (
+	"fmt"
+	"strconv"
+)
+
 // Version is a interface for version comparison.
 type Version interface {
 	// GT returns true if v > v2.
@@ -7,6 +12,9 @@ type Version interface {
 
 	// EQ returns true if v == v2.
 	EQ(Version) bool
+
+	// String returns the string representation of the version.
+	String() string
}
 
 // VersionInt64 is a int64 type version.
@@ -20,6 +28,10 @@ func (v VersionInt64) EQ(v2 Version) bool {
 	return v == mustCastVersionInt64(v2)
 }
 
+func (v VersionInt64) String() string {
+	return strconv.FormatInt(int64(v), 10)
+}
+
 func mustCastVersionInt64(v2 Version) VersionInt64 {
 	if v2i, ok := v2.(VersionInt64); ok {
 		return v2i
@@ -46,6 +58,10 @@ func (v VersionInt64Pair) EQ(v2 Version) bool {
 	return v.Global == vPair.Global && v.Local == vPair.Local
 }
 
+func (v VersionInt64Pair) String() string {
+	return fmt.Sprintf("%d/%d", v.Global, v.Local)
+}
+
 func mustCastVersionInt64Pair(v2 Version) VersionInt64Pair {
 	if v2i, ok := v2.(VersionInt64Pair); ok {
 		return v2i
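// Aside (illustrative, not part of the patch): how the pieces above fit
// together for a caller. The async core is driven entirely through log.Config;
// any zero value is backfilled by Config.initialize with the same defaults
// that component_param.go exposes as log.asyncWrite.*. A minimal sketch,
// assuming pkg/log's InitLogger entry point; the field values below are
// illustrative, not verbatim production settings:

package main

import (
	"time"

	"github.com/milvus-io/milvus/pkg/v2/log"
)

func main() {
	cfg := &log.Config{
		Level:  "info",
		Format: "text",
		// Mirror the log.asyncWrite.* defaults from component_param.go.
		AsyncWriteEnable:            true,
		AsyncWriteFlushInterval:     10 * time.Second,
		AsyncWriteDroppedTimeout:    100 * time.Millisecond,
		AsyncWriteNonDroppableLevel: "error",
		AsyncWriteStopTimeout:       time.Second,
		AsyncWritePendingLength:     1024,
		AsyncWriteBufferSize:        1 << 20, // 1 MiB
		AsyncWriteMaxBytesPerLog:    1 << 20, // 1 MiB
	}

	logger, props, err := log.InitLogger(cfg)
	if err != nil {
		panic(err)
	}
	log.ReplaceGlobals(logger, props)
	// Drain the async core (bounded by AsyncWriteStopTimeout) before exit,
	// mirroring the `defer log.Cleanup()` added to MilvusRoles.Run.
	defer log.Cleanup()

	log.Info("async logging enabled")
}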