diff --git a/go.mod b/go.mod index dc47ab0181..0342ecf1a3 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.7 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.4.1 + github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2 github.com/minio/minio-go/v7 v7.0.61 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index ca8be93560..fea632642f 100644 --- a/go.sum +++ b/go.sum @@ -587,8 +587,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.4.1 h1:QiECOpWEIlpaheel61axbwp7VThu2mALRI0AuPPNAow= -github.com/milvus-io/milvus-proto/go-api/v2 v2.4.1/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2 h1:jgXBS8x8DTriF2pEI0RH/A+eJ8NI1f51iJcdiYEZOBg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI= github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index cb88bb8b6a..8abffb1f5a 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -86,6 +86,7 @@ type collectionInfo struct { CreatedAt Timestamp DatabaseName string DatabaseID int64 + VChannelNames []string } // NewMeta creates meta from provided `kv.TxnKV` @@ -212,6 +213,7 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo { Properties: clonedProperties, DatabaseName: coll.DatabaseName, DatabaseID: coll.DatabaseID, + VChannelNames: coll.VChannelNames, } return cloneColl diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index d6042f1927..de0a8a453f 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1160,6 +1160,7 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i CreatedAt: resp.GetCreatedTimestamp(), DatabaseName: resp.GetDbName(), DatabaseID: resp.GetDbId(), + VChannelNames: resp.GetVirtualChannelNames(), } s.meta.AddCollection(collInfo) return nil diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 7ca6f68e03..6535af3a20 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -81,6 +81,25 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F }, nil } + channelCPs := make(map[string]*msgpb.MsgPosition, 0) + coll, err := s.handler.GetCollection(ctx, req.GetCollectionID()) + if err != nil { + log.Warn("fail to get collection", zap.Error(err)) + return &datapb.FlushResponse{ + Status: merr.Status(err), + }, nil + } + if coll == nil { + return &datapb.FlushResponse{ + Status: merr.Status(merr.WrapErrCollectionNotFound(req.GetCollectionID())), + }, nil + } + // channel checkpoints must be gotten before sealSegment, make sure checkpoints is earlier than segment's endts + for _, vchannel := range coll.VChannelNames { + cp := s.meta.GetChannelCheckpoint(vchannel) + channelCPs[vchannel] = cp + } + // generate a timestamp timeOfSeal, all data before timeOfSeal is guaranteed to be sealed or flushed ts, err := s.allocator.allocTimestamp(ctx) if err != nil { @@ -159,6 +178,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F TimeOfSeal: timeOfSeal.Unix(), FlushSegmentIDs: flushSegmentIDs, FlushTs: ts, + ChannelCps: channelCPs, }, nil } @@ -1558,6 +1578,7 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.Alt StartPositions: req.GetStartPositions(), Properties: properties, DatabaseID: req.GetDbID(), + VChannelNames: req.GetVChannels(), } s.meta.AddCollection(collInfo) return merr.Success(), nil diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index f29e92ad7d..6e573239b7 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -789,6 +789,32 @@ func (s *ServerSuite) TestFlush_NormalCase() { s.EqualValues(segID, ids[0]) } +func (s *ServerSuite) TestFlush_CollectionNotExist() { + req := &datapb.FlushRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Flush, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + DbID: 0, + CollectionID: 0, + } + + resp, err := s.testServer.Flush(context.TODO(), req) + s.NoError(err) + s.EqualValues(commonpb.ErrorCode_CollectionNotExists, resp.GetStatus().GetErrorCode()) + + mockHandler := NewNMockHandler(s.T()) + mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything). + Return(nil, errors.New("mock error")) + s.testServer.handler = mockHandler + + resp2, err2 := s.testServer.Flush(context.TODO(), req) + s.NoError(err2) + s.EqualValues(commonpb.ErrorCode_UnexpectedError, resp2.GetStatus().GetErrorCode()) +} + func (s *ServerSuite) TestFlush_ClosedServer() { s.TearDownTest() req := &datapb.FlushRequest{ @@ -822,6 +848,7 @@ func (s *ServerSuite) TestFlush_RollingUpgrade() { Return(merr.WrapErrServiceUnimplemented(grpcStatus.Error(codes.Unimplemented, "mock grpc unimplemented error"))) mockCluster.EXPECT().Close().Maybe() s.testServer.cluster = mockCluster + s.testServer.meta.AddCollection(&collectionInfo{ID: 0}) s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(mock.Anything).Return(map[int64][]string{ 1: {"channel-1"}, }).Once() diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 508f440233..b8a14bc4df 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -145,6 +145,7 @@ message FlushResponse { repeated int64 flushSegmentIDs = 5; // old flushed segment int64 timeOfSeal = 6; uint64 flush_ts = 7; + map channel_cps = 8; } message FlushChannelsRequest { @@ -635,6 +636,7 @@ message AlterCollectionRequest { repeated common.KeyDataPair start_positions = 4; repeated common.KeyValuePair properties = 5; int64 dbID = 6; + repeated string vChannels = 7; } message GcConfirmRequest { diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 71b9330b37..e76c621d53 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" @@ -1425,6 +1426,7 @@ func (t *flushTask) Execute(ctx context.Context) error { flushColl2Segments := make(map[string]*schemapb.LongArray) coll2SealTimes := make(map[string]int64) coll2FlushTs := make(map[string]Timestamp) + channelCps := make(map[string]*msgpb.MsgPosition) for _, collName := range t.CollectionNames { collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collName) if err != nil { @@ -1448,6 +1450,7 @@ func (t *flushTask) Execute(ctx context.Context) error { flushColl2Segments[collName] = &schemapb.LongArray{Data: resp.GetFlushSegmentIDs()} coll2SealTimes[collName] = resp.GetTimeOfSeal() coll2FlushTs[collName] = resp.GetFlushTs() + channelCps = resp.GetChannelCps() } SendReplicateMessagePack(ctx, t.replicateMsgStream, t.FlushRequest) t.result = &milvuspb.FlushResponse{ @@ -1457,6 +1460,7 @@ func (t *flushTask) Execute(ctx context.Context) error { FlushCollSegIDs: flushColl2Segments, CollSealTimes: coll2SealTimes, CollFlushTs: coll2FlushTs, + ChannelCps: channelCps, } return nil } diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index 264679ee77..c1fa30d0ac 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -255,6 +255,7 @@ func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milv StartPositions: colMeta.StartPositions, Properties: req.GetProperties(), DbID: db.ID, + VChannels: colMeta.VirtualChannelNames, } resp, err := b.s.dataCoord.BroadcastAlteredCollection(ctx, dcReq) diff --git a/pkg/go.mod b/pkg/go.mod index 83b0f3cce0..526788cf8f 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -14,7 +14,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.7 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 - github.com/milvus-io/milvus-proto/go-api/v2 v2.4.1 + github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2 github.com/nats-io/nats-server/v2 v2.10.12 github.com/nats-io/nats.go v1.34.1 github.com/panjf2000/ants/v2 v2.7.2 diff --git a/pkg/go.sum b/pkg/go.sum index 9a999f0569..6644969e73 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -486,8 +486,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.4.1 h1:QiECOpWEIlpaheel61axbwp7VThu2mALRI0AuPPNAow= -github.com/milvus-io/milvus-proto/go-api/v2 v2.4.1/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2 h1:jgXBS8x8DTriF2pEI0RH/A+eJ8NI1f51iJcdiYEZOBg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=