diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index d0791ecbe8..99b81b3916 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -168,12 +168,12 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
 			dbuff.delData.Append(pk, ts)
 
-			if Timestamp(ts) < dbuff.TimestampFrom {
-				dbuff.TimestampFrom = Timestamp(ts)
+			if ts < dbuff.TimestampFrom {
+				dbuff.TimestampFrom = ts
 			}
 
-			if Timestamp(ts) > dbuff.TimestampTo {
-				dbuff.TimestampTo = Timestamp(ts)
+			if ts > dbuff.TimestampTo {
+				dbuff.TimestampTo = ts
 			}
 		}
 	}
@@ -181,7 +181,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
 	dbuff.updateSize(dbuff.delData.RowCount)
 	log.Debug("mergeDeltalogs end", zap.Int64("PlanID", t.getPlanID()),
 		zap.Int("number of pks to compact in insert logs", len(pk2ts)),
-		zap.Any("elapse in ms", nano2Milli(time.Since(mergeStart))))
+		zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
 
 	return pk2ts, dbuff, nil
 }
@@ -310,7 +310,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[primaryKey]Timestamp
 	log.Debug("merge end", zap.Int64("planID", t.getPlanID()),
 		zap.Int64("remaining insert numRows", numRows),
 		zap.Int64("expired entities", expired),
-		zap.Any("elapse in ms", nano2Milli(time.Since(mergeStart))))
+		zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
 
 	return iDatas, numRows, nil
 }
@@ -347,7 +347,7 @@ func (t *compactionTask) compact() error {
 		targetSegID = t.plan.GetSegmentBinlogs()[0].GetSegmentID()
 	}
 
-	log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("timeout in seconds", t.plan.GetTimeoutInSeconds()))
+	log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
 	segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs()))
 	for _, s := range t.plan.GetSegmentBinlogs() {
 		segIDs = append(segIDs, s.GetSegmentID())
@@ -370,7 +370,7 @@ func (t *compactionTask) compact() error {
 	<-ti.Injected()
 	injectEnd := time.Now()
 	defer func() {
-		log.Debug("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(injectEnd.Sub(injectStart))))
+		log.Debug("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(injectEnd.Sub(injectStart))))
 	}()
 
 	var (
@@ -463,7 +463,7 @@ func (t *compactionTask) compact() error {
 	err = g.Wait()
 	downloadEnd := time.Now()
 	defer func() {
-		log.Debug("download elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(downloadEnd.Sub(downloadStart))))
+		log.Debug("download elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(downloadEnd.Sub(downloadStart))))
 	}()
 
 	if err != nil {
@@ -493,7 +493,7 @@ func (t *compactionTask) compact() error {
 
 	uploadEnd := time.Now()
 	defer func() {
-		log.Debug("upload elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(uploadEnd.Sub(uploadStart))))
+		log.Debug("upload elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uploadEnd.Sub(uploadStart))))
 	}()
 
 	for _, fbl := range segPaths.deltaInfo {
@@ -526,7 +526,7 @@ func (t *compactionTask) compact() error {
 	}
 	rpcEnd := time.Now()
 	defer func() {
-		log.Debug("rpc elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(rpcEnd.Sub(rpcStart))))
+		log.Debug("rpc elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(rpcEnd.Sub(rpcStart))))
 	}()
 
 	// Compaction I: update pk range.
@@ -550,7 +550,7 @@ func (t *compactionTask) compact() error {
 	ti.injectDone(true)
 	uninjectEnd := time.Now()
 	defer func() {
-		log.Debug("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
+		log.Debug("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
 	}()
 
 	log.Info("compaction done",
@@ -561,7 +561,7 @@ func (t *compactionTask) compact() error {
 		zap.Int("num of delta paths", len(segPaths.deltaInfo)),
 	)
 
-	log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(time.Since(compactStart))))
+	log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(compactStart))))
 	metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Observe(float64(t.tr.ElapseSpan().Milliseconds()))
 
 	return nil
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 71170b753f..ce839f6141 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -457,9 +457,9 @@ func (node *DataNode) Start() error {
 		},
 		Count: 1,
 	})
-	if err != nil {
-		log.Warn("fail to alloc timestamp", zap.Error(err))
-		return err
+	if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success {
+		log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err))
+		return errors.New("DataNode fail to alloc timestamp")
 	}
 
 	connectEtcdFn := func() error {
@@ -480,10 +480,6 @@ func (node *DataNode) Start() error {
 
 	node.chunkManager = chunkManager
 
-	if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil {
-		return errors.New("DataNode fail to start")
-	}
-
 	go node.BackGroundGC(node.clearSignal)
 
 	go node.compactionExecutor.start(node.ctx)
@@ -509,8 +505,7 @@ func (node *DataNode) GetStateCode() internalpb.StateCode {
 }
 
 func (node *DataNode) isHealthy() bool {
-	code := node.State.Load().(internalpb.StateCode)
-	return code == internalpb.StateCode_Healthy
+	return node.GetStateCode() == internalpb.StateCode_Healthy
 }
 
 // WatchDmChannels is not in use
@@ -543,9 +538,9 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo
 	return states, nil
 }
 
-// ReadyToFlush tells wether DataNode is ready for flushing
+// ReadyToFlush tells whether DataNode is ready for flushing
 func (node *DataNode) ReadyToFlush() error {
-	if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
+	if !node.isHealthy() {
 		return errors.New("DataNode not in HEALTHY state")
 	}
 	return nil
@@ -565,7 +560,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
 	}
 
-	if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
+	if !node.isHealthy() {
 		errStatus.Reason = "dataNode not in HEALTHY state"
 		return errStatus, nil
 	}
@@ -653,7 +648,6 @@ func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.Resend
 	return &datapb.ResendSegmentStatsResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_Success,
-			Reason:    "",
 		},
 		SegResent: segResent,
 	}, nil
@@ -689,9 +683,7 @@ func (node *DataNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_Success,
-			Reason:    "",
 		},
-		Value: "",
 	}, nil
 }
 
@@ -700,9 +692,7 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_Success,
-			Reason:    "",
 		},
-		Value: "",
 	}, nil
 }
 
@@ -724,7 +714,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
 				Reason:    msgDataNodeIsUnhealthy(Params.DataNodeCfg.GetNodeID()),
 			},
-			Response: "",
 		}, nil
 	}
 
@@ -740,7 +729,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
 				Reason:    err.Error(),
 			},
-			Response: "",
 		}, nil
 	}
 
@@ -770,7 +758,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    metricsinfo.MsgUnimplementedMetric,
 		},
-		Response: "",
 	}, nil
 }
 
@@ -818,8 +805,8 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 		zap.Int64("task ID", req.GetImportTask().GetTaskId()),
 		zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
 		zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
-		zap.Any("channel names", req.GetImportTask().GetChannelNames()),
-		zap.Any("working dataNodes", req.WorkingNodes))
+		zap.Strings("channel names", req.GetImportTask().GetChannelNames()),
+		zap.Int64s("working dataNodes", req.WorkingNodes))
 	defer func() {
 		log.Info("DataNode finish import request", zap.Int64("task ID", req.GetImportTask().GetTaskId()))
 	}()
@@ -956,6 +943,7 @@ func (node *DataNode) AddSegment(ctx context.Context, req *datapb.AddSegmentRequ
 			return &commonpb.Status{
 				// TODO: Add specific error code.
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
+				Reason:    err.Error(),
 			}, nil
 		}
 	}
@@ -972,7 +960,7 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
 			log.Error("import task returns invalid shard number",
 				zap.Int("shard num", shardNum),
 				zap.Int("# of channels", len(req.GetImportTask().GetChannelNames())),
-				zap.Any("channel names", req.GetImportTask().GetChannelNames()),
+				zap.Strings("channel names", req.GetImportTask().GetChannelNames()),
 			)
 			return fmt.Errorf("syncSegmentID Failed: invalid shard number %d", shardNum)
 		}
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 590b931be4..b1eba4e935 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -131,14 +131,14 @@ func newParallelConfig() parallelConfig {
 	return parallelConfig{Params.DataNodeCfg.FlowGraphMaxQueueLength, Params.DataNodeCfg.FlowGraphMaxParallelism}
 }
 
-// start starts the flowgraph in datasyncservice
+// start starts the flow graph in datasyncservice
 func (dsService *dataSyncService) start() {
 	if dsService.fg != nil {
-		log.Info("dataSyncService starting flowgraph", zap.Int64("collectionID", dsService.collectionID),
+		log.Info("dataSyncService starting flow graph", zap.Int64("collectionID", dsService.collectionID),
 			zap.String("vChanName", dsService.vchannelName))
 		dsService.fg.Start()
 	} else {
-		log.Warn("dataSyncService starting flowgraph is nil", zap.Int64("collectionID", dsService.collectionID),
+		log.Warn("dataSyncService starting flow graph is nil", zap.Int64("collectionID", dsService.collectionID),
 			zap.String("vChanName", dsService.vchannelName))
 	}
 }
@@ -351,7 +351,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
 
 // getSegmentInfos return the SegmentInfo details according to the given ids through RPC to datacoord
 func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb.SegmentInfo, error) {
-	var segmentInfos []*datapb.SegmentInfo
 	infoResp, err := dsService.dataCoord.GetSegmentInfo(dsService.ctx, &datapb.GetSegmentInfoRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_SegmentInfo,
@@ -371,6 +370,5 @@ func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb
 		log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
 		return nil, err
 	}
-	segmentInfos = infoResp.Infos
-	return segmentInfos, nil
+	return infoResp.Infos, nil
 }
diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go
index 649ebd4ce2..d3e7c41cc1 100644
--- a/internal/datanode/meta_service.go
+++ b/internal/datanode/meta_service.go
@@ -46,31 +46,13 @@ func newMetaService(rc types.RootCoord, collectionID UniqueID) *metaService {
 	}
 }
 
-// TODO: Replace with getCollectionInfo below.
 // getCollectionSchema get collection schema with provided collection id at specified timestamp.
 func (mService *metaService) getCollectionSchema(ctx context.Context, collID UniqueID, timestamp Timestamp) (*schemapb.CollectionSchema, error) {
-	req := &milvuspb.DescribeCollectionRequest{
-		Base: &commonpb.MsgBase{
-			MsgType:   commonpb.MsgType_DescribeCollection,
-			MsgID:     0, //GOOSE TODO
-			Timestamp: 0, // GOOSE TODO
-			SourceID:  Params.DataNodeCfg.GetNodeID(),
-		},
-		DbName:       "default", // GOOSE TODO
-		CollectionID: collID,
-		TimeStamp:    timestamp,
+	response, err := mService.getCollectionInfo(ctx, collID, timestamp)
+	if response != nil {
+		return response.GetSchema(), err
 	}
-
-	response, err := mService.rootCoord.DescribeCollection(ctx, req)
-	if err != nil {
-		return nil, fmt.Errorf("grpc error when describe collection %v from rootcoord: %s", collID, err.Error())
-	}
-
-	if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
-		return nil, fmt.Errorf("describe collection %v from rootcoord wrong: %s", collID, response.GetStatus().GetReason())
-	}
-
-	return response.GetSchema(), nil
+	return nil, err
 }
 
 // getCollectionInfo get collection info with provided collection id at specified timestamp.
diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go
index d02db69fad..bbc99ddfd4 100644
--- a/internal/datanode/segment_replica.go
+++ b/internal/datanode/segment_replica.go
@@ -298,14 +298,7 @@ func (replica *SegmentReplica) listCompactedSegmentIDs() map[UniqueID][]UniqueID
 	compactedTo2From := make(map[UniqueID][]UniqueID)
 
 	for segID, seg := range replica.compactedSegments {
-		var from []UniqueID
-		from, ok := compactedTo2From[seg.compactedTo]
-		if !ok {
-			from = []UniqueID{}
-		}
-
-		from = append(from, segID)
-		compactedTo2From[seg.compactedTo] = from
+		compactedTo2From[seg.compactedTo] = append(compactedTo2From[seg.compactedTo], segID)
 	}
 	return compactedTo2From
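
Note on the zap changes above: swapping zap.Any for typed field constructors (zap.Float64, zap.Int32, zap.Strings, zap.Int64s) skips zap's reflection-based encoding and keeps each logged field's type stable across log lines. The sketch below is a minimal, self-contained illustration of the pattern using a stock zap logger rather than Milvus's log wrapper; the nano2Milli helper is reproduced here on the assumption that it simply converts a duration to milliseconds as a float64, and the field values are invented for the example.

	package main

	import (
		"time"

		"go.uber.org/zap"
	)

	// nano2Milli is assumed to behave like the helper used in compactor.go:
	// convert a duration to milliseconds, expressed as a float64.
	func nano2Milli(d time.Duration) float64 {
		return float64(d) / float64(time.Millisecond)
	}

	func main() {
		logger := zap.NewExample() // debug-level logger writing JSON to stdout
		start := time.Now()

		// Typed fields encode the value directly; zap.Any falls back to
		// reflection and the emitted field type can drift between calls.
		logger.Debug("compaction step done",
			zap.Float64("elapse in ms", nano2Milli(time.Since(start))),
			zap.Int32("timeout in seconds", 180),
			zap.Strings("channel names", []string{"dml_0", "dml_1"}),
			zap.Int64s("working dataNodes", []int64{1, 2}),
		)
	}

The segment_replica.go change relies on a related Go idiom: indexing a map with a missing key yields the zero value (a nil slice), and append on a nil slice allocates a new one, so the explicit !ok branch was redundant.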