enhance: Add granular flush targets support for FlushAll operation (#44234)

issue: #44156
Enhance FlushAll functionality to support targeting specific collections
within databases instead of only database-level flushing.

Changes include:

- Add FlushAllTarget message in data_coord.proto for granular targeting
- Support collection-specific flush operations within databases
- Maintain backward compatibility with deprecated db_name field

This enhancement allows users to flush specific collections without
affecting other collections in the same database, providing more precise
control over data persistence operations.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-09-19 18:38:01 +08:00 committed by GitHub
parent 6d4961b978
commit 92d2fb6360
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 5574 additions and 3649 deletions

View File

@ -6,15 +6,13 @@ require (
github.com/blang/semver/v4 v4.0.0 github.com/blang/semver/v4 v4.0.0
github.com/cockroachdb/errors v1.9.1 github.com/cockroachdb/errors v1.9.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2 github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c
github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e
github.com/quasilyte/go-ruleguard/dsl v0.3.22 github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/samber/lo v1.27.0 github.com/samber/lo v1.27.0
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.17.1 github.com/tidwall/gjson v1.17.1
go.opentelemetry.io/otel v1.28.0
go.uber.org/atomic v1.11.0 go.uber.org/atomic v1.11.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
google.golang.org/grpc v1.65.0 google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2 google.golang.org/protobuf v1.34.2
) )
@ -89,6 +87,7 @@ require (
go.etcd.io/etcd/raft/v3 v3.5.5 // indirect go.etcd.io/etcd/raft/v3 v3.5.5 // indirect
go.etcd.io/etcd/server/v3 v3.5.5 // indirect go.etcd.io/etcd/server/v3 v3.5.5 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect
@ -99,6 +98,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.36.0 // indirect golang.org/x/crypto v0.36.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.38.0 // indirect golang.org/x/net v0.38.0 // indirect
golang.org/x/sync v0.12.0 // indirect golang.org/x/sync v0.12.0 // indirect
golang.org/x/sys v0.31.0 // indirect golang.org/x/sys v0.31.0 // indirect

View File

@ -318,8 +318,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2 h1:NqAUKq44SQs+xuOImvQjBa4EfEFvixOD/qBDQDoazN4= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c h1:B7zmZ30lWHE4wNjT/g2NPe3q0gcUtw7cA5shMtWAmDc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e h1:VCr43pG4efacDbM4au70fh8/5hNTftoWzm1iEumvDWM= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e h1:VCr43pG4efacDbM4au70fh8/5hNTftoWzm1iEumvDWM=
github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e/go.mod h1:37AWzxVs2NS4QUJrkcbeLUwi+4Av0h5mEdjLI62EANU= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e/go.mod h1:37AWzxVs2NS4QUJrkcbeLUwi+4Av0h5mEdjLI62EANU=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=

4
go.mod
View File

@ -21,7 +21,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9 github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2 github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c
github.com/minio/minio-go/v7 v7.0.73 github.com/minio/minio-go/v7 v7.0.73
github.com/panjf2000/ants/v2 v2.11.3 // indirect github.com/panjf2000/ants/v2 v2.11.3 // indirect
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
@ -73,7 +73,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/magiconair/properties v1.8.7 github.com/magiconair/properties v1.8.7
github.com/milvus-io/milvus/client/v2 v2.6.0 github.com/milvus-io/milvus/client/v2 v2.0.0-00010101000000-000000000000
github.com/milvus-io/milvus/pkg/v2 v2.5.7 github.com/milvus-io/milvus/pkg/v2 v2.5.7
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/remeh/sizedwaitgroup v1.0.0 github.com/remeh/sizedwaitgroup v1.0.0

4
go.sum
View File

@ -786,8 +786,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2 h1:NqAUKq44SQs+xuOImvQjBa4EfEFvixOD/qBDQDoazN4= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c h1:B7zmZ30lWHE4wNjT/g2NPe3q0gcUtw7cA5shMtWAmDc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"math" "math"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
@ -198,9 +199,49 @@ func (s *Server) flushCollection(ctx context.Context, collectionID UniqueID, flu
FlushSegmentIDs: flushSegmentIDs, FlushSegmentIDs: flushSegmentIDs,
FlushTs: flushTs, FlushTs: flushTs,
ChannelCps: channelCPs, ChannelCps: channelCPs,
DbName: coll.DatabaseName,
CollectionName: coll.Schema.GetName(),
}, nil }, nil
} }
func resolveCollectionsToFlush(ctx context.Context, s *Server, req *datapb.FlushAllRequest) ([]int64, error) {
collectionsToFlush := make([]int64, 0)
if len(req.GetFlushTargets()) > 0 {
// Use flush_targets from request
for _, target := range req.GetFlushTargets() {
collectionsToFlush = append(collectionsToFlush, target.GetCollectionIds()...)
}
} else if req.GetDbName() != "" {
// Backward compatibility: use deprecated db_name field
showColRsp, err := s.broker.ShowCollectionIDs(ctx, req.GetDbName())
if err != nil {
log.Warn("failed to ShowCollectionIDs", zap.String("db", req.GetDbName()), zap.Error(err))
return nil, err
}
for _, dbCollection := range showColRsp.GetDbCollections() {
collectionsToFlush = append(collectionsToFlush, dbCollection.GetCollectionIDs()...)
}
} else {
// Flush all databases
dbsResp, err := s.broker.ListDatabases(ctx)
if err != nil {
return nil, err
}
for _, dbName := range dbsResp.GetDbNames() {
showColRsp, err := s.broker.ShowCollectionIDs(ctx, dbName)
if err != nil {
log.Warn("failed to ShowCollectionIDs", zap.String("db", dbName), zap.Error(err))
return nil, err
}
for _, dbCollection := range showColRsp.GetDbCollections() {
collectionsToFlush = append(collectionsToFlush, dbCollection.GetCollectionIDs()...)
}
}
}
return collectionsToFlush, nil
}
func (s *Server) FlushAll(ctx context.Context, req *datapb.FlushAllRequest) (*datapb.FlushAllResponse, error) { func (s *Server) FlushAll(ctx context.Context, req *datapb.FlushAllRequest) (*datapb.FlushAllResponse, error) {
log := log.Ctx(ctx) log := log.Ctx(ctx)
log.Info("receive flushAll request") log.Info("receive flushAll request")
@ -214,13 +255,6 @@ func (s *Server) FlushAll(ctx context.Context, req *datapb.FlushAllRequest) (*da
}, nil }, nil
} }
resp, err := s.broker.ShowCollectionIDs(ctx, req.GetDbName())
if err != nil {
return &datapb.FlushAllResponse{
Status: merr.Status(err),
}, nil
}
// generate a timestamp timeOfSeal, all data before timeOfSeal is guaranteed to be sealed or flushed // generate a timestamp timeOfSeal, all data before timeOfSeal is guaranteed to be sealed or flushed
ts, err := s.allocator.AllocTimestamp(ctx) ts, err := s.allocator.AllocTimestamp(ctx)
if err != nil { if err != nil {
@ -228,23 +262,33 @@ func (s *Server) FlushAll(ctx context.Context, req *datapb.FlushAllRequest) (*da
return nil, err return nil, err
} }
dbCollections := resp.GetDbCollections() // resolve collections to flush
collectionsToFlush, err := resolveCollectionsToFlush(ctx, s, req)
if err != nil {
return &datapb.FlushAllResponse{
Status: merr.Status(err),
}, nil
}
var mu sync.Mutex
flushInfos := make([]*datapb.FlushResult, 0)
wg := errgroup.Group{} wg := errgroup.Group{}
// limit goroutine number to 100 // limit goroutine number to 100
wg.SetLimit(100) wg.SetLimit(100)
for _, dbCollection := range dbCollections { for _, cid := range collectionsToFlush {
for _, collectionID := range dbCollection.GetCollectionIDs() {
cid := collectionID
wg.Go(func() error { wg.Go(func() error {
_, err := s.flushCollection(ctx, cid, ts, nil) flushResult, err := s.flushCollection(ctx, cid, ts, nil)
if err != nil { if err != nil {
log.Warn("failed to flush collection", zap.Int64("collectionID", cid), zap.Error(err)) log.Warn("failed to flush collection", zap.Int64("collectionID", cid), zap.Error(err))
return err return err
} }
mu.Lock()
flushInfos = append(flushInfos, flushResult)
mu.Unlock()
return nil return nil
}) })
} }
}
err = wg.Wait() err = wg.Wait()
if err != nil { if err != nil {
return &datapb.FlushAllResponse{ return &datapb.FlushAllResponse{
@ -255,6 +299,7 @@ func (s *Server) FlushAll(ctx context.Context, req *datapb.FlushAllRequest) (*da
return &datapb.FlushAllResponse{ return &datapb.FlushAllResponse{
Status: merr.Success(), Status: merr.Success(),
FlushTs: ts, FlushTs: ts,
FlushResults: flushInfos,
}, nil }, nil
} }
@ -1514,7 +1559,10 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll
}, nil }, nil
} }
resp := &milvuspb.GetFlushAllStateResponse{Status: merr.Success()} resp := &milvuspb.GetFlushAllStateResponse{
Status: merr.Success(),
FlushStates: make([]*milvuspb.FlushAllState, 0),
}
dbsRsp, err := s.broker.ListDatabases(ctx) dbsRsp, err := s.broker.ListDatabases(ctx)
if err != nil { if err != nil {
@ -1522,43 +1570,96 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll
resp.Status = merr.Status(err) resp.Status = merr.Status(err)
return resp, nil return resp, nil
} }
dbNames := dbsRsp.DbNames
if req.GetDbName() != "" { // Determine which databases to check
dbNames = lo.Filter(dbNames, func(dbName string, _ int) bool { var targetDbs []string
return dbName == req.GetDbName() if len(req.GetFlushTargets()) > 0 {
}) // Use flush_targets from request
if len(dbNames) == 0 { for _, target := range req.GetFlushTargets() {
if target.GetDbName() != "" {
if !lo.Contains(dbsRsp.DbNames, target.GetDbName()) {
resp.Status = merr.Status(merr.WrapErrDatabaseNotFound(target.GetDbName()))
return resp, nil
}
targetDbs = append(targetDbs, target.GetDbName())
}
}
} else if req.GetDbName() != "" {
if !lo.Contains(dbsRsp.DbNames, req.GetDbName()) {
resp.Status = merr.Status(merr.WrapErrDatabaseNotFound(req.GetDbName())) resp.Status = merr.Status(merr.WrapErrDatabaseNotFound(req.GetDbName()))
return resp, nil return resp, nil
} }
// Backward compatibility: use deprecated db_name field
targetDbs = []string{req.GetDbName()}
} else {
// Check all databases
targetDbs = dbsRsp.DbNames
}
// Remove duplicates
targetDbs = lo.Uniq(targetDbs)
allFlushed := true
for _, dbName := range targetDbs {
flushState := &milvuspb.FlushAllState{
DbName: dbName,
CollectionFlushStates: make(map[string]bool),
}
// Get collections to check for this database
var targetCollections []string
if len(req.GetFlushTargets()) > 0 {
// Check if specific collections are requested for this db
for _, target := range req.GetFlushTargets() {
if target.GetDbName() == dbName && len(target.GetCollectionNames()) > 0 {
targetCollections = target.GetCollectionNames()
break
}
}
} }
for _, dbName := range dbsRsp.DbNames {
showColRsp, err := s.broker.ShowCollections(ctx, dbName) showColRsp, err := s.broker.ShowCollections(ctx, dbName)
if err != nil { if err != nil {
log.Warn("failed to ShowCollections", zap.Error(err)) log.Warn("failed to ShowCollections", zap.String("db", dbName), zap.Error(err))
resp.Status = merr.Status(err) resp.Status = merr.Status(err)
return resp, nil return resp, nil
} }
for _, collection := range showColRsp.GetCollectionIds() { for idx, collectionID := range showColRsp.GetCollectionIds() {
describeColRsp, err := s.broker.DescribeCollectionInternal(ctx, collection) collectionName := ""
if idx < len(showColRsp.GetCollectionNames()) {
collectionName = showColRsp.GetCollectionNames()[idx]
}
// If specific collections are requested, skip others
if len(targetCollections) > 0 && !lo.Contains(targetCollections, collectionName) {
continue
}
describeColRsp, err := s.broker.DescribeCollectionInternal(ctx, collectionID)
if err != nil { if err != nil {
log.Warn("failed to DescribeCollectionInternal", zap.Error(err)) log.Warn("failed to DescribeCollectionInternal",
zap.Int64("collectionID", collectionID), zap.Error(err))
resp.Status = merr.Status(err) resp.Status = merr.Status(err)
return resp, nil return resp, nil
} }
collectionFlushed := true
for _, channel := range describeColRsp.GetVirtualChannelNames() { for _, channel := range describeColRsp.GetVirtualChannelNames() {
channelCP := s.meta.GetChannelCheckpoint(channel) channelCP := s.meta.GetChannelCheckpoint(channel)
if channelCP == nil || channelCP.GetTimestamp() < req.GetFlushAllTs() { if channelCP == nil || channelCP.GetTimestamp() < req.GetFlushAllTs() {
resp.Flushed = false collectionFlushed = false
allFlushed = false
break
}
}
flushState.CollectionFlushStates[collectionName] = collectionFlushed
}
return resp, nil resp.FlushStates = append(resp.FlushStates, flushState)
} }
}
} resp.Flushed = allFlushed
}
resp.Flushed = true
return resp, nil return resp, nil
} }

