diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 7d525ead6c..d56cf9d624 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -3,10 +3,12 @@ package etcdkv import ( "context" "fmt" - "log" "path" "time" + "github.com/zilliztech/milvus-distributed/internal/log" + "go.uber.org/zap" + "go.etcd.io/etcd/clientv3" ) @@ -37,7 +39,7 @@ func (kv *EtcdKV) GetPath(key string) string { func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { key = path.Join(kv.rootPath, key) - log.Printf("LoadWithPrefix %s", key) + log.Debug("LoadWithPrefix ", zap.String("prefix", key)) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) @@ -101,12 +103,12 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) { result = append(result, "") } for _, ev := range rp.GetResponseRange().Kvs { - log.Printf("MultiLoad: %s -> %s\n", string(ev.Key), string(ev.Value)) + log.Debug("MultiLoad", zap.ByteString("key", ev.Key), zap.ByteString("value", ev.Value)) result = append(result, string(ev.Value)) } } if len(invalid) != 0 { - log.Printf("MultiLoad: there are invalid keys: %s", invalid) + log.Debug("MultiLoad: there are invalid keys", zap.Strings("keys", invalid)) err = fmt.Errorf("there are invalid keys: %s", invalid) return result, err } @@ -175,7 +177,7 @@ func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) } - log.Printf("MultiSaveAndRemove") + log.Debug("MultiSaveAndRemove") ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() @@ -201,7 +203,7 @@ func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error { op := clientv3.OpDelete(path.Join(kv.rootPath, k), clientv3.WithPrefix()) ops = append(ops, op) } - log.Printf("MultiRemoveWithPrefix") + log.Debug("MultiRemoveWithPrefix") ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() @@ -219,7 +221,7 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) } - log.Printf("MultiSaveAndRemove") + log.Debug("MultiSaveAndRemove") ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() diff --git a/internal/timesync/timetick_watcher.go b/internal/timesync/timetick_watcher.go index b833cac31b..5716a485c1 100644 --- a/internal/timesync/timetick_watcher.go +++ b/internal/timesync/timetick_watcher.go @@ -2,8 +2,10 @@ package timesync import ( "context" - "log" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/log" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" ) @@ -33,14 +35,14 @@ func (watcher *MsgTimeTickWatcher) StartBackgroundLoop(ctx context.Context) { for { select { case <-ctx.Done(): - log.Println("msg time tick watcher closed") + log.Debug("msg time tick watcher closed") return case msg := <-watcher.msgQueue: msgPack := &ms.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, msg) for _, stream := range watcher.streams { if err := stream.Broadcast(ctx, msgPack); err != nil { - log.Printf("stream broadcast failed %s", err.Error()) + log.Warn("stream broadcast failed", zap.Error(err)) } } }