enhance: improve the handling for segcore error (#29471)

- fix lost exception details in segcore
- improve the logs of handling errors from segcore

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2023-12-26 14:06:46 +08:00 committed by GitHub
parent 4c979538a4
commit b8318fcd7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 11 deletions

View File

@ -774,7 +774,7 @@ LoadFieldDatasFromRemote(std::vector<std::string>& remote_files,
}
channel->close();
} catch (std::exception e) {
} catch (std::exception& e) {
LOG_INFO("failed to load data from remote: {}", e.what());
channel->close(std::move(e));
}

View File

@ -40,7 +40,7 @@ import (
)
// HandleCStatus deals with the error returned from CGO
func HandleCStatus(status *C.CStatus, extraInfo string) error {
func HandleCStatus(status *C.CStatus, extraInfo string, fields ...zap.Field) error {
if status.error_code == 0 {
return nil
}
@ -55,7 +55,7 @@ func HandleCStatus(status *C.CStatus, extraInfo string) error {
finalMsg := fmt.Sprintf("%s: %s", errorName, errorMsg)
logMsg := fmt.Sprintf("%s, segcore error: %s\n", extraInfo, finalMsg)
log := log.With().WithOptions(zap.AddCallerSkip(1))
log.Warn(logMsg)
log.Warn(logMsg, fields...)
return errors.New(finalMsg)
}

View File

@ -188,7 +188,11 @@ func NewSegment(collection *Collection,
var newPtr C.CSegmentInterface
_, err := GetDynamicPool().Submit(func() (any, error) {
status := C.NewSegment(collection.collectionPtr, cSegType, C.int64_t(segmentID), &newPtr)
err := HandleCStatus(&status, "NewSegmentFailed")
err := HandleCStatus(&status, "NewSegmentFailed",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.String("segmentType", segmentType.String()))
return nil, err
}).Await()
if err != nil {
@ -383,7 +387,10 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil, nil
}).Await()
if err := HandleCStatus(&status, "Search failed"); err != nil {
if err := HandleCStatus(&status, "Search failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.typ.String())); err != nil {
return nil, err
}
log.Debug("search segment done")
@ -435,7 +442,12 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco
return nil, nil
}).Await()
if err := HandleCStatus(&status, "Retrieve failed"); err != nil {
if err := HandleCStatus(&status, "Retrieve failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("msgID", plan.msgID),
zap.String("segmentType", s.typ.String())); err != nil {
return nil, err
}
@ -665,7 +677,10 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
return nil, nil
}).Await()
if err := HandleCStatus(&status, "LoadMultiFieldData failed"); err != nil {
if err := HandleCStatus(&status, "LoadMultiFieldData failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID())); err != nil {
return err
}
@ -720,7 +735,11 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
return nil, nil
}).Await()
if err := HandleCStatus(&status, "LoadFieldData failed"); err != nil {
if err := HandleCStatus(&status, "LoadFieldData failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", fieldID)); err != nil {
return err
}
@ -771,7 +790,10 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
status = C.AddFieldDataInfoForSealed(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
return nil, nil
}).Await()
if err := HandleCStatus(&status, "AddFieldDataInfo failed"); err != nil {
if err := HandleCStatus(&status, "AddFieldDataInfo failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID())); err != nil {
return err
}
@ -844,7 +866,10 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
return nil, nil
}).Await()
if err := HandleCStatus(&status, "LoadDeletedRecord failed"); err != nil {
if err := HandleCStatus(&status, "LoadDeletedRecord failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID())); err != nil {
return err
}
@ -901,7 +926,11 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo
return nil, nil
}).Await()
if err := HandleCStatus(&status, "UpdateSealedSegmentIndex failed"); err != nil {
if err := HandleCStatus(&status, "UpdateSealedSegmentIndex failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.FieldID)); err != nil {
return err
}
log.Info("updateSegmentIndex done")