mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
issue: https://github.com/milvus-io/milvus/issues/43473 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
286 lines
9.8 KiB
Go
286 lines
9.8 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package datacoord
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"time"
|
|
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
|
"github.com/milvus-io/milvus/internal/datacoord/session"
|
|
"github.com/milvus-io/milvus/internal/json"
|
|
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
// Compile-time check that *importTask implements the ImportTask interface.
var _ ImportTask = (*importTask)(nil)
|
|
|
|
// importTask is datacoord's handle for a single import task
// (datapb.ImportTaskV2). Its methods submit the task to a worker, poll the
// worker for progress, reconcile reported results into segment and import
// meta, and drop the task from the worker.
type importTask struct {
	// task holds the current proto snapshot. Getters read it with Load; Clone
	// stores a cloned copy of it into the new task.
	task atomic.Pointer[datapb.ImportTaskV2]

	// alloc is handed to AssembleImportRequest when building the worker request.
	alloc allocator.Allocator
	// meta is datacoord meta; this file uses it for collection lookups and
	// segment reads/updates.
	meta *meta
	// importMeta stores import jobs and tasks; task and job state changes go
	// through it.
	importMeta ImportMeta
	// tr records stage spans; this file observes the pending and import stage
	// latencies from it.
	tr *timerecord.TimeRecorder
	// times holds per-type task timestamps (see SetTaskTime/GetTaskTime).
	times *taskcommon.Times
	// retryTimes counts failed CreateImport submissions and is what
	// GetTaskVersion returns. NOTE(review): it is a plain int64 modified
	// without synchronization — confirm all accesses happen on one goroutine.
	retryTimes int64
}
|
|
|
|
func (t *importTask) GetJobID() int64 {
|
|
return t.task.Load().GetJobID()
|
|
}
|
|
|
|
func (t *importTask) GetTaskID() int64 {
|
|
return t.task.Load().GetTaskID()
|
|
}
|
|
|
|
func (t *importTask) GetCollectionID() int64 {
|
|
return t.task.Load().GetCollectionID()
|
|
}
|
|
|
|
func (t *importTask) GetNodeID() int64 {
|
|
return t.task.Load().GetNodeID()
|
|
}
|
|
|
|
func (t *importTask) GetState() datapb.ImportTaskStateV2 {
|
|
return t.task.Load().GetState()
|
|
}
|
|
|
|
func (t *importTask) SetTaskTime(timeType taskcommon.TimeType, time time.Time) {
|
|
t.times.SetTaskTime(timeType, time)
|
|
}
|
|
|
|
func (t *importTask) GetTaskTime(timeType taskcommon.TimeType) time.Time {
|
|
return timeType.GetTaskTime(t.times)
|
|
}
|
|
|
|
func (t *importTask) GetTaskVersion() int64 {
|
|
return t.retryTimes
|
|
}
|
|
|
|
func (t *importTask) GetReason() string {
|
|
return t.task.Load().GetReason()
|
|
}
|
|
|
|
func (t *importTask) GetFileStats() []*datapb.ImportFileStats {
|
|
return t.task.Load().GetFileStats()
|
|
}
|
|
|
|
func (t *importTask) GetSegmentIDs() []int64 {
|
|
return t.task.Load().GetSegmentIDs()
|
|
}
|
|
|
|
func (t *importTask) GetSortedSegmentIDs() []int64 {
|
|
return t.task.Load().GetSortedSegmentIDs()
|
|
}
|
|
|
|
func (t *importTask) GetSource() datapb.ImportTaskSourceV2 {
|
|
return t.task.Load().GetSource()
|
|
}
|
|
|
|
func (t *importTask) GetCreatedTime() string {
|
|
return t.task.Load().GetCreatedTime()
|
|
}
|
|
|
|
func (t *importTask) GetCompleteTime() string {
|
|
return t.task.Load().GetCompleteTime()
|
|
}
|
|
|
|
func (t *importTask) GetTaskType() taskcommon.Type {
|
|
return taskcommon.Import
|
|
}
|
|
|
|
func (t *importTask) GetTaskState() taskcommon.State {
|
|
return taskcommon.FromImportState(t.GetState())
|
|
}
|
|
|
|
func (t *importTask) GetTaskNodeID() int64 {
|
|
return t.GetNodeID()
|
|
}
|
|
|
|
func (t *importTask) GetTaskSlot() int64 {
|
|
return int64(CalculateTaskSlot(t, t.importMeta))
|
|
}
|
|
|
|
func (t *importTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
|
|
log.Info("processing pending import task...", WrapTaskLog(t)...)
|
|
job := t.importMeta.GetJob(context.TODO(), t.GetJobID())
|
|
req, err := AssembleImportRequest(t, job, t.meta, t.alloc)
|
|
if err != nil {
|
|
log.Warn("assemble import request failed", WrapTaskLog(t, zap.Error(err))...)
|
|
return
|
|
}
|
|
err = cluster.CreateImport(nodeID, req, t.GetTaskSlot())
|
|
if err != nil {
|
|
log.Warn("import failed", WrapTaskLog(t, zap.Error(err))...)
|
|
t.retryTimes++
|
|
return
|
|
}
|
|
err = t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(),
|
|
UpdateState(datapb.ImportTaskStateV2_InProgress),
|
|
UpdateNodeID(nodeID))
|
|
if err != nil {
|
|
log.Warn("update import task failed", WrapTaskLog(t, zap.Error(err))...)
|
|
return
|
|
}
|
|
pendingDuration := t.GetTR().RecordSpan()
|
|
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
|
|
log.Info("import task start to execute", WrapTaskLog(t, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...)
|
|
}
|
|
|
|
func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) {
|
|
req := &datapb.QueryImportRequest{
|
|
JobID: t.GetJobID(),
|
|
TaskID: t.GetTaskID(),
|
|
}
|
|
resp, err := cluster.QueryImport(t.GetNodeID(), req)
|
|
if err != nil || resp.GetState() == datapb.ImportTaskStateV2_Retry {
|
|
updateErr := t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
|
|
if updateErr != nil {
|
|
log.Warn("failed to update import task state to pending", WrapTaskLog(t, zap.Error(updateErr))...)
|
|
}
|
|
log.Info("reset import task state to pending due to error occurs", WrapTaskLog(t, zap.Error(err), zap.String("reason", resp.GetReason()))...)
|
|
return
|
|
}
|
|
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
|
|
err = t.importMeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason()))
|
|
if err != nil {
|
|
log.Warn("failed to update job state to Failed", zap.Int64("jobID", t.GetJobID()), zap.Error(err))
|
|
}
|
|
log.Warn("import failed", WrapTaskLog(t, zap.String("reason", resp.GetReason()))...)
|
|
return
|
|
}
|
|
|
|
collInfo := t.meta.GetCollection(t.GetCollectionID())
|
|
dbName := ""
|
|
if collInfo != nil {
|
|
dbName = collInfo.DatabaseName
|
|
}
|
|
|
|
if resp.GetState() == datapb.ImportTaskStateV2_InProgress || resp.GetState() == datapb.ImportTaskStateV2_Completed {
|
|
for _, info := range resp.GetImportSegmentsInfo() {
|
|
segment := t.meta.GetSegment(context.TODO(), info.GetSegmentID())
|
|
if info.GetImportedRows() <= segment.GetNumOfRows() {
|
|
continue // rows not changed, no need to update
|
|
}
|
|
diff := info.GetImportedRows() - segment.GetNumOfRows()
|
|
op := UpdateImportedRows(info.GetSegmentID(), info.GetImportedRows())
|
|
err = t.meta.UpdateSegmentsInfo(context.TODO(), op)
|
|
if err != nil {
|
|
log.Warn("update import segment rows failed", WrapTaskLog(t, zap.Error(err))...)
|
|
return
|
|
}
|
|
log.Info("update import segment rows done", WrapTaskLog(t, zap.Int64("segmentID", info.GetSegmentID()), zap.Int64("importedRows", info.GetImportedRows()))...)
|
|
|
|
metrics.DataCoordBulkVectors.WithLabelValues(
|
|
dbName,
|
|
strconv.FormatInt(t.GetCollectionID(), 10),
|
|
).Add(float64(diff))
|
|
}
|
|
}
|
|
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
|
|
for _, info := range resp.GetImportSegmentsInfo() {
|
|
// try to parse path and fill logID
|
|
err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs(), info.GetBm25Logs())
|
|
if err != nil {
|
|
log.Warn("fail to CompressBinLogs for import binlogs",
|
|
WrapTaskLog(t, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...)
|
|
return
|
|
}
|
|
op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs(), info.GetBm25Logs())
|
|
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
|
|
err = t.meta.UpdateSegmentsInfo(context.TODO(), op1, op2)
|
|
if err != nil {
|
|
updateErr := t.importMeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
|
|
if updateErr != nil {
|
|
log.Warn("failed to update job state to Failed", zap.Int64("jobID", t.GetJobID()), zap.Error(updateErr))
|
|
}
|
|
log.Warn("update import segment binlogs failed", WrapTaskLog(t, zap.String("err", err.Error()))...)
|
|
return
|
|
}
|
|
log.Info("update import segment info done", WrapTaskLog(t, zap.Int64("segmentID", info.GetSegmentID()), zap.Any("segmentInfo", info))...)
|
|
}
|
|
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
|
|
err = t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime))
|
|
if err != nil {
|
|
log.Warn("update import task failed", WrapTaskLog(t, zap.Error(err))...)
|
|
return
|
|
}
|
|
importDuration := t.GetTR().RecordSpan()
|
|
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds()))
|
|
log.Info("import done", WrapTaskLog(t, zap.Duration("taskTimeCost/import", importDuration))...)
|
|
}
|
|
log.Info("query import", WrapTaskLog(t, zap.String("respState", resp.GetState().String()),
|
|
zap.String("reason", resp.GetReason()))...)
|
|
}
|
|
|
|
func (t *importTask) DropTaskOnWorker(cluster session.Cluster) {
|
|
err := DropImportTask(t, cluster, t.importMeta)
|
|
if err != nil {
|
|
log.Warn("drop import failed", WrapTaskLog(t, zap.Error(err))...)
|
|
return
|
|
}
|
|
log.Info("drop import task done", WrapTaskLog(t, zap.Int64("nodeID", t.GetNodeID()))...)
|
|
}
|
|
|
|
func (t *importTask) GetType() TaskType {
|
|
return ImportTaskType
|
|
}
|
|
|
|
func (t *importTask) GetTR() *timerecord.TimeRecorder {
|
|
return t.tr
|
|
}
|
|
|
|
func (t *importTask) Clone() ImportTask {
|
|
cloned := &importTask{
|
|
alloc: t.alloc,
|
|
meta: t.meta,
|
|
importMeta: t.importMeta,
|
|
tr: t.tr,
|
|
times: t.times,
|
|
}
|
|
cloned.task.Store(typeutil.Clone(t.task.Load()))
|
|
return cloned
|
|
}
|
|
|
|
func (t *importTask) MarshalJSON() ([]byte, error) {
|
|
importTask := metricsinfo.ImportTask{
|
|
JobID: t.GetJobID(),
|
|
TaskID: t.GetTaskID(),
|
|
CollectionID: t.GetCollectionID(),
|
|
NodeID: t.GetNodeID(),
|
|
State: t.GetState().String(),
|
|
Reason: t.GetReason(),
|
|
TaskType: t.GetType().String(),
|
|
CreatedTime: t.GetCreatedTime(),
|
|
CompleteTime: t.GetCompleteTime(),
|
|
}
|
|
return json.Marshal(importTask)
|
|
}
|