From af0881ee5d44527d2e789322ae5d57b48cdb4161 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 10 Jun 2025 14:52:42 +0800 Subject: [PATCH] fix: timetick cannot push forward when upgrading (#42567) issue #42492 - streamingcoord start before old rootcoord. - streaming balancer will check the node session synchronously to avoid redundant operation when cluster startup. - ddl operation will check if streaming enabled, if the streaming is not enabled, it will use msgstream. - msgstream will initialize if streaming is not enabled, and stop when streaming is enabled. --------- Signed-off-by: chyezh --- internal/coordinator/mix_coord.go | 10 +- .../snmanager/streaming_node_manager.go | 48 +++++++- .../snmanager/streaming_node_manager_test.go | 13 ++- internal/rootcoord/create_collection_task.go | 14 +++ internal/rootcoord/create_partition_task.go | 15 ++- internal/rootcoord/dml_channels.go | 51 +++++++-- internal/rootcoord/garbage_collector.go | 27 ++++- internal/rootcoord/garbage_collector_test.go | 15 +++ internal/rootcoord/root_coord.go | 9 +- internal/rootcoord/timeticksync.go | 12 +- .../server/balancer/balancer_impl.go | 103 +++++++++++++----- 11 files changed, 249 insertions(+), 68 deletions(-) diff --git a/internal/coordinator/mix_coord.go b/internal/coordinator/mix_coord.go index e6b5078cf2..6a5c6feece 100644 --- a/internal/coordinator/mix_coord.go +++ b/internal/coordinator/mix_coord.go @@ -165,6 +165,11 @@ func (s *mixCoordImpl) initInternal() error { s.datacoordServer.SetMixCoord(s) s.queryCoordServer.SetMixCoord(s) + if err := s.streamingCoord.Start(s.ctx); err != nil { + log.Error("streamCoord start failed", zap.Error(err)) + return err + } + if err := s.rootcoordServer.Init(); err != nil { log.Error("rootCoord init failed", zap.Error(err)) return err @@ -175,11 +180,6 @@ func (s *mixCoordImpl) initInternal() error { return err } - if err := s.streamingCoord.Start(s.ctx); err != nil { - log.Error("streamCoord start failed", zap.Error(err)) - return err - } - if err := s.datacoordServer.Init(); err != nil { log.Error("dataCoord init failed", zap.Error(err)) return err diff --git a/internal/coordinator/snmanager/streaming_node_manager.go b/internal/coordinator/snmanager/streaming_node_manager.go index ebfed0509a..1b752a7a4b 100644 --- a/internal/coordinator/snmanager/streaming_node_manager.go +++ b/internal/coordinator/snmanager/streaming_node_manager.go @@ -17,6 +17,8 @@ import ( var StaticStreamingNodeManager = newStreamingNodeManager() +var ErrStreamingServiceNotReady = errors.New("streaming service is not ready, may be on-upgrading from old arch") + // TODO: can be removed after streaming service fully manage all growing data. func newStreamingNodeManager() *StreamingNodeManager { snm := &StreamingNodeManager{ @@ -31,6 +33,34 @@ func newStreamingNodeManager() *StreamingNodeManager { return snm } +// NewStreamingReadyNotifier creates a new streaming ready notifier. +func NewStreamingReadyNotifier() *StreamingReadyNotifier { + return &StreamingReadyNotifier{ + inner: syncutil.NewAsyncTaskNotifier[struct{}](), + } +} + +// StreamingReadyNotifier is a notifier for streaming service ready. +type StreamingReadyNotifier struct { + inner *syncutil.AsyncTaskNotifier[struct{}] +} + +// Release releases the notifier. +func (s *StreamingReadyNotifier) Release() { + s.inner.Finish(struct{}{}) +} + +// Ready returns a channel that will be closed when the streaming service is ready. +func (s *StreamingReadyNotifier) Ready() <-chan struct{} { + return s.inner.Context().Done() +} + +// IsReady returns true if the streaming service is ready. +func (s *StreamingReadyNotifier) IsReady() bool { + return s.inner.Context().Err() != nil +} + +// Context returns the context of the notifier. // StreamingNodeManager is a manager for manage the querynode that embedded into streaming node. // StreamingNodeManager is exclusive with ResourceManager. type StreamingNodeManager struct { @@ -58,13 +88,27 @@ func (s *StreamingNodeManager) GetLatestWALLocated(ctx context.Context, vchannel return serverID, nil } +// CheckIfStreamingServiceReady checks if the streaming service is ready. +func (s *StreamingNodeManager) CheckIfStreamingServiceReady(ctx context.Context) error { + n := NewStreamingReadyNotifier() + if err := s.RegisterStreamingEnabledListener(ctx, n); err != nil { + return err + } + defer n.Release() + if !n.IsReady() { + // The notifier is not canceled, so the streaming service is not ready. + return ErrStreamingServiceNotReady + } + return nil +} + // RegisterStreamingEnabledNotifier registers a notifier into the balancer. -func (s *StreamingNodeManager) RegisterStreamingEnabledListener(ctx context.Context, notifier *syncutil.AsyncTaskNotifier[struct{}]) error { +func (s *StreamingNodeManager) RegisterStreamingEnabledListener(ctx context.Context, notifier *StreamingReadyNotifier) error { balancer, err := s.balancer.GetWithContext(ctx) if err != nil { return err } - balancer.RegisterStreamingEnabledNotifier(notifier) + balancer.RegisterStreamingEnabledNotifier(notifier.inner) return nil } diff --git a/internal/coordinator/snmanager/streaming_node_manager_test.go b/internal/coordinator/snmanager/streaming_node_manager_test.go index 333f699cb6..84936a586e 100644 --- a/internal/coordinator/snmanager/streaming_node_manager_test.go +++ b/internal/coordinator/snmanager/streaming_node_manager_test.go @@ -9,7 +9,6 @@ import ( "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" - "github.com/milvus-io/milvus/pkg/v2/util/syncutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -62,5 +61,15 @@ func TestStreamingNodeManager(t *testing.T) { streamingNodes = m.GetStreamingQueryNodeIDs() assert.Equal(t, len(streamingNodes), 1) - assert.NoError(t, m.RegisterStreamingEnabledListener(context.Background(), syncutil.NewAsyncTaskNotifier[struct{}]())) + assert.NoError(t, m.RegisterStreamingEnabledListener(context.Background(), NewStreamingReadyNotifier())) +} + +func TestStreamingReadyNotifier(t *testing.T) { + n := NewStreamingReadyNotifier() + assert.False(t, n.IsReady()) + n.inner.Cancel() + <-n.Ready() + assert.True(t, n.IsReady()) + n.Release() + assert.True(t, n.IsReady()) } diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 35df045f4e..19a8895ad4 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/util/proxyutil" @@ -420,6 +421,19 @@ func (t *createCollectionTask) addChannelsAndGetStartPositions(ctx context.Conte } func (t *createCollectionTask) broadcastCreateCollectionMsgIntoStreamingService(ctx context.Context, ts uint64) (map[string][]byte, error) { + notifier := snmanager.NewStreamingReadyNotifier() + if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(ctx, notifier); err != nil { + return nil, err + } + if !notifier.IsReady() { + // streaming service is not ready, so we send it into msgstream. + defer notifier.Release() + msg := t.genCreateCollectionMsg(ctx, ts) + return t.core.chanTimeTick.broadcastMarkDmlChannels(t.channels.physicalChannels, msg) + } + // streaming service is ready, so we release the ready notifier and send it into streaming service. + notifier.Release() + req := t.genCreateCollectionRequest() // dispatch the createCollectionMsg into all vchannel. msgs := make([]message.MutableMessage, 0, len(req.VirtualChannelNames)) diff --git a/internal/rootcoord/create_partition_task.go b/internal/rootcoord/create_partition_task.go index a6ace87ace..92a1e28833 100644 --- a/internal/rootcoord/create_partition_task.go +++ b/internal/rootcoord/create_partition_task.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/streamingutil" @@ -120,12 +121,14 @@ func executeCreatePartitionTaskSteps(ctx context.Context, }) if streamingutil.IsStreamingServiceEnabled() { - undoTask.AddStep(&broadcastCreatePartitionMsgStep{ - baseStep: baseStep{core: core}, - vchannels: col.VirtualChannelNames, - partition: partition, - ts: ts, - }, &nullStep{}) + if err := snmanager.StaticStreamingNodeManager.CheckIfStreamingServiceReady(ctx); err == nil { + undoTask.AddStep(&broadcastCreatePartitionMsgStep{ + baseStep: baseStep{core: core}, + vchannels: col.VirtualChannelNames, + partition: partition, + ts: ts, + }, &nullStep{}) + } } undoTask.AddStep(&nullStep{}, &releasePartitionsStep{ diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 9e10fc1233..0d9e843fdb 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/errors" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" @@ -182,20 +183,28 @@ func newDmlChannels(initCtx context.Context, factory msgstream.Factory, chanName for i, name := range names { var ms msgstream.MsgStream if !streamingutil.IsStreamingServiceEnabled() { - var err error - ms, err = factory.NewMsgStream(initCtx) - if err != nil { - log.Ctx(initCtx).Error("Failed to add msgstream", - zap.String("name", name), - zap.Error(err)) - panic("Failed to add msgstream") + ms = d.newMsgstream(initCtx, factory, name) + } else { + notifier := snmanager.NewStreamingReadyNotifier() + if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(initCtx, notifier); err != nil { + panic(err) } - - if params.PreCreatedTopicEnabled.GetAsBool() { - d.checkPreCreatedTopic(initCtx, factory, name) + logger := log.Ctx(initCtx).With(zap.String("pchannel", name)) + if !notifier.IsReady() { + logger.Info("streaming service is not enabled, create a msgstream to use") + ms = d.newMsgstream(initCtx, factory, name) + go func() { + defer notifier.Release() + <-notifier.Ready() + // release the msgstream. + logger.Info("streaming service is enabled, release the msgstream...") + ms.Close() + logger.Info("streaming service is enabled, release the msgstream done") + }() + } else { + logger.Info("streaming service has been enabled, msgstream should not be created") + notifier.Release() } - - ms.AsProducer(initCtx, []string{name}) } dms := &dmlMsgStream{ ms: ms, @@ -218,6 +227,24 @@ func newDmlChannels(initCtx context.Context, factory msgstream.Factory, chanName return d } +func (d *dmlChannels) newMsgstream(initCtx context.Context, factory msgstream.Factory, name string) msgstream.MsgStream { + var err error + ms, err := factory.NewMsgStream(initCtx) + if err != nil { + log.Ctx(initCtx).Error("Failed to add msgstream", + zap.String("name", name), + zap.Error(err)) + panic("Failed to add msgstream") + } + + if paramtable.Get().CommonCfg.PreCreatedTopicEnabled.GetAsBool() { + d.checkPreCreatedTopic(initCtx, factory, name) + } + + ms.AsProducer(initCtx, []string{name}) + return ms +} + func (d *dmlChannels) checkPreCreatedTopic(ctx context.Context, factory msgstream.Factory, name string) { tmpMs, err := factory.NewMsgStream(ctx) if err != nil { diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index 5164541cce..1e66e8959c 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -21,6 +21,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/util/streamingutil" @@ -168,7 +169,17 @@ func (c *bgGarbageCollector) RemoveCreatingPartition(dbID int64, partition *mode func (c *bgGarbageCollector) notifyCollectionGc(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) { if streamingutil.IsStreamingServiceEnabled() { - return c.notifyCollectionGcByStreamingService(ctx, coll) + notifier := snmanager.NewStreamingReadyNotifier() + if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(ctx, notifier); err != nil { + return 0, err + } + if notifier.IsReady() { + // streaming service is ready, so we release the ready notifier and send it into streaming service. + notifier.Release() + return c.notifyCollectionGcByStreamingService(ctx, coll) + } + // streaming service is not ready, so we send it into msgstream. + defer notifier.Release() } ts, err := c.s.tsoAllocator.GenerateTSO(1) @@ -317,7 +328,19 @@ func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels, vch defer c.s.ddlTsLockManager.Unlock() if streamingutil.IsStreamingServiceEnabled() { - ddlTs, err = c.notifyPartitionGcByStreamingService(ctx, vchannels, partition) + notifier := snmanager.NewStreamingReadyNotifier() + if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(ctx, notifier); err != nil { + return 0, err + } + if notifier.IsReady() { + // streaming service is ready, so we release the ready notifier and send it into streaming service. + notifier.Release() + ddlTs, err = c.notifyPartitionGcByStreamingService(ctx, vchannels, partition) + } else { + // streaming service is not ready, so we send it into msgstream with the notifier holding. + defer notifier.Release() + ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition) + } } else { ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition) } diff --git a/internal/rootcoord/garbage_collector_test.go b/internal/rootcoord/garbage_collector_test.go index f9ec72c651..82add0095c 100644 --- a/internal/rootcoord/garbage_collector_test.go +++ b/internal/rootcoord/garbage_collector_test.go @@ -24,10 +24,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming" + "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer" mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" mocktso "github.com/milvus-io/milvus/internal/tso/mocks" "github.com/milvus-io/milvus/internal/util/streamingutil" @@ -35,6 +37,8 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/syncutil" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { @@ -545,6 +549,17 @@ func TestGcPartitionData(t *testing.T) { streamingutil.SetStreamingServiceEnabled() defer streamingutil.UnsetStreamingServiceEnabled() + snmanager.ResetStreamingNodeManager() + b := mock_balancer.NewMockBalancer(t) + b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).Run( + func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) { + <-ctx.Done() + }) + b.EXPECT().RegisterStreamingEnabledNotifier(mock.Anything).Run(func(notifier *syncutil.AsyncTaskNotifier[struct{}]) { + notifier.Cancel() + }) + snmanager.StaticStreamingNodeManager.SetBalancerReady(b) + wal := mock_streaming.NewMockWALAccesser(t) broadcast := mock_streaming.NewMockBroadcast(t) broadcast.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{ diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index a86273ed8c..94d51b42ff 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -70,7 +70,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/retry" - "github.com/milvus-io/milvus/pkg/v2/util/syncutil" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -218,15 +217,15 @@ func (c *Core) startTimeTickLoop() { log := log.Ctx(c.ctx) defer c.wg.Done() - streamingNotifier := syncutil.NewAsyncTaskNotifier[struct{}]() - defer streamingNotifier.Finish(struct{}{}) + streamingNotifier := snmanager.NewStreamingReadyNotifier() + defer streamingNotifier.Release() if streamingutil.IsStreamingServiceEnabled() { if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(c.ctx, streamingNotifier); err != nil { log.Info("register streaming enabled listener failed", zap.Error(err)) return } - if streamingNotifier.Context().Err() != nil { + if streamingNotifier.IsReady() { log.Info("streaming service has been enabled, ddl timetick from rootcoord should not start") return } @@ -236,7 +235,7 @@ func (c *Core) startTimeTickLoop() { defer ticker.Stop() for { select { - case <-streamingNotifier.Context().Done(): + case <-streamingNotifier.Ready(): log.Info("streaming service has been enabled, ddl timetick from rootcoord should stop") return case <-c.ctx.Done(): diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 0854009bdf..df5b009b11 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -36,7 +36,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/v2/util/syncutil" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -272,15 +271,15 @@ func (t *timetickSync) initSessions(sess []*sessionutil.Session) { func (t *timetickSync) startWatch(wg *sync.WaitGroup) { defer wg.Done() - streamingNotifier := syncutil.NewAsyncTaskNotifier[struct{}]() - defer streamingNotifier.Finish(struct{}{}) + streamingNotifier := snmanager.NewStreamingReadyNotifier() + defer streamingNotifier.Release() if streamingutil.IsStreamingServiceEnabled() { if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(t.ctx, streamingNotifier); err != nil { log.Info("register streaming enabled listener failed", zap.Error(err)) return } - if streamingNotifier.Context().Err() != nil { + if streamingNotifier.IsReady() { log.Info("streaming service has been enabled, proxy timetick from rootcoord should not start") return } @@ -295,7 +294,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { for { select { - case <-streamingNotifier.Context().Done(): + case <-streamingNotifier.Ready(): log.Info("streaming service has been enabled, proxy timetick from rootcoord should stop") return case <-t.ctx.Done(): @@ -349,9 +348,6 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { // SendTimeTickToChannel send each channel's min timetick to msg stream func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error { - if streamingutil.IsStreamingServiceEnabled() { - return nil - } func() { sub := tsoutil.SubByNow(ts) for _, chanName := range chanNames { diff --git a/internal/streamingcoord/server/balancer/balancer_impl.go b/internal/streamingcoord/server/balancer/balancer_impl.go index 07a3250a6a..b9a5c65edd 100644 --- a/internal/streamingcoord/server/balancer/balancer_impl.go +++ b/internal/streamingcoord/server/balancer/balancer_impl.go @@ -21,6 +21,10 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +const ( + versionChecker260 = "<2.6.0-dev" +) + // RecoverBalancer recover the balancer working. func RecoverBalancer( ctx context.Context, @@ -48,7 +52,11 @@ func RecoverBalancer( backgroundTaskNotifier: syncutil.NewAsyncTaskNotifier[struct{}](), } b.SetLogger(logger) - go b.execute() + ready260Future, err := b.checkIfAllNodeGreaterThan260AndWatch(ctx) + if err != nil { + return nil, err + } + go b.execute(ready260Future) return b, nil } @@ -132,13 +140,12 @@ func (b *balancerImpl) Close() { } // execute the balancer. -func (b *balancerImpl) execute() { +func (b *balancerImpl) execute(ready260Future *syncutil.Future[error]) { b.Logger().Info("balancer start to execute") defer func() { b.backgroundTaskNotifier.Finish(struct{}{}) b.Logger().Info("balancer execute finished") }() - ready260Future := b.blockUntilAllNodeIsGreaterThan260(b.ctx) balanceTimer := typeutil.NewBackoffTimer(&backoffConfigFetcher{}) nodeChanged, err := resource.Resource().StreamingNodeManagerClient().WatchNodeChanged(b.backgroundTaskNotifier.Context()) @@ -202,50 +209,94 @@ func (b *balancerImpl) execute() { } } -// blockUntilAllNodeIsGreaterThan260 block until all node is greater than 2.6.0. -// It's just a protection, but didn't promised that there will never be a node with version < 2.6.0 join the cluster. -// These promise can only be achieved by the cluster dev-ops. -func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260(ctx context.Context) *syncutil.Future[error] { +// checkIfAllNodeGreaterThan260AndWatch check if all node is greater than 2.6.0. +// It will return a future if there's any node with version < 2.6.0, +// and the future will be set when the all node version is greater than 2.6.0. +func (b *balancerImpl) checkIfAllNodeGreaterThan260AndWatch(ctx context.Context) (*syncutil.Future[error], error) { f := syncutil.NewFuture[error]() if b.channelMetaManager.IsStreamingEnabledOnce() { // Once the streaming is enabled, we can not check the node version anymore. // because the first channel-assignment is generated after the old node is down. - return nil + return nil, nil + } + + if greaterThan260, err := b.checkIfAllNodeGreaterThan260(ctx); err != nil || greaterThan260 { + return nil, err } go func() { err := b.blockUntilAllNodeIsGreaterThan260AtBackground(ctx) f.Set(err) }() - return f + return f, nil +} + +// checkIfAllNodeGreaterThan260 check if all node is greater than 2.6.0. +func (b *balancerImpl) checkIfAllNodeGreaterThan260(ctx context.Context) (bool, error) { + expectedRoles := []string{typeutil.ProxyRole, typeutil.DataNodeRole, typeutil.QueryNodeRole} + for _, role := range expectedRoles { + if greaterThan260, err := b.checkIfRoleGreaterThan260(ctx, role); err != nil || !greaterThan260 { + return greaterThan260, err + } + } + b.Logger().Info("all nodes is greater than 2.6.0 when checking") + return true, b.channelMetaManager.MarkStreamingHasEnabled(ctx) +} + +// checkIfRoleGreaterThan260 check if the role is greater than 2.6.0. +func (b *balancerImpl) checkIfRoleGreaterThan260(ctx context.Context, role string) (bool, error) { + logger := b.Logger().With(zap.String("role", role)) + rb := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), versionChecker260) + defer rb.Close() + + r := rb.Resolver() + state, err := r.GetLatestState(ctx) + if err != nil { + logger.Warn("fail to get latest state", zap.Error(err)) + return false, err + } + if len(state.Sessions()) > 0 { + logger.Info("node is not greater than 2.6.0 when checking", zap.Int("sessionCount", len(state.Sessions()))) + return false, nil + } + return true, nil } // blockUntilAllNodeIsGreaterThan260AtBackground block until all node is greater than 2.6.0 at background. func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260AtBackground(ctx context.Context) error { - doneErr := errors.New("done") expectedRoles := []string{typeutil.ProxyRole, typeutil.DataNodeRole, typeutil.QueryNodeRole} for _, role := range expectedRoles { - logger := b.Logger().With(zap.String("role", role)) - logger.Info("start to wait that the nodes is greater than 2.6.0") - // Check if there's any proxy or data node with version < 2.6.0. - rosolver := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), "<2.6.0-dev") - r := rosolver.Resolver() - err := r.Watch(ctx, func(vs resolver.VersionedState) error { - if len(vs.Sessions()) == 0 { - return doneErr - } - logger.Info("session changes", zap.Int("sessionCount", len(vs.Sessions()))) - return nil - }) - if err != nil && !errors.Is(err, doneErr) { - logger.Info("fail to wait that the nodes is greater than 2.6.0", zap.Error(err)) + if err := b.blockUntilRoleGreaterThan260AtBackground(ctx, role); err != nil { return err } - logger.Info("all nodes is greater than 2.6.0") - rosolver.Close() } return b.channelMetaManager.MarkStreamingHasEnabled(ctx) } +// blockUntilRoleGreaterThan260AtBackground block until the role is greater than 2.6.0 at background. +func (b *balancerImpl) blockUntilRoleGreaterThan260AtBackground(ctx context.Context, role string) error { + doneErr := errors.New("done") + logger := b.Logger().With(zap.String("role", role)) + logger.Info("start to wait that the nodes is greater than 2.6.0") + // Check if there's any proxy or data node with version < 2.6.0. + rb := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), versionChecker260) + defer rb.Close() + + r := rb.Resolver() + err := r.Watch(ctx, func(vs resolver.VersionedState) error { + if len(vs.Sessions()) == 0 { + return doneErr + } + logger.Info("session changes", zap.Int("sessionCount", len(vs.Sessions()))) + return nil + }) + if err != nil && !errors.Is(err, doneErr) { + logger.Info("fail to wait that the nodes is greater than 2.6.0", zap.Error(err)) + return err + } + logger.Info("all nodes is greater than 2.6.0 when watching") + return nil +} + // applyAllRequest apply all request in the request channel. func (b *balancerImpl) applyAllRequest() { for {