diff --git a/Makefile b/Makefile index 61c2c19aff..0856c124cf 100644 --- a/Makefile +++ b/Makefile @@ -55,13 +55,13 @@ verifiers: cppcheck fmt lint ruleguard # Builds various components locally. build-go: - @echo "Building each component's binary to './'" + @echo "Building each component's binary to './bin'" @echo "Building query node ..." - @mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null @echo "Building master ..." - @mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null @echo "Building proxy ..." - @mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxy $(PWD)/cmd/proxy/proxy.go 1>/dev/null + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxy $(PWD)/cmd/proxy/proxy.go 1>/dev/null build-cpp: @(env bash $(PWD)/scripts/core_build.sh) diff --git a/build/ci/jenkins/Jenkinsfile b/build/ci/jenkins/Jenkinsfile index c018fd573c..679a2f27ac 100644 --- a/build/ci/jenkins/Jenkinsfile +++ b/build/ci/jenkins/Jenkinsfile @@ -23,7 +23,7 @@ pipeline { stage ('Build and UnitTest') { agent { kubernetes { - label "${env.PROJECT_NAME}-${env.BUILD_NUMBER}-build" + label "${env.PROJECT_NAME}-${SEMVER}-${env.BUILD_NUMBER}-build" defaultContainer 'build-env' customWorkspace '/home/jenkins/agent/workspace' yamlFile "build/ci/jenkins/pod/build-env.yaml" @@ -46,7 +46,7 @@ pipeline { stage ('Publish Docker Images') { agent { kubernetes { - label "${env.PROJECT_NAME}-${env.BUILD_NUMBER}-publish" + label "${env.PROJECT_NAME}-${SEMVER}-${env.BUILD_NUMBER}-publish" defaultContainer 'publish-images' yamlFile "build/ci/jenkins/pod/docker-pod.yaml" } diff --git a/build/docker/deploy/docker-compose.yml b/build/docker/deploy/docker-compose.yml index ede0ca7284..e8b5044ebc 100644 --- a/build/docker/deploy/docker-compose.yml +++ b/build/docker/deploy/docker-compose.yml @@ -11,7 +11,6 @@ services: environment: PULSAR_ADDRESS: ${PULSAR_ADDRESS} ETCD_ADDRESS: ${ETCD_ADDRESS} - MASTER_ADDRESS: ${MASTER_ADDRESS} networks: - milvus ports: @@ -26,7 +25,6 @@ services: - ${SOURCE_REPO}/proxy:${SOURCE_TAG} environment: PULSAR_ADDRESS: ${PULSAR_ADDRESS} - ETCD_ADDRESS: ${ETCD_ADDRESS} MASTER_ADDRESS: ${MASTER_ADDRESS} ports: - "19530:19530" diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 527b5447ee..25c9e1163f 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -11,14 +11,14 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" ) -type MetaCache interface { +type Cache interface { Hit(collectionName string) bool Get(collectionName string) (*servicepb.CollectionDescription, error) Update(collectionName string) error - //Write(collectionName string, schema *servicepb.CollectionDescription) error + Remove(collectionName string) error } -var globalMetaCache MetaCache +var globalMetaCache Cache type SimpleMetaCache struct { mu sync.RWMutex @@ -30,29 +30,54 @@ type SimpleMetaCache struct { ctx context.Context } -func (smc *SimpleMetaCache) Hit(collectionName string) bool { - smc.mu.RLock() - defer smc.mu.RUnlock() - _, ok := smc.metas[collectionName] +func (metaCache *SimpleMetaCache) Hit(collectionName string) bool { + metaCache.mu.RLock() + defer metaCache.mu.RUnlock() + _, ok := metaCache.metas[collectionName] return ok } -func (smc *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) { - smc.mu.RLock() - defer smc.mu.RUnlock() - schema, ok := smc.metas[collectionName] +func (metaCache *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) { + metaCache.mu.RLock() + defer metaCache.mu.RUnlock() + schema, ok := metaCache.metas[collectionName] if !ok { return nil, errors.New("collection meta miss") } return schema, nil } -func (smc *SimpleMetaCache) Update(collectionName string) error { - reqID, err := smc.reqIDAllocator.AllocOne() +func (metaCache *SimpleMetaCache) Update(collectionName string) error { + reqID, err := metaCache.reqIDAllocator.AllocOne() if err != nil { return err } - ts, err := smc.tsoAllocator.AllocOne() + ts, err := metaCache.tsoAllocator.AllocOne() + if err != nil { + return err + } + hasCollectionReq := &internalpb.HasCollectionRequest{ + MsgType: internalpb.MsgType_kHasCollection, + ReqID: reqID, + Timestamp: ts, + ProxyID: metaCache.proxyID, + CollectionName: &servicepb.CollectionName{ + CollectionName: collectionName, + }, + } + has, err := metaCache.masterClient.HasCollection(metaCache.ctx, hasCollectionReq) + if err != nil { + return err + } + if !has.Value { + return errors.New("collection " + collectionName + " not exists") + } + + reqID, err = metaCache.reqIDAllocator.AllocOne() + if err != nil { + return err + } + ts, err = metaCache.tsoAllocator.AllocOne() if err != nil { return err } @@ -60,20 +85,32 @@ func (smc *SimpleMetaCache) Update(collectionName string) error { MsgType: internalpb.MsgType_kDescribeCollection, ReqID: reqID, Timestamp: ts, - ProxyID: smc.proxyID, + ProxyID: metaCache.proxyID, CollectionName: &servicepb.CollectionName{ CollectionName: collectionName, }, } - - resp, err := smc.masterClient.DescribeCollection(smc.ctx, req) + resp, err := metaCache.masterClient.DescribeCollection(metaCache.ctx, req) if err != nil { return err } - smc.mu.Lock() - defer smc.mu.Unlock() - smc.metas[collectionName] = resp + metaCache.mu.Lock() + defer metaCache.mu.Unlock() + metaCache.metas[collectionName] = resp + + return nil +} + +func (metaCache *SimpleMetaCache) Remove(collectionName string) error { + metaCache.mu.Lock() + defer metaCache.mu.Unlock() + + _, ok := metaCache.metas[collectionName] + if !ok { + return errors.New("cannot find collection: " + collectionName) + } + delete(metaCache.metas, collectionName) return nil } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 0c4beb4ac7..1283204003 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -291,7 +291,7 @@ func (dct *DropCollectionTask) Execute() error { } func (dct *DropCollectionTask) PostExecute() error { - return nil + return globalMetaCache.Remove(dct.CollectionName.CollectionName) } type QueryTask struct { @@ -329,6 +329,18 @@ func (qt *QueryTask) SetTs(ts Timestamp) { } func (qt *QueryTask) PreExecute() error { + collectionName := qt.query.CollectionName + if !globalMetaCache.Hit(collectionName) { + err := globalMetaCache.Update(collectionName) + if err != nil { + return err + } + } + _, err := globalMetaCache.Get(collectionName) + if err != nil { // err is not nil if collection not exists + return err + } + if err := ValidateCollectionName(qt.query.CollectionName); err != nil { return err } @@ -382,22 +394,37 @@ func (qt *QueryTask) PostExecute() error { log.Print("wait to finish failed, timeout!") return errors.New("wait to finish failed, timeout") case searchResults := <-qt.resultBuf: - rlen := len(searchResults) // query num - if rlen <= 0 { - qt.result = &servicepb.QueryResult{} - return nil + filterSearchResult := make([]*internalpb.SearchResult, 0) + var filterReason string + for _, partialSearchResult := range searchResults { + if partialSearchResult.Status.ErrorCode == commonpb.ErrorCode_SUCCESS { + filterSearchResult = append(filterSearchResult, partialSearchResult) + } else { + filterReason += partialSearchResult.Status.Reason + "\n" + } } - n := len(searchResults[0].Hits) // n + rlen := len(filterSearchResult) // query node num + if rlen <= 0 { + qt.result = &servicepb.QueryResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: filterReason, + }, + } + return errors.New(filterReason) + } + + n := len(filterSearchResult[0].Hits) // n if n <= 0 { qt.result = &servicepb.QueryResult{} return nil } hits := make([][]*servicepb.Hits, rlen) - for i, searchResult := range searchResults { + for i, partialSearchResult := range filterSearchResult { hits[i] = make([]*servicepb.Hits, n) - for j, bs := range searchResult.Hits { + for j, bs := range partialSearchResult.Hits { hits[i][j] = &servicepb.Hits{} err := proto.Unmarshal(bs, hits[i][j]) if err != nil { @@ -433,6 +460,17 @@ func (qt *QueryTask) PostExecute() error { } } choiceOffset := locs[choice] + // check if distance is valid, `invalid` here means very very big, + // in this process, distance here is the smallest, so the rest of distance are all invalid + if hits[choice][i].Scores[choiceOffset] >= float32(math.MaxFloat32) { + qt.result = &servicepb.QueryResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "topk in dsl greater than the row nums of collection", + }, + } + return nil + } reducedHits.IDs = append(reducedHits.IDs, hits[choice][i].IDs[choiceOffset]) if hits[choice][i].RowData != nil && len(hits[choice][i].RowData) > 0 { reducedHits.RowData = append(reducedHits.RowData, hits[choice][i].RowData[choiceOffset]) diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go index 04b24bbf84..b8334c27f3 100644 --- a/internal/util/paramtable/paramtable.go +++ b/internal/util/paramtable/paramtable.go @@ -82,8 +82,17 @@ func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []strin func (gp *BaseTable) LoadYaml(fileName string) error { config := viper.New() _, fpath, _, _ := runtime.Caller(0) - configPath := path.Dir(fpath) + "/../../../configs/" - config.SetConfigFile(configPath + fileName) + configFile := path.Dir(fpath) + "/../../../configs/" + fileName + _, err := os.Stat(configFile) + if os.IsNotExist(err) { + runPath, err := os.Getwd() + if err != nil { + panic(err) + } + configFile = runPath + "/configs/" + fileName + } + + config.SetConfigFile(configFile) if err := config.ReadInConfig(); err != nil { panic(err) }