mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Fill nil schema so that Milvus can watch channel for those upgraded from 2.2 to 2.4 #35695 (#35694)
See also: [#35701 ](https://github.com/milvus-io/milvus/issues/35701) --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
3e1052f889
commit
f12e368a76
@ -74,6 +74,9 @@ func (ch *channelMeta) UpdateWatchInfo(info *datapb.ChannelWatchInfo) {
|
||||
zap.Any("old watch info", ch.WatchInfo),
|
||||
zap.Any("new watch info", info))
|
||||
ch.WatchInfo = proto.Clone(info).(*datapb.ChannelWatchInfo)
|
||||
if ch.Schema == nil {
|
||||
ch.Schema = info.GetSchema()
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *channelMeta) GetWatchInfo() *datapb.ChannelWatchInfo {
|
||||
@ -221,7 +224,7 @@ func (c *StateChannel) Clone() *StateChannel {
|
||||
|
||||
func (c *StateChannel) String() string {
|
||||
// schema maybe too large to print
|
||||
return fmt.Sprintf("Name: %s, CollectionID: %d, StartPositions: %v", c.Name, c.CollectionID, c.StartPositions)
|
||||
return fmt.Sprintf("Name: %s, CollectionID: %d, StartPositions: %v, Schema: %v", c.Name, c.CollectionID, c.StartPositions, c.Schema)
|
||||
}
|
||||
|
||||
func (c *StateChannel) GetName() string {
|
||||
@ -259,6 +262,12 @@ func (c *StateChannel) UpdateWatchInfo(info *datapb.ChannelWatchInfo) {
|
||||
}
|
||||
|
||||
c.Info = proto.Clone(info).(*datapb.ChannelWatchInfo)
|
||||
if c.Schema == nil {
|
||||
log.Info("Channel updating watch info for nil schema in old info",
|
||||
zap.Any("old watch info", c.Info),
|
||||
zap.Any("new watch info", info))
|
||||
c.Schema = info.GetSchema()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *StateChannel) Assign(nodeID int64) {
|
||||
|
||||
@ -700,11 +700,22 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
|
||||
return err
|
||||
}
|
||||
|
||||
schema := ch.GetSchema()
|
||||
if schema == nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
collInfo, err := m.h.GetCollection(ctx, ch.GetCollectionID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
schema = collInfo.Schema
|
||||
}
|
||||
|
||||
info := &datapb.ChannelWatchInfo{
|
||||
Vchan: reduceVChanSize(vcInfo),
|
||||
StartTs: startTs,
|
||||
State: inferStateByOpType(op.Type),
|
||||
Schema: ch.GetSchema(),
|
||||
Schema: schema,
|
||||
OpID: opID,
|
||||
}
|
||||
ch.UpdateWatchInfo(info)
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
||||
kvmock "github.com/milvus-io/milvus/internal/kv/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
@ -707,6 +708,80 @@ func (s *ChannelManagerSuite) TestStartup() {
|
||||
s.checkAssignment(m, 2, "ch3", ToWatch)
|
||||
}
|
||||
|
||||
func (s *ChannelManagerSuite) TestStartupNilSchema() {
|
||||
chNodes := map[string]int64{
|
||||
"ch1": 1,
|
||||
"ch2": 1,
|
||||
"ch3": 3,
|
||||
}
|
||||
var keys, values []string
|
||||
for channel, nodeID := range chNodes {
|
||||
keys = append(keys, fmt.Sprintf("channel_store/%d/%s", nodeID, channel))
|
||||
info := generateWatchInfo(channel, datapb.ChannelWatchState_ToRelease)
|
||||
info.Schema = nil
|
||||
bs, err := proto.Marshal(info)
|
||||
s.Require().NoError(err)
|
||||
values = append(values, string(bs))
|
||||
}
|
||||
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
|
||||
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
|
||||
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
|
||||
s.Require().NoError(err)
|
||||
err = m.Startup(context.TODO(), nil, []int64{1, 3})
|
||||
s.Require().NoError(err)
|
||||
|
||||
for ch, node := range chNodes {
|
||||
channel, got := m.GetChannel(node, ch)
|
||||
s.Require().True(got)
|
||||
s.Nil(channel.GetSchema())
|
||||
s.Equal(ch, channel.GetName())
|
||||
log.Info("Recovered nil schema channel", zap.Any("channel", channel))
|
||||
}
|
||||
|
||||
s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(
|
||||
&collectionInfo{ID: 111, Schema: &schemapb.CollectionSchema{Name: "coll111"}},
|
||||
nil,
|
||||
)
|
||||
|
||||
err = m.DeleteNode(1)
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = m.DeleteNode(3)
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.checkAssignment(m, bufferID, "ch1", Standby)
|
||||
s.checkAssignment(m, bufferID, "ch2", Standby)
|
||||
s.checkAssignment(m, bufferID, "ch3", Standby)
|
||||
|
||||
for ch := range chNodes {
|
||||
channel, got := m.GetChannel(bufferID, ch)
|
||||
s.Require().True(got)
|
||||
s.NotNil(channel.GetSchema())
|
||||
s.Equal(ch, channel.GetName())
|
||||
|
||||
s.NotNil(channel.GetWatchInfo())
|
||||
s.NotNil(channel.GetWatchInfo().Schema)
|
||||
log.Info("Recovered non-nil schema channel", zap.Any("channel", channel))
|
||||
}
|
||||
|
||||
err = m.AddNode(7)
|
||||
s.Require().NoError(err)
|
||||
s.checkAssignment(m, 7, "ch1", ToWatch)
|
||||
s.checkAssignment(m, 7, "ch2", ToWatch)
|
||||
s.checkAssignment(m, 7, "ch3", ToWatch)
|
||||
|
||||
for ch := range chNodes {
|
||||
channel, got := m.GetChannel(7, ch)
|
||||
s.Require().True(got)
|
||||
s.NotNil(channel.GetSchema())
|
||||
s.Equal(ch, channel.GetName())
|
||||
|
||||
s.NotNil(channel.GetWatchInfo())
|
||||
s.NotNil(channel.GetWatchInfo().Schema)
|
||||
log.Info("non-nil schema channel", zap.Any("channel", channel))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
|
||||
chNodes := map[string]int64{
|
||||
"ch1": 1,
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/kv/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/kv/predicates"
|
||||
@ -38,7 +39,8 @@ func generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.Chan
|
||||
Vchan: &datapb.VchannelInfo{
|
||||
ChannelName: name,
|
||||
},
|
||||
State: state,
|
||||
Schema: &schemapb.CollectionSchema{},
|
||||
State: state,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -24,6 +24,7 @@ import (
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
@ -36,7 +37,10 @@ func getChannel(name string, collID int64) *StateChannel {
|
||||
return &StateChannel{
|
||||
Name: name,
|
||||
CollectionID: collID,
|
||||
Info: &datapb.ChannelWatchInfo{Vchan: &datapb.VchannelInfo{}},
|
||||
Info: &datapb.ChannelWatchInfo{
|
||||
Vchan: &datapb.VchannelInfo{ChannelName: name, CollectionID: collID},
|
||||
},
|
||||
Schema: &schemapb.CollectionSchema{Name: "coll1"},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user