fix: timetick cannot push forward when upgrading (#42567)
issue: #42492

- streamingcoord now starts before the old rootcoord.
- The streaming balancer checks node sessions synchronously to avoid redundant operations during cluster startup.
- DDL operations check whether the streaming service is enabled; if it is not, they fall back to msgstream.
- msgstream is initialized only when streaming is not enabled, and is stopped once streaming becomes enabled.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent 317bbfbf81
commit af0881ee5d
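The upgrade-fallback pattern that this commit applies across the DDL and timetick paths can be summarized outside the diff. The sketch below is illustrative only, not part of the change itself: it assumes the `snmanager` package and `StreamingReadyNotifier` introduced here, and `broadcastViaStreaming` / `broadcastViaMsgstream` are hypothetical stand-ins for the concrete send paths.

```go
package example

import (
	"context"

	"github.com/milvus-io/milvus/internal/coordinator/snmanager"
)

// broadcastDDL sketches the readiness check used by the DDL paths in this
// commit: register a StreamingReadyNotifier, use the streaming service only
// when it reports ready, and otherwise fall back to the legacy msgstream.
// broadcastViaStreaming and broadcastViaMsgstream are hypothetical helpers.
func broadcastDDL(
	ctx context.Context,
	broadcastViaStreaming func(context.Context) error,
	broadcastViaMsgstream func(context.Context) error,
) error {
	notifier := snmanager.NewStreamingReadyNotifier()
	if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(ctx, notifier); err != nil {
		return err
	}
	if notifier.IsReady() {
		// Streaming service is ready; release the notifier and use streaming.
		notifier.Release()
		return broadcastViaStreaming(ctx)
	}
	// Streaming service is not ready yet; hold the notifier until the
	// msgstream send finishes, then release it.
	defer notifier.Release()
	return broadcastViaMsgstream(ctx)
}
```

The garbage-collector and create-collection hunks below follow this shape, while the timetick loops additionally select on `notifier.Ready()` so they can stop once the streaming service takes over.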
@@ -165,6 +165,11 @@ func (s *mixCoordImpl) initInternal() error {
    s.datacoordServer.SetMixCoord(s)
    s.queryCoordServer.SetMixCoord(s)

    if err := s.streamingCoord.Start(s.ctx); err != nil {
        log.Error("streamCoord start failed", zap.Error(err))
        return err
    }

    if err := s.rootcoordServer.Init(); err != nil {
        log.Error("rootCoord init failed", zap.Error(err))
        return err
@@ -175,11 +180,6 @@ func (s *mixCoordImpl) initInternal() error {
        return err
    }

    if err := s.streamingCoord.Start(s.ctx); err != nil {
        log.Error("streamCoord start failed", zap.Error(err))
        return err
    }

    if err := s.datacoordServer.Init(); err != nil {
        log.Error("dataCoord init failed", zap.Error(err))
        return err
@@ -17,6 +17,8 @@ import (

var StaticStreamingNodeManager = newStreamingNodeManager()

var ErrStreamingServiceNotReady = errors.New("streaming service is not ready, may be on-upgrading from old arch")

// TODO: can be removed after streaming service fully manage all growing data.
func newStreamingNodeManager() *StreamingNodeManager {
    snm := &StreamingNodeManager{
@@ -31,6 +33,34 @@ func newStreamingNodeManager() *StreamingNodeManager {
    return snm
}

// NewStreamingReadyNotifier creates a new streaming ready notifier.
func NewStreamingReadyNotifier() *StreamingReadyNotifier {
    return &StreamingReadyNotifier{
        inner: syncutil.NewAsyncTaskNotifier[struct{}](),
    }
}

// StreamingReadyNotifier is a notifier for streaming service ready.
type StreamingReadyNotifier struct {
    inner *syncutil.AsyncTaskNotifier[struct{}]
}

// Release releases the notifier.
func (s *StreamingReadyNotifier) Release() {
    s.inner.Finish(struct{}{})
}

// Ready returns a channel that will be closed when the streaming service is ready.
func (s *StreamingReadyNotifier) Ready() <-chan struct{} {
    return s.inner.Context().Done()
}

// IsReady returns true if the streaming service is ready.
func (s *StreamingReadyNotifier) IsReady() bool {
    return s.inner.Context().Err() != nil
}

// Context returns the context of the notifier.
// StreamingNodeManager is a manager for manage the querynode that embedded into streaming node.
// StreamingNodeManager is exclusive with ResourceManager.
type StreamingNodeManager struct {
@@ -58,13 +88,27 @@ func (s *StreamingNodeManager) GetLatestWALLocated(ctx context.Context, vchannel
    return serverID, nil
}

// CheckIfStreamingServiceReady checks if the streaming service is ready.
func (s *StreamingNodeManager) CheckIfStreamingServiceReady(ctx context.Context) error {
    n := NewStreamingReadyNotifier()
    if err := s.RegisterStreamingEnabledListener(ctx, n); err != nil {
        return err
    }
    defer n.Release()
    if !n.IsReady() {
        // The notifier is not canceled, so the streaming service is not ready.
        return ErrStreamingServiceNotReady
    }
    return nil
}

// RegisterStreamingEnabledNotifier registers a notifier into the balancer.
func (s *StreamingNodeManager) RegisterStreamingEnabledListener(ctx context.Context, notifier *syncutil.AsyncTaskNotifier[struct{}]) error {
func (s *StreamingNodeManager) RegisterStreamingEnabledListener(ctx context.Context, notifier *StreamingReadyNotifier) error {
    balancer, err := s.balancer.GetWithContext(ctx)
    if err != nil {
        return err
    }
    balancer.RegisterStreamingEnabledNotifier(notifier)
    balancer.RegisterStreamingEnabledNotifier(notifier.inner)
    return nil
}

@@ -9,7 +9,6 @@ import (

    "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
    "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
    "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
    "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

@@ -62,5 +61,15 @@ func TestStreamingNodeManager(t *testing.T) {
    streamingNodes = m.GetStreamingQueryNodeIDs()
    assert.Equal(t, len(streamingNodes), 1)

    assert.NoError(t, m.RegisterStreamingEnabledListener(context.Background(), syncutil.NewAsyncTaskNotifier[struct{}]()))
    assert.NoError(t, m.RegisterStreamingEnabledListener(context.Background(), NewStreamingReadyNotifier()))
}

func TestStreamingReadyNotifier(t *testing.T) {
    n := NewStreamingReadyNotifier()
    assert.False(t, n.IsReady())
    n.inner.Cancel()
    <-n.Ready()
    assert.True(t, n.IsReady())
    n.Release()
    assert.True(t, n.IsReady())
}

@@ -29,6 +29,7 @@ import (
    "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
    "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/distributed/streaming"
    "github.com/milvus-io/milvus/internal/metastore/model"
    "github.com/milvus-io/milvus/internal/util/proxyutil"
@@ -420,6 +421,19 @@ func (t *createCollectionTask) addChannelsAndGetStartPositions(ctx context.Conte
}

func (t *createCollectionTask) broadcastCreateCollectionMsgIntoStreamingService(ctx context.Context, ts uint64) (map[string][]byte, error) {
    notifier := snmanager.NewStreamingReadyNotifier()
    if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(ctx, notifier); err != nil {
        return nil, err
    }
    if !notifier.IsReady() {
        // streaming service is not ready, so we send it into msgstream.
        defer notifier.Release()
        msg := t.genCreateCollectionMsg(ctx, ts)
        return t.core.chanTimeTick.broadcastMarkDmlChannels(t.channels.physicalChannels, msg)
    }
    // streaming service is ready, so we release the ready notifier and send it into streaming service.
    notifier.Release()

    req := t.genCreateCollectionRequest()
    // dispatch the createCollectionMsg into all vchannel.
    msgs := make([]message.MutableMessage, 0, len(req.VirtualChannelNames))

@@ -24,6 +24,7 @@ import (

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/metastore/model"
    "github.com/milvus-io/milvus/internal/util/proxyutil"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
@@ -120,12 +121,14 @@ func executeCreatePartitionTaskSteps(ctx context.Context,
    })

    if streamingutil.IsStreamingServiceEnabled() {
        undoTask.AddStep(&broadcastCreatePartitionMsgStep{
            baseStep: baseStep{core: core},
            vchannels: col.VirtualChannelNames,
            partition: partition,
            ts: ts,
        }, &nullStep{})
        if err := snmanager.StaticStreamingNodeManager.CheckIfStreamingServiceReady(ctx); err == nil {
            undoTask.AddStep(&broadcastCreatePartitionMsgStep{
                baseStep: baseStep{core: core},
                vchannels: col.VirtualChannelNames,
                partition: partition,
                ts: ts,
            }, &nullStep{})
        }
    }

    undoTask.AddStep(&nullStep{}, &releasePartitionsStep{

@@ -27,6 +27,7 @@ import (
    "github.com/cockroachdb/errors"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
    "github.com/milvus-io/milvus/pkg/v2/log"
    "github.com/milvus-io/milvus/pkg/v2/metrics"
@@ -182,20 +183,28 @@ func newDmlChannels(initCtx context.Context, factory msgstream.Factory, chanName
    for i, name := range names {
        var ms msgstream.MsgStream
        if !streamingutil.IsStreamingServiceEnabled() {
            var err error
            ms, err = factory.NewMsgStream(initCtx)
            if err != nil {
                log.Ctx(initCtx).Error("Failed to add msgstream",
                    zap.String("name", name),
                    zap.Error(err))
                panic("Failed to add msgstream")
            ms = d.newMsgstream(initCtx, factory, name)
        } else {
            notifier := snmanager.NewStreamingReadyNotifier()
            if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(initCtx, notifier); err != nil {
                panic(err)
            }

            if params.PreCreatedTopicEnabled.GetAsBool() {
                d.checkPreCreatedTopic(initCtx, factory, name)
            logger := log.Ctx(initCtx).With(zap.String("pchannel", name))
            if !notifier.IsReady() {
                logger.Info("streaming service is not enabled, create a msgstream to use")
                ms = d.newMsgstream(initCtx, factory, name)
                go func() {
                    defer notifier.Release()
                    <-notifier.Ready()
                    // release the msgstream.
                    logger.Info("streaming service is enabled, release the msgstream...")
                    ms.Close()
                    logger.Info("streaming service is enabled, release the msgstream done")
                }()
            } else {
                logger.Info("streaming service has been enabled, msgstream should not be created")
                notifier.Release()
            }

            ms.AsProducer(initCtx, []string{name})
        }
        dms := &dmlMsgStream{
            ms: ms,
@@ -218,6 +227,24 @@ func newDmlChannels(initCtx context.Context, factory msgstream.Factory, chanName
    return d
}

func (d *dmlChannels) newMsgstream(initCtx context.Context, factory msgstream.Factory, name string) msgstream.MsgStream {
    var err error
    ms, err := factory.NewMsgStream(initCtx)
    if err != nil {
        log.Ctx(initCtx).Error("Failed to add msgstream",
            zap.String("name", name),
            zap.Error(err))
        panic("Failed to add msgstream")
    }

    if paramtable.Get().CommonCfg.PreCreatedTopicEnabled.GetAsBool() {
        d.checkPreCreatedTopic(initCtx, factory, name)
    }

    ms.AsProducer(initCtx, []string{name})
    return ms
}

func (d *dmlChannels) checkPreCreatedTopic(ctx context.Context, factory msgstream.Factory, name string) {
    tmpMs, err := factory.NewMsgStream(ctx)
    if err != nil {

@@ -21,6 +21,7 @@ import (

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/distributed/streaming"
    "github.com/milvus-io/milvus/internal/metastore/model"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
@@ -168,7 +169,17 @@ func (c *bgGarbageCollector) RemoveCreatingPartition(dbID int64, partition *mode

func (c *bgGarbageCollector) notifyCollectionGc(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) {
    if streamingutil.IsStreamingServiceEnabled() {
        return c.notifyCollectionGcByStreamingService(ctx, coll)
        notifier := snmanager.NewStreamingReadyNotifier()
        if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(ctx, notifier); err != nil {
            return 0, err
        }
        if notifier.IsReady() {
            // streaming service is ready, so we release the ready notifier and send it into streaming service.
            notifier.Release()
            return c.notifyCollectionGcByStreamingService(ctx, coll)
        }
        // streaming service is not ready, so we send it into msgstream.
        defer notifier.Release()
    }

    ts, err := c.s.tsoAllocator.GenerateTSO(1)
@@ -317,7 +328,19 @@ func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels, vch
    defer c.s.ddlTsLockManager.Unlock()

    if streamingutil.IsStreamingServiceEnabled() {
        ddlTs, err = c.notifyPartitionGcByStreamingService(ctx, vchannels, partition)
        notifier := snmanager.NewStreamingReadyNotifier()
        if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(ctx, notifier); err != nil {
            return 0, err
        }
        if notifier.IsReady() {
            // streaming service is ready, so we release the ready notifier and send it into streaming service.
            notifier.Release()
            ddlTs, err = c.notifyPartitionGcByStreamingService(ctx, vchannels, partition)
        } else {
            // streaming service is not ready, so we send it into msgstream with the notifier holding.
            defer notifier.Release()
            ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition)
        }
    } else {
        ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition)
    }

@@ -24,10 +24,12 @@ import (
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"

    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/distributed/streaming"
    "github.com/milvus-io/milvus/internal/metastore/model"
    "github.com/milvus-io/milvus/internal/mocks"
    "github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
    "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
    mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
    mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
@@ -35,6 +37,8 @@ import (
    "github.com/milvus-io/milvus/pkg/v2/proto/querypb"
    "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
    "github.com/milvus-io/milvus/pkg/v2/util/merr"
    "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
    "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
@@ -545,6 +549,17 @@ func TestGcPartitionData(t *testing.T) {
    streamingutil.SetStreamingServiceEnabled()
    defer streamingutil.UnsetStreamingServiceEnabled()

    snmanager.ResetStreamingNodeManager()
    b := mock_balancer.NewMockBalancer(t)
    b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).Run(
        func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) {
            <-ctx.Done()
        })
    b.EXPECT().RegisterStreamingEnabledNotifier(mock.Anything).Run(func(notifier *syncutil.AsyncTaskNotifier[struct{}]) {
        notifier.Cancel()
    })
    snmanager.StaticStreamingNodeManager.SetBalancerReady(b)

    wal := mock_streaming.NewMockWALAccesser(t)
    broadcast := mock_streaming.NewMockBroadcast(t)
    broadcast.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{

@@ -70,7 +70,6 @@ import (
    "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
    "github.com/milvus-io/milvus/pkg/v2/util/retry"
    "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
    "github.com/milvus-io/milvus/pkg/v2/util/timerecord"
    "github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
    "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -218,15 +217,15 @@ func (c *Core) startTimeTickLoop() {
    log := log.Ctx(c.ctx)
    defer c.wg.Done()

    streamingNotifier := syncutil.NewAsyncTaskNotifier[struct{}]()
    defer streamingNotifier.Finish(struct{}{})
    streamingNotifier := snmanager.NewStreamingReadyNotifier()
    defer streamingNotifier.Release()

    if streamingutil.IsStreamingServiceEnabled() {
        if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(c.ctx, streamingNotifier); err != nil {
            log.Info("register streaming enabled listener failed", zap.Error(err))
            return
        }
        if streamingNotifier.Context().Err() != nil {
        if streamingNotifier.IsReady() {
            log.Info("streaming service has been enabled, ddl timetick from rootcoord should not start")
            return
        }
@@ -236,7 +235,7 @@ func (c *Core) startTimeTickLoop() {
    defer ticker.Stop()
    for {
        select {
        case <-streamingNotifier.Context().Done():
        case <-streamingNotifier.Ready():
            log.Info("streaming service has been enabled, ddl timetick from rootcoord should stop")
            return
        case <-c.ctx.Done():

@@ -36,7 +36,6 @@ import (
    "github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
    "github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
    "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
    "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
    "github.com/milvus-io/milvus/pkg/v2/util/timerecord"
    "github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
    "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -272,15 +271,15 @@ func (t *timetickSync) initSessions(sess []*sessionutil.Session) {
func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
    defer wg.Done()

    streamingNotifier := syncutil.NewAsyncTaskNotifier[struct{}]()
    defer streamingNotifier.Finish(struct{}{})
    streamingNotifier := snmanager.NewStreamingReadyNotifier()
    defer streamingNotifier.Release()

    if streamingutil.IsStreamingServiceEnabled() {
        if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(t.ctx, streamingNotifier); err != nil {
            log.Info("register streaming enabled listener failed", zap.Error(err))
            return
        }
        if streamingNotifier.Context().Err() != nil {
        if streamingNotifier.IsReady() {
            log.Info("streaming service has been enabled, proxy timetick from rootcoord should not start")
            return
        }
@@ -295,7 +294,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {

    for {
        select {
        case <-streamingNotifier.Context().Done():
        case <-streamingNotifier.Ready():
            log.Info("streaming service has been enabled, proxy timetick from rootcoord should stop")
            return
        case <-t.ctx.Done():
@@ -349,9 +348,6 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {

// SendTimeTickToChannel send each channel's min timetick to msg stream
func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error {
    if streamingutil.IsStreamingServiceEnabled() {
        return nil
    }
    func() {
        sub := tsoutil.SubByNow(ts)
        for _, chanName := range chanNames {

@@ -21,6 +21,10 @@ import (
    "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

const (
    versionChecker260 = "<2.6.0-dev"
)

// RecoverBalancer recover the balancer working.
func RecoverBalancer(
    ctx context.Context,
@@ -48,7 +52,11 @@ func RecoverBalancer(
        backgroundTaskNotifier: syncutil.NewAsyncTaskNotifier[struct{}](),
    }
    b.SetLogger(logger)
    go b.execute()
    ready260Future, err := b.checkIfAllNodeGreaterThan260AndWatch(ctx)
    if err != nil {
        return nil, err
    }
    go b.execute(ready260Future)
    return b, nil
}

@@ -132,13 +140,12 @@ func (b *balancerImpl) Close() {
}

// execute the balancer.
func (b *balancerImpl) execute() {
func (b *balancerImpl) execute(ready260Future *syncutil.Future[error]) {
    b.Logger().Info("balancer start to execute")
    defer func() {
        b.backgroundTaskNotifier.Finish(struct{}{})
        b.Logger().Info("balancer execute finished")
    }()
    ready260Future := b.blockUntilAllNodeIsGreaterThan260(b.ctx)

    balanceTimer := typeutil.NewBackoffTimer(&backoffConfigFetcher{})
    nodeChanged, err := resource.Resource().StreamingNodeManagerClient().WatchNodeChanged(b.backgroundTaskNotifier.Context())
@@ -202,50 +209,94 @@ func (b *balancerImpl) execute() {
    }
}

// blockUntilAllNodeIsGreaterThan260 block until all node is greater than 2.6.0.
// It's just a protection, but didn't promised that there will never be a node with version < 2.6.0 join the cluster.
// These promise can only be achieved by the cluster dev-ops.
func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260(ctx context.Context) *syncutil.Future[error] {
// checkIfAllNodeGreaterThan260AndWatch check if all node is greater than 2.6.0.
// It will return a future if there's any node with version < 2.6.0,
// and the future will be set when the all node version is greater than 2.6.0.
func (b *balancerImpl) checkIfAllNodeGreaterThan260AndWatch(ctx context.Context) (*syncutil.Future[error], error) {
    f := syncutil.NewFuture[error]()
    if b.channelMetaManager.IsStreamingEnabledOnce() {
        // Once the streaming is enabled, we can not check the node version anymore.
        // because the first channel-assignment is generated after the old node is down.
        return nil
        return nil, nil
    }

    if greaterThan260, err := b.checkIfAllNodeGreaterThan260(ctx); err != nil || greaterThan260 {
        return nil, err
    }
    go func() {
        err := b.blockUntilAllNodeIsGreaterThan260AtBackground(ctx)
        f.Set(err)
    }()
    return f
    return f, nil
}

// checkIfAllNodeGreaterThan260 check if all node is greater than 2.6.0.
func (b *balancerImpl) checkIfAllNodeGreaterThan260(ctx context.Context) (bool, error) {
    expectedRoles := []string{typeutil.ProxyRole, typeutil.DataNodeRole, typeutil.QueryNodeRole}
    for _, role := range expectedRoles {
        if greaterThan260, err := b.checkIfRoleGreaterThan260(ctx, role); err != nil || !greaterThan260 {
            return greaterThan260, err
        }
    }
    b.Logger().Info("all nodes is greater than 2.6.0 when checking")
    return true, b.channelMetaManager.MarkStreamingHasEnabled(ctx)
}

// checkIfRoleGreaterThan260 check if the role is greater than 2.6.0.
func (b *balancerImpl) checkIfRoleGreaterThan260(ctx context.Context, role string) (bool, error) {
    logger := b.Logger().With(zap.String("role", role))
    rb := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), versionChecker260)
    defer rb.Close()

    r := rb.Resolver()
    state, err := r.GetLatestState(ctx)
    if err != nil {
        logger.Warn("fail to get latest state", zap.Error(err))
        return false, err
    }
    if len(state.Sessions()) > 0 {
        logger.Info("node is not greater than 2.6.0 when checking", zap.Int("sessionCount", len(state.Sessions())))
        return false, nil
    }
    return true, nil
}

// blockUntilAllNodeIsGreaterThan260AtBackground block until all node is greater than 2.6.0 at background.
func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260AtBackground(ctx context.Context) error {
    doneErr := errors.New("done")
    expectedRoles := []string{typeutil.ProxyRole, typeutil.DataNodeRole, typeutil.QueryNodeRole}
    for _, role := range expectedRoles {
        logger := b.Logger().With(zap.String("role", role))
        logger.Info("start to wait that the nodes is greater than 2.6.0")
        // Check if there's any proxy or data node with version < 2.6.0.
        rosolver := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), "<2.6.0-dev")
        r := rosolver.Resolver()
        err := r.Watch(ctx, func(vs resolver.VersionedState) error {
            if len(vs.Sessions()) == 0 {
                return doneErr
            }
            logger.Info("session changes", zap.Int("sessionCount", len(vs.Sessions())))
            return nil
        })
        if err != nil && !errors.Is(err, doneErr) {
            logger.Info("fail to wait that the nodes is greater than 2.6.0", zap.Error(err))
        if err := b.blockUntilRoleGreaterThan260AtBackground(ctx, role); err != nil {
            return err
        }
        logger.Info("all nodes is greater than 2.6.0")
        rosolver.Close()
    }
    return b.channelMetaManager.MarkStreamingHasEnabled(ctx)
}

// blockUntilRoleGreaterThan260AtBackground block until the role is greater than 2.6.0 at background.
func (b *balancerImpl) blockUntilRoleGreaterThan260AtBackground(ctx context.Context, role string) error {
    doneErr := errors.New("done")
    logger := b.Logger().With(zap.String("role", role))
    logger.Info("start to wait that the nodes is greater than 2.6.0")
    // Check if there's any proxy or data node with version < 2.6.0.
    rb := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), versionChecker260)
    defer rb.Close()

    r := rb.Resolver()
    err := r.Watch(ctx, func(vs resolver.VersionedState) error {
        if len(vs.Sessions()) == 0 {
            return doneErr
        }
        logger.Info("session changes", zap.Int("sessionCount", len(vs.Sessions())))
        return nil
    })
    if err != nil && !errors.Is(err, doneErr) {
        logger.Info("fail to wait that the nodes is greater than 2.6.0", zap.Error(err))
        return err
    }
    logger.Info("all nodes is greater than 2.6.0 when watching")
    return nil
}

// applyAllRequest apply all request in the request channel.
func (b *balancerImpl) applyAllRequest() {
    for {