From a7b24cbc537e4ea82219ee927defba7b6918121d Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 12 Dec 2022 10:57:22 +0800 Subject: [PATCH] Move APIs of types.DataNode to services.go (#21042) Signed-off-by: yangxuan Signed-off-by: yangxuan --- internal/datanode/channel_meta.go | 10 + internal/datanode/data_node.go | 946 +---------------- internal/datanode/data_node_test.go | 741 +------------ .../flow_graph_insert_buffer_node_test.go | 14 +- internal/datanode/segment.go | 10 - internal/datanode/services.go | 974 ++++++++++++++++++ internal/datanode/services_test.go | 746 ++++++++++++++ 7 files changed, 1748 insertions(+), 1693 deletions(-) create mode 100644 internal/datanode/services.go create mode 100644 internal/datanode/services_test.go diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go index 1030346443..ba95373103 100644 --- a/internal/datanode/channel_meta.go +++ b/internal/datanode/channel_meta.go @@ -103,6 +103,16 @@ type ChannelMeta struct { chunkManager storage.ChunkManager } +type addSegmentReq struct { + segType datapb.SegmentType + segID, collID, partitionID UniqueID + numOfRows int64 + startPos, endPos *internalpb.MsgPosition + statsBinLogs []*datapb.FieldBinlog + recoverTs Timestamp + importing bool +} + var _ Channel = &ChannelMeta{} func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoord, cm storage.ChunkManager) *ChannelMeta { diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 31ba43705f..5c0f7e6924 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -27,7 +27,6 @@ import ( "math/rand" "os" "path" - "strconv" "strings" "sync" "sync/atomic" @@ -36,35 +35,27 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/commonpb" - "github.com/milvus-io/milvus-proto/go-api/milvuspb" - "github.com/milvus-io/milvus-proto/go-api/schemapb" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + allocator2 "github.com/milvus-io/milvus/internal/allocator" - "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/metrics" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/etcdpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/dependency" - "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/logutil" - "github.com/milvus-io/milvus/internal/util/metautil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/milvus-io/milvus/internal/util/timerecord" - "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" - v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" ) const ( @@ -113,7 +104,6 @@ type DataNode struct { ctx context.Context cancel context.CancelFunc Role string - State atomic.Value // commonpb.StateCode_Initializing stateCode atomic.Value // commonpb.StateCode_Initializing flowgraphManager *flowgraphManager eventManagerMap sync.Map // vchannel name -> channelEventManager @@ -536,36 +526,6 @@ func (node *DataNode) isHealthy() bool { return node.GetStateCode() == commonpb.StateCode_Healthy } -// WatchDmChannels is not in use -func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) { - log.Warn("DataNode WatchDmChannels is not in use") - - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "watchDmChannels do nothing", - }, nil -} - -// GetComponentStates will return current state of DataNode -func (node *DataNode) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { - log.Debug("DataNode current state", zap.Any("State", node.stateCode.Load())) - nodeID := common.NotRegisteredID - if node.session != nil && node.session.Registered() { - nodeID = node.session.ServerID - } - states := &milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{ - // NodeID: Params.NodeID, // will race with DataNode.Register() - NodeID: nodeID, - Role: node.Role, - StateCode: node.stateCode.Load().(commonpb.StateCode), - }, - SubcomponentStates: make([]*milvuspb.ComponentInfo, 0), - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - } - return states, nil -} - // ReadyToFlush tells whether DataNode is ready for flushing func (node *DataNode) ReadyToFlush() error { if !node.isHealthy() { @@ -574,108 +534,6 @@ func (node *DataNode) ReadyToFlush() error { return nil } -// FlushSegments packs flush messages into flowGraph through flushChan. -// -// DataCoord calls FlushSegments if the segment is seal&flush only. -// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored. -// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed. -func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { - metrics.DataNodeFlushReqCounter.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - MetricRequestsTotal).Inc() - - errStatus := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - } - - if !node.isHealthy() { - errStatus.Reason = "dataNode not in HEALTHY state" - return errStatus, nil - } - - if req.GetBase().GetTargetID() != node.session.ServerID { - log.Warn("flush segment target id not matched", - zap.Int64("targetID", req.GetBase().GetTargetID()), - zap.Int64("serverID", node.session.ServerID), - ) - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_NodeIDNotMatch, - Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID), - } - return status, nil - } - - log.Info("receiving FlushSegments request", - zap.Int64("collectionID", req.GetCollectionID()), - zap.Int64s("sealedSegments", req.GetSegmentIDs()), - ) - - segmentIDs := req.GetSegmentIDs() - var flushedSeg []UniqueID - for _, segID := range segmentIDs { - // if the segment in already being flushed, skip it. - if node.segmentCache.checkIfCached(segID) { - logDupFlush(req.GetCollectionID(), segID) - continue - } - // Get the flush channel for the given segment ID. - // If no flush channel is found, report an error. - flushCh, err := node.flowgraphManager.getFlushCh(segID) - if err != nil { - errStatus.Reason = "no flush channel found for the segment, unable to flush" - log.Error(errStatus.Reason, zap.Int64("segmentID", segID), zap.Error(err)) - return errStatus, nil - } - - // Double check that the segment is still not cached. - // Skip this flush if segment ID is cached, otherwise cache the segment ID and proceed. - exist := node.segmentCache.checkOrCache(segID) - if exist { - logDupFlush(req.GetCollectionID(), segID) - continue - } - // flushedSeg is only for logging purpose. - flushedSeg = append(flushedSeg, segID) - // Send the segment to its flush channel. - flushCh <- flushMsg{ - msgID: req.GetBase().GetMsgID(), - timestamp: req.GetBase().GetTimestamp(), - segmentID: segID, - collectionID: req.GetCollectionID(), - } - } - - // Log success flushed segments. - if len(flushedSeg) > 0 { - log.Info("sending segments to flush channel", - zap.Int64("collectionID", req.GetCollectionID()), - zap.Int64s("sealedSegments", flushedSeg)) - } - - metrics.DataNodeFlushReqCounter.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - MetricRequestsSuccess).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, nil -} - -// ResendSegmentStats resend un-flushed segment stats back upstream to DataCoord by resending DataNode time tick message. -// It returns a list of segments to be sent. -func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) { - log.Info("start resending segment stats, if any", - zap.Int64("DataNode ID", paramtable.GetNodeID())) - segResent := node.flowgraphManager.resendTT() - log.Info("found segment(s) with stats to resend", - zap.Int64s("segment IDs", segResent)) - return &datapb.ResendSegmentStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - SegResent: segResent, - }, nil -} - // Stop will release DataNode resources and shutdown datanode func (node *DataNode) Stop() error { // https://github.com/milvus-io/milvus/issues/12282 @@ -700,793 +558,3 @@ func (node *DataNode) Stop() error { return nil } - -// GetTimeTickChannel currently do nothing -func (node *DataNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - }, nil -} - -// GetStatisticsChannel currently do nothing -func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - }, nil -} - -// ShowConfigurations returns the configurations of DataNode matching req.Pattern -func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { - log.Debug("DataNode.ShowConfigurations", zap.String("pattern", req.Pattern)) - if !node.isHealthy() { - log.Warn("DataNode.ShowConfigurations failed", - zap.Int64("nodeId", paramtable.GetNodeID()), - zap.String("req", req.Pattern), - zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) - - return &internalpb.ShowConfigurationsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), - }, - Configuations: nil, - }, nil - } - configList := make([]*commonpb.KeyValuePair, 0) - for key, value := range Params.GetComponentConfigurations(ctx, "datanode", req.Pattern) { - configList = append(configList, - &commonpb.KeyValuePair{ - Key: key, - Value: value, - }) - } - - return &internalpb.ShowConfigurationsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Configuations: configList, - }, nil -} - -// GetMetrics return datanode metrics -func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - if !node.isHealthy() { - log.Warn("DataNode.GetMetrics failed", - zap.Int64("node_id", paramtable.GetNodeID()), - zap.String("req", req.Request), - zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) - - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), - }, - }, nil - } - - metricType, err := metricsinfo.ParseMetricType(req.Request) - if err != nil { - log.Warn("DataNode.GetMetrics failed to parse metric type", - zap.Int64("node_id", paramtable.GetNodeID()), - zap.String("req", req.Request), - zap.Error(err)) - - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("datanode GetMetrics failed, nodeID=%d, err=%s", paramtable.GetNodeID(), err.Error()), - }, - }, nil - } - - if metricType == metricsinfo.SystemInfoMetrics { - systemInfoMetrics, err := node.getSystemInfoMetrics(ctx, req) - if err != nil { - log.Warn("DataNode GetMetrics failed", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err)) - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("datanode GetMetrics failed, nodeID=%d, err=%s", paramtable.GetNodeID(), err.Error()), - }, - }, nil - } - - return systemInfoMetrics, nil - } - - log.Debug("DataNode.GetMetrics failed, request metric type is not implemented yet", - zap.Int64("node_id", paramtable.GetNodeID()), - zap.String("req", req.Request), - zap.String("metric_type", metricType)) - - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, - }, nil -} - -// Compaction handles compaction request from DataCoord -// returns status as long as compaction task enqueued or invalid -func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - } - - ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel()) - if !ok { - log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel())) - status.Reason = errIllegalCompactionPlan.Error() - return status, nil - } - - if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) { - log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channel name", req.GetChannel())) - status.Reason = "channel marked invalid" - return status, nil - } - - binlogIO := &binlogIO{node.chunkManager, ds.idAllocator} - task := newCompactionTask( - node.ctx, - binlogIO, binlogIO, - ds.channel, - ds.flushManager, - ds.idAllocator, - req, - node.chunkManager, - ) - - node.compactionExecutor.execute(task) - - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, nil -} - -// GetCompactionState called by DataCoord -// return status of all compaction plans -func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) { - if !node.isHealthy() { - return &datapb.CompactionStateResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "DataNode is unhealthy", - }, - }, nil - } - results := make([]*datapb.CompactionStateResult, 0) - node.compactionExecutor.executing.Range(func(k, v any) bool { - results = append(results, &datapb.CompactionStateResult{ - State: commonpb.CompactionState_Executing, - PlanID: k.(UniqueID), - }) - return true - }) - node.compactionExecutor.completed.Range(func(k, v any) bool { - results = append(results, &datapb.CompactionStateResult{ - State: commonpb.CompactionState_Completed, - PlanID: k.(UniqueID), - Result: v.(*datapb.CompactionResult), - }) - node.compactionExecutor.completed.Delete(k) - return true - }) - - if len(results) > 0 { - log.Info("Compaction results", zap.Any("results", results)) - } - return &datapb.CompactionStateResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - Results: results, - }, nil -} - -// SyncSegments called by DataCoord, sync the compacted segments' meta between DC and DN -func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) { - log.Ctx(ctx).Info("DataNode receives SyncSegments", - zap.Int64("planID", req.GetPlanID()), - zap.Int64("target segmentID", req.GetCompactedTo()), - zap.Int64s("compacted from", req.GetCompactedFrom()), - zap.Int64("numOfRows", req.GetNumOfRows()), - ) - status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError} - - if !node.isHealthy() { - status.Reason = "DataNode is unhealthy" - return status, nil - } - - if len(req.GetCompactedFrom()) <= 0 { - status.Reason = "invalid request, compacted from segments shouldn't be empty" - return status, nil - } - - getChannel := func() (int64, Channel) { - for _, segmentFrom := range req.GetCompactedFrom() { - channel, err := node.flowgraphManager.getChannel(segmentFrom) - if err != nil { - log.Warn("invalid segmentID", zap.Int64("segment_from", segmentFrom), zap.Error(err)) - continue - } - return segmentFrom, channel - } - return 0, nil - } - oneSegment, channel := getChannel() - if channel == nil { - log.Warn("no available channel") - status.ErrorCode = commonpb.ErrorCode_Success - return status, nil - } - - ds, ok := node.flowgraphManager.getFlowgraphService(channel.getChannelName(oneSegment)) - if !ok { - status.Reason = fmt.Sprintf("failed to find flow graph service, segmentID: %d", oneSegment) - return status, nil - } - - // oneSegment is definitely in the channel, guaranteed by the check before. - collID, partID, _ := channel.getCollectionAndPartitionID(oneSegment) - targetSeg := &Segment{ - collectionID: collID, - partitionID: partID, - segmentID: req.GetCompactedTo(), - numRows: req.GetNumOfRows(), - } - - err := channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime()) - if err != nil { - status.Reason = fmt.Sprintf("init pk stats fail, err=%s", err.Error()) - return status, nil - } - - // block all flow graph so it's safe to remove segment - ds.fg.Blockall() - defer ds.fg.Unblock() - if err := channel.mergeFlushedSegments(targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil { - status.Reason = err.Error() - return status, nil - } - - status.ErrorCode = commonpb.ErrorCode_Success - return status, nil -} - -// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments -func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*commonpb.Status, error) { - log.Info("DataNode receive import request", - zap.Int64("task ID", req.GetImportTask().GetTaskId()), - zap.Int64("collection ID", req.GetImportTask().GetCollectionId()), - zap.Int64("partition ID", req.GetImportTask().GetPartitionId()), - zap.Strings("channel names", req.GetImportTask().GetChannelNames()), - zap.Int64s("working dataNodes", req.WorkingNodes)) - defer func() { - log.Info("DataNode finish import request", zap.Int64("task ID", req.GetImportTask().GetTaskId())) - }() - - importResult := &rootcoordpb.ImportResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - TaskId: req.GetImportTask().TaskId, - DatanodeId: paramtable.GetNodeID(), - State: commonpb.ImportState_ImportStarted, - Segments: make([]int64, 0), - AutoIds: make([]int64, 0), - RowCount: 0, - } - - // Spawn a new context to ignore cancellation from parental context. - newCtx, cancel := context.WithTimeout(context.TODO(), ImportCallTimeout) - defer cancel() - // func to report import state to RootCoord. - reportFunc := func(res *rootcoordpb.ImportResult) error { - status, err := node.rootCoord.ReportImport(ctx, res) - if err != nil { - log.Error("fail to report import state to RootCoord", zap.Error(err)) - return err - } - if status != nil && status.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(status.GetReason()) - } - return nil - } - - if !node.isHealthy() { - log.Warn("DataNode import failed", - zap.Int64("collection ID", req.GetImportTask().GetCollectionId()), - zap.Int64("partition ID", req.GetImportTask().GetPartitionId()), - zap.Int64("task ID", req.GetImportTask().GetTaskId()), - zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) - - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), - }, nil - } - - // get a timestamp for all the rows - // Ignore cancellation from parent context. - rep, err := node.rootCoord.AllocTimestamp(newCtx, &rootcoordpb.AllocTimestampRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), - commonpbutil.WithMsgID(0), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - Count: 1, - }) - - if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil { - msg := "DataNode alloc ts failed" - log.Warn(msg) - importResult.State = commonpb.ImportState_ImportFailed - importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: msg}) - if reportErr := reportFunc(importResult); reportErr != nil { - log.Warn("fail to report import state to RootCoord", zap.Error(reportErr)) - } - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msg, - }, nil - } - } - - ts := rep.GetTimestamp() - - // get collection schema and shard number - metaService := newMetaService(node.rootCoord, req.GetImportTask().GetCollectionId()) - colInfo, err := metaService.getCollectionInfo(newCtx, req.GetImportTask().GetCollectionId(), 0) - if err != nil { - log.Warn("failed to get collection info for collection ID", - zap.Int64("task ID", req.GetImportTask().GetTaskId()), - zap.Int64("collection ID", req.GetImportTask().GetCollectionId()), - zap.Error(err)) - importResult.State = commonpb.ImportState_ImportFailed - importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: err.Error()}) - reportErr := reportFunc(importResult) - if reportErr != nil { - log.Warn("fail to report import state to RootCoord", zap.Error(err)) - } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, nil - } - - returnFailFunc := func(inputErr error) (*commonpb.Status, error) { - log.Warn("import wrapper failed to parse import request", - zap.Int64("task ID", req.GetImportTask().GetTaskId()), - zap.Error(inputErr)) - importResult.State = commonpb.ImportState_ImportFailed - importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: inputErr.Error()}) - reportErr := reportFunc(importResult) - if reportErr != nil { - log.Warn("fail to report import state to RootCoord", zap.Error(inputErr)) - } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: inputErr.Error(), - }, nil - } - - // parse files and generate segments - segmentSize := Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 - importWrapper := importutil.NewImportWrapper(newCtx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.rowIDAllocator, - node.chunkManager, importResult, reportFunc) - importWrapper.SetCallbackFunctions(assignSegmentFunc(node, req), - createBinLogsFunc(node, req, colInfo.GetSchema(), ts), - saveSegmentFunc(node, req, importResult, ts)) - // todo: pass tsStart and tsStart after import_wrapper support - tsStart, tsEnd, err := importutil.ParseTSFromOptions(req.GetImportTask().GetInfos()) - isBackup := importutil.IsBackup(req.GetImportTask().GetInfos()) - if err != nil { - return returnFailFunc(err) - } - log.Info("import time range", zap.Uint64("start_ts", tsStart), zap.Uint64("end_ts", tsEnd)) - err = importWrapper.Import(req.GetImportTask().GetFiles(), - importutil.ImportOptions{OnlyValidate: false, TsStartPoint: tsStart, TsEndPoint: tsEnd, IsBackup: isBackup}) - if err != nil { - return returnFailFunc(err) - } - - resp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - } - return resp, nil -} - -// AddImportSegment adds the import segment to the current DataNode. -func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) { - log.Info("adding segment to DataNode flow graph", - zap.Int64("segment ID", req.GetSegmentId()), - zap.Int64("collection ID", req.GetCollectionId()), - zap.Int64("partition ID", req.GetPartitionId()), - zap.String("channel name", req.GetChannelName()), - zap.Int64("# of rows", req.GetRowNum())) - // Fetch the flow graph on the given v-channel. - var ds *dataSyncService - // Retry in case the channel hasn't been watched yet. - err := retry.Do(ctx, func() error { - var ok bool - ds, ok = node.flowgraphManager.getFlowgraphService(req.GetChannelName()) - if !ok { - return errors.New("channel not found") - } - return nil - }, retry.Attempts(getFlowGraphServiceAttempts)) - if err != nil { - log.Error("channel not found in current DataNode", - zap.String("channel name", req.GetChannelName()), - zap.Int64("node ID", paramtable.GetNodeID())) - return &datapb.AddImportSegmentResponse{ - Status: &commonpb.Status{ - // TODO: Add specific error code. - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "channel not found in current DataNode", - }, - }, nil - } - // Get the current dml channel position ID, that will be used in segments start positions and end positions. - posID, err := ds.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId()) - if err != nil { - return &datapb.AddImportSegmentResponse{ - Status: &commonpb.Status{ - // TODO: Add specific error code. - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "failed to get channel position", - }, - }, nil - } - // Add the new segment to the channel. - if !ds.channel.hasSegment(req.GetSegmentId(), true) { - log.Info("adding a new segment to channel", - zap.Int64("segment ID", req.GetSegmentId())) - // Add segment as a flushed segment, but set `importing` to true to add extra information of the segment. - // By 'extra information' we mean segment info while adding a `SegmentType_Flushed` typed segment. - if err := ds.channel.addSegment( - addSegmentReq{ - segType: datapb.SegmentType_Flushed, - segID: req.GetSegmentId(), - collID: req.GetCollectionId(), - partitionID: req.GetPartitionId(), - numOfRows: req.GetRowNum(), - statsBinLogs: req.GetStatsLog(), - startPos: &internalpb.MsgPosition{ - ChannelName: req.GetChannelName(), - MsgID: posID, - Timestamp: req.GetBase().GetTimestamp(), - }, - endPos: &internalpb.MsgPosition{ - ChannelName: req.GetChannelName(), - MsgID: posID, - Timestamp: req.GetBase().GetTimestamp(), - }, - recoverTs: req.GetBase().GetTimestamp(), - importing: true, - }); err != nil { - log.Error("failed to add segment to flow graph", - zap.Error(err)) - return &datapb.AddImportSegmentResponse{ - Status: &commonpb.Status{ - // TODO: Add specific error code. - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - }, nil - } - } - ds.flushingSegCache.Remove(req.GetSegmentId()) - return &datapb.AddImportSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - ChannelPos: posID, - }, nil -} - -func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil.AssignSegmentFunc { - return func(shardID int) (int64, string, error) { - chNames := req.GetImportTask().GetChannelNames() - importTaskID := req.GetImportTask().GetTaskId() - if shardID >= len(chNames) { - log.Error("import task returns invalid shard ID", - zap.Int64("task ID", importTaskID), - zap.Int("shard ID", shardID), - zap.Int("# of channels", len(chNames)), - zap.Strings("channel names", chNames), - ) - return 0, "", fmt.Errorf("syncSegmentID Failed: invalid shard ID %d", shardID) - } - - tr := timerecord.NewTimeRecorder("import callback function") - defer tr.Elapse("finished") - - colID := req.GetImportTask().GetCollectionId() - partID := req.GetImportTask().GetPartitionId() - segmentIDReq := composeAssignSegmentIDRequest(1, shardID, chNames, colID, partID) - targetChName := segmentIDReq.GetSegmentIDRequests()[0].GetChannelName() - log.Info("target channel for the import task", - zap.Int64("task ID", importTaskID), - zap.Int("shard ID", shardID), - zap.String("target channel name", targetChName)) - resp, err := node.dataCoord.AssignSegmentID(context.Background(), segmentIDReq) - if err != nil { - return 0, "", fmt.Errorf("syncSegmentID Failed:%w", err) - } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - return 0, "", fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason) - } - segmentID := resp.SegIDAssignments[0].SegID - log.Info("new segment assigned", - zap.Int64("task ID", importTaskID), - zap.Int64("segmentID", segmentID), - zap.Int("shard ID", shardID), - zap.String("target channel name", targetChName)) - return segmentID, targetChName, nil - } -} - -func createBinLogsFunc(node *DataNode, req *datapb.ImportTaskRequest, schema *schemapb.CollectionSchema, ts Timestamp) importutil.CreateBinlogsFunc { - return func(fields map[storage.FieldID]storage.FieldData, segmentID int64) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, error) { - var rowNum int - for _, field := range fields { - rowNum = field.RowNum() - break - } - - chNames := req.GetImportTask().GetChannelNames() - importTaskID := req.GetImportTask().GetTaskId() - if rowNum <= 0 { - log.Info("fields data is empty, no need to generate binlog", - zap.Int64("task ID", importTaskID), - zap.Int("# of channels", len(chNames)), - zap.Strings("channel names", chNames), - ) - return nil, nil, nil - } - - colID := req.GetImportTask().GetCollectionId() - partID := req.GetImportTask().GetPartitionId() - - fieldInsert, fieldStats, err := createBinLogs(rowNum, schema, ts, fields, node, segmentID, colID, partID) - if err != nil { - log.Error("failed to create binlogs", - zap.Int64("task ID", importTaskID), - zap.Int("# of channels", len(chNames)), - zap.Strings("channel names", chNames), - zap.Any("err", err), - ) - return nil, nil, err - } - - log.Info("new binlog created", - zap.Int64("task ID", importTaskID), - zap.Int64("segmentID", segmentID), - zap.Int("insert log count", len(fieldInsert)), - zap.Int("stats log count", len(fieldStats))) - - return fieldInsert, fieldStats, err - } -} - -func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoordpb.ImportResult, ts Timestamp) importutil.SaveSegmentFunc { - importTaskID := req.GetImportTask().GetTaskId() - return func(fieldsInsert []*datapb.FieldBinlog, fieldsStats []*datapb.FieldBinlog, segmentID int64, targetChName string, rowCount int64) error { - log.Info("adding segment to the correct DataNode flow graph and saving binlog paths", - zap.Int64("task ID", importTaskID), - zap.Int64("segmentID", segmentID), - zap.String("targetChName", targetChName), - zap.Int64("rowCount", rowCount), - zap.Uint64("ts", ts)) - - err := retry.Do(context.Background(), func() error { - // Ask DataCoord to save binlog path and add segment to the corresponding DataNode flow graph. - resp, err := node.dataCoord.SaveImportSegment(context.Background(), &datapb.SaveImportSegmentRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithTimeStamp(ts), // Pass current timestamp downstream. - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - SegmentId: segmentID, - ChannelName: targetChName, - CollectionId: req.GetImportTask().GetCollectionId(), - PartitionId: req.GetImportTask().GetPartitionId(), - RowNum: rowCount, - SaveBinlogPathReq: &datapb.SaveBinlogPathsRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(0), - commonpbutil.WithMsgID(0), - commonpbutil.WithTimeStamp(ts), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - SegmentID: segmentID, - CollectionID: req.GetImportTask().GetCollectionId(), - Field2BinlogPaths: fieldsInsert, - Field2StatslogPaths: fieldsStats, - // Set start positions of a SaveBinlogPathRequest explicitly. - StartPositions: []*datapb.SegmentStartPosition{ - { - StartPosition: &internalpb.MsgPosition{ - ChannelName: targetChName, - Timestamp: ts, - }, - SegmentID: segmentID, - }, - }, - Importing: true, - }, - }) - // Only retrying when DataCoord is unhealthy or err != nil, otherwise return immediately. - if err != nil { - return fmt.Errorf(err.Error()) - } - if resp.ErrorCode != commonpb.ErrorCode_Success && resp.ErrorCode != commonpb.ErrorCode_DataCoordNA { - return retry.Unrecoverable(fmt.Errorf("failed to save import segment, reason = %s", resp.Reason)) - } else if resp.ErrorCode == commonpb.ErrorCode_DataCoordNA { - return fmt.Errorf("failed to save import segment: %s", resp.GetReason()) - } - return nil - }) - if err != nil { - log.Warn("failed to save import segment", zap.Error(err)) - return err - } - log.Info("segment imported and persisted", - zap.Int64("task ID", importTaskID), - zap.Int64("segmentID", segmentID)) - res.Segments = append(res.Segments, segmentID) - res.RowCount += rowCount - return nil - } -} - -func composeAssignSegmentIDRequest(rowNum int, shardID int, chNames []string, - collID int64, partID int64) *datapb.AssignSegmentIDRequest { - // use the first field's row count as segment row count - // all the fields row count are same, checked by ImportWrapper - // ask DataCoord to alloc a new segment - log.Info("import task flush segment", - zap.Any("channel names", chNames), - zap.Int("shard ID", shardID)) - segReqs := []*datapb.SegmentIDRequest{ - { - ChannelName: chNames[shardID], - Count: uint32(rowNum), - CollectionID: collID, - PartitionID: partID, - IsImport: true, - }, - } - segmentIDReq := &datapb.AssignSegmentIDRequest{ - NodeID: 0, - PeerRole: typeutil.ProxyRole, - SegmentIDRequests: segReqs, - } - return segmentIDReq -} - -func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp, - fields map[storage.FieldID]storage.FieldData, node *DataNode, segmentID, colID, partID UniqueID) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, error) { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - tsFieldData := make([]int64, rowNum) - for i := range tsFieldData { - tsFieldData[i] = int64(ts) - } - fields[common.TimeStampField] = &storage.Int64FieldData{ - Data: tsFieldData, - NumRows: []int64{int64(rowNum)}, - } - - if status, _ := node.dataCoord.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{ - Stats: []*datapb.SegmentStats{{ - SegmentID: segmentID, - NumRows: int64(rowNum), - }}, - }); status.GetErrorCode() != commonpb.ErrorCode_Success { - return nil, nil, fmt.Errorf(status.GetReason()) - } - - data := BufferData{buffer: &InsertData{ - Data: fields, - }} - data.updateSize(int64(rowNum)) - meta := &etcdpb.CollectionMeta{ - ID: colID, - Schema: schema, - } - binLogs, statsBinLogs, err := storage.NewInsertCodec(meta).Serialize(partID, segmentID, data.buffer) - if err != nil { - return nil, nil, err - } - - var alloc allocatorInterface = newAllocator(node.rootCoord) - start, _, err := alloc.allocIDBatch(uint32(len(binLogs))) - if err != nil { - return nil, nil, err - } - - field2Insert := make(map[UniqueID]*datapb.Binlog, len(binLogs)) - kvs := make(map[string][]byte, len(binLogs)) - field2Logidx := make(map[UniqueID]UniqueID, len(binLogs)) - for idx, blob := range binLogs { - fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) - if err != nil { - log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) - return nil, nil, err - } - - logidx := start + int64(idx) - - // no error raise if alloc=false - k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx) - - key := path.Join(node.chunkManager.RootPath(), common.SegmentInsertLogPath, k) - kvs[key] = blob.Value[:] - field2Insert[fieldID] = &datapb.Binlog{ - EntriesNum: data.size, - TimestampFrom: ts, - TimestampTo: ts, - LogPath: key, - LogSize: int64(len(blob.Value)), - } - field2Logidx[fieldID] = logidx - } - - field2Stats := make(map[UniqueID]*datapb.Binlog) - // write stats binlog - for _, blob := range statsBinLogs { - fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) - if err != nil { - log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) - return nil, nil, err - } - - logidx := field2Logidx[fieldID] - - // no error raise if alloc=false - k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx) - - key := path.Join(node.chunkManager.RootPath(), common.SegmentStatslogPath, k) - kvs[key] = blob.Value - field2Stats[fieldID] = &datapb.Binlog{ - EntriesNum: data.size, - TimestampFrom: ts, - TimestampTo: ts, - LogPath: key, - LogSize: int64(len(blob.Value)), - } - } - - err = node.chunkManager.MultiWrite(ctx, kvs) - if err != nil { - return nil, nil, err - } - var ( - fieldInsert []*datapb.FieldBinlog - fieldStats []*datapb.FieldBinlog - ) - for k, v := range field2Insert { - fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}}) - } - for k, v := range field2Stats { - fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}}) - } - return fieldInsert, fieldStats, nil -} - -func logDupFlush(cID, segID int64) { - log.Info("segment is already being flushed, ignoring flush request", - zap.Int64("collection ID", cID), - zap.Int64("segment ID", segID)) -} diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 9fb30c3daa..d96389304b 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -19,38 +19,30 @@ package datanode import ( "context" "fmt" - "math" "math/rand" "os" - "path/filepath" "strconv" "strings" - "sync" "testing" "time" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/commonpb" - "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/schemapb" - "github.com/milvus-io/milvus/internal/common" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.uber.org/zap" ) const returnError = "ReturnError" @@ -114,13 +106,6 @@ func TestDataNode(t *testing.T) { node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode")) paramtable.SetNodeID(1) - t.Run("Test WatchDmChannels ", func(t *testing.T) { - emptyNode := &DataNode{} - - status, err := emptyNode.WatchDmChannels(ctx, &datapb.WatchDmChannelsRequest{}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - }) t.Run("Test SetRootCoord", func(t *testing.T) { emptyDN := &DataNode{} @@ -168,195 +153,6 @@ func TestDataNode(t *testing.T) { } }) - t.Run("Test GetComponentStates", func(t *testing.T) { - stat, err := node.GetComponentStates(node.ctx) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, stat.Status.ErrorCode) - }) - - t.Run("Test GetCompactionState", func(t *testing.T) { - node.compactionExecutor.executing.Store(int64(3), 0) - node.compactionExecutor.executing.Store(int64(2), 0) - node.compactionExecutor.completed.Store(int64(1), &datapb.CompactionResult{ - PlanID: 1, - SegmentID: 10, - }) - stat, err := node.GetCompactionState(node.ctx, nil) - assert.NoError(t, err) - - assert.Equal(t, 3, len(stat.GetResults())) - - cnt := 0 - for _, v := range stat.GetResults() { - if v.GetState() == commonpb.CompactionState_Completed { - cnt++ - } - } - assert.Equal(t, 1, cnt) - - cnt = 0 - node.compactionExecutor.completed.Range(func(k, v any) bool { - cnt++ - return true - }) - assert.Equal(t, 0, cnt) - }) - - t.Run("Test GetCompactionState unhealthy", func(t *testing.T) { - node.UpdateStateCode(commonpb.StateCode_Abnormal) - resp, _ := node.GetCompactionState(ctx, nil) - assert.Equal(t, "DataNode is unhealthy", resp.GetStatus().GetReason()) - node.UpdateStateCode(commonpb.StateCode_Healthy) - }) - - t.Run("Test FlushSegments", func(t *testing.T) { - dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments" - - node1 := newIDLEDataNodeMock(context.TODO(), schemapb.DataType_Int64) - node1.SetEtcdClient(etcdCli) - err = node1.Init() - assert.Nil(t, err) - err = node1.Start() - assert.Nil(t, err) - defer func() { - err := node1.Stop() - assert.Nil(t, err) - }() - - vchan := &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: dmChannelName, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - } - - err := node1.flowgraphManager.addAndStart(node1, vchan, nil) - require.Nil(t, err) - - fgservice, ok := node1.flowgraphManager.getFlowgraphService(dmChannelName) - assert.True(t, ok) - - err = fgservice.channel.addSegment(addSegmentReq{ - segType: datapb.SegmentType_New, - segID: 0, - collID: 1, - partitionID: 1, - startPos: &internalpb.MsgPosition{}, - endPos: &internalpb.MsgPosition{}, - }) - assert.Nil(t, err) - - req := &datapb.FlushSegmentsRequest{ - Base: &commonpb.MsgBase{ - TargetID: node1.session.ServerID, - }, - DbID: 0, - CollectionID: 1, - SegmentIDs: []int64{0}, - } - - wg := sync.WaitGroup{} - wg.Add(2) - - go func() { - defer wg.Done() - - status, err := node1.FlushSegments(node1.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - }() - - go func() { - defer wg.Done() - - timeTickMsgPack := msgstream.MsgPack{} - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: Timestamp(0), - EndTimestamp: Timestamp(0), - HashValues: []uint32{0}, - }, - TimeTickMsg: internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - MsgID: UniqueID(0), - Timestamp: math.MaxUint64, - SourceID: 0, - }, - }, - } - timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) - - // pulsar produce - factory := dependency.NewDefaultFactory(true) - insertStream, err := factory.NewMsgStream(node1.ctx) - assert.NoError(t, err) - insertStream.AsProducer([]string{dmChannelName}) - defer insertStream.Close() - - _, err = insertStream.Broadcast(&timeTickMsgPack) - assert.NoError(t, err) - - _, err = insertStream.Broadcast(&timeTickMsgPack) - assert.NoError(t, err) - }() - - wg.Wait() - // dup call - status, err := node1.FlushSegments(node1.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - - // failure call - req = &datapb.FlushSegmentsRequest{ - Base: &commonpb.MsgBase{ - TargetID: -1, - }, - DbID: 0, - CollectionID: 1, - SegmentIDs: []int64{1}, - } - status, err = node1.FlushSegments(node1.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_NodeIDNotMatch, status.ErrorCode) - - req = &datapb.FlushSegmentsRequest{ - Base: &commonpb.MsgBase{ - TargetID: node1.session.ServerID, - }, - DbID: 0, - CollectionID: 1, - SegmentIDs: []int64{1}, - } - - status, err = node1.FlushSegments(node1.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) - - req = &datapb.FlushSegmentsRequest{ - Base: &commonpb.MsgBase{ - TargetID: node1.session.ServerID, - }, - DbID: 0, - CollectionID: 1, - SegmentIDs: []int64{}, - } - - status, err = node1.FlushSegments(node1.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - }) - - t.Run("Test GetTimeTickChannel", func(t *testing.T) { - _, err := node.GetTimeTickChannel(node.ctx) - assert.NoError(t, err) - }) - - t.Run("Test GetStatisticsChannel", func(t *testing.T) { - _, err := node.GetStatisticsChannel(node.ctx) - assert.NoError(t, err) - }) - t.Run("Test getSystemInfoMetrics", func(t *testing.T) { emptyNode := &DataNode{} emptyNode.session = &sessionutil.Session{ServerID: 1} @@ -386,258 +182,6 @@ func TestDataNode(t *testing.T) { rateCol.Register(metricsinfo.InsertConsumeThroughput) }) - t.Run("Test ShowConfigurations", func(t *testing.T) { - pattern := "datanode.Port" - req := &internalpb.ShowConfigurationsRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_WatchQueryChannels, - MsgID: rand.Int63(), - }, - Pattern: pattern, - } - - //test closed server - node := &DataNode{} - node.session = &sessionutil.Session{ServerID: 1} - node.stateCode.Store(commonpb.StateCode_Abnormal) - - resp, err := node.ShowConfigurations(ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) - node.stateCode.Store(commonpb.StateCode_Healthy) - - resp, err = node.ShowConfigurations(ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - - assert.Equal(t, 1, len(resp.Configuations)) - assert.Equal(t, "datanode.port", resp.Configuations[0].Key) - }) - - t.Run("Test GetMetrics", func(t *testing.T) { - node := &DataNode{} - node.session = &sessionutil.Session{ServerID: 1} - node.flowgraphManager = newFlowgraphManager() - // server is closed - node.stateCode.Store(commonpb.StateCode_Abnormal) - resp, err := node.GetMetrics(ctx, &milvuspb.GetMetricsRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - node.stateCode.Store(commonpb.StateCode_Healthy) - - // failed to parse metric type - invalidRequest := "invalid request" - resp, err = node.GetMetrics(ctx, &milvuspb.GetMetricsRequest{ - Request: invalidRequest, - }) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - - // unsupported metric type - unsupportedMetricType := "unsupported" - req, err := metricsinfo.ConstructRequestByMetricType(unsupportedMetricType) - assert.NoError(t, err) - resp, err = node.GetMetrics(ctx, req) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - - // normal case - req, err = metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) - assert.NoError(t, err) - resp, err = node.GetMetrics(node.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - log.Info("Test DataNode.GetMetrics", - zap.String("name", resp.ComponentName), - zap.String("response", resp.Response)) - }) - - t.Run("Test Import", func(t *testing.T) { - node.rootCoord = &RootCoordFactory{ - collectionID: 100, - pkType: schemapb.DataType_Int64, - } - content := []byte(`{ - "rows":[ - {"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]}, - {"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]}, - {"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]}, - {"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]}, - {"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]} - ] - }`) - - chName1 := "fake-by-dev-rootcoord-dml-testimport-1" - chName2 := "fake-by-dev-rootcoord-dml-testimport-2" - err := node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{ - CollectionID: 100, - ChannelName: chName1, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - }, nil) - require.Nil(t, err) - err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{ - CollectionID: 100, - ChannelName: chName2, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - }, nil) - require.Nil(t, err) - - _, ok := node.flowgraphManager.getFlowgraphService(chName1) - assert.True(t, ok) - _, ok = node.flowgraphManager.getFlowgraphService(chName2) - assert.True(t, ok) - - filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json") - err = node.chunkManager.Write(ctx, filePath, content) - assert.NoError(t, err) - req := &datapb.ImportTaskRequest{ - ImportTask: &datapb.ImportTask{ - CollectionId: 100, - PartitionId: 100, - ChannelNames: []string{chName1, chName2}, - Files: []string{filePath}, - RowBased: true, - }, - } - node.rootCoord.(*RootCoordFactory).ReportImportErr = true - _, err = node.Import(node.ctx, req) - assert.NoError(t, err) - node.rootCoord.(*RootCoordFactory).ReportImportErr = false - - node.rootCoord.(*RootCoordFactory).ReportImportNotSuccess = true - _, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req) - assert.NoError(t, err) - node.rootCoord.(*RootCoordFactory).ReportImportNotSuccess = false - - node.dataCoord.(*DataCoordFactory).AddSegmentError = true - _, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req) - assert.NoError(t, err) - node.dataCoord.(*DataCoordFactory).AddSegmentError = false - - node.dataCoord.(*DataCoordFactory).AddSegmentNotSuccess = true - _, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req) - assert.NoError(t, err) - node.dataCoord.(*DataCoordFactory).AddSegmentNotSuccess = false - - stat, err := node.Import(context.WithValue(ctx, ctxKey{}, ""), req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, stat.GetErrorCode()) - assert.Equal(t, "", stat.GetReason()) - }) - - t.Run("Test Import bad flow graph", func(t *testing.T) { - node.rootCoord = &RootCoordFactory{ - collectionID: 100, - pkType: schemapb.DataType_Int64, - } - - chName1 := "fake-by-dev-rootcoord-dml-testimport-1" - chName2 := "fake-by-dev-rootcoord-dml-testimport-2" - err := node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{ - CollectionID: 100, - ChannelName: chName1, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - }, nil) - require.Nil(t, err) - err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{ - CollectionID: 999, // wrong collection ID. - ChannelName: chName2, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - }, nil) - require.Nil(t, err) - - _, ok := node.flowgraphManager.getFlowgraphService(chName1) - assert.True(t, ok) - _, ok = node.flowgraphManager.getFlowgraphService(chName2) - assert.True(t, ok) - - content := []byte(`{ - "rows":[ - {"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]}, - {"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]}, - {"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]}, - {"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]}, - {"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]} - ] - }`) - - filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json") - err = node.chunkManager.Write(ctx, filePath, content) - assert.NoError(t, err) - req := &datapb.ImportTaskRequest{ - ImportTask: &datapb.ImportTask{ - CollectionId: 100, - PartitionId: 100, - ChannelNames: []string{chName1, chName2}, - Files: []string{filePath}, - RowBased: true, - }, - } - stat, err := node.Import(context.WithValue(ctx, ctxKey{}, ""), req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, stat.GetErrorCode()) - assert.Equal(t, "", stat.GetReason()) - }) - - t.Run("Test Import report import error", func(t *testing.T) { - node.rootCoord = &RootCoordFactory{ - collectionID: 100, - pkType: schemapb.DataType_Int64, - ReportImportErr: true, - } - content := []byte(`{ - "rows":[ - {"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]}, - {"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]}, - {"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]}, - {"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]}, - {"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]} - ] - }`) - - filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json") - err = node.chunkManager.Write(ctx, filePath, content) - assert.NoError(t, err) - req := &datapb.ImportTaskRequest{ - ImportTask: &datapb.ImportTask{ - CollectionId: 100, - PartitionId: 100, - ChannelNames: []string{"ch1", "ch2"}, - Files: []string{filePath}, - RowBased: true, - }, - } - stat, err := node.Import(node.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) - }) - - t.Run("Test Import error", func(t *testing.T) { - node.rootCoord = &RootCoordFactory{collectionID: -1} - req := &datapb.ImportTaskRequest{ - ImportTask: &datapb.ImportTask{ - CollectionId: 100, - PartitionId: 100, - }, - } - stat, err := node.Import(context.WithValue(ctx, ctxKey{}, ""), req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.ErrorCode) - - stat, err = node.Import(context.WithValue(ctx, ctxKey{}, returnError), req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) - - node.stateCode.Store(commonpb.StateCode_Abnormal) - stat, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) - }) - t.Run("Test BackGroundGC", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) @@ -666,185 +210,6 @@ func TestDataNode(t *testing.T) { cancel() }) - t.Run("Test SyncSegments", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1" - - node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - node.SetEtcdClient(etcdCli) - err = node.Init() - assert.Nil(t, err) - err = node.Start() - assert.Nil(t, err) - defer node.Stop() - - err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{ - ChannelName: chanName, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{100, 200, 300}, - }, nil) - require.NoError(t, err) - fg, ok := node.flowgraphManager.getFlowgraphService(chanName) - assert.True(t, ok) - - s1 := Segment{segmentID: 100} - s2 := Segment{segmentID: 200} - s3 := Segment{segmentID: 300} - s1.setType(datapb.SegmentType_Flushed) - s2.setType(datapb.SegmentType_Flushed) - s3.setType(datapb.SegmentType_Flushed) - fg.channel.(*ChannelMeta).segments = map[UniqueID]*Segment{ - s1.segmentID: &s1, - s2.segmentID: &s2, - s3.segmentID: &s3, - } - - t.Run("invalid compacted from", func(t *testing.T) { - req := &datapb.SyncSegmentsRequest{ - CompactedTo: 400, - NumOfRows: 100, - } - - req.CompactedFrom = []UniqueID{} - status, err := node.SyncSegments(ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) - - req.CompactedFrom = []UniqueID{101, 201} - status, err = node.SyncSegments(ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode()) - }) - - t.Run("valid request numRows>0", func(t *testing.T) { - req := &datapb.SyncSegmentsRequest{ - CompactedFrom: []UniqueID{100, 200, 101, 201}, - CompactedTo: 102, - NumOfRows: 100, - } - status, err := node.SyncSegments(ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode()) - - assert.True(t, fg.channel.hasSegment(req.CompactedTo, true)) - assert.False(t, fg.channel.hasSegment(req.CompactedFrom[0], true)) - assert.False(t, fg.channel.hasSegment(req.CompactedFrom[1], true)) - }) - - t.Run("valid request numRows=0", func(t *testing.T) { - s1.setType(datapb.SegmentType_Flushed) - s2.setType(datapb.SegmentType_Flushed) - s3.setType(datapb.SegmentType_Flushed) - - fg.channel.(*ChannelMeta).segments = map[UniqueID]*Segment{ - s1.segmentID: &s1, - s2.segmentID: &s2, - s3.segmentID: &s3, - } - - req := &datapb.SyncSegmentsRequest{ - CompactedFrom: []int64{s1.segmentID, s2.segmentID}, - CompactedTo: 101, - NumOfRows: 0, - } - status, err := node.SyncSegments(ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode()) - - assert.False(t, fg.channel.hasSegment(req.CompactedTo, true)) - assert.False(t, fg.channel.hasSegment(req.CompactedFrom[0], true)) - assert.False(t, fg.channel.hasSegment(req.CompactedFrom[1], true)) - }) - }) -} - -func TestDataNode_AddSegment(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - node.SetEtcdClient(etcdCli) - err = node.Init() - assert.Nil(t, err) - err = node.Start() - assert.Nil(t, err) - defer node.Stop() - - node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode")) - paramtable.SetNodeID(1) - - t.Run("test AddSegment", func(t *testing.T) { - node.rootCoord = &RootCoordFactory{ - collectionID: 100, - pkType: schemapb.DataType_Int64, - } - - chName1 := "fake-by-dev-rootcoord-dml-testaddsegment-1" - chName2 := "fake-by-dev-rootcoord-dml-testaddsegment-2" - err := node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{ - CollectionID: 100, - ChannelName: chName1, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - }, nil) - require.Nil(t, err) - err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{ - CollectionID: 100, - ChannelName: chName2, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - }, nil) - require.Nil(t, err) - - _, ok := node.flowgraphManager.getFlowgraphService(chName1) - assert.True(t, ok) - _, ok = node.flowgraphManager.getFlowgraphService(chName2) - assert.True(t, ok) - - stat, err := node.AddImportSegment(context.WithValue(ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ - SegmentId: 100, - CollectionId: 100, - PartitionId: 100, - ChannelName: chName1, - RowNum: 500, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, stat.GetStatus().GetErrorCode()) - assert.Equal(t, "", stat.GetStatus().GetReason()) - assert.NotEqual(t, nil, stat.GetChannelPos()) - - getFlowGraphServiceAttempts = 3 - stat, err = node.AddImportSegment(context.WithValue(ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ - SegmentId: 100, - CollectionId: 100, - PartitionId: 100, - ChannelName: "bad-ch-name", - RowNum: 500, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetStatus().GetErrorCode()) - }) } func TestWatchChannel(t *testing.T) { @@ -1112,101 +477,3 @@ func TestWatchChannel(t *testing.T) { assert.NotEmpty(t, newWatchInfo.GetVchan().GetDroppedSegmentIds()) }) } - -func TestDataNode_GetComponentStates(t *testing.T) { - n := &DataNode{} - n.stateCode.Store(commonpb.StateCode_Healthy) - resp, err := n.GetComponentStates(context.Background()) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.Equal(t, common.NotRegisteredID, resp.State.NodeID) - n.session = &sessionutil.Session{} - n.session.UpdateRegistered(true) - resp, err = n.GetComponentStates(context.Background()) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) -} - -func TestDataNode_ResendSegmentStats(t *testing.T) { - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-ResendSegmentStats" - - node := newIDLEDataNodeMock(context.TODO(), schemapb.DataType_Int64) - node.SetEtcdClient(etcdCli) - err = node.Init() - assert.Nil(t, err) - err = node.Start() - assert.Nil(t, err) - defer func() { - err := node.Stop() - assert.Nil(t, err) - }() - - vChan := &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: dmChannelName, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - } - - err = node.flowgraphManager.addAndStart(node, vChan, nil) - require.Nil(t, err) - - fgService, ok := node.flowgraphManager.getFlowgraphService(dmChannelName) - assert.True(t, ok) - - err = fgService.channel.addSegment(addSegmentReq{ - segType: datapb.SegmentType_New, - segID: 0, - collID: 1, - partitionID: 1, - startPos: &internalpb.MsgPosition{}, - endPos: &internalpb.MsgPosition{}, - }) - assert.Nil(t, err) - err = fgService.channel.addSegment(addSegmentReq{ - segType: datapb.SegmentType_New, - segID: 1, - collID: 1, - partitionID: 2, - startPos: &internalpb.MsgPosition{}, - endPos: &internalpb.MsgPosition{}, - }) - assert.Nil(t, err) - err = fgService.channel.addSegment(addSegmentReq{ - segType: datapb.SegmentType_New, - segID: 2, - collID: 1, - partitionID: 3, - startPos: &internalpb.MsgPosition{}, - endPos: &internalpb.MsgPosition{}, - }) - assert.Nil(t, err) - - req := &datapb.ResendSegmentStatsRequest{ - Base: &commonpb.MsgBase{}, - } - - wg := sync.WaitGroup{} - wg.Add(2) - - resp, err := node.ResendSegmentStats(node.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.ElementsMatch(t, []UniqueID{0, 1, 2}, resp.GetSegResent()) - - // Duplicate call. - resp, err = node.ResendSegmentStats(node.ctx, req) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.ElementsMatch(t, []UniqueID{0, 1, 2}, resp.GetSegResent()) -} diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 91b75465e4..065f7a7f78 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -672,7 +672,7 @@ func TestRollBF(t *testing.T) { }) } -type InsertBufferNodeSuit struct { +type InsertBufferNodeSuite struct { suite.Suite channel *ChannelMeta @@ -684,7 +684,7 @@ type InsertBufferNodeSuit struct { originalConfig int64 } -func (s *InsertBufferNodeSuit) SetupSuite() { +func (s *InsertBufferNodeSuite) SetupSuite() { insertBufferNodeTestDir := "/tmp/milvus_test/insert_buffer_node" rc := &RootCoordFactory{ pkType: schemapb.DataType_Int64, @@ -700,12 +700,12 @@ func (s *InsertBufferNodeSuit) SetupSuite() { paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "200") } -func (s *InsertBufferNodeSuit) TearDownSuite() { +func (s *InsertBufferNodeSuite) TearDownSuite() { s.cm.RemoveWithPrefix(context.Background(), s.cm.RootPath()) paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, strconv.FormatInt(s.originalConfig, 10)) } -func (s *InsertBufferNodeSuit) SetupTest() { +func (s *InsertBufferNodeSuite) SetupTest() { segs := []struct { segID UniqueID sType datapb.SegmentType @@ -728,11 +728,11 @@ func (s *InsertBufferNodeSuit) SetupTest() { } } -func (s *InsertBufferNodeSuit) TearDownTest() { +func (s *InsertBufferNodeSuite) TearDownTest() { s.channel.removeSegments(1, 2, 3) } -func (s *InsertBufferNodeSuit) TestFillInSyncTasks() { +func (s *InsertBufferNodeSuite) TestFillInSyncTasks() { s.Run("drop collection", func() { fgMsg := &flowGraphMsg{dropCollection: true} @@ -857,7 +857,7 @@ func (s *InsertBufferNodeSuit) TestFillInSyncTasks() { } func TestInsertBufferNodeSuite(t *testing.T) { - suite.Run(t, new(InsertBufferNodeSuit)) + suite.Run(t, new(InsertBufferNodeSuite)) } // CompactedRootCoord has meta info compacted at ts diff --git a/internal/datanode/segment.go b/internal/datanode/segment.go index bc470f38c8..a9a5a7405b 100644 --- a/internal/datanode/segment.go +++ b/internal/datanode/segment.go @@ -54,16 +54,6 @@ type Segment struct { startPos *internalpb.MsgPosition // TODO readonly } -type addSegmentReq struct { - segType datapb.SegmentType - segID, collID, partitionID UniqueID - numOfRows int64 - startPos, endPos *internalpb.MsgPosition - statsBinLogs []*datapb.FieldBinlog - recoverTs Timestamp - importing bool -} - func (s *Segment) isValid() bool { return s.getType() != datapb.SegmentType_Compacted } diff --git a/internal/datanode/services.go b/internal/datanode/services.go new file mode 100644 index 0000000000..bf3b2a797a --- /dev/null +++ b/internal/datanode/services.go @@ -0,0 +1,974 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package datanode implements data persistence logic. +// +// Data node persists insert logs into persistent storage like minIO/S3. +package datanode + +import ( + "context" + "errors" + "fmt" + "path" + "strconv" + + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/schemapb" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/commonpbutil" + "github.com/milvus-io/milvus/internal/util/importutil" + "github.com/milvus-io/milvus/internal/util/metautil" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/timerecord" + "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +// WatchDmChannels is not in use +func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) { + log.Warn("DataNode WatchDmChannels is not in use") + + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "watchDmChannels do nothing", + }, nil +} + +// GetComponentStates will return current state of DataNode +func (node *DataNode) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { + log.Debug("DataNode current state", zap.Any("State", node.stateCode.Load())) + nodeID := common.NotRegisteredID + if node.session != nil && node.session.Registered() { + nodeID = node.session.ServerID + } + states := &milvuspb.ComponentStates{ + State: &milvuspb.ComponentInfo{ + // NodeID: Params.NodeID, // will race with DataNode.Register() + NodeID: nodeID, + Role: node.Role, + StateCode: node.stateCode.Load().(commonpb.StateCode), + }, + SubcomponentStates: make([]*milvuspb.ComponentInfo, 0), + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + } + return states, nil +} + +// FlushSegments packs flush messages into flowGraph through flushChan. +// +// DataCoord calls FlushSegments if the segment is seal&flush only. +// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored. +// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed. +func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { + metrics.DataNodeFlushReqCounter.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + MetricRequestsTotal).Inc() + + errStatus := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + } + + if !node.isHealthy() { + errStatus.Reason = "dataNode not in HEALTHY state" + return errStatus, nil + } + + if req.GetBase().GetTargetID() != node.session.ServerID { + log.Warn("flush segment target id not matched", + zap.Int64("targetID", req.GetBase().GetTargetID()), + zap.Int64("serverID", node.session.ServerID), + ) + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_NodeIDNotMatch, + Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID), + } + return status, nil + } + + log.Info("receiving FlushSegments request", + zap.Int64("collectionID", req.GetCollectionID()), + zap.Int64s("sealedSegments", req.GetSegmentIDs()), + ) + + segmentIDs := req.GetSegmentIDs() + var flushedSeg []UniqueID + for _, segID := range segmentIDs { + // if the segment in already being flushed, skip it. + if node.segmentCache.checkIfCached(segID) { + logDupFlush(req.GetCollectionID(), segID) + continue + } + // Get the flush channel for the given segment ID. + // If no flush channel is found, report an error. + flushCh, err := node.flowgraphManager.getFlushCh(segID) + if err != nil { + errStatus.Reason = "no flush channel found for the segment, unable to flush" + log.Error(errStatus.Reason, zap.Int64("segmentID", segID), zap.Error(err)) + return errStatus, nil + } + + // Double check that the segment is still not cached. + // Skip this flush if segment ID is cached, otherwise cache the segment ID and proceed. + exist := node.segmentCache.checkOrCache(segID) + if exist { + logDupFlush(req.GetCollectionID(), segID) + continue + } + // flushedSeg is only for logging purpose. + flushedSeg = append(flushedSeg, segID) + // Send the segment to its flush channel. + flushCh <- flushMsg{ + msgID: req.GetBase().GetMsgID(), + timestamp: req.GetBase().GetTimestamp(), + segmentID: segID, + collectionID: req.GetCollectionID(), + } + } + + // Log success flushed segments. + if len(flushedSeg) > 0 { + log.Info("sending segments to flush channel", + zap.Int64("collectionID", req.GetCollectionID()), + zap.Int64s("sealedSegments", flushedSeg)) + } + + metrics.DataNodeFlushReqCounter.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + MetricRequestsSuccess).Inc() + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, nil +} + +// ResendSegmentStats resend un-flushed segment stats back upstream to DataCoord by resending DataNode time tick message. +// It returns a list of segments to be sent. +func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) { + log.Info("start resending segment stats, if any", + zap.Int64("DataNode ID", paramtable.GetNodeID())) + segResent := node.flowgraphManager.resendTT() + log.Info("found segment(s) with stats to resend", + zap.Int64s("segment IDs", segResent)) + return &datapb.ResendSegmentStatsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + SegResent: segResent, + }, nil +} + +// GetTimeTickChannel currently do nothing +func (node *DataNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil +} + +// GetStatisticsChannel currently do nothing +func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil +} + +// ShowConfigurations returns the configurations of DataNode matching req.Pattern +func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { + log.Debug("DataNode.ShowConfigurations", zap.String("pattern", req.Pattern)) + if !node.isHealthy() { + log.Warn("DataNode.ShowConfigurations failed", + zap.Int64("nodeId", paramtable.GetNodeID()), + zap.String("req", req.Pattern), + zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) + + return &internalpb.ShowConfigurationsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), + }, + Configuations: nil, + }, nil + } + configList := make([]*commonpb.KeyValuePair, 0) + for key, value := range Params.GetComponentConfigurations(ctx, "datanode", req.Pattern) { + configList = append(configList, + &commonpb.KeyValuePair{ + Key: key, + Value: value, + }) + } + + return &internalpb.ShowConfigurationsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Configuations: configList, + }, nil +} + +// GetMetrics return datanode metrics +func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + if !node.isHealthy() { + log.Warn("DataNode.GetMetrics failed", + zap.Int64("node_id", paramtable.GetNodeID()), + zap.String("req", req.Request), + zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), + }, + }, nil + } + + metricType, err := metricsinfo.ParseMetricType(req.Request) + if err != nil { + log.Warn("DataNode.GetMetrics failed to parse metric type", + zap.Int64("node_id", paramtable.GetNodeID()), + zap.String("req", req.Request), + zap.Error(err)) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: fmt.Sprintf("datanode GetMetrics failed, nodeID=%d, err=%s", paramtable.GetNodeID(), err.Error()), + }, + }, nil + } + + if metricType == metricsinfo.SystemInfoMetrics { + systemInfoMetrics, err := node.getSystemInfoMetrics(ctx, req) + if err != nil { + log.Warn("DataNode GetMetrics failed", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err)) + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: fmt.Sprintf("datanode GetMetrics failed, nodeID=%d, err=%s", paramtable.GetNodeID(), err.Error()), + }, + }, nil + } + + return systemInfoMetrics, nil + } + + log.Debug("DataNode.GetMetrics failed, request metric type is not implemented yet", + zap.Int64("node_id", paramtable.GetNodeID()), + zap.String("req", req.Request), + zap.String("metric_type", metricType)) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: metricsinfo.MsgUnimplementedMetric, + }, + }, nil +} + +// Compaction handles compaction request from DataCoord +// returns status as long as compaction task enqueued or invalid +func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + } + + ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel()) + if !ok { + log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel())) + status.Reason = errIllegalCompactionPlan.Error() + return status, nil + } + + if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) { + log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channel name", req.GetChannel())) + status.Reason = "channel marked invalid" + return status, nil + } + + binlogIO := &binlogIO{node.chunkManager, ds.idAllocator} + task := newCompactionTask( + node.ctx, + binlogIO, binlogIO, + ds.channel, + ds.flushManager, + ds.idAllocator, + req, + node.chunkManager, + ) + + node.compactionExecutor.execute(task) + + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, nil +} + +// GetCompactionState called by DataCoord +// return status of all compaction plans +func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) { + if !node.isHealthy() { + return &datapb.CompactionStateResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "DataNode is unhealthy", + }, + }, nil + } + results := make([]*datapb.CompactionStateResult, 0) + node.compactionExecutor.executing.Range(func(k, v any) bool { + results = append(results, &datapb.CompactionStateResult{ + State: commonpb.CompactionState_Executing, + PlanID: k.(UniqueID), + }) + return true + }) + node.compactionExecutor.completed.Range(func(k, v any) bool { + results = append(results, &datapb.CompactionStateResult{ + State: commonpb.CompactionState_Completed, + PlanID: k.(UniqueID), + Result: v.(*datapb.CompactionResult), + }) + node.compactionExecutor.completed.Delete(k) + return true + }) + + if len(results) > 0 { + log.Info("Compaction results", zap.Any("results", results)) + } + return &datapb.CompactionStateResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Results: results, + }, nil +} + +// SyncSegments called by DataCoord, sync the compacted segments' meta between DC and DN +func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) { + log.Ctx(ctx).Info("DataNode receives SyncSegments", + zap.Int64("planID", req.GetPlanID()), + zap.Int64("target segmentID", req.GetCompactedTo()), + zap.Int64s("compacted from", req.GetCompactedFrom()), + zap.Int64("numOfRows", req.GetNumOfRows()), + ) + status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError} + + if !node.isHealthy() { + status.Reason = "DataNode is unhealthy" + return status, nil + } + + if len(req.GetCompactedFrom()) <= 0 { + status.Reason = "invalid request, compacted from segments shouldn't be empty" + return status, nil + } + + getChannel := func() (int64, Channel) { + for _, segmentFrom := range req.GetCompactedFrom() { + channel, err := node.flowgraphManager.getChannel(segmentFrom) + if err != nil { + log.Warn("invalid segmentID", zap.Int64("segment_from", segmentFrom), zap.Error(err)) + continue + } + return segmentFrom, channel + } + return 0, nil + } + oneSegment, channel := getChannel() + if channel == nil { + log.Warn("no available channel") + status.ErrorCode = commonpb.ErrorCode_Success + return status, nil + } + + ds, ok := node.flowgraphManager.getFlowgraphService(channel.getChannelName(oneSegment)) + if !ok { + status.Reason = fmt.Sprintf("failed to find flow graph service, segmentID: %d", oneSegment) + return status, nil + } + + // oneSegment is definitely in the channel, guaranteed by the check before. + collID, partID, _ := channel.getCollectionAndPartitionID(oneSegment) + targetSeg := &Segment{ + collectionID: collID, + partitionID: partID, + segmentID: req.GetCompactedTo(), + numRows: req.GetNumOfRows(), + } + + err := channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime()) + if err != nil { + status.Reason = fmt.Sprintf("init pk stats fail, err=%s", err.Error()) + return status, nil + } + + // block all flow graph so it's safe to remove segment + ds.fg.Blockall() + defer ds.fg.Unblock() + if err := channel.mergeFlushedSegments(targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil { + status.Reason = err.Error() + return status, nil + } + + status.ErrorCode = commonpb.ErrorCode_Success + return status, nil +} + +// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments +func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*commonpb.Status, error) { + log.Info("DataNode receive import request", + zap.Int64("task ID", req.GetImportTask().GetTaskId()), + zap.Int64("collection ID", req.GetImportTask().GetCollectionId()), + zap.Int64("partition ID", req.GetImportTask().GetPartitionId()), + zap.Strings("channel names", req.GetImportTask().GetChannelNames()), + zap.Int64s("working dataNodes", req.WorkingNodes)) + defer func() { + log.Info("DataNode finish import request", zap.Int64("task ID", req.GetImportTask().GetTaskId())) + }() + + importResult := &rootcoordpb.ImportResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + TaskId: req.GetImportTask().TaskId, + DatanodeId: paramtable.GetNodeID(), + State: commonpb.ImportState_ImportStarted, + Segments: make([]int64, 0), + AutoIds: make([]int64, 0), + RowCount: 0, + } + + // Spawn a new context to ignore cancellation from parental context. + newCtx, cancel := context.WithTimeout(context.TODO(), ImportCallTimeout) + defer cancel() + // func to report import state to RootCoord. + reportFunc := func(res *rootcoordpb.ImportResult) error { + status, err := node.rootCoord.ReportImport(ctx, res) + if err != nil { + log.Error("fail to report import state to RootCoord", zap.Error(err)) + return err + } + if status != nil && status.ErrorCode != commonpb.ErrorCode_Success { + return errors.New(status.GetReason()) + } + return nil + } + + if !node.isHealthy() { + log.Warn("DataNode import failed", + zap.Int64("collection ID", req.GetImportTask().GetCollectionId()), + zap.Int64("partition ID", req.GetImportTask().GetPartitionId()), + zap.Int64("task ID", req.GetImportTask().GetTaskId()), + zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) + + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), + }, nil + } + + // get a timestamp for all the rows + // Ignore cancellation from parent context. + rep, err := node.rootCoord.AllocTimestamp(newCtx, &rootcoordpb.AllocTimestampRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), + commonpbutil.WithMsgID(0), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + Count: 1, + }) + + if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil { + msg := "DataNode alloc ts failed" + log.Warn(msg) + importResult.State = commonpb.ImportState_ImportFailed + importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: msg}) + if reportErr := reportFunc(importResult); reportErr != nil { + log.Warn("fail to report import state to RootCoord", zap.Error(reportErr)) + } + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: msg, + }, nil + } + } + + ts := rep.GetTimestamp() + + // get collection schema and shard number + metaService := newMetaService(node.rootCoord, req.GetImportTask().GetCollectionId()) + colInfo, err := metaService.getCollectionInfo(newCtx, req.GetImportTask().GetCollectionId(), 0) + if err != nil { + log.Warn("failed to get collection info for collection ID", + zap.Int64("task ID", req.GetImportTask().GetTaskId()), + zap.Int64("collection ID", req.GetImportTask().GetCollectionId()), + zap.Error(err)) + importResult.State = commonpb.ImportState_ImportFailed + importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: err.Error()}) + reportErr := reportFunc(importResult) + if reportErr != nil { + log.Warn("fail to report import state to RootCoord", zap.Error(err)) + } + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, nil + } + + returnFailFunc := func(inputErr error) (*commonpb.Status, error) { + log.Warn("import wrapper failed to parse import request", + zap.Int64("task ID", req.GetImportTask().GetTaskId()), + zap.Error(inputErr)) + importResult.State = commonpb.ImportState_ImportFailed + importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: inputErr.Error()}) + reportErr := reportFunc(importResult) + if reportErr != nil { + log.Warn("fail to report import state to RootCoord", zap.Error(inputErr)) + } + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: inputErr.Error(), + }, nil + } + + // parse files and generate segments + segmentSize := Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 + importWrapper := importutil.NewImportWrapper(newCtx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.rowIDAllocator, + node.chunkManager, importResult, reportFunc) + importWrapper.SetCallbackFunctions(assignSegmentFunc(node, req), + createBinLogsFunc(node, req, colInfo.GetSchema(), ts), + saveSegmentFunc(node, req, importResult, ts)) + // todo: pass tsStart and tsStart after import_wrapper support + tsStart, tsEnd, err := importutil.ParseTSFromOptions(req.GetImportTask().GetInfos()) + isBackup := importutil.IsBackup(req.GetImportTask().GetInfos()) + if err != nil { + return returnFailFunc(err) + } + log.Info("import time range", zap.Uint64("start_ts", tsStart), zap.Uint64("end_ts", tsEnd)) + err = importWrapper.Import(req.GetImportTask().GetFiles(), + importutil.ImportOptions{OnlyValidate: false, TsStartPoint: tsStart, TsEndPoint: tsEnd, IsBackup: isBackup}) + if err != nil { + return returnFailFunc(err) + } + + resp := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + } + return resp, nil +} + +// AddImportSegment adds the import segment to the current DataNode. +func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) { + log.Info("adding segment to DataNode flow graph", + zap.Int64("segment ID", req.GetSegmentId()), + zap.Int64("collection ID", req.GetCollectionId()), + zap.Int64("partition ID", req.GetPartitionId()), + zap.String("channel name", req.GetChannelName()), + zap.Int64("# of rows", req.GetRowNum())) + // Fetch the flow graph on the given v-channel. + var ds *dataSyncService + // Retry in case the channel hasn't been watched yet. + err := retry.Do(ctx, func() error { + var ok bool + ds, ok = node.flowgraphManager.getFlowgraphService(req.GetChannelName()) + if !ok { + return errors.New("channel not found") + } + return nil + }, retry.Attempts(getFlowGraphServiceAttempts)) + if err != nil { + log.Error("channel not found in current DataNode", + zap.String("channel name", req.GetChannelName()), + zap.Int64("node ID", paramtable.GetNodeID())) + return &datapb.AddImportSegmentResponse{ + Status: &commonpb.Status{ + // TODO: Add specific error code. + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "channel not found in current DataNode", + }, + }, nil + } + // Get the current dml channel position ID, that will be used in segments start positions and end positions. + posID, err := ds.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId()) + if err != nil { + return &datapb.AddImportSegmentResponse{ + Status: &commonpb.Status{ + // TODO: Add specific error code. + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "failed to get channel position", + }, + }, nil + } + // Add the new segment to the channel. + if !ds.channel.hasSegment(req.GetSegmentId(), true) { + log.Info("adding a new segment to channel", + zap.Int64("segment ID", req.GetSegmentId())) + // Add segment as a flushed segment, but set `importing` to true to add extra information of the segment. + // By 'extra information' we mean segment info while adding a `SegmentType_Flushed` typed segment. + if err := ds.channel.addSegment( + addSegmentReq{ + segType: datapb.SegmentType_Flushed, + segID: req.GetSegmentId(), + collID: req.GetCollectionId(), + partitionID: req.GetPartitionId(), + numOfRows: req.GetRowNum(), + statsBinLogs: req.GetStatsLog(), + startPos: &internalpb.MsgPosition{ + ChannelName: req.GetChannelName(), + MsgID: posID, + Timestamp: req.GetBase().GetTimestamp(), + }, + endPos: &internalpb.MsgPosition{ + ChannelName: req.GetChannelName(), + MsgID: posID, + Timestamp: req.GetBase().GetTimestamp(), + }, + recoverTs: req.GetBase().GetTimestamp(), + importing: true, + }); err != nil { + log.Error("failed to add segment to flow graph", + zap.Error(err)) + return &datapb.AddImportSegmentResponse{ + Status: &commonpb.Status{ + // TODO: Add specific error code. + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + }, nil + } + } + ds.flushingSegCache.Remove(req.GetSegmentId()) + return &datapb.AddImportSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + ChannelPos: posID, + }, nil +} + +func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil.AssignSegmentFunc { + return func(shardID int) (int64, string, error) { + chNames := req.GetImportTask().GetChannelNames() + importTaskID := req.GetImportTask().GetTaskId() + if shardID >= len(chNames) { + log.Error("import task returns invalid shard ID", + zap.Int64("task ID", importTaskID), + zap.Int("shard ID", shardID), + zap.Int("# of channels", len(chNames)), + zap.Strings("channel names", chNames), + ) + return 0, "", fmt.Errorf("syncSegmentID Failed: invalid shard ID %d", shardID) + } + + tr := timerecord.NewTimeRecorder("import callback function") + defer tr.Elapse("finished") + + colID := req.GetImportTask().GetCollectionId() + partID := req.GetImportTask().GetPartitionId() + segmentIDReq := composeAssignSegmentIDRequest(1, shardID, chNames, colID, partID) + targetChName := segmentIDReq.GetSegmentIDRequests()[0].GetChannelName() + log.Info("target channel for the import task", + zap.Int64("task ID", importTaskID), + zap.Int("shard ID", shardID), + zap.String("target channel name", targetChName)) + resp, err := node.dataCoord.AssignSegmentID(context.Background(), segmentIDReq) + if err != nil { + return 0, "", fmt.Errorf("syncSegmentID Failed:%w", err) + } + if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + return 0, "", fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason) + } + segmentID := resp.SegIDAssignments[0].SegID + log.Info("new segment assigned", + zap.Int64("task ID", importTaskID), + zap.Int64("segmentID", segmentID), + zap.Int("shard ID", shardID), + zap.String("target channel name", targetChName)) + return segmentID, targetChName, nil + } +} + +func createBinLogsFunc(node *DataNode, req *datapb.ImportTaskRequest, schema *schemapb.CollectionSchema, ts Timestamp) importutil.CreateBinlogsFunc { + return func(fields map[storage.FieldID]storage.FieldData, segmentID int64) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, error) { + var rowNum int + for _, field := range fields { + rowNum = field.RowNum() + break + } + + chNames := req.GetImportTask().GetChannelNames() + importTaskID := req.GetImportTask().GetTaskId() + if rowNum <= 0 { + log.Info("fields data is empty, no need to generate binlog", + zap.Int64("task ID", importTaskID), + zap.Int("# of channels", len(chNames)), + zap.Strings("channel names", chNames), + ) + return nil, nil, nil + } + + colID := req.GetImportTask().GetCollectionId() + partID := req.GetImportTask().GetPartitionId() + + fieldInsert, fieldStats, err := createBinLogs(rowNum, schema, ts, fields, node, segmentID, colID, partID) + if err != nil { + log.Error("failed to create binlogs", + zap.Int64("task ID", importTaskID), + zap.Int("# of channels", len(chNames)), + zap.Strings("channel names", chNames), + zap.Any("err", err), + ) + return nil, nil, err + } + + log.Info("new binlog created", + zap.Int64("task ID", importTaskID), + zap.Int64("segmentID", segmentID), + zap.Int("insert log count", len(fieldInsert)), + zap.Int("stats log count", len(fieldStats))) + + return fieldInsert, fieldStats, err + } +} + +func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoordpb.ImportResult, ts Timestamp) importutil.SaveSegmentFunc { + importTaskID := req.GetImportTask().GetTaskId() + return func(fieldsInsert []*datapb.FieldBinlog, fieldsStats []*datapb.FieldBinlog, segmentID int64, targetChName string, rowCount int64) error { + log.Info("adding segment to the correct DataNode flow graph and saving binlog paths", + zap.Int64("task ID", importTaskID), + zap.Int64("segmentID", segmentID), + zap.String("targetChName", targetChName), + zap.Int64("rowCount", rowCount), + zap.Uint64("ts", ts)) + + err := retry.Do(context.Background(), func() error { + // Ask DataCoord to save binlog path and add segment to the corresponding DataNode flow graph. + resp, err := node.dataCoord.SaveImportSegment(context.Background(), &datapb.SaveImportSegmentRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithTimeStamp(ts), // Pass current timestamp downstream. + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + SegmentId: segmentID, + ChannelName: targetChName, + CollectionId: req.GetImportTask().GetCollectionId(), + PartitionId: req.GetImportTask().GetPartitionId(), + RowNum: rowCount, + SaveBinlogPathReq: &datapb.SaveBinlogPathsRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(0), + commonpbutil.WithMsgID(0), + commonpbutil.WithTimeStamp(ts), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + SegmentID: segmentID, + CollectionID: req.GetImportTask().GetCollectionId(), + Field2BinlogPaths: fieldsInsert, + Field2StatslogPaths: fieldsStats, + // Set start positions of a SaveBinlogPathRequest explicitly. + StartPositions: []*datapb.SegmentStartPosition{ + { + StartPosition: &internalpb.MsgPosition{ + ChannelName: targetChName, + Timestamp: ts, + }, + SegmentID: segmentID, + }, + }, + Importing: true, + }, + }) + // Only retrying when DataCoord is unhealthy or err != nil, otherwise return immediately. + if err != nil { + return fmt.Errorf(err.Error()) + } + if resp.ErrorCode != commonpb.ErrorCode_Success && resp.ErrorCode != commonpb.ErrorCode_DataCoordNA { + return retry.Unrecoverable(fmt.Errorf("failed to save import segment, reason = %s", resp.Reason)) + } else if resp.ErrorCode == commonpb.ErrorCode_DataCoordNA { + return fmt.Errorf("failed to save import segment: %s", resp.GetReason()) + } + return nil + }) + if err != nil { + log.Warn("failed to save import segment", zap.Error(err)) + return err + } + log.Info("segment imported and persisted", + zap.Int64("task ID", importTaskID), + zap.Int64("segmentID", segmentID)) + res.Segments = append(res.Segments, segmentID) + res.RowCount += rowCount + return nil + } +} + +func composeAssignSegmentIDRequest(rowNum int, shardID int, chNames []string, + collID int64, partID int64) *datapb.AssignSegmentIDRequest { + // use the first field's row count as segment row count + // all the fields row count are same, checked by ImportWrapper + // ask DataCoord to alloc a new segment + log.Info("import task flush segment", + zap.Any("channel names", chNames), + zap.Int("shard ID", shardID)) + segReqs := []*datapb.SegmentIDRequest{ + { + ChannelName: chNames[shardID], + Count: uint32(rowNum), + CollectionID: collID, + PartitionID: partID, + IsImport: true, + }, + } + segmentIDReq := &datapb.AssignSegmentIDRequest{ + NodeID: 0, + PeerRole: typeutil.ProxyRole, + SegmentIDRequests: segReqs, + } + return segmentIDReq +} + +func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp, + fields map[storage.FieldID]storage.FieldData, node *DataNode, segmentID, colID, partID UniqueID) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, error) { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tsFieldData := make([]int64, rowNum) + for i := range tsFieldData { + tsFieldData[i] = int64(ts) + } + fields[common.TimeStampField] = &storage.Int64FieldData{ + Data: tsFieldData, + NumRows: []int64{int64(rowNum)}, + } + + if status, _ := node.dataCoord.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{ + Stats: []*datapb.SegmentStats{{ + SegmentID: segmentID, + NumRows: int64(rowNum), + }}, + }); status.GetErrorCode() != commonpb.ErrorCode_Success { + return nil, nil, fmt.Errorf(status.GetReason()) + } + + data := BufferData{buffer: &InsertData{ + Data: fields, + }} + data.updateSize(int64(rowNum)) + meta := &etcdpb.CollectionMeta{ + ID: colID, + Schema: schema, + } + binLogs, statsBinLogs, err := storage.NewInsertCodec(meta).Serialize(partID, segmentID, data.buffer) + if err != nil { + return nil, nil, err + } + + var alloc allocatorInterface = newAllocator(node.rootCoord) + start, _, err := alloc.allocIDBatch(uint32(len(binLogs))) + if err != nil { + return nil, nil, err + } + + field2Insert := make(map[UniqueID]*datapb.Binlog, len(binLogs)) + kvs := make(map[string][]byte, len(binLogs)) + field2Logidx := make(map[UniqueID]UniqueID, len(binLogs)) + for idx, blob := range binLogs { + fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) + if err != nil { + log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) + return nil, nil, err + } + + logidx := start + int64(idx) + + // no error raise if alloc=false + k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx) + + key := path.Join(node.chunkManager.RootPath(), common.SegmentInsertLogPath, k) + kvs[key] = blob.Value[:] + field2Insert[fieldID] = &datapb.Binlog{ + EntriesNum: data.size, + TimestampFrom: ts, + TimestampTo: ts, + LogPath: key, + LogSize: int64(len(blob.Value)), + } + field2Logidx[fieldID] = logidx + } + + field2Stats := make(map[UniqueID]*datapb.Binlog) + // write stats binlog + for _, blob := range statsBinLogs { + fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) + if err != nil { + log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) + return nil, nil, err + } + + logidx := field2Logidx[fieldID] + + // no error raise if alloc=false + k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx) + + key := path.Join(node.chunkManager.RootPath(), common.SegmentStatslogPath, k) + kvs[key] = blob.Value + field2Stats[fieldID] = &datapb.Binlog{ + EntriesNum: data.size, + TimestampFrom: ts, + TimestampTo: ts, + LogPath: key, + LogSize: int64(len(blob.Value)), + } + } + + err = node.chunkManager.MultiWrite(ctx, kvs) + if err != nil { + return nil, nil, err + } + var ( + fieldInsert []*datapb.FieldBinlog + fieldStats []*datapb.FieldBinlog + ) + for k, v := range field2Insert { + fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}}) + } + for k, v := range field2Stats { + fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}}) + } + return fieldInsert, fieldStats, nil +} + +func logDupFlush(cID, segID int64) { + log.Info("segment is already being flushed, ignoring flush request", + zap.Int64("collection ID", cID), + zap.Int64("segment ID", segID)) +} diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go new file mode 100644 index 0000000000..b2d08513f8 --- /dev/null +++ b/internal/datanode/services_test.go @@ -0,0 +1,746 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datanode + +import ( + "context" + "math" + "math/rand" + "path/filepath" + "sync" + "testing" + + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/schemapb" + + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/mq/msgstream" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/importutil" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/sessionutil" + + "github.com/stretchr/testify/suite" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +type DataNodeServicesSuite struct { + suite.Suite + + node *DataNode + etcdCli *clientv3.Client + ctx context.Context + cancel context.CancelFunc +} + +func TestDataNodeServicesSuite(t *testing.T) { + suite.Run(t, new(DataNodeServicesSuite)) +} + +func (s *DataNodeServicesSuite) SetupSuite() { + importutil.ReportImportAttempts = 1 + + s.ctx, s.cancel = context.WithCancel(context.Background()) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + s.Require().NoError(err) + s.etcdCli = etcdCli +} + +func (s *DataNodeServicesSuite) SetupTest() { + s.node = newIDLEDataNodeMock(s.ctx, schemapb.DataType_Int64) + s.node.SetEtcdClient(s.etcdCli) + + err := s.node.Init() + s.Require().NoError(err) + + err = s.node.Start() + s.Require().NoError(err) + + s.node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode")) + paramtable.SetNodeID(1) +} + +func (s *DataNodeServicesSuite) TearDownSuite() { + s.cancel() + err := s.etcdCli.Close() + s.Require().NoError(err) + s.node.Stop() +} + +func (s *DataNodeServicesSuite) TestNotInUseAPIs() { + s.Run("WatchDmChannels", func() { + status, err := s.node.WatchDmChannels(s.ctx, &datapb.WatchDmChannelsRequest{}) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode) + }) + s.Run("GetTimeTickChannel", func() { + _, err := s.node.GetTimeTickChannel(s.ctx) + s.Assert().NoError(err) + }) + + s.Run("GetStatisticsChannel", func() { + _, err := s.node.GetStatisticsChannel(s.ctx) + s.Assert().NoError(err) + }) +} + +func (s *DataNodeServicesSuite) TestGetComponentStates() { + resp, err := s.node.GetComponentStates(s.ctx) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().Equal(common.NotRegisteredID, resp.State.NodeID) + + s.node.session = &sessionutil.Session{} + s.node.session.UpdateRegistered(true) + resp, err = s.node.GetComponentStates(context.Background()) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) +} + +func (s *DataNodeServicesSuite) TestGetCompactionState() { + s.Run("success", func() { + s.node.compactionExecutor.executing.Store(int64(3), 0) + s.node.compactionExecutor.executing.Store(int64(2), 0) + s.node.compactionExecutor.completed.Store(int64(1), &datapb.CompactionResult{ + PlanID: 1, + SegmentID: 10, + }) + stat, err := s.node.GetCompactionState(s.ctx, nil) + s.Assert().NoError(err) + s.Assert().Equal(3, len(stat.GetResults())) + + var mu sync.RWMutex + cnt := 0 + for _, v := range stat.GetResults() { + if v.GetState() == commonpb.CompactionState_Completed { + mu.Lock() + cnt++ + mu.Unlock() + } + } + mu.Lock() + s.Assert().Equal(1, cnt) + mu.Unlock() + + mu.Lock() + cnt = 0 + mu.Unlock() + s.node.compactionExecutor.completed.Range(func(k, v any) bool { + mu.Lock() + cnt++ + mu.Unlock() + return true + }) + s.Assert().Equal(0, cnt) + }) + + s.Run("unhealthy", func() { + node := &DataNode{} + node.UpdateStateCode(commonpb.StateCode_Abnormal) + resp, _ := node.GetCompactionState(s.ctx, nil) + s.Assert().Equal("DataNode is unhealthy", resp.GetStatus().GetReason()) + }) +} + +func (s *DataNodeServicesSuite) TestFlushSegments() { + dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments" + + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: dmChannelName, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{}, + } + + err := s.node.flowgraphManager.addAndStart(s.node, vchan, nil) + s.Require().NoError(err) + + fgservice, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName) + s.Require().True(ok) + + err = fgservice.channel.addSegment(addSegmentReq{ + segType: datapb.SegmentType_New, + segID: 0, + collID: 1, + partitionID: 1, + startPos: &internalpb.MsgPosition{}, + endPos: &internalpb.MsgPosition{}, + }) + s.Require().NoError(err) + + req := &datapb.FlushSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.node.session.ServerID, + }, + DbID: 0, + CollectionID: 1, + SegmentIDs: []int64{0}, + } + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + + status, err := s.node.FlushSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode) + }() + + go func() { + defer wg.Done() + + timeTickMsgPack := msgstream.MsgPack{} + timeTickMsg := &msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: Timestamp(0), + EndTimestamp: Timestamp(0), + HashValues: []uint32{0}, + }, + TimeTickMsg: internalpb.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + MsgID: UniqueID(0), + Timestamp: math.MaxUint64, + SourceID: 0, + }, + }, + } + timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) + + // pulsar produce + factory := dependency.NewDefaultFactory(true) + insertStream, err := factory.NewMsgStream(s.ctx) + s.Assert().NoError(err) + insertStream.AsProducer([]string{dmChannelName}) + defer insertStream.Close() + + _, err = insertStream.Broadcast(&timeTickMsgPack) + s.Assert().NoError(err) + + _, err = insertStream.Broadcast(&timeTickMsgPack) + s.Assert().NoError(err) + }() + + wg.Wait() + // dup call + status, err := s.node.FlushSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode) + + // failure call + req = &datapb.FlushSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: -1, + }, + DbID: 0, + CollectionID: 1, + SegmentIDs: []int64{1}, + } + status, err = s.node.FlushSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_NodeIDNotMatch, status.ErrorCode) + + req = &datapb.FlushSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.node.session.ServerID, + }, + DbID: 0, + CollectionID: 1, + SegmentIDs: []int64{1}, + } + + status, err = s.node.FlushSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.ErrorCode) + + req = &datapb.FlushSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.node.session.ServerID, + }, + DbID: 0, + CollectionID: 1, + SegmentIDs: []int64{}, + } + + status, err = s.node.FlushSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode) +} + +func (s *DataNodeServicesSuite) TestShowConfigurations() { + pattern := "datanode.Port" + req := &internalpb.ShowConfigurationsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_WatchQueryChannels, + MsgID: rand.Int63(), + }, + Pattern: pattern, + } + + //test closed server + node := &DataNode{} + node.session = &sessionutil.Session{ServerID: 1} + node.stateCode.Store(commonpb.StateCode_Abnormal) + + resp, err := node.ShowConfigurations(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + + node.stateCode.Store(commonpb.StateCode_Healthy) + resp, err = node.ShowConfigurations(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().Equal(1, len(resp.Configuations)) + s.Assert().Equal("datanode.port", resp.Configuations[0].Key) +} + +func (s *DataNodeServicesSuite) TestGetMetrics() { + node := &DataNode{} + node.session = &sessionutil.Session{ServerID: 1} + node.flowgraphManager = newFlowgraphManager() + // server is closed + node.stateCode.Store(commonpb.StateCode_Abnormal) + resp, err := node.GetMetrics(s.ctx, &milvuspb.GetMetricsRequest{}) + s.Assert().NoError(err) + s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + node.stateCode.Store(commonpb.StateCode_Healthy) + + // failed to parse metric type + invalidRequest := "invalid request" + resp, err = node.GetMetrics(s.ctx, &milvuspb.GetMetricsRequest{ + Request: invalidRequest, + }) + s.Assert().NoError(err) + s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + // unsupported metric type + unsupportedMetricType := "unsupported" + req, err := metricsinfo.ConstructRequestByMetricType(unsupportedMetricType) + s.Assert().NoError(err) + resp, err = node.GetMetrics(s.ctx, req) + s.Assert().NoError(err) + s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + // normal case + req, err = metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + s.Assert().NoError(err) + resp, err = node.GetMetrics(node.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + log.Info("Test DataNode.GetMetrics", + zap.String("name", resp.ComponentName), + zap.String("response", resp.Response)) +} + +func (s *DataNodeServicesSuite) TestImport() { + s.node.rootCoord = &RootCoordFactory{ + collectionID: 100, + pkType: schemapb.DataType_Int64, + } + s.Run("test normal", func() { + content := []byte(`{ + "rows":[ + {"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]}, + {"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]}, + {"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]}, + {"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]}, + {"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]} + ] + }`) + + chName1 := "fake-by-dev-rootcoord-dml-testimport-1" + chName2 := "fake-by-dev-rootcoord-dml-testimport-2" + err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + CollectionID: 100, + ChannelName: chName1, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{}, + }, nil) + s.Require().Nil(err) + err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + CollectionID: 100, + ChannelName: chName2, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{}, + }, nil) + s.Require().Nil(err) + + _, ok := s.node.flowgraphManager.getFlowgraphService(chName1) + s.Require().True(ok) + _, ok = s.node.flowgraphManager.getFlowgraphService(chName2) + s.Require().True(ok) + + filePath := filepath.Join(s.node.chunkManager.RootPath(), "rows_1.json") + err = s.node.chunkManager.Write(s.ctx, filePath, content) + s.Require().Nil(err) + req := &datapb.ImportTaskRequest{ + ImportTask: &datapb.ImportTask{ + CollectionId: 100, + PartitionId: 100, + ChannelNames: []string{chName1, chName2}, + Files: []string{filePath}, + RowBased: true, + }, + } + s.node.rootCoord.(*RootCoordFactory).ReportImportErr = true + _, err = s.node.Import(s.ctx, req) + s.Assert().NoError(err) + s.node.rootCoord.(*RootCoordFactory).ReportImportErr = false + + s.node.rootCoord.(*RootCoordFactory).ReportImportNotSuccess = true + _, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) + s.Assert().NoError(err) + s.node.rootCoord.(*RootCoordFactory).ReportImportNotSuccess = false + + s.node.dataCoord.(*DataCoordFactory).AddSegmentError = true + _, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) + s.Assert().NoError(err) + s.node.dataCoord.(*DataCoordFactory).AddSegmentError = false + + s.node.dataCoord.(*DataCoordFactory).AddSegmentNotSuccess = true + _, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) + s.Assert().NoError(err) + s.node.dataCoord.(*DataCoordFactory).AddSegmentNotSuccess = false + + stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetErrorCode()) + s.Assert().Equal("", stat.GetReason()) + }) + + s.Run("Test Import bad flow graph", func() { + chName1 := "fake-by-dev-rootcoord-dml-testimport-1-badflowgraph" + chName2 := "fake-by-dev-rootcoord-dml-testimport-2-badflowgraph" + err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + CollectionID: 100, + ChannelName: chName1, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{}, + }, nil) + s.Require().Nil(err) + err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + CollectionID: 999, // wrong collection ID. + ChannelName: chName2, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{}, + }, nil) + s.Require().Nil(err) + + _, ok := s.node.flowgraphManager.getFlowgraphService(chName1) + s.Require().True(ok) + _, ok = s.node.flowgraphManager.getFlowgraphService(chName2) + s.Require().True(ok) + + content := []byte(`{ + "rows":[ + {"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]}, + {"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]}, + {"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]}, + {"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]}, + {"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]} + ] + }`) + + filePath := filepath.Join(s.node.chunkManager.RootPath(), "rows_1.json") + err = s.node.chunkManager.Write(s.ctx, filePath, content) + s.Assert().NoError(err) + req := &datapb.ImportTaskRequest{ + ImportTask: &datapb.ImportTask{ + CollectionId: 100, + PartitionId: 100, + ChannelNames: []string{chName1, chName2}, + Files: []string{filePath}, + RowBased: true, + }, + } + stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetErrorCode()) + s.Assert().Equal("", stat.GetReason()) + }) + s.Run("Test Import report import error", func() { + s.node.rootCoord = &RootCoordFactory{ + collectionID: 100, + pkType: schemapb.DataType_Int64, + ReportImportErr: true, + } + content := []byte(`{ + "rows":[ + {"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]}, + {"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]}, + {"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]}, + {"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]}, + {"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]} + ] + }`) + + filePath := filepath.Join(s.node.chunkManager.RootPath(), "rows_1.json") + err := s.node.chunkManager.Write(s.ctx, filePath, content) + s.Assert().NoError(err) + req := &datapb.ImportTaskRequest{ + ImportTask: &datapb.ImportTask{ + CollectionId: 100, + PartitionId: 100, + ChannelNames: []string{"ch1", "ch2"}, + Files: []string{filePath}, + RowBased: true, + }, + } + stat, err := s.node.Import(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) + }) + + s.Run("Test Import error", func() { + s.node.rootCoord = &RootCoordFactory{collectionID: -1} + req := &datapb.ImportTaskRequest{ + ImportTask: &datapb.ImportTask{ + CollectionId: 100, + PartitionId: 100, + }, + } + stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.ErrorCode) + + stat, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, returnError), req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) + + s.node.stateCode.Store(commonpb.StateCode_Abnormal) + stat, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) + }) +} + +func (s *DataNodeServicesSuite) TestAddImportSegment() { + s.Run("test AddSegment", func() { + s.node.rootCoord = &RootCoordFactory{ + collectionID: 100, + pkType: schemapb.DataType_Int64, + } + + chName1 := "fake-by-dev-rootcoord-dml-testaddsegment-1" + chName2 := "fake-by-dev-rootcoord-dml-testaddsegment-2" + err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + CollectionID: 100, + ChannelName: chName1, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{}, + }, nil) + s.Require().NoError(err) + err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + CollectionID: 100, + ChannelName: chName2, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{}, + }, nil) + s.Require().NoError(err) + + _, ok := s.node.flowgraphManager.getFlowgraphService(chName1) + s.Assert().True(ok) + _, ok = s.node.flowgraphManager.getFlowgraphService(chName2) + s.Assert().True(ok) + + stat, err := s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ + SegmentId: 100, + CollectionId: 100, + PartitionId: 100, + ChannelName: chName1, + RowNum: 500, + }) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetStatus().GetErrorCode()) + s.Assert().Equal("", stat.GetStatus().GetReason()) + s.Assert().NotEqual(nil, stat.GetChannelPos()) + + getFlowGraphServiceAttempts = 3 + stat, err = s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ + SegmentId: 100, + CollectionId: 100, + PartitionId: 100, + ChannelName: "bad-ch-name", + RowNum: 500, + }) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetStatus().GetErrorCode()) + }) + +} + +func (s *DataNodeServicesSuite) TestSyncSegments() { + chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1" + + err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + ChannelName: chanName, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{100, 200, 300}, + }, nil) + s.Require().NoError(err) + fg, ok := s.node.flowgraphManager.getFlowgraphService(chanName) + s.Assert().True(ok) + + s1 := Segment{segmentID: 100} + s2 := Segment{segmentID: 200} + s3 := Segment{segmentID: 300} + s1.setType(datapb.SegmentType_Flushed) + s2.setType(datapb.SegmentType_Flushed) + s3.setType(datapb.SegmentType_Flushed) + fg.channel.(*ChannelMeta).segments = map[UniqueID]*Segment{ + s1.segmentID: &s1, + s2.segmentID: &s2, + s3.segmentID: &s3, + } + + s.Run("invalid compacted from", func() { + req := &datapb.SyncSegmentsRequest{ + CompactedTo: 400, + NumOfRows: 100, + } + + req.CompactedFrom = []UniqueID{} + status, err := s.node.SyncSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + + req.CompactedFrom = []UniqueID{101, 201} + status, err = s.node.SyncSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + }) + + s.Run("valid request numRows>0", func() { + req := &datapb.SyncSegmentsRequest{ + CompactedFrom: []UniqueID{100, 200, 101, 201}, + CompactedTo: 102, + NumOfRows: 100, + } + status, err := s.node.SyncSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + + s.Assert().True(fg.channel.hasSegment(req.CompactedTo, true)) + s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[0], true)) + s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[1], true)) + }) + + s.Run("valid request numRows=0", func() { + s1.setType(datapb.SegmentType_Flushed) + s2.setType(datapb.SegmentType_Flushed) + s3.setType(datapb.SegmentType_Flushed) + + fg.channel.(*ChannelMeta).segments = map[UniqueID]*Segment{ + s1.segmentID: &s1, + s2.segmentID: &s2, + s3.segmentID: &s3, + } + + req := &datapb.SyncSegmentsRequest{ + CompactedFrom: []int64{s1.segmentID, s2.segmentID}, + CompactedTo: 101, + NumOfRows: 0, + } + status, err := s.node.SyncSegments(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + + s.Assert().False(fg.channel.hasSegment(req.CompactedTo, true)) + s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[0], true)) + s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[1], true)) + }) +} + +func (s *DataNodeServicesSuite) TestResendSegmentStats() { + dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-ResendSegmentStats" + vChan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: dmChannelName, + UnflushedSegmentIds: []int64{}, + FlushedSegmentIds: []int64{}, + } + + err := s.node.flowgraphManager.addAndStart(s.node, vChan, nil) + s.Require().Nil(err) + + fgService, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName) + s.Assert().True(ok) + + err = fgService.channel.addSegment(addSegmentReq{ + segType: datapb.SegmentType_New, + segID: 0, + collID: 1, + partitionID: 1, + startPos: &internalpb.MsgPosition{}, + endPos: &internalpb.MsgPosition{}, + }) + s.Assert().Nil(err) + err = fgService.channel.addSegment(addSegmentReq{ + segType: datapb.SegmentType_New, + segID: 1, + collID: 1, + partitionID: 2, + startPos: &internalpb.MsgPosition{}, + endPos: &internalpb.MsgPosition{}, + }) + s.Assert().Nil(err) + err = fgService.channel.addSegment(addSegmentReq{ + segType: datapb.SegmentType_New, + segID: 2, + collID: 1, + partitionID: 3, + startPos: &internalpb.MsgPosition{}, + endPos: &internalpb.MsgPosition{}, + }) + s.Assert().Nil(err) + + req := &datapb.ResendSegmentStatsRequest{ + Base: &commonpb.MsgBase{}, + } + + wg := sync.WaitGroup{} + wg.Add(2) + + resp, err := s.node.ResendSegmentStats(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent()) + + // Duplicate call. + resp, err = s.node.ResendSegmentStats(s.ctx, req) + s.Assert().NoError(err) + s.Assert().Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent()) +}