fix: auto flush all segments that are not created by streaming service (#40767)

issue: #40532

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-03-26 16:32:22 +08:00 committed by GitHub
parent 0e83a08ffe
commit af80a4dac2
9 changed files with 2077 additions and 1960 deletions


@@ -21,9 +21,9 @@ func (_m *MockManager) EXPECT() *MockManager_Expecter {
return &MockManager_Expecter{mock: &_m.Mock}
}
// AllocNewGrowingSegment provides a mock function with given fields: ctx, collectionID, partitionID, segmentID, channelName, storageVersion
func (_m *MockManager) AllocNewGrowingSegment(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, channelName string, storageVersion int64) (*SegmentInfo, error) {
ret := _m.Called(ctx, collectionID, partitionID, segmentID, channelName, storageVersion)
// AllocNewGrowingSegment provides a mock function with given fields: ctx, req
func (_m *MockManager) AllocNewGrowingSegment(ctx context.Context, req AllocNewGrowingSegmentRequest) (*SegmentInfo, error) {
ret := _m.Called(ctx, req)
if len(ret) == 0 {
panic("no return value specified for AllocNewGrowingSegment")
@@ -31,19 +31,19 @@ func (_m *MockManager) AllocNewGrowingSegment(ctx context.Context, collectionID
var r0 *SegmentInfo
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string, int64) (*SegmentInfo, error)); ok {
return rf(ctx, collectionID, partitionID, segmentID, channelName, storageVersion)
if rf, ok := ret.Get(0).(func(context.Context, AllocNewGrowingSegmentRequest) (*SegmentInfo, error)); ok {
return rf(ctx, req)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string, int64) *SegmentInfo); ok {
r0 = rf(ctx, collectionID, partitionID, segmentID, channelName, storageVersion)
if rf, ok := ret.Get(0).(func(context.Context, AllocNewGrowingSegmentRequest) *SegmentInfo); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*SegmentInfo)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64, int64, int64, string, int64) error); ok {
r1 = rf(ctx, collectionID, partitionID, segmentID, channelName, storageVersion)
if rf, ok := ret.Get(1).(func(context.Context, AllocNewGrowingSegmentRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
@@ -58,18 +58,14 @@ type MockManager_AllocNewGrowingSegment_Call struct {
// AllocNewGrowingSegment is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - partitionID int64
// - segmentID int64
// - channelName string
// - storageVersion int64
func (_e *MockManager_Expecter) AllocNewGrowingSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, segmentID interface{}, channelName interface{}, storageVersion interface{}) *MockManager_AllocNewGrowingSegment_Call {
return &MockManager_AllocNewGrowingSegment_Call{Call: _e.mock.On("AllocNewGrowingSegment", ctx, collectionID, partitionID, segmentID, channelName, storageVersion)}
// - req AllocNewGrowingSegmentRequest
func (_e *MockManager_Expecter) AllocNewGrowingSegment(ctx interface{}, req interface{}) *MockManager_AllocNewGrowingSegment_Call {
return &MockManager_AllocNewGrowingSegment_Call{Call: _e.mock.On("AllocNewGrowingSegment", ctx, req)}
}
func (_c *MockManager_AllocNewGrowingSegment_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, channelName string, storageVersion int64)) *MockManager_AllocNewGrowingSegment_Call {
func (_c *MockManager_AllocNewGrowingSegment_Call) Run(run func(ctx context.Context, req AllocNewGrowingSegmentRequest)) *MockManager_AllocNewGrowingSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(int64), args[4].(string), args[5].(int64))
run(args[0].(context.Context), args[1].(AllocNewGrowingSegmentRequest))
})
return _c
}
@@ -79,7 +75,7 @@ func (_c *MockManager_AllocNewGrowingSegment_Call) Return(_a0 *SegmentInfo, _a1
return _c
}
func (_c *MockManager_AllocNewGrowingSegment_Call) RunAndReturn(run func(context.Context, int64, int64, int64, string, int64) (*SegmentInfo, error)) *MockManager_AllocNewGrowingSegment_Call {
func (_c *MockManager_AllocNewGrowingSegment_Call) RunAndReturn(run func(context.Context, AllocNewGrowingSegmentRequest) (*SegmentInfo, error)) *MockManager_AllocNewGrowingSegment_Call {
_c.Call.Return(run)
return _c
}
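Note: the following is a minimal sketch of how a test could drive the regenerated mock with the new single-request signature. The test name, literal field values, and assertions are illustrative and not part of this change; it assumes testify's mock package and the datacoord package (per the --inpackage mockery directive shown below).

package datacoord

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"
)

func TestAllocNewGrowingSegment_MockUsage(t *testing.T) {
	m := &MockManager{}
	// Expect one call with any context and any request value, returning an empty SegmentInfo.
	m.EXPECT().
		AllocNewGrowingSegment(mock.Anything, mock.Anything).
		Return(&SegmentInfo{}, nil)

	// Callers now pass a single request struct instead of six positional arguments.
	info, err := m.AllocNewGrowingSegment(context.Background(), AllocNewGrowingSegmentRequest{
		CollectionID:         1,
		PartitionID:          2,
		SegmentID:            3,
		ChannelName:          "ch-1",
		StorageVersion:       1,
		IsCreatedByStreaming: true,
	})
	if err != nil || info == nil {
		t.Fatalf("unexpected mock result: info=%v err=%v", info, err)
	}
}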


@@ -67,6 +67,15 @@ func putAllocation(a *Allocation) {
allocPool.Put(a)
}
type AllocNewGrowingSegmentRequest struct {
CollectionID UniqueID
PartitionID UniqueID
SegmentID UniqueID
ChannelName string
StorageVersion int64
IsCreatedByStreaming bool
}
// Manager manages segment related operations.
//
//go:generate mockery --name=Manager --structname=MockManager --output=./ --filename=mock_segment_manager.go --with-expecter --inpackage
@@ -77,7 +86,7 @@ type Manager interface {
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64, storageVersion int64) ([]*Allocation, error)
// AllocNewGrowingSegment allocates segment for streaming node.
AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string, storageVersion int64) (*SegmentInfo, error)
AllocNewGrowingSegment(ctx context.Context, req AllocNewGrowingSegmentRequest) (*SegmentInfo, error)
// DropSegment drops the segment from manager.
DropSegment(ctx context.Context, channel string, segmentID UniqueID)
@@ -368,10 +377,10 @@ func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) {
}
// AllocNewGrowingSegment allocates segment for streaming node.
func (s *SegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string, storageVersion int64) (*SegmentInfo, error) {
s.channelLock.Lock(channelName)
defer s.channelLock.Unlock(channelName)
return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, segmentID, channelName, storageVersion)
func (s *SegmentManager) AllocNewGrowingSegment(ctx context.Context, req AllocNewGrowingSegmentRequest) (*SegmentInfo, error) {
s.channelLock.Lock(req.ChannelName)
defer s.channelLock.Unlock(req.ChannelName)
return s.openNewSegmentWithGivenSegmentID(ctx, req)
}
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, storageVersion int64) (*SegmentInfo, error) {
@@ -383,40 +392,50 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
log.Error("failed to open new segment while AllocID", zap.Error(err))
return nil, err
}
return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, id, channelName, storageVersion)
return s.openNewSegmentWithGivenSegmentID(ctx, AllocNewGrowingSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
SegmentID: id,
ChannelName: channelName,
StorageVersion: storageVersion,
IsCreatedByStreaming: false,
})
}
func (s *SegmentManager) openNewSegmentWithGivenSegmentID(ctx context.Context, collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string, storageVersion int64) (*SegmentInfo, error) {
maxNumOfRows, err := s.estimateMaxNumOfRows(collectionID)
func (s *SegmentManager) openNewSegmentWithGivenSegmentID(ctx context.Context, req AllocNewGrowingSegmentRequest) (*SegmentInfo, error) {
maxNumOfRows, err := s.estimateMaxNumOfRows(req.CollectionID)
if err != nil {
log.Error("failed to open new segment while estimateMaxNumOfRows", zap.Error(err))
return nil, err
}
segmentInfo := &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
MaxRowNum: int64(maxNumOfRows),
Level: datapb.SegmentLevel_L1,
LastExpireTime: 0,
StorageVersion: storageVersion,
ID: req.SegmentID,
CollectionID: req.CollectionID,
PartitionID: req.PartitionID,
InsertChannel: req.ChannelName,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
MaxRowNum: int64(maxNumOfRows),
Level: datapb.SegmentLevel_L1,
LastExpireTime: 0,
StorageVersion: req.StorageVersion,
IsCreatedByStreaming: req.IsCreatedByStreaming,
}
segment := NewSegmentInfo(segmentInfo)
if err := s.meta.AddSegment(ctx, segment); err != nil {
log.Error("failed to add segment to DataCoord", zap.Error(err))
return nil, err
}
growing, _ := s.channel2Growing.GetOrInsert(channelName, typeutil.NewUniqueSet())
growing.Insert(segmentID)
growing, _ := s.channel2Growing.GetOrInsert(req.ChannelName, typeutil.NewUniqueSet())
growing.Insert(req.SegmentID)
log.Info("datacoord: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
zap.Int("Rows", maxNumOfRows),
zap.String("Channel", segmentInfo.InsertChannel))
zap.String("Channel", segmentInfo.InsertChannel),
zap.Bool("IsCreatedByStreaming", segmentInfo.IsCreatedByStreaming),
)
return segment, s.helper.afterCreateSegment(segmentInfo)
}


@@ -257,6 +257,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
}
// AllocSegment allocates a new growing segment and adds it into the segment meta.
// Only used by the streamingnode; should be deprecated in the future once growing segments are fully managed by the streaming node.
func (s *Server) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil
@@ -273,7 +274,17 @@
}
// Allocate a new growing segment and return the segment info.
segmentInfo, err := s.segmentManager.AllocNewGrowingSegment(ctx, req.GetCollectionId(), req.GetPartitionId(), req.GetSegmentId(), req.GetVchannel(), req.GetStorageVersion())
segmentInfo, err := s.segmentManager.AllocNewGrowingSegment(
ctx,
AllocNewGrowingSegmentRequest{
CollectionID: req.GetCollectionId(),
PartitionID: req.GetPartitionId(),
SegmentID: req.GetSegmentId(),
ChannelName: req.GetVchannel(),
StorageVersion: req.GetStorageVersion(),
IsCreatedByStreaming: req.GetIsCreatedByStreaming(),
},
)
if err != nil {
return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil
}


@@ -510,7 +510,14 @@ func (s *ServerSuite) TestFlush_NormalCase() {
expireTs := allocations[0].ExpireTime
segID := allocations[0].SegmentID
info, err := s.testServer.segmentManager.AllocNewGrowingSegment(context.TODO(), 0, 1, 1, "channel-1", storage.StorageV1)
info, err := s.testServer.segmentManager.AllocNewGrowingSegment(context.TODO(), AllocNewGrowingSegmentRequest{
CollectionID: 0,
PartitionID: 1,
SegmentID: 1,
ChannelName: "channel1-1",
StorageVersion: storage.StorageV1,
IsCreatedByStreaming: true,
})
s.NoError(err)
s.NotNil(info)


@@ -185,6 +185,43 @@ func (impl *flusherComponents) recover(ctx context.Context, recoverInfos map[str
// buildDataSyncServiceWithRetry builds the data sync service with retry.
func (impl *flusherComponents) buildDataSyncServiceWithRetry(ctx context.Context, recoverInfo *datapb.GetChannelRecoveryInfoResponse) (*dataSyncServiceWrapper, error) {
// Flush all the growing segments that were not created by the streaming service.
segmentIDs := make([]int64, 0, len(recoverInfo.GetInfo().UnflushedSegments))
for _, segment := range recoverInfo.GetInfo().UnflushedSegments {
if !segment.IsCreatedByStreaming {
segmentIDs = append(segmentIDs, segment.ID)
}
}
if len(segmentIDs) > 0 {
msg := message.NewFlushMessageBuilderV2().
WithVChannel(recoverInfo.GetInfo().GetChannelName()).
WithHeader(&message.FlushMessageHeader{}).
WithBody(&message.FlushMessageBody{
CollectionId: recoverInfo.GetInfo().GetCollectionID(),
SegmentId: segmentIDs,
}).MustBuildMutable()
if err := retry.Do(ctx, func() error {
appendResult, err := impl.wal.Append(ctx, msg)
if err != nil {
impl.logger.Warn(
"fail to append flush message for segments that not created by streaming service into wal",
zap.String("vchannel", recoverInfo.GetInfo().GetChannelName()),
zap.Error(err))
return err
}
impl.logger.Info(
"append flush message for segments that not created by streaming service into wal",
zap.String("vchannel", recoverInfo.GetInfo().GetChannelName()),
zap.Int64s("segmentIDs", segmentIDs),
zap.Stringer("msgID", appendResult.MessageID),
zap.Uint64("timeTick", appendResult.TimeTick),
)
return nil
}, retry.AttemptAlways()); err != nil {
return nil, err
}
}
var ds *dataSyncServiceWrapper
err := retry.Do(ctx, func() error {
var err error


@@ -230,11 +230,12 @@ func (m *partitionSegmentManager) allocNewGrowingSegment(ctx context.Context) (*
}
resp, err := dc.AllocSegment(ctx, &datapb.AllocSegmentRequest{
CollectionId: pendingSegment.GetCollectionID(),
PartitionId: pendingSegment.GetPartitionID(),
SegmentId: pendingSegment.GetSegmentID(),
Vchannel: pendingSegment.GetVChannel(),
StorageVersion: pendingSegment.GetStorageVersion(),
CollectionId: pendingSegment.GetCollectionID(),
PartitionId: pendingSegment.GetPartitionID(),
SegmentId: pendingSegment.GetSegmentID(),
Vchannel: pendingSegment.GetVChannel(),
StorageVersion: pendingSegment.GetStorageVersion(),
IsCreatedByStreaming: true,
})
if err := merr.CheckRPCCall(resp, err); err != nil {
return nil, errors.Wrap(err, "failed to alloc growing segment at datacoord")


@@ -182,6 +182,7 @@ message AllocSegmentRequest {
int64 segment_id = 3; // segment id must be allocated from the rootcoord idalloc service.
string vchannel = 4;
int64 storage_version = 5;
bool is_created_by_streaming = 6;
}
message AllocSegmentResponse {
@@ -366,6 +367,13 @@ message SegmentInfo {
// This field is used to indicate that some intermediate state segments should not be loaded.
// For example, segments that have been clustered but haven't undergone stats yet.
bool is_invisible = 28;
// This field is used to indicate whether the segment was created by the streaming service.
// This field is meaningful only when the segment state is growing.
// If the segment is created by the streaming service, it will be true.
// A segment generated by the datacoord of the old architecture will be false.
// After growing segments are fully managed by the streamingnode, the true value can never be seen at the coordinator.
bool is_created_by_streaming = 29;
}
message SegmentStartPosition {
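Note: a small sketch of how a consumer could act on the new flag, mirroring the recovery-time filtering in buildDataSyncServiceWithRetry above. The package name, helper name, and the datapb import path are assumptions for illustration; the getters follow standard protoc-gen-go naming, as seen with req.GetIsCreatedByStreaming() earlier in this diff.

package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/proto/datapb" // assumed import path for the generated datapb package
)

// legacyGrowingSegmentIDs is a hypothetical helper: it returns the IDs of growing
// segments that were not created by the streaming service and therefore still need
// an explicit flush message on recovery.
func legacyGrowingSegmentIDs(segments []*datapb.SegmentInfo) []int64 {
	ids := make([]int64, 0, len(segments))
	for _, seg := range segments {
		if seg.GetState() == commonpb.SegmentState_Growing && !seg.GetIsCreatedByStreaming() {
			ids = append(ids, seg.GetID())
		}
	}
	return ids
}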

(File diff suppressed because it is too large.)


@@ -198,6 +198,16 @@ func (b *mutableMesasgeBuilder[H, B]) BuildMutable() (MutableMessage, error) {
return msg, nil
}
// MustBuildMutable builds a mutable message.
// Panics if the build fails.
func (b *mutableMesasgeBuilder[H, B]) MustBuildMutable() MutableMessage {
msg, err := b.BuildMutable()
if err != nil {
panic(err)
}
return msg
}
// BuildBroadcast builds a broadcast mutable message.
// Panics if the payload and message type are not set.
// Should only be used at the client side.