mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 03:13:22 +08:00
341 lines
9.3 KiB
Go
341 lines
9.3 KiB
Go
package producer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil"
|
|
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
// ProducerOptions is the options for creating a producer.
|
|
type ProducerOptions struct {
|
|
// The produce target
|
|
Assignment *types.PChannelInfoAssigned
|
|
}
|
|
|
|
// CreateProducer create a new producer client.
|
|
func CreateProducer(
|
|
ctx context.Context,
|
|
opts *ProducerOptions,
|
|
handler streamingpb.StreamingNodeHandlerServiceClient,
|
|
) (Producer, error) {
|
|
ctx = createProduceRequest(ctx, opts)
|
|
streamClient, err := handler.Produce(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Initialize the producer client.
|
|
produceClient := &produceGrpcClient{
|
|
streamClient,
|
|
}
|
|
|
|
// Recv the first response from server.
|
|
// It must be a create response.
|
|
resp, err := produceClient.Recv()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
createResp := resp.GetCreate()
|
|
if createResp == nil {
|
|
return nil, status.NewInvalidRequestSeq("first message arrive must be create response")
|
|
}
|
|
|
|
// Initialize the producer client finished.
|
|
cli := &producerImpl{
|
|
assignment: *opts.Assignment,
|
|
walName: createResp.GetWalName(),
|
|
logger: log.With(
|
|
zap.String("walName", createResp.GetWalName()),
|
|
zap.String("pchannel", opts.Assignment.Channel.Name),
|
|
zap.Int64("term", opts.Assignment.Channel.Term),
|
|
zap.Int64("streamingNodeID", opts.Assignment.Node.ServerID)),
|
|
lifetime: typeutil.NewLifetime(),
|
|
idAllocator: typeutil.NewIDAllocator(),
|
|
grpcStreamClient: produceClient,
|
|
pendingRequests: sync.Map{},
|
|
isFenced: atomic.NewBool(false),
|
|
available: lifetime.NewSafeChan(),
|
|
requestCh: make(chan *produceRequest),
|
|
sendExitCh: make(chan struct{}),
|
|
recvExitCh: make(chan struct{}),
|
|
finishedCh: make(chan struct{}),
|
|
}
|
|
|
|
// Start the producer client.
|
|
go cli.execute()
|
|
return cli, nil
|
|
}
|
|
|
|
// createProduceRequest creates the produce request.
|
|
func createProduceRequest(ctx context.Context, opts *ProducerOptions) context.Context {
|
|
// select server to consume.
|
|
ctx = contextutil.WithPickServerID(ctx, opts.Assignment.Node.ServerID)
|
|
// select channel to consume.
|
|
return contextutil.WithCreateProducer(ctx, &streamingpb.CreateProducerRequest{
|
|
Pchannel: types.NewProtoFromPChannelInfo(opts.Assignment.Channel),
|
|
})
|
|
}
|
|
|
|
// Expected message sequence:
|
|
// CreateProducer
|
|
// ProduceRequest 1 -> ProduceResponse Or Error 1
|
|
// ProduceRequest 2 -> ProduceResponse Or Error 2
|
|
// ProduceRequest 3 -> ProduceResponse Or Error 3
|
|
// CloseProducer
|
|
type producerImpl struct {
|
|
assignment types.PChannelInfoAssigned
|
|
walName string
|
|
logger *log.MLogger
|
|
lifetime *typeutil.Lifetime
|
|
idAllocator *typeutil.IDAllocator
|
|
grpcStreamClient *produceGrpcClient
|
|
|
|
pendingRequests sync.Map
|
|
isFenced *atomic.Bool
|
|
available lifetime.SafeChan
|
|
requestCh chan *produceRequest
|
|
sendExitCh chan struct{}
|
|
recvExitCh chan struct{}
|
|
finishedCh chan struct{}
|
|
}
|
|
|
|
type produceRequest struct {
|
|
ctx context.Context
|
|
msg message.MutableMessage
|
|
respCh chan produceResponse
|
|
}
|
|
|
|
type produceResponse struct {
|
|
result *types.AppendResult
|
|
err error
|
|
}
|
|
|
|
// Append sends the produce message to server.
|
|
func (p *producerImpl) Append(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) {
|
|
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
|
|
return nil, status.NewOnShutdownError("producer client is shutting down")
|
|
}
|
|
defer p.lifetime.Done()
|
|
|
|
// reject the message if the producer is fenced.
|
|
if p.isFenced.Load() {
|
|
return nil, status.NewChannelFenced(p.assignment.Channel.Name)
|
|
}
|
|
respCh := make(chan produceResponse, 1)
|
|
req := &produceRequest{
|
|
ctx: ctx,
|
|
msg: msg,
|
|
respCh: respCh,
|
|
}
|
|
|
|
// Send the produce message to server.
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case p.requestCh <- req:
|
|
case <-p.sendExitCh:
|
|
return nil, status.NewInner("producer send arm is closed")
|
|
case <-p.recvExitCh:
|
|
return nil, status.NewInner("producer recv arm is closed")
|
|
}
|
|
|
|
// Wait for the response from server or context timeout.
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case resp := <-respCh:
|
|
if resp.err != nil {
|
|
if s := status.AsStreamingError(resp.err); s.IsFenced() {
|
|
if p.isFenced.CompareAndSwap(false, true) {
|
|
p.logger.Warn("producer client is fenced", zap.Error(resp.err))
|
|
p.available.Close()
|
|
}
|
|
}
|
|
}
|
|
return resp.result, resp.err
|
|
}
|
|
}
|
|
|
|
// execute executes the producer client.
|
|
func (p *producerImpl) execute() {
|
|
defer close(p.finishedCh)
|
|
|
|
errSendCh := p.startSend()
|
|
errRecvCh := p.startRecv()
|
|
|
|
// Wait for send and recv arm to exit.
|
|
err := <-errRecvCh
|
|
<-errSendCh
|
|
|
|
// Clear all pending request.
|
|
p.pendingRequests.Range(func(key, value interface{}) bool {
|
|
value.(*produceRequest).respCh <- produceResponse{
|
|
err: status.NewUnknownError(fmt.Sprintf("request sent but no response returned, %v", err)),
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
// IsAvailable returns whether the producer is available.
|
|
func (p *producerImpl) IsAvailable() bool {
|
|
return !p.available.IsClosed()
|
|
}
|
|
|
|
// IsLocal returns if the producer is local.
|
|
func (p *producerImpl) IsLocal() bool {
|
|
return false
|
|
}
|
|
|
|
// Available returns a channel that will be closed when the producer is unavailable.
|
|
func (p *producerImpl) Available() <-chan struct{} {
|
|
return p.available.CloseCh()
|
|
}
|
|
|
|
// Close close the producer client.
|
|
func (p *producerImpl) Close() {
|
|
// Wait for all message has been sent.
|
|
p.lifetime.SetState(typeutil.LifetimeStateStopped)
|
|
p.lifetime.Wait()
|
|
close(p.requestCh)
|
|
|
|
// Wait for send and recv arm to exit.
|
|
<-p.finishedCh
|
|
}
|
|
|
|
// startSend starts the send loop.
|
|
func (p *producerImpl) startSend() <-chan struct{} {
|
|
ch := make(chan struct{})
|
|
go func() {
|
|
_ = p.sendLoop()
|
|
close(ch)
|
|
}()
|
|
return ch
|
|
}
|
|
|
|
// startRecv starts the recv loop.
|
|
func (p *producerImpl) startRecv() <-chan error {
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
errCh <- p.recvLoop()
|
|
}()
|
|
return errCh
|
|
}
|
|
|
|
// sendLoop sends the produce message to server.
|
|
func (p *producerImpl) sendLoop() (err error) {
|
|
defer func() {
|
|
if err != nil {
|
|
p.logger.Warn("send arm of stream closed by unexpected error", zap.Error(err))
|
|
} else {
|
|
p.logger.Info("send arm of stream closed")
|
|
}
|
|
if err := p.grpcStreamClient.CloseSend(); err != nil {
|
|
p.logger.Warn("failed to close send", zap.Error(err))
|
|
}
|
|
close(p.sendExitCh)
|
|
p.available.Close()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-p.recvExitCh:
|
|
return errors.New("recv arm of stream closed")
|
|
case req, ok := <-p.requestCh:
|
|
if !ok {
|
|
// all message has been sent, sent close response.
|
|
return p.grpcStreamClient.SendClose()
|
|
}
|
|
requestID := p.idAllocator.Allocate()
|
|
// Store the request to pending request map.
|
|
p.pendingRequests.Store(requestID, req)
|
|
// Send the produce message to server.
|
|
if err := p.grpcStreamClient.SendProduceMessage(requestID, req.msg); err != nil {
|
|
// If send failed, remove the request from pending request map and return error to client.
|
|
p.notifyRequest(requestID, produceResponse{
|
|
err: err,
|
|
})
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// recvLoop receives the produce response from server.
|
|
func (p *producerImpl) recvLoop() (err error) {
|
|
defer func() {
|
|
if err != nil {
|
|
p.logger.Warn("recv arm of stream closed by unexpected error", zap.Error(err))
|
|
} else {
|
|
p.logger.Info("recv arm of stream closed")
|
|
}
|
|
close(p.recvExitCh)
|
|
}()
|
|
|
|
for {
|
|
resp, err := p.grpcStreamClient.Recv()
|
|
if errors.Is(err, io.EOF) {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch resp := resp.Response.(type) {
|
|
case *streamingpb.ProduceResponse_Produce:
|
|
var result produceResponse
|
|
switch produceResp := resp.Produce.Response.(type) {
|
|
case *streamingpb.ProduceMessageResponse_Result:
|
|
msgID, err := message.UnmarshalMessageID(
|
|
p.walName,
|
|
produceResp.Result.GetId().GetId(),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
result = produceResponse{
|
|
result: &types.AppendResult{
|
|
MessageID: msgID,
|
|
TimeTick: produceResp.Result.GetTimetick(),
|
|
TxnCtx: message.NewTxnContextFromProto(produceResp.Result.GetTxnContext()),
|
|
Extra: produceResp.Result.GetExtra(),
|
|
},
|
|
}
|
|
case *streamingpb.ProduceMessageResponse_Error:
|
|
result = produceResponse{
|
|
err: status.New(produceResp.Error.Code, produceResp.Error.Cause),
|
|
}
|
|
default:
|
|
panic("unreachable")
|
|
}
|
|
p.notifyRequest(resp.Produce.RequestId, result)
|
|
case *streamingpb.ProduceResponse_Close:
|
|
// recv io.EOF after this message.
|
|
default:
|
|
// skip message here.
|
|
p.logger.Error("unknown response type", zap.Any("response", resp))
|
|
}
|
|
}
|
|
}
|
|
|
|
// notifyRequest notify the request has been returned from server.
|
|
func (p *producerImpl) notifyRequest(requestID int64, resp produceResponse) {
|
|
pendingRequest, loaded := p.pendingRequests.LoadAndDelete(requestID)
|
|
if loaded {
|
|
p.logger.Debug("recv send produce message from server", zap.Int64("requestID", requestID))
|
|
pendingRequest.(*produceRequest).respCh <- resp
|
|
}
|
|
}
|