From 168dc49bfc82c3e1b754faedb3274f799315dd51 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Mon, 20 Oct 2025 09:56:01 +0800 Subject: [PATCH] enhance: Disable import for replicating cluster (#44850) 1. Import in replicating cluster is not supported yet, so disable it for now. 2. Remove GetReplicateConfiguration wal API issue: https://github.com/milvus-io/milvus/issues/44123 --------- Signed-off-by: bigsheeper --- internal/datacoord/server.go | 13 ++++ internal/datacoord/server_test.go | 5 ++ .../streaming/replicate_service.go | 13 ---- internal/distributed/streaming/streaming.go | 4 -- .../distributed/streaming/streaming_test.go | 5 -- .../distributed/streaming/test_streaming.go | 5 -- .../mock_streaming/mock_ReplicateService.go | 60 ------------------- 7 files changed, 18 insertions(+), 87 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index e395b060b2..4d75419e77 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -47,11 +47,13 @@ import ( "github.com/milvus-io/milvus/internal/kv/tikv" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/balance" "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" @@ -399,6 +401,17 @@ func (s *Server) initMessageCallback() { if err != nil { return err } + balancer, err := balance.GetWithContext(ctx) + if err != nil { + return err + } + channelAssignment, err := balancer.GetLatestChannelAssignment() + if err != nil { + return err + } + if channelAssignment.ReplicateConfiguration != nil { + return status.NewReplicateViolation("import in replicating cluster is not supported yet") + } return nil }) } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index ba873aabfc..46ca524e63 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2752,6 +2752,11 @@ func TestServer_InitMessageCallback(t *testing.T) { mockChunkManager := mocks.NewChunkManager(t) mockManager := NewMockManager(t) + mb := mock_balancer.NewMockBalancer(t) + mb.EXPECT().GetLatestChannelAssignment().Return(&balancer.WatchChannelAssignmentsCallbackParam{}, nil).Maybe() + balance.ResetBalancer() + balance.Register(mb) + server := &Server{ ctx: ctx, meta: &meta{ diff --git a/internal/distributed/streaming/replicate_service.go b/internal/distributed/streaming/replicate_service.go index ac8ebf5289..ec6cf61201 100644 --- a/internal/distributed/streaming/replicate_service.go +++ b/internal/distributed/streaming/replicate_service.go @@ -50,19 +50,6 @@ func (s replicateService) UpdateReplicateConfiguration(ctx context.Context, conf return s.streamingCoordClient.Assignment().UpdateReplicateConfiguration(ctx, config) } -func (s replicateService) GetReplicateConfiguration(ctx context.Context) (*replicateutil.ConfigHelper, error) { - if !s.lifetime.Add(typeutil.LifetimeStateWorking) { - return nil, ErrWALAccesserClosed - } - defer s.lifetime.Done() - - config, err := s.streamingCoordClient.Assignment().GetReplicateConfiguration(ctx) - if err != nil { - return nil, err - } - return config, nil -} - func (s replicateService) GetReplicateCheckpoint(ctx context.Context, channelName string) (*wal.ReplicateCheckpoint, error) { if !s.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, ErrWALAccesserClosed diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index da732dc14e..9a12ef582e 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -12,7 +12,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/options" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" - "github.com/milvus-io/milvus/pkg/v2/util/replicateutil" ) var singleton WALAccesser = nil @@ -100,9 +99,6 @@ type ReplicateService interface { // UpdateReplicateConfiguration updates the replicate configuration to the milvus cluster. UpdateReplicateConfiguration(ctx context.Context, config *commonpb.ReplicateConfiguration) error - // GetReplicateConfiguration returns the replicate configuration of the milvus cluster. - GetReplicateConfiguration(ctx context.Context) (*replicateutil.ConfigHelper, error) - // GetReplicateCheckpoint returns the WAL checkpoint that will be used to create scanner // from the correct position, ensuring no duplicate or missing messages. GetReplicateCheckpoint(ctx context.Context, channelName string) (*wal.ReplicateCheckpoint, error) diff --git a/internal/distributed/streaming/streaming_test.go b/internal/distributed/streaming/streaming_test.go index 7247331f3b..c94af52c9b 100644 --- a/internal/distributed/streaming/streaming_test.go +++ b/internal/distributed/streaming/streaming_test.go @@ -79,11 +79,6 @@ func TestReplicate(t *testing.T) { if err != nil { panic(err) } - cfg, err := streaming.WAL().Replicate().GetReplicateConfiguration(ctx) - if err != nil { - panic(err) - } - t.Logf("cfg: %+v\n", cfg) } func TestReplicateCreateCollection(t *testing.T) { diff --git a/internal/distributed/streaming/test_streaming.go b/internal/distributed/streaming/test_streaming.go index d6c69686de..b4295bfc09 100644 --- a/internal/distributed/streaming/test_streaming.go +++ b/internal/distributed/streaming/test_streaming.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" - "github.com/milvus-io/milvus/pkg/v2/util/replicateutil" ) var expectErr = make(chan error, 10) @@ -65,10 +64,6 @@ func (n *noopReplicateService) UpdateReplicateConfiguration(ctx context.Context, return nil } -func (n *noopReplicateService) GetReplicateConfiguration(ctx context.Context) (*replicateutil.ConfigHelper, error) { - return nil, nil -} - func (n *noopReplicateService) GetReplicateCheckpoint(ctx context.Context, channelName string) (*wal.ReplicateCheckpoint, error) { return nil, nil } diff --git a/internal/mocks/distributed/mock_streaming/mock_ReplicateService.go b/internal/mocks/distributed/mock_streaming/mock_ReplicateService.go index 5b1bda8597..11cb66c063 100644 --- a/internal/mocks/distributed/mock_streaming/mock_ReplicateService.go +++ b/internal/mocks/distributed/mock_streaming/mock_ReplicateService.go @@ -11,8 +11,6 @@ import ( mock "github.com/stretchr/testify/mock" - replicateutil "github.com/milvus-io/milvus/pkg/v2/util/replicateutil" - types "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" wal "github.com/milvus-io/milvus/internal/streamingnode/server/wal" @@ -149,64 +147,6 @@ func (_c *MockReplicateService_GetReplicateCheckpoint_Call) RunAndReturn(run fun return _c } -// GetReplicateConfiguration provides a mock function with given fields: ctx -func (_m *MockReplicateService) GetReplicateConfiguration(ctx context.Context) (*replicateutil.ConfigHelper, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for GetReplicateConfiguration") - } - - var r0 *replicateutil.ConfigHelper - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (*replicateutil.ConfigHelper, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) *replicateutil.ConfigHelper); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*replicateutil.ConfigHelper) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockReplicateService_GetReplicateConfiguration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetReplicateConfiguration' -type MockReplicateService_GetReplicateConfiguration_Call struct { - *mock.Call -} - -// GetReplicateConfiguration is a helper method to define mock.On call -// - ctx context.Context -func (_e *MockReplicateService_Expecter) GetReplicateConfiguration(ctx interface{}) *MockReplicateService_GetReplicateConfiguration_Call { - return &MockReplicateService_GetReplicateConfiguration_Call{Call: _e.mock.On("GetReplicateConfiguration", ctx)} -} - -func (_c *MockReplicateService_GetReplicateConfiguration_Call) Run(run func(ctx context.Context)) *MockReplicateService_GetReplicateConfiguration_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *MockReplicateService_GetReplicateConfiguration_Call) Return(_a0 *replicateutil.ConfigHelper, _a1 error) *MockReplicateService_GetReplicateConfiguration_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockReplicateService_GetReplicateConfiguration_Call) RunAndReturn(run func(context.Context) (*replicateutil.ConfigHelper, error)) *MockReplicateService_GetReplicateConfiguration_Call { - _c.Call.Return(run) - return _c -} - // UpdateReplicateConfiguration provides a mock function with given fields: ctx, config func (_m *MockReplicateService) UpdateReplicateConfiguration(ctx context.Context, config *commonpb.ReplicateConfiguration) error { ret := _m.Called(ctx, config)