enhance: reduce downtime when upgrading from 2.5.x (#42082)

issue: #40532

- start timeticksync at rootcoord if the streaming service is not available
- stop timeticksync once the streaming service becomes available
- open a read-only WAL while some nodes in the cluster have not yet upgraded to 2.6
- allow opening a read-write WAL once all nodes in the cluster have upgraded to 2.6 (a sketch of the gating pattern follows this list)
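
The gating pattern applied throughout this change (in startTimeTickLoop and timetickSync.startWatch below) condenses to the sketch here: a legacy loop registers an AsyncTaskNotifier with the streaming-node manager and keeps running only until the balancer signals that the streaming service is enabled. All names are taken from this diff except doLegacyWork, which is a hypothetical placeholder for the loop body.

// Sketch only; assumes the snmanager, streamingutil and syncutil helpers shown in this diff.
func runLegacyLoop(ctx context.Context) {
	notifier := syncutil.NewAsyncTaskNotifier[struct{}]()
	// Finish must be called on exit so that MarkStreamingHasEnabled can unblock.
	defer notifier.Finish(struct{}{})

	if streamingutil.IsStreamingServiceEnabled() {
		if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(ctx, notifier); err != nil {
			return
		}
		if notifier.Context().Err() != nil {
			// The streaming service has already been enabled once; never start the legacy loop.
			return
		}
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-notifier.Context().Done():
			// All nodes are on 2.6 and the read-write WAL is being opened; stop the legacy path.
			return
		case <-ctx.Done():
			return
		case <-ticker.C:
			doLegacyWork() // hypothetical placeholder
		}
	}
}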

---------

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-05-29 23:02:29 +08:00 committed by GitHub
parent b94cee2413
commit 4bad293655
73 changed files with 1917 additions and 1226 deletions

View File

@ -310,6 +310,7 @@ func (s *mixCoordImpl) GetStateCode() commonpb.StateCode {
func (s *mixCoordImpl) GracefulStop() {
if s.streamingCoord != nil {
s.streamingCoord.Stop()
s.streamingCoord = nil
}
}

View File

@ -58,6 +58,16 @@ func (s *StreamingNodeManager) GetLatestWALLocated(ctx context.Context, vchannel
return serverID, nil
}
// RegisterStreamingEnabledListener registers a notifier into the balancer.
func (s *StreamingNodeManager) RegisterStreamingEnabledListener(ctx context.Context, notifier *syncutil.AsyncTaskNotifier[struct{}]) error {
balancer, err := s.balancer.GetWithContext(ctx)
if err != nil {
return err
}
balancer.RegisterStreamingEnabledNotifier(notifier)
return nil
}
// GetWALLocated returns the server id of the node that the wal of the vChannel is located.
func (s *StreamingNodeManager) GetWALLocated(vChannel string) int64 {
pchannel := funcutil.ToPhysicalChannel(vChannel)

View File

@ -9,6 +9,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -33,6 +34,7 @@ func TestStreamingNodeManager(t *testing.T) {
}
}
})
b.EXPECT().RegisterStreamingEnabledNotifier(mock.Anything).Return()
m.SetBalancerReady(b)
streamingNodes := m.GetStreamingQueryNodeIDs()
@ -59,4 +61,6 @@ func TestStreamingNodeManager(t *testing.T) {
assert.Equal(t, node, int64(1))
streamingNodes = m.GetStreamingQueryNodeIDs()
assert.Equal(t, len(streamingNodes), 1)
assert.NoError(t, m.RegisterStreamingEnabledListener(context.Background(), syncutil.NewAsyncTaskNotifier[struct{}]()))
}

View File

@ -0,0 +1,8 @@
//go:build test
// +build test
package snmanager
func ResetStreamingNodeManager() {
StaticStreamingNodeManager = newStreamingNodeManager()
}

View File

@ -203,6 +203,12 @@ type QueryCoordCatalog interface {
// StreamingCoordCataLog is the interface for streamingcoord catalog
type StreamingCoordCataLog interface {
// GetVersion gets the streaming version from the metastore.
GetVersion(ctx context.Context) (*streamingpb.StreamingVersion, error)
// SaveVersion saves the streaming version to the metastore.
SaveVersion(ctx context.Context, version *streamingpb.StreamingVersion) error
// physical channel watch related
// ListPChannel list all pchannels on milvus.

View File

@ -4,4 +4,5 @@ const (
MetaPrefix = "streamingcoord-meta/"
PChannelMetaPrefix = MetaPrefix + "pchannel/"
BroadcastTaskPrefix = MetaPrefix + "broadcast-task/"
VersionPrefix = MetaPrefix + "version/"
)

View File

@ -12,10 +12,12 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
// NewCataLog creates a new catalog instance
// streamingcoord-meta
// ├── version
// ├── broadcast
// │   ├── task-1
// │   └── task-2
@ -34,6 +36,34 @@ type catalog struct {
metaKV kv.MetaKv
}
// GetVersion returns the streaming version
func (c *catalog) GetVersion(ctx context.Context) (*streamingpb.StreamingVersion, error) {
value, err := c.metaKV.Load(ctx, VersionPrefix)
if err != nil {
if errors.Is(err, merr.ErrIoKeyNotFound) {
return nil, nil
}
return nil, err
}
info := &streamingpb.StreamingVersion{}
if err = proto.Unmarshal([]byte(value), info); err != nil {
return nil, errors.Wrapf(err, "unmarshal streaming version failed")
}
return info, nil
}
// SaveVersion saves the streaming version
func (c *catalog) SaveVersion(ctx context.Context, version *streamingpb.StreamingVersion) error {
if version == nil {
return errors.New("version is nil")
}
v, err := proto.Marshal(version)
if err != nil {
return errors.Wrapf(err, "marshal streaming version failed")
}
return c.metaKV.Save(ctx, VersionPrefix, string(v))
}
// ListPChannels returns all pchannels
func (c *catalog) ListPChannel(ctx context.Context) ([]*streamingpb.PChannelMeta, error) {
keys, values, err := c.metaKV.LoadWithPrefix(ctx, PChannelMetaPrefix)
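
The persisted StreamingVersion above is what later gates the WAL access mode: RecoverChannelManager (further down in this diff) opens read-only channels while GetVersion still returns nil, and read-write channels once a version has been saved. A condensed sketch of that decision, not the literal implementation:

// walAccessModeOnRecovery is a condensed sketch of the version check in RecoverChannelManager.
func walAccessModeOnRecovery(ctx context.Context, catalog metastore.StreamingCoordCataLog) (types.AccessMode, error) {
	v, err := catalog.GetVersion(ctx)
	if err != nil {
		return types.AccessModeRO, err
	}
	if v == nil {
		// The streaming service has never been enabled: some nodes may still run 2.5.x,
		// so newly discovered pchannels are opened read-only.
		return types.AccessModeRO, nil
	}
	// The streaming service has been enabled at least once: open read-write.
	return types.AccessModeRW, nil
}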

View File

@ -17,6 +17,9 @@ func TestCatalog(t *testing.T) {
kv := mock_kv.NewMockMetaKv(t)
kvStorage := make(map[string]string)
kv.EXPECT().Load(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (string, error) {
return kvStorage[s], nil
})
kv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) ([]string, []string, error) {
keys := make([]string, 0, len(kvStorage))
vals := make([]string, 0, len(kvStorage))
@ -48,6 +51,16 @@ func TestCatalog(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, metas)
// Version test
err = catalog.SaveVersion(context.Background(), &streamingpb.StreamingVersion{
Version: 1,
})
assert.NoError(t, err)
v, err := catalog.GetVersion(context.Background())
assert.NoError(t, err)
assert.Equal(t, v.Version, int64(1))
// PChannel test
err = catalog.SavePChannels(context.Background(), []*streamingpb.PChannelMeta{
{

View File

@ -23,6 +23,64 @@ func (_m *MockStreamingCoordCataLog) EXPECT() *MockStreamingCoordCataLog_Expecte
return &MockStreamingCoordCataLog_Expecter{mock: &_m.Mock}
}
// GetVersion provides a mock function with given fields: ctx
func (_m *MockStreamingCoordCataLog) GetVersion(ctx context.Context) (*streamingpb.StreamingVersion, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for GetVersion")
}
var r0 *streamingpb.StreamingVersion
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*streamingpb.StreamingVersion, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) *streamingpb.StreamingVersion); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*streamingpb.StreamingVersion)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockStreamingCoordCataLog_GetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetVersion'
type MockStreamingCoordCataLog_GetVersion_Call struct {
*mock.Call
}
// GetVersion is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockStreamingCoordCataLog_Expecter) GetVersion(ctx interface{}) *MockStreamingCoordCataLog_GetVersion_Call {
return &MockStreamingCoordCataLog_GetVersion_Call{Call: _e.mock.On("GetVersion", ctx)}
}
func (_c *MockStreamingCoordCataLog_GetVersion_Call) Run(run func(ctx context.Context)) *MockStreamingCoordCataLog_GetVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockStreamingCoordCataLog_GetVersion_Call) Return(_a0 *streamingpb.StreamingVersion, _a1 error) *MockStreamingCoordCataLog_GetVersion_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockStreamingCoordCataLog_GetVersion_Call) RunAndReturn(run func(context.Context) (*streamingpb.StreamingVersion, error)) *MockStreamingCoordCataLog_GetVersion_Call {
_c.Call.Return(run)
return _c
}
// ListBroadcastTask provides a mock function with given fields: ctx
func (_m *MockStreamingCoordCataLog) ListBroadcastTask(ctx context.Context) ([]*streamingpb.BroadcastTask, error) {
ret := _m.Called(ctx)
@ -234,6 +292,53 @@ func (_c *MockStreamingCoordCataLog_SavePChannels_Call) RunAndReturn(run func(co
return _c
}
// SaveVersion provides a mock function with given fields: ctx, version
func (_m *MockStreamingCoordCataLog) SaveVersion(ctx context.Context, version *streamingpb.StreamingVersion) error {
ret := _m.Called(ctx, version)
if len(ret) == 0 {
panic("no return value specified for SaveVersion")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *streamingpb.StreamingVersion) error); ok {
r0 = rf(ctx, version)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStreamingCoordCataLog_SaveVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveVersion'
type MockStreamingCoordCataLog_SaveVersion_Call struct {
*mock.Call
}
// SaveVersion is a helper method to define mock.On call
// - ctx context.Context
// - version *streamingpb.StreamingVersion
func (_e *MockStreamingCoordCataLog_Expecter) SaveVersion(ctx interface{}, version interface{}) *MockStreamingCoordCataLog_SaveVersion_Call {
return &MockStreamingCoordCataLog_SaveVersion_Call{Call: _e.mock.On("SaveVersion", ctx, version)}
}
func (_c *MockStreamingCoordCataLog_SaveVersion_Call) Run(run func(ctx context.Context, version *streamingpb.StreamingVersion)) *MockStreamingCoordCataLog_SaveVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*streamingpb.StreamingVersion))
})
return _c
}
func (_c *MockStreamingCoordCataLog_SaveVersion_Call) Return(_a0 error) *MockStreamingCoordCataLog_SaveVersion_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockStreamingCoordCataLog_SaveVersion_Call) RunAndReturn(run func(context.Context, *streamingpb.StreamingVersion) error) *MockStreamingCoordCataLog_SaveVersion_Call {
_c.Call.Return(run)
return _c
}
// NewMockStreamingCoordCataLog creates a new instance of MockStreamingCoordCataLog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockStreamingCoordCataLog(t interface {

View File

@ -5,9 +5,11 @@ package mock_balancer
import (
context "context"
types "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
syncutil "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
mock "github.com/stretchr/testify/mock"
types "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
typeutil "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -160,6 +162,39 @@ func (_c *MockBalancer_MarkAsUnavailable_Call) RunAndReturn(run func(context.Con
return _c
}
// RegisterStreamingEnabledNotifier provides a mock function with given fields: notifier
func (_m *MockBalancer) RegisterStreamingEnabledNotifier(notifier *syncutil.AsyncTaskNotifier[struct{}]) {
_m.Called(notifier)
}
// MockBalancer_RegisterStreamingEnabledNotifier_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterStreamingEnabledNotifier'
type MockBalancer_RegisterStreamingEnabledNotifier_Call struct {
*mock.Call
}
// RegisterStreamingEnabledNotifier is a helper method to define mock.On call
// - notifier *syncutil.AsyncTaskNotifier[struct{}]
func (_e *MockBalancer_Expecter) RegisterStreamingEnabledNotifier(notifier interface{}) *MockBalancer_RegisterStreamingEnabledNotifier_Call {
return &MockBalancer_RegisterStreamingEnabledNotifier_Call{Call: _e.mock.On("RegisterStreamingEnabledNotifier", notifier)}
}
func (_c *MockBalancer_RegisterStreamingEnabledNotifier_Call) Run(run func(notifier *syncutil.AsyncTaskNotifier[struct{}])) *MockBalancer_RegisterStreamingEnabledNotifier_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*syncutil.AsyncTaskNotifier[struct{}]))
})
return _c
}
func (_c *MockBalancer_RegisterStreamingEnabledNotifier_Call) Return() *MockBalancer_RegisterStreamingEnabledNotifier_Call {
_c.Call.Return()
return _c
}
func (_c *MockBalancer_RegisterStreamingEnabledNotifier_Call) RunAndReturn(run func(*syncutil.AsyncTaskNotifier[struct{}])) *MockBalancer_RegisterStreamingEnabledNotifier_Call {
_c.Run(run)
return _c
}
// Trigger provides a mock function with given fields: ctx
func (_m *MockBalancer) Trigger(ctx context.Context) error {
ret := _m.Called(ctx)

View File

@ -29,6 +29,7 @@ import (
"testing"
"time"
"github.com/blang/semver/v4"
"github.com/cockroachdb/errors"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/prometheus/client_golang/prometheus"
@ -88,6 +89,7 @@ func init() {
Registry = prometheus.NewRegistry()
Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Registry.MustRegister(prometheus.NewGoCollector())
common.Version = semver.MustParse("2.5.9")
}
func runMixCoord(ctx context.Context, localMsg bool) *grpcmixcoord.Server {

View File

@ -997,7 +997,7 @@ func (sd *shardDelegator) SyncTargetVersion(action *querypb.SyncAction, partitio
}
func (sd *shardDelegator) GetChannelQueryView() *channelQueryView {
return sd.distribution.queryView
return sd.distribution.GetQueryView()
}
func (sd *shardDelegator) AddExcludedSegments(excludeInfo map[int64]uint64) {

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/tikv"
"github.com/milvus-io/milvus/internal/metastore"
@ -69,6 +70,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/retry"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -215,10 +217,28 @@ func (c *Core) sendMinDdlTsAsTt() {
func (c *Core) startTimeTickLoop() {
log := log.Ctx(c.ctx)
defer c.wg.Done()
streamingNotifier := syncutil.NewAsyncTaskNotifier[struct{}]()
defer streamingNotifier.Finish(struct{}{})
if streamingutil.IsStreamingServiceEnabled() {
if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(c.ctx, streamingNotifier); err != nil {
log.Info("register streaming enabled listener failed", zap.Error(err))
return
}
if streamingNotifier.Context().Err() != nil {
log.Info("streaming service has been enabled, ddl timetick from rootcoord should not start")
return
}
}
ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond))
defer ticker.Stop()
for {
select {
case <-streamingNotifier.Context().Done():
log.Info("streaming service has been enabled, ddl timetick from rootcoord should stop")
return
case <-c.ctx.Done():
log.Info("rootcoord's timetick loop quit!")
return
@ -425,22 +445,13 @@ func (c *Core) initInternal() error {
c.garbageCollector = newBgGarbageCollector(c)
c.stepExecutor = newBgStepExecutor(c.ctx)
if !streamingutil.IsStreamingServiceEnabled() {
c.proxyWatcher = proxyutil.NewProxyWatcher(
c.etcdCli,
c.chanTimeTick.initSessions,
c.proxyClientManager.AddProxyClients,
)
c.proxyWatcher.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
c.proxyWatcher.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
} else {
c.proxyWatcher = proxyutil.NewProxyWatcher(
c.etcdCli,
c.proxyClientManager.AddProxyClients,
)
c.proxyWatcher.AddSessionFunc(c.proxyClientManager.AddProxyClient)
c.proxyWatcher.DelSessionFunc(c.proxyClientManager.DelProxyClient)
}
c.proxyWatcher = proxyutil.NewProxyWatcher(
c.etcdCli,
c.chanTimeTick.initSessions,
c.proxyClientManager.AddProxyClients,
)
c.proxyWatcher.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
c.proxyWatcher.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
log.Info("init proxy manager done")
c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
@ -693,13 +704,10 @@ func (c *Core) startInternal() error {
}
func (c *Core) startServerLoop() {
c.wg.Add(1)
c.wg.Add(3)
go c.tsLoop()
if !streamingutil.IsStreamingServiceEnabled() {
c.wg.Add(2)
go c.startTimeTickLoop()
go c.chanTimeTick.startWatch(&c.wg)
}
go c.startTimeTickLoop()
go c.chanTimeTick.startWatch(&c.wg)
}
// Start starts RootCoord.

View File

@ -22,11 +22,13 @@ import (
"sync"
"time"
"github.com/blang/semver/v4"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/log"
@ -34,6 +36,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -231,6 +234,14 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
func (t *timetickSync) addSession(sess *sessionutil.Session) {
t.lock.Lock()
defer t.lock.Unlock()
rangeChecker := semver.MustParseRange(">=2.6.0-dev")
if rangeChecker(sess.Version) {
log.Info("new proxy with no timetick join, ignored",
zap.String("version", sess.Version.String()),
zap.Int64("serverID", sess.ServerID),
zap.String("address", sess.Address))
return
}
t.sess2ChanTsMap[sess.ServerID] = nil
log.Info("Add session for timeticksync", zap.Int64("serverID", sess.ServerID))
}
@ -261,6 +272,20 @@ func (t *timetickSync) initSessions(sess []*sessionutil.Session) {
func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
defer wg.Done()
streamingNotifier := syncutil.NewAsyncTaskNotifier[struct{}]()
defer streamingNotifier.Finish(struct{}{})
if streamingutil.IsStreamingServiceEnabled() {
if err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(t.ctx, streamingNotifier); err != nil {
log.Info("register streaming enabled listener failed", zap.Error(err))
return
}
if streamingNotifier.Context().Err() != nil {
log.Info("streaming service has been enabled, proxy timetick from rootcoord should not start")
return
}
}
var checker *timerecord.LongTermChecker
if enableTtChecker {
checker = timerecord.NewLongTermChecker(t.ctx, ttCheckerName, timeTickSyncTtInterval, ttCheckerWarnMsg)
@ -270,6 +295,9 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
for {
select {
case <-streamingNotifier.Context().Done():
log.Info("streaming service has been enabled, proxy timetick from rootcoord should stop")
return
case <-t.ctx.Done():
log.Info("rootcoord context done", zap.Error(t.ctx.Err()))
return

View File

@ -6,6 +6,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -19,6 +20,16 @@ var (
// Balancer is a local component; it should guarantee that all channels can be assigned and reach eventual consistency.
// Balancer should be thread safe.
type Balancer interface {
// RegisterStreamingEnabledNotifier registers a notifier into the balancer.
// If an error is returned, the balancer is closed.
// Otherwise, the following rules apply:
// 1. If the streaming service has already been enabled once (a write has been applied), the notifier is notified before this function returns.
// 2. If the streaming service is not yet enabled,
// the notifier is notified once the streaming service can be enabled (all nodes in the cluster have upgraded to 2.6),
// and the balancer waits for all notifiers to finish before starting the streaming service.
// 3. The caller should call the notifier's finish method after it sees the notification and completes its work.
RegisterStreamingEnabledNotifier(notifier *syncutil.AsyncTaskNotifier[struct{}])
// GetLatestWALLocated returns the server id of the node that the wal of the vChannel is located.
GetLatestWALLocated(ctx context.Context, pchannel string) (int64, bool)

View File

@ -65,6 +65,11 @@ type balancerImpl struct {
backgroundTaskNotifier *syncutil.AsyncTaskNotifier[struct{}] // backgroundTaskNotifier is used to communicate with the background task.
}
// RegisterStreamingEnabledNotifier registers a notifier into the balancer.
func (b *balancerImpl) RegisterStreamingEnabledNotifier(notifier *syncutil.AsyncTaskNotifier[struct{}]) {
b.channelMetaManager.RegisterStreamingEnabledNotifier(notifier)
}
// GetLatestWALLocated returns the server id of the node that the wal of the vChannel is located.
func (b *balancerImpl) GetLatestWALLocated(ctx context.Context, pchannel string) (int64, bool) {
return b.channelMetaManager.GetLatestWALLocated(ctx, pchannel)
@ -133,11 +138,7 @@ func (b *balancerImpl) execute() {
b.backgroundTaskNotifier.Finish(struct{}{})
b.Logger().Info("balancer execute finished")
}()
if err := b.blockUntilAllNodeIsGreaterThan260(b.ctx); err != nil {
b.Logger().Warn("fail to block until all node is greater than 2.6.0", zap.Error(err))
return
}
ready260Future := b.blockUntilAllNodeIsGreaterThan260(b.ctx)
balanceTimer := typeutil.NewBackoffTimer(&backoffConfigFetcher{})
nodeChanged, err := resource.Resource().StreamingNodeManagerClient().WatchNodeChanged(b.backgroundTaskNotifier.Context())
@ -156,6 +157,11 @@ func (b *balancerImpl) execute() {
// Wait for next balance trigger.
// Maybe trigger by timer or by request.
nextTimer, nextBalanceInterval := balanceTimer.NextTimer()
var ready260 <-chan struct{}
if ready260Future != nil {
ready260 = ready260Future.Done()
}
b.Logger().Info("balance wait", zap.Duration("nextBalanceInterval", nextBalanceInterval))
select {
case <-b.backgroundTaskNotifier.Context().Done():
@ -163,6 +169,13 @@ func (b *balancerImpl) execute() {
case newReq := <-b.reqCh:
newReq.apply(b)
b.applyAllRequest()
case <-ready260:
if err := ready260Future.Get(); err != nil {
b.Logger().Warn("fail to block until all node is greater than 2.6.0", zap.Error(err))
return
}
b.Logger().Info("all nodes is greater than 2.6.0, start to open read-write wal")
ready260Future = nil
case <-nextTimer:
// balance triggered by timer.
case _, ok := <-nodeChanged:
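
The rewritten wait above leans on a standard Go idiom: a receive from a nil channel blocks forever, so leaving ready260 nil (before the future exists, or after ready260Future is reset to nil once consumed) silently disables that select case. A minimal self-contained illustration of the idiom, unrelated to the Milvus code:

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(done)
	}()

	var optional <-chan struct{} // nil: this case can never fire
	for i := 0; i < 2; i++ {
		if i == 1 {
			optional = done // enable the case on the second pass
		}
		select {
		case <-optional:
			fmt.Println("optional case fired")
		case <-time.After(100 * time.Millisecond):
			fmt.Println("optional case disabled, timed out instead")
		}
	}
}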
@ -192,15 +205,30 @@ func (b *balancerImpl) execute() {
// blockUntilAllNodeIsGreaterThan260 blocks until all nodes are greater than 2.6.0.
// It's just a protection; it doesn't promise that a node with version < 2.6.0 will never join the cluster.
// That promise can only be achieved by the cluster dev-ops.
func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260(ctx context.Context) error {
func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260(ctx context.Context) *syncutil.Future[error] {
f := syncutil.NewFuture[error]()
if b.channelMetaManager.IsStreamingEnabledOnce() {
// Once streaming has been enabled, we cannot check the node version anymore,
// because the first channel assignment is generated after the old node is down.
return nil
}
go func() {
err := b.blockUntilAllNodeIsGreaterThan260AtBackground(ctx)
f.Set(err)
}()
return f
}
// blockUntilAllNodeIsGreaterThan260AtBackground blocks until all nodes are greater than 2.6.0 in the background.
func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260AtBackground(ctx context.Context) error {
doneErr := errors.New("done")
expectedRoles := []string{typeutil.ProxyRole, typeutil.DataNodeRole}
expectedRoles := []string{typeutil.ProxyRole, typeutil.DataNodeRole, typeutil.QueryNodeRole}
for _, role := range expectedRoles {
logger := b.Logger().With(zap.String("role", role))
logger.Info("start to wait that the nodes is greater than 2.6.0")
// Check if there's any proxy or data node with version < 2.6.0.
proxyResolver := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), "<2.6.0-dev")
r := proxyResolver.Resolver()
rosolver := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), "<2.6.0-dev")
r := rosolver.Resolver()
err := r.Watch(ctx, func(vs resolver.VersionedState) error {
if len(vs.Sessions()) == 0 {
return doneErr
@ -213,9 +241,9 @@ func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260(ctx context.Context) er
return err
}
logger.Info("all nodes is greater than 2.6.0")
proxyResolver.Close()
rosolver.Close()
}
return nil
return b.channelMetaManager.MarkStreamingHasEnabled(ctx)
}
// applyAllRequest apply all request in the request channel.
@ -257,7 +285,11 @@ func (b *balancerImpl) balance(ctx context.Context) (bool, error) {
}
// call the balance strategy to generate the expected layout.
currentLayout := generateCurrentLayout(pchannelView, nodeStatus)
accessMode := types.AccessModeRO
if b.channelMetaManager.IsStreamingEnabledOnce() {
accessMode = types.AccessModeRW
}
currentLayout := generateCurrentLayout(pchannelView, nodeStatus, accessMode)
expectedLayout, err := b.policy.Balance(currentLayout)
if err != nil {
return false, errors.Wrap(err, "fail to balance")
@ -290,22 +322,22 @@ func (b *balancerImpl) applyBalanceResultToStreamingNode(ctx context.Context, mo
// all history channels should be remove from related nodes.
for _, assignment := range channel.AssignHistories() {
if err := resource.Resource().StreamingNodeManagerClient().Remove(ctx, assignment); err != nil {
b.Logger().Warn("fail to remove channel", zap.Any("assignment", assignment), zap.Error(err))
b.Logger().Warn("fail to remove channel", zap.String("assignment", assignment.String()), zap.Error(err))
return err
}
b.Logger().Info("remove channel success", zap.Any("assignment", assignment))
b.Logger().Info("remove channel success", zap.String("assignment", assignment.String()))
}
// assign the channel to the target node.
if err := resource.Resource().StreamingNodeManagerClient().Assign(ctx, channel.CurrentAssignment()); err != nil {
b.Logger().Warn("fail to assign channel", zap.Any("assignment", channel.CurrentAssignment()), zap.Error(err))
b.Logger().Warn("fail to assign channel", zap.String("assignment", channel.CurrentAssignment().String()), zap.Error(err))
return err
}
b.Logger().Info("assign channel success", zap.Any("assignment", channel.CurrentAssignment()))
b.Logger().Info("assign channel success", zap.String("assignment", channel.CurrentAssignment().String()))
// bookkeeping the meta assignment done.
if err := b.channelMetaManager.AssignPChannelsDone(ctx, []types.ChannelID{channel.ChannelID()}); err != nil {
b.Logger().Warn("fail to bookkeep pchannel assignment done", zap.Any("assignment", channel.CurrentAssignment()))
b.Logger().Warn("fail to bookkeep pchannel assignment done", zap.String("assignment", channel.CurrentAssignment().String()))
return err
}
return nil
@ -318,10 +350,14 @@ func (b *balancerImpl) applyBalanceResultToStreamingNode(ctx context.Context, mo
}
// generateCurrentLayout generates the layout from all node info and meta.
func generateCurrentLayout(view *channel.PChannelView, allNodesStatus map[int64]*types.StreamingNodeStatus) (layout CurrentLayout) {
func generateCurrentLayout(view *channel.PChannelView, allNodesStatus map[int64]*types.StreamingNodeStatus, accessMode types.AccessMode) (layout CurrentLayout) {
channelsToNodes := make(map[types.ChannelID]int64, len(view.Channels))
channels := make(map[channel.ChannelID]types.PChannelInfo, len(view.Channels))
expectedAccessMode := make(map[types.ChannelID]types.AccessMode, len(view.Channels))
for id, meta := range view.Channels {
expectedAccessMode[id] = accessMode
channels[id] = meta.ChannelInfo()
if !meta.IsAssigned() {
// dead or expired relationship.
log.Warn("channel is not assigned to any server",
@ -352,9 +388,11 @@ func generateCurrentLayout(view *channel.PChannelView, allNodesStatus map[int64]
}
}
return CurrentLayout{
Channels: view.Stats,
AllNodesInfo: allNodesInfo,
ChannelsToNodes: channelsToNodes,
Channels: channels,
Stats: view.Stats,
AllNodesInfo: allNodesInfo,
ChannelsToNodes: channelsToNodes,
ExpectedAccessMode: expectedAccessMode,
}
}

View File

@ -71,29 +71,34 @@ func TestBalancer(t *testing.T) {
catalog := mock_metastore.NewMockStreamingCoordCataLog(t)
resource.InitForTest(resource.OptETCD(etcdClient), resource.OptStreamingCatalog(catalog), resource.OptStreamingManagerClient(streamingNodeManager))
catalog.EXPECT().GetVersion(mock.Anything).Return(nil, nil)
catalog.EXPECT().SaveVersion(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().ListPChannel(mock.Anything).Unset()
catalog.EXPECT().ListPChannel(mock.Anything).RunAndReturn(func(ctx context.Context) ([]*streamingpb.PChannelMeta, error) {
return []*streamingpb.PChannelMeta{
{
Channel: &streamingpb.PChannelInfo{
Name: "test-channel-1",
Term: 1,
Name: "test-channel-1",
Term: 1,
AccessMode: streamingpb.PChannelAccessMode_PCHANNEL_ACCESS_READONLY,
},
State: streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED,
Node: &streamingpb.StreamingNodeInfo{ServerId: 1},
},
{
Channel: &streamingpb.PChannelInfo{
Name: "test-channel-2",
Term: 1,
Name: "test-channel-2",
Term: 1,
AccessMode: streamingpb.PChannelAccessMode_PCHANNEL_ACCESS_READONLY,
},
State: streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED,
State: streamingpb.PChannelMetaState_PCHANNEL_META_STATE_UNAVAILABLE,
Node: &streamingpb.StreamingNodeInfo{ServerId: 4},
},
{
Channel: &streamingpb.PChannelInfo{
Name: "test-channel-3",
Term: 2,
Name: "test-channel-3",
Term: 2,
AccessMode: streamingpb.PChannelAccessMode_PCHANNEL_ACCESS_READONLY,
},
State: streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING,
Node: &streamingpb.StreamingNodeInfo{ServerId: 2},
@ -121,37 +126,50 @@ func TestBalancer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, b)
ctx1, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err = b.WatchChannelAssignments(ctx1, func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error {
assert.Len(t, relations, 2)
doneErr := errors.New("done")
err = b.WatchChannelAssignments(context.Background(), func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error {
for _, relation := range relations {
assert.Equal(t, relation.Channel.AccessMode, types.AccessModeRO)
}
if len(relations) == 3 {
return doneErr
}
return nil
})
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.ErrorIs(t, err, doneErr)
resource.Resource().ETCD().Delete(context.Background(), proxyPath1)
resource.Resource().ETCD().Delete(context.Background(), proxyPath2)
resource.Resource().ETCD().Delete(context.Background(), dataNodePath)
checkReady := func() {
err = b.WatchChannelAssignments(ctx, func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error {
// each node should be assigned exactly one pchannel
nodeIDs := typeutil.NewSet[int64]()
if len(relations) == 3 {
rwCount := 0
for _, relation := range relations {
if relation.Channel.AccessMode == types.AccessModeRW {
rwCount++
}
nodeIDs.Insert(relation.Node.ServerID)
}
if rwCount == 3 {
assert.Equal(t, 3, nodeIDs.Len())
return doneErr
}
}
return nil
})
assert.ErrorIs(t, err, doneErr)
}
checkReady()
b.MarkAsUnavailable(ctx, []types.PChannelInfo{{
Name: "test-channel-1",
Term: 1,
}})
b.Trigger(ctx)
doneErr := errors.New("done")
err = b.WatchChannelAssignments(ctx, func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error {
// each node should be assigned exactly one pchannel
nodeIDs := typeutil.NewSet[int64]()
if len(relations) == 3 {
for _, status := range relations {
nodeIDs.Insert(status.Node.ServerID)
}
assert.Equal(t, 3, nodeIDs.Len())
return doneErr
}
return nil
})
assert.ErrorIs(t, err, doneErr)
checkReady()
// create an infinite blocking watcher that can be interrupted by closing the balancer.
f := syncutil.NewFuture[error]()

View File

@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/retry"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -18,10 +19,17 @@ var ErrChannelNotExist = errors.New("channel not exist")
// RecoverChannelManager creates a new channel manager.
func RecoverChannelManager(ctx context.Context, incomingChannel ...string) (*ChannelManager, error) {
channels, metrics, err := recoverFromConfigurationAndMeta(ctx, incomingChannel...)
// streamingVersion is used to identify the current streaming service version.
// It is used to check whether an upgrade has happened.
streamingVersion, err := resource.Resource().StreamingCatalog().GetVersion(ctx)
if err != nil {
return nil, err
}
channels, metrics, err := recoverFromConfigurationAndMeta(ctx, streamingVersion, incomingChannel...)
if err != nil {
return nil, err
}
globalVersion := paramtable.GetNodeID()
return &ChannelManager{
cond: syncutil.NewContextCond(&sync.Mutex{}),
@ -30,12 +38,13 @@ func RecoverChannelManager(ctx context.Context, incomingChannel ...string) (*Cha
Global: globalVersion, // global version should be keep increasing globally, it's ok to use node id.
Local: 0,
},
metrics: metrics,
metrics: metrics,
streamingVersion: streamingVersion,
}, nil
}
// recoverFromConfigurationAndMeta recovers the channel manager from configuration and meta.
func recoverFromConfigurationAndMeta(ctx context.Context, incomingChannel ...string) (map[ChannelID]*PChannelMeta, *channelMetrics, error) {
func recoverFromConfigurationAndMeta(ctx context.Context, streamingVersion *streamingpb.StreamingVersion, incomingChannel ...string) (map[ChannelID]*PChannelMeta, *channelMetrics, error) {
// Recover metrics.
metrics := newPChannelMetrics()
@ -55,7 +64,14 @@ func recoverFromConfigurationAndMeta(ctx context.Context, incomingChannel ...str
// Get new incoming meta from configuration.
for _, newChannel := range incomingChannel {
c := newPChannelMeta(newChannel)
var c *PChannelMeta
if streamingVersion == nil {
// if streaming service has never been enabled, we treat all channels as read-only.
c = newPChannelMeta(newChannel, types.AccessModeRO)
} else {
// once the streaming service is enabled, we treat all channels as read-write.
c = newPChannelMeta(newChannel, types.AccessModeRW)
}
if _, ok := channels[c.ChannelID()]; !ok {
channels[c.ChannelID()] = c
}
@ -67,10 +83,62 @@ func recoverFromConfigurationAndMeta(ctx context.Context, incomingChannel ...str
// ChannelManager is the `wal` of channel assignment and unassignment.
// Every operation applied to the streaming node should be recorded in ChannelManager first.
type ChannelManager struct {
cond *syncutil.ContextCond
channels map[ChannelID]*PChannelMeta
version typeutil.VersionInt64Pair
metrics *channelMetrics
cond *syncutil.ContextCond
channels map[ChannelID]*PChannelMeta
version typeutil.VersionInt64Pair
metrics *channelMetrics
streamingVersion *streamingpb.StreamingVersion // used to identify the current streaming service version.
// nil if the streaming service has never been run.
// version 1 if the streaming service has been run once.
streamingEnableNotifiers []*syncutil.AsyncTaskNotifier[struct{}]
}
// RegisterStreamingEnabledNotifier registers a notifier into the channel manager.
func (cm *ChannelManager) RegisterStreamingEnabledNotifier(notifier *syncutil.AsyncTaskNotifier[struct{}]) {
cm.cond.L.Lock()
defer cm.cond.L.Unlock()
if cm.streamingVersion != nil {
// If the streaming service has already been enabled once, notify the notifier immediately and don't track it.
notifier.Cancel()
return
}
cm.streamingEnableNotifiers = append(cm.streamingEnableNotifiers, notifier)
}
// IsStreamingEnabledOnce returns true if streaming is enabled once.
func (cm *ChannelManager) IsStreamingEnabledOnce() bool {
cm.cond.L.Lock()
defer cm.cond.L.Unlock()
return cm.streamingVersion != nil
}
// MarkStreamingHasEnabled marks that the streaming service has been enabled.
func (cm *ChannelManager) MarkStreamingHasEnabled(ctx context.Context) error {
cm.cond.L.Lock()
defer cm.cond.L.Unlock()
cm.streamingVersion = &streamingpb.StreamingVersion{
Version: 1,
}
if err := retry.Do(ctx, func() error {
return resource.Resource().StreamingCatalog().SaveVersion(ctx, cm.streamingVersion)
}, retry.AttemptAlways()); err != nil {
return err
}
// notify all notifiers that the streaming service has been enabled.
for _, notifier := range cm.streamingEnableNotifiers {
notifier.Cancel()
}
// and block until the listeners of the notifiers are finished.
for _, notifier := range cm.streamingEnableNotifiers {
notifier.BlockUntilFinish()
}
cm.streamingEnableNotifiers = nil
return nil
}
// CurrentPChannelsView returns the current view of pchannels.
@ -89,19 +157,19 @@ func (cm *ChannelManager) CurrentPChannelsView() *PChannelView {
// When the balancer wants to assign a pchannel to a new server,
// it should always call this function to update the pchannel assignment first.
// Otherwise, the pchannel assignment tracing is lost in meta.
func (cm *ChannelManager) AssignPChannels(ctx context.Context, pChannelToStreamingNode map[ChannelID]types.StreamingNodeInfo) (map[ChannelID]*PChannelMeta, error) {
func (cm *ChannelManager) AssignPChannels(ctx context.Context, pChannelToStreamingNode map[ChannelID]types.PChannelInfoAssigned) (map[ChannelID]*PChannelMeta, error) {
cm.cond.LockAndBroadcast()
defer cm.cond.L.Unlock()
// modified channels.
pChannelMetas := make([]*streamingpb.PChannelMeta, 0, len(pChannelToStreamingNode))
for id, streamingNode := range pChannelToStreamingNode {
for id, assign := range pChannelToStreamingNode {
pchannel, ok := cm.channels[id]
if !ok {
return nil, ErrChannelNotExist
}
mutablePchannel := pchannel.CopyForWrite()
if mutablePchannel.TryAssignToServerID(streamingNode) {
if mutablePchannel.TryAssignToServerID(assign.Channel.AccessMode, assign.Node) {
pChannelMetas = append(pChannelMetas, mutablePchannel.IntoRawMeta())
}
}
@ -181,8 +249,11 @@ func (cm *ChannelManager) updatePChannelMeta(ctx context.Context, pChannelMetas
if len(pChannelMetas) == 0 {
return nil
}
if err := resource.Resource().StreamingCatalog().SavePChannels(ctx, pChannelMetas); err != nil {
return errors.Wrap(err, "update meta at catalog")
if err := retry.Do(ctx, func() error {
return resource.Resource().StreamingCatalog().SavePChannels(ctx, pChannelMetas)
}, retry.AttemptAlways()); err != nil {
return err
}
// update in-memory copy and increase the version.

View File

@ -2,6 +2,7 @@ package channel
import (
"context"
"math/rand"
"testing"
"github.com/cockroachdb/errors"
@ -12,6 +13,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -24,6 +26,9 @@ func TestChannelManager(t *testing.T) {
ctx := context.Background()
// Test recover failure.
catalog.EXPECT().GetVersion(mock.Anything).Return(&streamingpb.StreamingVersion{
Version: 1,
}, nil)
catalog.EXPECT().ListPChannel(mock.Anything).Return(nil, errors.New("recover failure"))
m, err := RecoverChannelManager(ctx)
assert.Nil(t, m)
@ -47,21 +52,15 @@ func TestChannelManager(t *testing.T) {
assert.NotNil(t, m)
assert.NoError(t, err)
// Test save meta failure
catalog.EXPECT().SavePChannels(mock.Anything, mock.Anything).Return(errors.New("save meta failure"))
modified, err := m.AssignPChannels(ctx, map[ChannelID]types.StreamingNodeInfo{newChannelID("test-channel"): {ServerID: 2}})
assert.Nil(t, modified)
assert.Error(t, err)
err = m.AssignPChannelsDone(ctx, []ChannelID{newChannelID("test-channel")})
assert.Error(t, err)
err = m.MarkAsUnavailable(ctx, []types.PChannelInfo{{
Name: "test-channel",
Term: 2,
}})
assert.Error(t, err)
// Test update non exist pchannel
modified, err = m.AssignPChannels(ctx, map[ChannelID]types.StreamingNodeInfo{newChannelID("non-exist-channel"): {ServerID: 2}})
modified, err := m.AssignPChannels(ctx, map[ChannelID]types.PChannelInfoAssigned{newChannelID("non-exist-channel"): {
Channel: types.PChannelInfo{
Name: "non-exist-channel",
Term: 1,
AccessMode: types.AccessModeRW,
},
Node: types.StreamingNodeInfo{ServerID: 2},
}})
assert.Nil(t, modified)
assert.ErrorIs(t, err, ErrChannelNotExist)
err = m.AssignPChannelsDone(ctx, []ChannelID{newChannelID("non-exist-channel")})
@ -74,13 +73,30 @@ func TestChannelManager(t *testing.T) {
// Test success.
catalog.EXPECT().SavePChannels(mock.Anything, mock.Anything).Unset()
catalog.EXPECT().SavePChannels(mock.Anything, mock.Anything).Return(nil)
modified, err = m.AssignPChannels(ctx, map[ChannelID]types.StreamingNodeInfo{newChannelID("test-channel"): {ServerID: 2}})
catalog.EXPECT().SavePChannels(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pm []*streamingpb.PChannelMeta) error {
if rand.Int31n(3) == 0 {
return errors.New("save meta failure")
}
return nil
})
modified, err = m.AssignPChannels(ctx, map[ChannelID]types.PChannelInfoAssigned{newChannelID("test-channel"): {
Channel: types.PChannelInfo{
Name: "test-channel",
Term: 1,
AccessMode: types.AccessModeRW,
},
Node: types.StreamingNodeInfo{ServerID: 2},
}})
assert.NotNil(t, modified)
assert.NoError(t, err)
assert.Len(t, modified, 1)
err = m.AssignPChannelsDone(ctx, []ChannelID{newChannelID("test-channel")})
assert.NoError(t, err)
nodeID, ok := m.GetLatestWALLocated(ctx, "test-channel")
assert.True(t, ok)
assert.NotZero(t, nodeID)
err = m.MarkAsUnavailable(ctx, []types.PChannelInfo{{
Name: "test-channel",
Term: 2,
@ -93,6 +109,45 @@ func TestChannelManager(t *testing.T) {
channel, ok := view.Channels[newChannelID("test-channel")]
assert.True(t, ok)
assert.NotNil(t, channel)
nodeID, ok = m.GetLatestWALLocated(ctx, "test-channel")
assert.False(t, ok)
assert.Zero(t, nodeID)
}
func TestStreamingEnableChecker(t *testing.T) {
ctx := context.Background()
ResetStaticPChannelStatsManager()
RecoverPChannelStatsManager([]string{})
catalog := mock_metastore.NewMockStreamingCoordCataLog(t)
resource.InitForTest(resource.OptStreamingCatalog(catalog))
// Test recovery with the streaming service never enabled.
catalog.EXPECT().GetVersion(mock.Anything).Return(nil, nil)
catalog.EXPECT().SaveVersion(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().ListPChannel(mock.Anything).Return(nil, nil)
m, err := RecoverChannelManager(ctx, "test-channel")
assert.NoError(t, err)
assert.False(t, m.IsStreamingEnabledOnce())
n := syncutil.NewAsyncTaskNotifier[struct{}]()
m.RegisterStreamingEnabledNotifier(n)
assert.NoError(t, n.Context().Err())
go func() {
defer n.Finish(struct{}{})
<-n.Context().Done()
}()
err = m.MarkStreamingHasEnabled(ctx)
assert.NoError(t, err)
n2 := syncutil.NewAsyncTaskNotifier[struct{}]()
m.RegisterStreamingEnabledNotifier(n2)
assert.Error(t, n.Context().Err())
assert.Error(t, n2.Context().Err())
}
func TestChannelManagerWatch(t *testing.T) {
@ -101,7 +156,9 @@ func TestChannelManagerWatch(t *testing.T) {
catalog := mock_metastore.NewMockStreamingCoordCataLog(t)
resource.InitForTest(resource.OptStreamingCatalog(catalog))
catalog.EXPECT().GetVersion(mock.Anything).Return(&streamingpb.StreamingVersion{
Version: 1,
}, nil)
catalog.EXPECT().ListPChannel(mock.Anything).Unset()
catalog.EXPECT().ListPChannel(mock.Anything).RunAndReturn(func(ctx context.Context) ([]*streamingpb.PChannelMeta, error) {
return []*streamingpb.PChannelMeta{
@ -137,7 +194,14 @@ func TestChannelManagerWatch(t *testing.T) {
assert.ErrorIs(t, err, context.Canceled)
}()
manager.AssignPChannels(ctx, map[ChannelID]types.StreamingNodeInfo{newChannelID("test-channel"): {ServerID: 2}})
manager.AssignPChannels(ctx, map[ChannelID]types.PChannelInfoAssigned{newChannelID("test-channel"): {
Channel: types.PChannelInfo{
Name: "test-channel",
Term: 1,
AccessMode: types.AccessModeRW,
},
Node: types.StreamingNodeInfo{ServerID: 2},
}})
manager.AssignPChannelsDone(ctx, []ChannelID{newChannelID("test-channel")})
<-called

View File

@ -8,13 +8,13 @@ import (
)
// newPChannelMeta creates a new PChannelMeta.
func newPChannelMeta(name string) *PChannelMeta {
func newPChannelMeta(name string, accessMode types.AccessMode) *PChannelMeta {
return &PChannelMeta{
inner: &streamingpb.PChannelMeta{
Channel: &streamingpb.PChannelInfo{
Name: name,
Term: 1,
AccessMode: streamingpb.PChannelAccessMode(types.AccessModeRW),
AccessMode: streamingpb.PChannelAccessMode(accessMode),
},
Node: nil,
State: streamingpb.PChannelMetaState_PCHANNEL_META_STATE_UNINITIALIZED,
@ -113,8 +113,8 @@ type mutablePChannel struct {
}
// TryAssignToServerID assigns the channel to a server.
func (m *mutablePChannel) TryAssignToServerID(streamingNode types.StreamingNodeInfo) bool {
if m.CurrentServerID() == streamingNode.ServerID && m.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED {
func (m *mutablePChannel) TryAssignToServerID(accessMode types.AccessMode, streamingNode types.StreamingNodeInfo) bool {
if m.ChannelInfo().AccessMode == accessMode && m.CurrentServerID() == streamingNode.ServerID && m.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED {
// if the channel is already assigned to the server, return false.
return false
}
@ -128,6 +128,7 @@ func (m *mutablePChannel) TryAssignToServerID(streamingNode types.StreamingNodeI
}
// otherwise update the channel into the assigning state.
m.inner.Channel.AccessMode = streamingpb.PChannelAccessMode(accessMode)
m.inner.Channel.Term++
m.inner.Node = types.NewProtoFromStreamingNodeInfo(streamingNode)
m.inner.State = streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING

View File

@ -39,7 +39,7 @@ func TestPChannel(t *testing.T) {
},
}, pchannel.CurrentAssignment())
pchannel = newPChannelMeta("test-channel")
pchannel = newPChannelMeta("test-channel", types.AccessModeRW)
assert.Equal(t, "test-channel", pchannel.Name())
assert.Equal(t, int64(1), pchannel.CurrentTerm())
assert.Empty(t, pchannel.AssignHistories())
@ -53,7 +53,7 @@ func TestPChannel(t *testing.T) {
newServerID := types.StreamingNodeInfo{
ServerID: 456,
}
assert.True(t, mutablePChannel.TryAssignToServerID(newServerID))
assert.True(t, mutablePChannel.TryAssignToServerID(types.AccessModeRW, newServerID))
updatedChannelInfo := newPChannelMetaFromProto(mutablePChannel.IntoRawMeta())
assert.Equal(t, "test-channel", pchannel.Name())
@ -69,7 +69,7 @@ func TestPChannel(t *testing.T) {
mutablePChannel = updatedChannelInfo.CopyForWrite()
mutablePChannel.TryAssignToServerID(types.StreamingNodeInfo{ServerID: 789})
mutablePChannel.TryAssignToServerID(types.AccessModeRW, types.StreamingNodeInfo{ServerID: 789})
updatedChannelInfo = newPChannelMetaFromProto(mutablePChannel.IntoRawMeta())
assert.Equal(t, "test-channel", updatedChannelInfo.Name())
assert.Equal(t, int64(3), updatedChannelInfo.CurrentTerm())
@ -94,7 +94,7 @@ func TestPChannel(t *testing.T) {
// Test reassigned
mutablePChannel = updatedChannelInfo.CopyForWrite()
assert.False(t, mutablePChannel.TryAssignToServerID(types.StreamingNodeInfo{ServerID: 789}))
assert.False(t, mutablePChannel.TryAssignToServerID(types.AccessModeRW, types.StreamingNodeInfo{ServerID: 789}))
// Test MarkAsUnavailable
mutablePChannel = updatedChannelInfo.CopyForWrite()
@ -107,4 +107,9 @@ func TestPChannel(t *testing.T) {
updatedChannelInfo = newPChannelMetaFromProto(mutablePChannel.IntoRawMeta())
assert.False(t, updatedChannelInfo.IsAssigned())
assert.Equal(t, streamingpb.PChannelMetaState_PCHANNEL_META_STATE_UNAVAILABLE, updatedChannelInfo.State())
// Test assign on unavailable
mutablePChannel = updatedChannelInfo.CopyForWrite()
assert.True(t, mutablePChannel.TryAssignToServerID(types.AccessModeRW, types.StreamingNodeInfo{ServerID: 789}))
assert.Len(t, mutablePChannel.AssignHistories(), 1)
}

View File

@ -30,4 +30,13 @@ func TestPChannelView(t *testing.T) {
view := newPChannelView(metas)
assert.Len(t, view.Channels, 2)
assert.Len(t, view.Stats, 2)
StaticPChannelStatsManager.Get().AddVChannel(
"by-dev-rootcoord-dml_0_100v0",
"by-dev-rootcoord-dml_0_101v0",
)
StaticPChannelStatsManager.Get().RemoveVChannel(
"by-dev-rootcoord-dml_0_100v0",
"by-dev-rootcoord-dml_0_101v0",
)
StaticPChannelStatsManager.Get().WatchAtChannelCountChanged()
}

View File

@ -17,7 +17,7 @@ func newExpectedLayoutForVChannelFairPolicy(currentLayout balancer.CurrentLayout
// perfect average vchannel count per node.
averageVChannelPerNode := float64(totalVChannel) / float64(currentLayout.TotalNodes())
// current affinity of pchannel.
affinity := newPChannelAffinity(currentLayout.Channels)
affinity := newPChannelAffinity(currentLayout.Stats)
// Create the node info for all
nodes := make(map[int64]*streamingNodeInfo)
@ -32,7 +32,7 @@ func newExpectedLayoutForVChannelFairPolicy(currentLayout balancer.CurrentLayout
CurrentLayout: currentLayout,
AveragePChannelPerNode: averagePChannelPerNode,
AverageVChannelPerNode: averageVChannelPerNode,
Assignments: make(map[types.ChannelID]types.StreamingNodeInfo),
Assignments: make(map[types.ChannelID]types.PChannelInfoAssigned),
Nodes: nodes,
}
layout.updateAllScore()
@ -41,7 +41,7 @@ func newExpectedLayoutForVChannelFairPolicy(currentLayout balancer.CurrentLayout
// assignmentSnapshot is the assignment snapshot of the expected layout.
type assignmentSnapshot struct {
Assignments map[types.ChannelID]types.StreamingNodeInfo
Assignments map[types.ChannelID]types.PChannelInfoAssigned
GlobalUnbalancedScore float64
}
@ -59,14 +59,14 @@ type expectedLayoutForVChannelFairPolicy struct {
AveragePChannelPerNode float64
AverageVChannelPerNode float64
PChannelAffinity *pchannelAffinity
GlobalUnbalancedScore float64 // the sum of unbalance score of all streamingnode, indicates how unbalanced the layout is, better if lower.
Assignments map[types.ChannelID]types.StreamingNodeInfo // current assignment of pchannel to streamingnode.
GlobalUnbalancedScore float64 // the sum of unbalance score of all streamingnode, indicates how unbalanced the layout is, better if lower.
Assignments map[types.ChannelID]types.PChannelInfoAssigned // current assignment of pchannel to streamingnode.
Nodes map[int64]*streamingNodeInfo
}
// AssignmentSnapshot will return the assignment snapshot.
func (p *expectedLayoutForVChannelFairPolicy) AssignmentSnapshot() assignmentSnapshot {
assignments := make(map[types.ChannelID]types.StreamingNodeInfo)
assignments := make(map[types.ChannelID]types.PChannelInfoAssigned)
for channelID, node := range p.Assignments {
assignments[channelID] = node
}
@ -89,17 +89,27 @@ func (p *expectedLayoutForVChannelFairPolicy) Assign(channelID types.ChannelID,
if _, ok := p.Assignments[channelID]; ok {
panic("channel already assigned")
}
stats, ok := p.CurrentLayout.Channels[channelID]
stats, ok := p.CurrentLayout.Stats[channelID]
if !ok {
panic("stats not found")
}
expectedAccessMode, ok := p.CurrentLayout.ExpectedAccessMode[channelID]
if !ok {
panic("expected access mode not found")
}
node, ok := p.CurrentLayout.AllNodesInfo[serverID]
if !ok {
panic("node info not found")
}
// assign to the node that already has pchannel at highest priority.
p.Assignments[channelID] = node
info := p.CurrentLayout.Channels[channelID]
info.AccessMode = expectedAccessMode
info.Term++
p.Assignments[channelID] = types.PChannelInfoAssigned{
Channel: info,
Node: node,
}
p.Nodes[node.ServerID].AssignedChannels[channelID] = struct{}{}
p.Nodes[node.ServerID].AssignedVChannelCount += len(stats.VChannels)
p.updateNodeScore(node.ServerID)
@ -107,13 +117,14 @@ func (p *expectedLayoutForVChannelFairPolicy) Assign(channelID types.ChannelID,
// Unassign will unassign the channel from the node.
func (p *expectedLayoutForVChannelFairPolicy) Unassign(channelID types.ChannelID) {
node, ok := p.Assignments[channelID]
assignment, ok := p.Assignments[channelID]
if !ok {
panic("channel is not assigned")
}
node := assignment.Node
delete(p.Assignments, channelID)
delete(p.Nodes[node.ServerID].AssignedChannels, channelID)
p.Nodes[node.ServerID].AssignedVChannelCount -= len(p.CurrentLayout.Channels[channelID].VChannels)
p.Nodes[node.ServerID].AssignedVChannelCount -= len(p.CurrentLayout.Stats[channelID].VChannels)
p.updateNodeScore(node.ServerID)
}
@ -160,7 +171,7 @@ func (p *expectedLayoutForVChannelFairPolicy) FindTheLeastUnbalanceScoreIncremen
var targetChannelID types.ChannelID
minScore := math.MaxFloat64
for channelID := range p.Assignments {
serverID := p.Assignments[channelID].ServerID
serverID := p.Assignments[channelID].Node.ServerID
p.Unassign(channelID)
currentScore := p.GlobalUnbalancedScore
if currentScore < minScore {

View File

@ -31,13 +31,13 @@ func TestPChannelCountFair(t *testing.T) {
}))
assert.Equal(t, 10, len(expected.ChannelAssignment))
assert.EqualValues(t, int64(2), expected.ChannelAssignment[newChannelID("c1")].ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c3")].ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c4")].ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c2")].ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c5")].ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c6")].ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c7")].ServerID)
assert.EqualValues(t, int64(2), expected.ChannelAssignment[newChannelID("c1")].Node.ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c3")].Node.ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c4")].Node.ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c2")].Node.ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c5")].Node.ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c6")].Node.ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c7")].Node.ServerID)
counts := countByServerID(expected)
assert.Equal(t, 3, len(counts))
for _, count := range counts {
@ -60,8 +60,8 @@ func TestPChannelCountFair(t *testing.T) {
}, make(map[string]map[string]int64), []int64{1, 2, 3}))
assert.Equal(t, 10, len(expected.ChannelAssignment))
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c1")].ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c4")].ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c1")].Node.ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c4")].Node.ServerID)
counts = countByServerID(expected)
assert.Equal(t, 3, len(counts))
for _, count := range counts {
@ -84,15 +84,15 @@ func TestPChannelCountFair(t *testing.T) {
}, make(map[string]map[string]int64), []int64{1, 2, 3}))
assert.Equal(t, 10, len(expected.ChannelAssignment))
assert.Equal(t, int64(1), expected.ChannelAssignment[newChannelID("c1")].ServerID)
assert.Equal(t, int64(1), expected.ChannelAssignment[newChannelID("c2")].ServerID)
assert.Equal(t, int64(1), expected.ChannelAssignment[newChannelID("c3")].ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c4")].ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c5")].ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c6")].ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c7")].ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c8")].ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c9")].ServerID)
assert.Equal(t, int64(1), expected.ChannelAssignment[newChannelID("c1")].Node.ServerID)
assert.Equal(t, int64(1), expected.ChannelAssignment[newChannelID("c2")].Node.ServerID)
assert.Equal(t, int64(1), expected.ChannelAssignment[newChannelID("c3")].Node.ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c4")].Node.ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c5")].Node.ServerID)
assert.Equal(t, int64(2), expected.ChannelAssignment[newChannelID("c6")].Node.ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c7")].Node.ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c8")].Node.ServerID)
assert.Equal(t, int64(3), expected.ChannelAssignment[newChannelID("c9")].Node.ServerID)
counts = countByServerID(expected)
assert.Equal(t, 3, len(counts))
for _, count := range counts {
@ -108,7 +108,7 @@ func TestPChannelCountFair(t *testing.T) {
func countByServerID(expected balancer.ExpectedLayout) map[int64]int {
counts := make(map[int64]int)
for _, node := range expected.ChannelAssignment {
counts[node.ServerID]++
counts[node.Node.ServerID]++
}
return counts
}
@ -122,9 +122,11 @@ func newChannelID(channel string) types.ChannelID {
// newLayout creates a new layout for test.
func newLayout(channels map[string]int, vchannels map[string]map[string]int64, serverID []int64) balancer.CurrentLayout {
layout := balancer.CurrentLayout{
Channels: make(map[types.ChannelID]channel.PChannelStatsView),
AllNodesInfo: make(map[int64]types.StreamingNodeInfo),
ChannelsToNodes: make(map[types.ChannelID]int64),
Channels: make(map[channel.ChannelID]types.PChannelInfo),
Stats: make(map[channel.ChannelID]channel.PChannelStatsView),
AllNodesInfo: make(map[int64]types.StreamingNodeInfo),
ChannelsToNodes: make(map[types.ChannelID]int64),
ExpectedAccessMode: make(map[channel.ChannelID]types.AccessMode),
}
for _, id := range serverID {
layout.AllNodesInfo[id] = types.StreamingNodeInfo{
@ -133,13 +135,19 @@ func newLayout(channels map[string]int, vchannels map[string]map[string]int64, s
}
for c, node := range channels {
if vc, ok := vchannels[c]; !ok {
layout.Channels[newChannelID(c)] = channel.PChannelStatsView{VChannels: make(map[string]int64)}
layout.Stats[newChannelID(c)] = channel.PChannelStatsView{VChannels: make(map[string]int64)}
} else {
layout.Channels[newChannelID(c)] = channel.PChannelStatsView{VChannels: vc}
layout.Stats[newChannelID(c)] = channel.PChannelStatsView{VChannels: vc}
}
if node > 0 {
layout.ChannelsToNodes[newChannelID(c)] = int64(node)
}
layout.Channels[newChannelID(c)] = types.PChannelInfo{
Name: c,
Term: 0,
AccessMode: types.AccessModeRW,
}
layout.ExpectedAccessMode[newChannelID(c)] = types.AccessModeRW
}
return layout
}

View File

@ -28,11 +28,10 @@ func (p *policy) Name() string {
}
// Balance will balance the load of streaming node by vchannel count.
func (p *policy) Balance(currentLayout balancer.CurrentLayout) (balancer.ExpectedLayout, error) {
func (p *policy) Balance(currentLayout balancer.CurrentLayout) (layout balancer.ExpectedLayout, err error) {
if currentLayout.TotalNodes() == 0 {
return balancer.ExpectedLayout{}, errors.New("no available streaming node")
}
// update policy configuration before balancing.
p.updatePolicyConfiguration()

View File

@ -45,7 +45,7 @@ func TestVChannelFairPolicy(t *testing.T) {
}, []int64{2, 3}))
assert.NoError(t, err)
assert.Equal(t, 10, len(expected.ChannelAssignment))
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].ServerID, expected.ChannelAssignment[newChannelID("c1")].ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].Node.ServerID, expected.ChannelAssignment[newChannelID("c1")].Node.ServerID)
expected, err = policy.Balance(newLayout(map[string]int{
"c1": 2,
@ -76,9 +76,9 @@ func TestVChannelFairPolicy(t *testing.T) {
}, []int64{1, 2, 3}))
assert.NoError(t, err)
assert.Equal(t, 10, len(expected.ChannelAssignment))
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].ServerID, expected.ChannelAssignment[newChannelID("c1")].ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].ServerID, expected.ChannelAssignment[newChannelID("c5")].ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c5")].ServerID, expected.ChannelAssignment[newChannelID("c1")].ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].Node.ServerID, expected.ChannelAssignment[newChannelID("c1")].Node.ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].Node.ServerID, expected.ChannelAssignment[newChannelID("c5")].Node.ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c5")].Node.ServerID, expected.ChannelAssignment[newChannelID("c1")].Node.ServerID)
expected, err = policy.Balance(newLayout(map[string]int{
"c1": -1,
@ -109,7 +109,7 @@ func TestVChannelFairPolicy(t *testing.T) {
}, []int64{1, 2, 3}))
assert.NoError(t, err)
assert.Equal(t, 10, len(expected.ChannelAssignment))
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].ServerID, expected.ChannelAssignment[newChannelID("c1")].ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].ServerID, expected.ChannelAssignment[newChannelID("c5")].ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c5")].ServerID, expected.ChannelAssignment[newChannelID("c1")].ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].Node.ServerID, expected.ChannelAssignment[newChannelID("c1")].Node.ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c3")].Node.ServerID, expected.ChannelAssignment[newChannelID("c5")].Node.ServerID)
assert.NotEqual(t, expected.ChannelAssignment[newChannelID("c5")].Node.ServerID, expected.ChannelAssignment[newChannelID("c1")].Node.ServerID)
}

View File

@ -17,9 +17,11 @@ var policiesBuilders typeutil.ConcurrentMap[string, PolicyBuilder]
// CurrentLayout is the full topology of streaming node and pChannel.
type CurrentLayout struct {
Channels map[types.ChannelID]channel.PChannelStatsView // Stats is the statistics of all pchannels.
AllNodesInfo map[int64]types.StreamingNodeInfo // AllNodesInfo is the full information of all available streaming nodes and related pchannels (contain the node not assign anything on it).
ChannelsToNodes map[types.ChannelID]int64 // ChannelsToNodes maps assigned channel name to node id.
Channels map[channel.ChannelID]types.PChannelInfo
Stats map[channel.ChannelID]channel.PChannelStatsView
AllNodesInfo map[int64]types.StreamingNodeInfo // AllNodesInfo is the full information of all available streaming nodes and their related pchannels (including nodes with nothing assigned to them).
ChannelsToNodes map[types.ChannelID]int64 // ChannelsToNodes maps assigned channel name to node id.
ExpectedAccessMode map[channel.ChannelID]types.AccessMode // ExpectedAccessMode is the expected access mode of each channel.
}
// TotalChannels returns the total number of channels in the layout.
@ -30,7 +32,7 @@ func (layout *CurrentLayout) TotalChannels() int {
// TotalVChannels returns the total number of vchannels in the layout.
func (layout *CurrentLayout) TotalVChannels() int {
cnt := 0
for _, stats := range layout.Channels {
for _, stats := range layout.Stats {
cnt += len(stats.VChannels)
}
return cnt
@ -39,7 +41,7 @@ func (layout *CurrentLayout) TotalVChannels() int {
// TotalVChannelPerCollection returns the total number of vchannels per collection in the layout.
func (layout *CurrentLayout) TotalVChannelsOfCollection() map[int64]int {
cnt := make(map[int64]int)
for _, stats := range layout.Channels {
for _, stats := range layout.Stats {
for _, collectionID := range stats.VChannels {
cnt[collectionID]++
}
@ -55,7 +57,7 @@ func (layout *CurrentLayout) TotalNodes() int {
// GetAllPChannelsSortedByVChannelCountDesc returns all pchannels sorted by vchannel count in descending order.
func (layout *CurrentLayout) GetAllPChannelsSortedByVChannelCountDesc() []types.ChannelID {
sorter := make(byVChannelCountDesc, 0, layout.TotalChannels())
for id, stats := range layout.Channels {
for id, stats := range layout.Stats {
sorter = append(sorter, withVChannelCount{
id: id,
vchannelCount: len(stats.VChannels),
@ -84,14 +86,14 @@ func (a byVChannelCountDesc) Less(i, j int) bool {
// ExpectedLayout is the expected layout of streaming node and pChannel.
type ExpectedLayout struct {
ChannelAssignment map[types.ChannelID]types.StreamingNodeInfo // ChannelAssignment is the assignment of channel to node.
ChannelAssignment map[types.ChannelID]types.PChannelInfoAssigned // ChannelAssignment is the assignment of channel to node.
}
// String returns the string representation of the expected layout.
func (layout ExpectedLayout) String() string {
ss := make([]string, 0, len(layout.ChannelAssignment))
for channelID, node := range layout.ChannelAssignment {
ss = append(ss, channelID.String()+":"+node.String())
for _, assignment := range layout.ChannelAssignment {
ss = append(ss, assignment.String())
}
return strings.Join(ss, ",")
}
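
With CurrentLayout now split into channel metadata (Channels) and per-channel statistics (Stats), a policy reads the two maps side by side. A minimal sketch, not taken from this change, of deriving per-node vchannel load from the reshaped layout; the helper name is hypothetical, the field names come from the struct above:

package policy

import (
	"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
)

// vchannelsPerNode is an illustrative helper only: it combines Stats and
// ChannelsToNodes from the reshaped CurrentLayout to count how many vchannels
// each streaming node currently serves.
func vchannelsPerNode(layout balancer.CurrentLayout) map[int64]int {
	counts := make(map[int64]int)
	for id, stats := range layout.Stats {
		if nodeID, ok := layout.ChannelsToNodes[id]; ok {
			counts[nodeID] += len(stats.VChannels)
		}
	}
	return counts
}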

View File

@ -62,6 +62,13 @@ func Init(opts ...optResourceInit) {
r = newR
}
// Release releases the streamingnode client
func Release() {
if r.streamingNodeManagerClient != nil {
r.streamingNodeManagerClient.Close()
}
}
// Resource access the underlying singleton of resources.
func Resource() *resourceImpl {
return r

View File

@ -25,4 +25,5 @@ func TestInit(t *testing.T) {
func TestInitForTest(t *testing.T) {
InitForTest()
Release()
}

View File

@ -11,6 +11,7 @@ import (
_ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/internal/streamingcoord/server/service"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@ -106,5 +107,7 @@ func (s *Server) Stop() {
} else {
s.logger.Info("broadcaster not ready, skip close")
}
s.logger.Info("release streamingcoord resource...")
resource.Release()
s.logger.Info("streamingcoord server stopped")
}

View File

@ -71,6 +71,9 @@ func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerO
logger := log.With(zap.String("pchannel", opts.PChannel), zap.String("handler", "producer"))
p, err := hc.createHandlerAfterStreamingNodeReady(ctx, logger, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (any, error) {
if assign.Channel.AccessMode != types.AccessModeRW {
return nil, errors.New("producer can only be created for RW channel")
}
// Check if the localWAL is assigned on this node
localWAL, err := registry.GetLocalAvailableWAL(assign.Channel)
if err == nil {

View File

@ -0,0 +1,10 @@
//go:build test
// +build test
package registry
import "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
func ResetRegisterLocalWALManager() {
registry = syncutil.NewFuture[WALManager]()
}

View File

@ -2,6 +2,7 @@ package adaptor
import (
"context"
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
@ -101,8 +102,10 @@ type scannerAdaptorImpl struct {
reorderBuffer *utility.ReOrderByTimeTickBuffer // support time tick reorder.
pendingQueue *utility.PendingQueue
txnBuffer *utility.TxnBuffer // txn buffer for txn message.
cleanup func()
metrics *metricsutil.ScannerMetrics
cleanup func()
clearOnce sync.Once
metrics *metricsutil.ScannerMetrics
}
// Channel returns the channel assignment info of the wal.
@ -119,13 +122,21 @@ func (s *scannerAdaptorImpl) Chan() <-chan message.ImmutableMessage {
// Return the same error as `Error`
func (s *scannerAdaptorImpl) Close() error {
err := s.ScannerHelper.Close()
if s.cleanup != nil {
s.cleanup()
}
s.metrics.Close()
// Close may be called multiple times, so we need to clear the resources only once.
s.clear()
return err
}
// clear clears the resources of the scanner.
func (s *scannerAdaptorImpl) clear() {
s.clearOnce.Do(func() {
if s.cleanup != nil {
s.cleanup()
}
s.metrics.Close()
})
}
func (s *scannerAdaptorImpl) execute() {
defer func() {
s.readOption.MesasgeHandler.Close()
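
The clearOnce/clear pair above makes the scanner's Close safe to call repeatedly. A self-contained sketch of the same idempotent-cleanup idiom with sync.Once; the type and function names below are illustrative, not part of this change:

package main

import (
	"fmt"
	"sync"
)

// closer mirrors the pattern above: Close may be invoked any number of times,
// but the cleanup body runs exactly once.
type closer struct {
	clearOnce sync.Once
	cleanup   func()
}

func (c *closer) Close() {
	c.clearOnce.Do(func() {
		if c.cleanup != nil {
			c.cleanup()
		}
	})
}

func main() {
	c := &closer{cleanup: func() { fmt.Println("released") }}
	c.Close()
	c.Close() // second call is a no-op
}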

View File

@ -1,11 +1,15 @@
package testutil
import (
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/channel"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
registry2 "github.com/milvus-io/milvus/internal/streamingnode/client/handler/registry"
)
func ResetEnvironment() {
channel.ResetStaticPChannelStatsManager()
registry.ResetRegistration()
snmanager.ResetStreamingNodeManager()
registry2.ResetRegisterLocalWALManager()
}

View File

@ -62,6 +62,13 @@ message PChannelMeta {
4; // keep the meta info assignment log that used to be assigned to.
}
// StreamingVersion is the version of the streaming service.
message StreamingVersion {
int64 version = 1; // version of the streaming service;
// null if there is no streaming version record yet: the cluster is upgrading from 2.5.x or just starting up;
// 1 means the first version of the streaming service is already running.
}
// VersionPair is the version pair of global and local.
message VersionPair {
int64 global = 1;
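
Per the comment on StreamingVersion, a missing record means the streaming service has never run, while a stored version of 1 or greater means it is already active. A minimal sketch of branching on that record; the getVersion callback and the Go import path of the generated proto are assumptions, only the StreamingVersion message itself comes from this change:

package snippet

import (
	"context"

	"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
)

// streamingAlreadyEnabled is illustrative only: it treats a nil
// StreamingVersion record as "streaming service never enabled" and any stored
// version >= 1 as "already running". getVersion stands in for whatever
// metastore accessor the caller has available.
func streamingAlreadyEnabled(ctx context.Context, getVersion func(context.Context) (*streamingpb.StreamingVersion, error)) (bool, error) {
	v, err := getVersion(ctx)
	if err != nil {
		return false, err
	}
	return v != nil && v.GetVersion() >= 1, nil
}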

File diff suppressed because it is too large

View File

@ -89,7 +89,12 @@ func (c PChannelInfo) String() string {
return fmt.Sprintf("%s:%s@%d", c.Name, c.AccessMode, c.Term)
}
// PChannelInfoAssigned is a pair that represents the assignment of a channel to a streaming node.
type PChannelInfoAssigned struct {
Channel PChannelInfo
Node StreamingNodeInfo
}
func (c PChannelInfoAssigned) String() string {
return fmt.Sprintf("%s>%s", c.Channel, c.Node)
}

View File

@ -31,4 +31,13 @@ func TestPChannelInfo(t *testing.T) {
assert.Panics(t, func() {
NewPChannelInfoFromProto(&streamingpb.PChannelInfo{Name: "c", Term: -1})
})
c := PChannelInfoAssigned{
Channel: info,
Node: StreamingNodeInfo{
ServerID: 1,
Address: "127.0.0.1",
},
}
assert.Equal(t, c.String(), "pchannel:ro@1>1@127.0.0.1")
}

View File

@ -25,7 +25,7 @@ source $BASEDIR/setenv.sh
TEST_CMD=$@
if [ -z "$TEST_CMD" ]; then
TEST_CMD="go test"
TEST_CMD="go test -failfast -count=1"
fi
set -e
@ -41,7 +41,7 @@ for d in $(go list ./tests/integration/...); do
# simplified command to speed up coord init test since it is large.
$TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=30m
elif [[ $d == *"import"* ]]; then
go test -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=60m
$TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=60m
else
$TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=15m -timeout=30m
fi

View File

@ -310,5 +310,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
}
func TestBalance(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(BalanceTestSuit))
}

View File

@ -259,5 +259,7 @@ func (s *ChannelExclusiveBalanceSuit) isSameChannel(segments []*querypb.SegmentV
}
func TestChannelExclusiveBalance(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(ChannelExclusiveBalanceSuit))
}

View File

@ -162,13 +162,13 @@ func (s *ClusteringCompactionNullDataSuite) TestClusteringCompactionNullData() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
indexType := integration.IndexFaissIvfFlat
metricType := metric.L2
@ -310,5 +310,7 @@ func (s *ClusteringCompactionNullDataSuite) TestClusteringCompactionNullData() {
}
func TestClusteringCompactionNullData(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(ClusteringCompactionNullDataSuite))
}

View File

@ -139,13 +139,13 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
indexType := integration.IndexFaissIvfFlat
metricType := metric.L2
@ -382,5 +382,7 @@ func ConstructVectorClusteringSchema(collection string, dim int, autoID bool, fi
}
func TestClusteringCompaction(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(ClusteringCompactionSuite))
}

View File

@ -110,10 +110,10 @@ func (s *CoordDownSearch) loadCollection(collectionName string, dim int) {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
log.Info("=========================Data flush finished=========================")
// create index
@ -313,5 +313,7 @@ func (s *CoordDownSearch) TestSearchAfterCoordDown() {
}
func TestCoordDownSearch(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(CoordDownSearch))
}

View File

@ -108,10 +108,10 @@ func (s *CoordSwitchSuite) loadCollection(collectionName string, dim int) {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
log.Info("=========================Data flush finished=========================")
// create index

View File

@ -111,10 +111,10 @@ func (s *DataNodeSuite) loadCollection(collectionName string) {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
log.Info("=========================Data flush finished=========================")
// create index

View File

@ -129,13 +129,13 @@ func (s *ExpressionSuite) insertFlushIndexLoad(ctx context.Context, fieldData []
flushTs, has := flushResp.GetCollFlushTs()[s.collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, s.dbName, s.collectionName)
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, s.dbName, s.collectionName)
// create index
createIndexStatus, err := s.Cluster.Proxy.CreateIndex(context.TODO(), &milvuspb.CreateIndexRequest{

View File

@ -162,12 +162,11 @@ func (s *TestGetVectorSuite) run() {
flushTs, has := flushResp.GetCollFlushTs()[collection]
s.Require().True(has)
s.WaitForFlush(ctx, ids, flushTs, s.dbName, collection)
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().NotEmpty(segments)
s.WaitForFlush(ctx, ids, flushTs, s.dbName, collection)
// create index
_, err = s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
DbName: s.dbName,

View File

@ -129,13 +129,13 @@ func (s *HelloMilvusSuite) run() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{

View File

@ -93,13 +93,13 @@ func (s *HybridSearchSuite) TestHybridSearch() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// load without index on vector fields
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
@ -312,13 +312,13 @@ func (s *HybridSearchSuite) TestHybridSearchSingleSubReq() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// load without index on vector fields
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{

View File

@ -109,13 +109,13 @@ func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int)
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, "", collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, "", collectionName)
// delete
beginIndex := 0

View File

@ -164,13 +164,13 @@ func (s *InternaltlsTestSuit) run() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
@ -331,5 +331,7 @@ func (s *InternaltlsTestSuit) TearDownSuite() {
}
func TestInternalTLS(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(InternaltlsTestSuit))
}

View File

@ -746,13 +746,13 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, dbName, collec
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -63,13 +64,22 @@ func (s *LevelZeroSuite) TestDeletePartitionKeyHint() {
// Generate 2 growing segments with 2 different partition keys 0 and 1001, with exactly the same PKs starting from 0
s.generateSegment(collectionName, 1000, 0, false, 0)
s.generateSegment(collectionName, 1001, 0, false, 1001)
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().EqualValues(len(segments), 2)
for _, segment := range segments {
s.Require().EqualValues(commonpb.SegmentState_Growing, segment.GetState())
s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel())
}
var segments []*datapb.SegmentInfo
assert.Eventually(s.T(), func() bool {
var err error
segments, err = c.MetaWatcher.ShowSegments()
s.NoError(err)
if len(segments) == 2 {
for _, segment := range segments {
s.Require().EqualValues(commonpb.SegmentState_Growing, segment.GetState())
s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel())
}
return true
}
return false
}, 5*time.Second, 100*time.Millisecond)
L1SegIDs := lo.Map(segments, func(seg *datapb.SegmentInfo, _ int) int64 {
return seg.GetID()

View File

@ -116,13 +116,13 @@ func (s *MaterializedViewTestSuite) run() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{

View File

@ -22,6 +22,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -142,13 +143,17 @@ func (s *MetaWatcherSuite) TestShowSegments() {
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
log.Info("TestShowSegments succeed")
assert.Eventually(s.T(), func() bool {
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
if len(segments) != 0 {
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
return true
}
return false
}, 5*time.Second, 100*time.Millisecond)
}
func (s *MetaWatcherSuite) TestShowReplicas() {
@ -241,12 +246,17 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
})
s.NoError(err)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
assert.Eventually(s.T(), func() bool {
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
if len(segments) != 0 {
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
return true
}
return false
}, 5*time.Second, 100*time.Millisecond)
segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
ids := segmentIDs.GetData()
s.Require().NotEmpty(segmentIDs)

View File

@ -394,19 +394,41 @@ func (cluster *MiniClusterV2) Stop() error {
if cluster.clientConn != nil {
cluster.clientConn.Close()
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
if cluster.Proxy != nil {
cluster.Proxy.Stop()
log.Info("mini cluster proxy stopped")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
cluster.StopAllDataNodes()
}()
wg.Add(1)
go func() {
defer wg.Done()
cluster.StopAllStreamingNodes()
}()
wg.Add(1)
go func() {
defer wg.Done()
cluster.StopAllQueryNodes()
}()
wg.Wait()
if cluster.MixCoord != nil {
cluster.MixCoord.Stop()
log.Info("mini cluster rootCoord stopped")
}
if cluster.Proxy != nil {
cluster.Proxy.Stop()
log.Info("mini cluster proxy stopped")
}
cluster.StopAllDataNodes()
cluster.StopAllStreamingNodes()
cluster.StopAllQueryNodes()
streaming.Release()
log.Info("mini cluster streaming released")
cluster.EtcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix())
defer cluster.EtcdCli.Close()
@ -420,7 +442,6 @@ func (cluster *MiniClusterV2) Stop() error {
}
}
cluster.ChunkManager.RemoveWithPrefix(cluster.ctx, cluster.ChunkManager.RootPath())
streaming.Release()
return nil
}
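
The reworked Stop shuts the proxy, data nodes, streaming nodes, and query nodes down in parallel and only then stops the coordinator and releases the streaming client. A self-contained sketch of the same fan-out/wait pattern; the component names are placeholders:

package main

import (
	"log"
	"sync"
)

// stopConcurrently runs each stop function in its own goroutine and waits for
// all of them before returning, mirroring the WaitGroup fan-out above.
func stopConcurrently(stops map[string]func()) {
	var wg sync.WaitGroup
	for name, stop := range stops {
		wg.Add(1)
		go func(name string, stop func()) {
			defer wg.Done()
			stop()
			log.Printf("%s stopped", name)
		}(name, stop)
	}
	wg.Wait()
}

func main() {
	stopConcurrently(map[string]func(){
		"proxy":           func() {},
		"data nodes":      func() {},
		"query nodes":     func() {},
		"streaming nodes": func() {},
	})
}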

View File

@ -153,13 +153,13 @@ func (s *NullDataSuite) run() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
@ -347,13 +347,13 @@ func (s *NullDataSuite) run() {
flushTs, has = flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err = c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// search
searchResult, err = c.Proxy.Search(ctx, searchReq)

View File

@ -538,5 +538,7 @@ func (s *PartialSearchTestSuit) TestSkipWaitTSafe() {
}
func TestPartialResult(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(PartialSearchTestSuit))
}

View File

@ -140,13 +140,13 @@ func (s *PartitionKeySuite) TestPartitionKey() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{

View File

@ -111,10 +111,10 @@ func (s *QueryNodeSuite) loadCollection(collectionName string, dim int) {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
log.Info("=========================Data flush finished=========================")
// create index
@ -303,5 +303,7 @@ func (s *QueryNodeSuite) TestSwapQN() {
}
func TestQueryNodeUtil(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(QueryNodeSuite))
}

View File

@ -98,13 +98,13 @@ func (s *RangeSearchSuite) TestRangeSearchIP() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
@ -247,13 +247,13 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{

View File

@ -184,5 +184,7 @@ func (s *ReplicaTestSuit) TestNodeDownOnMultiReplica() {
}
func TestReplicas(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(ReplicaTestSuit))
}

View File

@ -1032,5 +1032,7 @@ func (s *LoadTestSuite) TestLoadWithCompact() {
}
func TestReplicas(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(LoadTestSuite))
}

View File

@ -348,5 +348,7 @@ func newCreateCollectionConfig(collectionName string) *integration.CreateCollect
}
func TestResourceGroup(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(ResourceGroupTestSuite))
}

View File

@ -360,5 +360,7 @@ func (s *ManualRollingUpgradeSuite) TestSuspendNode() {
}
func TestManualRollingUpgrade(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(ManualRollingUpgradeSuite))
}

View File

@ -40,5 +40,7 @@ func (s *SealSuite) TearDownSuite() {
}
func TestSealPolicies(t *testing.T) {
g := integration.WithoutStreamingService()
defer g()
suite.Run(t, new(SealSuite))
}

View File

@ -172,13 +172,13 @@ func (s *SearchSuite) run() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{

View File

@ -248,13 +248,13 @@ func (s *SparseTestSuite) TestSparse_invalid_index_build() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// unsupported index type
indexParams := []*commonpb.KeyValuePair{
@ -408,13 +408,13 @@ func (s *SparseTestSuite) TestSparse_invalid_search_request() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
indexType := integration.IndexSparseInvertedIndex
metricType := metric.IP

View File

@ -262,13 +262,13 @@ func (s *StatsTaskCheckerSuite) run() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, s.dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, s.dbName, collectionName)
}
// create index

View File

@ -30,7 +30,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@ -45,13 +44,11 @@ type HelloStreamingSuite struct {
}
func (s *HelloStreamingSuite) SetupSuite() {
streamingutil.SetStreamingServiceEnabled()
s.MiniClusterSuite.SetupSuite()
}
func (s *HelloStreamingSuite) TeardownSuite() {
s.MiniClusterSuite.TearDownSuite()
streamingutil.UnsetStreamingServiceEnabled()
}
func (s *HelloStreamingSuite) TestHelloStreaming() {

View File

@ -24,6 +24,7 @@ import (
"sync"
"time"
"github.com/blang/semver/v4"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap"
@ -31,6 +32,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
@ -40,6 +43,18 @@ var caseTimeout time.Duration
func init() {
flag.DurationVar(&caseTimeout, "caseTimeout", 10*time.Minute, "timeout duration for single case")
streamingutil.SetStreamingServiceEnabled()
}
// WithoutStreamingService run the test not in streaming service
func WithoutStreamingService() func() {
oldVersion := common.Version
common.Version = semver.MustParse("2.5.9")
streamingutil.UnsetStreamingServiceEnabled()
return func() {
common.Version = oldVersion
streamingutil.SetStreamingServiceEnabled()
}
}
// EmbedEtcdSuite contains embed setup & teardown related logic
@ -130,6 +145,7 @@ func (s *MiniClusterSuite) SetupTest() {
select {
case <-timeoutCtx.Done():
s.Fail("node id check timeout")
return
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
s.T().Log("node id report info: ", reportInfo)

View File

@ -101,13 +101,13 @@ func (s *UpsertSuite) TestUpsertAutoIDFalse() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
@ -233,13 +233,13 @@ func (s *UpsertSuite) TestUpsertAutoIDTrue() {
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{