From aa5ecbdc02cf16039f6316080526020a24b2ac08 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 8 Mar 2021 19:39:36 +0800 Subject: [PATCH] Use the project log instead of standard log of proxyservice and proxynode Signed-off-by: cai.zhang --- internal/proxynode/condition.go | 1 - internal/proxynode/impl.go | 42 +++++++++++----------- internal/proxynode/insert_channels.go | 11 +++--- internal/proxynode/paramtable.go | 50 +++++++++++++++++++++++---- internal/proxynode/proxy_node.go | 44 +++++++++++------------ internal/proxynode/repack_func.go | 9 ++--- internal/proxynode/segment.go | 16 ++++----- internal/proxynode/task.go | 22 ++++++------ internal/proxynode/task_scheduler.go | 33 +++++++----------- internal/proxynode/timetick.go | 18 +++++----- internal/proxynode/util.go | 32 ----------------- internal/proxyservice/impl.go | 41 ++++++++++------------ internal/proxyservice/node_info.go | 9 ++--- internal/proxyservice/paramtable.go | 44 +++++++++++++++++++++-- internal/proxyservice/task_queue.go | 4 +-- internal/proxyservice/timesync.go | 21 +++++------ internal/proxyservice/timetick.go | 14 ++++---- 17 files changed, 214 insertions(+), 197 deletions(-) delete mode 100644 internal/proxynode/util.go diff --git a/internal/proxynode/condition.go b/internal/proxynode/condition.go index 3c280ab400..ac3327606c 100644 --- a/internal/proxynode/condition.go +++ b/internal/proxynode/condition.go @@ -2,7 +2,6 @@ package proxynode import ( "context" - "errors" ) diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index cb30672af0..3b717c4fe0 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -2,12 +2,13 @@ package proxynode import ( "context" - "log" + "errors" "strconv" "time" - "errors" + "go.uber.org/zap" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -35,7 +36,7 @@ func (node *ProxyNode) InvalidateCollectionMetaCache(ctx context.Context, reques } func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - log.Println("create collection: ", request) + log.Debug("create collection...") cct := &CreateCollectionTask{ ctx: ctx, @@ -65,7 +66,7 @@ func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.C } func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - log.Println("drop collection: ", request) + log.Debug("drop collection... ") dct := &DropCollectionTask{ ctx: ctx, @@ -94,7 +95,7 @@ func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.Dro } func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - log.Println("has collection: ", request) + log.Debug("has collection... ") hct := &HasCollectionTask{ ctx: ctx, @@ -127,7 +128,7 @@ func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasC } func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) { - log.Println("load collection: ", request) + log.Debug("load collection...") //ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) //defer cancel() @@ -158,7 +159,7 @@ func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.Loa } func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) { - log.Println("release collection: ", request) + log.Debug("release collection...") rct := &ReleaseCollectionTask{ ctx: ctx, @@ -187,7 +188,7 @@ func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb. } func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - log.Println("describe collection: ", request) + log.Debug("describe collection...") dct := &DescribeCollectionTask{ ctx: ctx, @@ -220,7 +221,7 @@ func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb } func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) { - log.Println("get collection statistics") + log.Debug("get collection statistics...") g := &GetCollectionsStatisticsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -252,7 +253,7 @@ func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *mil } func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - log.Println("show collections") + log.Debug("show collections...") sct := &ShowCollectionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -284,7 +285,7 @@ func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.Sh } func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - log.Println("create partition", request) + log.Debug("create partition...") cpt := &CreatePartitionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -311,7 +312,7 @@ func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.Cr } func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - log.Println("drop partition: ", request) + log.Debug("drop partition...") dpt := &DropPartitionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -339,7 +340,7 @@ func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.Drop } func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - log.Println("has partition: ", request) + log.Debug("has partition...") hpt := &HasPartitionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -373,7 +374,7 @@ func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPa } func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) { - log.Println("load partitions: ", request) + log.Debug("load partitions...") lpt := &LoadPartitionTask{ ctx: ctx, @@ -402,7 +403,7 @@ func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.Loa } func (node *ProxyNode) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) { - log.Println("load partitions: ", request) + log.Debug("load partitions...") rpt := &ReleasePartitionTask{ ctx: ctx, @@ -435,7 +436,7 @@ func (node *ProxyNode) GetPartitionStatistics(ctx context.Context, request *milv } func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - log.Println("show partitions: ", request) + log.Debug("show partitions...") spt := &ShowPartitionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -468,7 +469,7 @@ func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.Sho } func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - log.Println("create index for: ", request) + log.Debug("create index for...") cit := &CreateIndexTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -496,7 +497,7 @@ func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.Create } func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - log.Println("Describe index for: ", request) + log.Debug("Describe index for...") dit := &DescribeIndexTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -528,7 +529,7 @@ func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.Desc } func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) { - log.Println("Drop index for: ", request) + log.Debug("Drop index for...") dit := &DropIndexTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -553,7 +554,6 @@ func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropInde } func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) { - // log.Println("Describe index progress for: ", request) dipt := &GetIndexStateTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -674,7 +674,7 @@ func (node *ProxyNode) Search(ctx context.Context, request *milvuspb.SearchReque } func (node *ProxyNode) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) { - log.Println("AA Flush collections: ", request.CollectionNames) + log.Debug("proxynode", zap.Strings("Flush collections: ", request.CollectionNames)) ft := &FlushTask{ ctx: ctx, Condition: NewTaskCondition(ctx), diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go index 73a276bfda..15ae27b648 100644 --- a/internal/proxynode/insert_channels.go +++ b/internal/proxynode/insert_channels.go @@ -2,15 +2,15 @@ package proxynode import ( "context" + "errors" "fmt" - "log" "reflect" "sort" "sync" - "errors" - + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" + "go.uber.org/zap" ) func SliceContain(s interface{}, item interface{}) bool { @@ -102,8 +102,7 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st stream, _ := m.msFactory.NewMsgStream(context.Background()) stream.AsProducer(channels) - // FIXME(wxyu): use log.Debug instead - log.Println("proxynode AsProducer: ", channels) + log.Debug("proxynode", zap.Strings("proxynode AsProducer: ", channels)) repack := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) { return insertRepackFunc(tsMsgs, hashKeys, m.nodeInstance.segAssigner, true) } @@ -136,7 +135,7 @@ func (m *InsertChannelsMap) closeInsertMsgStream(collID UniqueID) error { m.insertMsgStreams[loc].Close() m.droppedBitMap[loc] = 1 delete(m.collectionID2InsertChannels, collID) - log.Print("close insert message stream ...") + log.Warn("close insert message stream ...") } return nil } diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index 307c477137..d1a720f082 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -2,17 +2,19 @@ package proxynode import ( "bytes" - "log" + "fmt" + "path" "strconv" "strings" "sync" "time" "github.com/spf13/cast" - "github.com/spf13/viper" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "go.uber.org/zap" + "github.com/zilliztech/milvus-distributed/internal/log" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" ) @@ -52,6 +54,8 @@ type ParamTable struct { MaxDimension int64 DefaultPartitionTag string DefaultIndexName string + + Log log.Config } var Params ParamTable @@ -73,7 +77,7 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam for _, v := range val { ss, err := cast.ToStringE(v) if err != nil { - log.Panic(err) + log.Panic("proxynode", zap.String("error", err.Error())) } if len(str) == 0 { str = ss @@ -83,10 +87,10 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam } default: - log.Panicf("undefine config type, key=%s", key) + log.Panic("proxynode", zap.String("error", "Undefined config type, key="+key)) } } - log.Println("key: ", key, ", value: ", str) + log.Debug("proxynode", zap.String(key, str)) err = pt.Save(key, str) if err != nil { panic(err) @@ -148,6 +152,7 @@ func (pt *ParamTable) initParams() { pt.initMaxDimension() pt.initDefaultPartitionTag() pt.initDefaultIndexName() + pt.initLogCfg() } @@ -173,7 +178,7 @@ func (pt *ParamTable) initQueryNodeIDList() []UniqueID { for _, i := range queryNodeIDs { v, err := strconv.Atoi(i) if err != nil { - log.Panicf("load proxynode id list error, %s", err.Error()) + log.Panic("proxynode", zap.String("load proxynode id list error", err.Error())) } ret = append(ret, UniqueID(v)) } @@ -398,3 +403,34 @@ func (pt *ParamTable) initDefaultIndexName() { } pt.DefaultIndexName = name } + +func (pt *ParamTable) initLogCfg() { + pt.Log = log.Config{} + format, err := pt.Load("log.format") + if err != nil { + panic(err) + } + pt.Log.Format = format + level, err := pt.Load("log.level") + if err != nil { + panic(err) + } + pt.Log.Level = level + devStr, err := pt.Load("log.dev") + if err != nil { + panic(err) + } + dev, err := strconv.ParseBool(devStr) + if err != nil { + panic(err) + } + pt.Log.Development = dev + pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize") + pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups") + pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge") + rootPath, err := pt.Load("log.file.rootPath") + if err != nil { + panic(err) + } + pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode-%d.log", pt.ProxyID)) +} diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index bb2d44cf99..a259c20d18 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -3,21 +3,22 @@ package proxynode import ( "context" "errors" - "log" "math/rand" "sync" "sync/atomic" "time" - "github.com/zilliztech/milvus-distributed/internal/allocator" - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/types" - "github.com/zilliztech/milvus-distributed/internal/util/funcutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.uber.org/zap" + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/log" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" + "github.com/zilliztech/milvus-distributed/internal/types" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) type UniqueID = typeutil.UniqueID @@ -77,7 +78,7 @@ func (node *ProxyNode) Init() error { if err != nil { return err } - log.Println("service was ready ...") + log.Debug("service was ready ...") request := &proxypb.RegisterNodeRequest{ Address: &commonpb.Address{ @@ -154,8 +155,8 @@ func (node *ProxyNode) Init() error { node.queryMsgStream, _ = node.msFactory.NewMsgStream(node.ctx) node.queryMsgStream.AsProducer(Params.SearchChannelNames) // FIXME(wxyu): use log.Debug instead - log.Println("proxynode AsProducer: ", Params.SearchChannelNames) - log.Println("create query message stream ...") + log.Debug("proxynode", zap.Strings("proxynode AsProducer:", Params.SearchChannelNames)) + log.Debug("create query message stream ...") masterAddr := Params.MasterAddress idAllocator, err := allocator.NewIDAllocator(node.ctx, masterAddr) @@ -182,13 +183,12 @@ func (node *ProxyNode) Init() error { node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx) node.manipulationMsgStream.AsProducer(Params.InsertChannelNames) - // FIXME(wxyu): use log.Debug instead - log.Println("proxynode AsProducer: ", Params.InsertChannelNames) + log.Debug("proxynode", zap.Strings("proxynode AsProducer", Params.InsertChannelNames)) repackFunc := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) { return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true) } node.manipulationMsgStream.SetRepackFunc(repackFunc) - log.Println("create manipulation message stream ...") + log.Debug("create manipulation message stream ...") node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory) if err != nil { @@ -205,31 +205,31 @@ func (node *ProxyNode) Start() error { if err != nil { return err } - log.Println("init global meta cache ...") + log.Debug("init global meta cache ...") initGlobalInsertChannelsMap(node) - log.Println("init global insert channels map ...") + log.Debug("init global insert channels map ...") node.manipulationMsgStream.Start() - log.Println("start manipulation message stream ...") + log.Debug("start manipulation message stream ...") node.queryMsgStream.Start() - log.Println("start query message stream ...") + log.Debug("start query message stream ...") node.sched.Start() - log.Println("start scheduler ...") + log.Debug("start scheduler ...") node.idAllocator.Start() - log.Println("start id allocator ...") + log.Debug("start id allocator ...") node.tsoAllocator.Start() - log.Println("start tso allocator ...") + log.Debug("start tso allocator ...") node.segAssigner.Start() - log.Println("start seg assigner ...") + log.Debug("start seg assigner ...") node.tick.Start() - log.Println("start time tick ...") + log.Debug("start time tick ...") // Start callbacks for _, cb := range node.startCallbacks { @@ -237,7 +237,7 @@ func (node *ProxyNode) Start() error { } node.UpdateStateCode(internalpb2.StateCode_HEALTHY) - log.Println("proxy node is healthy ...") + log.Debug("proxy node is healthy ...") return nil } diff --git a/internal/proxynode/repack_func.go b/internal/proxynode/repack_func.go index ec2534736d..9edd7d23b7 100644 --- a/internal/proxynode/repack_func.go +++ b/internal/proxynode/repack_func.go @@ -1,17 +1,14 @@ package proxynode import ( + "errors" "log" "sort" - "errors" - - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) func insertRepackFunc(tsMsgs []msgstream.TsMsg, diff --git a/internal/proxynode/segment.go b/internal/proxynode/segment.go index 6757d9a7e7..1792a150c7 100644 --- a/internal/proxynode/segment.go +++ b/internal/proxynode/segment.go @@ -3,18 +3,18 @@ package proxynode import ( "container/list" "context" + "errors" "fmt" - "log" "time" - "errors" + "go.uber.org/zap" "github.com/zilliztech/milvus-distributed/internal/allocator" - "github.com/zilliztech/milvus-distributed/internal/types" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/types" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) const ( @@ -80,7 +80,7 @@ func (info *assignInfo) RemoveExpired(ts Timestamp) { for e := info.segInfos.Front(); e != nil; e = e.Next() { segInfo, ok := e.Value.(*segInfo) if !ok { - log.Printf("can not cast to segInfo") + log.Warn("can not cast to segInfo") continue } if segInfo.IsExpired(ts) { @@ -292,7 +292,7 @@ func (sa *SegIDAssigner) syncSegments() bool { resp, err := sa.dataService.AssignSegmentID(ctx, req) if err != nil { - log.Println("GRPC AssignSegmentID Failed", resp, err) + log.Debug("proxynode", zap.String("GRPC AssignSegmentID Failed", err.Error())) return false } @@ -300,7 +300,7 @@ func (sa *SegIDAssigner) syncSegments() bool { success := false for _, info := range resp.SegIDAssignments { if info.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS { - log.Println("SyncSegment Error:", info.Status.Reason) + log.Debug("proxynode", zap.String("SyncSegment Error", info.Status.Reason)) continue } assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName) diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index f30270f3db..a4a601ae0b 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -2,17 +2,16 @@ package proxynode import ( "context" + "errors" "fmt" - "log" "math" "strconv" - "github.com/zilliztech/milvus-distributed/internal/types" - - "errors" + "go.uber.org/zap" "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -21,6 +20,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/types" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -581,9 +581,9 @@ func (st *SearchTask) Execute(ctx context.Context) error { } msgPack.Msgs[0] = tsMsg err := st.queryMsgStream.Produce(ctx, msgPack) - log.Printf("[ProxyNode] length of searchMsg: %v", len(msgPack.Msgs)) + log.Debug("proxynode", zap.Int("length of searchMsg", len(msgPack.Msgs))) if err != nil { - log.Printf("[ProxyNode] send search request failed: %v", err) + log.Debug("proxynode", zap.String("send search request failed", err.Error())) } return err } @@ -592,7 +592,7 @@ func (st *SearchTask) PostExecute(ctx context.Context) error { for { select { case <-st.Ctx().Done(): - log.Print("SearchTask: wait to finish failed, timeout!, taskID:", st.ID()) + log.Debug("proxynode", zap.Int64("SearchTask: wait to finish failed, timeout!, taskID:", st.ID())) return fmt.Errorf("SearchTask:wait to finish failed, timeout: %d", st.ID()) case searchResults := <-st.resultBuf: // fmt.Println("searchResults: ", searchResults) @@ -638,7 +638,7 @@ func (st *SearchTask) PostExecute(ctx context.Context) error { partialHit := &milvuspb.Hits{} err := proto.Unmarshal(bs, partialHit) if err != nil { - log.Println("unmarshal error") + log.Debug("proxynode", zap.String("error", "unmarshal error")) return err } partialHits = append(partialHits, partialHit) @@ -731,7 +731,7 @@ func (st *SearchTask) PostExecute(ctx context.Context) error { } reducedHitsBs, err := proto.Marshal(reducedHits) if err != nil { - log.Println("marshal error") + log.Debug("proxynode", zap.String("error", "marshal error")) return err } st.result.Hits = append(st.result.Hits, reducedHitsBs) @@ -1487,7 +1487,6 @@ func (dit *DescribeIndexTask) PreExecute(ctx context.Context) error { func (dit *DescribeIndexTask) Execute(ctx context.Context) error { var err error dit.result, err = dit.masterService.DescribeIndex(ctx, dit.DescribeIndexRequest) - log.Println("YYYYY:", dit.result) if dit.result == nil { return errors.New("get collection statistics resp is nil") } @@ -1753,7 +1752,8 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error { } } - log.Println("GetIndexState:: len of allSegmentIDs:", len(allSegmentIDs), " len of IndexBuildIDs", len(indexBuildIDs)) + log.Debug("proxynode", zap.Int("GetIndexState:: len of allSegmentIDs", len(allSegmentIDs))) + log.Debug("proxynode", zap.Int("GetIndexState:: len of IndexBuildIDs", len(indexBuildIDs))) if len(allSegmentIDs) != len(indexBuildIDs) { gist.result = &milvuspb.IndexStateResponse{ Status: &commonpb.Status{ diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go index 91132fd05b..0f47c1bb36 100644 --- a/internal/proxynode/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -3,16 +3,17 @@ package proxynode import ( "container/list" "context" + "errors" "fmt" - "log" "strconv" "sync" - "errors" + "go.uber.org/zap" "github.com/opentracing/opentracing-go" oplog "github.com/opentracing/opentracing-go/log" "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/util/trace" @@ -77,7 +78,7 @@ func (queue *BaseTaskQueue) FrontUnissuedTask() task { defer queue.utLock.Unlock() if queue.unissuedTasks.Len() <= 0 { - log.Panic("sorry, but the unissued task list is empty!") + log.Warn("sorry, but the unissued task list is empty!") return nil } @@ -89,7 +90,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task { defer queue.utLock.Unlock() if queue.unissuedTasks.Len() <= 0 { - log.Fatal("sorry, but the unissued task list is empty!") + log.Warn("sorry, but the unissued task list is empty!") return nil } @@ -106,7 +107,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) { ts := t.EndTs() _, ok := queue.activeTasks[ts] if ok { - log.Fatalf("task with timestamp %v already in active task list!", ts) + log.Debug("proxynode", zap.Uint64("task with timestamp ts already in active task list! ts:", ts)) } queue.activeTasks[ts] = t @@ -122,7 +123,7 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task { return t } - log.Fatalf("sorry, but the timestamp %d was not found in the active task list!", ts) + log.Debug("proxynode", zap.Uint64("task with timestamp ts already in active task list! ts:", ts)) return nil } @@ -173,11 +174,9 @@ func (queue *BaseTaskQueue) Enqueue(t task) error { } ts, _ := queue.sched.tsoAllocator.AllocOne() - // log.Printf("[ProxyNode] allocate timestamp: %v", ts) t.SetTs(ts) reqID, _ := queue.sched.idAllocator.AllocOne() - // log.Printf("[ProxyNode] allocate reqID: %v", reqID) t.SetID(reqID) return queue.addUnissuedTask(t) @@ -309,7 +308,6 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { defer func() { t.Notify(err) - // log.Printf("notify with error: %v", err) }() if err != nil { trace.LogError(span, err) @@ -319,11 +317,9 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID())) q.AddActiveTask(t) - // log.Printf("task add to active list ...") defer func() { span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID())) q.PopActiveTask(t.EndTs()) - // log.Printf("pop from active list ...") }() span.LogFields(oplog.Int64("scheduler process Execute", t.ID())) err = t.Execute(ctx) @@ -331,10 +327,8 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { trace.LogError(span, err) return } - // log.Printf("task execution done ...") span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID())) err = t.PostExecute(ctx) - // log.Printf("post execute task done ...") } func (sched *TaskScheduler) definitionLoop() { @@ -375,12 +369,11 @@ func (sched *TaskScheduler) queryLoop() { case <-sched.ctx.Done(): return case <-sched.DqQueue.utChan(): - // log.Print("scheduler receive query request ...") if !sched.DqQueue.UTEmpty() { t := sched.scheduleDqTask() go sched.processTask(t, sched.DqQueue) } else { - log.Print("query queue is empty ...") + log.Debug("query queue is empty ...") } } } @@ -391,7 +384,8 @@ func (sched *TaskScheduler) queryResultLoop() { queryResultMsgStream, _ := sched.msFactory.NewMsgStream(sched.ctx) queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames, Params.ProxySubName) - log.Println("proxynode AsConsumer: ", Params.SearchResultChannelNames, " : ", Params.ProxySubName) + log.Debug("proxynode", zap.Strings("search result channel names", Params.SearchResultChannelNames)) + log.Debug("proxynode", zap.String("proxySubName", Params.ProxySubName)) queryNodeNum := Params.QueryNodeNum queryResultMsgStream.Start() @@ -403,7 +397,7 @@ func (sched *TaskScheduler) queryResultLoop() { select { case msgPack, ok := <-queryResultMsgStream.Chan(): if !ok { - log.Print("buf chan closed") + log.Debug("buf chan closed") return } if msgPack == nil { @@ -415,7 +409,7 @@ func (sched *TaskScheduler) queryResultLoop() { reqIDStr := strconv.FormatInt(reqID, 10) t := sched.getTaskByReqID(reqID) if t == nil { - log.Println(fmt.Sprint("QueryResult:czs:GetTaskByReqID failed, reqID:", reqIDStr)) + log.Debug("proxynode", zap.String("QueryResult GetTaskByReqID failed, reqID = ", reqIDStr)) delete(queryResultBuf, reqID) continue } @@ -436,7 +430,6 @@ func (sched *TaskScheduler) queryResultLoop() { if t != nil { qt, ok := t.(*SearchTask) if ok { - log.Printf("address of query task: %p", qt) qt.resultBuf <- queryResultBuf[reqID] delete(queryResultBuf, reqID) } @@ -447,7 +440,7 @@ func (sched *TaskScheduler) queryResultLoop() { } } case <-sched.ctx.Done(): - log.Print("proxynode server is closed ...") + log.Debug("proxynode server is closed ...") return } } diff --git a/internal/proxynode/timetick.go b/internal/proxynode/timetick.go index 6e89e69cd8..8382380894 100644 --- a/internal/proxynode/timetick.go +++ b/internal/proxynode/timetick.go @@ -2,17 +2,17 @@ package proxynode import ( "context" - "log" "sync" "time" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "go.uber.org/zap" "github.com/apache/pulsar-client-go/pulsar" "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) type tickCheckFunc = func(Timestamp) bool @@ -55,8 +55,7 @@ func newTimeTick(ctx context.Context, t.tickMsgStream, _ = t.msFactory.NewMsgStream(t.ctx) t.tickMsgStream.AsProducer(Params.ProxyTimeTickChannelNames) - // FIXME(wxyu): use log.Debug instead - log.Println("proxynode AsProducer: ", Params.ProxyTimeTickChannelNames) + log.Debug("proxynode", zap.Strings("proxynode AsProducer", Params.ProxyTimeTickChannelNames)) return t } @@ -89,11 +88,10 @@ func (tt *timeTick) tick() error { msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) err := tt.tickMsgStream.Produce(tt.ctx, &msgPack) if err != nil { - log.Printf("proxynode send time tick error: %v", err) + log.Warn("proxynode", zap.String("error", err.Error())) } else { - //log.Printf("proxynode send time tick message") + log.Warn("proxynode send time tick message") } - //log.Println("send current tick: ", tt.currentTick) tt.tickLock.Lock() defer tt.tickLock.Unlock() tt.lastTick = tt.currentTick @@ -107,7 +105,7 @@ func (tt *timeTick) tickLoop() { select { case <-tt.timer.C: if err := tt.tick(); err != nil { - log.Printf("timeTick error") + log.Warn("timeTick error") } case <-tt.ctx.Done(): return diff --git a/internal/proxynode/util.go b/internal/proxynode/util.go deleted file mode 100644 index 2a8a269420..0000000000 --- a/internal/proxynode/util.go +++ /dev/null @@ -1,32 +0,0 @@ -package proxynode - -import ( - "log" - "time" -) - -// Reference: https://blog.cyeam.com/golang/2018/08/27/retry - -func Retry(attempts int, sleep time.Duration, fn func() error) error { - if err := fn(); err != nil { - if s, ok := err.(InterruptError); ok { - return s.error - } - - if attempts--; attempts > 0 { - log.Printf("retry func error: %s. attempts #%d after %s.", err.Error(), attempts, sleep) - time.Sleep(sleep) - return Retry(attempts, 2*sleep, fn) - } - return err - } - return nil -} - -type InterruptError struct { - error -} - -func NoRetryError(err error) InterruptError { - return InterruptError{err} -} diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index ec36ca6ef9..cc4131e288 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -3,18 +3,18 @@ package proxyservice import ( "context" "io/ioutil" - "log" "os" "path" "runtime" "strconv" "time" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" ) @@ -37,7 +37,7 @@ func (s *ProxyService) fillNodeInitParams() error { _, fpath, _, _ := runtime.Caller(0) configFile := path.Dir(fpath) + "/../../configs/" + fileName _, err := os.Stat(configFile) - log.Printf("configFile = %s", configFile) + log.Debug("proxyservice", zap.String("configFile = ", configFile)) if os.IsNotExist(err) { runPath, err := os.Getwd() if err != nil { @@ -97,7 +97,7 @@ func (s *ProxyService) Init() error { if err != nil { return err } - log.Println("fill node init params ...") + log.Debug("fill node init params ...") m := map[string]interface{}{ "PulsarAddress": Params.PulsarAddress, @@ -110,9 +110,8 @@ func (s *ProxyService) Init() error { serviceTimeTickMsgStream, _ := s.msFactory.NewTtMsgStream(s.ctx) serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel}) - // FIXME(wxyu): use log.Debug instead - log.Println("proxyservice AsProducer: ", []string{Params.ServiceTimeTickChannel}) - log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel}) + log.Debug("proxyservice", zap.Strings("proxyservice AsProducer", []string{Params.ServiceTimeTickChannel})) + log.Debug("proxyservice", zap.Strings("create service time tick producer channel", []string{Params.ServiceTimeTickChannel})) channels := make([]string, Params.InsertChannelNum) var i int64 = 0 @@ -121,20 +120,17 @@ func (s *ProxyService) Init() error { } insertTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx) insertTickMsgStream.AsProducer(channels) - // FIXME(wxyu): use log.Debug instead - log.Println("proxyservice AsProducer: ", channels) - log.Println("create insert time tick producer channel: ", channels) + log.Debug("proxyservice", zap.Strings("create insert time tick producer channels", channels)) nodeTimeTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx) nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel, "proxyservicesub") // TODO: add config - log.Println("proxynode AsConsumer: ", Params.NodeTimeTickChannel, " : ", "proxyservicesub") - log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel) + log.Debug("proxyservice", zap.Strings("create node time tick consumer channel", Params.NodeTimeTickChannel)) ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{1}, 10) - log.Println("create soft time tick barrier ...") + log.Debug("create soft time tick barrier ...") s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream) - log.Println("create time tick ...") + log.Debug("create time tick ...") return nil } @@ -142,21 +138,21 @@ func (s *ProxyService) Init() error { func (s *ProxyService) Start() error { s.stateCode = internalpb2.StateCode_HEALTHY s.sched.Start() - log.Println("start scheduler ...") + log.Debug("start scheduler ...") return s.tick.Start() } func (s *ProxyService) Stop() error { s.sched.Close() - log.Println("close scheduler ...") + log.Debug("close scheduler ...") s.tick.Close() - log.Println("close time tick") + log.Debug("close time tick") err := s.nodeInfos.ReleaseAllClients() if err != nil { panic(err) } - log.Println("stop all node ProxyNodes ...") + log.Debug("stop all node ProxyNodes ...") s.cancel() @@ -198,7 +194,7 @@ func (s *ProxyService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri } func (s *ProxyService) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) { - log.Println("register link") + log.Debug("register link") t := &RegisterLinkTask{ ctx: ctx, @@ -234,7 +230,6 @@ func (s *ProxyService) RegisterLink(ctx context.Context) (*milvuspb.RegisterLink } func (s *ProxyService) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) { - log.Println("RegisterNode: ", request) t := &RegisterNodeTask{ ctx: ctx, @@ -273,7 +268,7 @@ func (s *ProxyService) RegisterNode(ctx context.Context, request *proxypb.Regist } func (s *ProxyService) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { - log.Println("InvalidateCollectionMetaCache") + log.Debug("InvalidateCollectionMetaCache") t := &InvalidateCollectionMetaCacheTask{ ctx: ctx, diff --git a/internal/proxyservice/node_info.go b/internal/proxyservice/node_info.go index 2f7bc07efa..61d0eaf0fd 100644 --- a/internal/proxyservice/node_info.go +++ b/internal/proxyservice/node_info.go @@ -3,12 +3,12 @@ package proxyservice import ( "context" "errors" - "log" "math/rand" "strconv" "sync" grpcproxynodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode/client" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/types" ) @@ -66,8 +66,6 @@ func (table *GlobalNodeInfoTable) Register(id UniqueID, info *NodeInfo) error { } func (table *GlobalNodeInfoTable) createClients() error { - log.Println("infos: ", table.infos) - log.Println("ProxyNodes: ", table.ProxyNodes) if len(table.ProxyNodes) == len(table.infos) { return nil } @@ -75,7 +73,6 @@ func (table *GlobalNodeInfoTable) createClients() error { for nodeID, info := range table.infos { _, ok := table.ProxyNodes[nodeID] if !ok { - log.Println(info) table.ProxyNodes[nodeID] = grpcproxynodeclient.NewClient(context.Background(), info.ip+":"+strconv.Itoa(int(info.port))) var err error err = table.ProxyNodes[nodeID].Init() @@ -94,10 +91,10 @@ func (table *GlobalNodeInfoTable) createClients() error { func (table *GlobalNodeInfoTable) ReleaseAllClients() error { table.mu.Lock() - log.Println("get write lock") + log.Debug("get write lock") defer func() { table.mu.Unlock() - log.Println("release write lock") + log.Debug("release write lock") }() var err error diff --git a/internal/proxyservice/paramtable.go b/internal/proxyservice/paramtable.go index c1a95f1201..15dcd384bd 100644 --- a/internal/proxyservice/paramtable.go +++ b/internal/proxyservice/paramtable.go @@ -1,9 +1,13 @@ package proxyservice import ( - "log" + "path" + "strconv" "sync" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" ) @@ -17,6 +21,8 @@ type ParamTable struct { DataServiceAddress string InsertChannelPrefixName string InsertChannelNum int64 + + Log log.Config } var Params ParamTable @@ -37,6 +43,7 @@ func (pt *ParamTable) Init() { pt.initDataServiceAddress() pt.initInsertChannelPrefixName() pt.initInsertChannelNum() + pt.initLogCfg() }) } @@ -59,7 +66,7 @@ func (pt *ParamTable) initMasterAddress() { func (pt *ParamTable) initNodeTimeTickChannel() { prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick") if err != nil { - log.Panic(err) + log.Panic("proxyservice", zap.Error(err)) } prefix += "-0" pt.NodeTimeTickChannel = []string{prefix} @@ -68,7 +75,7 @@ func (pt *ParamTable) initNodeTimeTickChannel() { func (pt *ParamTable) initServiceTimeTickChannel() { ch, err := pt.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick") if err != nil { - log.Panic(err) + log.Panic("proxyservice", zap.Error(err)) } pt.ServiceTimeTickChannel = ch } @@ -89,3 +96,34 @@ func (pt *ParamTable) initInsertChannelPrefixName() { panic(err) } } + +func (pt *ParamTable) initLogCfg() { + pt.Log = log.Config{} + format, err := pt.Load("log.format") + if err != nil { + panic(err) + } + pt.Log.Format = format + level, err := pt.Load("log.level") + if err != nil { + panic(err) + } + pt.Log.Level = level + devStr, err := pt.Load("log.dev") + if err != nil { + panic(err) + } + dev, err := strconv.ParseBool(devStr) + if err != nil { + panic(err) + } + pt.Log.Development = dev + pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize") + pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups") + pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge") + rootPath, err := pt.Load("log.file.rootPath") + if err != nil { + panic(err) + } + pt.Log.File.Filename = path.Join(rootPath, "proxyservice-%d.log") +} diff --git a/internal/proxyservice/task_queue.go b/internal/proxyservice/task_queue.go index f58620eda0..eb881207c2 100644 --- a/internal/proxyservice/task_queue.go +++ b/internal/proxyservice/task_queue.go @@ -2,10 +2,10 @@ package proxyservice import ( "container/list" - "log" + "errors" "sync" - "errors" + "github.com/zilliztech/milvus-distributed/internal/log" ) type TaskQueue interface { diff --git a/internal/proxyservice/timesync.go b/internal/proxyservice/timesync.go index 45633c5391..0820d9a3e5 100644 --- a/internal/proxyservice/timesync.go +++ b/internal/proxyservice/timesync.go @@ -3,11 +3,13 @@ package proxyservice import ( "context" "errors" - "log" "math" "sync" "sync/atomic" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/log" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" ) @@ -42,7 +44,7 @@ func (ttBarrier *softTimeTickBarrier) AddPeer(peerID UniqueID) error { _, ok := ttBarrier.peer2LastTt[peerID] if ok { - log.Println("no need to add duplicated peer: ", peerID) + log.Debug("proxyservice", zap.Int64("no need to add duplicated peer", peerID)) return nil } @@ -67,7 +69,6 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) { } } atomic.StoreInt64(&(ttBarrier.lastTt), int64(ts)) - // log.Println("current tick: ", ts) return ts, ttBarrier.ctx.Err() } } @@ -77,26 +78,20 @@ func (ttBarrier *softTimeTickBarrier) Start() error { for { select { case <-ttBarrier.ctx.Done(): - log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err()) + log.Warn("TtBarrierStart", zap.Error(ttBarrier.ctx.Err())) return case ttmsgs := <-ttBarrier.ttStream.Chan(): - //log.Println("ttmsgs: ", ttmsgs) ttBarrier.peerMtx.RLock() - //log.Println("peer2LastTt map: ", ttBarrier.peer2LastTt) - //log.Println("len(ttmsgs.Msgs): ", len(ttmsgs.Msgs)) if len(ttmsgs.Msgs) > 0 { for _, timetickmsg := range ttmsgs.Msgs { ttmsg := timetickmsg.(*ms.TimeTickMsg) oldT, ok := ttBarrier.peer2LastTt[ttmsg.Base.SourceID] - // log.Printf("[softTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerID, ttmsg.Timestamp) if !ok { - log.Printf("[softTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.Base.SourceID) + log.Warn("softTimeTickBarrier", zap.Int64("peerID %d not exist", ttmsg.Base.SourceID)) continue } - // log.Println("ttmsg.Base.Timestamp: ", ttmsg.Base.Timestamp) - // log.Println("oldT: ", oldT) if ttmsg.Base.Timestamp > oldT { ttBarrier.peer2LastTt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp @@ -126,7 +121,7 @@ func newSoftTimeTickBarrier(ctx context.Context, minTtInterval Timestamp) TimeTickBarrier { if len(peerIds) <= 0 { - log.Printf("[newSoftTimeTickBarrier] Warning: peerIds is empty!\n") + log.Warn("[newSoftTimeTickBarrier] Warning: peerIds is empty!") //return nil } @@ -140,7 +135,7 @@ func newSoftTimeTickBarrier(ctx context.Context, sttbarrier.peer2LastTt[id] = Timestamp(0) } if len(peerIds) != len(sttbarrier.peer2LastTt) { - log.Printf("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!\n") + log.Warn("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!") } return &sttbarrier diff --git a/internal/proxyservice/timetick.go b/internal/proxyservice/timetick.go index 6d16384233..64321b7e8e 100644 --- a/internal/proxyservice/timetick.go +++ b/internal/proxyservice/timetick.go @@ -2,9 +2,11 @@ package proxyservice import ( "context" - "log" "sync" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -19,19 +21,19 @@ type TimeTick struct { } func (tt *TimeTick) Start() error { - log.Println("start time tick ...") + log.Debug("start time tick ...") tt.wg.Add(1) go func() { defer tt.wg.Done() for { select { case <-tt.ctx.Done(): - log.Println("time tick loop was canceled by context!") + log.Debug("time tick loop was canceled by context!") return default: current, err := tt.ttBarrier.GetTimeTick() if err != nil { - log.Println("GetTimeTick error: ", err) + log.Error("GetTimeTick error", zap.Error(err)) break } msgPack := msgstream.MsgPack{} @@ -50,12 +52,12 @@ func (tt *TimeTick) Start() error { } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) for _, msg := range msgPack.Msgs { - log.Println("msg type xxxxxxxxxxxxxxxxxxxxxxxx", msg.Type()) + log.Debug("proxyservice", zap.Stringer("msg type", msg.Type())) } for _, channel := range tt.channels { err = channel.Broadcast(tt.ctx, &msgPack) if err != nil { - log.Println("send time tick error: ", err) + log.Error("proxyservice", zap.String("send time tick error", err.Error())) } } }