package kv

import (
	"context"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/pkg/v2/kv/predicates"
	"github.com/milvus-io/milvus/pkg/v2/log"
)

var _ MetaKv = (*ReliableWriteMetaKv)(nil)

// NewReliableWriteMetaKv wraps kv in a ReliableWriteMetaKv, or returns kv unchanged if it is already one.
func NewReliableWriteMetaKv(kv MetaKv) MetaKv {
	if _, ok := kv.(*ReliableWriteMetaKv); ok {
		return kv
	}
	return &ReliableWriteMetaKv{
		Binder: log.Binder{},
		MetaKv: kv,
	}
}

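// A minimal usage sketch (illustrative only; newEtcdMetaKv stands in for any concrete
// MetaKv constructor and is not defined in this file):
//
//	base := newEtcdMetaKv(cli, rootPath) // hypothetical constructor
//	metaKv := NewReliableWriteMetaKv(base)
//	// Wrapping an existing ReliableWriteMetaKv is a no-op: the same wrapper is returned.
//	metaKv = NewReliableWriteMetaKv(metaKv)
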
// ReliableWriteMetaKv is a wrapper of MetaKv that ensures data is written reliably.
// It retries a write operation until it succeeds or the context is canceled or times out.
// This helps keep the metadata in memory consistent with the underlying meta storage.
type ReliableWriteMetaKv struct {
	log.Binder
	MetaKv
}

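// Because every write retries until the context ends, callers usually bound the call
// with a deadline. A minimal sketch (key, value, and timeout are illustrative):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	if err := metaKv.Save(ctx, "meta/example-key", "example-value"); err != nil {
//		// err is ctx.Err() if the deadline expired before any attempt succeeded.
//	}
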
func (kv *ReliableWriteMetaKv) Save(ctx context.Context, key, value string) error {
	return kv.retryWithBackoff(ctx, func(ctx context.Context) error {
		return kv.MetaKv.Save(ctx, key, value)
	})
}

func (kv *ReliableWriteMetaKv) MultiSave(ctx context.Context, kvs map[string]string) error {
	return kv.retryWithBackoff(ctx, func(ctx context.Context) error {
		return kv.MetaKv.MultiSave(ctx, kvs)
	})
}

func (kv *ReliableWriteMetaKv) Remove(ctx context.Context, key string) error {
	return kv.retryWithBackoff(ctx, func(ctx context.Context) error {
		return kv.MetaKv.Remove(ctx, key)
	})
}

func (kv *ReliableWriteMetaKv) MultiRemove(ctx context.Context, keys []string) error {
	return kv.retryWithBackoff(ctx, func(ctx context.Context) error {
		return kv.MetaKv.MultiRemove(ctx, keys)
	})
}

func (kv *ReliableWriteMetaKv) MultiSaveAndRemove(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) error {
	return kv.retryWithBackoff(ctx, func(ctx context.Context) error {
		return kv.MetaKv.MultiSaveAndRemove(ctx, saves, removals, preds...)
	})
}

func (kv *ReliableWriteMetaKv) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) error {
	return kv.retryWithBackoff(ctx, func(ctx context.Context) error {
		return kv.MetaKv.MultiSaveAndRemoveWithPrefix(ctx, saves, removals, preds...)
	})
}

func (kv *ReliableWriteMetaKv) CompareVersionAndSwap(ctx context.Context, key string, version int64, target string) (bool, error) {
	var result bool
	err := kv.retryWithBackoff(ctx, func(ctx context.Context) error {
		var err error
		result, err = kv.MetaKv.CompareVersionAndSwap(ctx, key, version, target)
		return err
	})
	return result, err
}

// retryWithBackoff retries fn with exponential backoff until it succeeds or ctx is done.
func (kv *ReliableWriteMetaKv) retryWithBackoff(ctx context.Context, fn func(ctx context.Context) error) error {
	backoff := backoff.NewExponentialBackOff()
	backoff.InitialInterval = 10 * time.Millisecond
	backoff.MaxInterval = 1 * time.Second
	// MaxElapsedTime == 0 disables the elapsed-time limit, so only a successful write
	// or ctx cancellation stops the retry loop.
	backoff.MaxElapsedTime = 0
	backoff.Reset()
	for {
		err := fn(ctx)
		if err == nil {
			return nil
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		nextInterval := backoff.NextBackOff()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(nextInterval):
			kv.Logger().Warn("failed to persist operation, wait for retry...", zap.Duration("nextRetryInterval", nextInterval), zap.Error(err))
		}
	}
}

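// Note: with the settings in retryWithBackoff above, and assuming the defaults of
// cenkalti/backoff/v4 (multiplier 1.5 with ±50% jitter), the wait between attempts
// grows roughly as 10ms, 15ms, 22ms, ... and is capped at 1s; retries continue until
// the write succeeds or the context is done.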