enhance: add manual alloc segment rpc for datacoord (#35002)

issue: #33285

- segment allocation will move to streamingnode, so a manual alloc
segment rpc is required

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
chyezh 2024-07-26 10:15:46 +08:00 committed by GitHub
parent 4ee6c69217
commit 1cff55381d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 260 additions and 11 deletions

View File

@ -476,6 +476,7 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=SubCluster --dir=internal/datacoord --filename=mock_subcluster.go --output=internal/datacoord --structname=MockSubCluster --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Broker --dir=internal/datacoord/broker --filename=mock_coordinator_broker.go --output=internal/datacoord/broker --structname=MockBroker --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=WorkerManager --dir=internal/datacoord --filename=mock_worker_manager.go --output=internal/datacoord --structname=MockWorkerManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Manager --dir=internal/datacoord --filename=mock_segment_manager.go --output=internal/datacoord --structname=MockManager --with-expecter --inpackage
generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.30.1. DO NOT EDIT.
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datacoord
@ -81,6 +81,64 @@ func (_c *MockManager_AllocImportSegment_Call) RunAndReturn(run func(context.Con
return _c
}
// AllocNewGrowingSegment provides a mock function with given fields: ctx, collectionID, partitionID, segmentID, channelName
func (_m *MockManager) AllocNewGrowingSegment(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, channelName string) (*SegmentInfo, error) {
ret := _m.Called(ctx, collectionID, partitionID, segmentID, channelName)
var r0 *SegmentInfo
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string) (*SegmentInfo, error)); ok {
return rf(ctx, collectionID, partitionID, segmentID, channelName)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string) *SegmentInfo); ok {
r0 = rf(ctx, collectionID, partitionID, segmentID, channelName)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*SegmentInfo)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64, int64, int64, string) error); ok {
r1 = rf(ctx, collectionID, partitionID, segmentID, channelName)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockManager_AllocNewGrowingSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocNewGrowingSegment'
type MockManager_AllocNewGrowingSegment_Call struct {
*mock.Call
}
// AllocNewGrowingSegment is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - partitionID int64
// - segmentID int64
// - channelName string
func (_e *MockManager_Expecter) AllocNewGrowingSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, segmentID interface{}, channelName interface{}) *MockManager_AllocNewGrowingSegment_Call {
return &MockManager_AllocNewGrowingSegment_Call{Call: _e.mock.On("AllocNewGrowingSegment", ctx, collectionID, partitionID, segmentID, channelName)}
}
func (_c *MockManager_AllocNewGrowingSegment_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, channelName string)) *MockManager_AllocNewGrowingSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(int64), args[4].(string))
})
return _c
}
func (_c *MockManager_AllocNewGrowingSegment_Call) Return(_a0 *SegmentInfo, _a1 error) *MockManager_AllocNewGrowingSegment_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockManager_AllocNewGrowingSegment_Call) RunAndReturn(run func(context.Context, int64, int64, int64, string) (*SegmentInfo, error)) *MockManager_AllocNewGrowingSegment_Call {
_c.Call.Return(run)
return _c
}
// AllocSegment provides a mock function with given fields: ctx, collectionID, partitionID, channelName, requestRows
func (_m *MockManager) AllocSegment(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64) ([]*Allocation, error) {
ret := _m.Called(ctx, collectionID, partitionID, channelName, requestRows)

View File

@ -74,9 +74,13 @@ func putAllocation(a *Allocation) {
type Manager interface {
// CreateSegment create new segment when segment not exist
// AllocSegment allocates rows and record the allocation.
// Deprecated: AllocSegment allocates rows and record the allocation, will be deprecated after enabling streamingnode.
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error)
AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error)
// AllocNewGrowingSegment allocates segment for streaming node.
AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error)
// DropSegment drops the segment from manager.
DropSegment(ctx context.Context, segmentID UniqueID)
// FlushImportSegments set importing segment state to Flushed.
@ -320,7 +324,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
return nil, err
}
for _, allocation := range newSegmentAllocations {
segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName, commonpb.SegmentState_Growing, datapb.SegmentLevel_L1)
segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName)
if err != nil {
log.Error("Failed to open new segment for segment allocation")
return nil, err
@ -417,9 +421,12 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c
return segment, nil
}
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
channelName string, segmentState commonpb.SegmentState, level datapb.SegmentLevel,
) (*SegmentInfo, error) {
// AllocNewGrowingSegment allocates segment for streaming node.
func (s *SegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, segmentID, channelName)
}
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error) {
log := log.Ctx(ctx)
ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment")
defer sp.End()
@ -428,6 +435,10 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
log.Error("failed to open new segment while allocID", zap.Error(err))
return nil, err
}
return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, id, channelName)
}
func (s *SegmentManager) openNewSegmentWithGivenSegmentID(ctx context.Context, collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
maxNumOfRows, err := s.estimateMaxNumOfRows(collectionID)
if err != nil {
log.Error("failed to open new segment while estimateMaxNumOfRows", zap.Error(err))
@ -435,14 +446,14 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
}
segmentInfo := &datapb.SegmentInfo{
ID: id,
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
NumOfRows: 0,
State: segmentState,
State: commonpb.SegmentState_Growing,
MaxRowNum: int64(maxNumOfRows),
Level: level,
Level: datapb.SegmentLevel_L1,
LastExpireTime: 0,
}
segment := NewSegmentInfo(segmentInfo)
@ -450,7 +461,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
log.Error("failed to add segment to DataCoord", zap.Error(err))
return nil, err
}
s.segments = append(s.segments, id)
s.segments = append(s.segments, segmentID)
log.Info("datacoord: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),

View File

@ -831,6 +831,10 @@ func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID Uniqu
return nil, nil
}
func (s *spySegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
return nil, nil
}
func (s *spySegmentManager) allocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64, taskID int64) (*Allocation, error) {
return nil, nil
}

