Unify Network for UnitTest
Signed-off-by: quicksilver <zhifeng.zhang@zilliz.com>
parent 8ff82c2fd5
commit 859ff62bc9
@@ -7,7 +7,8 @@
     "remoteUser": "debugger",
     "remoteEnv": {"CCACHE_COMPILERCHECK":"content", "CCACHE_MAXSIZE": "2G", "CCACHE_COMPRESS": "1", "CCACHE_COMPRESSLEVEL": "5"},
     "extensions": [
+        "ms-vscode.cmake-tools",
         "ms-vscode.cpptools",
         "golang.go"
     ]
 }

.env (4 changed lines)
@@ -3,6 +3,6 @@ ARCH=amd64
 UBUNTU=18.04
 DATE_VERSION=20201120-092740
 LATEST_DATE_VERSION=latest
-PULSAR_ADDRESS=pulsar://localhost:6650
-ETCD_ADDRESS=localhost:2379
+PULSAR_ADDRESS=pulsar://pulsar:6650
+ETCD_ADDRESS=etcd:2379
 MASTER_ADDRESS=localhost:53100

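With the services joined to a shared docker-compose network (see the docker-compose changes below), Pulsar and etcd are addressed by their compose service names instead of localhost. The following is a minimal Go sketch, not part of this commit, of how a test process might consume these variables; the getAddress helper and the fallback values are illustrative only.

package main

import (
	"fmt"
	"os"
)

// getAddress is a hypothetical helper: prefer the value provided by the .env
// file above, otherwise fall back to the in-network default.
func getAddress(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	pulsarAddress := getAddress("PULSAR_ADDRESS", "pulsar://pulsar:6650")
	etcdAddress := getAddress("ETCD_ADDRESS", "etcd:2379")
	fmt.Printf("pulsar=%s etcd=%s\n", pulsarAddress, etcdAddress)
}
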
.github/workflows/main.yaml (vendored, 2 changed lines)
@@ -57,7 +57,7 @@ jobs:
       - name: Start Service
         shell: bash
         run: |
-          cd ${GITHUB_WORKSPACE}/deployments/docker && docker-compose up -d
+          cd ${GITHUB_WORKSPACE}/deployments/docker && docker-compose -p milvus-distributed up -d
       - name: Build and UnitTest
         env:
           CHECK_BUILDER: "1"

@@ -34,7 +34,7 @@ func main() {
 		cancel()
 	}()
 
-	if err := svr.Run(); err != nil {
+	if err := svr.Start(); err != nil {
 		log.Fatal("run server failed", zap.Error(err))
 	}
 

@@ -8,6 +8,8 @@ services:
       - "2379:2379"
       - "2380:2380"
       - "4001:4001"
+    networks:
+      - milvus
 
   pulsar:
     image: apachepulsar/pulsar:latest

@@ -15,6 +17,11 @@ services:
     ports:
       - "6650:6650"
      - "18080:8080"
+    networks:
+      - milvus
 
+networks:
+  milvus:
+
 #  pd0:
 #    image: pingcap/pd:latest

@@ -10,7 +10,6 @@ x-ccache: &ccache
 services:
   ubuntu:
     image: ${REPO}:${ARCH}-ubuntu${UBUNTU}-${DATE_VERSION}
-    network_mode: "host"
     build:
       context: .
       dockerfile: build/docker/env/cpu/ubuntu${UBUNTU}/Dockerfile

@@ -29,6 +28,8 @@ services:
     command: &ubuntu-command >
       /bin/bash -c "
       make check-proto-product && make verifiers && make unittest"
+    networks:
+      - milvus
 
   gdbserver:
     image: ${REPO}:${ARCH}-ubuntu${UBUNTU}-${DATE_VERSION}

@@ -52,3 +53,8 @@ services:
     ports:
       - "7776:22"
       - "7777:7777"
+    networks:
+      - milvus
+
+networks:
+  milvus:

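What the unified network buys the unit tests: every container attached to the shared "milvus" bridge network (builder, gdbserver, etcd, pulsar) can reach its peers through Docker's embedded DNS by compose service name, so host networking is no longer required. Below is a small connectivity sketch, not part of the commit, assuming it runs inside a container on that network.

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// Service names resolve via Docker's embedded DNS on the shared network.
	for _, addr := range []string{"etcd:2379", "pulsar:6650"} {
		conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
		if err != nil {
			fmt.Println(addr, "unreachable:", err)
			continue
		}
		conn.Close()
		fmt.Println(addr, "reachable")
	}
}
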
@@ -48,7 +48,7 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.
 		case <-ctx.Done():
 			return errors.New("insert timeout")
 		default:
-			return p.taskSch.DmQueue.Enqueue(it)
+			return p.sched.DmQueue.Enqueue(it)
 		}
 	}
 	err := fn()

@@ -96,7 +96,7 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc
 		case <-ctx.Done():
 			return errors.New("create collection timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(cct)
+			return p.sched.DdQueue.Enqueue(cct)
 		}
 	}
 	err := fn()

@@ -144,7 +144,7 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
 		case <-ctx.Done():
 			return errors.New("create collection timeout")
 		default:
-			return p.taskSch.DqQueue.Enqueue(qt)
+			return p.sched.DqQueue.Enqueue(qt)
 		}
 	}
 	err := fn()

@@ -189,7 +189,7 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam
 		case <-ctx.Done():
 			return errors.New("create collection timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(dct)
+			return p.sched.DdQueue.Enqueue(dct)
 		}
 	}
 	err := fn()

@@ -230,7 +230,7 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName
 		case <-ctx.Done():
 			return errors.New("create collection timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(hct)
+			return p.sched.DdQueue.Enqueue(hct)
 		}
 	}
 	err := fn()

@@ -275,7 +275,7 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio
 		case <-ctx.Done():
 			return errors.New("create collection timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(dct)
+			return p.sched.DdQueue.Enqueue(dct)
 		}
 	}
 	err := fn()

@@ -319,7 +319,7 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv
 		case <-ctx.Done():
 			return errors.New("create collection timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(sct)
+			return p.sched.DdQueue.Enqueue(sct)
 		}
 	}
 	err := fn()

@@ -369,7 +369,7 @@ func (p *Proxy) CreatePartition(ctx context.Context, in *servicepb.PartitionName
 		case <-ctx.Done():
 			return errors.New("create partition timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(cpt)
+			return p.sched.DdQueue.Enqueue(cpt)
 		}
 	}()
 

@@ -415,7 +415,7 @@ func (p *Proxy) DropPartition(ctx context.Context, in *servicepb.PartitionName)
 		case <-ctx.Done():
 			return errors.New("drop partition timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(dpt)
+			return p.sched.DdQueue.Enqueue(dpt)
 		}
 	}()
 

@@ -461,7 +461,7 @@ func (p *Proxy) HasPartition(ctx context.Context, in *servicepb.PartitionName) (
 		case <-ctx.Done():
 			return errors.New("has partition timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(hpt)
+			return p.sched.DdQueue.Enqueue(hpt)
 		}
 	}()
 

@@ -513,7 +513,7 @@ func (p *Proxy) DescribePartition(ctx context.Context, in *servicepb.PartitionNa
 		case <-ctx.Done():
 			return errors.New("describe partion timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(dpt)
+			return p.sched.DdQueue.Enqueue(dpt)
 		}
 	}()
 

@@ -566,7 +566,7 @@ func (p *Proxy) ShowPartitions(ctx context.Context, req *servicepb.CollectionNam
 		case <-ctx.Done():
 			return errors.New("show partition timeout")
 		default:
-			return p.taskSch.DdQueue.Enqueue(spt)
+			return p.sched.DdQueue.Enqueue(spt)
 		}
 	}()
 

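All of the handler hunks above follow the same non-blocking enqueue pattern: if the request context is already done, fail fast with a timeout error, otherwise hand the task to one of the scheduler's queues (DdQueue, DmQueue or DqQueue). Here is a self-contained sketch of that pattern, with task and taskQueue as illustrative stand-ins for the proxy's real types.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// task and taskQueue stand in for the proxy's task interface and scheduler
// queues; they are not the real types.
type task struct{ id int64 }

type taskQueue chan task

func (q taskQueue) Enqueue(t task) error {
	q <- t
	return nil
}

// enqueueOrTimeout mirrors the select used in each handler above.
func enqueueOrTimeout(ctx context.Context, q taskQueue, t task) error {
	select {
	case <-ctx.Done():
		return errors.New("request timeout")
	default:
		return q.Enqueue(t)
	}
}

func main() {
	q := make(taskQueue, 1)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(enqueueOrTimeout(ctx, q, task{id: 1}))
}
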
@@ -12,7 +12,6 @@ import (
 
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"

@@ -29,7 +28,7 @@ type Proxy struct {
 	grpcServer   *grpc.Server
 	masterConn   *grpc.ClientConn
 	masterClient masterpb.MasterClient
-	taskSch      *TaskScheduler
+	sched        *TaskScheduler
 	tick         *timeTick
 
 	idAllocator *allocator.IDAllocator

@@ -38,7 +37,6 @@ type Proxy struct {
 
 	manipulationMsgStream *msgstream.PulsarMsgStream
 	queryMsgStream        *msgstream.PulsarMsgStream
-	queryResultMsgStream  *msgstream.PulsarMsgStream
 
 	// Add callback functions at different stages
 	startCallbacks []func()

@@ -62,9 +60,6 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
 	bufSize := int64(1000)
 	manipulationChannels := []string{"manipulation"}
 	queryChannels := []string{"query"}
-	queryResultChannels := []string{"QueryResult"}
-	queryResultSubName := "QueryResultSubject"
-	unmarshal := msgstream.NewUnmarshalDispatcher()
 
 	p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
 	p.manipulationMsgStream.SetPulsarClient(pulsarAddress)

@@ -74,13 +69,6 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
 	p.queryMsgStream.SetPulsarClient(pulsarAddress)
 	p.queryMsgStream.CreatePulsarProducers(queryChannels)
 
-	p.queryResultMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
-	p.queryResultMsgStream.SetPulsarClient(pulsarAddress)
-	p.queryResultMsgStream.CreatePulsarConsumers(queryResultChannels,
-		queryResultSubName,
-		unmarshal,
-		bufSize)
-
 	masterAddr := Params.MasterAddress()
 	idAllocator, err := allocator.NewIDAllocator(p.proxyLoopCtx, masterAddr)
 

@@ -101,7 +89,7 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
 	}
 	p.segAssigner = segAssigner
 
-	p.taskSch, err = NewTaskScheduler(p.proxyLoopCtx, p.idAllocator, p.tsoAllocator)
+	p.sched, err = NewTaskScheduler(p.proxyLoopCtx, p.idAllocator, p.tsoAllocator)
 	if err != nil {
 		return nil, err
 	}

@@ -122,17 +110,19 @@ func (p *Proxy) startProxy() error {
 	initGlobalMetaCache(p.proxyLoopCtx, p.masterClient, p.idAllocator, p.tsoAllocator)
 	p.manipulationMsgStream.Start()
 	p.queryMsgStream.Start()
-	p.queryResultMsgStream.Start()
-	p.taskSch.Start()
+	p.sched.Start()
 	p.idAllocator.Start()
 	p.tsoAllocator.Start()
 	p.segAssigner.Start()
 
-	// Run callbacks
+	// Start callbacks
 	for _, cb := range p.startCallbacks {
 		cb()
 	}
 
+	p.proxyLoopWg.Add(1)
+	go p.grpcLoop()
+
 	return nil
 }
 

@@ -173,65 +163,8 @@ func (p *Proxy) connectMaster() error {
 	return nil
 }
 
-func (p *Proxy) queryResultLoop() {
-	defer p.proxyLoopWg.Done()
-	defer p.proxyLoopCancel()
-
-	queryResultBuf := make(map[UniqueID][]*internalpb.SearchResult)
-
-	for {
-		select {
-		case msgPack, ok := <-p.queryResultMsgStream.Chan():
-			if !ok {
-				log.Print("buf chan closed")
-				return
-			}
-			if msgPack == nil {
-				continue
-			}
-			for _, tsMsg := range msgPack.Msgs {
-				searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg)
-				reqID := searchResultMsg.GetReqID()
-				_, ok = queryResultBuf[reqID]
-				if !ok {
-					queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0)
-				}
-				queryResultBuf[reqID] = append(queryResultBuf[reqID], &searchResultMsg.SearchResult)
-				if len(queryResultBuf[reqID]) == 4 {
-					// TODO: use the number of query node instead
-					t := p.taskSch.getTaskByReqID(reqID)
-					if t != nil {
-						qt, ok := t.(*QueryTask)
-						if ok {
-							log.Printf("address of query task: %p", qt)
-							qt.resultBuf <- queryResultBuf[reqID]
-							delete(queryResultBuf, reqID)
-						}
-					} else {
-						log.Printf("task with reqID %v is nil", reqID)
-					}
-				}
-			}
-		case <-p.proxyLoopCtx.Done():
-			log.Print("proxy server is closed ...")
-			return
-		}
-	}
-}
-
-func (p *Proxy) startProxyLoop() {
-	p.proxyLoopWg.Add(2)
-	go p.grpcLoop()
-	go p.queryResultLoop()
-
-}
-
-func (p *Proxy) Run() error {
-	if err := p.startProxy(); err != nil {
-		return err
-	}
-	p.startProxyLoop()
-	return nil
+func (p *Proxy) Start() error {
+	return p.startProxy()
 }
 
 func (p *Proxy) stopProxyLoop() {

@@ -246,14 +179,12 @@ func (p *Proxy) stopProxyLoop() {
 
 	p.segAssigner.Close()
 
-	p.taskSch.Close()
+	p.sched.Close()
 
 	p.manipulationMsgStream.Close()
 
 	p.queryMsgStream.Close()
 
-	p.queryResultMsgStream.Close()
-
 	p.proxyLoopWg.Wait()
 }
 

@@ -97,7 +97,7 @@ func startProxy(ctx context.Context) {
 	}
 
 	// TODO: change to wait until master is ready
-	if err := svr.Run(); err != nil {
+	if err := svr.Start(); err != nil {
 		log.Fatal("run proxy failed", zap.Error(err))
 	}
 }

@@ -8,6 +8,8 @@ import (
 	"sync"
 
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
 )
 
 type TaskQueue interface {

@@ -140,7 +142,7 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
 	queue.utLock.Lock()
 	defer queue.utLock.Unlock()
 	for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
-		if e.Value.(task).EndTs() >= ts {
+		if e.Value.(task).EndTs() < ts {
 			return false
 		}
 	}

@@ -148,7 +150,7 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
 	queue.atLock.Lock()
 	defer queue.atLock.Unlock()
 	for ats := range queue.activeTasks {
-		if ats >= ts {
+		if ats < ts {
 			return false
 		}
 	}
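The two comparison flips above invert the predicate's meaning: TaskDoneTest(ts) previously returned false whenever any pending task ended at or after ts, whereas it now returns false only while some unissued or active task still ends before ts, i.e. it answers "has every task scheduled to end before ts finished?". A minimal sketch of the corrected rule, assuming EndTs() is the task's end timestamp:

// allDoneBefore reports whether no pending task ends before ts, which is the
// condition TaskDoneTest now checks for both the unissued and active sets.
func allDoneBefore(ts uint64, pendingEndTs []uint64) bool {
	for _, endTs := range pendingEndTs {
		if endTs < ts {
			return false
		}
	}
	return true
}
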
@@ -357,6 +359,68 @@ func (sched *TaskScheduler) queryLoop() {
 	}
 }
 
+func (sched *TaskScheduler) queryResultLoop() {
+	defer sched.wg.Done()
+
+	// TODO: use config instead
+	pulsarAddress := "pulsar://localhost:6650"
+	bufSize := int64(1000)
+	queryResultChannels := []string{"QueryResult"}
+	queryResultSubName := "QueryResultSubject"
+	unmarshal := msgstream.NewUnmarshalDispatcher()
+
+	queryResultMsgStream := msgstream.NewPulsarMsgStream(sched.ctx, bufSize)
+	queryResultMsgStream.SetPulsarClient(pulsarAddress)
+	queryResultMsgStream.CreatePulsarConsumers(queryResultChannels,
+		queryResultSubName,
+		unmarshal,
+		bufSize)
+
+	queryResultMsgStream.Start()
+	defer queryResultMsgStream.Close()
+
+	queryResultBuf := make(map[UniqueID][]*internalpb.SearchResult)
+
+	for {
+		select {
+		case msgPack, ok := <-queryResultMsgStream.Chan():
+			if !ok {
+				log.Print("buf chan closed")
+				return
+			}
+			if msgPack == nil {
+				continue
+			}
+			for _, tsMsg := range msgPack.Msgs {
+				searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg)
+				reqID := searchResultMsg.GetReqID()
+				_, ok = queryResultBuf[reqID]
+				if !ok {
+					queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0)
+				}
+				queryResultBuf[reqID] = append(queryResultBuf[reqID], &searchResultMsg.SearchResult)
+				if len(queryResultBuf[reqID]) == 4 {
+					// TODO: use the number of query node instead
+					t := sched.getTaskByReqID(reqID)
+					if t != nil {
+						qt, ok := t.(*QueryTask)
+						if ok {
+							log.Printf("address of query task: %p", qt)
+							qt.resultBuf <- queryResultBuf[reqID]
+							delete(queryResultBuf, reqID)
+						}
+					} else {
+						log.Printf("task with reqID %v is nil", reqID)
+					}
+				}
+			}
+		case <-sched.ctx.Done():
+			log.Print("proxy server is closed ...")
+			return
+		}
+	}
+}
+
 func (sched *TaskScheduler) Start() error {
 	sched.wg.Add(1)
 	go sched.definitionLoop()
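queryResultLoop, moved here from the Proxy, buffers partial search results per request ID and releases them to the waiting QueryTask once a fixed count arrives (4 for now, with a TODO to use the real number of query nodes); the Pulsar address is likewise still hard-coded as a TODO. Below is a condensed sketch of just the aggregation rule, with string standing in for *internalpb.SearchResult:

// aggregate appends one partial result for reqID and returns the full set
// once the expected number of partials has arrived, mirroring the buffering
// in queryResultLoop above.
func aggregate(buf map[int64][]string, reqID int64, partial string, expected int) ([]string, bool) {
	buf[reqID] = append(buf[reqID], partial)
	if len(buf[reqID]) < expected {
		return nil, false
	}
	full := buf[reqID]
	delete(buf, reqID)
	return full, true
}
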
@@ -367,6 +431,9 @@ func (sched *TaskScheduler) Start() error {
 	sched.wg.Add(1)
 	go sched.queryLoop()
 
+	sched.wg.Add(1)
+	go sched.queryResultLoop()
+
 	return nil
 }
 

@@ -11,9 +11,3 @@ formatThis() {
 formatThis "${CorePath}/src"
 formatThis "${CorePath}/unittest"
 
-if test -z "$(git status | grep -E "*\.c|*\.h")"; then
-    exit 0
-else
-    echo "Please format your code by clang-format!"
-    exit 1
-fi