diff --git a/configs/milvus.yaml b/configs/milvus.yaml index bf1e67d820..b76b68b6e4 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1204,6 +1204,9 @@ streaming: # It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration backoffInitialInterval: 50ms backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default + # The timeout of wal balancer operation, 10s by default. + # If the operation exceeds this timeout, it will be canceled. + operationTimeout: 10s balancePolicy: name: vchannelFair # The multiplier of balance task trigger backoff, 2 by default vchannelFair: diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index f27d3b55bc..c80c567d39 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -79,8 +79,8 @@ func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID Uni ) for _, s := range segments { if (partitionID > allPartitionID && s.PartitionID != partitionID) || - (s.GetState() != commonpb.SegmentState_Growing && s.GetStartPosition() == nil && s.GetDmlPosition() == nil) { - // empty growing segment don't have dml position and start position + ((s.GetState() != commonpb.SegmentState_Growing && s.GetState() != commonpb.SegmentState_Sealed) && s.GetStartPosition() == nil && s.GetDmlPosition() == nil) { + // empty growing and sealed segments don't have dml position and start position + // and they should be recovered for streamingnode, so we add the state-filter here. 
continue } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index e2f6d2f4b5..e386fbec93 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -562,6 +562,9 @@ func (node *QueryNode) Stop() error { if node.pipelineManager != nil { channelNum = node.pipelineManager.Num() } + if len(sealedSegments) == 0 && len(growingSegments) == 0 && channelNum == 0 { + break outer + } select { case <-timeoutCh: diff --git a/internal/streamingcoord/server/balancer/balancer_impl.go b/internal/streamingcoord/server/balancer/balancer_impl.go index 75242870f5..07a3250a6a 100644 --- a/internal/streamingcoord/server/balancer/balancer_impl.go +++ b/internal/streamingcoord/server/balancer/balancer_impl.go @@ -315,13 +315,16 @@ func (b *balancerImpl) applyBalanceResultToStreamingNode(ctx context.Context, mo // different channel can be execute concurrently. g, _ := errgroup.WithContext(ctx) + opTimeout := paramtable.Get().StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse() // generate balance operations and applied them. for _, channel := range modifiedChannels { channel := channel g.Go(func() error { // all history channels should be remove from related nodes. for _, assignment := range channel.AssignHistories() { - if err := resource.Resource().StreamingNodeManagerClient().Remove(ctx, assignment); err != nil { + opCtx, cancel := context.WithTimeout(ctx, opTimeout) + defer cancel() + if err := resource.Resource().StreamingNodeManagerClient().Remove(opCtx, assignment); err != nil { b.Logger().Warn("fail to remove channel", zap.String("assignment", assignment.String()), zap.Error(err)) return err } @@ -329,7 +332,9 @@ func (b *balancerImpl) applyBalanceResultToStreamingNode(ctx context.Context, mo } // assign the channel to the target node. 
- if err := resource.Resource().StreamingNodeManagerClient().Assign(ctx, channel.CurrentAssignment()); err != nil { + opCtx, cancel := context.WithTimeout(ctx, opTimeout) + defer cancel() + if err := resource.Resource().StreamingNodeManagerClient().Assign(opCtx, channel.CurrentAssignment()); err != nil { b.Logger().Warn("fail to assign channel", zap.String("assignment", channel.CurrentAssignment().String()), zap.Error(err)) return err } diff --git a/internal/streamingnode/client/handler/handler_client.go b/internal/streamingnode/client/handler/handler_client.go index a412d2fe32..e3db2e6b18 100644 --- a/internal/streamingnode/client/handler/handler_client.go +++ b/internal/streamingnode/client/handler/handler_client.go @@ -31,6 +31,7 @@ var ( _ HandlerClient = (*handlerClientImpl)(nil) ErrClientClosed = errors.New("handler client is closed") ErrClientAssignmentNotReady = errors.New("handler client assignment not ready") + ErrReadOnlyWAL = errors.New("wal is read only") ) type ( diff --git a/internal/streamingnode/client/handler/handler_client_impl.go b/internal/streamingnode/client/handler/handler_client_impl.go index 8dc6896dfd..4a06bc4752 100644 --- a/internal/streamingnode/client/handler/handler_client_impl.go +++ b/internal/streamingnode/client/handler/handler_client_impl.go @@ -59,6 +59,9 @@ func (hc *handlerClientImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context, if err != nil { return 0, err } + if w.Channel().AccessMode != types.AccessModeRW { + return 0, ErrReadOnlyWAL + } return w.GetLatestMVCCTimestamp(ctx, vchannel) } diff --git a/internal/streamingnode/server/walmanager/wal_lifetime.go b/internal/streamingnode/server/walmanager/wal_lifetime.go index 8e45962392..8b5791a441 100644 --- a/internal/streamingnode/server/walmanager/wal_lifetime.go +++ b/internal/streamingnode/server/walmanager/wal_lifetime.go @@ -93,6 +93,7 @@ func (w *walLifetime) Close() { logger := log.With(zap.String("current", toStateString(currentState))) if oldWAL := 
currentState.GetWAL(); oldWAL != nil { oldWAL.Close() + w.statePair.SetCurrentState(newUnavailableCurrentState(currentState.Term(), nil)) logger.Info("close current term wal done at wal life time close") } logger.Info("wal lifetime closed") diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 052206916a..c89b71fcbf 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -5572,6 +5572,7 @@ type streamingConfig struct { WALBalancerTriggerInterval ParamItem `refreshable:"true"` WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"` WALBalancerBackoffMultiplier ParamItem `refreshable:"true"` + WALBalancerOperationTimeout ParamItem `refreshable:"true"` // balancer Policy WALBalancerPolicyName ParamItem `refreshable:"true"` @@ -5637,6 +5638,15 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura Export: true, } p.WALBalancerBackoffMultiplier.Init(base.mgr) + p.WALBalancerOperationTimeout = ParamItem{ + Key: "streaming.walBalancer.operationTimeout", + Version: "2.6.0", + Doc: `The timeout of wal balancer operation, 10s by default. 
+If the operation exceeds this timeout, it will be canceled.`, + DefaultValue: "10s", + Export: true, + } + p.WALBalancerOperationTimeout.Init(base.mgr) p.WALBalancerPolicyName = ParamItem{ Key: "streaming.walBalancer.balancePolicy.name", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index d891047df0..6dfc27efb2 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -628,6 +628,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 0.01, params.StreamingCfg.WALBalancerPolicyVChannelFairAntiAffinityWeight.GetAsFloat()) assert.Equal(t, 0.01, params.StreamingCfg.WALBalancerPolicyVChannelFairRebalanceTolerance.GetAsFloat()) assert.Equal(t, 3, params.StreamingCfg.WALBalancerPolicyVChannelFairRebalanceMaxStep.GetAsInt()) + assert.Equal(t, 10*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse()) assert.Equal(t, 1.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat()) assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse()) assert.Equal(t, 30*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())