diff --git a/internal/master/timesync/timesync.go b/internal/master/timesync/timesync.go index 31b965507f..b26d01ced3 100644 --- a/internal/master/timesync/timesync.go +++ b/internal/master/timesync/timesync.go @@ -54,7 +54,7 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) { } func (ttBarrier *softTimeTickBarrier) Start() error { - ttBarrier.closeCh = make(chan struct{}) + ttBarrier.closeCh = make(chan struct{}, 1) go func() { for { select { @@ -165,7 +165,7 @@ func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) { } func (ttBarrier *hardTimeTickBarrier) Start() error { - ttBarrier.closeCh = make(chan struct{}) + ttBarrier.closeCh = make(chan struct{}, 1) go func() { // Last timestamp synchronized diff --git a/internal/proxy/grpc_service.go b/internal/proxy/grpc_service.go index b73cdc3fd7..96a76d4fe6 100644 --- a/internal/proxy/grpc_service.go +++ b/internal/proxy/grpc_service.go @@ -16,7 +16,7 @@ import ( ) const ( - reqTimeoutInterval = time.Second * 2 + reqTimeoutInterval = time.Second * 10 ) func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.IntegerRangeResponse, error) { @@ -103,7 +103,7 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), - }, err + }, nil } err = cct.WaitToFinish() @@ -111,7 +111,7 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), - }, err + }, nil } return cct.result, nil @@ -196,7 +196,7 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), - }, err + }, nil } err = dct.WaitToFinish() @@ -204,7 +204,7 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), - }, err + }, nil } return dct.result, nil @@ -239,7 +239,7 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), }, - }, err + }, nil } err = hct.WaitToFinish() @@ -249,7 +249,7 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), }, - }, err + }, nil } return hct.result, nil @@ -284,7 +284,7 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), }, - }, err + }, nil } err = dct.WaitToFinish() @@ -294,7 +294,7 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), }, - }, err + }, nil } return dct.result, nil @@ -328,7 +328,7 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), }, - }, err + }, nil } err = sct.WaitToFinish() @@ -338,7 +338,7 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: err.Error(), }, - }, err + }, nil } return sct.result, nil diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 2b94f48221..2fb13168a9 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -157,7 +157,7 @@ func (p *Proxy) connectMaster() error { panic(err) } log.Printf("Proxy connected to master, master_addr=%s", masterAddr) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() conn, err := grpc.DialContext(ctx, masterAddr, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { @@ -198,10 +198,12 @@ func (p *Proxy) queryResultLoop() { // TODO: use the number of query node instead t := p.taskSch.getTaskByReqID(reqID) if t != nil { - qt := t.(*QueryTask) - log.Printf("address of query task: %p", qt) - qt.resultBuf <- queryResultBuf[reqID] - delete(queryResultBuf, reqID) + qt, ok := t.(*QueryTask) + if ok { + log.Printf("address of query task: %p", qt) + qt.resultBuf <- queryResultBuf[reqID] + delete(queryResultBuf, reqID) + } } else { log.Printf("task with reqID %v is nil", reqID) } diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index f79caf0f1a..b4bfd4c339 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -13,4 +13,5 @@ SCRIPTS_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" # ignore Minio,S3 unittes MILVUS_DIR="${SCRIPTS_DIR}/../internal/" echo $MILVUS_DIR -go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/reader/..." "${MILVUS_DIR}/proxy/..." -failfast +#go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/reader/..." "${MILVUS_DIR}/proxy/..." -failfast +go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/reader/..." -failfast