File diff suppressed because it is too large Load Diff

View File

@ -3758,15 +3758,12 @@ func (node *Proxy) FlushAll(ctx context.Context, request *milvuspb.FlushAllReque
log.Debug( log.Debug(
rpcDone(method), rpcDone(method),
zap.Uint64("FlushAllTs", ft.result.GetFlushTs()), zap.Uint64("FlushAllTs", ft.result.GetFlushAllTs()),
zap.Uint64("BeginTs", ft.BeginTs()), zap.Uint64("BeginTs", ft.BeginTs()),
zap.Uint64("EndTs", ft.EndTs())) zap.Uint64("EndTs", ft.EndTs()))
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return &milvuspb.FlushAllResponse{ return ft.result, nil
Status: merr.Success(),
FlushAllTs: ft.result.GetFlushTs(),
}, nil
} }
// GetDdChannel returns the used channel for dd operations. // GetDdChannel returns the used channel for dd operations.

View File

@ -416,6 +416,9 @@ func TestProxy_FlushAll_NoDatabase(t *testing.T) {
mixcoord := &grpcmixcoordclient.Client{} mixcoord := &grpcmixcoordclient.Client{}
node.mixCoord = mixcoord node.mixCoord = mixcoord
mockey.Mock((*grpcmixcoordclient.Client).FlushAll).To(func(ctx context.Context, req *datapb.FlushAllRequest, opts ...grpc.CallOption) (*datapb.FlushAllResponse, error) {
return &datapb.FlushAllResponse{Status: successStatus}, nil
}).Build()
resp, err := node.FlushAll(context.Background(), &milvuspb.FlushAllRequest{}) resp, err := node.FlushAll(context.Background(), &milvuspb.FlushAllRequest{})
@ -455,6 +458,9 @@ func TestProxy_FlushAll_WithDefaultDatabase(t *testing.T) {
mixcoord := &grpcmixcoordclient.Client{} mixcoord := &grpcmixcoordclient.Client{}
node.mixCoord = mixcoord node.mixCoord = mixcoord
mockey.Mock((*grpcmixcoordclient.Client).FlushAll).To(func(ctx context.Context, req *datapb.FlushAllRequest, opts ...grpc.CallOption) (*datapb.FlushAllResponse, error) {
return &datapb.FlushAllResponse{Status: successStatus}, nil
}).Build()
resp, err := node.FlushAll(context.Background(), &milvuspb.FlushAllRequest{DbName: "default"}) resp, err := node.FlushAll(context.Background(), &milvuspb.FlushAllRequest{DbName: "default"})
@ -489,12 +495,15 @@ func TestProxy_FlushAll_DatabaseNotExist(t *testing.T) {
mixcoord := &grpcmixcoordclient.Client{} mixcoord := &grpcmixcoordclient.Client{}
node.mixCoord = mixcoord node.mixCoord = mixcoord
mockey.Mock((*grpcmixcoordclient.Client).FlushAll).To(func(ctx context.Context, req *datapb.FlushAllRequest, opts ...grpc.CallOption) (*datapb.FlushAllResponse, error) {
return &datapb.FlushAllResponse{Status: merr.Success()}, nil
}).Build()
resp, err := node.FlushAll(context.Background(), &milvuspb.FlushAllRequest{DbName: "default2"}) resp, err := node.FlushAll(context.Background(), &milvuspb.FlushAllRequest{DbName: "default2"})
// Assert: Verify results // Assert: Verify results
assert.NoError(t, err) assert.NoError(t, err)
assert.NotEqual(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) assert.NotEqual(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_MetaFailed)
}) })
} }

