mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
parent
c9ac005257
commit
e2096965c7
@ -168,7 +168,7 @@ func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, di
|
||||
for _, s := range dists {
|
||||
distMap[s.GetID()] = struct{}{}
|
||||
}
|
||||
for sid := range leaderView.Segments {
|
||||
for sid, s := range leaderView.Segments {
|
||||
_, ok := distMap[sid]
|
||||
existInCurrentTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil
|
||||
existInNextTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil
|
||||
@ -183,6 +183,7 @@ func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, di
|
||||
ret = append(ret, &querypb.SyncAction{
|
||||
Type: querypb.SyncType_Remove,
|
||||
SegmentID: sid,
|
||||
NodeID: s.NodeID,
|
||||
})
|
||||
}
|
||||
return ret
|
||||
|
||||
@ -341,6 +341,7 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
|
||||
{
|
||||
Type: querypb.SyncType_Remove,
|
||||
SegmentID: 3,
|
||||
NodeID: 2,
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -393,6 +394,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
|
||||
{
|
||||
Type: querypb.SyncType_Remove,
|
||||
SegmentID: 3,
|
||||
NodeID: 2,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@ -550,10 +550,6 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
|
||||
|
||||
log.Info("delegator start to release segments")
|
||||
// alter distribution first
|
||||
if force {
|
||||
targetNodeID = wildcardNodeID
|
||||
}
|
||||
|
||||
var sealed, growing []SegmentEntry
|
||||
convertSealed := func(segmentID int64, _ int) SegmentEntry {
|
||||
return SegmentEntry{
|
||||
|
||||
@ -967,7 +967,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
||||
}, nil
|
||||
}
|
||||
|
||||
var removeSegments []int64
|
||||
removeActions := make([]*querypb.SyncAction, 0)
|
||||
addSegments := make(map[int64][]*querypb.SegmentLoadInfo)
|
||||
for _, action := range req.GetActions() {
|
||||
log := log.With(zap.String("Action",
|
||||
@ -977,7 +977,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
||||
log.Info("sync action")
|
||||
switch action.GetType() {
|
||||
case querypb.SyncType_Remove:
|
||||
removeSegments = append(removeSegments, action.GetSegmentID())
|
||||
removeActions = append(removeActions, action)
|
||||
case querypb.SyncType_Set:
|
||||
addSegments[action.GetNodeID()] = append(addSegments[action.GetNodeID()], action.GetInfo())
|
||||
case querypb.SyncType_Amend:
|
||||
@ -1003,12 +1003,6 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
if len(removeSegments) > 0 {
|
||||
shardDelegator.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
|
||||
SegmentIDs: removeSegments,
|
||||
Scope: querypb.DataScope_Historical,
|
||||
}, true)
|
||||
}
|
||||
|
||||
for nodeID, infos := range addSegments {
|
||||
err := shardDelegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
||||
@ -1021,6 +1015,14 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
||||
}
|
||||
}
|
||||
|
||||
for _, action := range removeActions {
|
||||
shardDelegator.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
|
||||
NodeID: action.GetNodeID(),
|
||||
SegmentIDs: []int64{action.GetSegmentID()},
|
||||
Scope: querypb.DataScope_Historical,
|
||||
}, true)
|
||||
}
|
||||
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
||||
@ -1181,6 +1181,60 @@ func (suite *ServiceSuite) TestSyncDistribution_Normal() {
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
|
||||
ctx := context.Background()
|
||||
// prepare
|
||||
// watch dmchannel and load some segments
|
||||
suite.TestWatchDmChannelsInt64()
|
||||
suite.TestLoadSegments_Int64()
|
||||
|
||||
delegator, ok := suite.node.delegators.Get(suite.vchannel)
|
||||
suite.True(ok)
|
||||
sealedSegments, _, version := delegator.GetDistribution().GetCurrent()
|
||||
suite.Len(sealedSegments[0].Segments, 3)
|
||||
delegator.GetDistribution().FinishUsage(version)
|
||||
|
||||
// data
|
||||
req := &querypb.SyncDistributionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
Channel: suite.vchannel,
|
||||
}
|
||||
|
||||
releaseAction := &querypb.SyncAction{
|
||||
Type: querypb.SyncType_Remove,
|
||||
SegmentID: sealedSegments[0].Segments[0].SegmentID,
|
||||
NodeID: 100,
|
||||
}
|
||||
|
||||
// expect one segments in distribution
|
||||
req.Actions = []*querypb.SyncAction{releaseAction}
|
||||
status, err := suite.node.SyncDistribution(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
sealedSegments, _, version = delegator.GetDistribution().GetCurrent()
|
||||
suite.Len(sealedSegments[0].Segments, 3)
|
||||
delegator.GetDistribution().FinishUsage(version)
|
||||
|
||||
releaseAction = &querypb.SyncAction{
|
||||
Type: querypb.SyncType_Remove,
|
||||
SegmentID: sealedSegments[0].Segments[0].SegmentID,
|
||||
NodeID: sealedSegments[0].Segments[0].NodeID,
|
||||
}
|
||||
|
||||
// expect one segments in distribution
|
||||
req.Actions = []*querypb.SyncAction{releaseAction}
|
||||
status, err = suite.node.SyncDistribution(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
sealedSegments, _, version = delegator.GetDistribution().GetCurrent()
|
||||
suite.Len(sealedSegments[0].Segments, 2)
|
||||
delegator.GetDistribution().FinishUsage(version)
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestSyncDistribution_Failed() {
|
||||
ctx := context.Background()
|
||||
// prepare
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user