Zhen Ye 8bf7d6ae72
enhance: refactor update replicate config operation using wal-broadcast-based DDL/DCL framework (#44560)
issue: #43897

- UpdateReplicateConfig operation will broadcast AlterReplicateConfig
message into all pchannels with cluster-exclusive-lock.
- Begin txn message will use commit message timetick now (to avoid
timetick rollback when CDC with txn message).
- If current cluster is secondary, the UpdateReplicateConfig will wait
until the replicate configuration is consistent with the config
replicated from primary.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
2025-10-15 15:26:01 +08:00

43 lines
1.2 KiB
Go

package broadcast
import (
"context"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
)
var (
singleton = syncutil.NewFuture[broadcaster.Broadcaster]()
ErrNotPrimary = broadcaster.ErrNotPrimary
)
// Register registers the broadcaster.
func Register(broadcaster broadcaster.Broadcaster) {
singleton.Set(broadcaster)
}
// GetWithContext gets the broadcaster with context.
func GetWithContext(ctx context.Context) (broadcaster.Broadcaster, error) {
return singleton.GetWithContext(ctx)
}
// StartBroadcastWithResourceKeys starts a broadcast with resource keys.
// Return ErrNotPrimary if the cluster is not primary, so no DDL message can be broadcasted.
func StartBroadcastWithResourceKeys(ctx context.Context, resourceKeys ...message.ResourceKey) (broadcaster.BroadcastAPI, error) {
broadcaster, err := singleton.GetWithContext(ctx)
if err != nil {
return nil, err
}
return broadcaster.WithResourceKeys(ctx, resourceKeys...)
}
// Release releases the broadcaster.
func Release() {
if !singleton.Ready() {
return
}
singleton.Get().Close()
}