enhance: support all partitions in shard manager for L0 segment (#43385)

issue: #42416

- change the key from partitionID to PartitionUniqueKey to support AllPartitionsID

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-07-18 11:40:51 +08:00 committed by GitHub
parent 5aa7a116d2
commit b142589942
12 changed files with 215 additions and 164 deletions
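
The core mechanism of the change: partition managers used to be indexed by a bare partitionID, which cannot represent the collection-wide "all partitions" bucket that L0 segments are assigned to. The diff replaces that key with the PartitionUniqueKey struct defined in the utils hunk at the end of this commit. Below is a minimal, standalone sketch of the idea rather than the actual implementation; the names mirror the diff, and the -1 sentinel follows the comment on common.AllPartitionsID there.

package main

import "fmt"

// AllPartitionsID stands in for common.AllPartitionsID; the diff's comment
// ("-1 means all partitions") is the basis for this value.
const AllPartitionsID int64 = -1

// PartitionUniqueKey mirrors the composite key introduced by this change.
type PartitionUniqueKey struct {
	CollectionID int64
	PartitionID  int64 // AllPartitionsID means every partition of the collection
}

// IsAllPartitions reports whether the key addresses the collection-wide bucket.
func (k PartitionUniqueKey) IsAllPartitions() bool {
	return k.PartitionID == AllPartitionsID
}

func main() {
	// The struct is comparable, so it can key a map directly: one map now holds
	// both per-partition managers and the all-partitions (L0) manager.
	managers := map[PartitionUniqueKey]string{
		{CollectionID: 1, PartitionID: 2}:               "manager for partition 2",
		{CollectionID: 1, PartitionID: AllPartitionsID}: "manager for L0 (all partitions)",
	}
	for key, name := range managers {
		fmt.Printf("%+v all=%v -> %s\n", key, key.IsAllPartitions(), name)
	}
}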


@ -301,17 +301,17 @@ func (_c *MockShardManager_CheckIfCollectionExists_Call) RunAndReturn(run func(i
return _c
}
// CheckIfPartitionCanBeCreated provides a mock function with given fields: collectionID, partitionID
func (_m *MockShardManager) CheckIfPartitionCanBeCreated(collectionID int64, partitionID int64) error {
ret := _m.Called(collectionID, partitionID)
// CheckIfPartitionCanBeCreated provides a mock function with given fields: uniquePartitionKey
func (_m *MockShardManager) CheckIfPartitionCanBeCreated(uniquePartitionKey shards.PartitionUniqueKey) error {
ret := _m.Called(uniquePartitionKey)
if len(ret) == 0 {
panic("no return value specified for CheckIfPartitionCanBeCreated")
}
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64) error); ok {
r0 = rf(collectionID, partitionID)
if rf, ok := ret.Get(0).(func(shards.PartitionUniqueKey) error); ok {
r0 = rf(uniquePartitionKey)
} else {
r0 = ret.Error(0)
}
@ -325,15 +325,14 @@ type MockShardManager_CheckIfPartitionCanBeCreated_Call struct {
}
// CheckIfPartitionCanBeCreated is a helper method to define mock.On call
// - collectionID int64
// - partitionID int64
func (_e *MockShardManager_Expecter) CheckIfPartitionCanBeCreated(collectionID interface{}, partitionID interface{}) *MockShardManager_CheckIfPartitionCanBeCreated_Call {
return &MockShardManager_CheckIfPartitionCanBeCreated_Call{Call: _e.mock.On("CheckIfPartitionCanBeCreated", collectionID, partitionID)}
// - uniquePartitionKey shards.PartitionUniqueKey
func (_e *MockShardManager_Expecter) CheckIfPartitionCanBeCreated(uniquePartitionKey interface{}) *MockShardManager_CheckIfPartitionCanBeCreated_Call {
return &MockShardManager_CheckIfPartitionCanBeCreated_Call{Call: _e.mock.On("CheckIfPartitionCanBeCreated", uniquePartitionKey)}
}
func (_c *MockShardManager_CheckIfPartitionCanBeCreated_Call) Run(run func(collectionID int64, partitionID int64)) *MockShardManager_CheckIfPartitionCanBeCreated_Call {
func (_c *MockShardManager_CheckIfPartitionCanBeCreated_Call) Run(run func(uniquePartitionKey shards.PartitionUniqueKey)) *MockShardManager_CheckIfPartitionCanBeCreated_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
run(args[0].(shards.PartitionUniqueKey))
})
return _c
}
@ -343,22 +342,22 @@ func (_c *MockShardManager_CheckIfPartitionCanBeCreated_Call) Return(_a0 error)
return _c
}
func (_c *MockShardManager_CheckIfPartitionCanBeCreated_Call) RunAndReturn(run func(int64, int64) error) *MockShardManager_CheckIfPartitionCanBeCreated_Call {
func (_c *MockShardManager_CheckIfPartitionCanBeCreated_Call) RunAndReturn(run func(shards.PartitionUniqueKey) error) *MockShardManager_CheckIfPartitionCanBeCreated_Call {
_c.Call.Return(run)
return _c
}
// CheckIfPartitionExists provides a mock function with given fields: collectionID, partitionID
func (_m *MockShardManager) CheckIfPartitionExists(collectionID int64, partitionID int64) error {
ret := _m.Called(collectionID, partitionID)
// CheckIfPartitionExists provides a mock function with given fields: uniquePartitionKey
func (_m *MockShardManager) CheckIfPartitionExists(uniquePartitionKey shards.PartitionUniqueKey) error {
ret := _m.Called(uniquePartitionKey)
if len(ret) == 0 {
panic("no return value specified for CheckIfPartitionExists")
}
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64) error); ok {
r0 = rf(collectionID, partitionID)
if rf, ok := ret.Get(0).(func(shards.PartitionUniqueKey) error); ok {
r0 = rf(uniquePartitionKey)
} else {
r0 = ret.Error(0)
}
@ -372,15 +371,14 @@ type MockShardManager_CheckIfPartitionExists_Call struct {
}
// CheckIfPartitionExists is a helper method to define mock.On call
// - collectionID int64
// - partitionID int64
func (_e *MockShardManager_Expecter) CheckIfPartitionExists(collectionID interface{}, partitionID interface{}) *MockShardManager_CheckIfPartitionExists_Call {
return &MockShardManager_CheckIfPartitionExists_Call{Call: _e.mock.On("CheckIfPartitionExists", collectionID, partitionID)}
// - uniquePartitionKey shards.PartitionUniqueKey
func (_e *MockShardManager_Expecter) CheckIfPartitionExists(uniquePartitionKey interface{}) *MockShardManager_CheckIfPartitionExists_Call {
return &MockShardManager_CheckIfPartitionExists_Call{Call: _e.mock.On("CheckIfPartitionExists", uniquePartitionKey)}
}
func (_c *MockShardManager_CheckIfPartitionExists_Call) Run(run func(collectionID int64, partitionID int64)) *MockShardManager_CheckIfPartitionExists_Call {
func (_c *MockShardManager_CheckIfPartitionExists_Call) Run(run func(uniquePartitionKey shards.PartitionUniqueKey)) *MockShardManager_CheckIfPartitionExists_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
run(args[0].(shards.PartitionUniqueKey))
})
return _c
}
@ -390,22 +388,22 @@ func (_c *MockShardManager_CheckIfPartitionExists_Call) Return(_a0 error) *MockS
return _c
}
func (_c *MockShardManager_CheckIfPartitionExists_Call) RunAndReturn(run func(int64, int64) error) *MockShardManager_CheckIfPartitionExists_Call {
func (_c *MockShardManager_CheckIfPartitionExists_Call) RunAndReturn(run func(shards.PartitionUniqueKey) error) *MockShardManager_CheckIfPartitionExists_Call {
_c.Call.Return(run)
return _c
}
// CheckIfSegmentCanBeCreated provides a mock function with given fields: collectionID, partitionID, segmentID
func (_m *MockShardManager) CheckIfSegmentCanBeCreated(collectionID int64, partitionID int64, segmentID int64) error {
ret := _m.Called(collectionID, partitionID, segmentID)
// CheckIfSegmentCanBeCreated provides a mock function with given fields: uniquePartitionKey, segmentID
func (_m *MockShardManager) CheckIfSegmentCanBeCreated(uniquePartitionKey shards.PartitionUniqueKey, segmentID int64) error {
ret := _m.Called(uniquePartitionKey, segmentID)
if len(ret) == 0 {
panic("no return value specified for CheckIfSegmentCanBeCreated")
}
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64, int64) error); ok {
r0 = rf(collectionID, partitionID, segmentID)
if rf, ok := ret.Get(0).(func(shards.PartitionUniqueKey, int64) error); ok {
r0 = rf(uniquePartitionKey, segmentID)
} else {
r0 = ret.Error(0)
}
@ -419,16 +417,15 @@ type MockShardManager_CheckIfSegmentCanBeCreated_Call struct {
}
// CheckIfSegmentCanBeCreated is a helper method to define mock.On call
// - collectionID int64
// - partitionID int64
// - uniquePartitionKey shards.PartitionUniqueKey
// - segmentID int64
func (_e *MockShardManager_Expecter) CheckIfSegmentCanBeCreated(collectionID interface{}, partitionID interface{}, segmentID interface{}) *MockShardManager_CheckIfSegmentCanBeCreated_Call {
return &MockShardManager_CheckIfSegmentCanBeCreated_Call{Call: _e.mock.On("CheckIfSegmentCanBeCreated", collectionID, partitionID, segmentID)}
func (_e *MockShardManager_Expecter) CheckIfSegmentCanBeCreated(uniquePartitionKey interface{}, segmentID interface{}) *MockShardManager_CheckIfSegmentCanBeCreated_Call {
return &MockShardManager_CheckIfSegmentCanBeCreated_Call{Call: _e.mock.On("CheckIfSegmentCanBeCreated", uniquePartitionKey, segmentID)}
}
func (_c *MockShardManager_CheckIfSegmentCanBeCreated_Call) Run(run func(collectionID int64, partitionID int64, segmentID int64)) *MockShardManager_CheckIfSegmentCanBeCreated_Call {
func (_c *MockShardManager_CheckIfSegmentCanBeCreated_Call) Run(run func(uniquePartitionKey shards.PartitionUniqueKey, segmentID int64)) *MockShardManager_CheckIfSegmentCanBeCreated_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64), args[2].(int64))
run(args[0].(shards.PartitionUniqueKey), args[1].(int64))
})
return _c
}
@ -438,22 +435,22 @@ func (_c *MockShardManager_CheckIfSegmentCanBeCreated_Call) Return(_a0 error) *M
return _c
}
func (_c *MockShardManager_CheckIfSegmentCanBeCreated_Call) RunAndReturn(run func(int64, int64, int64) error) *MockShardManager_CheckIfSegmentCanBeCreated_Call {
func (_c *MockShardManager_CheckIfSegmentCanBeCreated_Call) RunAndReturn(run func(shards.PartitionUniqueKey, int64) error) *MockShardManager_CheckIfSegmentCanBeCreated_Call {
_c.Call.Return(run)
return _c
}
// CheckIfSegmentCanBeFlushed provides a mock function with given fields: collecionID, partitionID, segmentID
func (_m *MockShardManager) CheckIfSegmentCanBeFlushed(collecionID int64, partitionID int64, segmentID int64) error {
ret := _m.Called(collecionID, partitionID, segmentID)
// CheckIfSegmentCanBeFlushed provides a mock function with given fields: uniquePartitionKey, segmentID
func (_m *MockShardManager) CheckIfSegmentCanBeFlushed(uniquePartitionKey shards.PartitionUniqueKey, segmentID int64) error {
ret := _m.Called(uniquePartitionKey, segmentID)
if len(ret) == 0 {
panic("no return value specified for CheckIfSegmentCanBeFlushed")
}
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64, int64) error); ok {
r0 = rf(collecionID, partitionID, segmentID)
if rf, ok := ret.Get(0).(func(shards.PartitionUniqueKey, int64) error); ok {
r0 = rf(uniquePartitionKey, segmentID)
} else {
r0 = ret.Error(0)
}
@ -467,16 +464,15 @@ type MockShardManager_CheckIfSegmentCanBeFlushed_Call struct {
}
// CheckIfSegmentCanBeFlushed is a helper method to define mock.On call
// - collecionID int64
// - partitionID int64
// - uniquePartitionKey shards.PartitionUniqueKey
// - segmentID int64
func (_e *MockShardManager_Expecter) CheckIfSegmentCanBeFlushed(collecionID interface{}, partitionID interface{}, segmentID interface{}) *MockShardManager_CheckIfSegmentCanBeFlushed_Call {
return &MockShardManager_CheckIfSegmentCanBeFlushed_Call{Call: _e.mock.On("CheckIfSegmentCanBeFlushed", collecionID, partitionID, segmentID)}
func (_e *MockShardManager_Expecter) CheckIfSegmentCanBeFlushed(uniquePartitionKey interface{}, segmentID interface{}) *MockShardManager_CheckIfSegmentCanBeFlushed_Call {
return &MockShardManager_CheckIfSegmentCanBeFlushed_Call{Call: _e.mock.On("CheckIfSegmentCanBeFlushed", uniquePartitionKey, segmentID)}
}
func (_c *MockShardManager_CheckIfSegmentCanBeFlushed_Call) Run(run func(collecionID int64, partitionID int64, segmentID int64)) *MockShardManager_CheckIfSegmentCanBeFlushed_Call {
func (_c *MockShardManager_CheckIfSegmentCanBeFlushed_Call) Run(run func(uniquePartitionKey shards.PartitionUniqueKey, segmentID int64)) *MockShardManager_CheckIfSegmentCanBeFlushed_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64), args[2].(int64))
run(args[0].(shards.PartitionUniqueKey), args[1].(int64))
})
return _c
}
@ -486,7 +482,7 @@ func (_c *MockShardManager_CheckIfSegmentCanBeFlushed_Call) Return(_a0 error) *M
return _c
}
func (_c *MockShardManager_CheckIfSegmentCanBeFlushed_Call) RunAndReturn(run func(int64, int64, int64) error) *MockShardManager_CheckIfSegmentCanBeFlushed_Call {
func (_c *MockShardManager_CheckIfSegmentCanBeFlushed_Call) RunAndReturn(run func(shards.PartitionUniqueKey, int64) error) *MockShardManager_CheckIfSegmentCanBeFlushed_Call {
_c.Call.Return(run)
return _c
}
@ -827,9 +823,9 @@ func (_c *MockShardManager_Logger_Call) RunAndReturn(run func() *log.MLogger) *M
return _c
}
// WaitUntilGrowingSegmentReady provides a mock function with given fields: collectionID, partitonID
func (_m *MockShardManager) WaitUntilGrowingSegmentReady(collectionID int64, partitonID int64) (<-chan struct{}, error) {
ret := _m.Called(collectionID, partitonID)
// WaitUntilGrowingSegmentReady provides a mock function with given fields: uniquePartitionKey
func (_m *MockShardManager) WaitUntilGrowingSegmentReady(uniquePartitionKey shards.PartitionUniqueKey) (<-chan struct{}, error) {
ret := _m.Called(uniquePartitionKey)
if len(ret) == 0 {
panic("no return value specified for WaitUntilGrowingSegmentReady")
@ -837,19 +833,19 @@ func (_m *MockShardManager) WaitUntilGrowingSegmentReady(collectionID int64, par
var r0 <-chan struct{}
var r1 error
if rf, ok := ret.Get(0).(func(int64, int64) (<-chan struct{}, error)); ok {
return rf(collectionID, partitonID)
if rf, ok := ret.Get(0).(func(shards.PartitionUniqueKey) (<-chan struct{}, error)); ok {
return rf(uniquePartitionKey)
}
if rf, ok := ret.Get(0).(func(int64, int64) <-chan struct{}); ok {
r0 = rf(collectionID, partitonID)
if rf, ok := ret.Get(0).(func(shards.PartitionUniqueKey) <-chan struct{}); ok {
r0 = rf(uniquePartitionKey)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
if rf, ok := ret.Get(1).(func(int64, int64) error); ok {
r1 = rf(collectionID, partitonID)
if rf, ok := ret.Get(1).(func(shards.PartitionUniqueKey) error); ok {
r1 = rf(uniquePartitionKey)
} else {
r1 = ret.Error(1)
}
@ -863,15 +859,14 @@ type MockShardManager_WaitUntilGrowingSegmentReady_Call struct {
}
// WaitUntilGrowingSegmentReady is a helper method to define mock.On call
// - collectionID int64
// - partitonID int64
func (_e *MockShardManager_Expecter) WaitUntilGrowingSegmentReady(collectionID interface{}, partitonID interface{}) *MockShardManager_WaitUntilGrowingSegmentReady_Call {
return &MockShardManager_WaitUntilGrowingSegmentReady_Call{Call: _e.mock.On("WaitUntilGrowingSegmentReady", collectionID, partitonID)}
// - uniquePartitionKey shards.PartitionUniqueKey
func (_e *MockShardManager_Expecter) WaitUntilGrowingSegmentReady(uniquePartitionKey interface{}) *MockShardManager_WaitUntilGrowingSegmentReady_Call {
return &MockShardManager_WaitUntilGrowingSegmentReady_Call{Call: _e.mock.On("WaitUntilGrowingSegmentReady", uniquePartitionKey)}
}
func (_c *MockShardManager_WaitUntilGrowingSegmentReady_Call) Run(run func(collectionID int64, partitonID int64)) *MockShardManager_WaitUntilGrowingSegmentReady_Call {
func (_c *MockShardManager_WaitUntilGrowingSegmentReady_Call) Run(run func(uniquePartitionKey shards.PartitionUniqueKey)) *MockShardManager_WaitUntilGrowingSegmentReady_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
run(args[0].(shards.PartitionUniqueKey))
})
return _c
}
@ -881,7 +876,7 @@ func (_c *MockShardManager_WaitUntilGrowingSegmentReady_Call) Return(_a0 <-chan
return _c
}
func (_c *MockShardManager_WaitUntilGrowingSegmentReady_Call) RunAndReturn(run func(int64, int64) (<-chan struct{}, error)) *MockShardManager_WaitUntilGrowingSegmentReady_Call {
func (_c *MockShardManager_WaitUntilGrowingSegmentReady_Call) RunAndReturn(run func(shards.PartitionUniqueKey) (<-chan struct{}, error)) *MockShardManager_WaitUntilGrowingSegmentReady_Call {
_c.Call.Return(run)
return _c
}


@ -53,7 +53,8 @@ func (r *redoAppendInterceptor) waitUntilGrowingSegmentReady(ctx context.Context
panic("insert message should only have one partition")
}
for _, partition := range h.Partitions {
ready, err := r.shardManager.WaitUntilGrowingSegmentReady(h.CollectionId, partition.PartitionId)
uniqueKey := shards.PartitionUniqueKey{CollectionID: h.CollectionId, PartitionID: partition.PartitionId}
ready, err := r.shardManager.WaitUntilGrowingSegmentReady(uniqueKey)
if err != nil {
return err
}


@ -100,7 +100,7 @@ func (impl *shardInterceptor) handleDropCollection(ctx context.Context, msg mess
func (impl *shardInterceptor) handleCreatePartition(ctx context.Context, msg message.MutableMessage, appendOp interceptors.Append) (message.MessageID, error) {
createPartitionMessage := message.MustAsMutableCreatePartitionMessageV1(msg)
h := createPartitionMessage.Header()
if err := impl.shardManager.CheckIfPartitionCanBeCreated(h.GetCollectionId(), h.GetPartitionId()); err != nil {
if err := impl.shardManager.CheckIfPartitionCanBeCreated(shards.PartitionUniqueKey{CollectionID: h.GetCollectionId(), PartitionID: h.GetPartitionId()}); err != nil {
impl.shardManager.Logger().Warn("partition already exists when creating partition", zap.Int64("collectionID", h.GetCollectionId()), zap.Int64("partitionID", h.GetPartitionId()))
// TODO: idempotent wal is required in the future, but the current milvus state is not recovered from wal.
// return nil, status.NewUnrecoverableError(err.Error())
@ -118,7 +118,7 @@ func (impl *shardInterceptor) handleCreatePartition(ctx context.Context, msg mes
func (impl *shardInterceptor) handleDropPartition(ctx context.Context, msg message.MutableMessage, appendOp interceptors.Append) (message.MessageID, error) {
dropPartitionMessage := message.MustAsMutableDropPartitionMessageV1(msg)
h := dropPartitionMessage.Header()
if err := impl.shardManager.CheckIfPartitionExists(h.GetCollectionId(), h.GetPartitionId()); err != nil {
if err := impl.shardManager.CheckIfPartitionExists(shards.PartitionUniqueKey{CollectionID: h.GetCollectionId(), PartitionID: h.GetPartitionId()}); err != nil {
impl.shardManager.Logger().Warn("partition not found when dropping partition", zap.Int64("collectionID", h.GetCollectionId()), zap.Int64("partitionID", h.GetPartitionId()))
// The partition can not be dropped at current shard, ignored
// TODO: idempotent wal is required in the future, but the current milvus state is not recovered from wal.
@ -245,7 +245,7 @@ func (impl *shardInterceptor) handleSchemaChange(ctx context.Context, msg messag
func (impl *shardInterceptor) handleCreateSegment(ctx context.Context, msg message.MutableMessage, appendOp interceptors.Append) (message.MessageID, error) {
createSegmentMsg := message.MustAsMutableCreateSegmentMessageV2(msg)
h := createSegmentMsg.Header()
if err := impl.shardManager.CheckIfSegmentCanBeCreated(h.GetCollectionId(), h.GetPartitionId(), h.GetSegmentId()); err != nil {
if err := impl.shardManager.CheckIfSegmentCanBeCreated(shards.PartitionUniqueKey{CollectionID: h.GetCollectionId(), PartitionID: h.GetPartitionId()}, h.GetSegmentId()); err != nil {
// The segment can not be created at current shard, ignored
return nil, status.NewUnrecoverableError(err.Error())
}
@ -268,7 +268,7 @@ func (impl *shardInterceptor) handleFlushSegment(ctx context.Context, msg messag
return appendOp(ctx, msg)
}
if err := impl.shardManager.CheckIfSegmentCanBeFlushed(h.GetCollectionId(), h.GetPartitionId(), h.GetSegmentId()); err != nil {
if err := impl.shardManager.CheckIfSegmentCanBeFlushed(shards.PartitionUniqueKey{CollectionID: h.GetCollectionId(), PartitionID: h.GetPartitionId()}, h.GetSegmentId()); err != nil {
// The segment can not be flushed at current shard, ignored
return nil, status.NewUnrecoverableError(err.Error())
}


@ -85,14 +85,14 @@ func TestShardInterceptor(t *testing.T) {
}).
WithBody(&msgpb.CreatePartitionRequest{}).
MustBuildMutable()
shardManager.EXPECT().CheckIfPartitionCanBeCreated(mock.Anything, mock.Anything).Return(nil)
shardManager.EXPECT().CheckIfPartitionCanBeCreated(mock.Anything).Return(nil)
shardManager.EXPECT().CreatePartition(mock.Anything).Return()
msgID, err = i.DoAppend(ctx, msg, appender)
assert.NoError(t, err)
assert.NotNil(t, msgID)
shardManager.EXPECT().CheckIfPartitionCanBeCreated(mock.Anything, mock.Anything).Unset()
shardManager.EXPECT().CheckIfPartitionCanBeCreated(mock.Anything, mock.Anything).Return(mockErr)
shardManager.EXPECT().CheckIfPartitionCanBeCreated(mock.Anything).Unset()
shardManager.EXPECT().CheckIfPartitionCanBeCreated(mock.Anything).Return(mockErr)
shardManager.EXPECT().CreatePartition(mock.Anything).Return()
msgID, err = i.DoAppend(ctx, msg, appender)
assert.NoError(t, err)
@ -106,14 +106,14 @@ func TestShardInterceptor(t *testing.T) {
}).
WithBody(&msgpb.DropPartitionRequest{}).
MustBuildMutable()
shardManager.EXPECT().CheckIfPartitionExists(mock.Anything, mock.Anything).Return(nil)
shardManager.EXPECT().CheckIfPartitionExists(mock.Anything).Return(nil)
shardManager.EXPECT().DropPartition(mock.Anything).Return()
msgID, err = i.DoAppend(ctx, msg, appender)
assert.NoError(t, err)
assert.NotNil(t, msgID)
shardManager.EXPECT().CheckIfPartitionExists(mock.Anything, mock.Anything).Unset()
shardManager.EXPECT().CheckIfPartitionExists(mock.Anything, mock.Anything).Return(mockErr)
shardManager.EXPECT().CheckIfPartitionExists(mock.Anything).Unset()
shardManager.EXPECT().CheckIfPartitionExists(mock.Anything).Return(mockErr)
shardManager.EXPECT().DropPartition(mock.Anything).Return()
msgID, err = i.DoAppend(ctx, msg, appender)
assert.NoError(t, err)
@ -128,14 +128,14 @@ func TestShardInterceptor(t *testing.T) {
}).
WithBody(&messagespb.CreateSegmentMessageBody{}).
MustBuildMutable()
shardManager.EXPECT().CheckIfSegmentCanBeCreated(mock.Anything, mock.Anything, mock.Anything).Return(nil)
shardManager.EXPECT().CheckIfSegmentCanBeCreated(mock.Anything, mock.Anything).Return(nil)
shardManager.EXPECT().CreateSegment(mock.Anything).Return()
msgID, err = i.DoAppend(ctx, msg, appender)
assert.NoError(t, err)
assert.NotNil(t, msgID)
shardManager.EXPECT().CheckIfSegmentCanBeCreated(mock.Anything, mock.Anything, mock.Anything).Unset()
shardManager.EXPECT().CheckIfSegmentCanBeCreated(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
shardManager.EXPECT().CheckIfSegmentCanBeCreated(mock.Anything, mock.Anything).Unset()
shardManager.EXPECT().CheckIfSegmentCanBeCreated(mock.Anything, mock.Anything).Return(mockErr)
shardManager.EXPECT().CreateSegment(mock.Anything).Return()
msgID, err = i.DoAppend(ctx, msg, appender)
assert.Error(t, err)
@ -150,14 +150,14 @@ func TestShardInterceptor(t *testing.T) {
}).
WithBody(&messagespb.FlushMessageBody{}).
MustBuildMutable()
shardManager.EXPECT().CheckIfSegmentCanBeFlushed(mock.Anything, mock.Anything, mock.Anything).Return(nil)
shardManager.EXPECT().CheckIfSegmentCanBeFlushed(mock.Anything, mock.Anything).Return(nil)
shardManager.EXPECT().FlushSegment(mock.Anything).Return()
msgID, err = i.DoAppend(ctx, msg, appender)
assert.NoError(t, err)
assert.NotNil(t, msgID)
shardManager.EXPECT().CheckIfSegmentCanBeFlushed(mock.Anything, mock.Anything, mock.Anything).Unset()
shardManager.EXPECT().CheckIfSegmentCanBeFlushed(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
shardManager.EXPECT().CheckIfSegmentCanBeFlushed(mock.Anything, mock.Anything).Unset()
shardManager.EXPECT().CheckIfSegmentCanBeFlushed(mock.Anything, mock.Anything).Return(mockErr)
shardManager.EXPECT().FlushSegment(mock.Anything).Return()
msgID, err = i.DoAppend(ctx, msg, appender)
assert.Error(t, err)


@ -14,6 +14,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/stats"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/recovery"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
@ -54,20 +55,21 @@ func RecoverShardManager(param *ShardManagerRecoverParam) ShardManager {
ctx, cancel := context.WithCancel(context.Background())
logger := resource.Resource().Logger().With(log.FieldComponent("shard-manager")).With(zap.Stringer("pchannel", param.ChannelInfo))
// create managers list.
managers := make(map[int64]*partitionManager)
managers := make(map[PartitionUniqueKey]*partitionManager)
segmentTotal := 0
metrics := metricsutil.NewSegmentAssignMetrics(param.ChannelInfo.Name)
for collectionID, collectionInfo := range collections {
for partitionID := range collectionInfo.PartitionIDs {
segmentManagers := make(map[int64]*segmentAllocManager, 0)
// recovery meta is recovered, use it.
if managers, ok := partitionToSegmentManagers[partitionID]; ok {
uniqueKey := PartitionUniqueKey{CollectionID: collectionID, PartitionID: partitionID}
if managers, ok := partitionToSegmentManagers[uniqueKey]; ok {
segmentManagers = managers
}
if _, ok := managers[partitionID]; ok {
if _, ok := managers[uniqueKey]; ok {
panic("partition manager already exists when buildNewPartitionManagers in segment assignment service, there's a bug in system")
}
managers[partitionID] = newPartitionSegmentManager(
managers[uniqueKey] = newPartitionSegmentManager(
ctx,
logger,
param.WAL,
@ -100,7 +102,7 @@ func RecoverShardManager(param *ShardManagerRecoverParam) ShardManager {
belongs := lo.Values(segmentBelongs)
stats := make([]*stats.SegmentStats, 0, len(belongs))
for _, belong := range belongs {
stat := m.partitionManagers[belong.PartitionID].segments[belong.SegmentID].GetStatFromRecovery()
stat := m.partitionManagers[belong.PartitionUniqueKey()].segments[belong.SegmentID].GetStatFromRecovery()
stats = append(stats, stat)
}
resource.Resource().SegmentStatsManager().RegisterSealOperator(m, belongs, stats)
@ -109,11 +111,11 @@ func RecoverShardManager(param *ShardManagerRecoverParam) ShardManager {
// newSegmentAllocManagersFromRecovery creates new segment alloc managers from the recovery snapshot.
func newSegmentAllocManagersFromRecovery(pchannel types.PChannelInfo, recoverInfos *recovery.RecoverySnapshot, collections map[int64]*CollectionInfo) (
map[int64]map[int64]*segmentAllocManager,
map[PartitionUniqueKey]map[int64]*segmentAllocManager,
map[int64]stats.SegmentBelongs,
) {
// recover the segment infos from the streaming node segment assignment meta storage
partitionToSegmentManagers := make(map[int64]map[int64]*segmentAllocManager)
partitionToSegmentManagers := make(map[PartitionUniqueKey]map[int64]*segmentAllocManager)
growingBelongs := make(map[int64]stats.SegmentBelongs)
for _, rawMeta := range recoverInfos.SegmentAssignments {
m := newSegmentAllocManagerFromProto(pchannel, rawMeta)
@ -134,10 +136,14 @@ func newSegmentAllocManagersFromRecovery(pchannel types.PChannelInfo, recoverInf
PartitionID: rawMeta.GetPartitionId(),
SegmentID: m.GetSegmentID(),
}
if _, ok := partitionToSegmentManagers[rawMeta.GetPartitionId()]; !ok {
partitionToSegmentManagers[rawMeta.GetPartitionId()] = make(map[int64]*segmentAllocManager, 2)
uniqueKey := PartitionUniqueKey{
CollectionID: rawMeta.GetCollectionId(),
PartitionID: rawMeta.GetPartitionId(),
}
partitionToSegmentManagers[rawMeta.GetPartitionId()][rawMeta.GetSegmentId()] = m
if _, ok := partitionToSegmentManagers[uniqueKey]; !ok {
partitionToSegmentManagers[uniqueKey] = make(map[int64]*segmentAllocManager, 2)
}
partitionToSegmentManagers[uniqueKey][rawMeta.GetSegmentId()] = m
}
return partitionToSegmentManagers, growingBelongs
}
@ -151,6 +157,8 @@ func newCollectionInfos(recoverInfos *recovery.RecoverySnapshot) map[int64]*Coll
for _, partition := range vchannelInfo.CollectionInfo.Partitions {
currentPartition[partition.PartitionId] = struct{}{}
}
// add the all-partitions ID into the collection info.
currentPartition[common.AllPartitionsID] = struct{}{}
collectionInfoMap[vchannelInfo.CollectionInfo.CollectionId] = &CollectionInfo{
VChannel: vchannelInfo.Vchannel,
PartitionIDs: currentPartition,
@ -170,7 +178,7 @@ type shardManagerImpl struct {
cancel context.CancelFunc
wal *syncutil.Future[wal.WAL]
pchannel types.PChannelInfo
partitionManagers map[int64]*partitionManager // map partitionID to partition manager
partitionManagers map[PartitionUniqueKey]*partitionManager // map PartitionUniqueKey to partition manager
collections map[int64]*CollectionInfo // map collectionID to collectionInfo
metrics *metricsutil.SegmentAssignMetrics
txnManager TxnManager
@ -197,7 +205,8 @@ func (m *shardManagerImpl) Close() {
}
func (m *shardManagerImpl) updateMetrics() {
m.metrics.UpdatePartitionCount(len(m.partitionManagers))
// the partition managers also contain the all-partitions entry for each collection, so subtract the collection count.
m.metrics.UpdatePartitionCount(len(m.partitionManagers) - len(m.collections))
m.metrics.UpdateCollectionCount(len(m.collections))
}
@ -210,5 +219,7 @@ func newCollectionInfo(vchannel string, partitionIDs []int64) *CollectionInfo {
for _, partitionID := range partitionIDs {
info.PartitionIDs[partitionID] = struct{}{}
}
// add the all-partitions ID into the collection info.
info.PartitionIDs[common.AllPartitionsID] = struct{}{}
return info
}
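
Because newCollectionInfo above (and the recovery path earlier in this file) now inserts the all-partitions entry into every collection's PartitionIDs set, len(partitionManagers) counts one extra manager per collection; that is why updateMetrics subtracts len(collections) before reporting the partition count. A throwaway sketch of the arithmetic with made-up counts:

package main

import "fmt"

func main() {
	collections := 2    // hypothetical number of collections on this pchannel
	userPartitions := 3 // hypothetical user partitions per collection

	// each collection holds its user partitions plus one all-partitions manager
	managerEntries := collections * (userPartitions + 1) // 8 entries in the map

	// the reported partition count excludes the synthetic all-partitions entries
	fmt.Println(managerEntries - collections) // prints 6
}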


@ -59,12 +59,13 @@ func (m *shardManagerImpl) CreateCollection(msg message.ImmutableCreateCollectio
}
m.collections[collectionID] = newCollectionInfo(vchannel, partitionIDs)
for _, partitionID := range partitionIDs {
if _, ok := m.partitionManagers[partitionID]; ok {
for partitionID := range m.collections[collectionID].PartitionIDs {
uniqueKey := PartitionUniqueKey{CollectionID: collectionID, PartitionID: partitionID}
if _, ok := m.partitionManagers[uniqueKey]; ok {
logger.Warn("partition already exists", zap.Int64("partitionID", partitionID))
continue
}
m.partitionManagers[partitionID] = newPartitionSegmentManager(
m.partitionManagers[uniqueKey] = newPartitionSegmentManager(
m.ctx,
m.Logger(),
m.wal,
@ -103,7 +104,8 @@ func (m *shardManagerImpl) DropCollection(msg message.ImmutableDropCollectionMes
partitionIDs := make([]int64, 0, len(collectionInfo.PartitionIDs))
segmentIDs := make([]int64, 0, len(collectionInfo.PartitionIDs))
for partitionID := range collectionInfo.PartitionIDs {
pm, ok := m.partitionManagers[partitionID]
uniqueKey := PartitionUniqueKey{CollectionID: collectionID, PartitionID: partitionID}
pm, ok := m.partitionManagers[uniqueKey]
if !ok {
logger.Warn("partition not exists", zap.Int64("partitionID", partitionID))
continue
@ -112,7 +114,7 @@ func (m *shardManagerImpl) DropCollection(msg message.ImmutableDropCollectionMes
segments := pm.FlushAndDropPartition(policy.PolicyCollectionRemoved())
partitionIDs = append(partitionIDs, partitionID)
segmentIDs = append(segmentIDs, segments...)
delete(m.partitionManagers, partitionID)
delete(m.partitionManagers, uniqueKey)
}
logger.Info("collection removed", zap.Int64s("partitionIDs", partitionIDs), zap.Int64s("segmentIDs", segmentIDs))
m.updateMetrics()


@ -20,17 +20,17 @@ type ShardManager interface {
DropCollection(msg message.ImmutableDropCollectionMessageV1)
CheckIfPartitionCanBeCreated(collectionID int64, partitionID int64) error
CheckIfPartitionCanBeCreated(uniquePartitionKey PartitionUniqueKey) error
CheckIfPartitionExists(collectionID int64, partitionID int64) error
CheckIfPartitionExists(uniquePartitionKey PartitionUniqueKey) error
CreatePartition(msg message.ImmutableCreatePartitionMessageV1)
DropPartition(msg message.ImmutableDropPartitionMessageV1)
CheckIfSegmentCanBeCreated(collectionID int64, partitionID int64, segmentID int64) error
CheckIfSegmentCanBeCreated(uniquePartitionKey PartitionUniqueKey, segmentID int64) error
CheckIfSegmentCanBeFlushed(collecionID int64, partitionID int64, segmentID int64) error
CheckIfSegmentCanBeFlushed(uniquePartitionKey PartitionUniqueKey, segmentID int64) error
CreateSegment(msg message.ImmutableCreateSegmentMessageV2)
@ -40,7 +40,7 @@ type ShardManager interface {
ApplyDelete(msg message.MutableDeleteMessageV1) error
WaitUntilGrowingSegmentReady(collectionID int64, partitonID int64) (<-chan struct{}, error)
WaitUntilGrowingSegmentReady(uniquePartitionKey PartitionUniqueKey) (<-chan struct{}, error)
FlushAndFenceSegmentAllocUntil(collectionID int64, timetick uint64) ([]int64, error)


@ -9,39 +9,39 @@ import (
)
// CheckIfPartitionCanBeCreated checks if a partition can be created.
func (m *shardManagerImpl) CheckIfPartitionCanBeCreated(collectionID int64, partitionID int64) error {
func (m *shardManagerImpl) CheckIfPartitionCanBeCreated(uniquePartitionKey PartitionUniqueKey) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.checkIfPartitionCanBeCreated(collectionID, partitionID)
return m.checkIfPartitionCanBeCreated(uniquePartitionKey)
}
// checkIfPartitionCanBeCreated checks if a partition can be created.
func (m *shardManagerImpl) checkIfPartitionCanBeCreated(collectionID int64, partitionID int64) error {
if _, ok := m.collections[collectionID]; !ok {
func (m *shardManagerImpl) checkIfPartitionCanBeCreated(uniquePartitionKey PartitionUniqueKey) error {
if _, ok := m.collections[uniquePartitionKey.CollectionID]; !ok {
return ErrCollectionNotFound
}
if _, ok := m.collections[collectionID].PartitionIDs[partitionID]; ok {
if _, ok := m.collections[uniquePartitionKey.CollectionID].PartitionIDs[uniquePartitionKey.PartitionID]; ok {
return ErrPartitionExists
}
return nil
}
// CheckIfPartitionExists checks if a partition can be dropped.
func (m *shardManagerImpl) CheckIfPartitionExists(collectionID int64, partitionID int64) error {
func (m *shardManagerImpl) CheckIfPartitionExists(uniquePartitionKey PartitionUniqueKey) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.checkIfPartitionExists(collectionID, partitionID)
return m.checkIfPartitionExists(uniquePartitionKey)
}
// checkIfPartitionExists checks if a partition can be dropped.
func (m *shardManagerImpl) checkIfPartitionExists(collectionID int64, partitionID int64) error {
if _, ok := m.collections[collectionID]; !ok {
func (m *shardManagerImpl) checkIfPartitionExists(uniquePartitionKey PartitionUniqueKey) error {
if _, ok := m.collections[uniquePartitionKey.CollectionID]; !ok {
return ErrCollectionNotFound
}
if _, ok := m.collections[collectionID].PartitionIDs[partitionID]; !ok {
if _, ok := m.collections[uniquePartitionKey.CollectionID].PartitionIDs[uniquePartitionKey.PartitionID]; !ok {
return ErrPartitionNotFound
}
return nil
@ -58,17 +58,18 @@ func (m *shardManagerImpl) CreatePartition(msg message.ImmutableCreatePartitionM
m.mu.Lock()
defer m.mu.Unlock()
if err := m.checkIfPartitionCanBeCreated(collectionID, partitionID); err != nil {
uniquePartitionKey := PartitionUniqueKey{CollectionID: collectionID, PartitionID: partitionID}
if err := m.checkIfPartitionCanBeCreated(uniquePartitionKey); err != nil {
logger.Warn("partition can not be created", zap.Error(err))
return
}
m.collections[collectionID].PartitionIDs[partitionID] = struct{}{}
if _, ok := m.partitionManagers[partitionID]; ok {
if _, ok := m.partitionManagers[uniquePartitionKey]; ok {
logger.Warn("partition manager already exists")
return
}
m.partitionManagers[partitionID] = newPartitionSegmentManager(
m.partitionManagers[uniquePartitionKey] = newPartitionSegmentManager(
m.ctx,
m.Logger(),
m.wal,
@ -95,19 +96,20 @@ func (m *shardManagerImpl) DropPartition(msg message.ImmutableDropPartitionMessa
m.mu.Lock()
defer m.mu.Unlock()
if err := m.checkIfPartitionExists(collectionID, partitionID); err != nil {
uniquePartitionKey := PartitionUniqueKey{CollectionID: collectionID, PartitionID: partitionID}
if err := m.checkIfPartitionExists(uniquePartitionKey); err != nil {
logger.Warn("partition can not be dropped", zap.Error(err))
return
}
delete(m.collections[collectionID].PartitionIDs, partitionID)
pm, ok := m.partitionManagers[partitionID]
pm, ok := m.partitionManagers[uniquePartitionKey]
if !ok {
logger.Warn("partition not exists", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID))
return
}
delete(m.partitionManagers, partitionID)
delete(m.partitionManagers, uniquePartitionKey)
segmentIDs := pm.FlushAndDropPartition(policy.PolicyPartitionRemoved())
m.Logger().Info("partition removed", zap.Int64s("segmentIDs", segmentIDs))
m.updateMetrics()


@ -33,43 +33,43 @@ func (r *AssignSegmentResult) Ack() {
}
// CheckIfSegmentCanBeCreated checks if a segment can be created for the specified collection and partition.
func (m *shardManagerImpl) CheckIfSegmentCanBeCreated(collectionID int64, partitionID int64, segmentID int64) error {
func (m *shardManagerImpl) CheckIfSegmentCanBeCreated(uniquePartitionKey PartitionUniqueKey, segmentID int64) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.checkIfSegmentCanBeCreated(collectionID, partitionID, segmentID)
return m.checkIfSegmentCanBeCreated(uniquePartitionKey, segmentID)
}
// checkIfSegmentCanBeCreated checks if a segment can be created for the specified collection and partition.
func (m *shardManagerImpl) checkIfSegmentCanBeCreated(collectionID int64, partitionID int64, segmentID int64) error {
func (m *shardManagerImpl) checkIfSegmentCanBeCreated(uniquePartitionKey PartitionUniqueKey, segmentID int64) error {
// segment can be created only if the collection and partition exist.
if err := m.checkIfPartitionExists(collectionID, partitionID); err != nil {
if err := m.checkIfPartitionExists(uniquePartitionKey); err != nil {
return err
}
if m := m.partitionManagers[partitionID].GetSegmentManager(segmentID); m != nil {
if m := m.partitionManagers[uniquePartitionKey].GetSegmentManager(segmentID); m != nil {
return ErrSegmentExists
}
return nil
}
// CheckIfSegmentCanBeFlushed checks if a segment can be flushed.
func (m *shardManagerImpl) CheckIfSegmentCanBeFlushed(collecionID int64, partitionID int64, segmentID int64) error {
func (m *shardManagerImpl) CheckIfSegmentCanBeFlushed(uniquePartitionKey PartitionUniqueKey, segmentID int64) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.checkIfSegmentCanBeFlushed(collecionID, partitionID, segmentID)
return m.checkIfSegmentCanBeFlushed(uniquePartitionKey, segmentID)
}
// checkIfSegmentCanBeFlushed checks if a segment can be flushed.
func (m *shardManagerImpl) checkIfSegmentCanBeFlushed(collecionID int64, partitionID int64, segmentID int64) error {
if err := m.checkIfPartitionExists(collecionID, partitionID); err != nil {
func (m *shardManagerImpl) checkIfSegmentCanBeFlushed(uniquePartitionKey PartitionUniqueKey, segmentID int64) error {
if err := m.checkIfPartitionExists(uniquePartitionKey); err != nil {
return err
}
// segment can be flushed only if the segment exists, and its state is flushed.
// pm must exist, because we have checked that the partition exists.
pm := m.partitionManagers[partitionID]
pm := m.partitionManagers[uniquePartitionKey]
sm := pm.GetSegmentManager(segmentID)
if sm == nil {
return ErrSegmentNotFound
@ -86,13 +86,14 @@ func (m *shardManagerImpl) CreateSegment(msg message.ImmutableCreateSegmentMessa
m.mu.Lock()
defer m.mu.Unlock()
if err := m.checkIfSegmentCanBeCreated(msg.Header().CollectionId, msg.Header().PartitionId, msg.Header().SegmentId); err != nil {
uniquePartitionKey := PartitionUniqueKey{CollectionID: msg.Header().CollectionId, PartitionID: msg.Header().PartitionId}
if err := m.checkIfSegmentCanBeCreated(uniquePartitionKey, msg.Header().SegmentId); err != nil {
logger.Warn("segment already exists")
return
}
s := newSegmentAllocManager(m.pchannel, msg)
pm, ok := m.partitionManagers[s.GetPartitionID()]
pm, ok := m.partitionManagers[uniquePartitionKey]
if !ok {
panic("critical error: partition manager not found when a segment is created")
}
@ -108,12 +109,17 @@ func (m *shardManagerImpl) FlushSegment(msg message.ImmutableFlushMessageV2) {
m.mu.Lock()
defer m.mu.Unlock()
if err := m.checkIfSegmentCanBeFlushed(collectionID, partitionID, segmentID); err != nil {
uniquePartitionKey := PartitionUniqueKey{CollectionID: collectionID, PartitionID: partitionID}
if err := m.checkIfSegmentCanBeFlushed(uniquePartitionKey, segmentID); err != nil {
logger.Warn("segment can not be flushed", zap.Error(err))
return
}
pm := m.partitionManagers[partitionID]
pm, ok := m.partitionManagers[uniquePartitionKey]
if !ok {
logger.Warn("partition not found when FlushSegment")
return
}
pm.MustRemoveFlushedSegment(segmentID)
}
@ -122,7 +128,8 @@ func (m *shardManagerImpl) AssignSegment(req *AssignSegmentRequest) (*AssignSegm
m.mu.Lock()
defer m.mu.Unlock()
pm, ok := m.partitionManagers[req.PartitionID]
uniqueKey := PartitionUniqueKey{CollectionID: req.CollectionID, PartitionID: req.PartitionID}
pm, ok := m.partitionManagers[uniqueKey]
if !ok {
return nil, ErrPartitionNotFound
}
@ -144,14 +151,14 @@ func (m *shardManagerImpl) ApplyDelete(msg message.MutableDeleteMessageV1) error
}
// WaitUntilGrowingSegmentReady waits until the growing segment is ready.
func (m *shardManagerImpl) WaitUntilGrowingSegmentReady(collectionID int64, partitonID int64) (<-chan struct{}, error) {
func (m *shardManagerImpl) WaitUntilGrowingSegmentReady(uniquePartitionKey PartitionUniqueKey) (<-chan struct{}, error) {
m.mu.Lock()
defer m.mu.Unlock()
if err := m.checkIfPartitionExists(collectionID, partitonID); err != nil {
if err := m.checkIfPartitionExists(uniquePartitionKey); err != nil {
return nil, err
}
return m.partitionManagers[partitonID].WaitPendingGrowingSegmentReady(), nil
return m.partitionManagers[uniquePartitionKey].WaitPendingGrowingSegmentReady(), nil
}
// FlushAndFenceSegmentAllocUntil flushes all segments that contain messages whose timetick is less than the incoming timetick.
@ -173,7 +180,8 @@ func (m *shardManagerImpl) FlushAndFenceSegmentAllocUntil(collectionID int64, ti
// collect all partitions
for partitionID := range collectionInfo.PartitionIDs {
// Seal all segments and fence assign to the partition manager.
pm, ok := m.partitionManagers[partitionID]
uniqueKey := PartitionUniqueKey{CollectionID: collectionID, PartitionID: partitionID}
pm, ok := m.partitionManagers[uniqueKey]
if !ok {
logger.Warn("partition not found when FlushAndFenceSegmentAllocUntil", zap.Int64("partitionID", partitionID))
continue
@ -195,7 +203,7 @@ func (m *shardManagerImpl) AsyncFlushSegment(signal utils.SealSegmentSignal) {
m.mu.Lock()
defer m.mu.Unlock()
pm, ok := m.partitionManagers[signal.SegmentBelongs.PartitionID]
pm, ok := m.partitionManagers[signal.SegmentBelongs.PartitionUniqueKey()]
if !ok {
logger.Warn("partition not found when AsyncMustSeal, may be already dropped")
return


@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/stats"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/utils"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/recovery"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
@ -126,33 +127,35 @@ func TestShardManager(t *testing.T) {
err = m.CheckIfCollectionExists(1)
assert.NoError(t, err)
err = m.CheckIfPartitionCanBeCreated(1, 2)
err = m.CheckIfPartitionCanBeCreated(PartitionUniqueKey{CollectionID: 1, PartitionID: common.AllPartitionsID})
assert.ErrorIs(t, err, ErrPartitionExists)
err = m.CheckIfPartitionCanBeCreated(3, 9)
err = m.CheckIfPartitionCanBeCreated(PartitionUniqueKey{CollectionID: 1, PartitionID: 2})
assert.ErrorIs(t, err, ErrPartitionExists)
err = m.CheckIfPartitionCanBeCreated(PartitionUniqueKey{CollectionID: 3, PartitionID: 9})
assert.ErrorIs(t, err, ErrCollectionNotFound)
err = m.CheckIfPartitionCanBeCreated(1, 7)
err = m.CheckIfPartitionCanBeCreated(PartitionUniqueKey{CollectionID: 1, PartitionID: 7})
assert.NoError(t, err)
err = m.CheckIfPartitionExists(1, 7)
err = m.CheckIfPartitionExists(PartitionUniqueKey{CollectionID: 1, PartitionID: 7})
assert.ErrorIs(t, err, ErrPartitionNotFound)
err = m.CheckIfPartitionExists(1, 2)
err = m.CheckIfPartitionExists(PartitionUniqueKey{CollectionID: 1, PartitionID: 2})
assert.NoError(t, err)
err = m.CheckIfSegmentCanBeCreated(1, 2, 1001)
err = m.CheckIfSegmentCanBeCreated(PartitionUniqueKey{CollectionID: 1, PartitionID: 2}, 1001)
assert.ErrorIs(t, err, ErrSegmentExists)
err = m.CheckIfSegmentCanBeCreated(3, 2, 1001)
err = m.CheckIfSegmentCanBeCreated(PartitionUniqueKey{CollectionID: 3, PartitionID: 2}, 1001)
assert.ErrorIs(t, err, ErrCollectionNotFound)
err = m.CheckIfSegmentCanBeCreated(1, 4, 1001)
err = m.CheckIfSegmentCanBeCreated(PartitionUniqueKey{CollectionID: 1, PartitionID: 4}, 1001)
assert.ErrorIs(t, err, ErrPartitionNotFound)
err = m.CheckIfSegmentCanBeCreated(1, 2, 1003)
err = m.CheckIfSegmentCanBeCreated(PartitionUniqueKey{CollectionID: 1, PartitionID: 2}, 1003)
assert.NoError(t, err)
err = m.CheckIfSegmentCanBeFlushed(1, 2, 1001)
err = m.CheckIfSegmentCanBeFlushed(PartitionUniqueKey{CollectionID: 1, PartitionID: 2}, 1001)
assert.ErrorIs(t, err, ErrSegmentOnGrowing)
err = m.CheckIfSegmentCanBeFlushed(1, 2, 1003)
err = m.CheckIfSegmentCanBeFlushed(PartitionUniqueKey{CollectionID: 1, PartitionID: 2}, 1003)
assert.ErrorIs(t, err, ErrSegmentNotFound)
err = m.CheckIfSegmentCanBeFlushed(3, 8, 1001)
err = m.CheckIfSegmentCanBeFlushed(PartitionUniqueKey{CollectionID: 3, PartitionID: 8}, 1001)
assert.ErrorIs(t, err, ErrCollectionNotFound)
err = m.CheckIfSegmentCanBeFlushed(1, 7, 1001)
err = m.CheckIfSegmentCanBeFlushed(PartitionUniqueKey{CollectionID: 1, PartitionID: 7}, 1001)
assert.ErrorIs(t, err, ErrPartitionNotFound)
// Test Create and Drop
@ -169,8 +172,9 @@ func TestShardManager(t *testing.T) {
IntoImmutableMessage(rmq.NewRmqID(1))
m.CreateCollection(message.MustAsImmutableCreateCollectionMessageV1(createCollectionMsg))
assert.NoError(t, m.CheckIfCollectionExists(7))
assert.NoError(t, m.CheckIfPartitionExists(7, 8))
assert.NoError(t, m.CheckIfPartitionExists(7, 9))
assert.NoError(t, m.CheckIfPartitionExists(PartitionUniqueKey{CollectionID: 7, PartitionID: 8}))
assert.NoError(t, m.CheckIfPartitionExists(PartitionUniqueKey{CollectionID: 7, PartitionID: 9}))
assert.NoError(t, m.CheckIfPartitionExists(PartitionUniqueKey{CollectionID: 7, PartitionID: common.AllPartitionsID}))
createPartitionMsg := message.NewCreatePartitionMessageBuilderV1().
WithVChannel("v3").
@ -184,7 +188,7 @@ func TestShardManager(t *testing.T) {
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(2))
m.CreatePartition(message.MustAsImmutableCreatePartitionMessageV1(createPartitionMsg))
assert.NoError(t, m.CheckIfPartitionExists(7, 10))
assert.NoError(t, m.CheckIfPartitionExists(PartitionUniqueKey{CollectionID: 7, PartitionID: 10}))
createSegmentMsg := message.NewCreateSegmentMessageBuilderV2().
WithVChannel("v3").
@ -200,8 +204,8 @@ func TestShardManager(t *testing.T) {
WithTimeTick(600).
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(3))
m.partitionManagers[10].onAllocating = make(chan struct{})
ch, err := m.WaitUntilGrowingSegmentReady(7, 10)
m.partitionManagers[PartitionUniqueKey{CollectionID: 7, PartitionID: 10}].onAllocating = make(chan struct{})
ch, err := m.WaitUntilGrowingSegmentReady(PartitionUniqueKey{CollectionID: 7, PartitionID: 10})
assert.NoError(t, err)
select {
case <-time.After(10 * time.Millisecond):
@ -209,9 +213,9 @@ func TestShardManager(t *testing.T) {
t.Error("segment should not be ready")
}
m.CreateSegment(message.MustAsImmutableCreateSegmentMessageV2(createSegmentMsg))
assert.ErrorIs(t, m.CheckIfSegmentCanBeFlushed(7, 10, 1003), ErrSegmentOnGrowing)
assert.ErrorIs(t, m.CheckIfSegmentCanBeFlushed(PartitionUniqueKey{CollectionID: 7, PartitionID: 10}, 1003), ErrSegmentOnGrowing)
<-ch
ch, err = m.WaitUntilGrowingSegmentReady(7, 10)
ch, err = m.WaitUntilGrowingSegmentReady(PartitionUniqueKey{CollectionID: 7, PartitionID: 10})
assert.NoError(t, err)
<-ch
@ -229,13 +233,14 @@ func TestShardManager(t *testing.T) {
IntoImmutableMessage(rmq.NewRmqID(4))
m.AsyncFlushSegment(utils.SealSegmentSignal{
SegmentBelongs: utils.SegmentBelongs{
CollectionID: 7,
PartitionID: 10,
SegmentID: 1003,
},
SealPolicy: policy.PolicyCapacity(),
})
m.FlushSegment(message.MustAsImmutableFlushMessageV2(flushSegmentMsg))
assert.ErrorIs(t, m.CheckIfSegmentCanBeFlushed(7, 10, 1003), ErrSegmentNotFound)
assert.ErrorIs(t, m.CheckIfSegmentCanBeFlushed(PartitionUniqueKey{CollectionID: 7, PartitionID: 10}, 1003), ErrSegmentNotFound)
dropPartitionMsg := message.NewDropPartitionMessageBuilderV1().
WithVChannel("v1").
@ -249,7 +254,7 @@ func TestShardManager(t *testing.T) {
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(7))
m.DropPartition(message.MustAsImmutableDropPartitionMessageV1(dropPartitionMsg))
assert.ErrorIs(t, m.CheckIfPartitionExists(1, 2), ErrPartitionNotFound)
assert.ErrorIs(t, m.CheckIfPartitionExists(PartitionUniqueKey{CollectionID: 1, PartitionID: 2}), ErrPartitionNotFound)
dropCollectionMsg := message.NewDropCollectionMessageBuilderV1().
WithVChannel("v1").


@ -1,5 +1,12 @@
package shards
import "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/utils"
type (
PartitionUniqueKey = utils.PartitionUniqueKey
SegmentBelongs = utils.SegmentBelongs
)
type TxnManager interface {
RecoverDone() <-chan struct{}
}


@ -4,9 +4,21 @@ import (
"fmt"
"time"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
)
// PartitionUniqueKey is the unique key of a partition.
type PartitionUniqueKey struct {
CollectionID int64
PartitionID int64 // -1 means all partitions, see common.AllPartitionsID.
}
// IsAllPartitions returns true if the partition is all partitions.
func (k *PartitionUniqueKey) IsAllPartitions() bool {
return k.PartitionID == common.AllPartitionsID
}
// SegmentBelongs describes the channel, collection, and partition that a segment belongs to.
type SegmentBelongs struct {
PChannel string
@ -16,6 +28,14 @@ type SegmentBelongs struct {
SegmentID int64
}
// PartitionUniqueKey returns the partition unique key that this segment belongs to.
func (s *SegmentBelongs) PartitionUniqueKey() PartitionUniqueKey {
return PartitionUniqueKey{
CollectionID: s.CollectionID,
PartitionID: s.PartitionID,
}
}
// SegmentStats is the usage stats of a segment.
// The SegmentStats is imprecise, so it is not promised to be recoverable for performance.
type SegmentStats struct {