fix: Fix shard interceptor incorrectly skip flushallmsg (#47003)

issue: https://github.com/milvus-io/milvus/issues/46799

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2026-01-13 10:07:26 +08:00 committed by GitHub
parent b9e9c54843
commit 30d8f9804a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 11 additions and 13 deletions

View File

@ -114,7 +114,7 @@ func (impl *flusherComponents) WhenDropCollection(vchannel string) {
// HandleMessage handles the plain message.
func (impl *flusherComponents) HandleMessage(ctx context.Context, msg message.ImmutableMessage) error {
vchannel := msg.VChannel()
if vchannel == "" || isBroadcastToAllMessage(msg.MessageType()) {
if vchannel == "" || msg.MessageType().IsBroadcastToAll() {
return impl.broadcastToAllDataSyncService(ctx, msg)
}
if _, ok := impl.dataServices[vchannel]; !ok {

View File

@ -18,16 +18,6 @@ import (
var defaultCollectionNotFoundTolerance = 10
var broadcastToAllMessageType = map[message.MessageType]struct{}{
message.MessageTypeFlushAll: {},
}
// isBroadcastToAllMessage checks if the message need to be broadcast to all data sync services.
func isBroadcastToAllMessage(msg message.MessageType) bool {
_, ok := broadcastToAllMessageType[msg]
return ok
}
// getRecoveryInfos gets the recovery info of the vchannels from datacoord
func (impl *WALFlusherImpl) getRecoveryInfos(ctx context.Context, vchannel []string) (map[string]*datapb.GetChannelRecoveryInfoResponse, message.MessageID, error) {
futures := make([]*conc.Future[interface{}], 0, len(vchannel))

View File

@ -248,7 +248,7 @@ func (impl *WALFlusherImpl) dispatch(msg message.ImmutableMessage) (err error) {
}
// wal flusher will not handle the control channel message.
if funcutil.IsControlChannel(msg.VChannel()) && !isBroadcastToAllMessage(msg.MessageType()) {
if funcutil.IsControlChannel(msg.VChannel()) && !msg.MessageType().IsBroadcastToAll() {
return nil
}

View File

@ -57,7 +57,7 @@ func (impl *shardInterceptor) Name() string {
// DoAppend assigns segment for every partition in the message.
func (impl *shardInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, appendOp interceptors.Append) (msgID message.MessageID, err error) {
op, ok := impl.ops[msg.MessageType()]
if ok && !funcutil.IsControlChannel(msg.VChannel()) {
if ok && (!funcutil.IsControlChannel(msg.VChannel()) || msg.MessageType().IsBroadcastToAll()) {
// If the message type is registered in the interceptor, use the registered operation.
// control channel message is only used to determine the DDL/DCL order,
// perform no effect on the shard manager, so skip it.

View File

@ -21,6 +21,8 @@ type MessageTypeProperties struct {
ExclusiveRequired bool
// a cipher enabled message type will be encrypted before appending to the wal if cipher is enabled.
CipherEnabled bool
// A broadcast to all message type is a message that will be broadcasted to all vchannels.
BroadcastToAll bool
}
var messageTypePropertiesMap = map[MessageType]MessageTypeProperties{
@ -112,6 +114,7 @@ var messageTypePropertiesMap = map[MessageType]MessageTypeProperties{
MessageTypeDropIndex: {},
MessageTypeFlushAll: {
ExclusiveRequired: true,
BroadcastToAll: true,
},
}
@ -154,6 +157,11 @@ func (t MessageType) IsSelfControlled() bool {
return messageTypePropertiesMap[t].SelfControlled
}
// IsBroadcastToAll checks if the MessageType is broadcast to all.
func (t MessageType) IsBroadcastToAll() bool {
return messageTypePropertiesMap[t].BroadcastToAll
}
// LogLevel returns the log level of the MessageType.
func (t MessageType) LogLevel() zapcore.Level {
return messageTypePropertiesMap[t].LogLevel