enhance: Remove legacy cdc/replication (#46603)

issue: https://github.com/milvus-io/milvus/issues/44123

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant: legacy in-cluster CDC/replication plumbing
(ReplicateMsg types, ReplicateID-based guards and flags) is obsolete —
the system relies on standard msgstream positions, subPos/end-ts
semantics and timetick ordering as the single source of truth for
message ordering and skipping, so replication-specific
channels/types/guards can be removed safely.

- Removed/simplified logic (what and why): removed replication feature
flags and params (ReplicateMsgChannel, TTMsgEnabled,
CollectionReplicateEnable), ReplicateMsg type and its tests, ReplicateID
constants/helpers and MergeProperties hooks, ReplicateConfig and its
propagation (streamPipeline, StreamConfig, dispatcher, target),
replicate-aware dispatcher/pipeline branches, and replicate-mode
pre-checks/timestamp-allocation in proxy tasks — these implemented a
redundant alternate “replicate-mode” pathway that duplicated
position/end-ts and timetick logic.

- Why this does NOT cause data loss or regression (concrete code paths):
no persistence or core write paths were removed — proxy PreExecute flows
(internal/proxy/task_*.go) still perform the same schema/ID/size
validations and then follow the normal non-replicate execution path;
dispatcher and pipeline continue to use position/subPos and
pullback/end-ts in Seek/grouping (pkg/mq/msgdispatcher/dispatcher.go,
internal/util/pipeline/stream_pipeline.go), so skipping and ordering
behavior remains unchanged; timetick emission in rootcoord
(sendMinDdlTsAsTt) is now ungated (no silent suppression), preserving or
increasing timetick delivery rather than removing it.

- PR type and net effect: Enhancement/Refactor — removes deprecated
replication API surface (types, helpers, config, tests) and replication
branches, simplifies public APIs and constructor signatures, and reduces
surface area for future maintenance while keeping DML/DDL persistence,
ordering, and seek semantics intact.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-12-30 14:53:21 +08:00 committed by GitHub
parent b7761d67a3
commit b18ebd9468
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
58 changed files with 215 additions and 1657 deletions

View File

@ -841,7 +841,6 @@ msgChannel:
# Caution: Changing this parameter after using Milvus for a period of time will affect your access to old data.
# It is recommended to change this parameter before starting Milvus for the first time.
rootCoordDml: rootcoord-dml
replicateMsg: replicate-msg
# Sub-name prefix of the message channel where the query node publishes time tick messages.
# The complete channel name prefix is ${msgChannel.chanNamePrefix.cluster}-${msgChannel.chanNamePrefix.queryTimeTick}
# Caution: Changing this parameter after using Milvus for a period of time will affect your access to old data.
@ -1024,16 +1023,11 @@ common:
enabled: false # enable split by average size policy in storage v2
threshold: 1024 # split by average size policy threshold(in bytes) in storage v2
useLoonFFI: false
# Whether to disable the internal time messaging mechanism for the system.
# If disabled (set to false), the system will not allow DML operations, including insertion, deletion, queries, and searches.
# This helps Milvus-CDC synchronize incremental data
ttMsgEnabled: true
traceLogMode: 0 # trace request info
bloomFilterSize: 100000 # bloom filter initial size
bloomFilterType: BlockedBloomFilter # bloom filter type, support BasicBloomFilter and BlockedBloomFilter
maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter
bloomFilterApplyBatchSize: 1000 # batch size when to apply pk to bloom filter
collectionReplicateEnable: false # Whether to enable collection replication.
usePartitionKeyAsClusteringKey: false # if true, do clustering compaction and segment prune on partition key field
useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction

View File

@ -129,7 +129,3 @@ func (m *delegatorMsgstreamAdaptor) GetLatestMsgID(channel string) (msgstream.Me
func (m *delegatorMsgstreamAdaptor) CheckTopicValid(channel string) error {
panic("should never be called")
}
func (m *delegatorMsgstreamAdaptor) ForceEnableProduce(can bool) {
panic("should never be called")
}

View File

@ -161,14 +161,4 @@ func TestDelegatorMsgstreamAdaptor(t *testing.T) {
}()
_ = adaptor.CheckTopicValid("channel1")
})
// Test ForceEnableProduce
t.Run("ForceEnableProduce", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("ForceEnableProduce should panic but did not")
}
}()
adaptor.ForceEnableProduce(true)
})
}

View File

