From a1cdc55bcba258abbb13900b008cb38ef0a97853 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 18 Oct 2022 14:07:25 +0800 Subject: [PATCH] Make newSegment transfer state after SaveBinlogPath succuess (#19858) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- .../flow_graph_insert_buffer_node_test.go | 6 +++++- internal/datanode/flush_manager.go | 11 ++++++++++- internal/datanode/segment_replica.go | 15 +++++++++++---- internal/datanode/segment_replica_test.go | 10 ++++++++-- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 5cdc59282f..1d788a72cd 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -434,7 +435,10 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { fpMut.Lock() flushPacks = append(flushPacks, pack) fpMut.Unlock() - colRep.listNewSegmentsStartPositions() + startPos := colRep.listNewSegmentsStartPositions() + colRep.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID { + return pos.GetSegmentID() + })) colRep.listSegmentsCheckPoints() if pack.flushed || pack.dropped { colRep.segmentFlushed(pack.segmentID) diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 052186e931..e89332ac1d 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -38,6 +38,7 @@ import ( "github.com/milvus-io/milvus/internal/util/metautil" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/timerecord" + "github.com/samber/lo" ) // flushManager defines a flush manager signature @@ -688,8 +689,9 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl } } + startPos := dsService.replica.listNewSegmentsStartPositions() // start positions for all new segments - for _, pos := range dsService.replica.listNewSegmentsStartPositions() { + for _, pos := range startPos { segment, has := segmentPack[pos.GetSegmentID()] if !has { segment = &datapb.DropVirtualChannelSegment{ @@ -726,6 +728,9 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return fmt.Errorf("data service DropVirtualChannel failed, reason = %s", rsp.GetStatus().GetReason()) } + dsService.replica.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID { + return pos.GetSegmentID() + })) return nil }, opts...) if err != nil { @@ -827,6 +832,10 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet if rsp.ErrorCode != commonpb.ErrorCode_Success { return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason) } + + dsService.replica.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID { + return pos.GetSegmentID() + })) return nil }, opts...) if err != nil { diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index ecd3391308..3afaf449fd 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -65,6 +65,7 @@ type Replica interface { listPartitionSegments(partID UniqueID) []UniqueID filterSegments(channelName string, partitionID UniqueID) []*Segment listNewSegmentsStartPositions() []*datapb.SegmentStartPosition + transferNewSegments(segmentIDs []UniqueID) listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition) updateSegmentCheckPoint(segID UniqueID) @@ -525,18 +526,24 @@ func (replica *SegmentReplica) listNewSegmentsStartPositions() []*datapb.Segment result := make([]*datapb.SegmentStartPosition, 0, len(replica.newSegments)) for id, seg := range replica.newSegments { - result = append(result, &datapb.SegmentStartPosition{ SegmentID: id, StartPosition: seg.startPos, }) - - // transfer states - replica.new2NormalSegment(id) } return result } +// transferNewSegments make new segment transfer to normal segments. +func (replica *SegmentReplica) transferNewSegments(segmentIDs []UniqueID) { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + for _, segmentID := range segmentIDs { + replica.new2NormalSegment(segmentID) + } +} + // listSegmentsCheckPoints gets check points from both *New* and *Normal* segments. func (replica *SegmentReplica) listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint { replica.segMu.RLock() diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go index c06ae43904..9ad77565a7 100644 --- a/internal/datanode/segment_replica_test.go +++ b/internal/datanode/segment_replica_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/bits-and-blooms/bloom/v3" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -1003,8 +1004,13 @@ func TestInnerFunctionSegment(t *testing.T) { assert.Equal(t, "insert-01", segPos[0].StartPosition.ChannelName) assert.Equal(t, Timestamp(100), segPos[0].StartPosition.Timestamp) - assert.Equal(t, 0, len(replica.newSegments)) - assert.Equal(t, 2, len(replica.normalSegments)) + // not change until transferNewSegment called + assert.Equal(t, 1, len(replica.newSegments)) + assert.Equal(t, 1, len(replica.normalSegments)) + + replica.transferNewSegments(lo.Map(segPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID { + return pos.GetSegmentID() + })) cps := replica.listSegmentsCheckPoints() assert.Equal(t, 2, len(cps))