Use the project log instead of standard log of proxyservice and proxynode

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2021-03-08 19:39:36 +08:00 committed by yefu.chen
parent e30b18a8f1
commit aa5ecbdc02
17 changed files with 214 additions and 197 deletions

View File

@ -2,7 +2,6 @@ package proxynode
import (
"context"
"errors"
)

View File

@ -2,12 +2,13 @@ package proxynode
import (
"context"
"log"
"errors"
"strconv"
"time"
"errors"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
@ -35,7 +36,7 @@ func (node *ProxyNode) InvalidateCollectionMetaCache(ctx context.Context, reques
}
func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
log.Println("create collection: ", request)
log.Debug("create collection...")
cct := &CreateCollectionTask{
ctx: ctx,
@ -65,7 +66,7 @@ func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.C
}
func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
log.Println("drop collection: ", request)
log.Debug("drop collection... ")
dct := &DropCollectionTask{
ctx: ctx,
@ -94,7 +95,7 @@ func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.Dro
}
func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
log.Println("has collection: ", request)
log.Debug("has collection... ")
hct := &HasCollectionTask{
ctx: ctx,
@ -127,7 +128,7 @@ func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasC
}
func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
log.Println("load collection: ", request)
log.Debug("load collection...")
//ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
//defer cancel()
@ -158,7 +159,7 @@ func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.Loa
}
func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
log.Println("release collection: ", request)
log.Debug("release collection...")
rct := &ReleaseCollectionTask{
ctx: ctx,
@ -187,7 +188,7 @@ func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.
}
func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
log.Println("describe collection: ", request)
log.Debug("describe collection...")
dct := &DescribeCollectionTask{
ctx: ctx,
@ -220,7 +221,7 @@ func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb
}
func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) {
log.Println("get collection statistics")
log.Debug("get collection statistics...")
g := &GetCollectionsStatisticsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -252,7 +253,7 @@ func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *mil
}
func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
log.Println("show collections")
log.Debug("show collections...")
sct := &ShowCollectionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -284,7 +285,7 @@ func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.Sh
}
func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
log.Println("create partition", request)
log.Debug("create partition...")
cpt := &CreatePartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -311,7 +312,7 @@ func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.Cr
}
func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
log.Println("drop partition: ", request)
log.Debug("drop partition...")
dpt := &DropPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -339,7 +340,7 @@ func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.Drop
}
func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
log.Println("has partition: ", request)
log.Debug("has partition...")
hpt := &HasPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -373,7 +374,7 @@ func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPa
}
func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
log.Println("load partitions: ", request)
log.Debug("load partitions...")
lpt := &LoadPartitionTask{
ctx: ctx,
@ -402,7 +403,7 @@ func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.Loa
}
func (node *ProxyNode) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
log.Println("load partitions: ", request)
log.Debug("load partitions...")
rpt := &ReleasePartitionTask{
ctx: ctx,
@ -435,7 +436,7 @@ func (node *ProxyNode) GetPartitionStatistics(ctx context.Context, request *milv
}
func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
log.Println("show partitions: ", request)
log.Debug("show partitions...")
spt := &ShowPartitionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -468,7 +469,7 @@ func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.Sho
}
func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
log.Println("create index for: ", request)
log.Debug("create index for...")
cit := &CreateIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -496,7 +497,7 @@ func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.Create
}
func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
log.Println("Describe index for: ", request)
log.Debug("Describe index for...")
dit := &DescribeIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -528,7 +529,7 @@ func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.Desc
}
func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
log.Println("Drop index for: ", request)
log.Debug("Drop index for...")
dit := &DropIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -553,7 +554,6 @@ func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropInde
}
func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
// log.Println("Describe index progress for: ", request)
dipt := &GetIndexStateTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -674,7 +674,7 @@ func (node *ProxyNode) Search(ctx context.Context, request *milvuspb.SearchReque
}
func (node *ProxyNode) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
log.Println("AA Flush collections: ", request.CollectionNames)
log.Debug("proxynode", zap.Strings("Flush collections: ", request.CollectionNames))
ft := &FlushTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),

View File

