enhance: ChunkManager is no longer created during datanode initialization (#42791)

issue: #41611

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-06-17 17:06:38 +08:00 committed by GitHub
parent 001619aef9
commit a9dcd4a380
19 changed files with 1333 additions and 1135 deletions
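In outline (a hedged reading of the diff below, not an authoritative summary): the object-storage settings now travel inside each compaction/import request as an indexpb.StorageConfig, and the datanode builds a ChunkManager per request instead of creating one during Init(). A minimal sketch of the two sides, using only identifiers that appear in this diff:

// DataCoord side: attach the storage settings to the outgoing request
// (see AssemblePreImportRequest and the new storage_config proto fields below).
req := &datapb.PreImportRequest{
	// ...existing job/task fields...
	StorageConfig: createStorageConfig(),
}

// DataNode side: derive a request-scoped ChunkManager instead of relying on node.chunkManager.
cm, err := node.storageFactory.NewChunkManager(node.ctx, req.GetStorageConfig())
if err != nil {
	return merr.Status(err), nil
}
task := importv2.NewPreImportTask(req, node.importTaskMgr, cm)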


@ -18,19 +18,21 @@ package compaction
import (
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
type Params struct {
EnableStorageV2 bool `json:"enable_storage_v2,omitempty"`
BinLogMaxSize uint64 `json:"binlog_max_size,omitempty"`
UseMergeSort bool `json:"use_merge_sort,omitempty"`
MaxSegmentMergeSort int `json:"max_segment_merge_sort,omitempty"`
PreferSegmentSizeRatio float64 `json:"prefer_segment_size_ratio,omitempty"`
BloomFilterApplyBatchSize int `json:"bloom_filter_apply_batch_size,omitempty"`
EnableStorageV2 bool `json:"enable_storage_v2,omitempty"`
BinLogMaxSize uint64 `json:"binlog_max_size,omitempty"`
UseMergeSort bool `json:"use_merge_sort,omitempty"`
MaxSegmentMergeSort int `json:"max_segment_merge_sort,omitempty"`
PreferSegmentSizeRatio float64 `json:"prefer_segment_size_ratio,omitempty"`
BloomFilterApplyBatchSize int `json:"bloom_filter_apply_batch_size,omitempty"`
StorageConfig *indexpb.StorageConfig `json:"storage_config,omitempty"`
}
func genParams() Params {
func GenParams() Params {
return Params{
EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
@ -38,11 +40,12 @@ func genParams() Params {
MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
PreferSegmentSizeRatio: paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(),
BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(),
StorageConfig: CreateStorageConfig(),
}
}
func GenerateJSONParams() (string, error) {
compactionParams := genParams()
compactionParams := GenParams()
params, err := json.Marshal(compactionParams)
if err != nil {
return "", err
@ -55,7 +58,38 @@ func ParseParamsFromJSON(jsonStr string) (Params, error) {
err := json.Unmarshal([]byte(jsonStr), &compactionParams)
if err != nil && jsonStr == "" {
// Ensure compatibility with legacy requests sent by the old datacoord.
return genParams(), nil
return GenParams(), nil
}
return compactionParams, err
}
func CreateStorageConfig() *indexpb.StorageConfig {
var storageConfig *indexpb.StorageConfig
if paramtable.Get().CommonCfg.StorageType.GetValue() == "local" {
storageConfig = &indexpb.StorageConfig{
RootPath: paramtable.Get().LocalStorageCfg.Path.GetValue(),
StorageType: paramtable.Get().CommonCfg.StorageType.GetValue(),
}
} else {
storageConfig = &indexpb.StorageConfig{
Address: paramtable.Get().MinioCfg.Address.GetValue(),
AccessKeyID: paramtable.Get().MinioCfg.AccessKeyID.GetValue(),
SecretAccessKey: paramtable.Get().MinioCfg.SecretAccessKey.GetValue(),
UseSSL: paramtable.Get().MinioCfg.UseSSL.GetAsBool(),
SslCACert: paramtable.Get().MinioCfg.SslCACert.GetValue(),
BucketName: paramtable.Get().MinioCfg.BucketName.GetValue(),
RootPath: paramtable.Get().MinioCfg.RootPath.GetValue(),
UseIAM: paramtable.Get().MinioCfg.UseIAM.GetAsBool(),
IAMEndpoint: paramtable.Get().MinioCfg.IAMEndpoint.GetValue(),
StorageType: paramtable.Get().CommonCfg.StorageType.GetValue(),
Region: paramtable.Get().MinioCfg.Region.GetValue(),
UseVirtualHost: paramtable.Get().MinioCfg.UseVirtualHost.GetAsBool(),
CloudProvider: paramtable.Get().MinioCfg.CloudProvider.GetValue(),
RequestTimeoutMs: paramtable.Get().MinioCfg.RequestTimeoutMs.GetAsInt64(),
GcpCredentialJSON: paramtable.Get().MinioCfg.GcpCredentialJSON.GetValue(),
}
}
return storageConfig
}
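A hedged usage sketch of the updated params helpers (assuming both sides use the internal/compaction package; error handling trimmed): GenParams now embeds the storage configuration, so the JSON payload attached to a compaction plan round-trips the MinIO/local settings as well.

// DataCoord: marshal the params, which now include StorageConfig from CreateStorageConfig().
jsonParams, err := GenerateJSONParams()
if err != nil {
	return err
}
// DataNode: recover them; an empty string still falls back to GenParams() for old coordinators.
params, err := ParseParamsFromJSON(jsonParams)
if err != nil {
	return err
}
_ = params.StorageConfig // later used to build a request-scoped ChunkManager in CompactionV2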


@ -40,6 +40,7 @@ func TestGetJSONParams(t *testing.T) {
MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
PreferSegmentSizeRatio: paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(),
BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(),
StorageConfig: CreateStorageConfig(),
}, result)
}
@ -85,5 +86,6 @@ func TestGetParamsFromJSON_EmptyJSON(t *testing.T) {
MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
PreferSegmentSizeRatio: paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(),
BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(),
StorageConfig: CreateStorageConfig(),
}, result)
}


@ -278,14 +278,15 @@ func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportR
return fileStats.GetImportFile()
})
return &datapb.PreImportRequest{
JobID: task.GetJobID(),
TaskID: task.GetTaskID(),
CollectionID: task.GetCollectionID(),
PartitionIDs: job.GetPartitionIDs(),
Vchannels: job.GetVchannels(),
Schema: job.GetSchema(),
ImportFiles: importFiles,
Options: job.GetOptions(),
JobID: task.GetJobID(),
TaskID: task.GetTaskID(),
CollectionID: task.GetCollectionID(),
PartitionIDs: job.GetPartitionIDs(),
Vchannels: job.GetVchannels(),
Schema: job.GetSchema(),
ImportFiles: importFiles,
Options: job.GetOptions(),
StorageConfig: createStorageConfig(),
}
}
@ -347,6 +348,7 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all
Ts: ts,
IDRange: &datapb.IDRange{Begin: idBegin, End: idEnd},
RequestSegments: requestSegments,
StorageConfig: createStorageConfig(),
}, nil
}


@ -251,13 +251,14 @@ func (node *DataNode) Init() error {
node.factory.Init(Params)
log.Info("DataNode server init succeeded")
chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)
if err != nil {
initError = err
return
if !streamingutil.IsStreamingServiceEnabled() {
chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)
if err != nil {
initError = err
return
}
node.chunkManager = chunkManager
}
node.chunkManager = chunkManager
syncMgr := syncmgr.NewSyncManager(node.chunkManager)
node.syncMgr = syncMgr
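A hedged reading of the new Init() flow (identifiers from this diff; simplified, not a drop-in copy): with the streaming service enabled, no process-wide ChunkManager is created and node.chunkManager stays nil, so downstream code must be handed a request-scoped ChunkManager explicitly, which is what the remaining changes arrange.

if !streamingutil.IsStreamingServiceEnabled() {
	// Legacy path: one process-wide ChunkManager, built once at startup.
	cm, err := node.factory.NewPersistentStorageChunkManager(node.ctx)
	if err != nil {
		return err // the real code records initError and returns from the init closure
	}
	node.chunkManager = cm
}
// With streaming enabled node.chunkManager is nil here; sync tasks bring their own
// ChunkManager via SyncDataWithChunkManager (added further below).
node.syncMgr = syncmgr.NewSyncManager(node.chunkManager)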


@ -247,7 +247,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil)
s.cm = cm
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
s.syncMgr.EXPECT().SyncDataWithChunkManager(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, cm storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
@ -308,7 +308,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil)
s.cm = cm
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
s.syncMgr.EXPECT().SyncDataWithChunkManager(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, cm storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, errors.New("mock err")
})
@ -385,7 +385,7 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
}
func (s *SchedulerSuite) TestScheduler_ImportFile() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
s.syncMgr.EXPECT().SyncDataWithChunkManager(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, cm storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
@ -444,7 +444,7 @@ func (s *SchedulerSuite) TestScheduler_ImportFileWithFunction() {
}
}
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
s.syncMgr.EXPECT().SyncDataWithChunkManager(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, cm storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})


@ -267,7 +267,7 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy
if err != nil {
return nil, nil, err
}
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncDataWithChunkManager(t.ctx, syncTask, t.cm)
if err != nil {
log.Ctx(context.TODO()).Error("sync data failed", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err


@ -234,7 +234,7 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future
if err != nil {
return nil, nil, err
}
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncDataWithChunkManager(t.ctx, syncTask, t.cm)
if err != nil {
log.Ctx(context.TODO()).Error("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err


@ -131,8 +131,8 @@ func (s *L0ImportSuite) TestL0PreImport() {
}
func (s *L0ImportSuite) TestL0Import() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
s.syncMgr.EXPECT().SyncDataWithChunkManager(mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, task syncmgr.Task, cm storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil)
task.(*syncmgr.SyncTask).WithAllocator(alloc)


@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
@ -95,6 +96,7 @@ func Test_IndexServiceSuite(t *testing.T) {
}
func (s *IndexServiceSuite) SetupTest() {
streamingutil.SetStreamingServiceEnabled()
s.collID = 1
s.partID = 2
s.segID = 3
@ -134,10 +136,7 @@ func (s *IndexServiceSuite) SetupTest() {
ctx = context.TODO()
)
cm := mocks.NewChunkManager(s.T())
factory.EXPECT().Init(mock.Anything).Return()
factory.EXPECT().NewPersistentStorageChunkManager(mock.Anything).Return(cm, nil)
s.node = NewDataNode(ctx, factory)


@ -22,7 +22,6 @@ package datanode
import (
"context"
"fmt"
"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -214,15 +213,27 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl
taskCtx := trace.ContextWithSpanContext(node.ctx, spanCtx)*/
taskCtx := tracer.Propagate(ctx, node.ctx)
compactionParams, err := compaction.ParseParamsFromJSON(req.GetJsonParams())
if err != nil {
return merr.Status(err), err
}
cm, err := node.storageFactory.NewChunkManager(node.ctx, compactionParams.StorageConfig)
if err != nil {
log.Error("create chunk manager failed",
zap.String("bucket", compactionParams.StorageConfig.GetBucketName()),
zap.String("ROOTPATH", compactionParams.StorageConfig.GetRootPath()),
zap.Error(err),
)
return merr.Status(err), err
}
var task compactor.Compactor
binlogIO := io.NewBinlogIO(node.chunkManager)
binlogIO := io.NewBinlogIO(cm)
switch req.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
task = compactor.NewLevelZeroCompactionTask(
taskCtx,
binlogIO,
node.chunkManager,
cm,
req,
)
case datapb.CompactionType_MixCompaction:
@ -274,6 +285,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
}
// SyncSegments called by DataCoord, sync the compacted segments' meta between DC and DN
// deprecated after v2.6.0
func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.Int64("planID", req.GetPlanID()),
@ -447,11 +459,20 @@ func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportReques
return merr.Status(err), nil
}
cm, err := node.storageFactory.NewChunkManager(node.ctx, req.GetStorageConfig())
if err != nil {
log.Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()),
zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()),
zap.Error(err),
)
return merr.Status(err), nil
}
var task importv2.Task
if importutilv2.IsL0Import(req.GetOptions()) {
task = importv2.NewL0PreImportTask(req, node.importTaskMgr, node.chunkManager)
task = importv2.NewL0PreImportTask(req, node.importTaskMgr, cm)
} else {
task = importv2.NewPreImportTask(req, node.importTaskMgr, node.chunkManager)
task = importv2.NewPreImportTask(req, node.importTaskMgr, cm)
}
node.importTaskMgr.Add(task)
@ -476,11 +497,20 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) (
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return merr.Status(err), nil
}
cm, err := node.storageFactory.NewChunkManager(node.ctx, req.GetStorageConfig())
if err != nil {
log.Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()),
zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()),
zap.Error(err),
)
return merr.Status(err), nil
}
var task importv2.Task
if importutilv2.IsL0Import(req.GetOptions()) {
task = importv2.NewL0ImportTask(req, node.importTaskMgr, node.syncMgr, node.chunkManager)
task = importv2.NewL0ImportTask(req, node.importTaskMgr, node.syncMgr, cm)
} else {
task = importv2.NewImportTask(req, node.importTaskMgr, node.syncMgr, node.chunkManager)
task = importv2.NewImportTask(req, node.importTaskMgr, node.syncMgr, cm)
}
node.importTaskMgr.Add(task)
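With the handlers above, callers are expected to populate StorageConfig on the request. A hedged, test-style sketch of invoking the RPC directly (it mirrors the DataNodeServicesSuite change further below; ctx and node come from the test fixture):

req := &datapb.PreImportRequest{
	StorageConfig: compaction.CreateStorageConfig(),
}
status, err := node.PreImport(ctx, req)
if err := merr.CheckRPCCall(status, err); err != nil {
	return err
}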


@ -19,6 +19,7 @@ package datanode
import (
"context"
"math/rand"
"os"
"strings"
"testing"
"time"
@ -34,6 +35,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
allocator2 "github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/compactor"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
@ -565,6 +567,7 @@ func (s *DataNodeServicesSuite) TestResendSegmentStats() {
}
func (s *DataNodeServicesSuite) TestRPCWatch() {
os.Setenv("MILVUS_STREAMING_SERVICE_ENABLED", "0")
s.Run("node not healthy", func() {
s.SetupTest()
s.node.UpdateStateCode(commonpb.StateCode_Abnormal)
@ -1221,12 +1224,17 @@ func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
func (s *DataNodeServicesSuite) TestCreateTask() {
s.Run("create pre-import task", func() {
preImportReq := &datapb.PreImportRequest{
StorageConfig: compaction.CreateStorageConfig(),
}
payload, err := proto.Marshal(preImportReq)
s.NoError(err)
req := &workerpb.CreateTaskRequest{
Properties: map[string]string{
taskcommon.ClusterIDKey: "cluster-0",
taskcommon.TypeKey: taskcommon.PreImport,
},
Payload: []byte{},
Payload: payload,
}
status, err := s.node.CreateTask(s.ctx, req)
s.NoError(merr.CheckRPCCall(status, err))
@ -1234,7 +1242,8 @@ func (s *DataNodeServicesSuite) TestCreateTask() {
s.Run("create import task", func() {
importReq := &datapb.ImportRequest{
Schema: &schemapb.CollectionSchema{},
Schema: &schemapb.CollectionSchema{},
StorageConfig: compaction.CreateStorageConfig(),
}
payload, err := proto.Marshal(importReq)
s.NoError(err)


@ -1,4 +1,4 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
// Code generated by mockery v2.53.3. DO NOT EDIT.
package broker


@ -1,4 +1,4 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
// Code generated by mockery v2.53.3. DO NOT EDIT.
package syncmgr


@ -1,4 +1,4 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
// Code generated by mockery v2.53.3. DO NOT EDIT.
package syncmgr


@ -1,4 +1,4 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
// Code generated by mockery v2.53.3. DO NOT EDIT.
package syncmgr
@ -8,6 +8,8 @@ import (
conc "github.com/milvus-io/milvus/pkg/v2/util/conc"
mock "github.com/stretchr/testify/mock"
storage "github.com/milvus-io/milvus/internal/storage"
)
// MockSyncManager is an autogenerated mock type for the SyncManager type
@ -23,7 +25,7 @@ func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter {
return &MockSyncManager_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
// Close provides a mock function with no fields
func (_m *MockSyncManager) Close() error {
ret := _m.Called()
@ -142,7 +144,82 @@ func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context,
return _c
}
// TaskStatsJSON provides a mock function with given fields:
// SyncDataWithChunkManager provides a mock function with given fields: ctx, task, chunkManager, callbacks
func (_m *MockSyncManager) SyncDataWithChunkManager(ctx context.Context, task Task, chunkManager storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
_va := make([]interface{}, len(callbacks))
for _i := range callbacks {
_va[_i] = callbacks[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, task, chunkManager)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for SyncDataWithChunkManager")
}
var r0 *conc.Future[struct{}]
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, Task, storage.ChunkManager, ...func(error) error) (*conc.Future[struct{}], error)); ok {
return rf(ctx, task, chunkManager, callbacks...)
}
if rf, ok := ret.Get(0).(func(context.Context, Task, storage.ChunkManager, ...func(error) error) *conc.Future[struct{}]); ok {
r0 = rf(ctx, task, chunkManager, callbacks...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*conc.Future[struct{}])
}
}
if rf, ok := ret.Get(1).(func(context.Context, Task, storage.ChunkManager, ...func(error) error) error); ok {
r1 = rf(ctx, task, chunkManager, callbacks...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockSyncManager_SyncDataWithChunkManager_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncDataWithChunkManager'
type MockSyncManager_SyncDataWithChunkManager_Call struct {
*mock.Call
}
// SyncDataWithChunkManager is a helper method to define mock.On call
// - ctx context.Context
// - task Task
// - chunkManager storage.ChunkManager
// - callbacks ...func(error) error
func (_e *MockSyncManager_Expecter) SyncDataWithChunkManager(ctx interface{}, task interface{}, chunkManager interface{}, callbacks ...interface{}) *MockSyncManager_SyncDataWithChunkManager_Call {
return &MockSyncManager_SyncDataWithChunkManager_Call{Call: _e.mock.On("SyncDataWithChunkManager",
append([]interface{}{ctx, task, chunkManager}, callbacks...)...)}
}
func (_c *MockSyncManager_SyncDataWithChunkManager_Call) Run(run func(ctx context.Context, task Task, chunkManager storage.ChunkManager, callbacks ...func(error) error)) *MockSyncManager_SyncDataWithChunkManager_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]func(error) error, len(args)-3)
for i, a := range args[3:] {
if a != nil {
variadicArgs[i] = a.(func(error) error)
}
}
run(args[0].(context.Context), args[1].(Task), args[2].(storage.ChunkManager), variadicArgs...)
})
return _c
}
func (_c *MockSyncManager_SyncDataWithChunkManager_Call) Return(_a0 *conc.Future[struct{}], _a1 error) *MockSyncManager_SyncDataWithChunkManager_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockSyncManager_SyncDataWithChunkManager_Call) RunAndReturn(run func(context.Context, Task, storage.ChunkManager, ...func(error) error) (*conc.Future[struct{}], error)) *MockSyncManager_SyncDataWithChunkManager_Call {
_c.Call.Return(run)
return _c
}
// TaskStatsJSON provides a mock function with no fields
func (_m *MockSyncManager) TaskStatsJSON() string {
ret := _m.Called()


@ -1,4 +1,4 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
// Code generated by mockery v2.53.3. DO NOT EDIT.
package syncmgr
@ -22,7 +22,7 @@ func (_m *MockTask) EXPECT() *MockTask_Expecter {
return &MockTask_Expecter{mock: &_m.Mock}
}
// ChannelName provides a mock function with given fields:
// ChannelName provides a mock function with no fields
func (_m *MockTask) ChannelName() string {
ret := _m.Called()
@ -67,7 +67,7 @@ func (_c *MockTask_ChannelName_Call) RunAndReturn(run func() string) *MockTask_C
return _c
}
// Checkpoint provides a mock function with given fields:
// Checkpoint provides a mock function with no fields
func (_m *MockTask) Checkpoint() *msgpb.MsgPosition {
ret := _m.Called()
@ -143,11 +143,11 @@ func (_c *MockTask_HandleError_Call) Return() *MockTask_HandleError_Call {
}
func (_c *MockTask_HandleError_Call) RunAndReturn(run func(error)) *MockTask_HandleError_Call {
_c.Call.Return(run)
_c.Run(run)
return _c
}
// IsFlush provides a mock function with given fields:
// IsFlush provides a mock function with no fields
func (_m *MockTask) IsFlush() bool {
ret := _m.Called()
@ -238,7 +238,7 @@ func (_c *MockTask_Run_Call) RunAndReturn(run func(context.Context) error) *Mock
return _c
}
// SegmentID provides a mock function with given fields:
// SegmentID provides a mock function with no fields
func (_m *MockTask) SegmentID() int64 {
ret := _m.Called()
@ -283,7 +283,7 @@ func (_c *MockTask_SegmentID_Call) RunAndReturn(run func() int64) *MockTask_Segm
return _c
}
// StartPosition provides a mock function with given fields:
// StartPosition provides a mock function with no fields
func (_m *MockTask) StartPosition() *msgpb.MsgPosition {
ret := _m.Called()


@ -49,6 +49,7 @@ type SyncMeta struct {
type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error)
SyncDataWithChunkManager(ctx context.Context, task Task, chunkManager storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error)
// Close waits for the task to finish and then shuts down the sync manager.
Close() error
@ -116,6 +117,19 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...fu
return mgr.safeSubmitTask(ctx, task, callbacks...), nil
}
func (mgr *syncManager) SyncDataWithChunkManager(ctx context.Context, task Task, chunkManager storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
if mgr.workerPool.IsClosed() {
return nil, errors.New("sync manager is closed")
}
switch t := task.(type) {
case *SyncTask:
t.WithChunkManager(chunkManager)
}
return mgr.safeSubmitTask(ctx, task, callbacks...), nil
}
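A hedged usage sketch of the new method (conc.Future's Await accessor is assumed from pkg/v2/util/conc): the caller passes the request-scoped ChunkManager alongside the task, and the manager injects it into a *SyncTask via WithChunkManager before submitting, so no node-level chunk manager is needed.

// Caller side, as in the import tasks above: t.cm is the per-request ChunkManager.
future, err := t.syncMgr.SyncDataWithChunkManager(t.ctx, syncTask, t.cm)
if err != nil {
	return err
}
if _, err := future.Await(); err != nil { // Await assumed; the import path actually collects futures
	return err
}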
// safeSubmitTask submits task to SyncManager
func (mgr *syncManager) safeSubmitTask(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())


@ -827,6 +827,7 @@ message PreImportRequest {
schema.CollectionSchema schema = 7;
repeated internal.ImportFile import_files = 8;
repeated common.KeyValuePair options = 9;
index.StorageConfig storage_config = 10;
}
message IDRange {
@ -853,6 +854,7 @@ message ImportRequest {
uint64 ts = 10;
IDRange ID_range = 11;
repeated ImportRequestSegment request_segments = 12;
index.StorageConfig storage_config = 13;
}
message QueryPreImportRequest {

File diff suppressed because it is too large Load Diff