diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 4673f538e3..a3de56a3d5 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -1253,6 +1253,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
 	colID := req.GetImportTask().GetCollectionId()
 	segmentIDReq := composeAssignSegmentIDRequest(1, shardID, chNames, colID, partID)
 	targetChName := segmentIDReq.GetSegmentIDRequests()[0].GetChannelName()
+	logFields = append(logFields, zap.Int64("collection ID", colID))
 	logFields = append(logFields, zap.String("target channel name", targetChName))
 	log.Info("assign segment for the import task", logFields...)
 	resp, err := node.dataCoord.AssignSegmentID(context.Background(), segmentIDReq)
@@ -1313,6 +1314,7 @@ func createBinLogsFunc(node *DataNode, req *datapb.ImportTaskRequest, schema *sc
 		log.Info("fields data is empty, no need to generate binlog", logFields...)
 		return nil, nil, nil
 	}
+	logFields = append(logFields, zap.Int("row count", rowNum))
 
 	colID := req.GetImportTask().GetCollectionId()
 	fieldInsert, fieldStats, err := createBinLogs(rowNum, schema, ts, fields, node, segmentID, colID, partID)
@@ -1406,12 +1408,6 @@ func composeAssignSegmentIDRequest(rowNum int, shardID int, chNames []string,
 	// use the first field's row count as segment row count
 	// all the fields row count are same, checked by ImportWrapper
 	// ask DataCoord to alloc a new segment
-	log.Info("import task flush segment",
-		zap.Int("rowCount", rowNum),
-		zap.Any("channel names", chNames),
-		zap.Int64("collectionID", collID),
-		zap.Int64("partitionID", partID),
-		zap.Int("shardID", shardID))
 	segReqs := []*datapb.SegmentIDRequest{
 		{
 			ChannelName: chNames[shardID],
diff --git a/internal/util/importutil/import_wrapper.go b/internal/util/importutil/import_wrapper.go
index 62110c4352..d8900ecc79 100644
--- a/internal/util/importutil/import_wrapper.go
+++ b/internal/util/importutil/import_wrapper.go
@@ -454,33 +454,39 @@ func (p *ImportWrapper) flushFunc(fields BlockData, shardID int, partitionID int
 	}
 
 	// if fields data is empty, do nothing
-	var rowNum int
+	rowNum := 0
 	memSize := 0
 	for _, field := range fields {
 		rowNum = field.RowNum()
 		memSize += field.GetMemorySize()
-		break
 	}
 	if rowNum <= 0 {
 		log.Warn("import wrapper: fields data is empty", logFields...)
 		return nil
 	}
+	logFields = append(logFields, zap.Int("rowNum", rowNum), zap.Int("memSize", memSize))
+	log.Info("import wrapper: flush block data to binlog", logFields...)
+
 	// if there is no segment for this shard, create a new one
 	// if the segment exists and its size almost exceed segmentSize, close it and create a new one
 	var segment *WorkingSegment
 	if shard, ok := p.workingSegments[shardID]; ok {
-		if segment, exists := shard[partitionID]; exists {
-			// the segment already exists, check its size, if the size exceeds(or almost) segmentSize, close the segment
-			if int64(segment.memSize)+int64(memSize) >= p.segmentSize {
-				err := p.closeWorkingSegment(segment)
+		if segmentTemp, exists := shard[partitionID]; exists {
+			log.Info("import wrapper: compare working segment memSize with segmentSize",
+				zap.Int("memSize", segmentTemp.memSize), zap.Int64("segmentSize", p.segmentSize))
+			if int64(segmentTemp.memSize)+int64(memSize) >= p.segmentSize {
+				// the segment already exists, check its size, if the size exceeds(or almost) segmentSize, close the segment
+				err := p.closeWorkingSegment(segmentTemp)
 				if err != nil {
 					logFields = append(logFields, zap.Error(err))
 					log.Warn("import wrapper: failed to close working segment", logFields...)
 					return err
 				}
-				segment = nil
 				p.workingSegments[shardID][partitionID] = nil
+			} else {
+				// the exist segment size is small, no need to close
+				segment = segmentTemp
+			}
 		}
 	} else {
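
Note on the data_node.go hunks: all three follow one structured-logging pattern. Zap fields are accumulated in a `logFields` slice and passed to each call via `logFields...`, so every later message carries the context gathered so far; this is presumably why the standalone `log.Info("import task flush segment", ...)` in `composeAssignSegmentIDRequest` is removed as redundant. A minimal sketch of the pattern, with illustrative field values that are not taken from the diff:

```go
package main

import "go.uber.org/zap"

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()

	// Accumulate context as it becomes known; every later log call
	// receives the whole slice via the `logFields...` splat.
	logFields := []zap.Field{zap.Int64("task ID", 42)} // illustrative value
	logFields = append(logFields, zap.Int64("collection ID", 100))
	log.Info("assign segment for the import task", logFields...)

	// Fields added later ride along on all subsequent messages.
	logFields = append(logFields, zap.String("target channel name", "ch-0"))
	log.Info("segment assigned", logFields...)
}
```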
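The key behavioral fix in import_wrapper.go is a variable-shadowing bug: the old `if segment, exists := shard[partitionID]; exists` declared a new `segment` scoped to the `if` statement, so the outer `segment` was never assigned when the existing segment was small enough to reuse (and `segment = nil` after closing only cleared the inner copy). Renaming the inner variable to `segmentTemp` and assigning `segment = segmentTemp` in the new `else` branch restores the reuse path. A self-contained sketch of the pitfall, using a simplified stand-in type rather than the Milvus `WorkingSegment`:

```go
package main

import "fmt"

// segment is a simplified stand-in for the Milvus WorkingSegment type.
type segment struct{ memSize int }

func main() {
	segments := map[int]*segment{1: {memSize: 100}}

	// Buggy pattern: ":=" in the if header declares a NEW `seg` that
	// shadows the outer variable, so the outer `seg` is never assigned.
	var seg *segment
	if seg, ok := segments[1]; ok {
		_ = seg // only this inner, shadowing variable sees the map entry
	}
	fmt.Println(seg == nil) // prints true: the outer variable was never set

	// Fixed pattern (what the diff does): use a distinct inner name and
	// assign to the outer variable explicitly on the reuse path.
	var reused *segment
	if segmentTemp, ok := segments[1]; ok {
		reused = segmentTemp
	}
	fmt.Println(reused == nil) // prints false: reuse now works
}
```

The loop change in the same hunk (dropping the early `break`) also means `memSize` now sums the memory of every field in the block instead of only the first, so the comparison against `p.segmentSize` uses the block's true footprint.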