@ -2,15 +2,15 @@ package proxynode
import (
"context"
"errors"
"fmt"
"log"
"reflect"
"sort"
"sync"
"errors"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"go.uber.org/zap"
)
func SliceContain(s interface{}, item interface{}) bool {
@ -102,8 +102,7 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st
stream, _ := m.msFactory.NewMsgStream(context.Background())
stream.AsProducer(channels)
// FIXME(wxyu): use log.Debug instead
log.Println("proxynode AsProducer: ", channels)
log.Debug("proxynode", zap.Strings("proxynode AsProducer: ", channels))
repack := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
return insertRepackFunc(tsMsgs, hashKeys, m.nodeInstance.segAssigner, true)
}
@ -136,7 +135,7 @@ func (m *InsertChannelsMap) closeInsertMsgStream(collID UniqueID) error {
m.insertMsgStreams[loc].Close()
m.droppedBitMap[loc] = 1
delete(m.collectionID2InsertChannels, collID)
log.Print("close insert message stream ...")
log.Warn("close insert message stream ...")
}
return nil
}

View File

@ -2,17 +2,19 @@ package proxynode
import (
"bytes"
"log"
"fmt"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/spf13/cast"
"github.com/spf13/viper"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
@ -52,6 +54,8 @@ type ParamTable struct {
MaxDimension int64
DefaultPartitionTag string
DefaultIndexName string
Log log.Config
}
var Params ParamTable
@ -73,7 +77,7 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam
for _, v := range val {
ss, err := cast.ToStringE(v)
if err != nil {
log.Panic(err)
log.Panic("proxynode", zap.String("error", err.Error()))
}
if len(str) == 0 {
str = ss
@ -83,10 +87,10 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam
}
default:
log.Panicf("undefine config type, key=%s", key)
log.Panic("proxynode", zap.String("error", "Undefined config type, key="+key))
}
}
log.Println("key: ", key, ", value: ", str)
log.Debug("proxynode", zap.String(key, str))
err = pt.Save(key, str)
if err != nil {
panic(err)
@ -148,6 +152,7 @@ func (pt *ParamTable) initParams() {
pt.initMaxDimension()
pt.initDefaultPartitionTag()
pt.initDefaultIndexName()
pt.initLogCfg()
}
@ -173,7 +178,7 @@ func (pt *ParamTable) initQueryNodeIDList() []UniqueID {
for _, i := range queryNodeIDs {
v, err := strconv.Atoi(i)
if err != nil {
log.Panicf("load proxynode id list error, %s", err.Error())
log.Panic("proxynode", zap.String("load proxynode id list error", err.Error()))
}
ret = append(ret, UniqueID(v))
}
@ -398,3 +403,34 @@ func (pt *ParamTable) initDefaultIndexName() {
}
pt.DefaultIndexName = name
}
func (pt *ParamTable) initLogCfg() {
pt.Log = log.Config{}
format, err := pt.Load("log.format")
if err != nil {
panic(err)
}
pt.Log.Format = format
level, err := pt.Load("log.level")
if err != nil {
panic(err)
}
pt.Log.Level = level
devStr, err := pt.Load("log.dev")
if err != nil {
panic(err)
}
dev, err := strconv.ParseBool(devStr)
if err != nil {
panic(err)
}
pt.Log.Development = dev
pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize")
pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups")
pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge")
rootPath, err := pt.Load("log.file.rootPath")
if err != nil {
panic(err)
}
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode-%d.log", pt.ProxyID))
}

View File

@ -3,21 +3,22 @@ package proxynode
import (
"context"
"errors"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type UniqueID = typeutil.UniqueID
@ -77,7 +78,7 @@ func (node *ProxyNode) Init() error {
if err != nil {
return err
}
log.Println("service was ready ...")
log.Debug("service was ready ...")
request := &proxypb.RegisterNodeRequest{
Address: &commonpb.Address{
@ -154,8 +155,8 @@ func (node *ProxyNode) Init() error {
node.queryMsgStream, _ = node.msFactory.NewMsgStream(node.ctx)
node.queryMsgStream.AsProducer(Params.SearchChannelNames)
// FIXME(wxyu): use log.Debug instead
log.Println("proxynode AsProducer: ", Params.SearchChannelNames)
log.Println("create query message stream ...")
log.Debug("proxynode", zap.Strings("proxynode AsProducer:", Params.SearchChannelNames))
log.Debug("create query message stream ...")
masterAddr := Params.MasterAddress
idAllocator, err := allocator.NewIDAllocator(node.ctx, masterAddr)
@ -182,13 +183,12 @@ func (node *ProxyNode) Init() error {
node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx)
node.manipulationMsgStream.AsProducer(Params.InsertChannelNames)
// FIXME(wxyu): use log.Debug instead
log.Println("proxynode AsProducer: ", Params.InsertChannelNames)
log.Debug("proxynode", zap.Strings("proxynode AsProducer", Params.InsertChannelNames))
repackFunc := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
}
node.manipulationMsgStream.SetRepackFunc(repackFunc)
log.Println("create manipulation message stream ...")
log.Debug("create manipulation message stream ...")
node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory)
if err != nil {
@ -205,31 +205,31 @@ func (node *ProxyNode) Start() error {
if err != nil {
return err
}
log.Println("init global meta cache ...")
log.Debug("init global meta cache ...")
initGlobalInsertChannelsMap(node)
log.Println("init global insert channels map ...")
log.Debug("init global insert channels map ...")
node.manipulationMsgStream.Start()
log.Println("start manipulation message stream ...")
log.Debug("start manipulation message stream ...")
node.queryMsgStream.Start()
log.Println("start query message stream ...")
log.Debug("start query message stream ...")
node.sched.Start()
log.Println("start scheduler ...")
log.Debug("start scheduler ...")
node.idAllocator.Start()
log.Println("start id allocator ...")
log.Debug("start id allocator ...")
node.tsoAllocator.Start()
log.Println("start tso allocator ...")
log.Debug("start tso allocator ...")
node.segAssigner.Start()
log.Println("start seg assigner ...")
log.Debug("start seg assigner ...")
node.tick.Start()
log.Println("start time tick ...")
log.Debug("start time tick ...")
// Start callbacks
for _, cb := range node.startCallbacks {
@ -237,7 +237,7 @@ func (node *ProxyNode) Start() error {
}
node.UpdateStateCode(internalpb2.StateCode_HEALTHY)
log.Println("proxy node is healthy ...")
log.Debug("proxy node is healthy ...")
return nil
}

View File

@ -1,17 +1,14 @@
package proxynode
import (
"errors"
"log"
"sort"
"errors"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
func insertRepackFunc(tsMsgs []msgstream.TsMsg,

View File

@ -3,18 +3,18 @@ package proxynode
import (
"container/list"
"context"
"errors"
"fmt"
"log"
"time"
"errors"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
@ -80,7 +80,7 @@ func (info *assignInfo) RemoveExpired(ts Timestamp) {
for e := info.segInfos.Front(); e != nil; e = e.Next() {
segInfo, ok := e.Value.(*segInfo)
if !ok {
log.Printf("can not cast to segInfo")
log.Warn("can not cast to segInfo")
continue
}
if segInfo.IsExpired(ts) {
@ -292,7 +292,7 @@ func (sa *SegIDAssigner) syncSegments() bool {
resp, err := sa.dataService.AssignSegmentID(ctx, req)
if err != nil {
log.Println("GRPC AssignSegmentID Failed", resp, err)
log.Debug("proxynode", zap.String("GRPC AssignSegmentID Failed", err.Error()))
return false
}
@ -300,7 +300,7 @@ func (sa *SegIDAssigner) syncSegments() bool {
success := false
for _, info := range resp.SegIDAssignments {
if info.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS {
log.Println("SyncSegment Error:", info.Status.Reason)
log.Debug("proxynode", zap.String("SyncSegment Error", info.Status.Reason))
continue
}
assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName)

View File

@ -2,17 +2,16 @@ package proxynode
import (
"context"
"errors"
"fmt"
"log"
"math"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/types"
"errors"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
@ -21,6 +20,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
@ -581,9 +581,9 @@ func (st *SearchTask) Execute(ctx context.Context) error {
}
msgPack.Msgs[0] = tsMsg
err := st.queryMsgStream.Produce(ctx, msgPack)
log.Printf("[ProxyNode] length of searchMsg: %v", len(msgPack.Msgs))
log.Debug("proxynode", zap.Int("length of searchMsg", len(msgPack.Msgs)))
if err != nil {
log.Printf("[ProxyNode] send search request failed: %v", err)
log.Debug("proxynode", zap.String("send search request failed", err.Error()))
}
return err
}
@ -592,7 +592,7 @@ func (st *SearchTask) PostExecute(ctx context.Context) error {
for {
select {
case <-st.Ctx().Done():
log.Print("SearchTask: wait to finish failed, timeout!, taskID:", st.ID())
log.Debug("proxynode", zap.Int64("SearchTask: wait to finish failed, timeout!, taskID:", st.ID()))
return fmt.Errorf("SearchTask:wait to finish failed, timeout: %d", st.ID())
case searchResults := <-st.resultBuf:
// fmt.Println("searchResults: ", searchResults)
@ -638,7 +638,7 @@ func (st *SearchTask) PostExecute(ctx context.Context) error {
partialHit := &milvuspb.Hits{}
err := proto.Unmarshal(bs, partialHit)
if err != nil {
log.Println("unmarshal error")
log.Debug("proxynode", zap.String("error", "unmarshal error"))
return err
}
partialHits = append(partialHits, partialHit)
@ -731,7 +731,7 @@ func (st *SearchTask) PostExecute(ctx context.Context) error {
}
reducedHitsBs, err := proto.Marshal(reducedHits)
if err != nil {
log.Println("marshal error")
log.Debug("proxynode", zap.String("error", "marshal error"))
return err
}
st.result.Hits = append(st.result.Hits, reducedHitsBs)
@ -1487,7 +1487,6 @@ func (dit *DescribeIndexTask) PreExecute(ctx context.Context) error {
func (dit *DescribeIndexTask) Execute(ctx context.Context) error {
var err error
dit.result, err = dit.masterService.DescribeIndex(ctx, dit.DescribeIndexRequest)
log.Println("YYYYY:", dit.result)
if dit.result == nil {
return errors.New("get collection statistics resp is nil")
}
@ -1753,7 +1752,8 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
}
}
log.Println("GetIndexState:: len of allSegmentIDs:", len(allSegmentIDs), " len of IndexBuildIDs", len(indexBuildIDs))
log.Debug("proxynode", zap.Int("GetIndexState:: len of allSegmentIDs", len(allSegmentIDs)))
log.Debug("proxynode", zap.Int("GetIndexState:: len of IndexBuildIDs", len(indexBuildIDs)))
if len(allSegmentIDs) != len(indexBuildIDs) {
gist.result = &milvuspb.IndexStateResponse{
Status: &commonpb.Status{

View File

@ -3,16 +3,17 @@ package proxynode
import (
"container/list"
"context"
"errors"
"fmt"
"log"
"strconv"
"sync"
"errors"
"go.uber.org/zap"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
@ -77,7 +78,7 @@ func (queue *BaseTaskQueue) FrontUnissuedTask() task {
defer queue.utLock.Unlock()
if queue.unissuedTasks.Len() <= 0 {
log.Panic("sorry, but the unissued task list is empty!")
log.Warn("sorry, but the unissued task list is empty!")
return nil
}
@ -89,7 +90,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task {
defer queue.utLock.Unlock()
if queue.unissuedTasks.Len() <= 0 {
log.Fatal("sorry, but the unissued task list is empty!")
log.Warn("sorry, but the unissued task list is empty!")
return nil
}
@ -106,7 +107,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) {
ts := t.EndTs()
_, ok := queue.activeTasks[ts]
if ok {
log.Fatalf("task with timestamp %v already in active task list!", ts)
log.Debug("proxynode", zap.Uint64("task with timestamp ts already in active task list! ts:", ts))
}
queue.activeTasks[ts] = t
@ -122,7 +123,7 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task {
return t
}
log.Fatalf("sorry, but the timestamp %d was not found in the active task list!", ts)
log.Debug("proxynode", zap.Uint64("task with timestamp ts already in active task list! ts:", ts))
return nil
}
@ -173,11 +174,9 @@ func (queue *BaseTaskQueue) Enqueue(t task) error {
}
ts, _ := queue.sched.tsoAllocator.AllocOne()
// log.Printf("[ProxyNode] allocate timestamp: %v", ts)
t.SetTs(ts)
reqID, _ := queue.sched.idAllocator.AllocOne()
// log.Printf("[ProxyNode] allocate reqID: %v", reqID)
t.SetID(reqID)
return queue.addUnissuedTask(t)
@ -309,7 +308,6 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
defer func() {
t.Notify(err)
// log.Printf("notify with error: %v", err)
}()
if err != nil {
trace.LogError(span, err)
@ -319,11 +317,9 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
q.AddActiveTask(t)
// log.Printf("task add to active list ...")
defer func() {
span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID()))
q.PopActiveTask(t.EndTs())
// log.Printf("pop from active list ...")
}()
span.LogFields(oplog.Int64("scheduler process Execute", t.ID()))
err = t.Execute(ctx)
@ -331,10 +327,8 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
trace.LogError(span, err)
return
}
// log.Printf("task execution done ...")
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
err = t.PostExecute(ctx)
// log.Printf("post execute task done ...")
}
func (sched *TaskScheduler) definitionLoop() {
@ -375,12 +369,11 @@ func (sched *TaskScheduler) queryLoop() {
case <-sched.ctx.Done():
return
case <-sched.DqQueue.utChan():
// log.Print("scheduler receive query request ...")
if !sched.DqQueue.UTEmpty() {
t := sched.scheduleDqTask()
go sched.processTask(t, sched.DqQueue)
} else {
log.Print("query queue is empty ...")
log.Debug("query queue is empty ...")
}
}
}
@ -391,7 +384,8 @@ func (sched *TaskScheduler) queryResultLoop() {
queryResultMsgStream, _ := sched.msFactory.NewMsgStream(sched.ctx)
queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames, Params.ProxySubName)
log.Println("proxynode AsConsumer: ", Params.SearchResultChannelNames, " : ", Params.ProxySubName)
log.Debug("proxynode", zap.Strings("search result channel names", Params.SearchResultChannelNames))
log.Debug("proxynode", zap.String("proxySubName", Params.ProxySubName))
queryNodeNum := Params.QueryNodeNum
queryResultMsgStream.Start()
@ -403,7 +397,7 @@ func (sched *TaskScheduler) queryResultLoop() {
select {
case msgPack, ok := <-queryResultMsgStream.Chan():
if !ok {
log.Print("buf chan closed")
log.Debug("buf chan closed")
return
}
if msgPack == nil {
@ -415,7 +409,7 @@ func (sched *TaskScheduler) queryResultLoop() {
reqIDStr := strconv.FormatInt(reqID, 10)
t := sched.getTaskByReqID(reqID)
if t == nil {
log.Println(fmt.Sprint("QueryResult:czs:GetTaskByReqID failed, reqID:", reqIDStr))
log.Debug("proxynode", zap.String("QueryResult GetTaskByReqID failed, reqID = ", reqIDStr))
delete(queryResultBuf, reqID)
continue
}
@ -436,7 +430,6 @@ func (sched *TaskScheduler) queryResultLoop() {
if t != nil {
qt, ok := t.(*SearchTask)
if ok {
log.Printf("address of query task: %p", qt)
qt.resultBuf <- queryResultBuf[reqID]
delete(queryResultBuf, reqID)
}
@ -447,7 +440,7 @@ func (sched *TaskScheduler) queryResultLoop() {
}
}
case <-sched.ctx.Done():
log.Print("proxynode server is closed ...")
log.Debug("proxynode server is closed ...")
return
}
}

View File

@ -2,17 +2,17 @@ package proxynode
import (
"context"
"log"
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"go.uber.org/zap"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type tickCheckFunc = func(Timestamp) bool
@ -55,8 +55,7 @@ func newTimeTick(ctx context.Context,
t.tickMsgStream, _ = t.msFactory.NewMsgStream(t.ctx)
t.tickMsgStream.AsProducer(Params.ProxyTimeTickChannelNames)
// FIXME(wxyu): use log.Debug instead
log.Println("proxynode AsProducer: ", Params.ProxyTimeTickChannelNames)
log.Debug("proxynode", zap.Strings("proxynode AsProducer", Params.ProxyTimeTickChannelNames))
return t
}
@ -89,11 +88,10 @@ func (tt *timeTick) tick() error {
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
err := tt.tickMsgStream.Produce(tt.ctx, &msgPack)
if err != nil {
log.Printf("proxynode send time tick error: %v", err)
log.Warn("proxynode", zap.String("error", err.Error()))
} else {
//log.Printf("proxynode send time tick message")
log.Warn("proxynode send time tick message")
}
//log.Println("send current tick: ", tt.currentTick)
tt.tickLock.Lock()
defer tt.tickLock.Unlock()
tt.lastTick = tt.currentTick
@ -107,7 +105,7 @@ func (tt *timeTick) tickLoop() {
select {
case <-tt.timer.C:
if err := tt.tick(); err != nil {
log.Printf("timeTick error")
log.Warn("timeTick error")
}
case <-tt.ctx.Done():
return

View File

@ -1,32 +0,0 @@
package proxynode
import (
"log"
"time"
)
// Reference: https://blog.cyeam.com/golang/2018/08/27/retry
func Retry(attempts int, sleep time.Duration, fn func() error) error {
if err := fn(); err != nil {
if s, ok := err.(InterruptError); ok {
return s.error
}
if attempts--; attempts > 0 {
log.Printf("retry func error: %s. attempts #%d after %s.", err.Error(), attempts, sleep)
time.Sleep(sleep)
return Retry(attempts, 2*sleep, fn)
}
return err
}
return nil
}
type InterruptError struct {
error
}
func NoRetryError(err error) InterruptError {
return InterruptError{err}
}

View File

@ -3,18 +3,18 @@ package proxyservice
import (
"context"
"io/ioutil"
"log"
"os"
"path"
"runtime"
"strconv"
"time"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
)
@ -37,7 +37,7 @@ func (s *ProxyService) fillNodeInitParams() error {
_, fpath, _, _ := runtime.Caller(0)
configFile := path.Dir(fpath) + "/../../configs/" + fileName
_, err := os.Stat(configFile)
log.Printf("configFile = %s", configFile)
log.Debug("proxyservice", zap.String("configFile = ", configFile))
if os.IsNotExist(err) {
runPath, err := os.Getwd()
if err != nil {
@ -97,7 +97,7 @@ func (s *ProxyService) Init() error {
if err != nil {
return err
}
log.Println("fill node init params ...")
log.Debug("fill node init params ...")
m := map[string]interface{}{
"PulsarAddress": Params.PulsarAddress,
@ -110,9 +110,8 @@ func (s *ProxyService) Init() error {
serviceTimeTickMsgStream, _ := s.msFactory.NewTtMsgStream(s.ctx)
serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel})
// FIXME(wxyu): use log.Debug instead
log.Println("proxyservice AsProducer: ", []string{Params.ServiceTimeTickChannel})
log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel})
log.Debug("proxyservice", zap.Strings("proxyservice AsProducer", []string{Params.ServiceTimeTickChannel}))
log.Debug("proxyservice", zap.Strings("create service time tick producer channel", []string{Params.ServiceTimeTickChannel}))
channels := make([]string, Params.InsertChannelNum)
var i int64 = 0
@ -121,20 +120,17 @@ func (s *ProxyService) Init() error {
}
insertTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx)
insertTickMsgStream.AsProducer(channels)
// FIXME(wxyu): use log.Debug instead
log.Println("proxyservice AsProducer: ", channels)
log.Println("create insert time tick producer channel: ", channels)
log.Debug("proxyservice", zap.Strings("create insert time tick producer channels", channels))
nodeTimeTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx)
nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel,
"proxyservicesub") // TODO: add config
log.Println("proxynode AsConsumer: ", Params.NodeTimeTickChannel, " : ", "proxyservicesub")
log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel)
log.Debug("proxyservice", zap.Strings("create node time tick consumer channel", Params.NodeTimeTickChannel))
ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{1}, 10)
log.Println("create soft time tick barrier ...")
log.Debug("create soft time tick barrier ...")
s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream)
log.Println("create time tick ...")
log.Debug("create time tick ...")
return nil
}
@ -142,21 +138,21 @@ func (s *ProxyService) Init() error {
func (s *ProxyService) Start() error {
s.stateCode = internalpb2.StateCode_HEALTHY
s.sched.Start()
log.Println("start scheduler ...")
log.Debug("start scheduler ...")
return s.tick.Start()
}
func (s *ProxyService) Stop() error {
s.sched.Close()
log.Println("close scheduler ...")
log.Debug("close scheduler ...")
s.tick.Close()
log.Println("close time tick")
log.Debug("close time tick")
err := s.nodeInfos.ReleaseAllClients()
if err != nil {
panic(err)
}
log.Println("stop all node ProxyNodes ...")
log.Debug("stop all node ProxyNodes ...")
s.cancel()
@ -198,7 +194,7 @@ func (s *ProxyService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
}
func (s *ProxyService) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) {
log.Println("register link")
log.Debug("register link")
t := &RegisterLinkTask{
ctx: ctx,
@ -234,7 +230,6 @@ func (s *ProxyService) RegisterLink(ctx context.Context) (*milvuspb.RegisterLink
}
func (s *ProxyService) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
log.Println("RegisterNode: ", request)
t := &RegisterNodeTask{
ctx: ctx,
@ -273,7 +268,7 @@ func (s *ProxyService) RegisterNode(ctx context.Context, request *proxypb.Regist
}
func (s *ProxyService) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
log.Println("InvalidateCollectionMetaCache")
log.Debug("InvalidateCollectionMetaCache")
t := &InvalidateCollectionMetaCacheTask{
ctx: ctx,

View File

@ -3,12 +3,12 @@ package proxyservice
import (
"context"
"errors"
"log"
"math/rand"
"strconv"
"sync"
grpcproxynodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode/client"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/types"
)
@ -66,8 +66,6 @@ func (table *GlobalNodeInfoTable) Register(id UniqueID, info *NodeInfo) error {
}
func (table *GlobalNodeInfoTable) createClients() error {
log.Println("infos: ", table.infos)
log.Println("ProxyNodes: ", table.ProxyNodes)
if len(table.ProxyNodes) == len(table.infos) {
return nil
}
@ -75,7 +73,6 @@ func (table *GlobalNodeInfoTable) createClients() error {
for nodeID, info := range table.infos {
_, ok := table.ProxyNodes[nodeID]
if !ok {
log.Println(info)
table.ProxyNodes[nodeID] = grpcproxynodeclient.NewClient(context.Background(), info.ip+":"+strconv.Itoa(int(info.port)))
var err error
err = table.ProxyNodes[nodeID].Init()
@ -94,10 +91,10 @@ func (table *GlobalNodeInfoTable) createClients() error {
func (table *GlobalNodeInfoTable) ReleaseAllClients() error {
table.mu.Lock()
log.Println("get write lock")
log.Debug("get write lock")
defer func() {
table.mu.Unlock()
log.Println("release write lock")
log.Debug("release write lock")
}()
var err error

View File

@ -1,9 +1,13 @@
package proxyservice
import (
"log"
"path"
"strconv"
"sync"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
@ -17,6 +21,8 @@ type ParamTable struct {
DataServiceAddress string
InsertChannelPrefixName string
InsertChannelNum int64
Log log.Config
}
var Params ParamTable
@ -37,6 +43,7 @@ func (pt *ParamTable) Init() {
pt.initDataServiceAddress()
pt.initInsertChannelPrefixName()
pt.initInsertChannelNum()
pt.initLogCfg()
})
}
@ -59,7 +66,7 @@ func (pt *ParamTable) initMasterAddress() {
func (pt *ParamTable) initNodeTimeTickChannel() {
prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick")
if err != nil {
log.Panic(err)
log.Panic("proxyservice", zap.Error(err))
}
prefix += "-0"
pt.NodeTimeTickChannel = []string{prefix}
@ -68,7 +75,7 @@ func (pt *ParamTable) initNodeTimeTickChannel() {
func (pt *ParamTable) initServiceTimeTickChannel() {
ch, err := pt.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick")
if err != nil {
log.Panic(err)
log.Panic("proxyservice", zap.Error(err))
}
pt.ServiceTimeTickChannel = ch
}
@ -89,3 +96,34 @@ func (pt *ParamTable) initInsertChannelPrefixName() {
panic(err)
}
}
func (pt *ParamTable) initLogCfg() {
pt.Log = log.Config{}
format, err := pt.Load("log.format")
if err != nil {
panic(err)
}
pt.Log.Format = format
level, err := pt.Load("log.level")
if err != nil {
panic(err)
}
pt.Log.Level = level
devStr, err := pt.Load("log.dev")
if err != nil {
panic(err)
}
dev, err := strconv.ParseBool(devStr)
if err != nil {
panic(err)
}
pt.Log.Development = dev
pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize")
pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups")
pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge")
rootPath, err := pt.Load("log.file.rootPath")
if err != nil {
panic(err)
}
pt.Log.File.Filename = path.Join(rootPath, "proxyservice-%d.log")
}

View File

@ -2,10 +2,10 @@ package proxyservice
import (
"container/list"
"log"
"errors"
"sync"
"errors"
"github.com/zilliztech/milvus-distributed/internal/log"
)
type TaskQueue interface {

View File

@ -3,11 +3,13 @@ package proxyservice
import (
"context"
"errors"
"log"
"math"
"sync"
"sync/atomic"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
)
@ -42,7 +44,7 @@ func (ttBarrier *softTimeTickBarrier) AddPeer(peerID UniqueID) error {
_, ok := ttBarrier.peer2LastTt[peerID]
if ok {
log.Println("no need to add duplicated peer: ", peerID)
log.Debug("proxyservice", zap.Int64("no need to add duplicated peer", peerID))
return nil
}
@ -67,7 +69,6 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
}
}
atomic.StoreInt64(&(ttBarrier.lastTt), int64(ts))
// log.Println("current tick: ", ts)
return ts, ttBarrier.ctx.Err()
}
}
@ -77,26 +78,20 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
for {
select {
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
log.Warn("TtBarrierStart", zap.Error(ttBarrier.ctx.Err()))
return
case ttmsgs := <-ttBarrier.ttStream.Chan():
//log.Println("ttmsgs: ", ttmsgs)
ttBarrier.peerMtx.RLock()
//log.Println("peer2LastTt map: ", ttBarrier.peer2LastTt)
//log.Println("len(ttmsgs.Msgs): ", len(ttmsgs.Msgs))
if len(ttmsgs.Msgs) > 0 {
for _, timetickmsg := range ttmsgs.Msgs {
ttmsg := timetickmsg.(*ms.TimeTickMsg)
oldT, ok := ttBarrier.peer2LastTt[ttmsg.Base.SourceID]
// log.Printf("[softTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerID, ttmsg.Timestamp)
if !ok {
log.Printf("[softTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.Base.SourceID)
log.Warn("softTimeTickBarrier", zap.Int64("peerID %d not exist", ttmsg.Base.SourceID))
continue
}
// log.Println("ttmsg.Base.Timestamp: ", ttmsg.Base.Timestamp)
// log.Println("oldT: ", oldT)
if ttmsg.Base.Timestamp > oldT {
ttBarrier.peer2LastTt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp
@ -126,7 +121,7 @@ func newSoftTimeTickBarrier(ctx context.Context,
minTtInterval Timestamp) TimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newSoftTimeTickBarrier] Warning: peerIds is empty!\n")
log.Warn("[newSoftTimeTickBarrier] Warning: peerIds is empty!")
//return nil
}
@ -140,7 +135,7 @@ func newSoftTimeTickBarrier(ctx context.Context,
sttbarrier.peer2LastTt[id] = Timestamp(0)
}
if len(peerIds) != len(sttbarrier.peer2LastTt) {
log.Printf("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!\n")
log.Warn("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!")
}
return &sttbarrier

View File

@ -2,9 +2,11 @@ package proxyservice
import (
"context"
"log"
"sync"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -19,19 +21,19 @@ type TimeTick struct {
}
func (tt *TimeTick) Start() error {
log.Println("start time tick ...")
log.Debug("start time tick ...")
tt.wg.Add(1)
go func() {
defer tt.wg.Done()
for {
select {
case <-tt.ctx.Done():
log.Println("time tick loop was canceled by context!")
log.Debug("time tick loop was canceled by context!")
return
default:
current, err := tt.ttBarrier.GetTimeTick()
if err != nil {
log.Println("GetTimeTick error: ", err)
log.Error("GetTimeTick error", zap.Error(err))
break
}
msgPack := msgstream.MsgPack{}
@ -50,12 +52,12 @@ func (tt *TimeTick) Start() error {
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
for _, msg := range msgPack.Msgs {
log.Println("msg type xxxxxxxxxxxxxxxxxxxxxxxx", msg.Type())
log.Debug("proxyservice", zap.Stringer("msg type", msg.Type()))
}
for _, channel := range tt.channels {
err = channel.Broadcast(tt.ctx, &msgPack)
if err != nil {
log.Println("send time tick error: ", err)
log.Error("proxyservice", zap.String("send time tick error", err.Error()))
}
}
}