diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go
index 9bba5e46a6..7ad616e882 100644
--- a/internal/proxy/impl.go
+++ b/internal/proxy/impl.go
@@ -2269,6 +2269,15 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 			Status: unhealthyStatus(),
 		}, nil
 	}
+	log := log.Ctx(ctx).With(
+		zap.String("role", typeutil.ProxyRole),
+		zap.String("db", request.DbName),
+		zap.String("collection", request.CollectionName),
+		zap.String("partition", request.PartitionName),
+		zap.Int("len(FieldsData)", len(request.FieldsData)),
+		zap.Int("len(HashKeys)", len(request.HashKeys)),
+		zap.Uint32("NumRows", request.NumRows),
+	)
 	method := "Insert"
 	tr := timerecord.NewTimeRecorder(method)
 	metrics.ProxyReceiveBytes.WithLabelValues(
@@ -2319,14 +2328,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 		}
 	}

-	log.Debug("Enqueue insert request in Proxy",
-		zap.String("role", typeutil.ProxyRole),
-		zap.String("db", request.DbName),
-		zap.String("collection", request.CollectionName),
-		zap.String("partition", request.PartitionName),
-		zap.Int("len(FieldsData)", len(request.FieldsData)),
-		zap.Int("len(HashKeys)", len(request.HashKeys)),
-		zap.Uint32("NumRows", request.NumRows))
+	log.Debug("Enqueue insert request in Proxy")

 	if err := node.sched.dmQueue.Enqueue(it); err != nil {
 		log.Warn("Failed to enqueue insert task: " + err.Error())
@@ -2335,14 +2337,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 		return constructFailedResponse(err), nil
 	}

-	log.Debug("Detail of insert request in Proxy",
-		zap.String("role", typeutil.ProxyRole),
-		zap.Uint64("BeginTS", it.BeginTs()),
-		zap.Uint64("EndTS", it.EndTs()),
-		zap.String("db", request.DbName),
-		zap.String("collection", request.CollectionName),
-		zap.String("partition", request.PartitionName),
-		zap.Uint32("NumRows", request.NumRows))
+	log.Debug("Detail of insert request in Proxy")

 	if err := it.WaitToFinish(); err != nil {
 		log.Warn("Failed to execute insert task in task scheduler: " + err.Error())
@@ -2362,6 +2357,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 		}

 		setErrorIndex()
+		log.Warn("fail to insert data", zap.Uint32s("err_index", it.result.ErrIndex))
 	}

 	// InsertCnt always equals to the number of entities in the request
@@ -2383,7 +2379,13 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
 	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete")
 	defer sp.End()
-	log := log.Ctx(ctx)
+	log := log.Ctx(ctx).With(
+		zap.String("role", typeutil.ProxyRole),
+		zap.String("db", request.DbName),
+		zap.String("collection", request.CollectionName),
+		zap.String("partition", request.PartitionName),
+		zap.String("expr", request.Expr),
+	)

 	log.Debug("Start processing delete request in Proxy")
 	defer log.Debug("Finish processing delete request in Proxy")
@@ -2426,12 +2428,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
 		chTicker: node.chTicker,
 	}

-	log.Debug("Enqueue delete request in Proxy",
-		zap.String("role", typeutil.ProxyRole),
-		zap.String("db", request.DbName),
-		zap.String("collection", request.CollectionName),
-		zap.String("partition", request.PartitionName),
-		zap.String("expr", request.Expr))
+	log.Debug("Enqueue delete request in Proxy")

 	// MsgID will be set by Enqueue()
 	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
@@ -2447,13 +2444,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
 		}, nil
 	}

