mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
Fix querynodev2 concurrent load logic (#26959)
Fix logic error from #26926: function `waitSegmentLoadDone` shall return an error when the context is done. Make the delegator control concurrency for loads of the same segment. Related to #26908. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
ad1cc00a18
commit
2a5d574a0d
@ -40,6 +40,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/lifetime"
|
"github.com/milvus-io/milvus/pkg/util/lifetime"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
@ -113,6 +114,7 @@ type shardDelegator struct {
|
|||||||
//dispatcherClient msgdispatcher.Client
|
//dispatcherClient msgdispatcher.Client
|
||||||
factory msgstream.Factory
|
factory msgstream.Factory
|
||||||
|
|
||||||
|
sf conc.Singleflight[struct{}]
|
||||||
loader segments.Loader
|
loader segments.Loader
|
||||||
tsCond *sync.Cond
|
tsCond *sync.Cond
|
||||||
latestTsafe *atomic.Uint64
|
latestTsafe *atomic.Uint64
|
||||||
|
|||||||
@ -356,7 +356,38 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
|
|||||||
|
|
||||||
req.Base.TargetID = req.GetDstNodeID()
|
req.Base.TargetID = req.GetDstNodeID()
|
||||||
log.Info("worker loads segments...")
|
log.Info("worker loads segments...")
|
||||||
err = worker.LoadSegments(ctx, req)
|
|
||||||
|
sLoad := func(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
|
||||||
|
segmentID := req.GetInfos()[0].GetSegmentID()
|
||||||
|
nodeID := req.GetDstNodeID()
|
||||||
|
_, err, _ := sd.sf.Do(fmt.Sprintf("%d-%d", nodeID, segmentID), func() (struct{}, error) {
|
||||||
|
err := worker.LoadSegments(ctx, req)
|
||||||
|
return struct{}{}, err
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// separate infos into different load task
|
||||||
|
if len(req.GetInfos()) > 1 {
|
||||||
|
var reqs []*querypb.LoadSegmentsRequest
|
||||||
|
for _, info := range req.GetInfos() {
|
||||||
|
newReq := typeutil.Clone(req)
|
||||||
|
newReq.Infos = []*querypb.SegmentLoadInfo{info}
|
||||||
|
reqs = append(reqs, newReq)
|
||||||
|
}
|
||||||
|
|
||||||
|
group, ctx := errgroup.WithContext(ctx)
|
||||||
|
for _, req := range reqs {
|
||||||
|
req := req
|
||||||
|
group.Go(func() error {
|
||||||
|
return sLoad(ctx, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err = group.Wait()
|
||||||
|
} else {
|
||||||
|
err = sLoad(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("worker failed to load segments", zap.Error(err))
|
log.Warn("worker failed to load segments", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
|||||||
@ -177,6 +177,11 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
|||||||
infos := loader.prepare(segmentType, version, segments...)
|
infos := loader.prepare(segmentType, version, segments...)
|
||||||
defer loader.unregister(infos...)
|
defer loader.unregister(infos...)
|
||||||
|
|
||||||
|
log.With(
|
||||||
|
zap.Int64s("requestSegments", lo.Map(segments, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })),
|
||||||
|
zap.Int64s("preparedSegments", lo.Map(infos, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })),
|
||||||
|
)
|
||||||
|
|
||||||
// continue to wait other task done
|
// continue to wait other task done
|
||||||
log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))
|
log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))
|
||||||
|
|
||||||
@ -412,6 +417,10 @@ func (loader *segmentLoader) freeRequest(resource LoadResource) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs ...int64) error {
|
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs ...int64) error {
|
||||||
|
log := log.Ctx(ctx).With(
|
||||||
|
zap.String("segmentType", segmentType.String()),
|
||||||
|
zap.Int64s("segmentIDs", segmentIDs),
|
||||||
|
)
|
||||||
for _, segmentID := range segmentIDs {
|
for _, segmentID := range segmentIDs {
|
||||||
if loader.manager.Segment.GetWithType(segmentID, segmentType) != nil {
|
if loader.manager.Segment.GetWithType(segmentID, segmentType) != nil {
|
||||||
continue
|
continue
|
||||||
@ -440,6 +449,11 @@ func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentTyp
|
|||||||
result.cond.L.Unlock()
|
result.cond.L.Unlock()
|
||||||
close(signal)
|
close(signal)
|
||||||
|
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
log.Warn("failed to wait segment loaded due to context done", zap.Int64("segmentID", segmentID))
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
if result.status.Load() == failure {
|
if result.status.Load() == failure {
|
||||||
log.Warn("failed to wait segment loaded", zap.Int64("segmentID", segmentID))
|
log.Warn("failed to wait segment loaded", zap.Int64("segmentID", segmentID))
|
||||||
return merr.WrapErrSegmentLack(segmentID, "failed to wait segment loaded")
|
return merr.WrapErrSegmentLack(segmentID, "failed to wait segment loaded")
|
||||||
|
|||||||
@ -643,6 +643,29 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
|
|||||||
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, suite.segmentID)
|
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, suite.segmentID)
|
||||||
suite.Error(err)
|
suite.Error(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
suite.Run("wait_timeout", func() {
|
||||||
|
|
||||||
|
suite.SetupTest()
|
||||||
|
|
||||||
|
suite.segmentManager.EXPECT().GetBy(mock.Anything, mock.Anything).Return(nil)
|
||||||
|
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
|
||||||
|
SegmentID: suite.segmentID,
|
||||||
|
PartitionID: suite.partitionID,
|
||||||
|
CollectionID: suite.collectionID,
|
||||||
|
NumOfRows: 100,
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
err := suite.loader.waitSegmentLoadDone(ctx, SegmentTypeSealed, suite.segmentID)
|
||||||
|
suite.Error(err)
|
||||||
|
suite.True(merr.IsCanceledOrTimeout(err))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSegmentLoader(t *testing.T) {
|
func TestSegmentLoader(t *testing.T) {
|
||||||
|
|||||||
@ -516,23 +516,26 @@ func (suite *ServiceSuite) TestLoadSegments_Int64() {
|
|||||||
suite.TestWatchDmChannelsInt64()
|
suite.TestWatchDmChannelsInt64()
|
||||||
// data
|
// data
|
||||||
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
|
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
|
||||||
req := &querypb.LoadSegmentsRequest{
|
infos := suite.genSegmentLoadInfos(schema)
|
||||||
Base: &commonpb.MsgBase{
|
for _, info := range infos {
|
||||||
MsgID: rand.Int63(),
|
req := &querypb.LoadSegmentsRequest{
|
||||||
TargetID: suite.node.session.ServerID,
|
Base: &commonpb.MsgBase{
|
||||||
},
|
MsgID: rand.Int63(),
|
||||||
CollectionID: suite.collectionID,
|
TargetID: suite.node.session.ServerID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
},
|
||||||
Infos: suite.genSegmentLoadInfos(schema),
|
CollectionID: suite.collectionID,
|
||||||
Schema: schema,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
Infos: []*querypb.SegmentLoadInfo{info},
|
||||||
NeedTransfer: true,
|
Schema: schema,
|
||||||
}
|
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
||||||
|
NeedTransfer: true,
|
||||||
|
}
|
||||||
|
|
||||||
// LoadSegment
|
// LoadSegment
|
||||||
status, err := suite.node.LoadSegments(ctx, req)
|
status, err := suite.node.LoadSegments(ctx, req)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
|
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ServiceSuite) TestLoadSegments_VarChar() {
|
func (suite *ServiceSuite) TestLoadSegments_VarChar() {
|
||||||
@ -547,24 +550,28 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() {
|
|||||||
}
|
}
|
||||||
suite.node.manager.Collection = segments.NewCollectionManager()
|
suite.node.manager.Collection = segments.NewCollectionManager()
|
||||||
suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, nil, loadMeta)
|
suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, nil, loadMeta)
|
||||||
req := &querypb.LoadSegmentsRequest{
|
|
||||||
Base: &commonpb.MsgBase{
|
|
||||||
MsgID: rand.Int63(),
|
|
||||||
TargetID: suite.node.session.ServerID,
|
|
||||||
},
|
|
||||||
CollectionID: suite.collectionID,
|
|
||||||
DstNodeID: suite.node.session.ServerID,
|
|
||||||
Infos: suite.genSegmentLoadInfos(schema),
|
|
||||||
Schema: schema,
|
|
||||||
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
|
||||||
NeedTransfer: true,
|
|
||||||
LoadMeta: loadMeta,
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadSegment
|
infos := suite.genSegmentLoadInfos(schema)
|
||||||
status, err := suite.node.LoadSegments(ctx, req)
|
for _, info := range infos {
|
||||||
suite.NoError(err)
|
req := &querypb.LoadSegmentsRequest{
|
||||||
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgID: rand.Int63(),
|
||||||
|
TargetID: suite.node.session.ServerID,
|
||||||
|
},
|
||||||
|
CollectionID: suite.collectionID,
|
||||||
|
DstNodeID: suite.node.session.ServerID,
|
||||||
|
Infos: []*querypb.SegmentLoadInfo{info},
|
||||||
|
Schema: schema,
|
||||||
|
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
||||||
|
NeedTransfer: true,
|
||||||
|
LoadMeta: loadMeta,
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadSegment
|
||||||
|
status, err := suite.node.LoadSegments(ctx, req)
|
||||||
|
suite.NoError(err)
|
||||||
|
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ServiceSuite) TestLoadDeltaInt64() {
|
func (suite *ServiceSuite) TestLoadDeltaInt64() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user