Zhen Ye 66cc194ab2
enhance: add partition gc at streaming arch (#42179)
issue: #41976

- make drop partition message as a broadcast message.
- add gc when drop partition message is acked.
- add a call back to handle the broadcast message when ack.
- the ack operation of broadcast message will retry until success.

Signed-off-by: chyezh <chyezh@outlook.com>
2025-05-29 23:20:30 +08:00

57 lines
1.7 KiB
Go

package registry
import (
"context"
"fmt"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
)
// init the message ack callbacks
func init() {
resetMessageAckCallbacks()
}
// resetMessageAckCallbacks resets the message ack callbacks.
func resetMessageAckCallbacks() {
messageAckCallbacks = map[message.MessageType]*syncutil.Future[MessageCallback]{
message.MessageTypeDropPartition: syncutil.NewFuture[MessageCallback](),
}
}
// MessageCallback is the callback function for the message type.
type MessageCallback = func(ctx context.Context, msg message.MutableMessage) error
// messageAckCallbacks is the map of message type to the callback function.
var messageAckCallbacks map[message.MessageType]*syncutil.Future[MessageCallback]
// RegisterMessageAckCallback registers the callback function for the message type.
func RegisterMessageAckCallback(typ message.MessageType, callback MessageCallback) {
future, ok := messageAckCallbacks[typ]
if !ok {
panic(fmt.Sprintf("the future of message callback for type %s is not registered", typ))
}
if future.Ready() {
// only for test, the register callback should be called once and only once
return
}
future.Set(callback)
}
// CallMessageAckCallback calls the callback function for the message type.
func CallMessageAckCallback(ctx context.Context, msg message.MutableMessage) error {
callbackFuture, ok := messageAckCallbacks[msg.MessageType()]
if !ok {
// No callback need tobe called, return nil
return nil
}
callback, err := callbackFuture.GetWithContext(ctx)
if err != nil {
return errors.Wrap(err, "when waiting callback registered")
}
return callback(ctx, msg)
}