-	log.Debug("Detail of delete request in Proxy",
-		zap.String("role", typeutil.ProxyRole),
-		zap.Uint64("timestamp", dt.deleteMsg.Base.Timestamp),
-		zap.String("db", request.DbName),
-		zap.String("collection", request.CollectionName),
-		zap.String("partition", request.PartitionName),
-		zap.String("expr", request.Expr))
+	log.Debug("Detail of delete request in Proxy")

 	if err := dt.WaitToFinish(); err != nil {
 		log.Error("Failed to execute delete task in task scheduler: " + err.Error())
diff --git a/internal/proxy/lb_policy.go b/internal/proxy/lb_policy.go
index b28f041c3f..a63bc067a5 100644
--- a/internal/proxy/lb_policy.go
+++ b/internal/proxy/lb_policy.go
@@ -186,6 +186,7 @@ func (lb *LBPolicyImpl) ExecuteWithRetry(ctx context.Context, workload ChannelWo
 func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad) error {
 	dml2leaders, err := globalMetaCache.GetShards(ctx, true, workload.db, workload.collection)
 	if err != nil {
+		log.Ctx(ctx).Warn("failed to get shards", zap.Error(err))
 		return err
 	}

diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go
index aad3df6f52..24d2faa604 100644
--- a/internal/proxy/task_delete.go
+++ b/internal/proxy/task_delete.go
@@ -163,14 +163,15 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
 		Timestamp: dt.BeginTs(),
 	}

+	log := log.Ctx(ctx)
 	collName := dt.deleteMsg.CollectionName
 	if err := validateCollectionName(collName); err != nil {
-		log.Info("Invalid collection name", zap.String("collectionName", collName), zap.Error(err))
+		log.Warn("Invalid collection name", zap.Error(err))
 		return err
 	}
 	collID, err := globalMetaCache.GetCollectionID(ctx, dt.deleteMsg.GetDbName(), collName)
 	if err != nil {
-		log.Info("Failed to get collection id", zap.String("collectionName", collName), zap.Error(err))
+		log.Warn("Failed to get collection id", zap.Error(err))
 		return err
 	}
 	dt.deleteMsg.CollectionID = collID
@@ -178,6 +179,7 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {

 	partitionKeyMode, err := isPartitionKeyMode(ctx, dt.deleteMsg.GetDbName(), dt.deleteMsg.CollectionName)
 	if err != nil {
+		log.Warn("Failed to get partition key mode", zap.Error(err))
 		return err
 	}
 	if partitionKeyMode && len(dt.deleteMsg.PartitionName) != 0 {
@@ -188,12 +190,12 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
 	if len(dt.deleteMsg.PartitionName) > 0 {
 		partName := dt.deleteMsg.PartitionName
 		if err := validatePartitionTag(partName, true); err != nil {
-			log.Info("Invalid partition name", zap.String("partitionName", partName), zap.Error(err))
+			log.Info("Invalid partition name", zap.Error(err))
 			return err
 		}
 		partID, err := globalMetaCache.GetPartitionID(ctx, dt.deleteMsg.GetDbName(), collName, partName)
 		if err != nil {
-			log.Info("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName), zap.Error(err))
+			log.Info("Failed to get partition id", zap.Error(err))
 			return err
 		}
 		dt.deleteMsg.PartitionID = partID
@@ -203,7 +205,7 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {

 	schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.deleteMsg.GetDbName(), collName)
 	if err != nil {
-		log.Info("Failed to get collection schema", zap.String("collectionName", collName), zap.Error(err))
+		log.Info("Failed to get collection schema", zap.Error(err))
 		return err
 	}
 	dt.schema = schema
@@ -228,18 +230,22 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
 		dt.deleteMsg.Timestamps[index] = dt.BeginTs()
 	}

+	log.Debug("pre delete done", zap.Int64("collection_id", dt.collectionID))
+
 	return nil
 }

 func (dt *deleteTask) Execute(ctx context.Context) (err error) {
 	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")
 	defer sp.End()
+	log := log.Ctx(ctx)

 	tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))

 	collID := dt.deleteMsg.CollectionID
 	stream, err := dt.chMgr.getOrCreateDmlStream(collID)
 	if err != nil {
+		log.Warn("fail to get or create dml stream", zap.Error(err))
 		return err
 	}
diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go
index 350b8ff05e..aa571baed7 100644
--- a/internal/proxy/task_insert.go
+++ b/internal/proxy/task_insert.go
@@ -218,8 +218,10 @@ func (it *insertTask) Execute(ctx context.Context) error {
 	tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute insert %d", it.ID()))

 	collectionName := it.insertMsg.CollectionName
-	collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), it.insertMsg.CollectionName)
+	collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName)
+	log := log.Ctx(ctx)
 	if err != nil {
+		log.Warn("fail to get collection id", zap.Error(err))
 		return err
 	}
 	it.insertMsg.CollectionID = collID
