mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
52fb48a33c
commit
0141156d6b
@ -243,7 +243,7 @@ func (c *ChannelManager) unwatchDroppedChannels() {
|
||||
nodeChannels := c.store.GetNodesChannels()
|
||||
for _, nodeChannel := range nodeChannels {
|
||||
for _, ch := range nodeChannel.Channels {
|
||||
if !c.h.CheckShouldDropChannel(ch.Name) {
|
||||
if !c.h.CheckShouldDropChannel(ch.Name, ch.CollectionID) {
|
||||
continue
|
||||
}
|
||||
err := c.remove(nodeChannel.NodeID, ch)
|
||||
@ -765,7 +765,7 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err
|
||||
|
||||
reallocates := &NodeChannelInfo{originNodeID, []*channel{ch}}
|
||||
|
||||
if c.isMarkedDrop(channelName) {
|
||||
if c.isMarkedDrop(channelName, ch.CollectionID) {
|
||||
if err := c.remove(originNodeID, ch); err != nil {
|
||||
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
|
||||
}
|
||||
@ -812,7 +812,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
|
||||
|
||||
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
|
||||
|
||||
if c.isMarkedDrop(channelName) {
|
||||
if c.isMarkedDrop(channelName, chToCleanUp.CollectionID) {
|
||||
if err := c.remove(nodeID, chToCleanUp); err != nil {
|
||||
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
|
||||
}
|
||||
@ -870,8 +870,8 @@ func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID)
|
||||
return false, 0
|
||||
}
|
||||
|
||||
func (c *ChannelManager) isMarkedDrop(channelName string) bool {
|
||||
return c.h.CheckShouldDropChannel(channelName)
|
||||
func (c *ChannelManager) isMarkedDrop(channelName string, collectionID UniqueID) bool {
|
||||
return c.h.CheckShouldDropChannel(channelName, collectionID)
|
||||
}
|
||||
|
||||
func getReleaseOp(nodeID UniqueID, ch *channel) ChannelOpSet {
|
||||
|
||||
@ -18,12 +18,14 @@ package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/zap"
|
||||
@ -35,7 +37,7 @@ type Handler interface {
|
||||
GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error)
|
||||
// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
|
||||
GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
|
||||
CheckShouldDropChannel(channel string) bool
|
||||
CheckShouldDropChannel(channel string, collectionID UniqueID) bool
|
||||
FinishDropChannel(channel string) error
|
||||
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
|
||||
}
|
||||
@ -317,6 +319,26 @@ func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
|
||||
}
|
||||
}
|
||||
|
||||
// HasCollection returns whether the collection exist from user's perspective.
|
||||
func (h *ServerHandler) HasCollection(ctx context.Context, collectionID UniqueID) (bool, error) {
|
||||
var hasCollection bool
|
||||
ctx2, cancel := context.WithTimeout(ctx, time.Minute*30)
|
||||
defer cancel()
|
||||
if err := retry.Do(ctx2, func() error {
|
||||
has, err := h.s.hasCollection(ctx2, collectionID)
|
||||
if err != nil {
|
||||
log.RatedInfo(60, "datacoord ServerHandler HasCollection retry failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
hasCollection = has
|
||||
return nil
|
||||
}, retry.Attempts(100000)); err != nil {
|
||||
log.Error("datacoord ServerHandler HasCollection finally failed")
|
||||
panic("datacoord ServerHandler HasCollection finally failed")
|
||||
}
|
||||
return hasCollection, nil
|
||||
}
|
||||
|
||||
// GetCollection returns collection info with specified collection id
|
||||
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
|
||||
coll := h.s.meta.GetCollection(collectionID)
|
||||
@ -333,9 +355,20 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
|
||||
}
|
||||
|
||||
// CheckShouldDropChannel returns whether specified channel is marked to be removed
|
||||
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
|
||||
return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) ||
|
||||
!h.s.meta.catalog.ChannelExists(h.s.ctx, channel)
|
||||
func (h *ServerHandler) CheckShouldDropChannel(channel string, collectionID UniqueID) bool {
|
||||
if h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) {
|
||||
return true
|
||||
}
|
||||
// collectionID parse from channelName
|
||||
has, err := h.HasCollection(h.s.ctx, collectionID)
|
||||
if err != nil {
|
||||
log.Info("datacoord ServerHandler CheckShouldDropChannel hasCollection failed", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
log.Info("datacoord ServerHandler CheckShouldDropChannel hasCollection", zap.Bool("shouldDropChannel", !has),
|
||||
zap.String("channel", channel))
|
||||
|
||||
return !has
|
||||
}
|
||||
|
||||
// FinishDropChannel cleans up the remove flag for channels
|
||||
|
||||
@ -844,7 +844,7 @@ func (h *mockHandler) GetDataVChanPositions(channel *channel, partitionID Unique
|
||||
}
|
||||
}
|
||||
|
||||
func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
|
||||
func (h *mockHandler) CheckShouldDropChannel(channel string, collectionID UniqueID) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@ -965,6 +965,54 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
|
||||
return nil
|
||||
}
|
||||
|
||||
// hasCollection communicates with RootCoord and check whether this collection exist from the user's perspective.
|
||||
func (s *Server) hasCollection(ctx context.Context, collectionID int64) (bool, error) {
|
||||
resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
|
||||
commonpbutil.WithSourceID(Params.DataCoordCfg.GetNodeID()),
|
||||
),
|
||||
DbName: "",
|
||||
CollectionID: collectionID,
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if resp == nil {
|
||||
return false, errNilResponse
|
||||
}
|
||||
if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if resp.Status.ErrorCode == commonpb.ErrorCode_CollectionNotExists {
|
||||
return false, nil
|
||||
}
|
||||
return false, fmt.Errorf("code:%s, reason:%s", resp.Status.GetErrorCode().String(), resp.Status.GetReason())
|
||||
}
|
||||
|
||||
// hasCollectionInternal communicates with RootCoord and check whether this collection's meta exist in rootcoord.
|
||||
func (s *Server) hasCollectionInternal(ctx context.Context, collectionID int64) (bool, error) {
|
||||
resp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
|
||||
commonpbutil.WithSourceID(Params.DataCoordCfg.GetNodeID()),
|
||||
),
|
||||
DbName: "",
|
||||
CollectionID: collectionID,
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if resp == nil {
|
||||
return false, errNilResponse
|
||||
}
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (s *Server) reCollectSegmentStats(ctx context.Context) {
|
||||
if s.channelManager == nil {
|
||||
log.Error("null channel manager found, which should NOT happen in non-testing environment")
|
||||
|
||||
@ -31,6 +31,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/metautil"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
@ -2465,8 +2466,32 @@ func TestGetQueryVChanPositions(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestShouldDropChannel(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
type myRootCoord struct {
|
||||
mocks.RootCoord
|
||||
}
|
||||
myRoot := &myRootCoord{}
|
||||
myRoot.EXPECT().Init().Return(nil)
|
||||
myRoot.EXPECT().Start().Return(nil)
|
||||
myRoot.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
|
||||
Count: 1,
|
||||
}, nil)
|
||||
|
||||
myRoot.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
ID: int64(tsoutil.ComposeTSByTime(time.Now(), 0)),
|
||||
Count: 1,
|
||||
}, nil)
|
||||
|
||||
var crt rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, etcdClient *clientv3.Client) (types.RootCoord, error) {
|
||||
return myRoot, nil
|
||||
}
|
||||
|
||||
opt := SetRootCoordCreator(crt)
|
||||
svr := newTestServer(t, nil, opt)
|
||||
defer closeTestServer(t, svr)
|
||||
|
||||
schema := newTestSchema()
|
||||
svr.meta.AddCollection(&collectionInfo{
|
||||
ID: 0,
|
||||
@ -2595,15 +2620,53 @@ func TestShouldDropChannel(t *testing.T) {
|
||||
assert.True(t, r)
|
||||
})
|
||||
*/
|
||||
t.Run("channel name not in kv", func(t *testing.T) {
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch99"))
|
||||
|
||||
t.Run("channel name not in kv, collection not exist", func(t *testing.T) {
|
||||
//myRoot.code = commonpb.ErrorCode_CollectionNotExists
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists},
|
||||
CollectionID: -1,
|
||||
}, nil).Once()
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch99", -1))
|
||||
})
|
||||
|
||||
t.Run("channel in remove flag", func(t *testing.T) {
|
||||
t.Run("channel name not in kv, collection exist", func(t *testing.T) {
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
CollectionID: 0,
|
||||
}, nil).Once()
|
||||
assert.False(t, svr.handler.CheckShouldDropChannel("ch99", 0))
|
||||
})
|
||||
|
||||
t.Run("collection name in kv, collection exist", func(t *testing.T) {
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
CollectionID: 0,
|
||||
}, nil).Once()
|
||||
assert.False(t, svr.handler.CheckShouldDropChannel("ch1", 0))
|
||||
})
|
||||
|
||||
t.Run("collection name in kv, collection not exist", func(t *testing.T) {
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists},
|
||||
CollectionID: -1,
|
||||
}, nil).Once()
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch1", -1))
|
||||
})
|
||||
|
||||
t.Run("channel in remove flag, collection exist", func(t *testing.T) {
|
||||
err := svr.meta.catalog.MarkChannelDeleted(context.TODO(), "ch1")
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch1"))
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
CollectionID: 0,
|
||||
}, nil).Once()
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch1", 0))
|
||||
})
|
||||
}
|
||||
|
||||
@ -3977,14 +4040,21 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
svr := CreateServer(context.TODO(), factory, opts...)
|
||||
svr := CreateServer(context.TODO(), factory)
|
||||
svr.SetEtcdClient(etcdCli)
|
||||
|
||||
svr.dataNodeCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
|
||||
return newMockDataNodeClient(0, receiveCh)
|
||||
}
|
||||
|
||||
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
|
||||
return newMockRootCoordService(), nil
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(svr)
|
||||
}
|
||||
|
||||
indexCoord := mocks.NewMockIndexCoord(t)
|
||||
indexCoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
svr.indexCoord = indexCoord
|
||||
|
||||
@ -208,9 +208,9 @@ func TestFlowGraphManager(t *testing.T) {
|
||||
|
||||
fm.dropAll()
|
||||
const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-"
|
||||
Params.DataNodeCfg.MemoryForceSyncEnable = true
|
||||
for _, test := range tests {
|
||||
Params.DataNodeCfg.MemoryWatermark = test.watermark
|
||||
Params.DataNodeCfg.MemoryForceSyncEnable = true
|
||||
for i, memorySize := range test.memorySizes {
|
||||
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
|
||||
vchan := &datapb.VchannelInfo{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user