mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-04 11:18:44 +08:00
fix: timetick interceptor panics when closing write ahead buffer (#40970)
issue: #40967 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
5b78ef0a49
commit
cef1d16454
@ -19,6 +19,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
||||
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||
)
|
||||
|
||||
@ -27,10 +28,8 @@ var _ inspector.TimeTickSyncOperator = &timeTickSyncOperator{}
|
||||
|
||||
// NewTimeTickSyncOperator creates a new time tick sync operator.
|
||||
func newTimeTickSyncOperator(param interceptors.InterceptorBuildParam) *timeTickSyncOperator {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &timeTickSyncOperator{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
backgroundTaskNotifier: syncutil.NewAsyncTaskNotifier[struct{}](),
|
||||
logger: resource.Resource().Logger().With(
|
||||
log.FieldComponent("timetick-sync"),
|
||||
zap.Any("pchannel", param.WALImpls.Channel()),
|
||||
@ -47,8 +46,7 @@ func newTimeTickSyncOperator(param interceptors.InterceptorBuildParam) *timeTick
|
||||
|
||||
// timeTickSyncOperator is a time tick sync operator.
|
||||
type timeTickSyncOperator struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
backgroundTaskNotifier *syncutil.AsyncTaskNotifier[struct{}]
|
||||
|
||||
logger *log.MLogger
|
||||
pchannel types.PChannelInfo // pchannel info belong to.
|
||||
@ -115,12 +113,16 @@ func (impl *timeTickSyncOperator) Sync(ctx context.Context) {
|
||||
}
|
||||
|
||||
// initialize initializes the time tick sync operator.
|
||||
// !!! This method should always be called after the operator is created.
|
||||
// Otherwise, the Close operation will be blocked forever.
|
||||
func (impl *timeTickSyncOperator) initialize() {
|
||||
impl.blockUntilSyncTimeTickReady()
|
||||
}
|
||||
|
||||
// blockUntilSyncTimeTickReady blocks until the first time tick message is sent.
|
||||
func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
|
||||
defer impl.backgroundTaskNotifier.Finish(struct{}{})
|
||||
|
||||
underlyingWALImpls := impl.interceptorBuildParam.WALImpls
|
||||
|
||||
impl.logger.Info("start to sync first time tick")
|
||||
@ -148,8 +150,8 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
|
||||
zap.Error(lastErr),
|
||||
)
|
||||
select {
|
||||
case <-impl.ctx.Done():
|
||||
return impl.ctx.Err()
|
||||
case <-impl.backgroundTaskNotifier.Context().Done():
|
||||
return impl.backgroundTaskNotifier.Context().Err()
|
||||
case <-nextTimer:
|
||||
}
|
||||
}
|
||||
@ -161,7 +163,7 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
|
||||
// And sendTsMsg must be done, to help ackManager to get first LastConfirmedMessageID
|
||||
// !!! Send a timetick message into walimpls directly is safe.
|
||||
resource.Resource().TSOAllocator().Sync()
|
||||
ts, err := resource.Resource().TSOAllocator().Allocate(impl.ctx)
|
||||
ts, err := resource.Resource().TSOAllocator().Allocate(impl.backgroundTaskNotifier.Context())
|
||||
if err != nil {
|
||||
lastErr = errors.Wrap(err, "allocate timestamp failed")
|
||||
continue
|
||||
@ -171,7 +173,7 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
|
||||
lastErr = errors.Wrap(err, "at build time tick msg")
|
||||
continue
|
||||
}
|
||||
msgID, err := underlyingWALImpls.Append(impl.ctx, msg)
|
||||
msgID, err := underlyingWALImpls.Append(impl.backgroundTaskNotifier.Context(), msg)
|
||||
if err != nil {
|
||||
lastErr = errors.Wrap(err, "send first timestamp message failed")
|
||||
continue
|
||||
@ -221,9 +223,14 @@ func (impl *timeTickSyncOperator) AckManager() *ack.AckManager {
|
||||
|
||||
// Close close the time tick sync operator.
|
||||
func (impl *timeTickSyncOperator) Close() {
|
||||
impl.cancel()
|
||||
// the initialization works at background, so it should be canceled, and wait until it's done.
|
||||
impl.backgroundTaskNotifier.Cancel()
|
||||
impl.backgroundTaskNotifier.BlockUntilFinish()
|
||||
|
||||
impl.metrics.Close()
|
||||
impl.writeAheadBuffer.Close()
|
||||
if impl.writeAheadBuffer != nil {
|
||||
impl.writeAheadBuffer.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// sendTsMsg sends first timestamp message to wal.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user