@@ -233,16 +235,13 @@ func (it *insertTask) Execute(ctx context.Context) error {

 	channelNames, err := it.chMgr.getVChannels(collID)
 	if err != nil {
-		log.Ctx(ctx).Warn("get vChannels failed",
-			zap.Int64("collectionID", collID),
-			zap.Error(err))
+		log.Warn("get vChannels failed", zap.Int64("collectionID", collID), zap.Error(err))
 		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		it.result.Status.Reason = err.Error()
 		return err
 	}

-	log.Ctx(ctx).Debug("send insert request to virtual channels",
-		zap.String("collectionName", it.insertMsg.GetCollectionName()),
+	log.Debug("send insert request to virtual channels",
 		zap.String("partition", it.insertMsg.GetPartitionName()),
 		zap.Int64("collectionID", collID),
 		zap.Strings("virtual_channels", channelNames),
@@ -258,9 +257,7 @@ func (it *insertTask) Execute(ctx context.Context) error {
 		msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
 	}
 	if err != nil {
-		log.Warn("assign segmentID and repack insert data failed",
-			zap.Int64("collectionID", collID),
-			zap.Error(err))
+		log.Warn("assign segmentID and repack insert data failed", zap.Error(err))
 		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		it.result.Status.Reason = err.Error()
 		return err
@@ -268,11 +265,10 @@ func (it *insertTask) Execute(ctx context.Context) error {
 	assignSegmentIDDur := tr.RecordSpan()

 	log.Debug("assign segmentID for insert data success",
-		zap.Int64("collectionID", collID),
-		zap.String("collectionName", it.insertMsg.CollectionName),
 		zap.Duration("assign segmentID duration", assignSegmentIDDur))
 	err = stream.Produce(msgPack)
 	if err != nil {
+		log.Warn("fail to produce insert msg", zap.Error(err))
 		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		it.result.Status.Reason = err.Error()
 		return err
diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go
index 19b60683c6..f72f27b3f7 100644
--- a/internal/proxy/task_query.go
+++ b/internal/proxy/task_query.go
@@ -402,6 +402,7 @@ func (t *queryTask) Execute(ctx context.Context) error {
 		exec: t.queryShard,
 	})
 	if err != nil {
+		log.Warn("fail to execute query", zap.Error(err))
 		return merr.WrapErrShardDelegatorQueryFailed(err.Error())
 	}

@@ -442,6 +443,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error {

 	t.result, err = reducer.Reduce(toReduceResults)
 	if err != nil {
+		log.Warn("fail to reduce query result", zap.Error(err))
 		return err
 	}
 	t.result.OutputFields = t.userOutputFields
diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go
index 570d510c02..1e383daed2 100644
--- a/internal/proxy/task_search.go
+++ b/internal/proxy/task_search.go
@@ -210,6 +210,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
 	t.Base.MsgType = commonpb.MsgType_Search
 	t.Base.SourceID = paramtable.GetNodeID()

+	log := log.Ctx(ctx)
 	collectionName := t.request.CollectionName
 	t.collectionName = collectionName

@@ -224,6 +225,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {

 	partitionKeyMode, err := isPartitionKeyMode(ctx, t.request.GetDbName(), collectionName)
 	if err != nil {
+		log.Warn("is partition key mode failed", zap.Error(err))
 		return err
 	}
 	if partitionKeyMode && len(t.request.GetPartitionNames()) != 0 {
@@ -232,9 +234,10 @@ func (t *searchTask) PreExecute(ctx context.Context) error {

 	t.request.OutputFields, t.userOutputFields, err = translateOutputFields(t.request.OutputFields, t.schema, false)
 	if err != nil {
+		log.Warn("translate output fields failed", zap.Error(err))
 		return err
 	}
-	log.Ctx(ctx).Debug("translate output fields",
+	log.Debug("translate output fields",
 		zap.Strings("output fields", t.request.GetOutputFields()))

 	// fetch search_growing from search param
@@ -254,6 +257,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
 	// Manually update nq if not set.
 	nq, err := getNq(t.request)
 	if err != nil {
+		log.Warn("failed to get nq", zap.Error(err))
 		return err
 	}
 	// Check if nq is valid:
@@ -265,6 +269,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {

 	outputFieldIDs, err := getOutputFieldIDs(t.schema, t.request.GetOutputFields())
 	if err != nil {
+		log.Warn("fail to get output field ids", zap.Error(err))
 		return err
 	}
 	t.SearchRequest.OutputFieldsId = outputFieldIDs
@@ -290,23 +295,25 @@ func (t *searchTask) PreExecute(ctx context.Context) error {

 		plan, err := planparserv2.CreateSearchPlan(t.schema, t.request.Dsl, annsField, queryInfo)
 		if err != nil {
-			log.Ctx(ctx).Warn("failed to create query plan", zap.Error(err),
+			log.Warn("failed to create query plan", zap.Error(err),
 				zap.String("dsl", t.request.Dsl), // may be very large if large term passed.
 				zap.String("anns field", annsField), zap.Any("query info", queryInfo))
 			return fmt.Errorf("failed to create query plan: %v", err)
 		}
-		log.Ctx(ctx).Debug("create query plan",
+		log.Debug("create query plan",
 			zap.String("dsl", t.request.Dsl), // may be very large if large term passed.
 			zap.String("anns field", annsField), zap.Any("query info", queryInfo))

 		if partitionKeyMode {
 			expr, err := ParseExprFromPlan(plan)
 			if err != nil {
+				log.Warn("failed to parse expr", zap.Error(err))
 				return err
 			}
 			partitionKeys := ParsePartitionKeys(expr)
 			hashedPartitionNames, err := assignPartitionKeys(ctx, t.request.GetDbName(), collectionName, partitionKeys)
 			if err != nil {
+				log.Warn("failed to assign partition keys", zap.Error(err))
 				return err
 			}

@@ -321,6 +328,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {

 		estimateSize, err := t.estimateResultSize(nq, t.SearchRequest.Topk)
 		if err != nil {
+			log.Warn("failed to estimate result size", zap.Error(err))
 			return err
 		}
 		if estimateSize >= requeryThreshold {
@@ -333,7 +341,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
 			return err
 		}

-		log.Ctx(ctx).Debug("Proxy::searchTask::PreExecute",
+		log.Debug("Proxy::searchTask::PreExecute",
 			zap.Int64s("plan.OutputFieldIds", plan.GetOutputFieldIds()),
 			zap.String("plan", plan.String())) // may be very large if large term passed.
 	}
@@ -341,6 +349,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
 	// translate partition name to partition ids. Use regex-pattern to match partition name.
 	t.SearchRequest.PartitionIDs, err = getPartitionIDs(ctx, collectionName, partitionNames)
 	if err != nil {
+		log.Warn("failed to get partition ids", zap.Error(err))
 		return err
 	}

@@ -354,7 +363,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
 	}
 	collectionInfo, err2 := globalMetaCache.GetCollectionInfo(ctx, t.request.GetDbName(), collectionName)
 	if err2 != nil {
-		log.Ctx(ctx).Debug("Proxy::searchTask::PreExecute failed to GetCollectionInfo from cache",
+		log.Warn("Proxy::searchTask::PreExecute failed to GetCollectionInfo from cache",
 			zap.Any("collectionName", collectionName), zap.Error(err2))
 		return err2
 	}
@@ -388,7 +397,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
 		t.SearchRequest.Username = username
 	}

-	log.Ctx(ctx).Debug("search PreExecute done.",
+	log.Debug("search PreExecute done.",
 		zap.Uint64("travel_ts", travelTimestamp), zap.Uint64("guarantee_ts", guaranteeTs),
 		zap.Bool("use_default_consistency", useDefaultConsistency),
 		zap.Any("consistency level", consistencyLevel),
@@ -414,6 +423,7 @@ func (t *searchTask) Execute(ctx context.Context) error {
 		exec: t.searchShard,
 	})
 	if err != nil {
+		log.Warn("search execute failed", zap.Error(err))
 		return merr.WrapErrShardDelegatorSearchFailed(err.Error())
 	}

@@ -431,6 +441,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
 	defer func() {
 		tr.CtxElapse(ctx, "done")
 	}()
+	log := log.Ctx(ctx)

 	var (
 		Nq = t.SearchRequest.GetNq()
@@ -439,6 +450,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
 	)
 	toReduceResults, err := t.collectSearchResults(ctx)
 	if err != nil {
+		log.Warn("failed to collect search results", zap.Error(err))
 		return err
 	}

@@ -450,31 +462,34 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
 	tr.CtxRecord(ctx, "decodeResultStart")
 	validSearchResults, err := decodeSearchResults(ctx, toReduceResults)
 	if err != nil {
+		log.Warn("failed to decode search results", zap.Error(err))
 		return err
 	}
 	metrics.ProxyDecodeResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds()))

 	if len(validSearchResults) <= 0 {
-		log.Ctx(ctx).Warn("search result is empty")
+		log.Warn("search result is empty")
 		t.fillInEmptyResult(Nq)
 		return nil
 	}

 	// Reduce all search results
-	log.Ctx(ctx).Debug("proxy search post execute reduce",
+	log.Debug("proxy search post execute reduce",
 		zap.Int64("collection", t.GetCollectionID()),
 		zap.Int64s("partitionIDs", t.GetPartitionIDs()),
 		zap.Int("number of valid search results", len(validSearchResults)))
 	tr.CtxRecord(ctx, "reduceResultStart")
 	primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(t.schema)
 	if err != nil {
+		log.Warn("failed to get primary field schema", zap.Error(err))
 		return err
 	}

 	t.result, err = reduceSearchResultData(ctx, validSearchResults, Nq, Topk, MetricType, primaryFieldSchema.DataType, t.offset)
 	if err != nil {
+		log.Warn("failed to reduce search results", zap.Error(err))
 		return err
 	}

@@ -486,12 +501,13 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
 	if t.requery {
 		err = t.Requery()
 		if err != nil {
+			log.Warn("failed to requery", zap.Error(err))
 			return err
 		}
 	}
 	t.result.Results.OutputFields = t.userOutputFields

-	log.Ctx(ctx).Debug("Search post execute done",
+	log.Debug("Search post execute done",
 		zap.Int64("collection", t.GetCollectionID()),
 		zap.Int64s("partitionIDs", t.GetPartitionIDs()))
 	return nil
diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go
index 8642b8ca1f..773e863604 100644
--- a/internal/util/grpcclient/client.go
+++ b/internal/util/grpcclient/client.go
@@ -292,7 +292,7 @@ func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any
 		return generic.Zero[T](), err
 	}
 	if IsCrossClusterRoutingErr(err) {
-		log.Warn("CrossClusterRoutingErr, start to reset connection",
+		log.Ctx(ctx).Warn("CrossClusterRoutingErr, start to reset connection",
 			zap.String("role", c.GetRole()),
 			zap.Error(err),
 		)
@@ -303,7 +303,7 @@ func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any
 		log.Ctx(ctx).Warn("ClientBase:isNotGrpcErr", zap.Error(err))
 		return generic.Zero[T](), err
 	}
-	log.Info("ClientBase grpc error, start to reset connection",
+	log.Ctx(ctx).Info("ClientBase grpc error, start to reset connection",
 		zap.String("role", c.GetRole()),
 		zap.Error(err),
 	)