@ -27,7 +27,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
@ -76,19 +75,11 @@ func createNewInputFromDispatcher(initCtx context.Context,
start = time.Now()
)
replicateID, _ := pkgcommon.GetReplicateID(schema.GetProperties())
if replicateID == "" {
log.Info("datanode consume without replicateID, try to get replicateID from dbProperties", zap.Any("dbProperties", dbProperties))
replicateID, _ = pkgcommon.GetReplicateID(dbProperties)
}
replicateConfig := msgstream.GetReplicateConfig(replicateID, schema.GetDbName(), schema.GetName())
if seekPos != nil && len(seekPos.MsgID) != 0 {
input, err = dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
VChannel: vchannel,
Pos: seekPos,
SubPos: common.SubscriptionPositionUnknown,
ReplicateConfig: replicateConfig,
VChannel: vchannel,
Pos: seekPos,
SubPos: common.SubscriptionPositionUnknown,
})
if err != nil {
log.Warn("datanode consume failed after retried", zap.Error(err))
@ -105,10 +96,9 @@ func createNewInputFromDispatcher(initCtx context.Context,
}
input, err = dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
VChannel: vchannel,
Pos: nil,
SubPos: common.SubscriptionPositionEarliest,
ReplicateConfig: replicateConfig,
VChannel: vchannel,
Pos: nil,
SubPos: common.SubscriptionPositionEarliest,
})
if err != nil {
log.Warn("datanode consume failed after retried", zap.Error(err))

View File

@ -62,9 +62,6 @@ func (mm *mockMsgStreamFactory) NewMsgStreamDisposer(ctx context.Context) func([
type mockTtMsgStream struct{}
func (mtm *mockTtMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {
}
func (mtm *mockTtMsgStream) Close() {}
func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack {
@ -107,9 +104,6 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error {
return nil
}
func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) {
}
func TestNewDmInputNode(t *testing.T) {
assert.Panics(t, func() {
newDmInputNode(&nodeConfig{

View File

@ -99,7 +99,6 @@ type collectionInfo struct {
createdUtcTimestamp uint64
consistencyLevel commonpb.ConsistencyLevel
partitionKeyIsolation bool
replicateID string
updateTimestamp uint64
collectionTTL uint64
numPartitions int64
@ -489,7 +488,6 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
m.collInfo[database] = make(map[string]*collectionInfo)
}
replicateID, _ := common.GetReplicateID(collection.Properties)
m.collInfo[database][collectionName] = &collectionInfo{
collID: collection.CollectionID,
schema: schemaInfo,
@ -498,7 +496,6 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
createdUtcTimestamp: collection.CreatedUtcTimestamp,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
replicateID: replicateID,
updateTimestamp: collection.UpdateTimestamp,
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
vChannels: collection.VirtualChannelNames,

View File

@ -10,10 +10,9 @@ import (
type mockMsgStream struct {
msgstream.MsgStream
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
forceEnableProduce func(bool)
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
}
func (m *mockMsgStream) AsProducer(ctx context.Context, producers []string) {
@ -34,15 +33,6 @@ func (m *mockMsgStream) Close() {
}
}
func (m *mockMsgStream) ForceEnableProduce(enabled bool) {
if m.forceEnableProduce != nil {
m.forceEnableProduce(enabled)
}
}
func (m *mockMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {
}
func newMockMsgStream() *mockMsgStream {
return &mockMsgStream{}
}

View File

@ -314,12 +314,6 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error {
return nil
}
func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) {
}
func (ms *simpleMockMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {
}
func newSimpleMockMsgStream() *simpleMockMsgStream {
return &simpleMockMsgStream{
msgChan: make(chan *msgstream.ConsumeMsgPack, 1024),

View File

@ -81,14 +81,3 @@ func defaultInsertRepackFunc(
}
return pack, nil
}
func replicatePackFunc(
tsMsgs []msgstream.TsMsg,
hashKeys [][]int32,
) (map[int32]*msgstream.MsgPack, error) {
return map[int32]*msgstream.MsgPack{
0: {
Msgs: tsMsgs,
},
}, nil
}

View File

@ -37,7 +37,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
@ -1366,26 +1365,6 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
"can not alter partition key isolation mode if the collection already has a vector index. Please drop the index first")
}
}
_, ok := common.IsReplicateEnabled(t.Properties)
if ok {
return merr.WrapErrParameterInvalidMsg("can't set the replicate.id property")
}
endTS, ok := common.GetReplicateEndTS(t.Properties)
if ok && collBasicInfo.replicateID != "" {
allocResp, err := t.mixCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
Count: 1,
BlockTimestamp: endTS,
})
if err = merr.CheckRPCCall(allocResp, err); err != nil {
return merr.WrapErrServiceInternal("alloc timestamp failed", err.Error())
}
if allocResp.GetTimestamp() <= endTS {
return merr.WrapErrServiceInternal("alter collection: alloc timestamp failed, timestamp is not greater than endTS",
fmt.Sprintf("timestamp = %d, endTS = %d", allocResp.GetTimestamp(), endTS))
}
}
return nil
}

View File

@ -2,7 +2,6 @@ package proxy
import (
"context"
"fmt"
"go.uber.org/zap"
@ -282,33 +281,6 @@ func (t *alterDatabaseTask) PreExecute(ctx context.Context) error {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", userDefinedTimezone)
}
}
_, ok := common.GetReplicateID(t.Properties)
if ok {
return merr.WrapErrParameterInvalidMsg("can't set the replicate id property in alter database request")
}
endTS, ok := common.GetReplicateEndTS(t.Properties)
if !ok { // not exist replicate end ts property
return nil
}
cacheInfo, err := globalMetaCache.GetDatabaseInfo(ctx, t.DbName)
if err != nil {
return err
}
oldReplicateEnable, _ := common.IsReplicateEnabled(cacheInfo.properties)
if !oldReplicateEnable { // old replicate enable is false
return merr.WrapErrParameterInvalidMsg("can't set the replicate end ts property in alter database request when db replicate is disabled")
}
allocResp, err := t.mixCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
Count: 1,
BlockTimestamp: endTS,
})
if err = merr.CheckRPCCall(allocResp, err); err != nil {
return merr.WrapErrServiceInternal("alloc timestamp failed", err.Error())
}
if allocResp.GetTimestamp() <= endTS {
return merr.WrapErrServiceInternal("alter database: alloc timestamp failed, timestamp is not greater than endTS",
fmt.Sprintf("timestamp = %d, endTS = %d", allocResp.GetTimestamp(), endTS))
}
return nil
}

View File

@ -5,7 +5,6 @@ import (
"strings"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/metadata"
@ -202,163 +201,6 @@ func TestAlterDatabase(t *testing.T) {
assert.Nil(t, err1)
}
func TestAlterDatabaseTaskForReplicateProperty(t *testing.T) {
rc := mocks.NewMockMixCoordClient(t)
cache := globalMetaCache
defer func() { globalMetaCache = cache }()
mockCache := NewMockCache(t)
globalMetaCache = mockCache
t.Run("replicate id", func(t *testing.T) {
task := &alterDatabaseTask{
AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{
Base: &commonpb.MsgBase{},
DbName: "test_alter_database",
Properties: []*commonpb.KeyValuePair{
{
Key: common.MmapEnabledKey,
Value: "true",
},
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
},
mixCoord: rc,
}
err := task.PreExecute(context.Background())
assert.Error(t, err)
})
t.Run("fail to get database info", func(t *testing.T) {
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once()
task := &alterDatabaseTask{
AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{
Base: &commonpb.MsgBase{},
DbName: "test_alter_database",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "1000",
},
},
},
mixCoord: rc,
}
err := task.PreExecute(context.Background())
assert.Error(t, err)
})
t.Run("not enable replicate", func(t *testing.T) {
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{
properties: []*commonpb.KeyValuePair{},
}, nil).Once()
task := &alterDatabaseTask{
AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{
Base: &commonpb.MsgBase{},
DbName: "test_alter_database",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "1000",
},
},
},
mixCoord: rc,
}
err := task.PreExecute(context.Background())
assert.Error(t, err)
})
t.Run("fail to alloc ts", func(t *testing.T) {
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{
properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
}, nil).Once()
rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once()
task := &alterDatabaseTask{
AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{
Base: &commonpb.MsgBase{},
DbName: "test_alter_database",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "1000",
},
},
},
mixCoord: rc,
}
err := task.PreExecute(context.Background())
assert.Error(t, err)
})
t.Run("alloc wrong ts", func(t *testing.T) {
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{
properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
}, nil).Once()
rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{
Status: merr.Success(),
Timestamp: 999,
}, nil).Once()
task := &alterDatabaseTask{
AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{
Base: &commonpb.MsgBase{},
DbName: "test_alter_database",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "1000",
},
},
},
mixCoord: rc,
}
err := task.PreExecute(context.Background())
assert.Error(t, err)
})
t.Run("alloc wrong ts", func(t *testing.T) {
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{
properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
}, nil).Once()
rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{
Status: merr.Success(),
Timestamp: 1001,
}, nil).Once()
task := &alterDatabaseTask{
AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{
Base: &commonpb.MsgBase{},
DbName: "test_alter_database",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "1000",
},
},
},
mixCoord: rc,
}
err := task.PreExecute(context.Background())
assert.NoError(t, err)
})
}
func TestDescribeDatabaseTask(t *testing.T) {
rc := mocks.NewMockMixCoordClient(t)

View File

@ -283,15 +283,6 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
return ErrWithLog(log, "Failed to get collection id", merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound))
}
replicateID, err := GetReplicateID(ctx, dr.req.GetDbName(), collName)
if err != nil {
log.Warn("get replicate info failed", zap.String("collectionName", collName), zap.Error(err))
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
if replicateID != "" {
return merr.WrapErrCollectionReplicateMode("delete")
}
dr.schema, err = globalMetaCache.GetCollectionSchema(ctx, dr.req.GetDbName(), collName)
if err != nil {
return ErrWithLog(log, "Failed to get collection schema", err)

View File

@ -455,37 +455,12 @@ func (s *DeleteRunnerSuite) TestInitFailure() {
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).
Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{}, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("mock GetCollectionSchema err"))
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("deny delete in the replicate mode", func() {
dr := deleteRunner{req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
}}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(&collectionInfo{replicateID: "local-mac"}, nil)
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("fail get replicateID", func() {
dr := deleteRunner{req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
}}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).
Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock getCollectionInfo err"))
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("create plan failed", func() {
dr := deleteRunner{

View File

@ -26,14 +26,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/util/uniquegenerator"
)
func createTestFlushAllTask(t *testing.T) (*flushAllTask, *mocks.MockMixCoordClient, *msgstream.MockMsgStream, context.Context) {
func createTestFlushAllTask(t *testing.T) (*flushAllTask, *mocks.MockMixCoordClient, context.Context) {
ctx := context.Background()
mixCoord := mocks.NewMockMixCoordClient(t)
replicateMsgStream := msgstream.NewMockMsgStream(t)
task := &flushAllTask{
baseTask: baseTask{},
@ -50,22 +48,20 @@ func createTestFlushAllTask(t *testing.T) (*flushAllTask, *mocks.MockMixCoordCli
mixCoord: mixCoord,
}
return task, mixCoord, replicateMsgStream, ctx
return task, mixCoord, ctx
}
func TestFlushAllTaskTraceCtx(t *testing.T) {
task, mixCoord, replicateMsgStream, ctx := createTestFlushAllTask(t)
task, mixCoord, ctx := createTestFlushAllTask(t)
defer mixCoord.AssertExpectations(t)
defer replicateMsgStream.AssertExpectations(t)
traceCtx := task.TraceCtx()
assert.Equal(t, ctx, traceCtx)
}
func TestFlushAllTaskID(t *testing.T) {
task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t)
task, mixCoord, _ := createTestFlushAllTask(t)
defer mixCoord.AssertExpectations(t)
defer replicateMsgStream.AssertExpectations(t)
// Test getting ID
originalID := task.ID()
@ -78,27 +74,24 @@ func TestFlushAllTaskID(t *testing.T) {
}
func TestFlushAllTaskName(t *testing.T) {
task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t)
task, mixCoord, _ := createTestFlushAllTask(t)
defer mixCoord.AssertExpectations(t)
defer replicateMsgStream.AssertExpectations(t)
name := task.Name()
assert.Equal(t, FlushAllTaskName, name)
}
func TestFlushAllTaskType(t *testing.T) {
task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t)
task, mixCoord, _ := createTestFlushAllTask(t)
defer mixCoord.AssertExpectations(t)
defer replicateMsgStream.AssertExpectations(t)
msgType := task.Type()
assert.Equal(t, commonpb.MsgType_Flush, msgType)
}
func TestFlushAllTaskTimestampMethods(t *testing.T) {
task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t)
task, mixCoord, _ := createTestFlushAllTask(t)
defer mixCoord.AssertExpectations(t)
defer replicateMsgStream.AssertExpectations(t)
originalTs := task.BeginTs()
assert.Equal(t, originalTs, task.EndTs())
@ -129,8 +122,7 @@ func TestFlushAllTaskOnEnqueue(t *testing.T) {
assert.Equal(t, commonpb.MsgType_Flush, task.Base.MsgType)
// Test with existing Base
task, _, replicateMsgStream, _ := createTestFlushAllTask(t)
defer replicateMsgStream.AssertExpectations(t)
task, _, _ = createTestFlushAllTask(t)
err = task.OnEnqueue()
assert.NoError(t, err)
@ -138,18 +130,16 @@ func TestFlushAllTaskOnEnqueue(t *testing.T) {
}
func TestFlushAllTaskPreExecute(t *testing.T) {
task, mixCoord, replicateMsgStream, ctx := createTestFlushAllTask(t)
task, mixCoord, ctx := createTestFlushAllTask(t)
defer mixCoord.AssertExpectations(t)
defer replicateMsgStream.AssertExpectations(t)
err := task.PreExecute(ctx)
assert.NoError(t, err)
}
func TestFlushAllTaskPostExecute(t *testing.T) {
task, mixCoord, replicateMsgStream, ctx := createTestFlushAllTask(t)
task, mixCoord, ctx := createTestFlushAllTask(t)
defer mixCoord.AssertExpectations(t)
defer replicateMsgStream.AssertExpectations(t)
err := task.PostExecute(ctx)
assert.NoError(t, err)
@ -159,9 +149,8 @@ func TestFlushAllTaskImplementsTaskInterface(t *testing.T) {
// Verify that flushAllTask implements the task interface
var _ task = (*flushAllTask)(nil)
task, mixCoord, replicateMsgStream, _ := createTestFlushAllTask(t)
task, mixCoord, _ := createTestFlushAllTask(t)
defer mixCoord.AssertExpectations(t)
defer replicateMsgStream.AssertExpectations(t)
// Test all interface methods are accessible
assert.NotNil(t, task.TraceCtx)

View File

@ -124,15 +124,6 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize"))
}
replicateID, err := GetReplicateID(it.ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Warn("get replicate id failed", zap.String("collectionName", collectionName), zap.Error(err))
return merr.WrapErrAsInputError(err)
}
if replicateID != "" {
return merr.WrapErrCollectionReplicateMode("insert")
}
collID, err := globalMetaCache.GetCollectionID(context.Background(), it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection id", zap.Error(err))

View File

@ -405,11 +405,6 @@ func TestInsertTask_KeepUserPK_WhenAllowInsertAutoIDTrue(t *testing.T) {
mock.Anything,
).Return(&collectionInfo{schema: info}, nil)
cache.On("GetDatabaseInfo",
mock.Anything,
mock.Anything,
).Return(&databaseInfo{properties: []*commonpb.KeyValuePair{}}, nil)
globalMetaCache = cache
err = task.PreExecute(context.Background())
@ -557,11 +552,6 @@ func TestInsertTask_Function(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(&collectionInfo{schema: info}, nil)
cache.On("GetDatabaseInfo",
mock.Anything,
mock.Anything,
).Return(&databaseInfo{properties: []*commonpb.KeyValuePair{}}, nil)
globalMetaCache = cache
err = task.PreExecute(ctx)
assert.NoError(t, err)
@ -589,7 +579,6 @@ func TestInsertTaskForSchemaMismatch(t *testing.T) {
mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{
updateTimestamp: 100,
}, nil)
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
err := it.PreExecute(ctx)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrCollectionSchemaMismatch)

View File

@ -47,7 +47,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
@ -4743,139 +4742,6 @@ func TestTaskPartitionKeyIsolation(t *testing.T) {
})
}
func TestAlterCollectionForReplicateProperty(t *testing.T) {
cache := globalMetaCache
defer func() { globalMetaCache = cache }()
mockCache := NewMockCache(t)
mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{
replicateID: "local-mac-1",
}, nil).Maybe()
mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(1, nil).Maybe()
mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{}, nil).Maybe()
globalMetaCache = mockCache
ctx := context.Background()
mockRootcoord := mocks.NewMockMixCoordClient(t)
t.Run("invalid replicate id", func(t *testing.T) {
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "xxxxx",
},
},
},
mixCoord: mockRootcoord,
}
err := task.PreExecute(ctx)
assert.Error(t, err)
})
t.Run("empty replicate id", func(t *testing.T) {
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
CollectionName: "test",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "",
},
},
},
mixCoord: mockRootcoord,
}
err := task.PreExecute(ctx)
assert.Error(t, err)
})
t.Run("fail to alloc ts", func(t *testing.T) {
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
CollectionName: "test",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "100",
},
},
},
mixCoord: mockRootcoord,
}
mockRootcoord.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once()
err := task.PreExecute(ctx)
assert.Error(t, err)
})
t.Run("alloc wrong ts", func(t *testing.T) {
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
CollectionName: "test",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "100",
},
},
},
mixCoord: mockRootcoord,
}
mockRootcoord.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{
Status: merr.Success(),
Timestamp: 99,
}, nil).Once()
err := task.PreExecute(ctx)
assert.Error(t, err)
})
}
func TestInsertForReplicate(t *testing.T) {
cache := globalMetaCache
defer func() { globalMetaCache = cache }()
mockCache := NewMockCache(t)
globalMetaCache = mockCache
t.Run("get replicate id fail", func(t *testing.T) {
mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once()
task := &insertTask{
insertMsg: &msgstream.InsertMsg{
InsertRequest: &msgpb.InsertRequest{
CollectionName: "foo",
},
},
}
err := task.PreExecute(context.Background())
assert.Error(t, err)
})
t.Run("insert with replicate id", func(t *testing.T) {
mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{
schema: &schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-mac",
},
},
},
},
replicateID: "local-mac",
}, nil).Once()
task := &insertTask{
insertMsg: &msgstream.InsertMsg{
InsertRequest: &msgpb.InsertRequest{
CollectionName: "foo",
},
},
}
err := task.PreExecute(context.Background())
assert.Error(t, err)
})
}
func TestAlterCollectionFieldCheckLoaded(t *testing.T) {
qc := NewMixCoordMock()
InitMetaCache(context.Background(), qc)

View File

@ -1035,15 +1035,6 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
Timestamp: it.EndTs(),
}
replicateID, err := GetReplicateID(ctx, it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("get replicate info failed", zap.String("collectionName", collectionName), zap.Error(err))
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
if replicateID != "" {
return merr.WrapErrCollectionReplicateMode("upsert")
}
// check collection exists
collID, err := globalMetaCache.GetCollectionID(context.Background(), it.req.GetDbName(), collectionName)
if err != nil {

View File

@ -20,7 +20,6 @@ import (
"testing"
"github.com/bytedance/mockey"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@ -340,40 +339,6 @@ func TestUpsertTask(t *testing.T) {
})
}
func TestUpsertTaskForReplicate(t *testing.T) {
cache := globalMetaCache
defer func() { globalMetaCache = cache }()
mockCache := NewMockCache(t)
globalMetaCache = mockCache
ctx := context.Background()
t.Run("fail to get collection info", func(t *testing.T) {
ut := upsertTask{
ctx: ctx,
req: &milvuspb.UpsertRequest{
CollectionName: "col-0",
},
}
mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("foo")).Once()
err := ut.PreExecute(ctx)
assert.Error(t, err)
})
t.Run("replicate mode", func(t *testing.T) {
ut := upsertTask{
ctx: ctx,
req: &milvuspb.UpsertRequest{
CollectionName: "col-0",
},
}
mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{
replicateID: "local-mac",
}, nil).Once()
err := ut.PreExecute(ctx)
assert.Error(t, err)
})
}
func TestUpsertTask_Function(t *testing.T) {
paramtable.Init()
paramtable.Get().CredentialCfg.Credential.GetFunc = func() map[string]string {
@ -537,7 +502,6 @@ func TestUpsertTaskForSchemaMismatch(t *testing.T) {
mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{
updateTimestamp: 100,
}, nil)
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
err := ut.PreExecute(ctx)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrCollectionSchemaMismatch)
@ -965,8 +929,6 @@ func TestUpdateTask_PreExecute_Success(t *testing.T) {
// Setup mocks
globalMetaCache = &MetaCache{}
mockey.Mock(GetReplicateID).Return("", nil).Build()
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{
@ -1002,27 +964,10 @@ func TestUpdateTask_PreExecute_Success(t *testing.T) {
})
}
func TestUpdateTask_PreExecute_ReplicateIDError(t *testing.T) {
mockey.PatchConvey("TestUpdateTask_PreExecute_ReplicateIDError", t, func() {
globalMetaCache = &MetaCache{}
mockey.Mock(GetReplicateID).Return("replica1", nil).Build()
task := createTestUpdateTask()
err := task.PreExecute(context.Background())
assert.Error(t, err)
assert.Contains(t, err.Error(), "can't operate on the collection under standby mode")
})
}
func TestUpdateTask_PreExecute_GetCollectionIDError(t *testing.T) {
mockey.PatchConvey("TestUpdateTask_PreExecute_GetCollectionIDError", t, func() {
globalMetaCache = &MetaCache{}
mockey.Mock(GetReplicateID).Return("", nil).Build()
expectedErr := merr.WrapErrCollectionNotFound("test_collection")
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(0), expectedErr).Build()
@ -1038,7 +983,6 @@ func TestUpdateTask_PreExecute_PartitionKeyModeError(t *testing.T) {
mockey.PatchConvey("TestUpdateTask_PreExecute_PartitionKeyModeError", t, func() {
globalMetaCache = &MetaCache{}
mockey.Mock(GetReplicateID).Return("", nil).Build()
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{
updateTimestamp: 12345,
@ -1061,7 +1005,6 @@ func TestUpdateTask_PreExecute_InvalidNumRows(t *testing.T) {
mockey.PatchConvey("TestUpdateTask_PreExecute_InvalidNumRows", t, func() {
globalMetaCache = &MetaCache{}
mockey.Mock(GetReplicateID).Return("", nil).Build()
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{
updateTimestamp: 12345,
@ -1087,7 +1030,6 @@ func TestUpdateTask_PreExecute_QueryPreExecuteError(t *testing.T) {
mockey.PatchConvey("TestUpdateTask_PreExecute_QueryPreExecuteError", t, func() {
globalMetaCache = &MetaCache{}
mockey.Mock(GetReplicateID).Return("", nil).Build()
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{
updateTimestamp: 12345,
@ -1695,7 +1637,6 @@ func TestUpsertTask_queryPreExecute_EmptyDataArray(t *testing.T) {
mockey.PatchConvey("test nullable field", t, func() {
// Setup mocks using mockey
mockey.Mock(GetReplicateID).Return("", nil).Build()
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build()
mockey.Mock((*MetaCache).GetCollectionSchema).Return(schema, nil).Build()
@ -1814,7 +1755,6 @@ func TestUpsertTask_queryPreExecute_EmptyDataArray(t *testing.T) {
mockey.PatchConvey("test non-nullable field", t, func() {
// Setup mocks using mockey
mockey.Mock(GetReplicateID).Return("", nil).Build()
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build()
mockey.Mock((*MetaCache).GetCollectionSchema).Return(schema, nil).Build()

View File

@ -2655,25 +2655,6 @@ func GetFailedResponse(req any, err error) any {
return nil
}
func GetReplicateID(ctx context.Context, database, collectionName string) (string, error) {
if globalMetaCache == nil {
return "", merr.WrapErrServiceUnavailable("internal: Milvus Proxy is not ready yet. please wait")
}
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, database, collectionName, 0)
if err != nil {
return "", err
}
if colInfo.replicateID != "" {
return colInfo.replicateID, nil
}
dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, database)
if err != nil {
return "", err
}
replicateID, _ := common.GetReplicateID(dbInfo.properties)
return replicateID, nil
}
func GetFunctionOutputFields(collSchema *schemapb.CollectionSchema) []string {
fields := make([]string, 0)
for _, fSchema := range collSchema.Functions {

View File

@ -19,9 +19,7 @@ package pipeline
import (
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
base "github.com/milvus-io/milvus/internal/util/pipeline"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@ -53,16 +51,11 @@ func NewPipeLine(
delegator delegator.ShardDelegator,
) (Pipeline, error) {
collectionID := collection.ID()
replicateID, _ := common.GetReplicateID(collection.Schema().GetProperties())
if replicateID == "" {
replicateID, _ = common.GetReplicateID(collection.GetDBProperties())
}
replicateConfig := msgstream.GetReplicateConfig(replicateID, collection.GetDBName(), collection.Schema().Name)
pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()
p := &pipeline{
collectionID: collectionID,
StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel, replicateConfig),
StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel),
}
filterNode := newFilterNode(collectionID, channel, manager, delegator, pipelineQueueLength)

View File

@ -24,13 +24,11 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/mocks/util/mock_segcore"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
@ -104,12 +102,6 @@ func (suite *PipelineTestSuite) TestBasic() {
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
DbProperties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
})
suite.Require().NoError(err)
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection)

View File

@ -21,7 +21,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/mocks/util/mock_segcore"
"github.com/milvus-io/milvus/pkg/v2/common"
@ -43,12 +42,6 @@ func (s *CollectionManagerSuite) SetupTest() {
schema := mock_segcore.GenTestCollectionSchema("collection_1", schemapb.DataType_Int64, false)
err := s.cm.PutOrRef(1, schema, mock_segcore.GenTestIndexMeta(1, schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
DbProperties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
})
s.Require().NoError(err)
}

View File

@ -1121,12 +1121,6 @@ func TestCreateCollectionTask_Prepare_WithProperty(t *testing.T) {
meta.EXPECT().GetDatabaseByName(mock.Anything, mock.Anything, mock.Anything).Return(&model.Database{
Name: "foo",
ID: 1,
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
}, nil).Twice()
meta.EXPECT().ListAllAvailCollections(mock.Anything).Return(map[int64][]int64{
util.DefaultDBID: {1, 2},

View File

@ -159,14 +159,6 @@ func (c *DDLCallback) alterDatabaseV1AckCallback(ctx context.Context, result mes
}
func MergeProperties(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
_, existEndTS := common.GetReplicateEndTS(updatedProps)
if existEndTS {
updatedProps = append(updatedProps, &commonpb.KeyValuePair{
Key: common.ReplicateIDKey,
Value: "",
})
}
props := make(map[string]string)
for _, prop := range oldProps {
props[prop.Key] = prop.Value

View File

@ -183,9 +183,6 @@ func (c *Core) sendTimeTick(t Timestamp, reason string) error {
}
func (c *Core) sendMinDdlTsAsTt() {
if !paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() {
return
}
log := log.Ctx(c.ctx)
code := c.GetStateCode()
if code != commonpb.StateCode_Healthy {

View File

@ -1179,10 +1179,6 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) {
c.UpdateStateCode(commonpb.StateCode_Healthy)
_ = paramtable.Get().Save(paramtable.Get().CommonCfg.TTMsgEnabled.Key, "false")
c.sendMinDdlTsAsTt() // disable ts msg
_ = paramtable.Get().Save(paramtable.Get().CommonCfg.TTMsgEnabled.Key, "true")
c.sendMinDdlTsAsTt() // no session.
ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}})
c.sendMinDdlTsAsTt()

View File

@ -41,13 +41,12 @@ type StreamPipeline interface {
}
type streamPipeline struct {
pipeline *pipeline
input <-chan *msgstream.MsgPack
scanner streaming.Scanner
dispatcher msgdispatcher.Client
startOnce sync.Once
vChannel string
replicateConfig *msgstream.ReplicateConfig
pipeline *pipeline
input <-chan *msgstream.MsgPack
scanner streaming.Scanner
dispatcher msgdispatcher.Client
startOnce sync.Once
vChannel string
closeCh chan struct{} // notify work to exit
closeWg sync.WaitGroup
@ -96,10 +95,9 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M
start := time.Now()
p.input, err = p.dispatcher.Register(ctx, &msgdispatcher.StreamConfig{
VChannel: p.vChannel,
Pos: position,
SubPos: common.SubscriptionPositionUnknown,
ReplicateConfig: p.replicateConfig,
VChannel: p.vChannel,
Pos: position,
SubPos: common.SubscriptionPositionUnknown,
})
if err != nil {
log.Error("dispatcher register failed after retried", zap.String("channel", position.ChannelName), zap.Error(err))
@ -151,7 +149,6 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client,
nodeTtInterval time.Duration,
enableTtChecker bool,
vChannel string,
replicateConfig *msgstream.ReplicateConfig,
) StreamPipeline {
pipeline := &streamPipeline{
pipeline: &pipeline{
@ -159,12 +156,11 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client,
nodeTtInterval: nodeTtInterval,
enableTtChecker: enableTtChecker,
},
dispatcher: dispatcher,
vChannel: vChannel,
replicateConfig: replicateConfig,
closeCh: make(chan struct{}),
closeWg: sync.WaitGroup{},
lastAccessTime: atomic.NewTime(time.Now()),
dispatcher: dispatcher,
vChannel: vChannel,
closeCh: make(chan struct{}),
closeWg: sync.WaitGroup{},
lastAccessTime: atomic.NewTime(time.Now()),
}
return pipeline

View File

@ -50,7 +50,7 @@ func (suite *StreamPipelineSuite) SetupTest() {
suite.msgDispatcher = msgdispatcher.NewMockClient(suite.T())
suite.msgDispatcher.EXPECT().Register(mock.Anything, mock.Anything).Return(suite.inChannel, nil)
suite.msgDispatcher.EXPECT().Deregister(suite.channel)
suite.pipeline = NewPipelineWithStream(suite.msgDispatcher, 0, false, suite.channel, nil)
suite.pipeline = NewPipelineWithStream(suite.msgDispatcher, 0, false, suite.channel)
suite.length = 4
}

View File

@ -241,8 +241,6 @@ const (
PartitionKeyIsolationKey = "partitionkey.isolation"
FieldSkipLoadKey = "field.skipLoad"
IndexOffsetCacheEnabledKey = "indexoffsetcache.enabled"
ReplicateIDKey = "replicate.id"
ReplicateEndTSKey = "replicate.endTS"
IndexNonEncoding = "index.nonEncoding"
EnableDynamicSchemaKey = `dynamicfield.enabled`
NamespaceEnabledKey = "namespace.enabled"
@ -514,33 +512,6 @@ func ShouldFieldBeLoaded(kvs []*commonpb.KeyValuePair) (bool, error) {
return true, nil
}
func IsReplicateEnabled(kvs []*commonpb.KeyValuePair) (bool, bool) {
replicateID, ok := GetReplicateID(kvs)
return replicateID != "", ok
}
func GetReplicateID(kvs []*commonpb.KeyValuePair) (string, bool) {
for _, kv := range kvs {
if kv.GetKey() == ReplicateIDKey {
return kv.GetValue(), true
}
}
return "", false
}
func GetReplicateEndTS(kvs []*commonpb.KeyValuePair) (uint64, bool) {
for _, kv := range kvs {
if kv.GetKey() == ReplicateEndTSKey {
ts, err := strconv.ParseUint(kv.GetValue(), 10, 64)
if err != nil {
return 0, false
}
return ts, true
}
}
return 0, false
}
func IsEnableDynamicSchema(kvs []*commonpb.KeyValuePair) (found bool, value bool, err error) {
for _, kv := range kvs {
if kv.GetKey() == EnableDynamicSchemaKey {

View File

@ -178,87 +178,6 @@ func TestShouldFieldBeLoaded(t *testing.T) {
}
}
func TestReplicateProperty(t *testing.T) {
t.Run("ReplicateID", func(t *testing.T) {
{
p := []*commonpb.KeyValuePair{
{
Key: ReplicateIDKey,
Value: "1001",
},
}
e, ok := IsReplicateEnabled(p)
assert.True(t, e)
assert.True(t, ok)
i, ok := GetReplicateID(p)
assert.True(t, ok)
assert.Equal(t, "1001", i)
}
{
p := []*commonpb.KeyValuePair{
{
Key: ReplicateIDKey,
Value: "",
},
}
e, ok := IsReplicateEnabled(p)
assert.False(t, e)
assert.True(t, ok)
}
{
p := []*commonpb.KeyValuePair{
{
Key: "foo",
Value: "1001",
},
}
e, ok := IsReplicateEnabled(p)
assert.False(t, e)
assert.False(t, ok)
}
})
t.Run("ReplicateTS", func(t *testing.T) {
{
p := []*commonpb.KeyValuePair{
{
Key: ReplicateEndTSKey,
Value: "1001",
},
}
ts, ok := GetReplicateEndTS(p)
assert.True(t, ok)
assert.EqualValues(t, 1001, ts)
}
{
p := []*commonpb.KeyValuePair{
{
Key: ReplicateEndTSKey,
Value: "foo",
},
}
ts, ok := GetReplicateEndTS(p)
assert.False(t, ok)
assert.EqualValues(t, 0, ts)
}
{
p := []*commonpb.KeyValuePair{
{
Key: "foo",
Value: "1001",
},
}
ts, ok := GetReplicateEndTS(p)
assert.False(t, ok)
assert.EqualValues(t, 0, ts)
}
})
}
func TestIsEnableDynamicSchema(t *testing.T) {
type testCase struct {
tag string

View File

@ -81,7 +81,6 @@ const (
TimestampTypeKey = "timestamp"
ChannelTypeKey = "vchannel"
CollectionIDTypeKey = "collection_id"
ReplicateIDTypeKey = "replicate_id"
)
// GetMsgType gets the message type from message.

View File

@ -38,10 +38,9 @@ type (
)
type StreamConfig struct {
VChannel string
Pos *Pos
SubPos SubPos
ReplicateConfig *msgstream.ReplicateConfig
VChannel string
Pos *Pos
SubPos SubPos
}
func NewStreamConfig(vchannel string, pos *Pos, subPos SubPos) *StreamConfig {

View File

@ -89,7 +89,6 @@ func NewDispatcher(
pchannel string,
position *Pos,
subPos SubPos,
includeCurrentMsg bool,
pullbackEndTs typeutil.Timestamp,
includeSkipWhenSplit bool,
) (*Dispatcher, error) {
@ -120,7 +119,7 @@ func NewDispatcher(
return nil, err
}
log.Info("as consumer done", zap.Any("position", position))
err = stream.Seek(ctx, []*Pos{position}, includeCurrentMsg)
err = stream.Seek(ctx, []*Pos{position}, true)
if err != nil {
log.Error("seek failed", zap.Error(err))
return nil, err
@ -253,14 +252,12 @@ func (d *Dispatcher) work() {
for vchannel, p := range targetPacks {
var err error
t, _ := d.targets.Get(vchannel)
isReplicateChannel := strings.Contains(vchannel, paramtable.Get().CommonCfg.ReplicateMsgChannel.GetValue())
// The dispatcher seeks from the oldest target,
// so for each target, msg before the target position must be filtered out.
//
// From 2.6.0, every message has a unique timetick, so we can filter out the msg by < but not <=.
if ((d.includeSkipWhenSplit && p.EndTs < t.pos.GetTimestamp()) ||
(!d.includeSkipWhenSplit && p.EndTs <= t.pos.GetTimestamp())) &&
!isReplicateChannel {
if (d.includeSkipWhenSplit && p.EndTs < t.pos.GetTimestamp()) ||
(!d.includeSkipWhenSplit && p.EndTs <= t.pos.GetTimestamp()) {
log.Info("skip msg",
zap.String("vchannel", vchannel),
zap.Int("msgCount", len(p.Msgs)),
@ -319,7 +316,6 @@ func (d *Dispatcher) groupAndParseMsgs(pack *msgstream.ConsumeMsgPack, unmarshal
// init packs for all targets, even though there's no msg in pack,
// but we still need to dispatch time ticks to the targets.
targetPacks := make(map[string]*MsgPack)
replicateConfigs := make(map[string]*msgstream.ReplicateConfig)
d.targets.Range(func(vchannel string, t *target) bool {
targetPacks[vchannel] = &MsgPack{
BeginTs: pack.BeginTs,
@ -328,9 +324,6 @@ func (d *Dispatcher) groupAndParseMsgs(pack *msgstream.ConsumeMsgPack, unmarshal
StartPositions: pack.StartPositions,
EndPositions: pack.EndPositions,
}
if t.replicateConfig != nil {
replicateConfigs[vchannel] = t.replicateConfig
}
return true
})
// group messages by vchannel
@ -350,14 +343,6 @@ func (d *Dispatcher) groupAndParseMsgs(pack *msgstream.ConsumeMsgPack, unmarshal
// we need to dispatch it to the vchannel of this collection
targets := []string{}
for k := range targetPacks {
if msg.GetType() == commonpb.MsgType_Replicate {
config := replicateConfigs[k]
if config != nil && msg.GetReplicateID() == config.ReplicateID {
targets = append(targets, k)
}
continue
}
if !strings.Contains(k, collectionID) {
continue
}
@ -386,59 +371,5 @@ func (d *Dispatcher) groupAndParseMsgs(pack *msgstream.ConsumeMsgPack, unmarshal
targetPacks[vchannel].Msgs = append(targetPacks[vchannel].Msgs, tsMsg)
}
}
replicateEndChannels := make(map[string]struct{})
for vchannel, c := range replicateConfigs {
if len(targetPacks[vchannel].Msgs) == 0 {
delete(targetPacks, vchannel) // no replicate msg, can't send pack
continue
}
// calculate the new pack ts
beginTs := targetPacks[vchannel].Msgs[0].BeginTs()
endTs := targetPacks[vchannel].Msgs[0].EndTs()
newMsgs := make([]msgstream.TsMsg, 0)
for _, msg := range targetPacks[vchannel].Msgs {
if msg.BeginTs() < beginTs {
beginTs = msg.BeginTs()
}
if msg.EndTs() > endTs {
endTs = msg.EndTs()
}
if msg.Type() == commonpb.MsgType_Replicate {
replicateMsg := msg.(*msgstream.ReplicateMsg)
if c.CheckFunc(replicateMsg) {
replicateEndChannels[vchannel] = struct{}{}
}
continue
}
newMsgs = append(newMsgs, msg)
}
targetPacks[vchannel].Msgs = newMsgs
d.resetMsgPackTS(targetPacks[vchannel], beginTs, endTs)
}
for vchannel := range replicateEndChannels {
if t, ok := d.targets.Get(vchannel); ok {
t.replicateConfig = nil
log.Info("replicate end, set replicate config nil", zap.String("vchannel", vchannel))
}
}
return targetPacks
}
func (d *Dispatcher) resetMsgPackTS(pack *MsgPack, newBeginTs, newEndTs typeutil.Timestamp) {
pack.BeginTs = newBeginTs
pack.EndTs = newEndTs
startPositions := make([]*msgstream.MsgPosition, 0)
endPositions := make([]*msgstream.MsgPosition, 0)
for _, pos := range pack.StartPositions {
startPosition := typeutil.Clone(pos)
startPosition.Timestamp = newBeginTs
startPositions = append(startPositions, startPosition)
}
for _, pos := range pack.EndPositions {
endPosition := typeutil.Clone(pos)
endPosition.Timestamp = newEndTs
endPositions = append(endPositions, endPosition)
}
pack.StartPositions = startPositions
pack.EndPositions = endPositions
}

View File

@ -36,7 +36,7 @@ func TestDispatcher(t *testing.T) {
ctx := context.Background()
t.Run("test base", func(t *testing.T) {
d, err := NewDispatcher(ctx, newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0",
nil, common.SubscriptionPositionEarliest, false, 0, false)
nil, common.SubscriptionPositionEarliest, 0, false)
assert.NoError(t, err)
assert.NotPanics(t, func() {
d.Handle(start)
@ -65,7 +65,7 @@ func TestDispatcher(t *testing.T) {
},
}
d, err := NewDispatcher(ctx, factory, time.Now().UnixNano(), "mock_pchannel_0",
nil, common.SubscriptionPositionEarliest, false, 0, false)
nil, common.SubscriptionPositionEarliest, 0, false)
assert.Error(t, err)
assert.Nil(t, d)
@ -73,7 +73,7 @@ func TestDispatcher(t *testing.T) {
t.Run("test target", func(t *testing.T) {
d, err := NewDispatcher(ctx, newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0",
nil, common.SubscriptionPositionEarliest, false, 0, false)
nil, common.SubscriptionPositionEarliest, 0, false)
assert.NoError(t, err)
output := make(chan *msgstream.MsgPack, 1024)
@ -128,7 +128,7 @@ func TestDispatcher(t *testing.T) {
func BenchmarkDispatcher_handle(b *testing.B) {
d, err := NewDispatcher(context.Background(), newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0",
nil, common.SubscriptionPositionEarliest, false, 0, false)
nil, common.SubscriptionPositionEarliest, 0, false)
assert.NoError(b, err)
for i := 0; i < b.N; i++ {
@ -143,197 +143,44 @@ func BenchmarkDispatcher_handle(b *testing.B) {
func TestGroupMessage(t *testing.T) {
d, err := NewDispatcher(context.Background(), newMockFactory(), time.Now().UnixNano(), "mock_pchannel_0",
nil, common.SubscriptionPositionEarliest, false, 0, false)
nil, common.SubscriptionPositionEarliest, 0, false)
assert.NoError(t, err)
d.AddTarget(newTarget(&StreamConfig{VChannel: "mock_pchannel_0_1v0"}, false))
d.AddTarget(newTarget(&StreamConfig{
VChannel: "mock_pchannel_0_2v0",
ReplicateConfig: msgstream.GetReplicateConfig("local-test", "foo", "coo"),
VChannel: "mock_pchannel_0_2v0",
}, false))
{
// no replicate msg
packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{
BeginTs: 1,
EndTs: 10,
StartPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("1"),
Timestamp: 1,
},
packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{
BeginTs: 1,
EndTs: 10,
StartPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("1"),
Timestamp: 1,
},
EndPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("10"),
Timestamp: 10,
},
},
EndPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("10"),
Timestamp: 10,
},
Msgs: []msgstream.TsMsg{
&msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 5,
EndTimestamp: 5,
},
InsertRequest: &msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
Timestamp: 5,
},
ShardName: "mock_pchannel_0_1v0",
},
Msgs: []msgstream.TsMsg{
&msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 5,
EndTimestamp: 5,
},
InsertRequest: &msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
Timestamp: 5,
},
ShardName: "mock_pchannel_0_1v0",
},
},
}), nil)
assert.Len(t, packs, 1)
}
{
// equal to replicateID
packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{
BeginTs: 1,
EndTs: 10,
StartPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("1"),
Timestamp: 1,
},
},
EndPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("10"),
Timestamp: 10,
},
},
Msgs: []msgstream.TsMsg{
&msgstream.ReplicateMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 100,
EndTimestamp: 100,
},
ReplicateMsg: &msgpb.ReplicateMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Replicate,
Timestamp: 100,
ReplicateInfo: &commonpb.ReplicateInfo{
ReplicateID: "local-test",
},
},
},
},
},
}), nil)
assert.Len(t, packs, 2)
{
replicatePack := packs["mock_pchannel_0_2v0"]
assert.EqualValues(t, 100, replicatePack.BeginTs)
assert.EqualValues(t, 100, replicatePack.EndTs)
assert.EqualValues(t, 100, replicatePack.StartPositions[0].Timestamp)
assert.EqualValues(t, 100, replicatePack.EndPositions[0].Timestamp)
assert.Len(t, replicatePack.Msgs, 0)
}
{
replicatePack := packs["mock_pchannel_0_1v0"]
assert.EqualValues(t, 1, replicatePack.BeginTs)
assert.EqualValues(t, 10, replicatePack.EndTs)
assert.EqualValues(t, 1, replicatePack.StartPositions[0].Timestamp)
assert.EqualValues(t, 10, replicatePack.EndPositions[0].Timestamp)
assert.Len(t, replicatePack.Msgs, 0)
}
}
{
// not equal to replicateID
packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{
BeginTs: 1,
EndTs: 10,
StartPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("1"),
Timestamp: 1,
},
},
EndPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("10"),
Timestamp: 10,
},
},
Msgs: []msgstream.TsMsg{
&msgstream.ReplicateMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 100,
EndTimestamp: 100,
},
ReplicateMsg: &msgpb.ReplicateMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Replicate,
Timestamp: 100,
ReplicateInfo: &commonpb.ReplicateInfo{
ReplicateID: "local-test-1", // not equal to replicateID
},
},
},
},
},
}), nil)
assert.Len(t, packs, 1)
replicatePack := packs["mock_pchannel_0_2v0"]
assert.Nil(t, replicatePack)
}
{
// replicate end
replicateTarget, ok := d.targets.Get("mock_pchannel_0_2v0")
assert.True(t, ok)
assert.NotNil(t, replicateTarget.replicateConfig)
packs := d.groupAndParseMsgs(msgstream.BuildConsumeMsgPack(&MsgPack{
BeginTs: 1,
EndTs: 10,
StartPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("1"),
Timestamp: 1,
},
},
EndPositions: []*msgstream.MsgPosition{
{
ChannelName: "mock_pchannel_0",
MsgID: []byte("10"),
Timestamp: 10,
},
},
Msgs: []msgstream.TsMsg{
&msgstream.ReplicateMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 100,
EndTimestamp: 100,
},
ReplicateMsg: &msgpb.ReplicateMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Replicate,
Timestamp: 100,
ReplicateInfo: &commonpb.ReplicateInfo{
ReplicateID: "local-test",
},
},
IsEnd: true,
Database: "foo",
},
},
},
}), nil)
assert.Len(t, packs, 2)
replicatePack := packs["mock_pchannel_0_2v0"]
assert.EqualValues(t, 100, replicatePack.BeginTs)
assert.EqualValues(t, 100, replicatePack.EndTs)
assert.EqualValues(t, 100, replicatePack.StartPositions[0].Timestamp)
assert.EqualValues(t, 100, replicatePack.EndPositions[0].Timestamp)
assert.Nil(t, replicateTarget.replicateConfig)
}
},
}), nil)
assert.Len(t, packs, 2)
}

View File

@ -247,14 +247,11 @@ OUTER:
}
}
// For CDC, CDC needs to includeCurrentMsg when create new dispatcher
// and NOT includeCurrentMsg when create lag dispatcher. So if any dispatcher lagged,
// If any dispatcher is lagged,
// we give up batch subscription and create dispatcher for only one target.
includeCurrentMsg := false
for _, candidate := range candidateTargets {
if candidate.isLagged {
candidateTargets = []*target{candidate}
includeCurrentMsg = true
candidate.isLagged = false
break
}
@ -272,7 +269,7 @@ OUTER:
// TODO: add newDispatcher timeout param and init context
id := c.idAllocator.Inc()
d, err := NewDispatcher(context.Background(), c.factory, id, c.pchannel, earliestTarget.pos, earliestTarget.subPos, includeCurrentMsg, latestTarget.pos.GetTimestamp(), c.includeSkipWhenSplit)
d, err := NewDispatcher(context.Background(), c.factory, id, c.pchannel, earliestTarget.pos, earliestTarget.subPos, latestTarget.pos.GetTimestamp(), c.includeSkipWhenSplit)
if err != nil {
panic(err)
}
@ -381,19 +378,6 @@ func (c *dispatcherManager) tryMerge() {
zap.Duration("dur", time.Since(start)))
}
// deleteMetric remove specific prometheus metric,
// Lock/RLock is required before calling this method.
func (c *dispatcherManager) deleteMetric(channel string) {
nodeIDStr := fmt.Sprintf("%d", c.nodeID)
if c.role == typeutil.DataNodeRole {
metrics.DataNodeMsgDispatcherTtLag.DeleteLabelValues(nodeIDStr, channel)
return
}
if c.role == typeutil.QueryNodeRole {
metrics.QueryNodeMsgDispatcherTtLag.DeleteLabelValues(nodeIDStr, channel)
}
}
func (c *dispatcherManager) uploadMetric() {
c.mu.RLock()
defer c.mu.RUnlock()

View File

@ -24,7 +24,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@ -38,18 +37,16 @@ type target struct {
latestTimeTick uint64
isLagged bool
closeMu sync.Mutex
closeOnce sync.Once
closed bool
maxLag time.Duration
timer *time.Timer
replicateConfig *msgstream.ReplicateConfig
closeMu sync.Mutex
closeOnce sync.Once
closed bool
maxLag time.Duration
timer *time.Timer
cancelCh lifetime.SafeChan
}
func newTarget(streamConfig *StreamConfig, filterSameTimeTick bool) *target {
replicateConfig := streamConfig.ReplicateConfig
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
t := &target{
vchannel: streamConfig.VChannel,
@ -61,14 +58,8 @@ func newTarget(streamConfig *StreamConfig, filterSameTimeTick bool) *target {
cancelCh: lifetime.NewSafeChan(),
maxLag: maxTolerantLag,
timer: time.NewTimer(maxTolerantLag),
replicateConfig: replicateConfig,
}
t.closed = false
if replicateConfig != nil {
log.Info("have replicate config",
zap.String("vchannel", streamConfig.VChannel),
zap.String("replicateID", replicateConfig.ReplicateID))
}
return t
}

View File

@ -292,39 +292,6 @@ func (_c *MockMsgStream_Close_Call) RunAndReturn(run func()) *MockMsgStream_Clos
return _c
}
// ForceEnableProduce provides a mock function with given fields: can
func (_m *MockMsgStream) ForceEnableProduce(can bool) {
_m.Called(can)
}
// MockMsgStream_ForceEnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceEnableProduce'
type MockMsgStream_ForceEnableProduce_Call struct {
*mock.Call
}
// ForceEnableProduce is a helper method to define mock.On call
// - can bool
func (_e *MockMsgStream_Expecter) ForceEnableProduce(can interface{}) *MockMsgStream_ForceEnableProduce_Call {
return &MockMsgStream_ForceEnableProduce_Call{Call: _e.mock.On("ForceEnableProduce", can)}
}
func (_c *MockMsgStream_ForceEnableProduce_Call) Run(run func(can bool)) *MockMsgStream_ForceEnableProduce_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(bool))
})
return _c
}
func (_c *MockMsgStream_ForceEnableProduce_Call) Return() *MockMsgStream_ForceEnableProduce_Call {
_c.Call.Return()
return _c
}
func (_c *MockMsgStream_ForceEnableProduce_Call) RunAndReturn(run func(bool)) *MockMsgStream_ForceEnableProduce_Call {
_c.Run(run)
return _c
}
// GetLatestMsgID provides a mock function with given fields: channel
func (_m *MockMsgStream) GetLatestMsgID(channel string) (common.MessageID, error) {
ret := _m.Called(channel)

View File

@ -308,9 +308,6 @@ func (w WpMsgStream) CheckTopicValid(channel string) error {
return nil
}
func (w WpMsgStream) ForceEnableProduce(can bool) {
}
// NewWpmsFactory creates a new message stream factory based on woodpecker
func NewWpmsFactory(cfg *paramtable.ServiceParam) Factory {
// TODO should not be used in mq wrapper

View File

@ -204,7 +204,6 @@ func TestWpMsgStream(t *testing.T) {
wpStream.Close()
wpStream.AsProducer(ctx, []string{"test-channel"})
wpStream.SetRepackFunc(nil)
wpStream.ForceEnableProduce(true)
// Test methods returning nil/empty values
msgID, err := wpStream.GetLatestMsgID("test-channel")

View File

@ -20,14 +20,12 @@ import (
"context"
"fmt"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
uatomic "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
@ -35,7 +33,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/v2/config"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
@ -47,10 +44,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
var (
_ MsgStream = (*mqMsgStream)(nil)
streamCounter uatomic.Int64
)
var _ MsgStream = (*mqMsgStream)(nil)
type mqMsgStream struct {
ctx context.Context
@ -60,22 +54,16 @@ type mqMsgStream struct {
consumers map[string]mqwrapper.Consumer
consumerChannels []string
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *ConsumeMsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
ttMsgEnable atomic.Value
forceEnableProduce atomic.Value
configEvent config.EventHandler
replicateID string
checkFunc CheckReplicateMsgFunc
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *ConsumeMsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
}
// NewMqMsgStream is used to generate a new mqMsgStream object
@ -109,20 +97,7 @@ func NewMqMsgStream(initCtx context.Context,
closeRWMutex: &sync.RWMutex{},
closed: 0,
}
ctxLog := log.Ctx(initCtx)
stream.forceEnableProduce.Store(false)
stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
value, err := strconv.ParseBool(event.Value)
if err != nil {
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
return
}
stream.ttMsgEnable.Store(value)
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
})
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
ctxLog.Info("Msg Stream state", zap.Bool("can_produce", stream.isEnabledProduce()))
log.Ctx(initCtx).Info("Msg Stream initialized")
return stream, nil
}
@ -244,7 +219,6 @@ func (ms *mqMsgStream) Close() {
ms.client.Close()
close(ms.receiveBuf)
paramtable.Get().Unwatch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, ms.configEvent)
log.Info("mq msg stream closed")
}
@ -273,36 +247,7 @@ func (ms *mqMsgStream) GetProduceChannels() []string {
return ms.producerChannels
}
func (ms *mqMsgStream) ForceEnableProduce(can bool) {
ms.forceEnableProduce.Store(can)
}
func (ms *mqMsgStream) isEnabledProduce() bool {
return ms.forceEnableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool)
}
func (ms *mqMsgStream) isSkipSystemTT() bool {
return ms.replicateID != ""
}
// checkReplicateID check the replicate id of the message, return values: isMatch, isReplicate
func (ms *mqMsgStream) checkReplicateID(msg TsMsg) (bool, bool) {
if !ms.isSkipSystemTT() {
return true, false
}
msgBase, ok := msg.(interface{ GetBase() *commonpb.MsgBase })
if !ok {
log.Warn("fail to get msg base, please check it", zap.Any("type", msg.Type()))
return false, false
}
return msgBase.GetBase().GetReplicateInfo().GetReplicateID() == ms.replicateID, true
}
func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error {
if !ms.isEnabledProduce() {
log.Ctx(ms.ctx).Warn("can't produce the msg in the backup instance", zap.Stack("stack"))
return merr.ErrDenyProduceMsg
}
if msgPack == nil || len(msgPack.Msgs) <= 0 {
log.Ctx(ms.ctx).Debug("Warning: Receive empty msgPack")
return nil
@ -378,14 +323,7 @@ func (ms *mqMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) (map[str
if msgPack == nil || len(msgPack.Msgs) <= 0 {
return ids, errors.New("empty msgs")
}
// Only allow to create collection msg in backup instance
// However, there may be a problem of ts disorder here, but because the start position of the collection only uses offsets, not time, there is no problem for the time being
isCreateCollectionMsg := len(msgPack.Msgs) == 1 && msgPack.Msgs[0].Type() == commonpb.MsgType_CreateCollection
if !ms.isEnabledProduce() && !isCreateCollectionMsg {
log.Ctx(ms.ctx).Warn("can't broadcast the msg in the backup instance", zap.Stack("stack"))
return ids, merr.ErrDenyProduceMsg
}
for _, v := range msgPack.Msgs {
spanCtx, sp := MsgSpanFromCtx(v.TraceCtx(), v)
@ -754,15 +692,14 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
timeTickMsg = v
continue
}
if v.GetTimestamp() <= currTs ||
v.GetReplicateID() != "" {
if v.GetTimestamp() <= currTs {
size += uint64(v.GetSize())
timeTickBuf = append(timeTickBuf, v)
} else {
tempBuffer = append(tempBuffer, v)
}
// when drop collection, force to exit the buffer loop
if v.GetType() == commonpb.MsgType_DropCollection || v.GetType() == commonpb.MsgType_Replicate {
if v.GetType() == commonpb.MsgType_DropCollection {
containsEndBufferMsg = true
}
}
@ -1007,10 +944,6 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition,
}
}
// skip the replicate msg because it must have been consumed
if packMsg.GetReplicateID() != "" {
continue
}
if packMsg.GetType() == commonpb.MsgType_TimeTick && packMsg.GetTimestamp() >= mp.Timestamp {
runLoop = false
if time.Since(loopStarTime) > 30*time.Second {

View File

@ -31,12 +31,11 @@ func TestFlushMsg(t *testing.T) {
var msg TsMsg = &FlushMsg{
FlushRequest: &milvuspb.FlushRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_Flush,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
CollectionNames: []string{"col1", "col2"},
@ -70,12 +69,11 @@ func TestLoadCollection(t *testing.T) {
var msg TsMsg = &LoadCollectionMsg{
LoadCollectionRequest: &milvuspb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_LoadCollection,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
CollectionName: "col1",
@ -109,12 +107,11 @@ func TestReleaseCollection(t *testing.T) {
var msg TsMsg = &ReleaseCollectionMsg{
ReleaseCollectionRequest: &milvuspb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_ReleaseCollection,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
CollectionName: "col1",

View File

@ -31,12 +31,11 @@ func TestCreateDatabase(t *testing.T) {
var msg TsMsg = &CreateDatabaseMsg{
CreateDatabaseRequest: &milvuspb.CreateDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateDatabase,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_CreateDatabase,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
},
@ -68,12 +67,11 @@ func TestDropDatabase(t *testing.T) {
var msg TsMsg = &DropDatabaseMsg{
DropDatabaseRequest: &milvuspb.DropDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropDatabase,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_DropDatabase,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
},
@ -105,12 +103,11 @@ func TestAlterDatabase(t *testing.T) {
var msg TsMsg = &AlterDatabaseMsg{
AlterDatabaseRequest: &milvuspb.AlterDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_AlterDatabase,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_AlterDatabase,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
Properties: []*commonpb.KeyValuePair{

View File

@ -31,12 +31,11 @@ func TestImportMsg(t *testing.T) {
var msg TsMsg = &ImportMsg{
ImportMsg: &msgpb.ImportMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Import,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_Import,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
},

View File

@ -31,12 +31,11 @@ func TestCreateIndex(t *testing.T) {
var msg TsMsg = &CreateIndexMsg{
CreateIndexRequest: &milvuspb.CreateIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateIndex,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_CreateIndex,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
},
@ -67,12 +66,11 @@ func TestDropIndex(t *testing.T) {
var msg TsMsg = &DropIndexMsg{
DropIndexRequest: &milvuspb.DropIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropIndex,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_DropIndex,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
CollectionName: "col1",

View File

@ -31,12 +31,11 @@ func TestLoadPartitions(t *testing.T) {
msg := &LoadPartitionsMsg{
LoadPartitionsRequest: &milvuspb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_LoadPartitions,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
CollectionName: "col1",
@ -76,12 +75,11 @@ func TestReleasePartitions(t *testing.T) {
msg := &ReleasePartitionsMsg{
ReleasePartitionsRequest: &milvuspb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_ReleasePartitions,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
DbName: "unit_db",
CollectionName: "col1",

View File

@ -1,78 +0,0 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package msgstream
import (
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
)
type ReplicateMsg struct {
BaseMsg
*msgpb.ReplicateMsg
}
var _ TsMsg = (*ReplicateMsg)(nil)
func (r *ReplicateMsg) ID() UniqueID {
return r.Base.MsgID
}
func (r *ReplicateMsg) SetID(id UniqueID) {
r.Base.MsgID = id
}
func (r *ReplicateMsg) Type() MsgType {
return r.Base.MsgType
}
func (r *ReplicateMsg) SourceID() int64 {
return r.Base.SourceID
}
func (r *ReplicateMsg) Marshal(input TsMsg) (MarshalType, error) {
replicateMsg := input.(*ReplicateMsg)
mb, err := proto.Marshal(replicateMsg.ReplicateMsg)
if err != nil {
return nil, err
}
return mb, nil
}
func (r *ReplicateMsg) Unmarshal(input MarshalType) (TsMsg, error) {
replicateMsg := &msgpb.ReplicateMsg{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, replicateMsg)
if err != nil {
return nil, err
}
rr := &ReplicateMsg{ReplicateMsg: replicateMsg}
rr.BeginTimestamp = replicateMsg.GetBase().GetTimestamp()
rr.EndTimestamp = replicateMsg.GetBase().GetTimestamp()
return rr, nil
}
func (r *ReplicateMsg) Size() int {
return proto.Size(r.ReplicateMsg)
}

View File

@ -1,64 +0,0 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package msgstream
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
)
func TestReplicateMsg(t *testing.T) {
var msg TsMsg = &ReplicateMsg{
ReplicateMsg: &msgpb.ReplicateMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Replicate,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
},
Database: "unit_db",
},
}
assert.EqualValues(t, 100, msg.ID())
msg.SetID(200)
assert.EqualValues(t, 200, msg.ID())
assert.Equal(t, commonpb.MsgType_Replicate, msg.Type())
assert.EqualValues(t, 10000, msg.SourceID())
msgBytes, err := msg.Marshal(msg)
assert.NoError(t, err)
var newMsg TsMsg = &ReplicateMsg{}
_, err = newMsg.Unmarshal("1")
assert.Error(t, err)
newMsg, err = newMsg.Unmarshal(msgBytes)
assert.NoError(t, err)
assert.EqualValues(t, 200, newMsg.ID())
assert.EqualValues(t, 1000, newMsg.BeginTs())
assert.EqualValues(t, 1000, newMsg.EndTs())
assert.True(t, msg.Size() > 0)
}

View File

@ -31,12 +31,11 @@ func TestCreateUser(t *testing.T) {
var msg TsMsg = &CreateUserMsg{
CreateCredentialRequest: &milvuspb.CreateCredentialRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCredential,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_CreateCredential,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
Username: "unit_user",
Password: "unit_password",
@ -70,12 +69,11 @@ func TestUpdateUser(t *testing.T) {
var msg TsMsg = &UpdateUserMsg{
UpdateCredentialRequest: &milvuspb.UpdateCredentialRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_UpdateCredential,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_UpdateCredential,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
Username: "unit_user",
OldPassword: "unit_old_password",
@ -111,12 +109,11 @@ func TestDeleteUser(t *testing.T) {
var msg TsMsg = &DeleteUserMsg{
DeleteCredentialRequest: &milvuspb.DeleteCredentialRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DeleteCredential,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_DeleteCredential,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
Username: "unit_user",
},
@ -148,12 +145,11 @@ func TestCreateRole(t *testing.T) {
var msg TsMsg = &CreateRoleMsg{
CreateRoleRequest: &milvuspb.CreateRoleRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateRole,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_CreateRole,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
Entity: &milvuspb.RoleEntity{
Name: "unit_role",
@ -187,12 +183,11 @@ func TestDropRole(t *testing.T) {
var msg TsMsg = &DropRoleMsg{
DropRoleRequest: &milvuspb.DropRoleRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropRole,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_DropRole,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
RoleName: "unit_role",
},
@ -224,12 +219,11 @@ func TestOperateUserRole(t *testing.T) {
var msg TsMsg = &OperateUserRoleMsg{
OperateUserRoleRequest: &milvuspb.OperateUserRoleRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_OperateUserRole,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_OperateUserRole,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
RoleName: "unit_role",
Username: "unit_user",
@ -265,12 +259,11 @@ func TestOperatePrivilege(t *testing.T) {
var msg TsMsg = &OperatePrivilegeMsg{
OperatePrivilegeRequest: &milvuspb.OperatePrivilegeRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_OperatePrivilege,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_OperatePrivilege,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
Entity: &milvuspb.GrantEntity{
Role: &milvuspb.RoleEntity{Name: "unit_role"},
@ -317,12 +310,11 @@ func TestOperatePrivilegeV2(t *testing.T) {
var msg TsMsg = &OperatePrivilegeV2Msg{
OperatePrivilegeV2Request: &milvuspb.OperatePrivilegeV2Request{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_OperatePrivilegeV2,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_OperatePrivilegeV2,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
Grantor: &milvuspb.GrantorEntity{
User: &milvuspb.UserEntity{Name: "unit_user"},
@ -359,12 +351,11 @@ func TestCreatePrivilegeGroup(t *testing.T) {
var msg TsMsg = &CreatePrivilegeGroupMsg{
CreatePrivilegeGroupRequest: &milvuspb.CreatePrivilegeGroupRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreatePrivilegeGroup,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_CreatePrivilegeGroup,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
GroupName: "unit_group",
},
@ -397,12 +388,11 @@ func TestDropPrivilegeGroup(t *testing.T) {
var msg TsMsg = &DropPrivilegeGroupMsg{
DropPrivilegeGroupRequest: &milvuspb.DropPrivilegeGroupRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropPrivilegeGroup,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_DropPrivilegeGroup,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
GroupName: "unit_group",
},
@ -434,12 +424,11 @@ func TestOperatePrivilegeGroup(t *testing.T) {
var msg TsMsg = &OperatePrivilegeGroupMsg{
OperatePrivilegeGroupRequest: &milvuspb.OperatePrivilegeGroupRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_OperatePrivilegeGroup,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
MsgType: commonpb.MsgType_OperatePrivilegeGroup,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
},
GroupName: "unit_group",
Type: milvuspb.OperatePrivilegeGroupType_AddPrivilegesToGroup,

View File

@ -80,7 +80,6 @@ type ConsumeMsg interface {
GetID() int64
GetCollectionID() string
GetType() commonpb.MsgType
GetReplicateID() string
SetTraceCtx(ctx context.Context)
Unmarshal(unmarshalDispatcher UnmarshalDispatcher) (TsMsg, error)
@ -124,15 +123,6 @@ func (m *UnmarshaledMsg) GetSize() int {
return m.msg.Size()
}
func (m *UnmarshaledMsg) GetReplicateID() string {
msgBase, ok := m.msg.(interface{ GetBase() *commonpb.MsgBase })
if !ok {
log.Warn("fail to get msg base, please check it", zap.Any("type", m.msg.Type()))
return ""
}
return msgBase.GetBase().GetReplicateInfo().GetReplicateID()
}
func (m *UnmarshaledMsg) SetPosition(pos *msgpb.MsgPosition) {
m.msg.SetPosition(pos)
}
@ -159,7 +149,6 @@ type MarshaledMsg struct {
timestamp uint64
vchannel string
collectionID string
replicateID string
traceCtx context.Context
}
@ -195,10 +184,6 @@ func (m *MarshaledMsg) GetSize() int {
return len(m.msg.Payload())
}
func (m *MarshaledMsg) GetReplicateID() string {
return m.replicateID
}
func (m *MarshaledMsg) SetPosition(pos *msgpb.MsgPosition) {
m.pos = pos
}
@ -269,11 +254,6 @@ func NewMarshaledMsg(msg common.Message, group string) (ConsumeMsg, error) {
msgType: msgType,
vchannel: vchannel,
}
replicateID, ok := properties[common.ReplicateIDTypeKey]
if ok {
result.replicateID = replicateID
}
return result, nil
}
@ -306,53 +286,11 @@ type MsgStream interface {
Chan() <-chan *ConsumeMsgPack
GetUnmarshalDispatcher() UnmarshalDispatcher
// Seek consume message from the specified position
// includeCurrentMsg indicates whether to consume the current message, and in the milvus system, it should be always false
// includeCurrentMsg indicates whether to consume the current message.
Seek(ctx context.Context, msgPositions []*MsgPosition, includeCurrentMsg bool) error
GetLatestMsgID(channel string) (MessageID, error)
CheckTopicValid(channel string) error
ForceEnableProduce(can bool)
}
type ReplicateConfig struct {
ReplicateID string
CheckFunc CheckReplicateMsgFunc
}
type CheckReplicateMsgFunc func(*ReplicateMsg) bool
func GetReplicateConfig(replicateID, dbName, colName string) *ReplicateConfig {
if replicateID == "" {
return nil
}
replicateConfig := &ReplicateConfig{
ReplicateID: replicateID,
CheckFunc: func(msg *ReplicateMsg) bool {
if !msg.GetIsEnd() {
return false
}
log.Info("check replicate msg",
zap.String("replicateID", replicateID),
zap.String("dbName", dbName),
zap.String("colName", colName),
zap.Any("msg", msg))
if msg.GetIsCluster() {
return true
}
return msg.GetDatabase() == dbName && (msg.GetCollection() == colName || msg.GetCollection() == "")
},
}
return replicateConfig
}
func GetReplicateID(msg TsMsg) string {
msgBase, ok := msg.(interface{ GetBase() *commonpb.MsgBase })
if !ok {
log.Warn("fail to get msg base, please check it", zap.Any("type", msg.Type()))
return ""
}
return msgBase.GetBase().GetReplicateInfo().GetReplicateID()
}
func GetTimestamp(msg TsMsg) uint64 {
@ -364,10 +302,6 @@ func GetTimestamp(msg TsMsg) uint64 {
return msgBase.GetBase().GetTimestamp()
}
func MatchReplicateID(msg TsMsg, replicateID string) bool {
return GetReplicateID(msg) == replicateID
}
type Factory interface {
NewMsgStream(ctx context.Context) (MsgStream, error)
NewTtMsgStream(ctx context.Context) (MsgStream, error)

View File

@ -154,9 +154,6 @@ func GetPorperties(msg TsMsg) map[string]string {
msgBase, ok := msg.(interface{ GetBase() *commonpb.MsgBase })
if ok {
properties[common.TimestampTypeKey] = strconv.FormatUint(msgBase.GetBase().GetTimestamp(), 10)
if msgBase.GetBase().GetReplicateInfo() != nil {
properties[common.ReplicateIDTypeKey] = msgBase.GetBase().GetReplicateInfo().GetReplicateID()
}
}
return properties

View File

@ -24,8 +24,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
)
@ -82,90 +80,3 @@ func TestGetLatestMsgID(t *testing.T) {
assert.Equal(t, []byte("mock"), id)
}
}
func TestReplicateConfig(t *testing.T) {
t.Run("get replicate id", func(t *testing.T) {
{
msg := &InsertMsg{
InsertRequest: &msgpb.InsertRequest{
Base: &commonpb.MsgBase{
ReplicateInfo: &commonpb.ReplicateInfo{
ReplicateID: "local",
},
},
},
}
assert.Equal(t, "local", GetReplicateID(msg))
assert.True(t, MatchReplicateID(msg, "local"))
}
{
msg := &InsertMsg{
InsertRequest: &msgpb.InsertRequest{
Base: &commonpb.MsgBase{},
},
}
assert.Equal(t, "", GetReplicateID(msg))
assert.False(t, MatchReplicateID(msg, "local"))
}
{
msg := &MarshalFailTsMsg{}
assert.Equal(t, "", GetReplicateID(msg))
}
})
t.Run("get replicate config", func(t *testing.T) {
{
assert.Nil(t, GetReplicateConfig("", "", ""))
}
{
rc := GetReplicateConfig("local", "db", "col")
assert.Equal(t, "local", rc.ReplicateID)
checkFunc := rc.CheckFunc
assert.False(t, checkFunc(&ReplicateMsg{
ReplicateMsg: &msgpb.ReplicateMsg{},
}))
assert.True(t, checkFunc(&ReplicateMsg{
ReplicateMsg: &msgpb.ReplicateMsg{
IsEnd: true,
IsCluster: true,
},
}))
assert.False(t, checkFunc(&ReplicateMsg{
ReplicateMsg: &msgpb.ReplicateMsg{
IsEnd: true,
Database: "db1",
},
}))
assert.True(t, checkFunc(&ReplicateMsg{
ReplicateMsg: &msgpb.ReplicateMsg{
IsEnd: true,
Database: "db",
},
}))
assert.False(t, checkFunc(&ReplicateMsg{
ReplicateMsg: &msgpb.ReplicateMsg{
IsEnd: true,
Database: "db",
Collection: "col1",
},
}))
}
{
rc := GetReplicateConfig("local", "db", "col")
checkFunc := rc.CheckFunc
assert.True(t, checkFunc(&ReplicateMsg{
ReplicateMsg: &msgpb.ReplicateMsg{
IsEnd: true,
Database: "db",
},
}))
assert.False(t, checkFunc(&ReplicateMsg{
ReplicateMsg: &msgpb.ReplicateMsg{
IsEnd: true,
Database: "db1",
Collection: "col1",
},
}))
}
})
}

View File

@ -92,7 +92,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
createPrivilegeGroupMsg := CreatePrivilegeGroupMsg{}
dropPrivilegeGroupMsg := DropPrivilegeGroupMsg{}
operatePrivilegeGroupMsg := OperatePrivilegeGroupMsg{}
replicateMsg := ReplicateMsg{}
importMsg := ImportMsg{}
createAliasMsg := CreateAliasMsg{}
@ -134,7 +133,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_CreatePrivilegeGroup] = createPrivilegeGroupMsg.Unmarshal
p.TempMap[commonpb.MsgType_DropPrivilegeGroup] = dropPrivilegeGroupMsg.Unmarshal
p.TempMap[commonpb.MsgType_OperatePrivilegeGroup] = operatePrivilegeGroupMsg.Unmarshal
p.TempMap[commonpb.MsgType_Replicate] = replicateMsg.Unmarshal
p.TempMap[commonpb.MsgType_Import] = importMsg.Unmarshal
p.TempMap[commonpb.MsgType_CreateAlias] = createAliasMsg.Unmarshal
p.TempMap[commonpb.MsgType_DropAlias] = dropAliasMsg.Unmarshal

View File

@ -949,12 +949,6 @@ func TestNumRowsWithSchema(t *testing.T) {
func TestChannelConvert(t *testing.T) {
t.Run("is physical channel", func(t *testing.T) {
{
channel := "by-dev-replicate-msg"
ok := IsPhysicalChannel(channel)
assert.True(t, ok)
}
{
channel := "by-dev-rootcoord-dml_2"
ok := IsPhysicalChannel(channel)
@ -980,12 +974,6 @@ func TestChannelConvert(t *testing.T) {
physicalChannel := ToPhysicalChannel(channel)
assert.Equal(t, "by-dev-rootcoord-dml_2", physicalChannel)
}
{
channel := "by-dev-replicate-msg"
physicalChannel := ToPhysicalChannel(channel)
assert.Equal(t, "by-dev-replicate-msg", physicalChannel)
}
})
t.Run("get virtual channel", func(t *testing.T) {

View File

@ -70,8 +70,9 @@ var (
ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false)
ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true)
ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false)
ErrCollectionReplicateMode = newMilvusError("can't operate on the collection under standby mode", 108, false)
ErrCollectionSchemaMismatch = newMilvusError("collection schema mismatch", 109, false)
// Deprecated, keep it only for reserving the error code
ErrCollectionReplicateMode = newMilvusError("can't operate on the collection under standby mode", 108, false)
ErrCollectionSchemaMismatch = newMilvusError("collection schema mismatch", 109, false)
// Partition related
ErrPartitionNotFound = newMilvusError("partition not found", 200, false)
ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false)
@ -149,7 +150,8 @@ var (
ErrMqTopicNotFound = newMilvusError("topic not found", 1300, false)
ErrMqTopicNotEmpty = newMilvusError("topic not empty", 1301, false)
ErrMqInternal = newMilvusError("message queue internal error", 1302, false)
ErrDenyProduceMsg = newMilvusError("deny to write the message to mq", 1303, false)
// Deprecated, keep it only for reserving the error code
ErrDenyProduceMsg = newMilvusError("deny to write the message to mq", 1303, false)
// Privilege related
// this operation is denied because the user not authorized, user need to login in first
@ -178,7 +180,7 @@ var (
ErrCheckPrimaryKey = newMilvusError("please check the primary key and its' type can only in [int, string]", 1806, false)
ErrHTTPRateLimit = newMilvusError("request is rejected by limiter", 1807, true)
// replicate related
// Deprecated, legacy replicate related errors, keep them only for reserving the error code
ErrDenyReplicateMessage = newMilvusError("deny to use the replicate message in the normal instance", 1900, false)
ErrInvalidMsgBytes = newMilvusError("invalid replicate msg bytes", 1901, false)
ErrNoAssignSegmentID = newMilvusError("no assign segment id", 1902, false)

View File

@ -322,10 +322,6 @@ func WrapErrAsInputErrorWhen(err error, targets ...milvusError) error {
return err
}
func WrapErrCollectionReplicateMode(operation string) error {
return wrapFields(ErrCollectionReplicateMode, value("operation", operation))
}
func GetErrorType(err error) ErrorType {
if merr, ok := err.(milvusError); ok {
return merr.errType

View File

@ -210,7 +210,6 @@ type commonConfig struct {
RootCoordTimeTick ParamItem `refreshable:"true"`
RootCoordStatistics ParamItem `refreshable:"true"`
RootCoordDml ParamItem `refreshable:"false"`
ReplicateMsgChannel ParamItem `refreshable:"false"`
QueryCoordTimeTick ParamItem `refreshable:"true"`
@ -298,14 +297,12 @@ type commonConfig struct {
StorageZstdConcurrency ParamItem `refreshable:"false"`
StorageReadRetryAttempts ParamItem `refreshable:"true"`
TTMsgEnabled ParamItem `refreshable:"true"`
TraceLogMode ParamItem `refreshable:"true"`
BloomFilterSize ParamItem `refreshable:"true"`
BloomFilterType ParamItem `refreshable:"true"`
MaxBloomFalsePositive ParamItem `refreshable:"true"`
BloomFilterApplyBatchSize ParamItem `refreshable:"true"`
PanicWhenPluginFail ParamItem `refreshable:"false"`
CollectionReplicateEnable ParamItem `refreshable:"true"`
UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"`
UseVectorAsClusteringKey ParamItem `refreshable:"true"`
@ -413,17 +410,6 @@ It is recommended to change this parameter before starting Milvus for the first
}
p.RootCoordDml.Init(base.mgr)
p.ReplicateMsgChannel = ParamItem{
Key: "msgChannel.chanNamePrefix.replicateMsg",
DefaultValue: "replicate-msg",
Version: "2.3.2",
FallbackKeys: []string{"common.chanNamePrefix.replicateMsg"},
PanicIfEmpty: true,
Formatter: chanNamePrefix,
Export: true,
}
p.ReplicateMsgChannel.Init(base.mgr)
p.QueryCoordTimeTick = ParamItem{
Key: "msgChannel.chanNamePrefix.queryTimeTick",
DefaultValue: "queryTimeTick",
@ -1076,26 +1062,6 @@ The default value is 1, which is enough for most cases.`,
}
p.StorageReadRetryAttempts.Init(base.mgr)
p.TTMsgEnabled = ParamItem{
Key: "common.ttMsgEnabled",
Version: "2.3.2",
DefaultValue: "true",
Doc: `Whether to disable the internal time messaging mechanism for the system.
If disabled (set to false), the system will not allow DML operations, including insertion, deletion, queries, and searches.
This helps Milvus-CDC synchronize incremental data`,
Export: true,
}
p.TTMsgEnabled.Init(base.mgr)
p.CollectionReplicateEnable = ParamItem{
Key: "common.collectionReplicateEnable",
Version: "2.4.16",
DefaultValue: "false",
Doc: `Whether to enable collection replication.`,
Export: true,
}
p.CollectionReplicateEnable.Init(base.mgr)
p.TraceLogMode = ParamItem{
Key: "common.traceLogMode",
Version: "2.3.4",