mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add more log for indexservice and indexnode
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
2c302c40a8
commit
30cc84b164
@ -48,6 +48,7 @@ type IndexNode struct {
|
||||
}
|
||||
|
||||
func NewIndexNode(ctx context.Context) (*IndexNode, error) {
|
||||
log.Debug("new index node ...")
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
b := &IndexNode{
|
||||
@ -142,6 +143,14 @@ func (i *IndexNode) SetIndexServiceClient(serviceClient types.IndexService) {
|
||||
}
|
||||
|
||||
func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexRequest) (*commonpb.Status, error) {
|
||||
log.Debug("indexnode building index ...",
|
||||
zap.Int64("IndexBuildID", request.IndexBuildID),
|
||||
zap.String("Indexname", request.IndexName),
|
||||
zap.Int64("IndexID", request.IndexID),
|
||||
zap.Strings("DataPaths", request.DataPaths),
|
||||
zap.Any("TypeParams", request.TypeParams),
|
||||
zap.Any("IndexParams", request.IndexParams))
|
||||
|
||||
t := &IndexBuildTask{
|
||||
BaseTask: BaseTask{
|
||||
ctx: ctx,
|
||||
@ -168,6 +177,7 @@ func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexR
|
||||
}
|
||||
|
||||
func (i *IndexNode) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
|
||||
log.Debug("indexnode drop index ...", zap.Int64("index id", request.IndexID))
|
||||
i.sched.IndexBuildQueue.tryToRemoveUselessIndexBuildTask(request.IndexID)
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
@ -186,7 +196,7 @@ func (i *IndexNode) AddCloseCallback(callbacks ...func()) {
|
||||
}
|
||||
|
||||
func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
|
||||
log.Debug("get indexnode components states ...")
|
||||
stateInfo := &internalpb.ComponentInfo{
|
||||
NodeID: Params.NodeID,
|
||||
Role: "NodeImpl",
|
||||
@ -200,10 +210,17 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.Compone
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
}
|
||||
|
||||
log.Debug("indexnode compoents states",
|
||||
zap.Any("State", ret.State),
|
||||
zap.Any("Status", ret.Status),
|
||||
zap.Any("SubcomponentStates", ret.SubcomponentStates))
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
log.Debug("get indexnode time tick channel ...")
|
||||
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
@ -212,6 +229,7 @@ func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRes
|
||||
}
|
||||
|
||||
func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
log.Debug("get indexnode statistics channel ...")
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
||||
@ -153,6 +153,7 @@ func (i *IndexService) UpdateStateCode(code internalpb.StateCode) {
|
||||
}
|
||||
|
||||
func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
log.Debug("get indexservice component states ...")
|
||||
stateInfo := &internalpb.ComponentInfo{
|
||||
NodeID: i.ID,
|
||||
Role: "IndexService",
|
||||
@ -170,6 +171,7 @@ func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb.Comp
|
||||
}
|
||||
|
||||
func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
log.Debug("get indexservice time tick channel ...")
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
@ -180,6 +182,7 @@ func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.String
|
||||
}
|
||||
|
||||
func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
log.Debug("get indexservice statistics channel ...")
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
@ -190,7 +193,13 @@ func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
|
||||
}
|
||||
|
||||
func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
|
||||
log.Debug("builder building index", zap.String("indexName = ", req.IndexName), zap.Int64("indexID = ", req.IndexID), zap.Strings("dataPath = ", req.DataPaths))
|
||||
log.Debug("indexservice building index ...",
|
||||
zap.Int64("IndexBuildID", req.IndexBuildID),
|
||||
zap.String("IndexName = ", req.IndexName),
|
||||
zap.Int64("IndexID = ", req.IndexID),
|
||||
zap.Strings("DataPath = ", req.DataPaths),
|
||||
zap.Any("TypeParams", req.TypeParams),
|
||||
zap.Any("IndexParams", req.IndexParams))
|
||||
ret := &indexpb.BuildIndexResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
@ -245,6 +254,7 @@ func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRe
|
||||
}
|
||||
|
||||
func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
|
||||
log.Debug("get index states ...", zap.Int64s("IndexBuildIDs", req.IndexBuildIDs))
|
||||
var indexStates []*indexpb.IndexInfo
|
||||
for _, indexID := range req.IndexBuildIDs {
|
||||
indexState, err := i.metaTable.GetIndexState(indexID)
|
||||
@ -259,10 +269,16 @@ func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndex
|
||||
},
|
||||
States: indexStates,
|
||||
}
|
||||
log.Debug("get index states success")
|
||||
log.Debug("get index states",
|
||||
zap.Any("index status", ret.Status),
|
||||
zap.Any("index states", ret.States))
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
|
||||
log.Debug("indexservice dropping index ...", zap.Int64("indexID", req.IndexID))
|
||||
i.sched.IndexAddQueue.tryToRemoveUselessIndexAddTask(req.IndexID)
|
||||
|
||||
err := i.metaTable.MarkIndexAsDeleted(req.IndexID)
|
||||
@ -286,12 +302,14 @@ func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequ
|
||||
}()
|
||||
}()
|
||||
|
||||
log.Debug("indexservice drop index success")
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
|
||||
log.Debug("indexservice", zap.Int64s("get index file paths", req.IndexBuildIDs))
|
||||
var indexPaths []*indexpb.IndexFilePathInfo = nil
|
||||
|
||||
for _, indexID := range req.IndexBuildIDs {
|
||||
@ -301,6 +319,7 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn
|
||||
}
|
||||
indexPaths = append(indexPaths, indexPathInfo)
|
||||
}
|
||||
log.Debug("indexservice, get index file paths success")
|
||||
|
||||
ret := &indexpb.GetIndexFilePathsResponse{
|
||||
Status: &commonpb.Status{
|
||||
@ -308,10 +327,16 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn
|
||||
},
|
||||
FilePaths: indexPaths,
|
||||
}
|
||||
log.Debug("indexservice", zap.Any("index file paths", ret.FilePaths))
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (i *IndexService) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) {
|
||||
log.Debug("indexservice",
|
||||
zap.Int64("notify build index", nty.IndexBuildID),
|
||||
zap.Strings("index file paths", nty.IndexFilePaths),
|
||||
zap.Int64("node ID", nty.NodeID))
|
||||
ret := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
}
|
||||
|
||||
@ -76,6 +76,7 @@ func (mt *metaTable) saveIndexMeta(meta *indexpb.IndexMeta) error {
|
||||
func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequest) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
log.Debug("indexservice add index ...")
|
||||
_, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
if ok {
|
||||
return fmt.Errorf("index already exists with ID = %d", indexBuildID)
|
||||
@ -92,6 +93,8 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
log.Debug("indexservice", zap.Int64("mark index is deleted", indexID))
|
||||
|
||||
for indexBuildID, meta := range mt.indexBuildID2Meta {
|
||||
if meta.Req.IndexID == indexID {
|
||||
meta.State = commonpb.IndexState_Deleted
|
||||
@ -105,6 +108,8 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
|
||||
func (mt *metaTable) NotifyBuildIndex(nty *indexpb.NotifyBuildIndexRequest) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
log.Debug("indexservice", zap.Int64("notify build index", nty.IndexBuildID))
|
||||
indexBuildID := nty.IndexBuildID
|
||||
meta, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
if !ok {
|
||||
|
||||
@ -5,7 +5,10 @@ import (
|
||||
"errors"
|
||||
"strconv"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
grpcindexnodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode/client"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
@ -56,6 +59,7 @@ func (i *IndexService) prepareNodeInitParams() []*commonpb.KeyValuePair {
|
||||
}
|
||||
|
||||
func (i *IndexService) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
|
||||
log.Debug("indexservice", zap.Any("register index node, node address = ", req.Address))
|
||||
ret := &indexpb.RegisterNodeResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user