diff --git a/.devcontainer.json b/.devcontainer.json index a7368ab60c..565eb008c7 100644 --- a/.devcontainer.json +++ b/.devcontainer.json @@ -2,7 +2,7 @@ "name": "Milvus Distributed Dev Container Definition", "dockerComposeFile": ["./docker-compose-vscode.yml"], "service": "ubuntu", - "initializeCommand": "scripts/init_devcontainer.sh && docker-compose -f docker-compose-vscode.yml down || true", + "initializeCommand": "scripts/init_devcontainer.sh && docker-compose -f docker-compose-vscode.yml down all -v || true && docker-compose -f docker-compose-vscode.yml pull --ignore-pull-failures ubuntu", "workspaceFolder": "/go/src/github.com/zilliztech/milvus-distributed", "shutdownAction": "stopCompose", "extensions": [ diff --git a/.gitignore b/.gitignore index 082a135a38..ef85f5678c 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ pulsar/client-cpp/build/* # vscode generated files .vscode docker-compose-vscode.yml -docker-compose-vscode.yml.tmp +docker-compose-vscode.yml.bak cmake-build-debug cmake-build-release diff --git a/internal/indexbuilder/client/client.go b/internal/indexbuilder/client/client.go index 622fde9b15..c18e02017b 100644 --- a/internal/indexbuilder/client/client.go +++ b/internal/indexbuilder/client/client.go @@ -2,6 +2,10 @@ package indexbuilderclient import ( "context" + "encoding/json" + "fmt" + "github.com/zilliztech/milvus-distributed/internal/errors" + "log" "time" "google.golang.org/grpc" @@ -54,20 +58,59 @@ func (c *Client) BuildIndexWithoutID(columnDataPaths []string, typeParams map[st if c.tryConnect() != nil { panic("BuildIndexWithoutID: failed to connect index builder") } + parseMap := func(mStr string) (map[string]string, error) { + buffer := make(map[string]interface{}) + err := json.Unmarshal([]byte(mStr), &buffer) + if err != nil { + return nil, errors.New("Unmarshal params failed") + } + ret := make(map[string]string) + for key, value := range buffer { + valueStr := fmt.Sprintf("%v", value) + ret[key] = valueStr + } + return ret, nil + } var typeParamsKV []*commonpb.KeyValuePair - for typeParam := range typeParams { - typeParamsKV = append(typeParamsKV, &commonpb.KeyValuePair{ - Key: typeParam, - Value: typeParams[typeParam], - }) + for key := range typeParams { + if key == "params" { + mapParams, err := parseMap(typeParams[key]) + if err != nil { + log.Println("parse params error: ", err) + } + for pk, pv := range mapParams { + typeParamsKV = append(typeParamsKV, &commonpb.KeyValuePair{ + Key: pk, + Value: pv, + }) + } + } else { + typeParamsKV = append(typeParamsKV, &commonpb.KeyValuePair{ + Key: key, + Value: typeParams[key], + }) + } } var indexParamsKV []*commonpb.KeyValuePair - for indexParam := range indexParams { - indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ - Key: indexParam, - Value: indexParams[indexParam], - }) + for key := range indexParams { + if key == "params" { + mapParams, err := parseMap(indexParams[key]) + if err != nil { + log.Println("parse params error: ", err) + } + for pk, pv := range mapParams { + indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ + Key: pk, + Value: pv, + }) + } + } else { + indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ + Key: key, + Value: indexParams[key], + }) + } } ctx := context.TODO() diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index 847ef6b29a..28e16ec7ec 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -126,7 +126,7 @@ func TestSegmentManager_AssignSegment(t *testing.T) { } } - time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond) + time.Sleep(time.Duration(Params.SegIDAssignExpiration)) timestamp, err := globalTsoAllocator() assert.Nil(t, err) err = mt.UpdateSegment(&pb.SegmentMeta{ @@ -156,122 +156,3 @@ func TestSegmentManager_AssignSegment(t *testing.T) { assert.Nil(t, err) assert.NotEqualValues(t, 0, segMeta.CloseTime) } - -func TestSegmentManager_SycnWritenode(t *testing.T) { - ctx, cancelFunc := context.WithCancel(context.TODO()) - defer cancelFunc() - - Init() - Params.TopicNum = 5 - Params.QueryNodeNum = 3 - Params.SegmentSize = 536870912 / 1024 / 1024 - Params.SegmentSizeFactor = 0.75 - Params.DefaultRecordSize = 1024 - Params.MinSegIDAssignCnt = 1048576 / 1024 - Params.SegIDAssignExpiration = 2000 - etcdAddress := Params.EtcdAddress - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) - assert.Nil(t, err) - rootPath := "/test/root" - _, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix()) - assert.Nil(t, err) - - kvBase := etcdkv.NewEtcdKV(cli, rootPath) - defer kvBase.Close() - mt, err := NewMetaTable(kvBase) - assert.Nil(t, err) - - collName := "segmgr_test_coll" - var collID int64 = 1001 - partitionTag := "test_part" - schema := &schemapb.CollectionSchema{ - Name: collName, - Fields: []*schemapb.FieldSchema{ - {FieldID: 1, Name: "f1", IsPrimaryKey: false, DataType: schemapb.DataType_INT32}, - {FieldID: 2, Name: "f2", IsPrimaryKey: false, DataType: schemapb.DataType_VECTOR_FLOAT, TypeParams: []*commonpb.KeyValuePair{ - {Key: "dim", Value: "128"}, - }}, - }, - } - err = mt.AddCollection(&pb.CollectionMeta{ - ID: collID, - Schema: schema, - CreateTime: 0, - SegmentIDs: []UniqueID{}, - PartitionTags: []string{}, - }) - assert.Nil(t, err) - err = mt.AddPartition(collID, partitionTag) - assert.Nil(t, err) - - var cnt int64 - globalIDAllocator := func() (UniqueID, error) { - val := atomic.AddInt64(&cnt, 1) - return val, nil - } - globalTsoAllocator := func() (Timestamp, error) { - val := atomic.AddInt64(&cnt, 1) - phy := time.Now().UnixNano() / int64(time.Millisecond) - ts := tsoutil.ComposeTS(phy, val) - return ts, nil - } - syncWriteChan := make(chan *msgstream.TimeTickMsg) - syncProxyChan := make(chan *msgstream.TimeTickMsg) - - segAssigner := NewSegmentAssigner(ctx, mt, globalTsoAllocator, syncProxyChan) - mockScheduler := &MockFlushScheduler{} - segManager, err := NewSegmentManager(ctx, mt, globalIDAllocator, globalTsoAllocator, syncWriteChan, mockScheduler, segAssigner) - assert.Nil(t, err) - - segManager.Start() - defer segManager.Close() - sizePerRecord, err := typeutil.EstimateSizePerRecord(schema) - assert.Nil(t, err) - maxCount := uint32(Params.SegmentSize * 1024 * 1024 / float64(sizePerRecord)) - - req := []*internalpb.SegIDRequest{ - {Count: maxCount, ChannelID: 1, CollName: collName, PartitionTag: partitionTag}, - {Count: maxCount, ChannelID: 2, CollName: collName, PartitionTag: partitionTag}, - {Count: maxCount, ChannelID: 3, CollName: collName, PartitionTag: partitionTag}, - } - assignSegment, err := segManager.AssignSegment(req) - assert.Nil(t, err) - timestamp, err := globalTsoAllocator() - assert.Nil(t, err) - for i := 0; i < len(assignSegment); i++ { - assert.EqualValues(t, maxCount, assignSegment[i].Count) - assert.EqualValues(t, i+1, assignSegment[i].ChannelID) - - err = mt.UpdateSegment(&pb.SegmentMeta{ - SegmentID: assignSegment[i].SegID, - CollectionID: collID, - PartitionTag: partitionTag, - ChannelStart: 0, - ChannelEnd: 1, - CloseTime: timestamp, - NumRows: int64(maxCount), - MemSize: 500000, - }) - assert.Nil(t, err) - } - - time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond) - - timestamp, err = globalTsoAllocator() - assert.Nil(t, err) - tsMsg := &msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: timestamp, EndTimestamp: timestamp, HashValues: []uint32{}, - }, - TimeTickMsg: internalpb.TimeTickMsg{ - MsgType: internalpb.MsgType_kTimeTick, - PeerID: 1, - Timestamp: timestamp, - }, - } - syncWriteChan <- tsMsg - time.Sleep(300 * time.Millisecond) - - status := segManager.collStatus[collID] - assert.Empty(t, status.segments) -} diff --git a/internal/querynode/load_index_service.go b/internal/querynode/load_index_service.go index 4a45e7fb8a..32b276bcf5 100644 --- a/internal/querynode/load_index_service.go +++ b/internal/querynode/load_index_service.go @@ -100,7 +100,6 @@ func (lis *loadIndexService) start() { continue } // 1. use msg's index paths to get index bytes - fmt.Println("start load index") var indexBuffer [][]byte var err error fn := func() error { @@ -139,13 +138,6 @@ func (lis *loadIndexService) start() { } } -func (lis *loadIndexService) close() { - if lis.loadIndexMsgStream != nil { - lis.loadIndexMsgStream.Close() - } - lis.cancel() -} - func (lis *loadIndexService) printIndexParams(index []*commonpb.KeyValuePair) { fmt.Println("=================================================") for i := 0; i < len(index); i++ { diff --git a/internal/querynode/load_index_service_test.go b/internal/querynode/load_index_service_test.go index 852d976366..b214b40824 100644 --- a/internal/querynode/load_index_service_test.go +++ b/internal/querynode/load_index_service_test.go @@ -22,29 +22,26 @@ import ( "github.com/zilliztech/milvus-distributed/internal/querynode/client" ) -func TestLoadIndexService_FloatVector(t *testing.T) { +func TestLoadIndexService(t *testing.T) { node := newQueryNode() collectionID := rand.Int63n(1000000) segmentID := rand.Int63n(1000000) initTestMeta(t, node, "collection0", collectionID, segmentID) // loadIndexService and statsService - suffix := "-test-search" + strconv.FormatInt(rand.Int63n(1000000), 10) oldSearchChannelNames := Params.SearchChannelNames - newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix) + var newSearchChannelNames []string + for _, channel := range oldSearchChannelNames { + newSearchChannelNames = append(newSearchChannelNames, channel+"new") + } Params.SearchChannelNames = newSearchChannelNames oldSearchResultChannelNames := Params.SearchChannelNames - newSearchResultChannelNames := makeNewChannelNames(oldSearchResultChannelNames, suffix) + var newSearchResultChannelNames []string + for _, channel := range oldSearchResultChannelNames { + newSearchResultChannelNames = append(newSearchResultChannelNames, channel+"new") + } Params.SearchResultChannelNames = newSearchResultChannelNames - - oldLoadIndexChannelNames := Params.LoadIndexChannelNames - newLoadIndexChannelNames := makeNewChannelNames(oldLoadIndexChannelNames, suffix) - Params.LoadIndexChannelNames = newLoadIndexChannelNames - - oldStatsChannelName := Params.StatsChannelName - newStatsChannelNames := makeNewChannelNames([]string{oldStatsChannelName}, suffix) - Params.StatsChannelName = newStatsChannelNames[0] go node.Start() //generate insert data @@ -331,319 +328,9 @@ func TestLoadIndexService_FloatVector(t *testing.T) { } Params.SearchChannelNames = oldSearchChannelNames Params.SearchResultChannelNames = oldSearchResultChannelNames - Params.LoadIndexChannelNames = oldLoadIndexChannelNames - Params.StatsChannelName = oldStatsChannelName fmt.Println("loadIndex floatVector test Done!") defer assert.Equal(t, findFiledStats, true) <-node.queryNodeLoopCtx.Done() node.Close() } - -func TestLoadIndexService_BinaryVector(t *testing.T) { - node := newQueryNode() - collectionID := rand.Int63n(1000000) - segmentID := rand.Int63n(1000000) - initTestMeta(t, node, "collection0", collectionID, segmentID, true) - - // loadIndexService and statsService - suffix := "-test-search-binary" + strconv.FormatInt(rand.Int63n(1000000), 10) - oldSearchChannelNames := Params.SearchChannelNames - newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix) - Params.SearchChannelNames = newSearchChannelNames - - oldSearchResultChannelNames := Params.SearchChannelNames - newSearchResultChannelNames := makeNewChannelNames(oldSearchResultChannelNames, suffix) - Params.SearchResultChannelNames = newSearchResultChannelNames - - oldLoadIndexChannelNames := Params.LoadIndexChannelNames - newLoadIndexChannelNames := makeNewChannelNames(oldLoadIndexChannelNames, suffix) - Params.LoadIndexChannelNames = newLoadIndexChannelNames - - oldStatsChannelName := Params.StatsChannelName - newStatsChannelNames := makeNewChannelNames([]string{oldStatsChannelName}, suffix) - Params.StatsChannelName = newStatsChannelNames[0] - go node.Start() - - const msgLength = 1000 - const receiveBufSize = 1024 - const DIM = 128 - - // generator index data - var indexRowData []byte - for n := 0; n < msgLength; n++ { - for i := 0; i < DIM/8; i++ { - indexRowData = append(indexRowData, byte(rand.Intn(8))) - } - } - - //generator insert data - var insertRowBlob []*commonpb.Blob - var timestamps []uint64 - var rowIDs []int64 - var hashValues []uint32 - offset := 0 - for n := 0; n < msgLength; n++ { - rowData := make([]byte, 0) - rowData = append(rowData, indexRowData[offset:offset+(DIM/8)]...) - offset += DIM / 8 - age := make([]byte, 4) - binary.LittleEndian.PutUint32(age, 1) - rowData = append(rowData, age...) - blob := &commonpb.Blob{ - Value: rowData, - } - insertRowBlob = append(insertRowBlob, blob) - timestamps = append(timestamps, uint64(n)) - rowIDs = append(rowIDs, int64(n)) - hashValues = append(hashValues, uint32(n)) - } - - var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: hashValues, - }, - InsertRequest: internalpb.InsertRequest{ - MsgType: internalpb.MsgType_kInsert, - ReqID: 0, - CollectionName: "collection0", - PartitionTag: "default", - SegmentID: segmentID, - ChannelID: int64(0), - ProxyID: int64(0), - Timestamps: timestamps, - RowIDs: rowIDs, - RowData: insertRowBlob, - }, - } - insertMsgPack := msgstream.MsgPack{ - BeginTs: 0, - EndTs: math.MaxUint64, - Msgs: []msgstream.TsMsg{insertMsg}, - } - - // generate timeTick - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{0}, - }, - TimeTickMsg: internalpb.TimeTickMsg{ - MsgType: internalpb.MsgType_kTimeTick, - PeerID: UniqueID(0), - Timestamp: math.MaxUint64, - }, - } - timeTickMsgPack := &msgstream.MsgPack{ - Msgs: []msgstream.TsMsg{timeTickMsg}, - } - - // pulsar produce - insertChannels := Params.InsertChannelNames - ddChannels := Params.DDChannelNames - - insertStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize) - insertStream.SetPulsarClient(Params.PulsarAddress) - insertStream.CreatePulsarProducers(insertChannels) - ddStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize) - ddStream.SetPulsarClient(Params.PulsarAddress) - ddStream.CreatePulsarProducers(ddChannels) - - var insertMsgStream msgstream.MsgStream = insertStream - insertMsgStream.Start() - var ddMsgStream msgstream.MsgStream = ddStream - ddMsgStream.Start() - - err := insertMsgStream.Produce(&insertMsgPack) - assert.NoError(t, err) - err = insertMsgStream.Broadcast(timeTickMsgPack) - assert.NoError(t, err) - err = ddMsgStream.Broadcast(timeTickMsgPack) - assert.NoError(t, err) - - //generate search data and send search msg - searchRowData := indexRowData[42*(DIM/8) : 43*(DIM/8)] - dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"JACCARD\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }" - placeholderValue := servicepb.PlaceholderValue{ - Tag: "$0", - Type: servicepb.PlaceholderType_VECTOR_BINARY, - Values: [][]byte{searchRowData}, - } - placeholderGroup := servicepb.PlaceholderGroup{ - Placeholders: []*servicepb.PlaceholderValue{&placeholderValue}, - } - placeGroupByte, err := proto.Marshal(&placeholderGroup) - if err != nil { - log.Print("marshal placeholderGroup failed") - } - query := servicepb.Query{ - CollectionName: "collection0", - PartitionTags: []string{"default"}, - Dsl: dslString, - PlaceholderGroup: placeGroupByte, - } - queryByte, err := proto.Marshal(&query) - if err != nil { - log.Print("marshal query failed") - } - blob := commonpb.Blob{ - Value: queryByte, - } - fn := func(n int64) *msgstream.MsgPack { - searchMsg := &msgstream.SearchMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - SearchRequest: internalpb.SearchRequest{ - MsgType: internalpb.MsgType_kSearch, - ReqID: n, - ProxyID: int64(1), - Timestamp: uint64(msgLength), - ResultChannelID: int64(0), - Query: &blob, - }, - } - return &msgstream.MsgPack{ - Msgs: []msgstream.TsMsg{searchMsg}, - } - } - searchStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize) - searchStream.SetPulsarClient(Params.PulsarAddress) - searchStream.CreatePulsarProducers(newSearchChannelNames) - searchStream.Start() - err = searchStream.Produce(fn(1)) - assert.NoError(t, err) - - //get search result - searchResultStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize) - searchResultStream.SetPulsarClient(Params.PulsarAddress) - unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() - searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult2", unmarshalDispatcher, receiveBufSize) - searchResultStream.Start() - searchResult := searchResultStream.Consume() - assert.NotNil(t, searchResult) - unMarshaledHit := servicepb.Hits{} - err = proto.Unmarshal(searchResult.Msgs[0].(*msgstream.SearchResultMsg).Hits[0], &unMarshaledHit) - assert.Nil(t, err) - - // gen load index message pack - indexParams := make(map[string]string) - indexParams["index_type"] = "BIN_IVF_FLAT" - indexParams["index_mode"] = "cpu" - indexParams["dim"] = "128" - indexParams["k"] = "10" - indexParams["nlist"] = "100" - indexParams["nprobe"] = "10" - indexParams["m"] = "4" - indexParams["nbits"] = "8" - indexParams["metric_type"] = "JACCARD" - indexParams["SLICE_SIZE"] = "4" - - var indexParamsKV []*commonpb.KeyValuePair - for key, value := range indexParams { - indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ - Key: key, - Value: value, - }) - } - - // generator index - typeParams := make(map[string]string) - typeParams["dim"] = "128" - index, err := indexbuilder.NewCIndex(typeParams, indexParams) - assert.Nil(t, err) - err = index.BuildBinaryVecIndexWithoutIds(indexRowData) - assert.Equal(t, err, nil) - - option := &minioKV.Option{ - Address: Params.MinioEndPoint, - AccessKeyID: Params.MinioAccessKeyID, - SecretAccessKeyID: Params.MinioSecretAccessKey, - UseSSL: Params.MinioUseSSLStr, - BucketName: Params.MinioBucketName, - CreateBucket: true, - } - - minioKV, err := minioKV.NewMinIOKV(node.queryNodeLoopCtx, option) - assert.Equal(t, err, nil) - //save index to minio - binarySet, err := index.Serialize() - assert.Equal(t, err, nil) - indexPaths := make([]string, 0) - for _, index := range binarySet { - path := strconv.Itoa(int(segmentID)) + "/" + index.Key - indexPaths = append(indexPaths, path) - minioKV.Save(path, string(index.Value)) - } - - //test index search result - indexResult, err := index.QueryOnBinaryVecIndexWithParam(searchRowData, indexParams) - assert.Equal(t, err, nil) - - // create loadIndexClient - fieldID := UniqueID(100) - loadIndexChannelNames := Params.LoadIndexChannelNames - client := client.NewLoadIndexClient(node.queryNodeLoopCtx, Params.PulsarAddress, loadIndexChannelNames) - client.LoadIndex(indexPaths, segmentID, fieldID, "vec", indexParams) - - // init message stream consumer and do checks - statsMs := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize) - statsMs.SetPulsarClient(Params.PulsarAddress) - statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, msgstream.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize) - statsMs.Start() - - findFiledStats := false - for { - receiveMsg := msgstream.MsgStream(statsMs).Consume() - assert.NotNil(t, receiveMsg) - assert.NotEqual(t, len(receiveMsg.Msgs), 0) - - for _, msg := range receiveMsg.Msgs { - statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg) - if statsMsg.FieldStats == nil || len(statsMsg.FieldStats) == 0 { - continue - } - findFiledStats = true - assert.Equal(t, ok, true) - assert.Equal(t, len(statsMsg.FieldStats), 1) - fieldStats0 := statsMsg.FieldStats[0] - assert.Equal(t, fieldStats0.FieldID, fieldID) - assert.Equal(t, fieldStats0.CollectionID, collectionID) - assert.Equal(t, len(fieldStats0.IndexStats), 1) - indexStats0 := fieldStats0.IndexStats[0] - params := indexStats0.IndexParams - // sort index params by key - sort.Slice(indexParamsKV, func(i, j int) bool { return indexParamsKV[i].Key < indexParamsKV[j].Key }) - indexEqual := node.loadIndexService.indexParamsEqual(params, indexParamsKV) - assert.Equal(t, indexEqual, true) - } - - if findFiledStats { - break - } - } - - err = searchStream.Produce(fn(2)) - assert.NoError(t, err) - searchResult = searchResultStream.Consume() - assert.NotNil(t, searchResult) - err = proto.Unmarshal(searchResult.Msgs[0].(*msgstream.SearchResultMsg).Hits[0], &unMarshaledHit) - assert.Nil(t, err) - - idsIndex := indexResult.IDs() - idsSegment := unMarshaledHit.IDs - assert.Equal(t, len(idsIndex), len(idsSegment)) - for i := 0; i < len(idsIndex); i++ { - assert.Equal(t, idsIndex[i], idsSegment[i]) - } - Params.SearchChannelNames = oldSearchChannelNames - Params.SearchResultChannelNames = oldSearchResultChannelNames - Params.LoadIndexChannelNames = oldLoadIndexChannelNames - Params.StatsChannelName = oldStatsChannelName - fmt.Println("loadIndex binaryVector test Done!") - - defer assert.Equal(t, findFiledStats, true) - <-node.queryNodeLoopCtx.Done() - node.Close() -} diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 41f9391e8b..819d2b8554 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -97,9 +97,6 @@ func (node *QueryNode) Close() { if node.searchService != nil { node.searchService.close() } - if node.loadIndexService != nil { - node.loadIndexService.close() - } if node.statsService != nil { node.statsService.close() } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 1217fa3da3..34ec092f52 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -35,7 +35,7 @@ func genTestCollectionMeta(collectionName string, collectionID UniqueID, isBinar TypeParams: []*commonpb.KeyValuePair{ { Key: "dim", - Value: "128", + Value: "16", }, }, IndexParams: []*commonpb.KeyValuePair{ @@ -92,12 +92,8 @@ func genTestCollectionMeta(collectionName string, collectionID UniqueID, isBinar return &collectionMeta } -func initTestMeta(t *testing.T, node *QueryNode, collectionName string, collectionID UniqueID, segmentID UniqueID, optional ...bool) { - isBinary := false - if len(optional) > 0 { - isBinary = optional[0] - } - collectionMeta := genTestCollectionMeta(collectionName, collectionID, isBinary) +func initTestMeta(t *testing.T, node *QueryNode, collectionName string, collectionID UniqueID, segmentID UniqueID) { + collectionMeta := genTestCollectionMeta(collectionName, collectionID, false) schemaBlob := proto.MarshalTextString(collectionMeta.Schema) assert.NotEqual(t, "", schemaBlob) diff --git a/scripts/init_devcontainer.sh b/scripts/init_devcontainer.sh index afbca6fd34..6b30db984e 100755 --- a/scripts/init_devcontainer.sh +++ b/scripts/init_devcontainer.sh @@ -8,15 +8,6 @@ while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symli done ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )" -unameOut="$(uname -s)" -case "${unameOut}" in - Linux*) machine=Linux;; - Darwin*) machine=Mac;; - CYGWIN*) machine=Cygwin;; - MINGW*) machine=MinGw;; - *) machine="UNKNOWN:${unameOut}" -esac - # Attempt to run in the container with the same UID/GID as we have on the host, # as this results in the correct permissions on files created in the shared # volumes. This isn't always possible, however, as IDs less than 100 are @@ -30,12 +21,8 @@ gid=$(id -g) [ "$uid" -lt 500 ] && uid=501 [ "$gid" -lt 500 ] && gid=$uid -awk 'c&&c--{sub(/^/,"#")} /# Build devcontainer/{c=5} 1' $ROOT_DIR/docker-compose.yml > $ROOT_DIR/docker-compose-vscode.yml.tmp +awk 'c&&c--{sub(/^/,"#")} /# Build devcontainer/{c=5} 1' $ROOT_DIR/docker-compose.yml > $ROOT_DIR/docker-compose-vscode.yml.bak -awk 'c&&c--{sub(/^/,"#")} /# Command/{c=3} 1' $ROOT_DIR/docker-compose-vscode.yml.tmp > $ROOT_DIR/docker-compose-vscode.yml +awk 'c&&c--{sub(/^/,"#")} /# Command/{c=3} 1' $ROOT_DIR/docker-compose-vscode.yml.bak > $ROOT_DIR/docker-compose-vscode.yml -if [ "${machine}" == "Mac" ];then - sed -i '' "s/# user: {{ CURRENT_ID }}/user: \"$uid:$gid\"/g" $ROOT_DIR/docker-compose-vscode.yml -else - sed -i "s/# user: {{ CURRENT_ID }}/user: \"$uid:$gid\"/g" $ROOT_DIR/docker-compose-vscode.yml -fi \ No newline at end of file +sed -i '.bak' "s/# user: {{ CURRENT_ID }}/user: \"$uid:$gid\"/g" $ROOT_DIR/docker-compose-vscode.yml