View File

@ -22,7 +22,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
) )
@ -33,7 +32,7 @@ type flushAllTask struct {
*milvuspb.FlushAllRequest *milvuspb.FlushAllRequest
ctx context.Context ctx context.Context
mixCoord types.MixCoordClient mixCoord types.MixCoordClient
result *datapb.FlushAllResponse result *milvuspb.FlushAllResponse
chMgr channelsMgr chMgr channelsMgr
} }

View File

@ -18,80 +18,187 @@ package proxy
import ( import (
"context" "context"
"fmt"
"sync"
"github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
func (t *flushAllTask) Execute(ctx context.Context) error { func (t *flushAllTask) Execute(ctx context.Context) error {
dbNames := make([]string, 0) flushTs := t.BeginTs()
if t.GetDbName() != "" { timeOfSeal, _ := tsoutil.ParseTS(flushTs)
dbNames = append(dbNames, t.GetDbName())
} else { // Note: for now, flush will send flush signal to wal on streamnode, then get flush segment list from datacoord
listResp, err := t.mixCoord.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{ // so we need to expand flush collection names to make sure that flushed collection list is same as each other
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases)), targets, err := t.expandFlushCollectionNames(ctx)
})
if err != nil { if err != nil {
log.Info("flush all task by streaming service failed, list databases failed", zap.Error(err))
return err return err
} }
dbNames = listResp.GetDbNames()
// send flush signal to wal on streamnode
onFlushSegmentMap, err := t.sendManualFlushAllToWal(ctx, targets, flushTs)
if err != nil {
return err
} }
flushTs := t.BeginTs() // get flush detail info from datacoord
resp, err := t.mixCoord.FlushAll(ctx, &datapb.FlushAllRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_Flush)),
DbName: t.GetDbName(),
FlushTargets: targets,
})
if err = merr.CheckRPCCall(resp, err); err != nil {
return fmt.Errorf("failed to call flush all to data coordinator: %s", err.Error())
}
dbResultsMap := lo.GroupBy(resp.GetFlushResults(), func(result *datapb.FlushResult) string {
return result.GetDbName()
})
results := make([]*milvuspb.FlushAllResult, 0)
for dbName, dbResults := range dbResultsMap {
results = append(results, &milvuspb.FlushAllResult{
DbName: dbName,
CollectionResults: lo.Map(dbResults, func(result *datapb.FlushResult, _ int) *milvuspb.FlushCollectionResult {
onFlushSegmentIDs := onFlushSegmentMap[result.GetCollectionID()]
// Remove the flushed segments from onFlushSegmentIDs
flushedSegmentSet := typeutil.NewUniqueSet(result.GetFlushSegmentIDs()...)
filteredSegments := make([]int64, 0, len(onFlushSegmentIDs))
for _, id := range onFlushSegmentIDs {
if !flushedSegmentSet.Contain(id) {
filteredSegments = append(filteredSegments, id)
}
}
onFlushSegmentIDs = filteredSegments
return &milvuspb.FlushCollectionResult{
CollectionName: result.GetCollectionName(),
SegmentIds: &schemapb.LongArray{Data: onFlushSegmentIDs},
FlushSegmentIds: &schemapb.LongArray{Data: result.GetFlushSegmentIDs()},
SealTime: timeOfSeal.Unix(),
FlushTs: flushTs,
ChannelCps: result.GetChannelCps(),
}
}),
})
}
t.result = &milvuspb.FlushAllResponse{
Status: merr.Success(),
FlushAllTs: flushTs,
FlushResults: results,
}
return nil
}
// todo: refine this by sending a single FlushAll message to wal
func (t *flushAllTask) sendManualFlushAllToWal(ctx context.Context, flushTargets []*datapb.FlushAllTarget, flushTs Timestamp) (map[int64][]int64, error) {
wg := errgroup.Group{} wg := errgroup.Group{}
// limit goroutine number to 100 // limit goroutine number to 100
wg.SetLimit(100) wg.SetLimit(100)
for _, dbName := range dbNames {
showColRsp, err := t.mixCoord.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections)),
DbName: dbName,
})
if err := merr.CheckRPCCall(showColRsp, err); err != nil {
log.Info("flush all task by streaming service failed, show collections failed", zap.String("dbName", dbName), zap.Error(err))
return err
}
collections := showColRsp.GetCollectionNames() var mu sync.Mutex
for _, collName := range collections { results := make(map[int64][]int64)
coll := collName
for _, target := range flushTargets {
for _, coll := range target.CollectionIds {
collID := coll
wg.Go(func() error { wg.Go(func() error {
collID, err := globalMetaCache.GetCollectionID(t.ctx, t.DbName, coll)
if err != nil {
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
vchannels, err := t.chMgr.getVChannels(collID) vchannels, err := t.chMgr.getVChannels(collID)
if err != nil { if err != nil {
return err return err
} }
onFlushSegmentIDs := make([]int64, 0)
// Ask the streamingnode to flush segments. // Ask the streamingnode to flush segments.
for _, vchannel := range vchannels { for _, vchannel := range vchannels {
_, err := sendManualFlushToWAL(ctx, collID, vchannel, flushTs) segmentIDs, err := sendManualFlushToWAL(ctx, collID, vchannel, flushTs)
if err != nil { if err != nil {
return err return err
} }
onFlushSegmentIDs = append(onFlushSegmentIDs, segmentIDs...)
} }
mu.Lock()
results[collID] = onFlushSegmentIDs
mu.Unlock()
return nil return nil
}) })
} }
} }
err := wg.Wait() err := wg.Wait()
if err != nil { if err != nil {
return err return nil, err
} }
t.result = &datapb.FlushAllResponse{ return results, nil
Status: merr.Success(),
FlushTs: flushTs,
} }
return nil
func (t *flushAllTask) expandFlushCollectionNames(ctx context.Context) ([]*datapb.FlushAllTarget, error) {
// Determine which databases and collections to flush
targets := make([]*datapb.FlushAllTarget, 0)
if len(t.GetFlushTargets()) > 0 {
// Use flush_targets from request
for _, target := range t.GetFlushTargets() {
collectionIDs := make([]int64, 0)
for _, collectionName := range target.GetCollectionNames() {
collectionID, err := globalMetaCache.GetCollectionID(ctx, target.GetDbName(), collectionName)
if err != nil {
return nil, err
}
collectionIDs = append(collectionIDs, collectionID)
}
targets = append(targets, &datapb.FlushAllTarget{
DbName: target.GetDbName(),
CollectionIds: collectionIDs,
})
}
} else if t.GetDbName() != "" {
// Backward compatibility: use deprecated db_name field
targets = append(targets, &datapb.FlushAllTarget{
DbName: t.GetDbName(),
CollectionIds: []int64{},
})
} else {
// Flush all databases
listResp, err := t.mixCoord.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases)),
})
if err != nil {
log.Info("flush all task by streaming service failed, list databases failed", zap.Error(err))
return nil, err
}
for _, dbName := range listResp.GetDbNames() {
targets = append(targets, &datapb.FlushAllTarget{
DbName: dbName,
CollectionIds: []int64{},
})
}
}
// If CollectionNames is empty, it means flush all collections in this database
for _, target := range targets {
collectionNames := target.GetCollectionIds()
if len(collectionNames) == 0 {
showColRsp, err := t.mixCoord.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections)),
DbName: target.GetDbName(),
})
if err != nil {
return nil, err
}
target.CollectionIds = showColRsp.GetCollectionIds()
}
}
return targets, nil
} }

