diff --git a/internal/cdc/replication/replicatemanager/channel_replicator.go b/internal/cdc/replication/replicatemanager/channel_replicator.go index 36c06e1027..c8edb67166 100644 --- a/internal/cdc/replication/replicatemanager/channel_replicator.go +++ b/internal/cdc/replication/replicatemanager/channel_replicator.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -30,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" - "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/adaptor" "github.com/milvus-io/milvus/pkg/v2/streaming/util/options" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -130,16 +130,12 @@ func (r *channelReplicator) replicateLoop() error { logger.Info("replicate channel stopped") return nil case msg := <-ch: - // TODO: Should be done at streamingnode. 
- if msg.MessageType().IsSelfControlled() { - if msg.MessageType() != message.MessageTypeTimeTick { - logger.Debug("skip self-controlled message", log.FieldMessage(msg)) - } - continue - } err := rsc.Replicate(msg) if err != nil { - panic(fmt.Sprintf("replicate message failed due to unrecoverable error: %v", err)) + if !errors.Is(err, replicatestream.ErrReplicateIgnored) { + panic(fmt.Sprintf("replicate message failed due to unrecoverable error: %v", err)) + } + continue } logger.Debug("replicate message success", log.FieldMessage(msg)) } diff --git a/internal/cdc/replication/replicatestream/metrics.go b/internal/cdc/replication/replicatestream/metrics.go index e52d5a97a8..6b39026220 100644 --- a/internal/cdc/replication/replicatestream/metrics.go +++ b/internal/cdc/replication/replicatestream/metrics.go @@ -19,9 +19,11 @@ package replicatestream import ( "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/milvus-io/milvus/pkg/v2/metrics" - streamingpb "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" - message "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -31,9 +33,11 @@ type ReplicateMetrics interface { StartReplicate(msg message.ImmutableMessage) OnSent(msg message.ImmutableMessage) OnConfirmed(msg message.ImmutableMessage) + OnNoIncomingMessages() + OnInitiate() OnConnect() OnDisconnect() - OnReconnect() + OnClose() } type msgMetrics struct { @@ -96,10 +100,18 @@ func (m *replicateMetrics) OnConfirmed(msg message.ImmutableMessage) { ).Set(float64(lag.Milliseconds())) } -func (m *replicateMetrics) OnConnect() { +// OnNoIncomingMessages is called when there are no incoming messages. 
+func (m *replicateMetrics) OnNoIncomingMessages() { + metrics.CDCReplicateLag.WithLabelValues( + m.replicateInfo.GetSourceChannelName(), + m.replicateInfo.GetTargetChannelName(), + ).Set(0) +} + +func (m *replicateMetrics) OnInitiate() { metrics.CDCStreamRPCConnections.WithLabelValues( m.replicateInfo.GetTargetCluster().GetClusterId(), - metrics.CDCStatusConnected, + metrics.CDCStatusDisconnected, ).Inc() } @@ -115,7 +127,7 @@ func (m *replicateMetrics) OnDisconnect() { ).Inc() } -func (m *replicateMetrics) OnReconnect() { +func (m *replicateMetrics) OnConnect() { targetClusterID := m.replicateInfo.GetTargetCluster().GetClusterId() metrics.CDCStreamRPCConnections.WithLabelValues( targetClusterID, @@ -130,3 +142,9 @@ func (m *replicateMetrics) OnReconnect() { targetClusterID, ).Inc() } + +func (m *replicateMetrics) OnClose() { + metrics.CDCStreamRPCConnections.DeletePartialMatch(prometheus.Labels{ + metrics.CDCLabelTargetCluster: m.replicateInfo.GetTargetCluster().GetClusterId(), + }) +} diff --git a/internal/cdc/replication/replicatestream/replicate_stream_client.go b/internal/cdc/replication/replicatestream/replicate_stream_client.go index ea1358791a..02532fe3b9 100644 --- a/internal/cdc/replication/replicatestream/replicate_stream_client.go +++ b/internal/cdc/replication/replicatestream/replicate_stream_client.go @@ -19,16 +19,21 @@ package replicatestream import ( context "context" - streamingpb "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" ) +var ErrReplicateIgnored = errors.New("ignored replicate message") + // ReplicateStreamClient is the client that replicates the message to the given cluster. type ReplicateStreamClient interface { // Replicate replicates the message to the target cluster. // Replicate opeartion doesn't promise the message is delivered to the target cluster. 
// It will cache the message in memory and retry until the message is delivered to the target cluster or the client is closed. // Once the error is returned, the replicate operation will be unrecoverable. + // Returns ErrReplicateIgnored if the message should not be replicated. Replicate(msg message.ImmutableMessage) error // Stop stops the replicate operation. diff --git a/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go b/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go index c97f7e8981..e8bae682ee 100644 --- a/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go +++ b/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go @@ -75,7 +75,7 @@ func NewReplicateStreamClient(ctx context.Context, replicateInfo *streamingpb.Re finishedCh: make(chan struct{}), } - rs.metrics.OnConnect() + rs.metrics.OnInitiate() go rs.startInternal() return rs } @@ -87,7 +87,7 @@ func (r *replicateStreamClient) startInternal() { ) defer func() { - r.metrics.OnDisconnect() + r.metrics.OnClose() logger.Info("replicate stream client closed") close(r.finishedCh) }() @@ -116,6 +116,7 @@ func (r *replicateStreamClient) startInternal() { continue } logger.Info("replicate stream client service started") + r.metrics.OnConnect() // reset client and pending messages r.client = client @@ -151,6 +152,14 @@ func (r *replicateStreamClient) Replicate(msg message.ImmutableMessage) error { case <-r.ctx.Done(): return nil default: + // TODO: Should be done at streamingnode, but after moving it into streamingnode, the metrics need to be adjusted. + if msg.MessageType().IsSelfControlled() { + if r.pendingMessages.Len() == 0 { + // if there are no pending messages, there's no lag between source and target. 
+ r.metrics.OnNoIncomingMessages() + } + return ErrReplicateIgnored + } r.metrics.StartReplicate(msg) r.pendingMessages.Enqueue(r.ctx, msg) return nil diff --git a/internal/distributed/streaming/replicate_service.go b/internal/distributed/streaming/replicate_service.go index e454803d1b..ac8ebf5289 100644 --- a/internal/distributed/streaming/replicate_service.go +++ b/internal/distributed/streaming/replicate_service.go @@ -4,6 +4,8 @@ import ( "context" "strings" + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/util/streamingutil/status" @@ -114,10 +116,15 @@ func (s replicateService) overwriteReplicateMessage(ctx context.Context, msg mes } // create collection message will set the vchannel in its body, so we need to overwrite it. - if msg.MessageType() == message.MessageTypeCreateCollection { + switch msg.MessageType() { + case message.MessageTypeCreateCollection: if err := s.overwriteCreateCollectionMessage(sourceCluster, msg); err != nil { return nil, err } + case message.MessageTypeAlterReplicateConfig: + if err := s.overwriteAlterReplicateConfigMessage(cfg, msg); err != nil { + return nil, err + } } if funcutil.IsControlChannel(msg.VChannel()) { @@ -157,3 +164,27 @@ func (s replicateService) overwriteCreateCollectionMessage(sourceCluster *replic createCollectionMsg.OverwriteBody(body) return nil } + +// overwriteAlterReplicateConfigMessage overwrites the alter replicate configuration message. 
+func (s replicateService) overwriteAlterReplicateConfigMessage(currentReplicateConfig *replicateutil.ConfigHelper, msg message.ReplicateMutableMessage) error { + alterReplicateConfigMsg := message.MustAsMutableAlterReplicateConfigMessageV2(msg) + cfg := alterReplicateConfigMsg.Header().ReplicateConfiguration + _, err := replicateutil.NewConfigHelper(s.clusterID, cfg) + if err == nil { + return nil + } + if !errors.Is(err, replicateutil.ErrCurrentClusterNotFound) { + return err + } + + // Current cluster not found in the replicate configuration, + // it means that the current cluster has been removed from the replicate topology and becomes an independent cluster. + // So we need to overwrite the replicate configuration to make the current cluster a primary cluster without a replicate topology. + cluster := currentReplicateConfig.GetCurrentCluster() + alterReplicateConfigMsg.OverwriteHeader(&message.AlterReplicateConfigMessageHeader{ + ReplicateConfiguration: &commonpb.ReplicateConfiguration{ + Clusters: []*commonpb.MilvusCluster{cluster.MilvusCluster}, + }, + }) + return nil +} diff --git a/pkg/util/replicateutil/config_helper.go b/pkg/util/replicateutil/config_helper.go index 56623b0f3a..112ea9f453 100644 --- a/pkg/util/replicateutil/config_helper.go +++ b/pkg/util/replicateutil/config_helper.go @@ -32,7 +32,10 @@ const ( RoleSecondary ) -var ErrWrongConfiguration = errors.New("wrong replicate configuration") +var ( + ErrWrongConfiguration = errors.New("wrong replicate configuration") + ErrCurrentClusterNotFound = errors.New("current cluster not found") +) func (r Role) String() string { switch r { @@ -101,7 +104,7 @@ func NewConfigHelper(currentClusterID string, cfg *commonpb.ReplicateConfigurati return nil, errors.Wrap(ErrWrongConfiguration, "primary count is not 1") } if _, ok := vs[currentClusterID]; !ok { - return nil, errors.Wrap(ErrWrongConfiguration, fmt.Sprintf("current cluster %s not found", currentClusterID)) + return nil, ErrCurrentClusterNotFound } pchannels 
:= len(vs[currentClusterID].Pchannels) for _, vertice := range vs {