diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e2a95ad6de..9f7ffe1c98 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -258,6 +258,9 @@ dataCoord: enableGarbageCollection: true enableActiveStandby: false # Enable active-standby + channel: + maxWatchDuration: 60 # Timeout on watching channels (in seconds). Default 60 seconds. + segment: maxSize: 512 # Maximum size of a segment in MB diskSegmentMaxSize: 2048 # Maximun size of a segment in MB for collection which has Disk index diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 8b3a0674e6..ee86d1cafe 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -37,10 +37,7 @@ import ( "stathat.com/c/consistent" ) -const ( - bgCheckInterval = 3 * time.Second - maxWatchDuration = 20 * time.Second -) +const bgCheckInterval = 3 * time.Second // ChannelManager manages the allocation and the balance between channels and data nodes. type ChannelManager struct { @@ -452,7 +449,7 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) { Vchan: vcInfo, StartTs: time.Now().Unix(), State: datapb.ChannelWatchState_Uncomplete, - TimeoutTs: time.Now().Add(maxWatchDuration).UnixNano(), + TimeoutTs: time.Now().Add(Params.DataCoordCfg.MaxWatchDuration).UnixNano(), Schema: ch.Schema, } op.ChannelWatchInfos = append(op.ChannelWatchInfos, info) @@ -463,7 +460,7 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) { func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string { var channelsWithTimer = []string{} startTs := time.Now().Unix() - timeoutTs := time.Now().Add(maxWatchDuration).UnixNano() + timeoutTs := time.Now().Add(Params.DataCoordCfg.MaxWatchDuration).UnixNano() for _, ch := range op.Channels { vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID) info := &datapb.ChannelWatchInfo{ diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 11ba002d64..04ae95c836 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -566,7 +566,7 @@ func TestChannelManager(t *testing.T) { bufferID: {bufferID, []*channel{}}, }, } - chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, time.Now().Add(maxWatchDuration).UnixNano()) + chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, time.Now().Add(Params.DataCoordCfg.MaxWatchDuration).UnixNano()) err = chManager.DeleteNode(1) assert.NoError(t, err) diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index e9e02ebd93..a2bb270143 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -434,7 +434,7 @@ func EmptyBgChecker(channels []*NodeChannelInfo, ts time.Time) ([]*NodeChannelIn return nil, nil } -// BgCheckWithMaxWatchDuration returns a ChannelBGChecker with the maxWatchDuration +// BgCheckWithMaxWatchDuration returns a ChannelBGChecker with `Params.DataCoordCfg.MaxWatchDuration`. func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker { return func(channels []*NodeChannelInfo, ts time.Time) ([]*NodeChannelInfo, error) { reAllocations := make([]*NodeChannelInfo, 0, len(channels)) @@ -461,7 +461,7 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker { } startTime := time.Unix(watchInfo.StartTs, 0) d := ts.Sub(startTime) - if d >= maxWatchDuration { + if d >= Params.DataCoordCfg.MaxWatchDuration { cinfo.Channels = append(cinfo.Channels, c) } } diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index 89744a923e..b5b0b651ac 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -402,7 +402,7 @@ func TestBgCheckWithMaxWatchDuration(t *testing.T) { getKv([]*watch{{1, "chan1", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Uncomplete}}, {1, "chan2", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Complete}}}), []*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}}}, - ts.Add(maxWatchDuration), + ts.Add(Params.DataCoordCfg.MaxWatchDuration), }, []*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}}, nil, @@ -412,7 +412,7 @@ func TestBgCheckWithMaxWatchDuration(t *testing.T) { args{ getKv([]*watch{{1, "chan1", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Uncomplete}}}), []*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}}, - ts.Add(maxWatchDuration).Add(-time.Second), + ts.Add(Params.DataCoordCfg.MaxWatchDuration).Add(-time.Second), }, []*NodeChannelInfo{}, nil, diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 47b4b6bfc8..9941981323 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -697,7 +697,7 @@ func (p *proxyConfig) initAccessLogMinioConfig() { p.AccessLog.RemotePath = p.Base.LoadWithDefault("proxy.accessLog.remotePath", "access_log/") } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- querycoord --- type queryCoordConfig struct { Base *BaseTable @@ -1140,6 +1140,9 @@ type dataCoordConfig struct { // --- ETCD --- ChannelWatchSubPath string + // --- CHANNEL --- + MaxWatchDuration time.Duration + // --- SEGMENTS --- SegmentMaxSize float64 DiskSegmentMaxSize float64 @@ -1179,6 +1182,8 @@ func (p *dataCoordConfig) init(base *BaseTable) { p.Base = base p.initChannelWatchPrefix() + p.initMaxWatchDuration() + p.initSegmentMaxSize() p.initDiskSegmentMaxSize() p.initSegmentSealProportion() @@ -1208,6 +1213,10 @@ func (p *dataCoordConfig) init(base *BaseTable) { p.initEnableActiveStandby() } +func (p *dataCoordConfig) initMaxWatchDuration() { + p.MaxWatchDuration = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.channel.maxWatchDuration", 60)) * time.Second +} + func (p *dataCoordConfig) initSegmentMaxSize() { p.SegmentMaxSize = p.Base.ParseFloatWithDefault("dataCoord.segment.maxSize", 512.0) }