enhance: Return FlushAllMsg in response (#46347)

issue: https://github.com/milvus-io/milvus/issues/45919

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-12-16 10:35:16 +08:00 committed by GitHub
parent 675a6b9ba0
commit 889505872a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 2546 additions and 2411 deletions

View File

@ -6,7 +6,7 @@ require (
github.com/blang/semver/v4 v4.0.0
github.com/cockroachdb/errors v1.9.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256
github.com/quasilyte/go-ruleguard/dsl v0.3.23
github.com/samber/lo v1.27.0

View File

@ -330,8 +330,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc h1:ZbtRmUjs+YIcULnIVPwdmOrLa9rpH58gnsCHyaLhqtw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece h1:s0TFMZBxADKSzIr7LW/TE3L/WgCuo7QOfzkYX92Xog0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256 h1:M2waty0w2k4YT2HHzJk3fx6EFPD4DKxNJatitIV+gGU=
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256/go.mod h1:HT6Wxahwj/l8+i+D/C3iwDzCjDa36U9gyVw6CjjK4pE=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=

View File

@ -558,6 +558,65 @@ func (_c *MilvusServiceServer_AlterCollectionFunction_Call) RunAndReturn(run fun
return _c
}
// AlterCollectionSchema provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) AlterCollectionSchema(_a0 context.Context, _a1 *milvuspb.AlterCollectionSchemaRequest) (*milvuspb.AlterCollectionSchemaResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for AlterCollectionSchema")
}
var r0 *milvuspb.AlterCollectionSchemaResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionSchemaRequest) (*milvuspb.AlterCollectionSchemaResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionSchemaRequest) *milvuspb.AlterCollectionSchemaResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.AlterCollectionSchemaResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionSchemaRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MilvusServiceServer_AlterCollectionSchema_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollectionSchema'
type MilvusServiceServer_AlterCollectionSchema_Call struct {
*mock.Call
}
// AlterCollectionSchema is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.AlterCollectionSchemaRequest
func (_e *MilvusServiceServer_Expecter) AlterCollectionSchema(_a0 interface{}, _a1 interface{}) *MilvusServiceServer_AlterCollectionSchema_Call {
return &MilvusServiceServer_AlterCollectionSchema_Call{Call: _e.mock.On("AlterCollectionSchema", _a0, _a1)}
}
func (_c *MilvusServiceServer_AlterCollectionSchema_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterCollectionSchemaRequest)) *MilvusServiceServer_AlterCollectionSchema_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionSchemaRequest))
})
return _c
}
func (_c *MilvusServiceServer_AlterCollectionSchema_Call) Return(_a0 *milvuspb.AlterCollectionSchemaResponse, _a1 error) *MilvusServiceServer_AlterCollectionSchema_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MilvusServiceServer_AlterCollectionSchema_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionSchemaRequest) (*milvuspb.AlterCollectionSchemaResponse, error)) *MilvusServiceServer_AlterCollectionSchema_Call {
_c.Call.Return(run)
return _c
}
// AlterDatabase provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) AlterDatabase(_a0 context.Context, _a1 *milvuspb.AlterDatabaseRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

2
go.mod
View File

@ -21,7 +21,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.18.0
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece
github.com/minio/minio-go/v7 v7.0.73
github.com/panjf2000/ants/v2 v2.11.3 // indirect
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect

4
go.sum
View File

@ -799,8 +799,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc h1:ZbtRmUjs+YIcULnIVPwdmOrLa9rpH58gnsCHyaLhqtw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece h1:s0TFMZBxADKSzIr7LW/TE3L/WgCuo7QOfzkYX92Xog0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=

View File

