shengjh 4cf6e079bc Add etcd watcher
Signed-off-by: shengjh <1572099106@qq.com>
2020-09-11 18:55:43 +08:00

59 lines
1.2 KiB
Go

package informer
import (
"context"
"fmt"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/pkg/master/common"
"github.com/czs007/suvlim/pkg/master/mock"
)
// NewPulsarClient builds a PulsarClient connected to common.PULSAR_URL,
// using a 30 second timeout for both operations and connection setup.
//
// If the underlying Pulsar client cannot be created, the failure is logged
// with log.Fatalf, which terminates the process; the function therefore
// only returns on success.
func NewPulsarClient() PulsarClient {
	// The same bound is applied to operations and to connecting.
	const timeout = 30 * time.Second

	opts := pulsar.ClientOptions{
		URL:               common.PULSAR_URL,
		OperationTimeout:  timeout,
		ConnectionTimeout: timeout,
	}

	c, err := pulsar.NewClient(opts)
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}

	return PulsarClient{Client: c}
}
// PulsarClient wraps a pulsar.Client so that consumer helpers such as
// Listener can be defined as methods on it.
type PulsarClient struct {
// Client is the underlying Pulsar client used to create subscriptions.
Client pulsar.Client
}
// Listener subscribes to common.PULSAR_TOPIC with the shared subscription
// "my-sub", receives a fixed batch of ten messages, decodes each payload with
// mock.SegmentUnMarshal and forwards the result on ssChan. Each message is
// acknowledged only after it has been handed to ssChan. Once the batch is
// done the subscription is removed with Unsubscribe.
//
// The send on ssChan blocks until a receiver takes the value, so the caller
// must be draining the channel (or have given it enough buffer).
//
// Any failure (subscribing, receiving, decoding a payload, unsubscribing) is
// returned to the caller wrapped with context instead of terminating the
// process. A message whose payload cannot be decoded is not forwarded and
// not acknowledged. The consumer is always closed before returning.
func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
	// Number of messages consumed per call.
	const batchSize = 10

	consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{
		Topic:            common.PULSAR_TOPIC,
		SubscriptionName: "my-sub",
		Type:             pulsar.Shared,
	})
	if err != nil {
		return fmt.Errorf("subscribing to pulsar topic: %w", err)
	}
	// Release the consumer's resources on every return path.
	defer consumer.Close()

	for i := 0; i < batchSize; i++ {
		msg, err := consumer.Receive(context.TODO())
		if err != nil {
			return fmt.Errorf("receiving pulsar message: %w", err)
		}

		m, err := mock.SegmentUnMarshal(msg.Payload())
		if err != nil {
			// Do not forward a value that failed to decode; surface the
			// problem to the caller and leave the message unacknowledged.
			return fmt.Errorf("decoding segment stats from message %v: %w", msg.ID(), err)
		}

		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
			msg.ID(), m.SegementID)

		ssChan <- m
		consumer.Ack(msg)
	}

	if err := consumer.Unsubscribe(); err != nil {
		return fmt.Errorf("unsubscribing from pulsar topic: %w", err)
	}
	return nil
}