diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 16489e9a2c..0a33b83127 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -841,7 +841,6 @@ msgChannel: # Caution: Changing this parameter after using Milvus for a period of time will affect your access to old data. # It is recommended to change this parameter before starting Milvus for the first time. rootCoordDml: rootcoord-dml - replicateMsg: replicate-msg # Sub-name prefix of the message channel where the query node publishes time tick messages. # The complete channel name prefix is ${msgChannel.chanNamePrefix.cluster}-${msgChannel.chanNamePrefix.queryTimeTick} # Caution: Changing this parameter after using Milvus for a period of time will affect your access to old data. @@ -1024,16 +1023,11 @@ common: enabled: false # enable split by average size policy in storage v2 threshold: 1024 # split by average size policy threshold(in bytes) in storage v2 useLoonFFI: false - # Whether to disable the internal time messaging mechanism for the system. - # If disabled (set to false), the system will not allow DML operations, including insertion, deletion, queries, and searches. - # This helps Milvus-CDC synchronize incremental data - ttMsgEnabled: true traceLogMode: 0 # trace request info bloomFilterSize: 100000 # bloom filter initial size bloomFilterType: BlockedBloomFilter # bloom filter type, support BasicBloomFilter and BlockedBloomFilter maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter bloomFilterApplyBatchSize: 1000 # batch size when to apply pk to bloom filter - collectionReplicateEnable: false # Whether to enable collection replication. usePartitionKeyAsClusteringKey: false # if true, do clustering compaction and segment prune on partition key field useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction diff --git a/internal/distributed/streaming/msgstream_adaptor.go b/internal/distributed/streaming/msgstream_adaptor.go index c015cf2b2e..d2d28200f0 100644 --- a/internal/distributed/streaming/msgstream_adaptor.go +++ b/internal/distributed/streaming/msgstream_adaptor.go @@ -129,7 +129,3 @@ func (m *delegatorMsgstreamAdaptor) GetLatestMsgID(channel string) (msgstream.Me func (m *delegatorMsgstreamAdaptor) CheckTopicValid(channel string) error { panic("should never be called") } - -func (m *delegatorMsgstreamAdaptor) ForceEnableProduce(can bool) { - panic("should never be called") -} diff --git a/internal/distributed/streaming/msgstream_adaptor_test.go b/internal/distributed/streaming/msgstream_adaptor_test.go index c07ecaf195..b4ae2db268 100644 --- a/internal/distributed/streaming/msgstream_adaptor_test.go +++ b/internal/distributed/streaming/msgstream_adaptor_test.go @@ -161,14 +161,4 @@ func TestDelegatorMsgstreamAdaptor(t *testing.T) { }() _ = adaptor.CheckTopicValid("channel1") }) - - // Test ForceEnableProduce - t.Run("ForceEnableProduce", func(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Errorf("ForceEnableProduce should panic but did not") - } - }() - adaptor.ForceEnableProduce(true) - }) } diff --git a/internal/flushcommon/pipeline/flow_graph_dmstream_input_node.go b/internal/flushcommon/pipeline/flow_graph_dmstream_input_node.go index cea202ce75..fc8717cf05 100644 --- a/internal/flushcommon/pipeline/flow_graph_dmstream_input_node.go +++ b/internal/flushcommon/pipeline/flow_graph_dmstream_input_node.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/util/flowgraph" - pkgcommon "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/mq/common" @@ -76,19 +75,11 @@ func createNewInputFromDispatcher(initCtx context.Context, start = time.Now() ) - replicateID, _ := pkgcommon.GetReplicateID(schema.GetProperties()) - if replicateID == "" { - log.Info("datanode consume without replicateID, try to get replicateID from dbProperties", zap.Any("dbProperties", dbProperties)) - replicateID, _ = pkgcommon.GetReplicateID(dbProperties) - } - replicateConfig := msgstream.GetReplicateConfig(replicateID, schema.GetDbName(), schema.GetName()) - if seekPos != nil && len(seekPos.MsgID) != 0 { input, err = dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{ - VChannel: vchannel, - Pos: seekPos, - SubPos: common.SubscriptionPositionUnknown, - ReplicateConfig: replicateConfig, + VChannel: vchannel, + Pos: seekPos, + SubPos: common.SubscriptionPositionUnknown, }) if err != nil { log.Warn("datanode consume failed after retried", zap.Error(err)) @@ -105,10 +96,9 @@ func createNewInputFromDispatcher(initCtx context.Context, } input, err = dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{ - VChannel: vchannel, - Pos: nil, - SubPos: common.SubscriptionPositionEarliest, - ReplicateConfig: replicateConfig, + VChannel: vchannel, + Pos: nil, + SubPos: common.SubscriptionPositionEarliest, }) if err != nil { log.Warn("datanode consume failed after retried", zap.Error(err)) diff --git a/internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go b/internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go index ac447003a3..3912cab4a4 100644 --- a/internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go +++ b/internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go @@ -62,9 +62,6 @@ func (mm *mockMsgStreamFactory) NewMsgStreamDisposer(ctx context.Context) func([ type mockTtMsgStream struct{} -func (mtm *mockTtMsgStream) SetReplicate(config *msgstream.ReplicateConfig) { -} - func (mtm *mockTtMsgStream) Close() {} func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack { @@ -107,9 +104,6 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error { return nil } -func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) { -} - func TestNewDmInputNode(t *testing.T) { assert.Panics(t, func() { newDmInputNode(&nodeConfig{ diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 3fc50d4e41..8ff4e1d90d 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -99,7 +99,6 @@ type collectionInfo struct { createdUtcTimestamp uint64 consistencyLevel commonpb.ConsistencyLevel partitionKeyIsolation bool - replicateID string updateTimestamp uint64 collectionTTL uint64 numPartitions int64 @@ -489,7 +488,6 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string, m.collInfo[database] = make(map[string]*collectionInfo) } - replicateID, _ := common.GetReplicateID(collection.Properties) m.collInfo[database][collectionName] = &collectionInfo{ collID: collection.CollectionID, schema: schemaInfo, @@ -498,7 +496,6 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string, createdUtcTimestamp: collection.CreatedUtcTimestamp, consistencyLevel: collection.ConsistencyLevel, partitionKeyIsolation: isolation, - replicateID: replicateID, updateTimestamp: collection.UpdateTimestamp, collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()), vChannels: collection.VirtualChannelNames, diff --git a/internal/proxy/mock_msgstream_test.go b/internal/proxy/mock_msgstream_test.go index 06180485f1..308e7ed94f 100644 --- a/internal/proxy/mock_msgstream_test.go +++ b/internal/proxy/mock_msgstream_test.go @@ -10,10 +10,9 @@ import ( type mockMsgStream struct { msgstream.MsgStream - asProducer func([]string) - setRepack func(repackFunc msgstream.RepackFunc) - close func() - forceEnableProduce func(bool) + asProducer func([]string) + setRepack func(repackFunc msgstream.RepackFunc) + close func() } func (m *mockMsgStream) AsProducer(ctx context.Context, producers []string) { @@ -34,15 +33,6 @@ func (m *mockMsgStream) Close() { } } -func (m *mockMsgStream) ForceEnableProduce(enabled bool) { - if m.forceEnableProduce != nil { - m.forceEnableProduce(enabled) - } -} - -func (m *mockMsgStream) SetReplicate(config *msgstream.ReplicateConfig) { -} - func newMockMsgStream() *mockMsgStream { return &mockMsgStream{} } diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index f790258785..d15f58c83a 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -314,12 +314,6 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error { return nil } -func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) { -} - -func (ms *simpleMockMsgStream) SetReplicate(config *msgstream.ReplicateConfig) { -} - func newSimpleMockMsgStream() *simpleMockMsgStream { return &simpleMockMsgStream{ msgChan: make(chan *msgstream.ConsumeMsgPack, 1024), diff --git a/internal/proxy/repack_func.go b/internal/proxy/repack_func.go index 844156d82d..454e3fe2b2 100644 --- a/internal/proxy/repack_func.go +++ b/internal/proxy/repack_func.go @@ -81,14 +81,3 @@ func defaultInsertRepackFunc( } return pack, nil } - -func replicatePackFunc( - tsMsgs []msgstream.TsMsg, - hashKeys [][]int32, -) (map[int32]*msgstream.MsgPack, error) { - return map[int32]*msgstream.MsgPack{ - 0: { - Msgs: tsMsgs, - }, - }, nil -} diff --git a/internal/proxy/task.go b/internal/proxy/task.go index d511f629ba..2872d42273 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -37,7 +37,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" - "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" @@ -1366,26 +1365,6 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error { "can not alter partition key isolation mode if the collection already has a vector index. Please drop the index first") } } - - _, ok := common.IsReplicateEnabled(t.Properties) - if ok { - return merr.WrapErrParameterInvalidMsg("can't set the replicate.id property") - } - endTS, ok := common.GetReplicateEndTS(t.Properties) - if ok && collBasicInfo.replicateID != "" { - allocResp, err := t.mixCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{ - Count: 1, - BlockTimestamp: endTS, - }) - if err = merr.CheckRPCCall(allocResp, err); err != nil { - return merr.WrapErrServiceInternal("alloc timestamp failed", err.Error()) - } - if allocResp.GetTimestamp() <= endTS { - return merr.WrapErrServiceInternal("alter collection: alloc timestamp failed, timestamp is not greater than endTS", - fmt.Sprintf("timestamp = %d, endTS = %d", allocResp.GetTimestamp(), endTS)) - } - } - return nil } diff --git a/internal/proxy/task_database.go b/internal/proxy/task_database.go index da433e22b6..9d2bfaa12d 100644 --- a/internal/proxy/task_database.go +++ b/internal/proxy/task_database.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "fmt" "go.uber.org/zap" @@ -282,33 +281,6 @@ func (t *alterDatabaseTask) PreExecute(ctx context.Context) error { return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", userDefinedTimezone) } } - _, ok := common.GetReplicateID(t.Properties) - if ok { - return merr.WrapErrParameterInvalidMsg("can't set the replicate id property in alter database request") - } - endTS, ok := common.GetReplicateEndTS(t.Properties) - if !ok { // not exist replicate end ts property - return nil - } - cacheInfo, err := globalMetaCache.GetDatabaseInfo(ctx, t.DbName) - if err != nil { - return err - } - oldReplicateEnable, _ := common.IsReplicateEnabled(cacheInfo.properties) - if !oldReplicateEnable { // old replicate enable is false - return merr.WrapErrParameterInvalidMsg("can't set the replicate end ts property in alter database request when db replicate is disabled") - } - allocResp, err := t.mixCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{ - Count: 1, - BlockTimestamp: endTS, - }) - if err = merr.CheckRPCCall(allocResp, err); err != nil { - return merr.WrapErrServiceInternal("alloc timestamp failed", err.Error()) - } - if allocResp.GetTimestamp() <= endTS { - return merr.WrapErrServiceInternal("alter database: alloc timestamp failed, timestamp is not greater than endTS", - fmt.Sprintf("timestamp = %d, endTS = %d", allocResp.GetTimestamp(), endTS)) - } return nil } diff --git a/internal/proxy/task_database_test.go b/internal/proxy/task_database_test.go index c068d47981..d26ece4ba8 100644 --- a/internal/proxy/task_database_test.go +++ b/internal/proxy/task_database_test.go @@ -5,7 +5,6 @@ import ( "strings" "testing" - "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "google.golang.org/grpc/metadata" @@ -202,163 +201,6 @@ func TestAlterDatabase(t *testing.T) { assert.Nil(t, err1) } -func TestAlterDatabaseTaskForReplicateProperty(t *testing.T) { - rc := mocks.NewMockMixCoordClient(t) - cache := globalMetaCache - defer func() { globalMetaCache = cache }() - mockCache := NewMockCache(t) - globalMetaCache = mockCache - - t.Run("replicate id", func(t *testing.T) { - task := &alterDatabaseTask{ - AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{ - Base: &commonpb.MsgBase{}, - DbName: "test_alter_database", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.MmapEnabledKey, - Value: "true", - }, - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, - }, - mixCoord: rc, - } - err := task.PreExecute(context.Background()) - assert.Error(t, err) - }) - - t.Run("fail to get database info", func(t *testing.T) { - mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once() - task := &alterDatabaseTask{ - AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{ - Base: &commonpb.MsgBase{}, - DbName: "test_alter_database", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateEndTSKey, - Value: "1000", - }, - }, - }, - mixCoord: rc, - } - err := task.PreExecute(context.Background()) - assert.Error(t, err) - }) - - t.Run("not enable replicate", func(t *testing.T) { - mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{ - properties: []*commonpb.KeyValuePair{}, - }, nil).Once() - task := &alterDatabaseTask{ - AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{ - Base: &commonpb.MsgBase{}, - DbName: "test_alter_database", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateEndTSKey, - Value: "1000", - }, - }, - }, - mixCoord: rc, - } - err := task.PreExecute(context.Background()) - assert.Error(t, err) - }) - - t.Run("fail to alloc ts", func(t *testing.T) { - mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{ - properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, - }, nil).Once() - rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once() - task := &alterDatabaseTask{ - AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{ - Base: &commonpb.MsgBase{}, - DbName: "test_alter_database", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateEndTSKey, - Value: "1000", - }, - }, - }, - mixCoord: rc, - } - err := task.PreExecute(context.Background()) - assert.Error(t, err) - }) - - t.Run("alloc wrong ts", func(t *testing.T) { - mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{ - properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, - }, nil).Once() - rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{ - Status: merr.Success(), - Timestamp: 999, - }, nil).Once() - task := &alterDatabaseTask{ - AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{ - Base: &commonpb.MsgBase{}, - DbName: "test_alter_database", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateEndTSKey, - Value: "1000", - }, - }, - }, - mixCoord: rc, - } - err := task.PreExecute(context.Background()) - assert.Error(t, err) - }) - - t.Run("alloc wrong ts", func(t *testing.T) { - mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{ - properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, - }, nil).Once() - rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{ - Status: merr.Success(), - Timestamp: 1001, - }, nil).Once() - task := &alterDatabaseTask{ - AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{ - Base: &commonpb.MsgBase{}, - DbName: "test_alter_database", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateEndTSKey, - Value: "1000", - }, - }, - }, - mixCoord: rc, - } - err := task.PreExecute(context.Background()) - assert.NoError(t, err) - }) -} - func TestDescribeDatabaseTask(t *testing.T) { rc := mocks.NewMockMixCoordClient(t) diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index c4ba8c00e0..e565f632e8 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -283,15 +283,6 @@ func (dr *deleteRunner) Init(ctx context.Context) error { return ErrWithLog(log, "Failed to get collection id", merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound)) } - replicateID, err := GetReplicateID(ctx, dr.req.GetDbName(), collName) - if err != nil { - log.Warn("get replicate info failed", zap.String("collectionName", collName), zap.Error(err)) - return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound) - } - if replicateID != "" { - return merr.WrapErrCollectionReplicateMode("delete") - } - dr.schema, err = globalMetaCache.GetCollectionSchema(ctx, dr.req.GetDbName(), collName) if err != nil { return ErrWithLog(log, "Failed to get collection schema", err) diff --git a/internal/proxy/task_delete_test.go b/internal/proxy/task_delete_test.go index ec493b2c28..6b4753c44c 100644 --- a/internal/proxy/task_delete_test.go +++ b/internal/proxy/task_delete_test.go @@ -455,37 +455,12 @@ func (s *DeleteRunnerSuite) TestInitFailure() { s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything). Return(s.collectionID, nil) - s.mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{}, nil) s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything). Return(nil, errors.New("mock GetCollectionSchema err")) globalMetaCache = s.mockCache s.Error(dr.Init(context.Background())) }) - s.Run("deny delete in the replicate mode", func() { - dr := deleteRunner{req: &milvuspb.DeleteRequest{ - CollectionName: s.collectionName, - }} - s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) - s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) - s.mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(&collectionInfo{replicateID: "local-mac"}, nil) - - globalMetaCache = s.mockCache - s.Error(dr.Init(context.Background())) - }) - - s.Run("fail get replicateID", func() { - dr := deleteRunner{req: &milvuspb.DeleteRequest{ - CollectionName: s.collectionName, - }} - s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) - s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything). - Return(s.collectionID, nil) - s.mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock getCollectionInfo err")) - globalMetaCache = s.mockCache - s.Error(dr.Init(context.Background())) - }) s.Run("create plan failed", func() { dr := deleteRunner{ diff --git a/internal/proxy/task_flush_all_test.go b/internal/proxy/task_flush_all_test.go index 1177b8bd93..3dc9090db2 100644 --- a/internal/proxy/task_flush_all_test.go +++ b/internal/proxy/task_flush_all_test.go @@ -26,14 +26,12 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/mocks" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/util/uniquegenerator" ) -func createTestFlushAllTask(t *testing.T) (*flushAllTask, *mocks.MockMixCoordClient, *msgstream.MockMsgStream, context.Context) { +func createTestFlushAllTask(t *testing.T) (*flushAllTask, *mocks.MockMixCoordClient, context.Context) { ctx := context.Background() mixCoord := mocks.NewMockMixCoordClient(t) - replicateMsgStream := msgstream.NewMockMsgStream(t) task := &flushAllTask{ baseTask: baseTask{}, @@ -50,22 +48,20 @@ func createTestFlushAllTask(t *testing.T) (*flushAllTask, *mocks.MockMixCoordCli mixCoord: mixCoord, } - return task, mixCoord, replicateMsgStream, ctx + return task, mixCoord, ctx } func TestFlushAllTaskTraceCtx(t *testing.T) { - task, mixCoord, replicateMsgStream, ctx := createTestFlushAllTask(t) + task, mixCoord, ctx := createTestFlushAllTask(t) defer mixCoord.AssertExpectations(t) - defer replicateMsgStream.AssertExpectations(t) traceCtx := task.TraceCtx() assert.Equal(t, ctx, traceCtx) } func TestFlushAllTaskID(t *testing.T) { - task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t) + task, mixCoord, _ := createTestFlushAllTask(t) defer mixCoord.AssertExpectations(t) - defer replicateMsgStream.AssertExpectations(t) // Test getting ID originalID := task.ID() @@ -78,27 +74,24 @@ func TestFlushAllTaskID(t *testing.T) { } func TestFlushAllTaskName(t *testing.T) { - task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t) + task, mixCoord, _ := createTestFlushAllTask(t) defer mixCoord.AssertExpectations(t) - defer replicateMsgStream.AssertExpectations(t) name := task.Name() assert.Equal(t, FlushAllTaskName, name) } func TestFlushAllTaskType(t *testing.T) { - task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t) + task, mixCoord, _ := createTestFlushAllTask(t) defer mixCoord.AssertExpectations(t) - defer replicateMsgStream.AssertExpectations(t) msgType := task.Type() assert.Equal(t, commonpb.MsgType_Flush, msgType) } func TestFlushAllTaskTimestampMethods(t *testing.T) { - task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t) + task, mixCoord, _ := createTestFlushAllTask(t) defer mixCoord.AssertExpectations(t) - defer replicateMsgStream.AssertExpectations(t) originalTs := task.BeginTs() assert.Equal(t, originalTs, task.EndTs()) @@ -129,8 +122,7 @@ func TestFlushAllTaskOnEnqueue(t *testing.T) { assert.Equal(t, commonpb.MsgType_Flush, task.Base.MsgType) // Test with existing Base - task, _, replicateMsgStream, _ := createTestFlushAllTask(t) - defer replicateMsgStream.AssertExpectations(t) + task, _, _ = createTestFlushAllTask(t) err = task.OnEnqueue() assert.NoError(t, err) @@ -138,18 +130,16 @@ func TestFlushAllTaskOnEnqueue(t *testing.T) { } func TestFlushAllTaskPreExecute(t *testing.T) { - task, mixCoord, replicateMsgStream, ctx := createTestFlushAllTask(t) + task, mixCoord, ctx := createTestFlushAllTask(t) defer mixCoord.AssertExpectations(t) - defer replicateMsgStream.AssertExpectations(t) err := task.PreExecute(ctx) assert.NoError(t, err) } func TestFlushAllTaskPostExecute(t *testing.T) { - task, mixCoord, replicateMsgStream, ctx := createTestFlushAllTask(t) + task, mixCoord, ctx := createTestFlushAllTask(t) defer mixCoord.AssertExpectations(t) - defer replicateMsgStream.AssertExpectations(t) err := task.PostExecute(ctx) assert.NoError(t, err) @@ -159,9 +149,8 @@ func TestFlushAllTaskImplementsTaskInterface(t *testing.T) { // Verify that flushAllTask implements the task interface var _ task = (*flushAllTask)(nil) - task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t) + task, mixCoord, _ := createTestFlushAllTask(t) defer mixCoord.AssertExpectations(t) - defer replicateMsgStream.AssertExpectations(t) // Test all interface methods are accessible assert.NotNil(t, task.TraceCtx) diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index efe2a8a6e6..e17cdbd5da 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -124,15 +124,6 @@ func (it *insertTask) PreExecute(ctx context.Context) error { return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize")) } - replicateID, err := GetReplicateID(it.ctx, it.insertMsg.GetDbName(), collectionName) - if err != nil { - log.Warn("get replicate id failed", zap.String("collectionName", collectionName), zap.Error(err)) - return merr.WrapErrAsInputError(err) - } - if replicateID != "" { - return merr.WrapErrCollectionReplicateMode("insert") - } - collID, err := globalMetaCache.GetCollectionID(context.Background(), it.insertMsg.GetDbName(), collectionName) if err != nil { log.Ctx(ctx).Warn("fail to get collection id", zap.Error(err)) diff --git a/internal/proxy/task_insert_test.go b/internal/proxy/task_insert_test.go index d95c175592..d8190ecf8e 100644 --- a/internal/proxy/task_insert_test.go +++ b/internal/proxy/task_insert_test.go @@ -405,11 +405,6 @@ func TestInsertTask_KeepUserPK_WhenAllowInsertAutoIDTrue(t *testing.T) { mock.Anything, ).Return(&collectionInfo{schema: info}, nil) - cache.On("GetDatabaseInfo", - mock.Anything, - mock.Anything, - ).Return(&databaseInfo{properties: []*commonpb.KeyValuePair{}}, nil) - globalMetaCache = cache err = task.PreExecute(context.Background()) @@ -557,11 +552,6 @@ func TestInsertTask_Function(t *testing.T) { mock.Anything, mock.Anything, ).Return(&collectionInfo{schema: info}, nil) - cache.On("GetDatabaseInfo", - mock.Anything, - mock.Anything, - ).Return(&databaseInfo{properties: []*commonpb.KeyValuePair{}}, nil) - globalMetaCache = cache err = task.PreExecute(ctx) assert.NoError(t, err) @@ -589,7 +579,6 @@ func TestInsertTaskForSchemaMismatch(t *testing.T) { mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{ updateTimestamp: 100, }, nil) - mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) err := it.PreExecute(ctx) assert.Error(t, err) assert.ErrorIs(t, err, merr.ErrCollectionSchemaMismatch) diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 3e3ff2054b..c89edc75ba 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -47,7 +47,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" - "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" @@ -4743,139 +4742,6 @@ func TestTaskPartitionKeyIsolation(t *testing.T) { }) } -func TestAlterCollectionForReplicateProperty(t *testing.T) { - cache := globalMetaCache - defer func() { globalMetaCache = cache }() - mockCache := NewMockCache(t) - mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{ - replicateID: "local-mac-1", - }, nil).Maybe() - mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(1, nil).Maybe() - mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{}, nil).Maybe() - globalMetaCache = mockCache - ctx := context.Background() - mockRootcoord := mocks.NewMockMixCoordClient(t) - t.Run("invalid replicate id", func(t *testing.T) { - task := &alterCollectionTask{ - AlterCollectionRequest: &milvuspb.AlterCollectionRequest{ - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "xxxxx", - }, - }, - }, - mixCoord: mockRootcoord, - } - - err := task.PreExecute(ctx) - assert.Error(t, err) - }) - - t.Run("empty replicate id", func(t *testing.T) { - task := &alterCollectionTask{ - AlterCollectionRequest: &milvuspb.AlterCollectionRequest{ - CollectionName: "test", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "", - }, - }, - }, - mixCoord: mockRootcoord, - } - - err := task.PreExecute(ctx) - assert.Error(t, err) - }) - - t.Run("fail to alloc ts", func(t *testing.T) { - task := &alterCollectionTask{ - AlterCollectionRequest: &milvuspb.AlterCollectionRequest{ - CollectionName: "test", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateEndTSKey, - Value: "100", - }, - }, - }, - mixCoord: mockRootcoord, - } - - mockRootcoord.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once() - err := task.PreExecute(ctx) - assert.Error(t, err) - }) - - t.Run("alloc wrong ts", func(t *testing.T) { - task := &alterCollectionTask{ - AlterCollectionRequest: &milvuspb.AlterCollectionRequest{ - CollectionName: "test", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateEndTSKey, - Value: "100", - }, - }, - }, - mixCoord: mockRootcoord, - } - - mockRootcoord.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{ - Status: merr.Success(), - Timestamp: 99, - }, nil).Once() - err := task.PreExecute(ctx) - assert.Error(t, err) - }) -} - -func TestInsertForReplicate(t *testing.T) { - cache := globalMetaCache - defer func() { globalMetaCache = cache }() - mockCache := NewMockCache(t) - globalMetaCache = mockCache - - t.Run("get replicate id fail", func(t *testing.T) { - mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once() - task := &insertTask{ - insertMsg: &msgstream.InsertMsg{ - InsertRequest: &msgpb.InsertRequest{ - CollectionName: "foo", - }, - }, - } - err := task.PreExecute(context.Background()) - assert.Error(t, err) - }) - t.Run("insert with replicate id", func(t *testing.T) { - mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{ - schema: &schemaInfo{ - CollectionSchema: &schemapb.CollectionSchema{ - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-mac", - }, - }, - }, - }, - replicateID: "local-mac", - }, nil).Once() - task := &insertTask{ - insertMsg: &msgstream.InsertMsg{ - InsertRequest: &msgpb.InsertRequest{ - CollectionName: "foo", - }, - }, - } - err := task.PreExecute(context.Background()) - assert.Error(t, err) - }) -} - func TestAlterCollectionFieldCheckLoaded(t *testing.T) { qc := NewMixCoordMock() InitMetaCache(context.Background(), qc) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index a748e9f3a7..4a48daaa0e 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -1035,15 +1035,6 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { Timestamp: it.EndTs(), } - replicateID, err := GetReplicateID(ctx, it.req.GetDbName(), collectionName) - if err != nil { - log.Warn("get replicate info failed", zap.String("collectionName", collectionName), zap.Error(err)) - return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound) - } - if replicateID != "" { - return merr.WrapErrCollectionReplicateMode("upsert") - } - // check collection exists collID, err := globalMetaCache.GetCollectionID(context.Background(), it.req.GetDbName(), collectionName) if err != nil { diff --git a/internal/proxy/task_upsert_test.go b/internal/proxy/task_upsert_test.go index 5aa07a74d5..c0b94a14c8 100644 --- a/internal/proxy/task_upsert_test.go +++ b/internal/proxy/task_upsert_test.go @@ -20,7 +20,6 @@ import ( "testing" "github.com/bytedance/mockey" - "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -340,40 +339,6 @@ func TestUpsertTask(t *testing.T) { }) } -func TestUpsertTaskForReplicate(t *testing.T) { - cache := globalMetaCache - defer func() { globalMetaCache = cache }() - mockCache := NewMockCache(t) - globalMetaCache = mockCache - ctx := context.Background() - - t.Run("fail to get collection info", func(t *testing.T) { - ut := upsertTask{ - ctx: ctx, - req: &milvuspb.UpsertRequest{ - CollectionName: "col-0", - }, - } - mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("foo")).Once() - err := ut.PreExecute(ctx) - assert.Error(t, err) - }) - - t.Run("replicate mode", func(t *testing.T) { - ut := upsertTask{ - ctx: ctx, - req: &milvuspb.UpsertRequest{ - CollectionName: "col-0", - }, - } - mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{ - replicateID: "local-mac", - }, nil).Once() - err := ut.PreExecute(ctx) - assert.Error(t, err) - }) -} - func TestUpsertTask_Function(t *testing.T) { paramtable.Init() paramtable.Get().CredentialCfg.Credential.GetFunc = func() map[string]string { @@ -537,7 +502,6 @@ func TestUpsertTaskForSchemaMismatch(t *testing.T) { mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{ updateTimestamp: 100, }, nil) - mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) err := ut.PreExecute(ctx) assert.Error(t, err) assert.ErrorIs(t, err, merr.ErrCollectionSchemaMismatch) @@ -965,8 +929,6 @@ func TestUpdateTask_PreExecute_Success(t *testing.T) { // Setup mocks globalMetaCache = &MetaCache{} - mockey.Mock(GetReplicateID).Return("", nil).Build() - mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{ @@ -1002,27 +964,10 @@ func TestUpdateTask_PreExecute_Success(t *testing.T) { }) } -func TestUpdateTask_PreExecute_ReplicateIDError(t *testing.T) { - mockey.PatchConvey("TestUpdateTask_PreExecute_ReplicateIDError", t, func() { - globalMetaCache = &MetaCache{} - - mockey.Mock(GetReplicateID).Return("replica1", nil).Build() - - task := createTestUpdateTask() - - err := task.PreExecute(context.Background()) - - assert.Error(t, err) - assert.Contains(t, err.Error(), "can't operate on the collection under standby mode") - }) -} - func TestUpdateTask_PreExecute_GetCollectionIDError(t *testing.T) { mockey.PatchConvey("TestUpdateTask_PreExecute_GetCollectionIDError", t, func() { globalMetaCache = &MetaCache{} - mockey.Mock(GetReplicateID).Return("", nil).Build() - expectedErr := merr.WrapErrCollectionNotFound("test_collection") mockey.Mock((*MetaCache).GetCollectionID).Return(int64(0), expectedErr).Build() @@ -1038,7 +983,6 @@ func TestUpdateTask_PreExecute_PartitionKeyModeError(t *testing.T) { mockey.PatchConvey("TestUpdateTask_PreExecute_PartitionKeyModeError", t, func() { globalMetaCache = &MetaCache{} - mockey.Mock(GetReplicateID).Return("", nil).Build() mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{ updateTimestamp: 12345, @@ -1061,7 +1005,6 @@ func TestUpdateTask_PreExecute_InvalidNumRows(t *testing.T) { mockey.PatchConvey("TestUpdateTask_PreExecute_InvalidNumRows", t, func() { globalMetaCache = &MetaCache{} - mockey.Mock(GetReplicateID).Return("", nil).Build() mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{ updateTimestamp: 12345, @@ -1087,7 +1030,6 @@ func TestUpdateTask_PreExecute_QueryPreExecuteError(t *testing.T) { mockey.PatchConvey("TestUpdateTask_PreExecute_QueryPreExecuteError", t, func() { globalMetaCache = &MetaCache{} - mockey.Mock(GetReplicateID).Return("", nil).Build() mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{ updateTimestamp: 12345, @@ -1695,7 +1637,6 @@ func TestUpsertTask_queryPreExecute_EmptyDataArray(t *testing.T) { mockey.PatchConvey("test nullable field", t, func() { // Setup mocks using mockey - mockey.Mock(GetReplicateID).Return("", nil).Build() mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build() mockey.Mock((*MetaCache).GetCollectionSchema).Return(schema, nil).Build() @@ -1814,7 +1755,6 @@ func TestUpsertTask_queryPreExecute_EmptyDataArray(t *testing.T) { mockey.PatchConvey("test non-nullable field", t, func() { // Setup mocks using mockey - mockey.Mock(GetReplicateID).Return("", nil).Build() mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build() mockey.Mock((*MetaCache).GetCollectionSchema).Return(schema, nil).Build() diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 73a1c34163..a6b6803191 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -2655,25 +2655,6 @@ func GetFailedResponse(req any, err error) any { return nil } -func GetReplicateID(ctx context.Context, database, collectionName string) (string, error) { - if globalMetaCache == nil { - return "", merr.WrapErrServiceUnavailable("internal: Milvus Proxy is not ready yet. please wait") - } - colInfo, err := globalMetaCache.GetCollectionInfo(ctx, database, collectionName, 0) - if err != nil { - return "", err - } - if colInfo.replicateID != "" { - return colInfo.replicateID, nil - } - dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, database) - if err != nil { - return "", err - } - replicateID, _ := common.GetReplicateID(dbInfo.properties) - return replicateID, nil -} - func GetFunctionOutputFields(collSchema *schemapb.CollectionSchema) []string { fields := make([]string, 0) for _, fSchema := range collSchema.Functions { diff --git a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index 936fcee6f4..3536df2dff 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -19,9 +19,7 @@ package pipeline import ( "github.com/milvus-io/milvus/internal/querynodev2/delegator" base "github.com/milvus-io/milvus/internal/util/pipeline" - "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgdispatcher" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) @@ -53,16 +51,11 @@ func NewPipeLine( delegator delegator.ShardDelegator, ) (Pipeline, error) { collectionID := collection.ID() - replicateID, _ := common.GetReplicateID(collection.Schema().GetProperties()) - if replicateID == "" { - replicateID, _ = common.GetReplicateID(collection.GetDBProperties()) - } - replicateConfig := msgstream.GetReplicateConfig(replicateID, collection.GetDBName(), collection.Schema().Name) pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32() p := &pipeline{ collectionID: collectionID, - StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel, replicateConfig), + StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel), } filterNode := newFilterNode(collectionID, channel, manager, delegator, pipelineQueueLength) diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 605995bc77..130a1a410e 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -24,13 +24,11 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks/util/mock_segcore" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" @@ -104,12 +102,6 @@ func (suite *PipelineTestSuite) TestBasic() { schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true) collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{ LoadType: querypb.LoadType_LoadCollection, - DbProperties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, }) suite.Require().NoError(err) suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection) diff --git a/internal/querynodev2/segments/collection_test.go b/internal/querynodev2/segments/collection_test.go index f21edf3209..699e1878eb 100644 --- a/internal/querynodev2/segments/collection_test.go +++ b/internal/querynodev2/segments/collection_test.go @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/suite" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks/util/mock_segcore" "github.com/milvus-io/milvus/pkg/v2/common" @@ -43,12 +42,6 @@ func (s *CollectionManagerSuite) SetupTest() { schema := mock_segcore.GenTestCollectionSchema("collection_1", schemapb.DataType_Int64, false) err := s.cm.PutOrRef(1, schema, mock_segcore.GenTestIndexMeta(1, schema), &querypb.LoadMetaInfo{ LoadType: querypb.LoadType_LoadCollection, - DbProperties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, }) s.Require().NoError(err) } diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go index de250c8586..ad7b8b1134 100644 --- a/internal/rootcoord/create_collection_task_test.go +++ b/internal/rootcoord/create_collection_task_test.go @@ -1121,12 +1121,6 @@ func TestCreateCollectionTask_Prepare_WithProperty(t *testing.T) { meta.EXPECT().GetDatabaseByName(mock.Anything, mock.Anything, mock.Anything).Return(&model.Database{ Name: "foo", ID: 1, - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, }, nil).Twice() meta.EXPECT().ListAllAvailCollections(mock.Anything).Return(map[int64][]int64{ util.DefaultDBID: {1, 2}, diff --git a/internal/rootcoord/ddl_callbacks_alter_database.go b/internal/rootcoord/ddl_callbacks_alter_database.go index 891479c097..13af5bc672 100644 --- a/internal/rootcoord/ddl_callbacks_alter_database.go +++ b/internal/rootcoord/ddl_callbacks_alter_database.go @@ -159,14 +159,6 @@ func (c *DDLCallback) alterDatabaseV1AckCallback(ctx context.Context, result mes } func MergeProperties(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { - _, existEndTS := common.GetReplicateEndTS(updatedProps) - if existEndTS { - updatedProps = append(updatedProps, &commonpb.KeyValuePair{ - Key: common.ReplicateIDKey, - Value: "", - }) - } - props := make(map[string]string) for _, prop := range oldProps { props[prop.Key] = prop.Value diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 6d0d34975b..ae72f58b87 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -183,9 +183,6 @@ func (c *Core) sendTimeTick(t Timestamp, reason string) error { } func (c *Core) sendMinDdlTsAsTt() { - if !paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() { - return - } log := log.Ctx(c.ctx) code := c.GetStateCode() if code != commonpb.StateCode_Healthy { diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index efefb46a0b..97033ead12 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1179,10 +1179,6 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) { c.UpdateStateCode(commonpb.StateCode_Healthy) - _ = paramtable.Get().Save(paramtable.Get().CommonCfg.TTMsgEnabled.Key, "false") - c.sendMinDdlTsAsTt() // disable ts msg - _ = paramtable.Get().Save(paramtable.Get().CommonCfg.TTMsgEnabled.Key, "true") - c.sendMinDdlTsAsTt() // no session. ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}}) c.sendMinDdlTsAsTt() diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index 33885deb78..e296c38499 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -41,13 +41,12 @@ type StreamPipeline interface { } type streamPipeline struct { - pipeline *pipeline - input <-chan *msgstream.MsgPack - scanner streaming.Scanner - dispatcher msgdispatcher.Client - startOnce sync.Once - vChannel string - replicateConfig *msgstream.ReplicateConfig + pipeline *pipeline + input <-chan *msgstream.MsgPack + scanner streaming.Scanner + dispatcher msgdispatcher.Client + startOnce sync.Once + vChannel string closeCh chan struct{} // notify work to exit closeWg sync.WaitGroup @@ -96,10 +95,9 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M start := time.Now() p.input, err = p.dispatcher.Register(ctx, &msgdispatcher.StreamConfig{ - VChannel: p.vChannel, - Pos: position, - SubPos: common.SubscriptionPositionUnknown, - ReplicateConfig: p.replicateConfig, + VChannel: p.vChannel, + Pos: position, + SubPos: common.SubscriptionPositionUnknown, }) if err != nil { log.Error("dispatcher register failed after retried", zap.String("channel", position.ChannelName), zap.Error(err)) @@ -151,7 +149,6 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval time.Duration, enableTtChecker bool, vChannel string, - replicateConfig *msgstream.ReplicateConfig, ) StreamPipeline { pipeline := &streamPipeline{ pipeline: &pipeline{ @@ -159,12 +156,11 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval: nodeTtInterval, enableTtChecker: enableTtChecker, }, - dispatcher: dispatcher, - vChannel: vChannel, - replicateConfig: replicateConfig, - closeCh: make(chan struct{}), - closeWg: sync.WaitGroup{}, - lastAccessTime: atomic.NewTime(time.Now()), + dispatcher: dispatcher, + vChannel: vChannel, + closeCh: make(chan struct{}), + closeWg: sync.WaitGroup{}, + lastAccessTime: atomic.NewTime(time.Now()), } return pipeline diff --git a/internal/util/pipeline/stream_pipeline_test.go b/internal/util/pipeline/stream_pipeline_test.go index f18194b08a..4cb6a43031 100644 --- a/internal/util/pipeline/stream_pipeline_test.go +++ b/internal/util/pipeline/stream_pipeline_test.go @@ -50,7 +50,7 @@ func (suite *StreamPipelineSuite) SetupTest() { suite.msgDispatcher = msgdispatcher.NewMockClient(suite.T()) suite.msgDispatcher.EXPECT().Register(mock.Anything, mock.Anything).Return(suite.inChannel, nil) suite.msgDispatcher.EXPECT().Deregister(suite.channel) - suite.pipeline = NewPipelineWithStream(suite.msgDispatcher, 0, false, suite.channel, nil) + suite.pipeline = NewPipelineWithStream(suite.msgDispatcher, 0, false, suite.channel) suite.length = 4 } diff --git a/pkg/common/common.go b/pkg/common/common.go index fc566a2216..120411d305 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -241,8 +241,6 @@ const ( PartitionKeyIsolationKey = "partitionkey.isolation" FieldSkipLoadKey = "field.skipLoad" IndexOffsetCacheEnabledKey = "indexoffsetcache.enabled" - ReplicateIDKey = "replicate.id" - ReplicateEndTSKey = "replicate.endTS" IndexNonEncoding = "index.nonEncoding" EnableDynamicSchemaKey = `dynamicfield.enabled` NamespaceEnabledKey = "namespace.enabled" @@ -514,33 +512,6 @@ func ShouldFieldBeLoaded(kvs []*commonpb.KeyValuePair) (bool, error) { return true, nil } -func IsReplicateEnabled(kvs []*commonpb.KeyValuePair) (bool, bool) { - replicateID, ok := GetReplicateID(kvs) - return replicateID != "", ok -} - -func GetReplicateID(kvs []*commonpb.KeyValuePair) (string, bool) { - for _, kv := range kvs { - if kv.GetKey() == ReplicateIDKey { - return kv.GetValue(), true - } - } - return "", false -} - -func GetReplicateEndTS(kvs []*commonpb.KeyValuePair) (uint64, bool) { - for _, kv := range kvs { - if kv.GetKey() == ReplicateEndTSKey { - ts, err := strconv.ParseUint(kv.GetValue(), 10, 64) - if err != nil { - return 0, false - } - return ts, true - } - } - return 0, false -} - func IsEnableDynamicSchema(kvs []*commonpb.KeyValuePair) (found bool, value bool, err error) { for _, kv := range kvs { if kv.GetKey() == EnableDynamicSchemaKey { diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index df9695d9a9..9d6bf68a26 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -178,87 +178,6 @@ func TestShouldFieldBeLoaded(t *testing.T) { } } -func TestReplicateProperty(t *testing.T) { - t.Run("ReplicateID", func(t *testing.T) { - { - p := []*commonpb.KeyValuePair{ - { - Key: ReplicateIDKey, - Value: "1001", - }, - } - e, ok := IsReplicateEnabled(p) - assert.True(t, e) - assert.True(t, ok) - i, ok := GetReplicateID(p) - assert.True(t, ok) - assert.Equal(t, "1001", i) - } - - { - p := []*commonpb.KeyValuePair{ - { - Key: ReplicateIDKey, - Value: "", - }, - } - e, ok := IsReplicateEnabled(p) - assert.False(t, e) - assert.True(t, ok) - } - - { - p := []*commonpb.KeyValuePair{ - { - Key: "foo", - Value: "1001", - }, - } - e, ok := IsReplicateEnabled(p) - assert.False(t, e) - assert.False(t, ok) - } - }) - - t.Run("ReplicateTS", func(t *testing.T) { - { - p := []*commonpb.KeyValuePair{ - { - Key: ReplicateEndTSKey, - Value: "1001", - }, - } - ts, ok := GetReplicateEndTS(p) - assert.True(t, ok) - assert.EqualValues(t, 1001, ts) - } - - { - p := []*commonpb.KeyValuePair{ - { - Key: ReplicateEndTSKey, - Value: "foo", - }, - } - ts, ok := GetReplicateEndTS(p) - assert.False(t, ok) - assert.EqualValues(t, 0, ts) - } - - { - p := []*commonpb.KeyValuePair{ - { - Key: "foo", - Value: "1001", - }, - } - ts, ok := GetReplicateEndTS(p) - assert.False(t, ok) - assert.EqualValues(t, 0, ts) - } - }) -} - func TestIsEnableDynamicSchema(t *testing.T) { type testCase struct { tag string diff --git a/pkg/mq/common/message.go b/pkg/mq/common/message.go index affac6ad0f..f668022aa7 100644 --- a/pkg/mq/common/message.go +++ b/pkg/mq/common/message.go @@ -81,7 +81,6 @@ const ( TimestampTypeKey = "timestamp" ChannelTypeKey = "vchannel" CollectionIDTypeKey = "collection_id" - ReplicateIDTypeKey = "replicate_id" ) // GetMsgType gets the message type from message. diff --git a/pkg/mq/msgdispatcher/client.go b/pkg/mq/msgdispatcher/client.go index e04caccee8..935b1a234d 100644 --- a/pkg/mq/msgdispatcher/client.go +++ b/pkg/mq/msgdispatcher/client.go @@ -38,10 +38,9 @@ type ( ) type StreamConfig struct { - VChannel string - Pos *Pos - SubPos SubPos - ReplicateConfig *msgstream.ReplicateConfig + VChannel string + Pos *Pos + SubPos SubPos } func NewStreamConfig(vchannel string, pos *Pos, subPos SubPos) *StreamConfig { diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index 8cc7b4fda0..1f9d3a15c3 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -89,7 +89,6 @@ func NewDispatcher( pchannel string, position *Pos, subPos SubPos, - includeCurrentMsg bool, pullbackEndTs typeutil.Timestamp, includeSkipWhenSplit bool, ) (*Dispatcher, error) { @@ -120,7 +119,7 @@ func NewDispatcher( return nil, err } log.Info("as consumer done", zap.Any("position", position)) - err = stream.Seek(ctx, []*Pos{position}, includeCurrentMsg) + err = stream.Seek(ctx, []*Pos{position}, true) if err != nil { log.Error("seek failed", zap.Error(err)) return nil, err @@ -253,14 +252,12 @@ func (d *Dispatcher) work() { for vchannel, p := range targetPacks { var err error t, _ := d.targets.Get(vchannel) - isReplicateChannel := strings.Contains(vchannel, paramtable.Get().CommonCfg.ReplicateMsgChannel.GetValue()) // The dispatcher seeks from the oldest target, // so for each target, msg before the target position must be filtered out. // // From 2.6.0, every message has a unique timetick, so we can filter out the msg by < but not <=. - if ((d.includeSkipWhenSplit && p.EndTs < t.pos.GetTimestamp()) || - (!d.includeSkipWhenSplit && p.EndTs <= t.pos.GetTimestamp())) && - !isReplicateChannel { + if (d.includeSkipWhenSplit && p.EndTs < t.pos.GetTimestamp()) || + (!d.includeSkipWhenSplit && p.EndTs <= t.pos.GetTimestamp()) { log.Info("skip msg", zap.String("vchannel", vchannel), zap.Int("msgCount", len(p.Msgs)), @@ -319,7 +316,6 @@ func (d *Dispatcher) groupAndParseMsgs(pack *msgstream.ConsumeMsgPack, unmarshal // init packs for all targets, even though there's no msg in pack, // but we still need to dispatch time ticks to the targets. targetPacks := make(map[string]*MsgPack) - replicateConfigs := make(map[string]*msgstream.ReplicateConfig) d.targets.Range(func(vchannel string, t *target) bool { targetPacks[vchannel] = &MsgPack{ BeginTs: pack.BeginTs, @@ -328,9 +324,6 @@ func (d *Dispatcher) groupAndParseMsgs(pack *msgstream.ConsumeMsgPack, unmarshal StartPositions: pack.StartPositions, EndPositions: pack.EndPositions, } - if t.replicateConfig != nil { - replicateConfigs[vchannel] = t.replicateConfig - } return true }) // group messages by vchannel @@ -350,14 +343,6 @@ func (d *Dispatcher) groupAndParseMsgs(pack *msgstream.ConsumeMsgPack, unmarshal // we need to dispatch it to the vchannel of this collection targets := []string{} for k := range targetPacks { - if msg.GetType() == commonpb.MsgType_Replicate { - config := replicateConfigs[k] - if config != nil && msg.GetReplicateID() == config.ReplicateID { - targets = append(targets, k) - } - continue - } - if !strings.Contains(k, collectionID) { continue } @@ -386,59 +371,5 @@ func (d *Dispatcher) groupAndParseMsgs(pack *msgstream.ConsumeMsgPack, unmarshal targetPacks[vchannel].Msgs = append(targetPacks[vchannel].Msgs, tsMsg) } } - replicateEndChannels := make(map[string]struct{}) - for vchannel, c := range replicateConfigs { - if len(targetPacks[vchannel].Msgs) == 0 { - delete(targetPacks, vchannel) // no replicate msg, can't send pack - continue - } - // calculate the new pack ts - beginTs := targetPacks[vchannel].Msgs[0].BeginTs() - endTs := targetPacks[vchannel].Msgs[0].EndTs() - newMsgs := make([]msgstream.TsMsg, 0) - for _, msg := range targetPacks[vchannel].Msgs { - if msg.BeginTs() < beginTs { - beginTs = msg.BeginTs() - } - if msg.EndTs() > endTs { - endTs = msg.EndTs() - } - if msg.Type() == commonpb.MsgType_Replicate { - replicateMsg := msg.(*msgstream.ReplicateMsg) - if c.CheckFunc(replicateMsg) { - replicateEndChannels[vchannel] = struct{}{} - } - continue - } - newMsgs = append(newMsgs, msg) - } - targetPacks[vchannel].Msgs = newMsgs - d.resetMsgPackTS(targetPacks[vchannel], beginTs, endTs) - } - for vchannel := range replicateEndChannels { - if t, ok := d.targets.Get(vchannel); ok { - t.replicateConfig = nil - log.Info("replicate end, set replicate config nil", zap.String("vchannel", vchannel)) - } - } return targetPacks } - -func (d *Dispatcher) resetMsgPackTS(pack *MsgPack, newBeginTs, newEndTs typeutil.Timestamp) { - pack.BeginTs = newBeginTs - pack.EndTs = newEndTs - startPositions := make([]*msgstream.MsgPosition, 0) - endPositions := make([]*msgstream.MsgPosition, 0) - for _, pos := range pack.StartPositions { - startPosition := typeutil.Clone(pos) - startPosition.Timestamp = newBeginTs - startPositions = append(startPositions, startPosition) - } - for _, pos := range pack.EndPositions { - endPosition := typeutil.Clone(pos) - endPosition.Timestamp = newEndTs - endPositions = append(endPositions, endPosition) - } - pack.StartPositions = startPositions - pack.EndPositions = endPositions -} diff --git a/pkg/mq/msgdispatcher/dispatcher_test.go b/pkg/mq/msgdispatcher/dispatcher_test.go index 42a333c16e..bd3be20ea8 100644 --- a/pkg/mq/msgdispatcher/dispatcher_test.go +++ b/pkg/mq/msgdispatcher/dispatcher_test.go @@ -36,7 +36,7 @@ func TestDispatcher(t *testing.T) { ctx := context.Background() t.Run("test base", func(t *testing.T) { d, err := NewDispatcher(ctx, newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0, false) + nil, common.SubscriptionPositionEarliest, 0, false) assert.NoError(t, err) assert.NotPanics(t, func() { d.Handle(start) @@ -65,7 +65,7 @@ func TestDispatcher(t *testing.T) { }, } d, err := NewDispatcher(ctx, factory, time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0, false) + nil, common.SubscriptionPositionEarliest, 0, false) assert.Error(t, err) assert.Nil(t, d) @@ -73,7 +73,7 @@ func TestDispatcher(t *testing.T) { t.Run("test target", func(t *testing.T) { d, err := NewDispatcher(ctx, newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0, false) + nil, common.SubscriptionPositionEarliest, 0, false) assert.NoError(t, err) output := make(chan *msgstream.MsgPack, 1024) @@ -128,7 +128,7 @@ func TestDispatcher(t *testing.T) { func BenchmarkDispatcher_handle(b *testing.B) { d, err := NewDispatcher(context.Background(), newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0, false) + nil, common.SubscriptionPositionEarliest, 0, false) assert.NoError(b, err) for i := 0; i < b.N; i++ { @@ -143,197 +143,44 @@ func BenchmarkDispatcher_handle(b *testing.B) { func TestGroupMessage(t *testing.T) { d, err := NewDispatcher(context.Background(), newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0", - nil, common.SubscriptionPositionEarliest, false, 0, false) + nil, common.SubscriptionPositionEarliest, 0, false) assert.NoError(t, err) d.AddTarget(newTarget(&StreamConfig{VChannel: "mock_pchannel_0_1v0"}, false)) d.AddTarget(newTarget(&StreamConfig{ - VChannel: "mock_pchannel_0_2v0", - ReplicateConfig: msgstream.GetReplicateConfig("local-test", "foo", "coo"), + VChannel: "mock_pchannel_0_2v0", }, false)) - { - // no replicate msg - packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{ - BeginTs: 1, - EndTs: 10, - StartPositions: []*msgstream.MsgPosition{ - { - ChannelName: "mock_pchannel_0", - MsgID: []byte("1"), - Timestamp: 1, - }, + packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{ + BeginTs: 1, + EndTs: 10, + StartPositions: []*msgstream.MsgPosition{ + { + ChannelName: "mock_pchannel_0", + MsgID: []byte("1"), + Timestamp: 1, }, - EndPositions: []*msgstream.MsgPosition{ - { - ChannelName: "mock_pchannel_0", - MsgID: []byte("10"), - Timestamp: 10, - }, + }, + EndPositions: []*msgstream.MsgPosition{ + { + ChannelName: "mock_pchannel_0", + MsgID: []byte("10"), + Timestamp: 10, }, - Msgs: []msgstream.TsMsg{ - &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: 5, - EndTimestamp: 5, - }, - InsertRequest: &msgpb.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Insert, - Timestamp: 5, - }, - ShardName: "mock_pchannel_0_1v0", + }, + Msgs: []msgstream.TsMsg{ + &msgstream.InsertMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: 5, + EndTimestamp: 5, + }, + InsertRequest: &msgpb.InsertRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Insert, + Timestamp: 5, }, + ShardName: "mock_pchannel_0_1v0", }, }, - }), nil) - assert.Len(t, packs, 1) - } - - { - // equal to replicateID - packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{ - BeginTs: 1, - EndTs: 10, - StartPositions: []*msgstream.MsgPosition{ - { - ChannelName: "mock_pchannel_0", - MsgID: []byte("1"), - Timestamp: 1, - }, - }, - EndPositions: []*msgstream.MsgPosition{ - { - ChannelName: "mock_pchannel_0", - MsgID: []byte("10"), - Timestamp: 10, - }, - }, - Msgs: []msgstream.TsMsg{ - &msgstream.ReplicateMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: 100, - EndTimestamp: 100, - }, - ReplicateMsg: &msgpb.ReplicateMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Replicate, - Timestamp: 100, - ReplicateInfo: &commonpb.ReplicateInfo{ - ReplicateID: "local-test", - }, - }, - }, - }, - }, - }), nil) - assert.Len(t, packs, 2) - { - replicatePack := packs["mock_pchannel_0_2v0"] - assert.EqualValues(t, 100, replicatePack.BeginTs) - assert.EqualValues(t, 100, replicatePack.EndTs) - assert.EqualValues(t, 100, replicatePack.StartPositions[0].Timestamp) - assert.EqualValues(t, 100, replicatePack.EndPositions[0].Timestamp) - assert.Len(t, replicatePack.Msgs, 0) - } - { - replicatePack := packs["mock_pchannel_0_1v0"] - assert.EqualValues(t, 1, replicatePack.BeginTs) - assert.EqualValues(t, 10, replicatePack.EndTs) - assert.EqualValues(t, 1, replicatePack.StartPositions[0].Timestamp) - assert.EqualValues(t, 10, replicatePack.EndPositions[0].Timestamp) - assert.Len(t, replicatePack.Msgs, 0) - } - } - - { - // not equal to replicateID - packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{ - BeginTs: 1, - EndTs: 10, - StartPositions: []*msgstream.MsgPosition{ - { - ChannelName: "mock_pchannel_0", - MsgID: []byte("1"), - Timestamp: 1, - }, - }, - EndPositions: []*msgstream.MsgPosition{ - { - ChannelName: "mock_pchannel_0", - MsgID: []byte("10"), - Timestamp: 10, - }, - }, - Msgs: []msgstream.TsMsg{ - &msgstream.ReplicateMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: 100, - EndTimestamp: 100, - }, - ReplicateMsg: &msgpb.ReplicateMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Replicate, - Timestamp: 100, - ReplicateInfo: &commonpb.ReplicateInfo{ - ReplicateID: "local-test-1", // not equal to replicateID - }, - }, - }, - }, - }, - }), nil) - assert.Len(t, packs, 1) - replicatePack := packs["mock_pchannel_0_2v0"] - assert.Nil(t, replicatePack) - } - - { - // replicate end - replicateTarget, ok := d.targets.Get("mock_pchannel_0_2v0") - assert.True(t, ok) - assert.NotNil(t, replicateTarget.replicateConfig) - packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{ - BeginTs: 1, - EndTs: 10, - StartPositions: []*msgstream.MsgPosition{ - { - ChannelName: "mock_pchannel_0", - MsgID: []byte("1"), - Timestamp: 1, - }, - }, - EndPositions: []*msgstream.MsgPosition{ - { - ChannelName: "mock_pchannel_0", - MsgID: []byte("10"), - Timestamp: 10, - }, - }, - Msgs: []msgstream.TsMsg{ - &msgstream.ReplicateMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: 100, - EndTimestamp: 100, - }, - ReplicateMsg: &msgpb.ReplicateMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Replicate, - Timestamp: 100, - ReplicateInfo: &commonpb.ReplicateInfo{ - ReplicateID: "local-test", - }, - }, - IsEnd: true, - Database: "foo", - }, - }, - }, - }), nil) - assert.Len(t, packs, 2) - replicatePack := packs["mock_pchannel_0_2v0"] - assert.EqualValues(t, 100, replicatePack.BeginTs) - assert.EqualValues(t, 100, replicatePack.EndTs) - assert.EqualValues(t, 100, replicatePack.StartPositions[0].Timestamp) - assert.EqualValues(t, 100, replicatePack.EndPositions[0].Timestamp) - assert.Nil(t, replicateTarget.replicateConfig) - } + }, + }), nil) + assert.Len(t, packs, 2) } diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index ea51ffb777..3e51df27f9 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -247,14 +247,11 @@ OUTER: } } - // For CDC, CDC needs to includeCurrentMsg when create new dispatcher - // and NOT includeCurrentMsg when create lag dispatcher. So if any dispatcher lagged, + // If any dispatcher is lagged, // we give up batch subscription and create dispatcher for only one target. - includeCurrentMsg := false for _, candidate := range candidateTargets { if candidate.isLagged { candidateTargets = []*target{candidate} - includeCurrentMsg = true candidate.isLagged = false break } @@ -272,7 +269,7 @@ OUTER: // TODO: add newDispatcher timeout param and init context id := c.idAllocator.Inc() - d, err := NewDispatcher(context.Background(), c.factory, id, c.pchannel, earliestTarget.pos, earliestTarget.subPos, includeCurrentMsg, latestTarget.pos.GetTimestamp(), c.includeSkipWhenSplit) + d, err := NewDispatcher(context.Background(), c.factory, id, c.pchannel, earliestTarget.pos, earliestTarget.subPos, latestTarget.pos.GetTimestamp(), c.includeSkipWhenSplit) if err != nil { panic(err) } @@ -381,19 +378,6 @@ func (c *dispatcherManager) tryMerge() { zap.Duration("dur", time.Since(start))) } -// deleteMetric remove specific prometheus metric, -// Lock/RLock is required before calling this method. -func (c *dispatcherManager) deleteMetric(channel string) { - nodeIDStr := fmt.Sprintf("%d", c.nodeID) - if c.role == typeutil.DataNodeRole { - metrics.DataNodeMsgDispatcherTtLag.DeleteLabelValues(nodeIDStr, channel) - return - } - if c.role == typeutil.QueryNodeRole { - metrics.QueryNodeMsgDispatcherTtLag.DeleteLabelValues(nodeIDStr, channel) - } -} - func (c *dispatcherManager) uploadMetric() { c.mu.RLock() defer c.mu.RUnlock() diff --git a/pkg/mq/msgdispatcher/target.go b/pkg/mq/msgdispatcher/target.go index 5474537abe..41251081ca 100644 --- a/pkg/mq/msgdispatcher/target.go +++ b/pkg/mq/msgdispatcher/target.go @@ -24,7 +24,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/v2/log" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/util/lifetime" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) @@ -38,18 +37,16 @@ type target struct { latestTimeTick uint64 isLagged bool - closeMu sync.Mutex - closeOnce sync.Once - closed bool - maxLag time.Duration - timer *time.Timer - replicateConfig *msgstream.ReplicateConfig + closeMu sync.Mutex + closeOnce sync.Once + closed bool + maxLag time.Duration + timer *time.Timer cancelCh lifetime.SafeChan } func newTarget(streamConfig *StreamConfig, filterSameTimeTick bool) *target { - replicateConfig := streamConfig.ReplicateConfig maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second) t := &target{ vchannel: streamConfig.VChannel, @@ -61,14 +58,8 @@ func newTarget(streamConfig *StreamConfig, filterSameTimeTick bool) *target { cancelCh: lifetime.NewSafeChan(), maxLag: maxTolerantLag, timer: time.NewTimer(maxTolerantLag), - replicateConfig: replicateConfig, } t.closed = false - if replicateConfig != nil { - log.Info("have replicate config", - zap.String("vchannel", streamConfig.VChannel), - zap.String("replicateID", replicateConfig.ReplicateID)) - } return t } diff --git a/pkg/mq/msgstream/mock_msgstream.go b/pkg/mq/msgstream/mock_msgstream.go index f06e6df9be..e0645739e1 100644 --- a/pkg/mq/msgstream/mock_msgstream.go +++ b/pkg/mq/msgstream/mock_msgstream.go @@ -292,39 +292,6 @@ func (_c *MockMsgStream_Close_Call) RunAndReturn(run func()) *MockMsgStream_Clos return _c } -// ForceEnableProduce provides a mock function with given fields: can -func (_m *MockMsgStream) ForceEnableProduce(can bool) { - _m.Called(can) -} - -// MockMsgStream_ForceEnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceEnableProduce' -type MockMsgStream_ForceEnableProduce_Call struct { - *mock.Call -} - -// ForceEnableProduce is a helper method to define mock.On call -// - can bool -func (_e *MockMsgStream_Expecter) ForceEnableProduce(can interface{}) *MockMsgStream_ForceEnableProduce_Call { - return &MockMsgStream_ForceEnableProduce_Call{Call: _e.mock.On("ForceEnableProduce", can)} -} - -func (_c *MockMsgStream_ForceEnableProduce_Call) Run(run func(can bool)) *MockMsgStream_ForceEnableProduce_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(bool)) - }) - return _c -} - -func (_c *MockMsgStream_ForceEnableProduce_Call) Return() *MockMsgStream_ForceEnableProduce_Call { - _c.Call.Return() - return _c -} - -func (_c *MockMsgStream_ForceEnableProduce_Call) RunAndReturn(run func(bool)) *MockMsgStream_ForceEnableProduce_Call { - _c.Run(run) - return _c -} - // GetLatestMsgID provides a mock function with given fields: channel func (_m *MockMsgStream) GetLatestMsgID(channel string) (common.MessageID, error) { ret := _m.Called(channel) diff --git a/pkg/mq/msgstream/mq_factory.go b/pkg/mq/msgstream/mq_factory.go index 12ffc693b1..b522db50ef 100644 --- a/pkg/mq/msgstream/mq_factory.go +++ b/pkg/mq/msgstream/mq_factory.go @@ -308,9 +308,6 @@ func (w WpMsgStream) CheckTopicValid(channel string) error { return nil } -func (w WpMsgStream) ForceEnableProduce(can bool) { -} - // NewWpmsFactory creates a new message stream factory based on woodpecker func NewWpmsFactory(cfg *paramtable.ServiceParam) Factory { // TODO should not be used in mq wrapper diff --git a/pkg/mq/msgstream/mq_factory_test.go b/pkg/mq/msgstream/mq_factory_test.go index ee1764d26b..6280b9cd65 100644 --- a/pkg/mq/msgstream/mq_factory_test.go +++ b/pkg/mq/msgstream/mq_factory_test.go @@ -204,7 +204,6 @@ func TestWpMsgStream(t *testing.T) { wpStream.Close() wpStream.AsProducer(ctx, []string{"test-channel"}) wpStream.SetRepackFunc(nil) - wpStream.ForceEnableProduce(true) // Test methods returning nil/empty values msgID, err := wpStream.GetLatestMsgID("test-channel") diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 56f969f551..b47535ab2d 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -20,14 +20,12 @@ import ( "context" "fmt" "path/filepath" - "strconv" "sync" "sync/atomic" "time" "github.com/cockroachdb/errors" "github.com/samber/lo" - uatomic "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" @@ -35,7 +33,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/pkg/v2/config" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/mq/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" @@ -47,10 +44,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -var ( - _ MsgStream = (*mqMsgStream)(nil) - streamCounter uatomic.Int64 -) +var _ MsgStream = (*mqMsgStream)(nil) type mqMsgStream struct { ctx context.Context @@ -60,22 +54,16 @@ type mqMsgStream struct { consumers map[string]mqwrapper.Consumer consumerChannels []string - repackFunc RepackFunc - unmarshal UnmarshalDispatcher - receiveBuf chan *ConsumeMsgPack - closeRWMutex *sync.RWMutex - streamCancel func() - bufSize int64 - producerLock *sync.RWMutex - consumerLock *sync.Mutex - closed int32 - onceChan sync.Once - ttMsgEnable atomic.Value - forceEnableProduce atomic.Value - configEvent config.EventHandler - - replicateID string - checkFunc CheckReplicateMsgFunc + repackFunc RepackFunc + unmarshal UnmarshalDispatcher + receiveBuf chan *ConsumeMsgPack + closeRWMutex *sync.RWMutex + streamCancel func() + bufSize int64 + producerLock *sync.RWMutex + consumerLock *sync.Mutex + closed int32 + onceChan sync.Once } // NewMqMsgStream is used to generate a new mqMsgStream object @@ -109,20 +97,7 @@ func NewMqMsgStream(initCtx context.Context, closeRWMutex: &sync.RWMutex{}, closed: 0, } - ctxLog := log.Ctx(initCtx) - stream.forceEnableProduce.Store(false) - stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()) - stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) { - value, err := strconv.ParseBool(event.Value) - if err != nil { - ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err)) - return - } - stream.ttMsgEnable.Store(value) - ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce())) - }) - paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent) - ctxLog.Info("Msg Stream state", zap.Bool("can_produce", stream.isEnabledProduce())) + log.Ctx(initCtx).Info("Msg Stream initialized") return stream, nil } @@ -244,7 +219,6 @@ func (ms *mqMsgStream) Close() { ms.client.Close() close(ms.receiveBuf) - paramtable.Get().Unwatch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, ms.configEvent) log.Info("mq msg stream closed") } @@ -273,36 +247,7 @@ func (ms *mqMsgStream) GetProduceChannels() []string { return ms.producerChannels } -func (ms *mqMsgStream) ForceEnableProduce(can bool) { - ms.forceEnableProduce.Store(can) -} - -func (ms *mqMsgStream) isEnabledProduce() bool { - return ms.forceEnableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool) -} - -func (ms *mqMsgStream) isSkipSystemTT() bool { - return ms.replicateID != "" -} - -// checkReplicateID check the replicate id of the message, return values: isMatch, isReplicate -func (ms *mqMsgStream) checkReplicateID(msg TsMsg) (bool, bool) { - if !ms.isSkipSystemTT() { - return true, false - } - msgBase, ok := msg.(interface{ GetBase() *commonpb.MsgBase }) - if !ok { - log.Warn("fail to get msg base, please check it", zap.Any("type", msg.Type())) - return false, false - } - return msgBase.GetBase().GetReplicateInfo().GetReplicateID() == ms.replicateID, true -} - func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error { - if !ms.isEnabledProduce() { - log.Ctx(ms.ctx).Warn("can't produce the msg in the backup instance", zap.Stack("stack")) - return merr.ErrDenyProduceMsg - } if msgPack == nil || len(msgPack.Msgs) <= 0 { log.Ctx(ms.ctx).Debug("Warning: Receive empty msgPack") return nil @@ -378,14 +323,7 @@ func (ms *mqMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) (map[str if msgPack == nil || len(msgPack.Msgs) <= 0 { return ids, errors.New("empty msgs") } - // Only allow to create collection msg in backup instance - // However, there may be a problem of ts disorder here, but because the start position of the collection only uses offsets, not time, there is no problem for the time being - isCreateCollectionMsg := len(msgPack.Msgs) == 1 && msgPack.Msgs[0].Type() == commonpb.MsgType_CreateCollection - if !ms.isEnabledProduce() && !isCreateCollectionMsg { - log.Ctx(ms.ctx).Warn("can't broadcast the msg in the backup instance", zap.Stack("stack")) - return ids, merr.ErrDenyProduceMsg - } for _, v := range msgPack.Msgs { spanCtx, sp := MsgSpanFromCtx(v.TraceCtx(), v) @@ -754,15 +692,14 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { timeTickMsg = v continue } - if v.GetTimestamp() <= currTs || - v.GetReplicateID() != "" { + if v.GetTimestamp() <= currTs { size += uint64(v.GetSize()) timeTickBuf = append(timeTickBuf, v) } else { tempBuffer = append(tempBuffer, v) } // when drop collection, force to exit the buffer loop - if v.GetType() == commonpb.MsgType_DropCollection || v.GetType() == commonpb.MsgType_Replicate { + if v.GetType() == commonpb.MsgType_DropCollection { containsEndBufferMsg = true } } @@ -1007,10 +944,6 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition, } } - // skip the replicate msg because it must have been consumed - if packMsg.GetReplicateID() != "" { - continue - } if packMsg.GetType() == commonpb.MsgType_TimeTick && packMsg.GetTimestamp() >= mp.Timestamp { runLoop = false if time.Since(loopStarTime) > 30*time.Second { diff --git a/pkg/mq/msgstream/msg_for_collection_test.go b/pkg/mq/msgstream/msg_for_collection_test.go index 665baa0d0a..4b7360db9d 100644 --- a/pkg/mq/msgstream/msg_for_collection_test.go +++ b/pkg/mq/msgstream/msg_for_collection_test.go @@ -31,12 +31,11 @@ func TestFlushMsg(t *testing.T) { var msg TsMsg = &FlushMsg{ FlushRequest: &milvuspb.FlushRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Flush, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_Flush, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", CollectionNames: []string{"col1", "col2"}, @@ -70,12 +69,11 @@ func TestLoadCollection(t *testing.T) { var msg TsMsg = &LoadCollectionMsg{ LoadCollectionRequest: &milvuspb.LoadCollectionRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_LoadCollection, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_LoadCollection, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", CollectionName: "col1", @@ -109,12 +107,11 @@ func TestReleaseCollection(t *testing.T) { var msg TsMsg = &ReleaseCollectionMsg{ ReleaseCollectionRequest: &milvuspb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_ReleaseCollection, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_ReleaseCollection, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", CollectionName: "col1", diff --git a/pkg/mq/msgstream/msg_for_database_test.go b/pkg/mq/msgstream/msg_for_database_test.go index ba3ef2333e..0ccf7ca4ed 100644 --- a/pkg/mq/msgstream/msg_for_database_test.go +++ b/pkg/mq/msgstream/msg_for_database_test.go @@ -31,12 +31,11 @@ func TestCreateDatabase(t *testing.T) { var msg TsMsg = &CreateDatabaseMsg{ CreateDatabaseRequest: &milvuspb.CreateDatabaseRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateDatabase, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_CreateDatabase, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", }, @@ -68,12 +67,11 @@ func TestDropDatabase(t *testing.T) { var msg TsMsg = &DropDatabaseMsg{ DropDatabaseRequest: &milvuspb.DropDatabaseRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropDatabase, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_DropDatabase, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", }, @@ -105,12 +103,11 @@ func TestAlterDatabase(t *testing.T) { var msg TsMsg = &AlterDatabaseMsg{ AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_AlterDatabase, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_AlterDatabase, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", Properties: []*commonpb.KeyValuePair{ diff --git a/pkg/mq/msgstream/msg_for_import_test.go b/pkg/mq/msgstream/msg_for_import_test.go index 57dcfe6abe..7bb8af2652 100644 --- a/pkg/mq/msgstream/msg_for_import_test.go +++ b/pkg/mq/msgstream/msg_for_import_test.go @@ -31,12 +31,11 @@ func TestImportMsg(t *testing.T) { var msg TsMsg = &ImportMsg{ ImportMsg: &msgpb.ImportMsg{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Import, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_Import, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", }, diff --git a/pkg/mq/msgstream/msg_for_index_test.go b/pkg/mq/msgstream/msg_for_index_test.go index 068afa8608..85ff8d21d7 100644 --- a/pkg/mq/msgstream/msg_for_index_test.go +++ b/pkg/mq/msgstream/msg_for_index_test.go @@ -31,12 +31,11 @@ func TestCreateIndex(t *testing.T) { var msg TsMsg = &CreateIndexMsg{ CreateIndexRequest: &milvuspb.CreateIndexRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateIndex, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_CreateIndex, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", }, @@ -67,12 +66,11 @@ func TestDropIndex(t *testing.T) { var msg TsMsg = &DropIndexMsg{ DropIndexRequest: &milvuspb.DropIndexRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropIndex, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_DropIndex, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", CollectionName: "col1", diff --git a/pkg/mq/msgstream/msg_for_partition_test.go b/pkg/mq/msgstream/msg_for_partition_test.go index caed9a9b92..c063f020f8 100644 --- a/pkg/mq/msgstream/msg_for_partition_test.go +++ b/pkg/mq/msgstream/msg_for_partition_test.go @@ -31,12 +31,11 @@ func TestLoadPartitions(t *testing.T) { msg := &LoadPartitionsMsg{ LoadPartitionsRequest: &milvuspb.LoadPartitionsRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_LoadPartitions, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_LoadPartitions, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", CollectionName: "col1", @@ -76,12 +75,11 @@ func TestReleasePartitions(t *testing.T) { msg := &ReleasePartitionsMsg{ ReleasePartitionsRequest: &milvuspb.ReleasePartitionsRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_ReleasePartitions, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_ReleasePartitions, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, DbName: "unit_db", CollectionName: "col1", diff --git a/pkg/mq/msgstream/msg_for_replicate.go b/pkg/mq/msgstream/msg_for_replicate.go deleted file mode 100644 index 3f6b9699c6..0000000000 --- a/pkg/mq/msgstream/msg_for_replicate.go +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the LF AI & Data foundation under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package msgstream - -import ( - "google.golang.org/protobuf/proto" - - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" -) - -type ReplicateMsg struct { - BaseMsg - *msgpb.ReplicateMsg -} - -var _ TsMsg = (*ReplicateMsg)(nil) - -func (r *ReplicateMsg) ID() UniqueID { - return r.Base.MsgID -} - -func (r *ReplicateMsg) SetID(id UniqueID) { - r.Base.MsgID = id -} - -func (r *ReplicateMsg) Type() MsgType { - return r.Base.MsgType -} - -func (r *ReplicateMsg) SourceID() int64 { - return r.Base.SourceID -} - -func (r *ReplicateMsg) Marshal(input TsMsg) (MarshalType, error) { - replicateMsg := input.(*ReplicateMsg) - mb, err := proto.Marshal(replicateMsg.ReplicateMsg) - if err != nil { - return nil, err - } - return mb, nil -} - -func (r *ReplicateMsg) Unmarshal(input MarshalType) (TsMsg, error) { - replicateMsg := &msgpb.ReplicateMsg{} - in, err := convertToByteArray(input) - if err != nil { - return nil, err - } - err = proto.Unmarshal(in, replicateMsg) - if err != nil { - return nil, err - } - rr := &ReplicateMsg{ReplicateMsg: replicateMsg} - rr.BeginTimestamp = replicateMsg.GetBase().GetTimestamp() - rr.EndTimestamp = replicateMsg.GetBase().GetTimestamp() - - return rr, nil -} - -func (r *ReplicateMsg) Size() int { - return proto.Size(r.ReplicateMsg) -} diff --git a/pkg/mq/msgstream/msg_for_replicate_test.go b/pkg/mq/msgstream/msg_for_replicate_test.go deleted file mode 100644 index 2e73a391ad..0000000000 --- a/pkg/mq/msgstream/msg_for_replicate_test.go +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the LF AI & Data foundation under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package msgstream - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" -) - -func TestReplicateMsg(t *testing.T) { - var msg TsMsg = &ReplicateMsg{ - ReplicateMsg: &msgpb.ReplicateMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Replicate, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, - }, - Database: "unit_db", - }, - } - assert.EqualValues(t, 100, msg.ID()) - msg.SetID(200) - assert.EqualValues(t, 200, msg.ID()) - assert.Equal(t, commonpb.MsgType_Replicate, msg.Type()) - assert.EqualValues(t, 10000, msg.SourceID()) - - msgBytes, err := msg.Marshal(msg) - assert.NoError(t, err) - - var newMsg TsMsg = &ReplicateMsg{} - _, err = newMsg.Unmarshal("1") - assert.Error(t, err) - - newMsg, err = newMsg.Unmarshal(msgBytes) - assert.NoError(t, err) - assert.EqualValues(t, 200, newMsg.ID()) - assert.EqualValues(t, 1000, newMsg.BeginTs()) - assert.EqualValues(t, 1000, newMsg.EndTs()) - - assert.True(t, msg.Size() > 0) -} diff --git a/pkg/mq/msgstream/msg_for_user_role_test.go b/pkg/mq/msgstream/msg_for_user_role_test.go index 51a284295b..3dc1d53ac7 100644 --- a/pkg/mq/msgstream/msg_for_user_role_test.go +++ b/pkg/mq/msgstream/msg_for_user_role_test.go @@ -31,12 +31,11 @@ func TestCreateUser(t *testing.T) { var msg TsMsg = &CreateUserMsg{ CreateCredentialRequest: &milvuspb.CreateCredentialRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateCredential, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_CreateCredential, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, Username: "unit_user", Password: "unit_password", @@ -70,12 +69,11 @@ func TestUpdateUser(t *testing.T) { var msg TsMsg = &UpdateUserMsg{ UpdateCredentialRequest: &milvuspb.UpdateCredentialRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_UpdateCredential, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_UpdateCredential, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, Username: "unit_user", OldPassword: "unit_old_password", @@ -111,12 +109,11 @@ func TestDeleteUser(t *testing.T) { var msg TsMsg = &DeleteUserMsg{ DeleteCredentialRequest: &milvuspb.DeleteCredentialRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DeleteCredential, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_DeleteCredential, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, Username: "unit_user", }, @@ -148,12 +145,11 @@ func TestCreateRole(t *testing.T) { var msg TsMsg = &CreateRoleMsg{ CreateRoleRequest: &milvuspb.CreateRoleRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateRole, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_CreateRole, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, Entity: &milvuspb.RoleEntity{ Name: "unit_role", @@ -187,12 +183,11 @@ func TestDropRole(t *testing.T) { var msg TsMsg = &DropRoleMsg{ DropRoleRequest: &milvuspb.DropRoleRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropRole, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_DropRole, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, RoleName: "unit_role", }, @@ -224,12 +219,11 @@ func TestOperateUserRole(t *testing.T) { var msg TsMsg = &OperateUserRoleMsg{ OperateUserRoleRequest: &milvuspb.OperateUserRoleRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_OperateUserRole, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_OperateUserRole, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, RoleName: "unit_role", Username: "unit_user", @@ -265,12 +259,11 @@ func TestOperatePrivilege(t *testing.T) { var msg TsMsg = &OperatePrivilegeMsg{ OperatePrivilegeRequest: &milvuspb.OperatePrivilegeRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_OperatePrivilege, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_OperatePrivilege, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, Entity: &milvuspb.GrantEntity{ Role: &milvuspb.RoleEntity{Name: "unit_role"}, @@ -317,12 +310,11 @@ func TestOperatePrivilegeV2(t *testing.T) { var msg TsMsg = &OperatePrivilegeV2Msg{ OperatePrivilegeV2Request: &milvuspb.OperatePrivilegeV2Request{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_OperatePrivilegeV2, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_OperatePrivilegeV2, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, Grantor: &milvuspb.GrantorEntity{ User: &milvuspb.UserEntity{Name: "unit_user"}, @@ -359,12 +351,11 @@ func TestCreatePrivilegeGroup(t *testing.T) { var msg TsMsg = &CreatePrivilegeGroupMsg{ CreatePrivilegeGroupRequest: &milvuspb.CreatePrivilegeGroupRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreatePrivilegeGroup, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_CreatePrivilegeGroup, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, GroupName: "unit_group", }, @@ -397,12 +388,11 @@ func TestDropPrivilegeGroup(t *testing.T) { var msg TsMsg = &DropPrivilegeGroupMsg{ DropPrivilegeGroupRequest: &milvuspb.DropPrivilegeGroupRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropPrivilegeGroup, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_DropPrivilegeGroup, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, GroupName: "unit_group", }, @@ -434,12 +424,11 @@ func TestOperatePrivilegeGroup(t *testing.T) { var msg TsMsg = &OperatePrivilegeGroupMsg{ OperatePrivilegeGroupRequest: &milvuspb.OperatePrivilegeGroupRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_OperatePrivilegeGroup, - MsgID: 100, - Timestamp: 1000, - SourceID: 10000, - TargetID: 100000, - ReplicateInfo: nil, + MsgType: commonpb.MsgType_OperatePrivilegeGroup, + MsgID: 100, + Timestamp: 1000, + SourceID: 10000, + TargetID: 100000, }, GroupName: "unit_group", Type: milvuspb.OperatePrivilegeGroupType_AddPrivilegesToGroup, diff --git a/pkg/mq/msgstream/msgstream.go b/pkg/mq/msgstream/msgstream.go index 127f4fe3c9..a0026a79f9 100644 --- a/pkg/mq/msgstream/msgstream.go +++ b/pkg/mq/msgstream/msgstream.go @@ -80,7 +80,6 @@ type ConsumeMsg interface { GetID() int64 GetCollectionID() string GetType() commonpb.MsgType - GetReplicateID() string SetTraceCtx(ctx context.Context) Unmarshal(unmarshalDispatcher UnmarshalDispatcher) (TsMsg, error) @@ -124,15 +123,6 @@ func (m *UnmarshaledMsg) GetSize() int { return m.msg.Size() } -func (m *UnmarshaledMsg) GetReplicateID() string { - msgBase, ok := m.msg.(interface{ GetBase() *commonpb.MsgBase }) - if !ok { - log.Warn("fail to get msg base, please check it", zap.Any("type", m.msg.Type())) - return "" - } - return msgBase.GetBase().GetReplicateInfo().GetReplicateID() -} - func (m *UnmarshaledMsg) SetPosition(pos *msgpb.MsgPosition) { m.msg.SetPosition(pos) } @@ -159,7 +149,6 @@ type MarshaledMsg struct { timestamp uint64 vchannel string collectionID string - replicateID string traceCtx context.Context } @@ -195,10 +184,6 @@ func (m *MarshaledMsg) GetSize() int { return len(m.msg.Payload()) } -func (m *MarshaledMsg) GetReplicateID() string { - return m.replicateID -} - func (m *MarshaledMsg) SetPosition(pos *msgpb.MsgPosition) { m.pos = pos } @@ -269,11 +254,6 @@ func NewMarshaledMsg(msg common.Message, group string) (ConsumeMsg, error) { msgType: msgType, vchannel: vchannel, } - - replicateID, ok := properties[common.ReplicateIDTypeKey] - if ok { - result.replicateID = replicateID - } return result, nil } @@ -306,53 +286,11 @@ type MsgStream interface { Chan() <-chan *ConsumeMsgPack GetUnmarshalDispatcher() UnmarshalDispatcher // Seek consume message from the specified position - // includeCurrentMsg indicates whether to consume the current message, and in the milvus system, it should be always false + // includeCurrentMsg indicates whether to consume the current message. Seek(ctx context.Context, msgPositions []*MsgPosition, includeCurrentMsg bool) error GetLatestMsgID(channel string) (MessageID, error) CheckTopicValid(channel string) error - - ForceEnableProduce(can bool) -} - -type ReplicateConfig struct { - ReplicateID string - CheckFunc CheckReplicateMsgFunc -} - -type CheckReplicateMsgFunc func(*ReplicateMsg) bool - -func GetReplicateConfig(replicateID, dbName, colName string) *ReplicateConfig { - if replicateID == "" { - return nil - } - replicateConfig := &ReplicateConfig{ - ReplicateID: replicateID, - CheckFunc: func(msg *ReplicateMsg) bool { - if !msg.GetIsEnd() { - return false - } - log.Info("check replicate msg", - zap.String("replicateID", replicateID), - zap.String("dbName", dbName), - zap.String("colName", colName), - zap.Any("msg", msg)) - if msg.GetIsCluster() { - return true - } - return msg.GetDatabase() == dbName && (msg.GetCollection() == colName || msg.GetCollection() == "") - }, - } - return replicateConfig -} - -func GetReplicateID(msg TsMsg) string { - msgBase, ok := msg.(interface{ GetBase() *commonpb.MsgBase }) - if !ok { - log.Warn("fail to get msg base, please check it", zap.Any("type", msg.Type())) - return "" - } - return msgBase.GetBase().GetReplicateInfo().GetReplicateID() } func GetTimestamp(msg TsMsg) uint64 { @@ -364,10 +302,6 @@ func GetTimestamp(msg TsMsg) uint64 { return msgBase.GetBase().GetTimestamp() } -func MatchReplicateID(msg TsMsg, replicateID string) bool { - return GetReplicateID(msg) == replicateID -} - type Factory interface { NewMsgStream(ctx context.Context) (MsgStream, error) NewTtMsgStream(ctx context.Context) (MsgStream, error) diff --git a/pkg/mq/msgstream/msgstream_util.go b/pkg/mq/msgstream/msgstream_util.go index 1c9f2553ab..266c819d6b 100644 --- a/pkg/mq/msgstream/msgstream_util.go +++ b/pkg/mq/msgstream/msgstream_util.go @@ -154,9 +154,6 @@ func GetPorperties(msg TsMsg) map[string]string { msgBase, ok := msg.(interface{ GetBase() *commonpb.MsgBase }) if ok { properties[common.TimestampTypeKey] = strconv.FormatUint(msgBase.GetBase().GetTimestamp(), 10) - if msgBase.GetBase().GetReplicateInfo() != nil { - properties[common.ReplicateIDTypeKey] = msgBase.GetBase().GetReplicateInfo().GetReplicateID() - } } return properties diff --git a/pkg/mq/msgstream/msgstream_util_test.go b/pkg/mq/msgstream/msgstream_util_test.go index 4079272a29..cea1e2d599 100644 --- a/pkg/mq/msgstream/msgstream_util_test.go +++ b/pkg/mq/msgstream/msgstream_util_test.go @@ -24,8 +24,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/pkg/v2/mq/common" ) @@ -82,90 +80,3 @@ func TestGetLatestMsgID(t *testing.T) { assert.Equal(t, []byte("mock"), id) } } - -func TestReplicateConfig(t *testing.T) { - t.Run("get replicate id", func(t *testing.T) { - { - msg := &InsertMsg{ - InsertRequest: &msgpb.InsertRequest{ - Base: &commonpb.MsgBase{ - ReplicateInfo: &commonpb.ReplicateInfo{ - ReplicateID: "local", - }, - }, - }, - } - assert.Equal(t, "local", GetReplicateID(msg)) - assert.True(t, MatchReplicateID(msg, "local")) - } - { - msg := &InsertMsg{ - InsertRequest: &msgpb.InsertRequest{ - Base: &commonpb.MsgBase{}, - }, - } - assert.Equal(t, "", GetReplicateID(msg)) - assert.False(t, MatchReplicateID(msg, "local")) - } - { - msg := &MarshalFailTsMsg{} - assert.Equal(t, "", GetReplicateID(msg)) - } - }) - - t.Run("get replicate config", func(t *testing.T) { - { - assert.Nil(t, GetReplicateConfig("", "", "")) - } - { - rc := GetReplicateConfig("local", "db", "col") - assert.Equal(t, "local", rc.ReplicateID) - checkFunc := rc.CheckFunc - assert.False(t, checkFunc(&ReplicateMsg{ - ReplicateMsg: &msgpb.ReplicateMsg{}, - })) - assert.True(t, checkFunc(&ReplicateMsg{ - ReplicateMsg: &msgpb.ReplicateMsg{ - IsEnd: true, - IsCluster: true, - }, - })) - assert.False(t, checkFunc(&ReplicateMsg{ - ReplicateMsg: &msgpb.ReplicateMsg{ - IsEnd: true, - Database: "db1", - }, - })) - assert.True(t, checkFunc(&ReplicateMsg{ - ReplicateMsg: &msgpb.ReplicateMsg{ - IsEnd: true, - Database: "db", - }, - })) - assert.False(t, checkFunc(&ReplicateMsg{ - ReplicateMsg: &msgpb.ReplicateMsg{ - IsEnd: true, - Database: "db", - Collection: "col1", - }, - })) - } - { - rc := GetReplicateConfig("local", "db", "col") - checkFunc := rc.CheckFunc - assert.True(t, checkFunc(&ReplicateMsg{ - ReplicateMsg: &msgpb.ReplicateMsg{ - IsEnd: true, - Database: "db", - }, - })) - assert.False(t, checkFunc(&ReplicateMsg{ - ReplicateMsg: &msgpb.ReplicateMsg{ - IsEnd: true, - Database: "db1", - Collection: "col1", - }, - })) - } - }) -} diff --git a/pkg/mq/msgstream/unmarshal.go b/pkg/mq/msgstream/unmarshal.go index a5e74a5f3e..deb5479b77 100644 --- a/pkg/mq/msgstream/unmarshal.go +++ b/pkg/mq/msgstream/unmarshal.go @@ -92,7 +92,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher { createPrivilegeGroupMsg := CreatePrivilegeGroupMsg{} dropPrivilegeGroupMsg := DropPrivilegeGroupMsg{} operatePrivilegeGroupMsg := OperatePrivilegeGroupMsg{} - replicateMsg := ReplicateMsg{} importMsg := ImportMsg{} createAliasMsg := CreateAliasMsg{} @@ -134,7 +133,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher { p.TempMap[commonpb.MsgType_CreatePrivilegeGroup] = createPrivilegeGroupMsg.Unmarshal p.TempMap[commonpb.MsgType_DropPrivilegeGroup] = dropPrivilegeGroupMsg.Unmarshal p.TempMap[commonpb.MsgType_OperatePrivilegeGroup] = operatePrivilegeGroupMsg.Unmarshal - p.TempMap[commonpb.MsgType_Replicate] = replicateMsg.Unmarshal p.TempMap[commonpb.MsgType_Import] = importMsg.Unmarshal p.TempMap[commonpb.MsgType_CreateAlias] = createAliasMsg.Unmarshal p.TempMap[commonpb.MsgType_DropAlias] = dropAliasMsg.Unmarshal diff --git a/pkg/util/funcutil/func_test.go b/pkg/util/funcutil/func_test.go index 6a31add6a0..61d75a1211 100644 --- a/pkg/util/funcutil/func_test.go +++ b/pkg/util/funcutil/func_test.go @@ -949,12 +949,6 @@ func TestNumRowsWithSchema(t *testing.T) { func TestChannelConvert(t *testing.T) { t.Run("is physical channel", func(t *testing.T) { - { - channel := "by-dev-replicate-msg" - ok := IsPhysicalChannel(channel) - assert.True(t, ok) - } - { channel := "by-dev-rootcoord-dml_2" ok := IsPhysicalChannel(channel) @@ -980,12 +974,6 @@ func TestChannelConvert(t *testing.T) { physicalChannel := ToPhysicalChannel(channel) assert.Equal(t, "by-dev-rootcoord-dml_2", physicalChannel) } - - { - channel := "by-dev-replicate-msg" - physicalChannel := ToPhysicalChannel(channel) - assert.Equal(t, "by-dev-replicate-msg", physicalChannel) - } }) t.Run("get virtual channel", func(t *testing.T) { diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 9b9ae8f38e..2c9efc1fb4 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -70,8 +70,9 @@ var ( ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false) ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true) ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false) - ErrCollectionReplicateMode = newMilvusError("can't operate on the collection under standby mode", 108, false) - ErrCollectionSchemaMismatch = newMilvusError("collection schema mismatch", 109, false) + // Deprecated, keep it only for reserving the error code + ErrCollectionReplicateMode = newMilvusError("can't operate on the collection under standby mode", 108, false) + ErrCollectionSchemaMismatch = newMilvusError("collection schema mismatch", 109, false) // Partition related ErrPartitionNotFound = newMilvusError("partition not found", 200, false) ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false) @@ -149,7 +150,8 @@ var ( ErrMqTopicNotFound = newMilvusError("topic not found", 1300, false) ErrMqTopicNotEmpty = newMilvusError("topic not empty", 1301, false) ErrMqInternal = newMilvusError("message queue internal error", 1302, false) - ErrDenyProduceMsg = newMilvusError("deny to write the message to mq", 1303, false) + // Deprecated, keep it only for reserving the error code + ErrDenyProduceMsg = newMilvusError("deny to write the message to mq", 1303, false) // Privilege related // this operation is denied because the user not authorized, user need to login in first @@ -178,7 +180,7 @@ var ( ErrCheckPrimaryKey = newMilvusError("please check the primary key and its' type can only in [int, string]", 1806, false) ErrHTTPRateLimit = newMilvusError("request is rejected by limiter", 1807, true) - // replicate related + // Deprecated, legacy replicate related errors, keep them only for reserving the error code ErrDenyReplicateMessage = newMilvusError("deny to use the replicate message in the normal instance", 1900, false) ErrInvalidMsgBytes = newMilvusError("invalid replicate msg bytes", 1901, false) ErrNoAssignSegmentID = newMilvusError("no assign segment id", 1902, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 1dda3d0211..d7a99f91e9 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -322,10 +322,6 @@ func WrapErrAsInputErrorWhen(err error, targets ...milvusError) error { return err } -func WrapErrCollectionReplicateMode(operation string) error { - return wrapFields(ErrCollectionReplicateMode, value("operation", operation)) -} - func GetErrorType(err error) ErrorType { if merr, ok := err.(milvusError); ok { return merr.errType diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 16886d559d..f77a32a727 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -210,7 +210,6 @@ type commonConfig struct { RootCoordTimeTick ParamItem `refreshable:"true"` RootCoordStatistics ParamItem `refreshable:"true"` RootCoordDml ParamItem `refreshable:"false"` - ReplicateMsgChannel ParamItem `refreshable:"false"` QueryCoordTimeTick ParamItem `refreshable:"true"` @@ -298,14 +297,12 @@ type commonConfig struct { StorageZstdConcurrency ParamItem `refreshable:"false"` StorageReadRetryAttempts ParamItem `refreshable:"true"` - TTMsgEnabled ParamItem `refreshable:"true"` TraceLogMode ParamItem `refreshable:"true"` BloomFilterSize ParamItem `refreshable:"true"` BloomFilterType ParamItem `refreshable:"true"` MaxBloomFalsePositive ParamItem `refreshable:"true"` BloomFilterApplyBatchSize ParamItem `refreshable:"true"` PanicWhenPluginFail ParamItem `refreshable:"false"` - CollectionReplicateEnable ParamItem `refreshable:"true"` UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"` UseVectorAsClusteringKey ParamItem `refreshable:"true"` @@ -413,17 +410,6 @@ It is recommended to change this parameter before starting Milvus for the first } p.RootCoordDml.Init(base.mgr) - p.ReplicateMsgChannel = ParamItem{ - Key: "msgChannel.chanNamePrefix.replicateMsg", - DefaultValue: "replicate-msg", - Version: "2.3.2", - FallbackKeys: []string{"common.chanNamePrefix.replicateMsg"}, - PanicIfEmpty: true, - Formatter: chanNamePrefix, - Export: true, - } - p.ReplicateMsgChannel.Init(base.mgr) - p.QueryCoordTimeTick = ParamItem{ Key: "msgChannel.chanNamePrefix.queryTimeTick", DefaultValue: "queryTimeTick", @@ -1076,26 +1062,6 @@ The default value is 1, which is enough for most cases.`, } p.StorageReadRetryAttempts.Init(base.mgr) - p.TTMsgEnabled = ParamItem{ - Key: "common.ttMsgEnabled", - Version: "2.3.2", - DefaultValue: "true", - Doc: `Whether to disable the internal time messaging mechanism for the system. -If disabled (set to false), the system will not allow DML operations, including insertion, deletion, queries, and searches. -This helps Milvus-CDC synchronize incremental data`, - Export: true, - } - p.TTMsgEnabled.Init(base.mgr) - - p.CollectionReplicateEnable = ParamItem{ - Key: "common.collectionReplicateEnable", - Version: "2.4.16", - DefaultValue: "false", - Doc: `Whether to enable collection replication.`, - Export: true, - } - p.CollectionReplicateEnable.Init(base.mgr) - p.TraceLogMode = ParamItem{ Key: "common.traceLogMode", Version: "2.3.4",