mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
Logging traceID in process of processing delete request (#11374)
Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
This commit is contained in:
parent
639800241d
commit
0eb71f7ee5
@ -175,7 +175,10 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
|
|||||||
msg.SetTraceCtx(ctx)
|
msg.SetTraceCtx(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, msg := range fgMsg.deleteMessages {
|
for i, msg := range fgMsg.deleteMessages {
|
||||||
|
traceID, _, _ := trace.InfoFromSpan(spans[i])
|
||||||
|
log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID))
|
||||||
|
|
||||||
if err := dn.bufferDeleteMsg(msg, fgMsg.timeRange); err != nil {
|
if err := dn.bufferDeleteMsg(msg, fgMsg.timeRange); err != nil {
|
||||||
log.Error("buffer delete msg failed", zap.Error(err))
|
log.Error("buffer delete msg failed", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1367,8 +1367,8 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
|||||||
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
|
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
traceID, _, _ := trace.InfoFromSpan(sp)
|
traceID, _, _ := trace.InfoFromSpan(sp)
|
||||||
log.Info("Delete request begin", zap.String("traceID", traceID))
|
log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
|
||||||
defer log.Info("Delete request end", zap.String("traceID", traceID))
|
defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
|
||||||
|
|
||||||
if !node.checkHealthy() {
|
if !node.checkHealthy() {
|
||||||
return &milvuspb.MutationResult{
|
return &milvuspb.MutationResult{
|
||||||
@ -1405,7 +1405,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
|||||||
chTicker: node.chTicker,
|
chTicker: node.chTicker,
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Delete request enqueue",
|
log.Debug("Enqueue delete request in Proxy",
|
||||||
zap.String("role", Params.RoleName),
|
zap.String("role", Params.RoleName),
|
||||||
zap.String("db", request.DbName),
|
zap.String("db", request.DbName),
|
||||||
zap.String("collection", request.CollectionName),
|
zap.String("collection", request.CollectionName),
|
||||||
@ -1423,14 +1423,15 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Delete request detail",
|
log.Debug("Detail of delete request in Proxy",
|
||||||
zap.String("role", Params.RoleName),
|
zap.String("role", Params.RoleName),
|
||||||
zap.Int64("msgID", dt.Base.MsgID),
|
zap.Int64("msgID", dt.Base.MsgID),
|
||||||
zap.Uint64("timestamp", dt.Base.Timestamp),
|
zap.Uint64("timestamp", dt.Base.Timestamp),
|
||||||
zap.String("db", request.DbName),
|
zap.String("db", request.DbName),
|
||||||
zap.String("collection", request.CollectionName),
|
zap.String("collection", request.CollectionName),
|
||||||
zap.String("partition", request.PartitionName),
|
zap.String("partition", request.PartitionName),
|
||||||
zap.String("expr", request.Expr))
|
zap.String("expr", request.Expr),
|
||||||
|
zap.String("traceID", traceID))
|
||||||
|
|
||||||
if err := dt.WaitToFinish(); err != nil {
|
if err := dt.WaitToFinish(); err != nil {
|
||||||
log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
|
log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
|
||||||
|
|||||||
@ -49,7 +49,10 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 1. filter segment by bloom filter
|
// 1. filter segment by bloom filter
|
||||||
for _, delMsg := range dMsg.deleteMessages {
|
for i, delMsg := range dMsg.deleteMessages {
|
||||||
|
traceID, _, _ := trace.InfoFromSpan(spans[i])
|
||||||
|
log.Info("Process delete request in QueryNode", zap.String("traceID", traceID))
|
||||||
|
|
||||||
if dNode.replica.getSegmentNum() != 0 {
|
if dNode.replica.getSegmentNum() != 0 {
|
||||||
log.Debug("delete in historical replica",
|
log.Debug("delete in historical replica",
|
||||||
zap.Any("collectionID", delMsg.CollectionID),
|
zap.Any("collectionID", delMsg.CollectionID),
|
||||||
|
|||||||
@ -70,7 +70,9 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, msg := range msgStreamMsg.TsMessages() {
|
for i, msg := range msgStreamMsg.TsMessages() {
|
||||||
|
traceID, _, _ := trace.InfoFromSpan(spans[i])
|
||||||
|
log.Info("Filter invalid message in QueryNode", zap.String("traceID", traceID))
|
||||||
switch msg.Type() {
|
switch msg.Type() {
|
||||||
case commonpb.MsgType_Insert:
|
case commonpb.MsgType_Insert:
|
||||||
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
|
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user