enhance: support async write syncer for milvus logging (#45805)

issue: #45640

- Logs may be dropped if the underlying file system is busy.
- Use an async write syncer so that log operations do not block the main Milvus system.
- Remove some log dependencies from the util functions to avoid a dependency loop.
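
The gist of the change: log entries go into a bounded in-memory queue that a background goroutine drains to the file system, so a slow disk can delay (and eventually drop) log writes instead of stalling the caller. A minimal sketch of that pattern, with illustrative names only; the real implementation is the asyncTextIOCore added under pkg/v2/log in this commit:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// asyncSink sketches the async-write idea: a bounded pending queue drained by
// a background goroutine, so the caller never blocks on a slow file system.
type asyncSink struct {
	pending chan []byte
	done    chan struct{}
}

func newAsyncSink(queueLen int) *asyncSink {
	s := &asyncSink{
		pending: make(chan []byte, queueLen),
		done:    make(chan struct{}),
	}
	go func() {
		defer close(s.done)
		for buf := range s.pending {
			os.Stderr.Write(buf) // stand-in for the real buffered write syncer
		}
	}()
	return s
}

// Write enqueues one log line; droppable entries give up after a short
// timeout instead of blocking the caller when the queue is full.
func (s *asyncSink) Write(p []byte, droppable bool) {
	if !droppable {
		s.pending <- p
		return
	}
	select {
	case s.pending <- p:
	case <-time.After(100 * time.Millisecond):
		// dropped: the queue is full and the disk is too slow
	}
}

// Stop closes the queue and waits for the drain goroutine to finish.
func (s *asyncSink) Stop() {
	close(s.pending)
	<-s.done
}

func main() {
	sink := newAsyncSink(1024)
	defer sink.Stop()
	sink.Write([]byte(fmt.Sprintln("hello from the async sink")), true)
}
```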

---------

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-11-28 17:43:11 +08:00 committed by GitHub
parent 4f080bd3a0
commit c3fe6473b8
25 changed files with 658 additions and 65 deletions

View File

@ -283,6 +283,13 @@ func (mr *MilvusRoles) setupLogger() {
MaxDays: params.LogCfg.MaxAge.GetAsInt(),
MaxBackups: params.LogCfg.MaxBackups.GetAsInt(),
},
AsyncWriteEnable: params.LogCfg.AsyncWriteEnable.GetAsBool(),
AsyncWriteFlushInterval: params.LogCfg.AsyncWriteFlushInterval.GetAsDurationByParse(),
AsyncWriteDroppedTimeout: params.LogCfg.AsyncWriteDroppedTimeout.GetAsDurationByParse(),
AsyncWriteStopTimeout: params.LogCfg.AsyncWriteStopTimeout.GetAsDurationByParse(),
AsyncWritePendingLength: params.LogCfg.AsyncWritePendingLength.GetAsInt(),
AsyncWriteBufferSize: int(params.LogCfg.AsyncWriteBufferSize.GetAsSize()),
AsyncWriteMaxBytesPerLog: int(params.LogCfg.AsyncWriteMaxBytesPerLog.GetAsSize()),
}
id := paramtable.GetNodeID()
roleName := paramtable.GetRole()
@ -294,6 +301,7 @@ func (mr *MilvusRoles) setupLogger() {
}
logutil.SetupLogger(&logConfig)
params.Watch(params.LogCfg.Level.Key, config.NewHandler("log.level", func(event *config.Event) {
if !event.HasUpdated || event.EventType == config.DeleteType {
return
@ -417,14 +425,6 @@ func (mr *MilvusRoles) Run() {
// init tracer before run any component
tracer.Init()
// Initialize streaming service if enabled.
if mr.ServerType == typeutil.StandaloneRole || !mr.EnableDataNode {
// only datanode does not init streaming service
streaming.Init()
defer streaming.Release()
}
enableComponents := []bool{
mr.EnableProxy,
mr.EnableQueryNode,
@ -444,6 +444,8 @@ func (mr *MilvusRoles) Run() {
expr.Init()
expr.Register("param", paramtable.Get())
mr.setupLogger()
defer log.Cleanup()
http.ServeHTTP()
setupPrometheusHTTPServer(Registry)
@ -459,6 +461,13 @@ func (mr *MilvusRoles) Run() {
}
}
// Initialize streaming service if enabled.
if mr.ServerType == typeutil.StandaloneRole || !mr.EnableDataNode {
// only datanode does not init streaming service
streaming.Init()
defer streaming.Release()
}
var wg sync.WaitGroup
local := mr.Local

View File

@ -1 +0,0 @@
package balancer

View File

@ -2,9 +2,12 @@ package discoverer
import (
"context"
"fmt"
"strings"
"google.golang.org/grpc/resolver"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/attributes"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -24,3 +27,15 @@ type VersionedState struct {
Version typeutil.Version
State resolver.State
}
func (v VersionedState) String() string {
as := make([]string, 0, len(v.State.Addresses))
for _, addr := range v.State.Addresses {
if nodeID := attributes.GetServerID(addr.Attributes); nodeID != nil {
as = append(as, fmt.Sprintf("%d@%s", *nodeID, addr.Addr))
continue
}
as = append(as, addr.Addr)
}
return fmt.Sprintf("Version: %s, Addrs: %s", v.Version, strings.Join(as, ","))
}

View File

@ -182,11 +182,11 @@ func (r *resolverWithDiscoverer) doDiscover() {
latestState := r.getLatestState()
if latestState != nil && !state.Version.GT(latestState.Version) {
// Ignore the old version.
r.Logger().Info("service discover update, ignore old version", zap.Any("state", state))
r.Logger().Info("service discover update, ignore old version", zap.Stringer("state", state))
continue
}
// Update all grpc resolver.
r.Logger().Info("service discover update, update resolver", zap.Any("state", state), zap.Int("resolver_count", len(grpcResolvers)))
r.Logger().Info("service discover update, update resolver", zap.Stringer("state", state), zap.Int("resolver_count", len(grpcResolvers)))
for watcher := range grpcResolvers {
// Update operation do not block.
// Only return error if the resolver is closed, so just print a info log and delete the resolver.

View File

@ -51,10 +51,10 @@ func (r *watchBasedGRPCResolver) Update(state VersionedState) error {
if err := r.cc.UpdateState(state.State); err != nil {
// watch based resolver could ignore the error, just log and return nil
r.Logger().Warn("fail to update resolver state", zap.Any("state", state.State), zap.Error(err))
r.Logger().Warn("fail to update resolver state", zap.Stringer("state", state), zap.Error(err))
return nil
}
r.Logger().Info("update resolver state success", zap.Any("state", state.State))
r.Logger().Info("update resolver state success", zap.Stringer("state", state))
return nil
}

View File

@ -28,11 +28,9 @@ import (
"github.com/twpayne/go-geom/encoding/wkb"
"github.com/twpayne/go-geom/encoding/wkbcommon"
"github.com/twpayne/go-geom/encoding/wkt"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/log"
)
// system field id:
@ -300,7 +298,6 @@ func GetIndexType(indexParams []*commonpb.KeyValuePair) string {
return param.Value
}
}
log.Warn("IndexType not found in indexParams")
return ""
}
@ -485,7 +482,6 @@ func GetCollectionLoadFields(schema *schemapb.CollectionSchema, skipDynamicField
v, err := ShouldFieldBeLoaded(field.GetTypeParams())
if err != nil {
log.Warn("type param parse skip load failed", zap.Error(err))
// if configuration cannot be parsed, ignore it and load field
return field.GetFieldID(), true
}
@ -535,7 +531,6 @@ func GetReplicateEndTS(kvs []*commonpb.KeyValuePair) (uint64, bool) {
if kv.GetKey() == ReplicateEndTSKey {
ts, err := strconv.ParseUint(kv.GetValue(), 10, 64)
if err != nil {
log.Warn("parse replicate end ts failed", zap.Error(err), zap.Stack("stack"))
return 0, false
}
return ts, true

View File

@ -22,9 +22,7 @@ package common
import (
"context"
"fmt"
"github.com/milvus-io/milvus/pkg/v2/log"
"log"
)
type MockTestingT struct {
@ -38,15 +36,15 @@ func NewEmptyMockT() *MockTestingT {
}
func (m *MockTestingT) Logf(format string, args ...interface{}) {
log.Ctx(m.ctx).Info(fmt.Sprintf(format, args...))
log.Printf(format, args...)
}
func (m *MockTestingT) Errorf(format string, args ...interface{}) {
log.Ctx(m.ctx).Error(fmt.Sprintf(format, args...))
log.Printf(format, args...)
}
func (m *MockTestingT) FailNow() {
log.Ctx(m.ctx).Panic("FailNow called")
log.Println("FailNow called")
}
func (m *MockTestingT) Cleanup(func()) {}

View File

@ -71,6 +71,30 @@ type Config struct {
//
// Values configured here are per-second. See zapcore.NewSampler for details.
Sampling *zap.SamplingConfig `toml:"sampling" json:"sampling"`
// AsyncWriteEnable enables async write for the logger.
AsyncWriteEnable bool `toml:"async-write-enable" json:"async-write-enable"`
// AsyncWriteFlushInterval is the interval to flush the logs
AsyncWriteFlushInterval time.Duration `toml:"async-write-flush-interval" json:"async-write-flush-interval"`
// AsyncWriteDroppedTimeout is the timeout to drop the write request if the buffer is full
AsyncWriteDroppedTimeout time.Duration `toml:"async-write-dropped-timeout" json:"async-write-dropped-timeout"`
// AsyncWriteNonDroppableLevel is the level that will not be dropped when the buffer is full
AsyncWriteNonDroppableLevel string `toml:"async-write-non-droppable-level" json:"async-write-non-droppable-level"`
// AsyncWriteStopTimeout is the timeout to stop the async write
AsyncWriteStopTimeout time.Duration `toml:"async-write-stop-timeout" json:"async-write-stop-timeout"`
// AsyncWritePendingLength is the maximum number of pending write requests; writes beyond this length will be dropped
AsyncWritePendingLength int `toml:"async-write-pending-length" json:"async-write-pending-length"`
// AsyncWriteBufferSize is the size of the write buffer
AsyncWriteBufferSize int `toml:"async-write-buffer-size" json:"async-write-buffer-size"`
// AsyncWriteMaxBytesPerLog is the max bytes per log
AsyncWriteMaxBytesPerLog int `toml:"async-write-max-bytes-per-log" json:"async-write-max-bytes-per-log"`
}
// ZapProperties records some information about zap.
@ -110,3 +134,28 @@ func (cfg *Config) buildOptions(errSink zapcore.WriteSyncer) []zap.Option {
}
return opts
}
// initialize initializes the config.
func (cfg *Config) initialize() {
if cfg.AsyncWriteFlushInterval <= 0 {
cfg.AsyncWriteFlushInterval = 10 * time.Second
}
if cfg.AsyncWriteDroppedTimeout <= 0 {
cfg.AsyncWriteDroppedTimeout = 100 * time.Millisecond
}
if _, err := zapcore.ParseLevel(cfg.AsyncWriteNonDroppableLevel); cfg.AsyncWriteNonDroppableLevel == "" || err != nil {
cfg.AsyncWriteNonDroppableLevel = zapcore.ErrorLevel.String()
}
if cfg.AsyncWriteStopTimeout <= 0 {
cfg.AsyncWriteStopTimeout = 1 * time.Second
}
if cfg.AsyncWritePendingLength <= 0 {
cfg.AsyncWritePendingLength = 1024
}
if cfg.AsyncWriteBufferSize <= 0 {
cfg.AsyncWriteBufferSize = 1024 * 1024
}
if cfg.AsyncWriteMaxBytesPerLog <= 0 {
cfg.AsyncWriteMaxBytesPerLog = 1024 * 1024
}
}

View File

@ -45,7 +45,7 @@ import (
"gopkg.in/natefinch/lumberjack.v2"
)
var _globalL, _globalP, _globalS, _globalR atomic.Value
var _globalL, _globalP, _globalS, _globalR, _globalCleanup atomic.Value
var (
_globalLevelLogger sync.Map
@ -122,7 +122,13 @@ func InitLoggerWithWriteSyncer(cfg *Config, output zapcore.WriteSyncer, opts ...
if err != nil {
return nil, nil, fmt.Errorf("initLoggerWithWriteSyncer UnmarshalText cfg.Level err:%w", err)
}
core := NewTextCore(newZapTextEncoder(cfg), output, level)
var core zapcore.Core
if cfg.AsyncWriteEnable {
core = NewAsyncTextIOCore(cfg, output, level)
registerCleanup(core.(*asyncTextIOCore).Stop)
} else {
core = NewTextCore(newZapTextEncoder(cfg), output, level)
}
opts = append(cfg.buildOptions(output), opts...)
lg := zap.New(core, opts...)
r := &ZapProperties{
@ -212,6 +218,14 @@ func fatalL() *zap.Logger {
return v.(*zap.Logger)
}
// Cleanup cleans up the global logger and sugared logger.
func Cleanup() {
cleanup := _globalCleanup.Load()
if cleanup != nil {
cleanup.(func())()
}
}
// ReplaceGlobals replaces the global Logger and SugaredLogger.
// It's safe for concurrent use.
func ReplaceGlobals(logger *zap.Logger, props *ZapProperties) {
@ -220,6 +234,14 @@ func ReplaceGlobals(logger *zap.Logger, props *ZapProperties) {
_globalP.Store(props)
}
// registerCleanup registers a cleanup function to be called when the global logger is cleaned up.
func registerCleanup(cleanup func()) {
oldCleanup := _globalCleanup.Swap(cleanup)
if oldCleanup != nil {
oldCleanup.(func())()
}
}
func replaceLeveledLoggers(debugLogger *zap.Logger) {
levels := []zapcore.Level{
zapcore.DebugLevel, zapcore.InfoLevel, zapcore.WarnLevel, zapcore.ErrorLevel,
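
Putting the pieces in this file together, a hedged sketch of enabling the async core through this entry point. Only APIs visible in these hunks are used (InitLoggerWithWriteSyncer, ReplaceGlobals, Cleanup); the Level field and the concrete values are assumptions restating the defaults from config.go and paramtable, and real services go through MilvusRoles.setupLogger rather than calling this directly:

```go
package main

import (
	"os"
	"time"

	"go.uber.org/zap/zapcore"

	"github.com/milvus-io/milvus/pkg/v2/log"
)

func main() {
	// Async options restate the documented defaults; other Config fields keep
	// their zero values for brevity.
	cfg := &log.Config{
		Level:                       "info",
		AsyncWriteEnable:            true,
		AsyncWriteFlushInterval:     10 * time.Second,
		AsyncWriteDroppedTimeout:    100 * time.Millisecond,
		AsyncWriteNonDroppableLevel: "error",
		AsyncWriteStopTimeout:       time.Second,
		AsyncWritePendingLength:     1024,
		AsyncWriteBufferSize:        1 << 20,
		AsyncWriteMaxBytesPerLog:    1 << 20,
	}
	lg, props, err := log.InitLoggerWithWriteSyncer(cfg, zapcore.AddSync(os.Stdout))
	if err != nil {
		panic(err)
	}
	log.ReplaceGlobals(lg, props)
	defer log.Cleanup() // stops the async core and flushes pending writes

	lg.Info("async logging enabled")
}
```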

View File

@ -0,0 +1,232 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package log
import (
"fmt"
"time"
"go.uber.org/zap/buffer"
"go.uber.org/zap/zapcore"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
)
// asyncTextIOCore is a wrapper around the textIOCore that writes the logs to the underlying buffered write syncer.
var _ zapcore.Core = (*asyncTextIOCore)(nil)
// NewAsyncTextIOCore creates a new async text IO core.
func NewAsyncTextIOCore(cfg *Config, ws zapcore.WriteSyncer, enab zapcore.LevelEnabler) *asyncTextIOCore {
enc := newZapTextEncoder(cfg)
bws := &zapcore.BufferedWriteSyncer{
WS: ws,
Size: cfg.AsyncWriteBufferSize,
FlushInterval: cfg.AsyncWriteFlushInterval,
}
nonDroppableLevel, _ := zapcore.ParseLevel(cfg.AsyncWriteNonDroppableLevel)
asyncTextIOCore := &asyncTextIOCore{
LevelEnabler: enab,
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
enc: enc,
bws: bws,
pending: make(chan *entryItem, cfg.AsyncWritePendingLength),
writeDroppedTimeout: cfg.AsyncWriteDroppedTimeout,
nonDroppableLevel: nonDroppableLevel,
stopTimeout: cfg.AsyncWriteStopTimeout,
maxBytesPerLog: cfg.AsyncWriteMaxBytesPerLog,
}
go asyncTextIOCore.background()
return asyncTextIOCore
}
// asyncTextIOCore is a wrapper around the textIOCore that writes the logs to the underlying buffered write syncer.
type asyncTextIOCore struct {
zapcore.LevelEnabler
notifier *syncutil.AsyncTaskNotifier[struct{}]
enc zapcore.Encoder
bws *zapcore.BufferedWriteSyncer
pending chan *entryItem // the incoming new write requests
writeDroppedTimeout time.Duration
nonDroppableLevel zapcore.Level
stopTimeout time.Duration
maxBytesPerLog int
}
// entryItem is the item to write to the underlying buffered write syncer.
type entryItem struct {
buf *buffer.Buffer
level zapcore.Level
}
// With returns a copy of the Core with the given fields added.
func (s *asyncTextIOCore) With(fields []zapcore.Field) zapcore.Core {
enc := s.enc.Clone()
switch e := enc.(type) {
case *textEncoder:
e.addFields(fields)
case zapcore.ObjectEncoder:
for _, field := range fields {
field.AddTo(e)
}
default:
panic(fmt.Sprintf("unsupported encode type: %T for With operation", enc))
}
return &asyncTextIOCore{
LevelEnabler: s.LevelEnabler,
notifier: s.notifier,
enc: enc,
bws: s.bws,
pending: s.pending,
writeDroppedTimeout: s.writeDroppedTimeout,
nonDroppableLevel: s.nonDroppableLevel,
stopTimeout: s.stopTimeout,
maxBytesPerLog: s.maxBytesPerLog,
}
}
// Check checks if the entry is enabled by the level enabler.
func (s *asyncTextIOCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if s.Enabled(ent.Level) {
return ce.AddCore(ent, s)
}
return ce
}
// Write encodes the entry and buffers it in a channel for the background writer.
// asyncTextIOCore does not guarantee that the write operation completes:
// the entry is dropped if the buffer is full or the underlying buffered write syncer is blocked.
func (s *asyncTextIOCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
buf, err := s.enc.EncodeEntry(ent, fields)
if err != nil {
return err
}
entry := &entryItem{
buf: buf,
level: ent.Level,
}
length := buf.Len()
if length == 0 {
return nil
}
var writeDroppedTimeout <-chan time.Time
if ent.Level < s.nonDroppableLevel {
writeDroppedTimeout = time.After(s.writeDroppedTimeout)
}
select {
case s.pending <- entry:
metrics.LoggingPendingWriteLength.Inc()
metrics.LoggingPendingWriteBytes.Add(float64(length))
case <-writeDroppedTimeout:
metrics.LoggingDroppedWrites.Inc()
// drop the entry if the write is dropped due to timeout
buf.Free()
}
return nil
}
// Sync is a no-op; flushing is handled by the background writer and by Stop.
func (s *asyncTextIOCore) Sync() error {
return nil
}
// background is the background goroutine to write the logs to the underlying buffered write syncer.
func (s *asyncTextIOCore) background() {
defer func() {
s.flushPendingWriteWithTimeout()
s.notifier.Finish(struct{}{})
}()
for {
select {
case <-s.notifier.Context().Done():
return
case ent := <-s.pending:
s.consumeEntry(ent)
}
}
}
// consumeEntry writes the entry to the underlying buffered write syncer and frees the buffer.
func (s *asyncTextIOCore) consumeEntry(ent *entryItem) {
length := ent.buf.Len()
metrics.LoggingPendingWriteLength.Dec()
metrics.LoggingPendingWriteBytes.Sub(float64(length))
writes := s.getWriteBytes(ent)
if _, err := s.bws.Write(writes); err != nil {
metrics.LoggingIOFailure.Inc()
}
ent.buf.Free()
if ent.level > zapcore.ErrorLevel {
s.bws.Sync()
}
}
// getWriteBytes gets the bytes to write to the underlying buffered write syncer.
// if the length of the write exceeds the max bytes per log, it will truncate the write and return the truncated bytes.
// otherwise, it will return the original bytes.
func (s *asyncTextIOCore) getWriteBytes(ent *entryItem) []byte {
length := ent.buf.Len()
writes := ent.buf.Bytes()
if length > s.maxBytesPerLog {
// truncate the write if it exceeds the max bytes per log
metrics.LoggingTruncatedWrites.Inc()
metrics.LoggingTruncatedWriteBytes.Add(float64(length - s.maxBytesPerLog))
end := writes[length-1]
writes = writes[:s.maxBytesPerLog]
writes[len(writes)-1] = end
}
return writes
}
// flushPendingWriteWithTimeout flushes the pending write with a timeout.
func (s *asyncTextIOCore) flushPendingWriteWithTimeout() {
done := make(chan struct{})
go s.flushAllPendingWrites(done)
select {
case <-time.After(s.stopTimeout):
case <-done:
}
}
// flushAllPendingWrites flushes all the pending writes to the underlying buffered write syncer.
func (s *asyncTextIOCore) flushAllPendingWrites(done chan struct{}) {
defer func() {
if err := s.bws.Stop(); err != nil {
metrics.LoggingIOFailure.Inc()
}
close(done)
}()
for {
select {
case ent := <-s.pending:
s.consumeEntry(ent)
default:
return
}
}
}
func (s *asyncTextIOCore) Stop() {
s.notifier.Cancel()
s.notifier.BlockUntilFinish()
}
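
For reference, a sketch of driving the new core directly and wiring it into a zap.Logger, much like the unit test in the next file does; the values merely restate the documented defaults and this is illustrative, not the canonical setup path (that is InitLoggerWithWriteSyncer with AsyncWriteEnable set):

```go
package main

import (
	"os"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"github.com/milvus-io/milvus/pkg/v2/log"
)

func main() {
	cfg := &log.Config{
		AsyncWriteEnable:            true,
		AsyncWriteFlushInterval:     time.Second,
		AsyncWriteDroppedTimeout:    100 * time.Millisecond,
		AsyncWriteNonDroppableLevel: "error",
		AsyncWriteStopTimeout:       time.Second,
		AsyncWritePendingLength:     1024,
		AsyncWriteBufferSize:        1 << 20,
		AsyncWriteMaxBytesPerLog:    1 << 20,
	}
	core := log.NewAsyncTextIOCore(cfg, zapcore.AddSync(os.Stdout), zapcore.InfoLevel)
	defer core.Stop() // drains pending entries, bounded by AsyncWriteStopTimeout

	lg := zap.New(core)
	lg.Info("written through the async core", zap.String("role", "example"))
	lg.Error("error-level entries are non-droppable under the default nonDroppableLevel")
}
```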

View File

@ -0,0 +1,86 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package log
import (
"io"
"math/rand"
"os"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func TestAsyncBufferedWriteSyncer(t *testing.T) {
blockWriter := &blockWriter{
Writer: os.Stdout,
}
syncer := NewAsyncTextIOCore(
&Config{
AsyncWriteEnable: true,
AsyncWriteFlushInterval: 1 * time.Second,
AsyncWriteDroppedTimeout: 100 * time.Millisecond,
AsyncWriteStopTimeout: 1 * time.Second,
AsyncWritePendingLength: 100,
AsyncWriteBufferSize: 5,
AsyncWriteMaxBytesPerLog: 2,
},
zapcore.AddSync(blockWriter),
zap.DebugLevel,
)
wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
syncer.Write(zapcore.Entry{
Level: zap.DebugLevel,
Message: "test",
}, []zapcore.Field{
zap.String("test", "test"),
zap.Int("test", 1),
})
wg.Done()
}()
}
syncer.Sync()
wg.Wait()
syncer.Stop()
}
type blockWriter struct {
io.Writer
}
func (s *blockWriter) Write(p []byte) (n int, err error) {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
if rand.Intn(10) == 0 {
return 0, errors.New("write error")
}
return s.Writer.Write(p)
}
func (s *blockWriter) Sync() error {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
if rand.Intn(10) == 0 {
return errors.New("sync error")
}
return nil
}

View File

@ -385,6 +385,7 @@ func registerDataNodeOnce(registry *prometheus.Registry) {
registry.MustRegister(DataNodeBuildIndexLatency)
registry.MustRegister(DataNodeBuildJSONStatsLatency)
registry.MustRegister(DataNodeSlot)
RegisterLoggingMetrics(registry)
}
func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) {

View File

@ -0,0 +1,85 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
const (
loggingMetricSubsystem = "logging"
)
var (
LoggingMetricsRegisterOnce sync.Once
LoggingPendingWriteLength = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: loggingMetricSubsystem,
Name: "pending_write_length",
Help: "The length of pending writes in the logging buffer",
})
LoggingPendingWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: loggingMetricSubsystem,
Name: "pending_write_bytes",
Help: "The total bytes of pending writes in the logging buffer",
})
LoggingTruncatedWrites = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: loggingMetricSubsystem,
Name: "truncated_writes",
Help: "The number of truncated writes due to exceeding the max bytes per log",
})
LoggingTruncatedWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: loggingMetricSubsystem,
Name: "truncated_write_bytes",
Help: "The total bytes of truncated writes due to exceeding the max bytes per log",
})
LoggingDroppedWrites = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: loggingMetricSubsystem,
Name: "dropped_writes",
Help: "The number of dropped writes due to buffer full or write timeout",
})
LoggingIOFailure = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: loggingMetricSubsystem,
Name: "io_failures",
Help: "The number of IO failures due to underlying write syncer is blocked or write timeout",
})
)
// RegisterLoggingMetrics registers logging metrics
func RegisterLoggingMetrics(registry *prometheus.Registry) {
LoggingMetricsRegisterOnce.Do(func() {
registry.MustRegister(LoggingPendingWriteLength)
registry.MustRegister(LoggingPendingWriteBytes)
registry.MustRegister(LoggingTruncatedWrites)
registry.MustRegister(LoggingTruncatedWriteBytes)
registry.MustRegister(LoggingDroppedWrites)
registry.MustRegister(LoggingIOFailure)
})
}

View File

@ -38,6 +38,7 @@ func TestRegisterMetrics(t *testing.T) {
RegisterCGOMetrics(r)
RegisterStreamingServiceClient(r)
RegisterStreamingNode(r)
RegisterLoggingMetrics(r)
})
}

View File

@ -553,6 +553,7 @@ func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxyScannedRemoteMB)
registry.MustRegister(ProxyScannedTotalMB)
RegisterStreamingServiceClient(registry)
RegisterLoggingMetrics(registry)
}
func CleanupProxyDBMetrics(nodeID int64, dbName string) {

View File

@ -902,6 +902,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
RegisterCGOMetrics(registry)
RegisterStreamingServiceClient(registry)
RegisterLoggingMetrics(registry)
}
func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {

View File

@ -290,6 +290,7 @@ func RegisterMixCoord(registry *prometheus.Registry) {
RegisterStreamingServiceClient(registry)
RegisterQueryCoord(registry)
RegisterDataCoord(registry)
RegisterLoggingMetrics(registry)
}
func CleanupRootCoordDBMetrics(dbName string) {

View File

@ -500,6 +500,7 @@ func RegisterStreamingNode(registry *prometheus.Registry) {
registry.MustRegister(StreamingNodeConsumeBytes)
registerWAL(registry)
RegisterLoggingMetrics(registry)
// TODO: after remove the implementation of old data node
// Such as flowgraph and writebuffer, we can remove these metrics from streaming node.

View File

@ -1513,14 +1513,22 @@ func (t *holmesConfig) init(base *BaseTable) {
}
type logConfig struct {
Level ParamItem `refreshable:"false"`
RootPath ParamItem `refreshable:"false"`
MaxSize ParamItem `refreshable:"false"`
MaxAge ParamItem `refreshable:"false"`
MaxBackups ParamItem `refreshable:"false"`
Format ParamItem `refreshable:"false"`
Stdout ParamItem `refreshable:"false"`
GrpcLogLevel ParamItem `refreshable:"false"`
Level ParamItem `refreshable:"false"`
RootPath ParamItem `refreshable:"false"`
MaxSize ParamItem `refreshable:"false"`
MaxAge ParamItem `refreshable:"false"`
MaxBackups ParamItem `refreshable:"false"`
Format ParamItem `refreshable:"false"`
Stdout ParamItem `refreshable:"false"`
GrpcLogLevel ParamItem `refreshable:"false"`
AsyncWriteEnable ParamItem `refreshable:"false"`
AsyncWriteFlushInterval ParamItem `refreshable:"false"`
AsyncWriteDroppedTimeout ParamItem `refreshable:"false"`
AsyncWriteNonDroppableLevel ParamItem `refreshable:"false"`
AsyncWriteStopTimeout ParamItem `refreshable:"false"`
AsyncWritePendingLength ParamItem `refreshable:"false"`
AsyncWriteBufferSize ParamItem `refreshable:"false"`
AsyncWriteMaxBytesPerLog ParamItem `refreshable:"false"`
}
func (l *logConfig) init(base *BaseTable) {
@ -1597,6 +1605,94 @@ Set this parameter as the path that you have permission to write.`,
Export: true,
}
l.GrpcLogLevel.Init(base.mgr)
l.AsyncWriteEnable = ParamItem{
Key: "log.asyncWrite.enable",
DefaultValue: "true",
Version: "2.6.7",
Doc: "Enable async write for the logger.",
Export: false,
}
l.AsyncWriteEnable.Init(base.mgr)
l.AsyncWriteFlushInterval = ParamItem{
Key: "log.asyncWrite.flushInterval",
DefaultValue: "10s",
Version: "2.6.7",
Doc: `The interval to flush the logs.
The lower the interval, the more frequently flushes are applied
and the sooner log writes become visible to the underlying file system.`,
Export: false,
}
l.AsyncWriteFlushInterval.Init(base.mgr)
l.AsyncWriteDroppedTimeout = ParamItem{
Key: "log.asyncWrite.droppedTimeout",
DefaultValue: "100ms",
Version: "2.6.7",
Doc: `The timeout after which a write operation is dropped if the buffer is full.
Once the underlying buffered writer is blocked or too slow and
the number of pending writes exceeds the pending length threshold,
new incoming write operations are dropped after waiting longer than this timeout.`,
Export: false,
}
l.AsyncWriteDroppedTimeout.Init(base.mgr)
l.AsyncWriteNonDroppableLevel = ParamItem{
Key: "log.asyncWrite.nonDroppableLevel",
DefaultValue: "error",
Version: "2.6.7",
Doc: `The level that will not be dropped when the buffer is full.
Write operations at or above this level are never dropped,
even when the buffer is full.`,
Export: false,
}
l.AsyncWriteNonDroppableLevel.Init(base.mgr)
l.AsyncWriteStopTimeout = ParamItem{
Key: "log.asyncWrite.stopTimeout",
DefaultValue: "1s",
Version: "2.6.7",
Doc: `The timeout to stop the async write.
When Milvus shuts down, the logger's async writer tries to flush
all pending write operations to the underlying file system
until this timeout is reached.`,
Export: false,
}
l.AsyncWriteStopTimeout.Init(base.mgr)
l.AsyncWritePendingLength = ParamItem{
Key: "log.asyncWrite.pendingLength",
DefaultValue: "1024",
Version: "2.6.7",
Doc: `The maximum number of pending write operations.
Once the underlying buffered writer is blocked or too slow,
pending write operations are cached in this buffer.
The larger the pending length, the more memory is used and the fewer log writes are dropped.`,
Export: false,
}
l.AsyncWritePendingLength.Init(base.mgr)
l.AsyncWriteBufferSize = ParamItem{
Key: "log.asyncWrite.bufferSize",
DefaultValue: "1m",
Version: "2.6.7",
Doc: `The buffer size of the underlying bufio writer.
The larger the buffer size, the more memory is used,
but the fewer writes go to the underlying file system.`,
Export: false,
}
l.AsyncWriteBufferSize.Init(base.mgr)
l.AsyncWriteMaxBytesPerLog = ParamItem{
Key: "log.asyncWrite.maxBytesPerLog",
DefaultValue: "1m",
Version: "2.6.7",
Doc: `The max bytes per log.
Once the log message exceeds the max bytes per log, it will be truncated.`,
Export: false,
}
l.AsyncWriteMaxBytesPerLog.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -155,6 +155,18 @@ func TestComponentParam(t *testing.T) {
params.Save("common.clusterID", "0")
})
t.Run("test logConfig", func(t *testing.T) {
Params := &params.LogCfg
assert.True(t, Params.AsyncWriteEnable.GetAsBool())
assert.Equal(t, 10*time.Second, Params.AsyncWriteFlushInterval.GetAsDurationByParse())
assert.Equal(t, 100*time.Millisecond, Params.AsyncWriteDroppedTimeout.GetAsDurationByParse())
assert.Equal(t, "error", Params.AsyncWriteNonDroppableLevel.GetValue())
assert.Equal(t, 1*time.Second, Params.AsyncWriteStopTimeout.GetAsDurationByParse())
assert.Equal(t, 1024, Params.AsyncWritePendingLength.GetAsInt())
assert.Equal(t, int64(1024*1024), Params.AsyncWriteBufferSize.GetAsSize())
assert.Equal(t, int64(1024*1024), Params.AsyncWriteMaxBytesPerLog.GetAsSize())
})
t.Run("test rootCoordConfig", func(t *testing.T) {
Params := &params.RootCoordCfg

View File

@ -23,11 +23,9 @@ import (
"reflect"
"github.com/x448/float16"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
)
// Generic Clone for proto message
@ -102,7 +100,6 @@ func Uint64ToBytes(v uint64) []byte {
// SliceRemoveDuplicate is used to dedup a Slice
func SliceRemoveDuplicate(a interface{}) (ret []interface{}) {
if reflect.TypeOf(a).Kind() != reflect.Slice {
log.Error("input is not slice", zap.String("inputType", fmt.Sprintf("%T", a)))
return ret
}

View File

@ -17,14 +17,12 @@
package typeutil
import (
"fmt"
"math"
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
)
func TestConversion(t *testing.T) {
@ -104,7 +102,7 @@ func TestConversion(t *testing.T) {
v := (rand.Float32() - 0.5) * 100
b := Float32ToFloat16Bytes(v)
v2 := Float16BytesToFloat32(b)
log.Info("float16", zap.Float32("v", v), zap.Float32("v2", v2))
fmt.Printf("float16 v: %f, v2: %f", v, v2)
assert.Less(t, math.Abs(float64(v2/v-1)), 0.001)
}
})
@ -114,7 +112,7 @@ func TestConversion(t *testing.T) {
v := (rand.Float32() - 0.5) * 100
b := Float32ToBFloat16Bytes(v)
v2 := BFloat16BytesToFloat32(b)
log.Info("bfloat16", zap.Float32("v", v), zap.Float32("v2", v2))
fmt.Printf("bfloat16 v: %f, v2: %f", v, v2)
assert.Less(t, math.Abs(float64(v2/v-1)), 0.01)
}
})

View File

@ -31,12 +31,10 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
)
const DynamicFieldMaxLength = 512
@ -501,7 +499,6 @@ func (helper *SchemaHelper) getDefaultJSONField(fieldName string) (*schemapb.Fie
}
}
errMsg := fmt.Sprintf("field %s not exist", fieldName)
log.Warn(errMsg)
return nil, fmt.Errorf("%s", errMsg)
}
@ -990,8 +987,6 @@ func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int6
} else {
dstScalar.GetGeometryWktData().Data = append(dstScalar.GetGeometryWktData().Data, srcScalar.GeometryWktData.Data[idx])
}
default:
log.Error("Not supported field type", zap.String("field type", fieldData.Type.String()))
}
case *schemapb.FieldData_Vectors:
dim := fieldType.Vectors.Dim
@ -1094,8 +1089,6 @@ func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int6
} else {
dstVector.GetVectorArray().Data = append(dstVector.GetVectorArray().Data, srcVector.VectorArray.Data[idx])
}
default:
log.Error("Not supported field type", zap.String("field type", fieldData.Type.String()))
}
}
}
@ -1109,7 +1102,6 @@ func DeleteFieldData(dst []*schemapb.FieldData) {
switch fieldType := fieldData.Field.(type) {
case *schemapb.FieldData_Scalars:
if dst[i] == nil || dst[i].GetScalars() == nil {
log.Info("empty field data can't be deleted")
return
}
dstScalar := dst[i].GetScalars()
@ -1132,12 +1124,9 @@ func DeleteFieldData(dst []*schemapb.FieldData) {
dstScalar.GetJsonData().Data = dstScalar.GetJsonData().Data[:len(dstScalar.GetJsonData().Data)-1]
case *schemapb.ScalarField_GeometryData:
dstScalar.GetGeometryData().Data = dstScalar.GetGeometryData().Data[:len(dstScalar.GetGeometryData().Data)-1]
default:
log.Error("wrong field type added", zap.String("field type", fieldData.Type.String()))
}
case *schemapb.FieldData_Vectors:
if dst[i] == nil || dst[i].GetVectors() == nil {
log.Info("empty field data can't be deleted")
return
}
dim := fieldType.Vectors.Dim
@ -1159,8 +1148,6 @@ func DeleteFieldData(dst []*schemapb.FieldData) {
case *schemapb.VectorField_Int8Vector:
dstInt8Vector := dstVector.Data.(*schemapb.VectorField_Int8Vector)
dstInt8Vector.Int8Vector = dstInt8Vector.Int8Vector[:len(dstInt8Vector.Int8Vector)-int(dim)]
default:
log.Error("wrong field type added", zap.String("field type", fieldData.Type.String()))
}
}
}
@ -1297,7 +1284,6 @@ func UpdateFieldData(base, update []*schemapb.FieldData, baseIdx, updateIdx int6
baseData.Data[baseIdx] = updateData.Data[updateIdx]
}
default:
log.Error("Not supported scalar field type", zap.String("field type", baseFieldData.Type.String()))
return fmt.Errorf("unsupported scalar field type: %s", baseFieldData.Type.String())
}
@ -1379,11 +1365,9 @@ func UpdateFieldData(base, update []*schemapb.FieldData, baseIdx, updateIdx int6
}
}
default:
log.Error("Not supported vector field type", zap.String("field type", baseFieldData.Type.String()))
return fmt.Errorf("unsupported vector field type: %s", baseFieldData.Type.String())
}
default:
log.Error("Not supported field type", zap.String("field type", baseFieldData.Type.String()))
return fmt.Errorf("unsupported field type: %s", baseFieldData.Type.String())
}
}
@ -1523,7 +1507,6 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error
dstScalar.GetBytesData().Data = append(dstScalar.GetBytesData().Data, srcScalar.BytesData.Data...)
}
default:
log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String()))
return errors.New("unsupported data type: " + srcFieldData.Type.String())
}
case *schemapb.FieldData_Vectors:
@ -1599,7 +1582,6 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error
dstVector.GetVectorArray().Data = append(dstVector.GetVectorArray().Data, srcVector.VectorArray.Data...)
}
default:
log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String()))
return errors.New("unsupported data type: " + srcFieldData.Type.String())
}
}
@ -2011,8 +1993,6 @@ func AppendPKs(pks *schemapb.IDs, pk interface{}) {
}
}
pks.GetStrId().Data = append(pks.GetStrId().GetData(), realPK)
default:
log.Warn("got unexpected data type of pk when append pks", zap.Any("pk", pk))
}
}

View File

@ -28,13 +28,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
)
func TestSchema(t *testing.T) {
@ -1071,7 +1069,7 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType,
FieldId: fieldID,
}
default:
log.Error("not supported field type", zap.String("field type", fieldType.String()))
fmt.Printf("not supported field type: %s", fieldType.String())
}
return fieldData

View File

@ -1,5 +1,10 @@
package typeutil
import (
"fmt"
"strconv"
)
// Version is a interface for version comparison.
type Version interface {
// GT returns true if v > v2.
@ -7,6 +12,9 @@ type Version interface {
// EQ returns true if v == v2.
EQ(Version) bool
// String returns the string representation of the version.
String() string
}
// VersionInt64 is a int64 type version.
@ -20,6 +28,10 @@ func (v VersionInt64) EQ(v2 Version) bool {
return v == mustCastVersionInt64(v2)
}
func (v VersionInt64) String() string {
return strconv.FormatInt(int64(v), 10)
}
func mustCastVersionInt64(v2 Version) VersionInt64 {
if v2i, ok := v2.(VersionInt64); ok {
return v2i
@ -46,6 +58,10 @@ func (v VersionInt64Pair) EQ(v2 Version) bool {
return v.Global == vPair.Global && v.Local == vPair.Local
}
func (v VersionInt64Pair) String() string {
return fmt.Sprintf("%d/%d", v.Global, v.Local)
}
func mustCastVersionInt64Pair(v2 Version) VersionInt64Pair {
if v2i, ok := v2.(VersionInt64Pair); ok {
return v2i