View File

@ -245,6 +245,28 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
}, nil
}
// AllocSegment alloc a new growing segment, add it into segment meta.
func (s *Server) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil
}
// !!! SegmentId must be allocated from rootCoord id allocation.
if req.GetCollectionId() == 0 || req.GetPartitionId() == 0 || req.GetVchannel() == "" || req.GetSegmentId() == 0 {
return &datapb.AllocSegmentResponse{Status: merr.Status(merr.ErrParameterInvalid)}, nil
}
// Alloc new growing segment and return the segment info.
segmentInfo, err := s.segmentManager.AllocNewGrowingSegment(ctx, req.GetCollectionId(), req.GetPartitionId(), req.GetSegmentId(), req.GetVchannel())
if err != nil {
return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil
}
clonedSegmentInfo := segmentInfo.Clone()
return &datapb.AllocSegmentResponse{
SegmentInfo: clonedSegmentInfo.SegmentInfo,
Status: merr.Success(),
}, nil
}
// GetSegmentStates returns segments state
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {

View File

@ -168,6 +168,12 @@ func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
})
}
func (c *Client) AllocSegment(ctx context.Context, in *datapb.AllocSegmentRequest, opts ...grpc.CallOption) (*datapb.AllocSegmentResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.AllocSegmentResponse, error) {
return client.AllocSegment(ctx, in)
})
}
// GetSegmentStates requests segment state information
//
// ctx is the context to control request deadline and cancellation

View File

@ -303,6 +303,11 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
return s.dataCoord.AssignSegmentID(ctx, req)
}
// AllocSegment alloc a new growing segment, add it into segment meta.
func (s *Server) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) {
return s.dataCoord.AllocSegment(ctx, req)
}
// GetSegmentStates gets states of segments
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return s.dataCoord.GetSegmentStates(ctx, req)

View File