File diff suppressed because it is too large Load Diff

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
func (t *flushTask) Execute(ctx context.Context) error { func (t *flushTask) Execute(ctx context.Context) error {
@ -79,14 +80,14 @@ func (t *flushTask) Execute(ctx context.Context) error {
} }
// Remove the flushed segments from onFlushSegmentIDs // Remove the flushed segments from onFlushSegmentIDs
for _, segID := range resp.GetFlushSegmentIDs() { flushedSegmentSet := typeutil.NewUniqueSet(resp.GetFlushSegmentIDs()...)
for i, id := range onFlushSegmentIDs { filteredSegments := make([]int64, 0, len(onFlushSegmentIDs))
if id == segID { for _, id := range onFlushSegmentIDs {
onFlushSegmentIDs = append(onFlushSegmentIDs[:i], onFlushSegmentIDs[i+1:]...) if !flushedSegmentSet.Contain(id) {
break filteredSegments = append(filteredSegments, id)
}
} }
} }
onFlushSegmentIDs = filteredSegments
coll2Segments[collName] = &schemapb.LongArray{Data: onFlushSegmentIDs} coll2Segments[collName] = &schemapb.LongArray{Data: onFlushSegmentIDs}
flushColl2Segments[collName] = &schemapb.LongArray{Data: resp.GetFlushSegmentIDs()} flushColl2Segments[collName] = &schemapb.LongArray{Data: resp.GetFlushSegmentIDs()}

View File

@ -21,7 +21,7 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/json-iterator/go v1.1.12 github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.9 github.com/klauspost/compress v1.17.9
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2 github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c
github.com/minio/minio-go/v7 v7.0.73 github.com/minio/minio-go/v7 v7.0.73
github.com/panjf2000/ants/v2 v2.11.3 github.com/panjf2000/ants/v2 v2.11.3
github.com/prometheus/client_golang v1.20.5 github.com/prometheus/client_golang v1.20.5

View File

@ -597,8 +597,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2 h1:NqAUKq44SQs+xuOImvQjBa4EfEFvixOD/qBDQDoazN4= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c h1:B7zmZ30lWHE4wNjT/g2NPe3q0gcUtw7cA5shMtWAmDc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo= github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo=

View File

@ -169,16 +169,34 @@ message FlushResult {
int64 timeOfSeal = 4; int64 timeOfSeal = 4;
uint64 flush_ts = 5; uint64 flush_ts = 5;
map<string, msg.MsgPosition> channel_cps = 6; map<string, msg.MsgPosition> channel_cps = 6;
string db_name = 7; // database name for this flush result
string collection_name = 8; // collection name for this flush result
} }
message FlushAllRequest { message FlushAllRequest {
common.MsgBase base = 1; common.MsgBase base = 1;
string dbName = 2; string dbName = 2; // Deprecated: use flush_targets instead
// List of specific databases and collections to flush
repeated FlushAllTarget flush_targets = 3;
}
// Specific collection to flush with database context
// This message allows targeting specific collections within a database for flush operations
message FlushAllTarget {
// Database name to target for flush operation
string db_name = 1;
// Collections within this database to flush
// If empty, flush all collections in this database
repeated int64 collection_ids = 3;
} }
message FlushAllResponse { message FlushAllResponse {
common.Status status = 1; common.Status status = 1;
uint64 flushTs = 2; uint64 flushTs = 2;
// Detailed flush results for each target
repeated FlushResult flush_results = 3;
} }
message FlushChannelsRequest { message FlushChannelsRequest {

File diff suppressed because it is too large Load Diff

View File

@ -139,7 +139,7 @@ go test -gcflags="all=-N -l" -race -cover -tags dynamic,test "${MILVUS_DIR}/root
function test_datacoord() function test_datacoord()
{ {
go test -gcflags="all=-N -l" -race -cover -tags dynamic,test "${MILVUS_DIR}/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" go test -v -gcflags="all=-N -l" -race -cover -tags dynamic,test "${MILVUS_DIR}/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
} }
function test_querycoord() function test_querycoord()

View File

@ -51,7 +51,7 @@ require (
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2 // indirect github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect

View File

@ -318,8 +318,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2 h1:NqAUKq44SQs+xuOImvQjBa4EfEFvixOD/qBDQDoazN4= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c h1:B7zmZ30lWHE4wNjT/g2NPe3q0gcUtw7cA5shMtWAmDc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e h1:VCr43pG4efacDbM4au70fh8/5hNTftoWzm1iEumvDWM= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e h1:VCr43pG4efacDbM4au70fh8/5hNTftoWzm1iEumvDWM=
github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e/go.mod h1:37AWzxVs2NS4QUJrkcbeLUwi+4Av0h5mEdjLI62EANU= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e/go.mod h1:37AWzxVs2NS4QUJrkcbeLUwi+4Av0h5mEdjLI62EANU=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=