@ -219,30 +219,34 @@ func (s *Server) FlushAll(ctx context.Context, req *datapb.FlushAllRequest) (*da
return pchannel
})
res, err := broadcaster.Broadcast(ctx, message.NewFlushAllMessageBuilderV2().
broadcastFlushAllMsg := message.NewFlushAllMessageBuilderV2().
WithHeader(&message.FlushAllMessageHeader{}).
WithBody(&message.FlushAllMessageBody{}).
WithBroadcast(broadcastPChannels).
MustBuildBroadcast(),
)
MustBuildBroadcast()
res, err := broadcaster.Broadcast(ctx, broadcastFlushAllMsg)
if err != nil {
log.Ctx(ctx).Warn("broadcast FlushAllMessage fail", zap.Error(err))
return &datapb.FlushAllResponse{
Status: merr.Status(err),
}, nil
}
flushAllTss := make(map[string]uint64, len(res.AppendResults))
for appendChannel, result := range res.AppendResults {
flushAllMsgs := make(map[string]*commonpb.ImmutableMessage, len(res.AppendResults))
msgs := broadcastFlushAllMsg.SplitIntoMutableMessage()
for _, msg := range msgs {
appendResult := res.GetAppendResult(msg.VChannel())
// if is control channel, convert it to physical channel.
// it's ok to call ToPhysicalChannel even if it's a physical channel,
// so no need to check if it's a control channel here.
channel := funcutil.ToPhysicalChannel(appendChannel)
flushAllTss[channel] = result.TimeTick
channel := funcutil.ToPhysicalChannel(msg.VChannel())
flushAllMsgs[channel] = msg.WithTimeTick(appendResult.TimeTick).
WithLastConfirmed(appendResult.LastConfirmedMessageID).
IntoImmutableMessage(appendResult.MessageID).
IntoImmutableMessageProto()
}
log.Ctx(ctx).Info("FlushAll successfully", zap.Strings("broadcastedPChannels", broadcastPChannels), zap.Any("flushAllTss", flushAllTss))
log.Ctx(ctx).Info("FlushAll successfully", zap.Strings("broadcastedPChannels", broadcastPChannels), zap.Any("flushAllMsgs", flushAllMsgs))
return &datapb.FlushAllResponse{
Status: merr.Success(),
FlushAllTss: flushAllTss,
Status: merr.Success(),
FlushAllMsgs: flushAllMsgs,
ClusterInfo: &milvuspb.ClusterInfo{
ClusterId: Params.CommonCfg.ClusterID.GetValue(),
Cchannel: controlChannel,

View File

@ -2137,6 +2137,7 @@ func TestServer_FlushAll(t *testing.T) {
LastConfirmedMessageID: rmq.NewRmqID(1),
}
}
msg.WithBroadcastID(1)
retry.Do(context.Background(), func() error {
log.Info("broadcast message", log.FieldMessage(msg))
return registry.CallMessageAckCallback(context.Background(), msg, results)

View File

@ -564,6 +564,65 @@ func (_c *MockProxy_AlterCollectionFunction_Call) RunAndReturn(run func(context.
return _c
}
// AlterCollectionSchema provides a mock function with given fields: _a0, _a1
func (_m *MockProxy) AlterCollectionSchema(_a0 context.Context, _a1 *milvuspb.AlterCollectionSchemaRequest) (*milvuspb.AlterCollectionSchemaResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for AlterCollectionSchema")
}
var r0 *milvuspb.AlterCollectionSchemaResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionSchemaRequest) (*milvuspb.AlterCollectionSchemaResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionSchemaRequest) *milvuspb.AlterCollectionSchemaResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.AlterCollectionSchemaResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionSchemaRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockProxy_AlterCollectionSchema_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollectionSchema'
type MockProxy_AlterCollectionSchema_Call struct {
*mock.Call
}
// AlterCollectionSchema is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.AlterCollectionSchemaRequest
func (_e *MockProxy_Expecter) AlterCollectionSchema(_a0 interface{}, _a1 interface{}) *MockProxy_AlterCollectionSchema_Call {
return &MockProxy_AlterCollectionSchema_Call{Call: _e.mock.On("AlterCollectionSchema", _a0, _a1)}
}
func (_c *MockProxy_AlterCollectionSchema_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterCollectionSchemaRequest)) *MockProxy_AlterCollectionSchema_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionSchemaRequest))
})
return _c
}
func (_c *MockProxy_AlterCollectionSchema_Call) Return(_a0 *milvuspb.AlterCollectionSchemaResponse, _a1 error) *MockProxy_AlterCollectionSchema_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockProxy_AlterCollectionSchema_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionSchemaRequest) (*milvuspb.AlterCollectionSchemaResponse, error)) *MockProxy_AlterCollectionSchema_Call {
_c.Call.Return(run)
return _c
}
// AlterDatabase provides a mock function with given fields: _a0, _a1
func (_m *MockProxy) AlterDatabase(_a0 context.Context, _a1 *milvuspb.AlterDatabaseRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -58,6 +58,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/proxypb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/crypto"
@ -4172,22 +4173,22 @@ func (node *Proxy) FlushAll(ctx context.Context, request *milvuspb.FlushAllReque
method := "FlushAll"
tr := timerecord.NewTimeRecorder(method)
log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole))
logger := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole))
log.Debug(rpcReceived(method))
logger.Debug(rpcReceived(method))
if err := node.sched.dcQueue.Enqueue(ft); err != nil {
log.Warn(rpcFailedToEnqueue(method), zap.Error(err))
logger.Warn(rpcFailedToEnqueue(method), zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
log.Debug(rpcEnqueued(method),
logger.Debug(rpcEnqueued(method),
zap.Uint64("BeginTs", ft.BeginTs()),
zap.Uint64("EndTs", ft.EndTs()))
if err := ft.WaitToFinish(); err != nil {
log.Warn(
logger.Warn(
rpcFailedToWaitToFinish(method),
zap.Error(err),
zap.Uint64("BeginTs", ft.BeginTs()),
@ -4197,8 +4198,10 @@ func (node *Proxy) FlushAll(ctx context.Context, request *milvuspb.FlushAllReque
return resp, nil
}
log.Debug(rpcDone(method),
zap.Any("FlushAllTss", ft.result.GetFlushAllTss()))
logger.Debug(rpcDone(method))
for channel, msg := range ft.result.GetFlushAllMsgs() {
logger.Debug("flushall message", zap.String("channel", channel), log.FieldMessage(message.MilvusMessageToImmutableMessage(msg)))
}
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return ft.result, nil

View File

@ -35,9 +35,9 @@ func (t *flushAllTask) Execute(ctx context.Context) error {
return fmt.Errorf("failed to call flush all to data coordinator: %s", err.Error())
}
t.result = &milvuspb.FlushAllResponse{
Status: merr.Success(),
FlushAllTss: resp.GetFlushAllTss(),
ClusterInfo: resp.GetClusterInfo(),
Status: merr.Success(),
FlushAllMsgs: resp.GetFlushAllMsgs(),
ClusterInfo: resp.GetClusterInfo(),
}
return nil
}

View File

@ -22,7 +22,7 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12
github.com/klauspost/compress v1.18.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece
github.com/minio/minio-go/v7 v7.0.73
github.com/panjf2000/ants/v2 v2.11.3
github.com/prometheus/client_golang v1.20.5

View File

@ -621,8 +621,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc h1:ZbtRmUjs+YIcULnIVPwdmOrLa9rpH58gnsCHyaLhqtw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece h1:s0TFMZBxADKSzIr7LW/TE3L/WgCuo7QOfzkYX92Xog0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo=

View File

@ -199,11 +199,11 @@ message FlushAllTarget {
}
message FlushAllResponse {
common.Status status = 1;
uint64 flushTs = 2 [deprecated = true];
repeated FlushResult flush_results = 3 [deprecated = true];
map<string, uint64> flush_all_tss = 4; // pchannel -> FlushAllMsg'ts
milvus.ClusterInfo cluster_info = 5;
common.Status status = 1;
uint64 flushTs = 2 [deprecated = true];
repeated FlushResult flush_results = 3 [deprecated = true];
map<string, common.ImmutableMessage> flush_all_msgs = 4; // pchannel -> FlushAllMsg
milvus.ClusterInfo cluster_info = 5;
}
message FlushChannelsRequest {

File diff suppressed because it is too large Load Diff

View File

@ -54,7 +54,7 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a // indirect
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc // indirect
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect

View File

@ -332,8 +332,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc h1:ZbtRmUjs+YIcULnIVPwdmOrLa9rpH58gnsCHyaLhqtw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251210064308-0f971c5ee7dc/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece h1:s0TFMZBxADKSzIr7LW/TE3L/WgCuo7QOfzkYX92Xog0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251215075310-deda9c0dcece/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256 h1:M2waty0w2k4YT2HHzJk3fx6EFPD4DKxNJatitIV+gGU=
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256/go.mod h1:HT6Wxahwj/l8+i+D/C3iwDzCjDa36U9gyVw6CjjK4pE=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=

View File

@ -23,8 +23,8 @@ import (
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/tests/integration"
@ -41,7 +42,10 @@ type FlushAllSuite struct {
integration.MiniClusterSuite
}
func (s *FlushAllSuite) WaitForFlushAll(ctx context.Context, flushTss map[string]uint64) {
func (s *FlushAllSuite) WaitForFlushAll(ctx context.Context, flushAllMsgs map[string]*commonpb.ImmutableMessage) {
flushTss := lo.MapValues(flushAllMsgs, func(msg *commonpb.ImmutableMessage, _ string) uint64 {
return message.MilvusMessageToImmutableMessage(msg).TimeTick()
})
flushed := func() bool {
resp, err := s.Cluster.MilvusClient.GetFlushAllState(ctx, &milvuspb.GetFlushAllStateRequest{
FlushAllTss: flushTss,
@ -130,8 +134,8 @@ func (s *FlushAllSuite) TestFlushAll() {
// flush all
flushAllResp, err := c.MilvusClient.FlushAll(ctx, &milvuspb.FlushAllRequest{})
s.NoError(merr.CheckRPCCall(flushAllResp, err))
log.Info("FlushAll succeed", zap.Any("flushAllTss", flushAllResp.GetFlushAllTss()))
s.WaitForFlushAll(ctx, flushAllResp.GetFlushAllTss())
log.Info("FlushAll succeed")
s.WaitForFlushAll(ctx, flushAllResp.GetFlushAllMsgs())
// show and validate segments
for collectionName, dbName := range collectionNames {