enhance: Disable import for replicating cluster (#44850)

1. Import in replicating cluster is not supported yet, so disable it for
now.
2. Remove GetReplicateConfiguration wal API

issue: https://github.com/milvus-io/milvus/issues/44123

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-10-20 09:56:01 +08:00 committed by GitHub
parent 3174f517f0
commit 168dc49bfc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 18 additions and 87 deletions

View File

@ -47,11 +47,13 @@ import (
"github.com/milvus-io/milvus/internal/kv/tikv"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/balance"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
@ -399,6 +401,17 @@ func (s *Server) initMessageCallback() {
if err != nil {
return err
}
balancer, err := balance.GetWithContext(ctx)
if err != nil {
return err
}
channelAssignment, err := balancer.GetLatestChannelAssignment()
if err != nil {
return err
}
if channelAssignment.ReplicateConfiguration != nil {
return status.NewReplicateViolation("import in replicating cluster is not supported yet")
}
return nil
})
}

View File

@ -2752,6 +2752,11 @@ func TestServer_InitMessageCallback(t *testing.T) {
mockChunkManager := mocks.NewChunkManager(t)
mockManager := NewMockManager(t)
mb := mock_balancer.NewMockBalancer(t)
mb.EXPECT().GetLatestChannelAssignment().Return(&balancer.WatchChannelAssignmentsCallbackParam{}, nil).Maybe()
balance.ResetBalancer()
balance.Register(mb)
server := &Server{
ctx: ctx,
meta: &meta{

View File

@ -50,19 +50,6 @@ func (s replicateService) UpdateReplicateConfiguration(ctx context.Context, conf
return s.streamingCoordClient.Assignment().UpdateReplicateConfiguration(ctx, config)
}
func (s replicateService) GetReplicateConfiguration(ctx context.Context) (*replicateutil.ConfigHelper, error) {
if !s.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrWALAccesserClosed
}
defer s.lifetime.Done()
config, err := s.streamingCoordClient.Assignment().GetReplicateConfiguration(ctx)
if err != nil {
return nil, err
}
return config, nil
}
func (s replicateService) GetReplicateCheckpoint(ctx context.Context, channelName string) (*wal.ReplicateCheckpoint, error) {
if !s.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrWALAccesserClosed

View File

@ -12,7 +12,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/replicateutil"
)
var singleton WALAccesser = nil
@ -100,9 +99,6 @@ type ReplicateService interface {
// UpdateReplicateConfiguration updates the replicate configuration to the milvus cluster.
UpdateReplicateConfiguration(ctx context.Context, config *commonpb.ReplicateConfiguration) error
// GetReplicateConfiguration returns the replicate configuration of the milvus cluster.
GetReplicateConfiguration(ctx context.Context) (*replicateutil.ConfigHelper, error)
// GetReplicateCheckpoint returns the WAL checkpoint that will be used to create scanner
// from the correct position, ensuring no duplicate or missing messages.
GetReplicateCheckpoint(ctx context.Context, channelName string) (*wal.ReplicateCheckpoint, error)

View File

@ -79,11 +79,6 @@ func TestReplicate(t *testing.T) {
if err != nil {
panic(err)
}
cfg, err := streaming.WAL().Replicate().GetReplicateConfiguration(ctx)
if err != nil {
panic(err)
}
t.Logf("cfg: %+v\n", cfg)
}
func TestReplicateCreateCollection(t *testing.T) {

View File

@ -32,7 +32,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/replicateutil"
)
var expectErr = make(chan error, 10)
@ -65,10 +64,6 @@ func (n *noopReplicateService) UpdateReplicateConfiguration(ctx context.Context,
return nil
}
func (n *noopReplicateService) GetReplicateConfiguration(ctx context.Context) (*replicateutil.ConfigHelper, error) {
return nil, nil
}
func (n *noopReplicateService) GetReplicateCheckpoint(ctx context.Context, channelName string) (*wal.ReplicateCheckpoint, error) {
return nil, nil
}

View File

@ -11,8 +11,6 @@ import (
mock "github.com/stretchr/testify/mock"
replicateutil "github.com/milvus-io/milvus/pkg/v2/util/replicateutil"
types "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
wal "github.com/milvus-io/milvus/internal/streamingnode/server/wal"
@ -149,64 +147,6 @@ func (_c *MockReplicateService_GetReplicateCheckpoint_Call) RunAndReturn(run fun
return _c
}
// GetReplicateConfiguration provides a mock function with given fields: ctx
func (_m *MockReplicateService) GetReplicateConfiguration(ctx context.Context) (*replicateutil.ConfigHelper, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for GetReplicateConfiguration")
}
var r0 *replicateutil.ConfigHelper
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*replicateutil.ConfigHelper, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) *replicateutil.ConfigHelper); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*replicateutil.ConfigHelper)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockReplicateService_GetReplicateConfiguration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetReplicateConfiguration'
type MockReplicateService_GetReplicateConfiguration_Call struct {
*mock.Call
}
// GetReplicateConfiguration is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockReplicateService_Expecter) GetReplicateConfiguration(ctx interface{}) *MockReplicateService_GetReplicateConfiguration_Call {
return &MockReplicateService_GetReplicateConfiguration_Call{Call: _e.mock.On("GetReplicateConfiguration", ctx)}
}
func (_c *MockReplicateService_GetReplicateConfiguration_Call) Run(run func(ctx context.Context)) *MockReplicateService_GetReplicateConfiguration_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockReplicateService_GetReplicateConfiguration_Call) Return(_a0 *replicateutil.ConfigHelper, _a1 error) *MockReplicateService_GetReplicateConfiguration_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockReplicateService_GetReplicateConfiguration_Call) RunAndReturn(run func(context.Context) (*replicateutil.ConfigHelper, error)) *MockReplicateService_GetReplicateConfiguration_Call {
_c.Call.Return(run)
return _c
}
// UpdateReplicateConfiguration provides a mock function with given fields: ctx, config
func (_m *MockReplicateService) UpdateReplicateConfiguration(ctx context.Context, config *commonpb.ReplicateConfiguration) error {
ret := _m.Called(ctx, config)