@ -38,6 +38,61 @@ func (_m *MockDataCoord) EXPECT() *MockDataCoord_Expecter {
return &MockDataCoord_Expecter{mock: &_m.Mock}
}
// AllocSegment provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) AllocSegment(_a0 context.Context, _a1 *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *datapb.AllocSegmentResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *datapb.AllocSegmentRequest) *datapb.AllocSegmentResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.AllocSegmentResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *datapb.AllocSegmentRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataCoord_AllocSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocSegment'
type MockDataCoord_AllocSegment_Call struct {
*mock.Call
}
// AllocSegment is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.AllocSegmentRequest
func (_e *MockDataCoord_Expecter) AllocSegment(_a0 interface{}, _a1 interface{}) *MockDataCoord_AllocSegment_Call {
return &MockDataCoord_AllocSegment_Call{Call: _e.mock.On("AllocSegment", _a0, _a1)}
}
func (_c *MockDataCoord_AllocSegment_Call) Run(run func(_a0 context.Context, _a1 *datapb.AllocSegmentRequest)) *MockDataCoord_AllocSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.AllocSegmentRequest))
})
return _c
}
func (_c *MockDataCoord_AllocSegment_Call) Return(_a0 *datapb.AllocSegmentResponse, _a1 error) *MockDataCoord_AllocSegment_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataCoord_AllocSegment_Call) RunAndReturn(run func(context.Context, *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error)) *MockDataCoord_AllocSegment_Call {
_c.Call.Return(run)
return _c
}
// AlterIndex provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) AlterIndex(_a0 context.Context, _a1 *indexpb.AlterIndexRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -33,6 +33,76 @@ func (_m *MockDataCoordClient) EXPECT() *MockDataCoordClient_Expecter {
return &MockDataCoordClient_Expecter{mock: &_m.Mock}
}
// AllocSegment provides a mock function with given fields: ctx, in, opts
func (_m *MockDataCoordClient) AllocSegment(ctx context.Context, in *datapb.AllocSegmentRequest, opts ...grpc.CallOption) (*datapb.AllocSegmentResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *datapb.AllocSegmentResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *datapb.AllocSegmentRequest, ...grpc.CallOption) (*datapb.AllocSegmentResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *datapb.AllocSegmentRequest, ...grpc.CallOption) *datapb.AllocSegmentResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.AllocSegmentResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *datapb.AllocSegmentRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataCoordClient_AllocSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocSegment'
type MockDataCoordClient_AllocSegment_Call struct {
*mock.Call
}
// AllocSegment is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.AllocSegmentRequest
// - opts ...grpc.CallOption
func (_e *MockDataCoordClient_Expecter) AllocSegment(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_AllocSegment_Call {
return &MockDataCoordClient_AllocSegment_Call{Call: _e.mock.On("AllocSegment",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockDataCoordClient_AllocSegment_Call) Run(run func(ctx context.Context, in *datapb.AllocSegmentRequest, opts ...grpc.CallOption)) *MockDataCoordClient_AllocSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*datapb.AllocSegmentRequest), variadicArgs...)
})
return _c
}
func (_c *MockDataCoordClient_AllocSegment_Call) Return(_a0 *datapb.AllocSegmentResponse, _a1 error) *MockDataCoordClient_AllocSegment_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataCoordClient_AllocSegment_Call) RunAndReturn(run func(context.Context, *datapb.AllocSegmentRequest, ...grpc.CallOption) (*datapb.AllocSegmentResponse, error)) *MockDataCoordClient_AllocSegment_Call {
_c.Call.Return(run)
return _c
}
// AlterIndex provides a mock function with given fields: ctx, in, opts
func (_m *MockDataCoordClient) AlterIndex(ctx context.Context, in *indexpb.AlterIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
_va := make([]interface{}, len(opts))

View File

@ -35,7 +35,12 @@ service DataCoord {
rpc Flush(FlushRequest) returns (FlushResponse) {}
rpc AssignSegmentID(AssignSegmentIDRequest) returns (AssignSegmentIDResponse) {}
// AllocSegment alloc a new growing segment, add it into segment meta.
rpc AllocSegment(AllocSegmentRequest) returns (AllocSegmentResponse) {}
rpc AssignSegmentID(AssignSegmentIDRequest) returns (AssignSegmentIDResponse) {
option deprecated = true;
}
rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
rpc GetSegmentStates(GetSegmentStatesRequest) returns (GetSegmentStatesResponse) {}
@ -168,6 +173,18 @@ message SegmentIDRequest {
SegmentLevel level = 7;
}
message AllocSegmentRequest {
int64 collection_id = 1;
int64 partition_id = 2;
int64 segment_id = 3; // segment id must be allocate from rootcoord idalloc service.
string vchannel = 4;
}
message AllocSegmentResponse {
SegmentInfo segment_info = 1;
common.Status status = 2;
}
message AssignSegmentIDRequest {
int64 nodeID = 1;
string peer_role = 2;