diff --git a/conf/conf.go b/conf/conf.go index 81bd8d2ec6..f4f8eb179c 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -4,6 +4,8 @@ import ( "github.com/czs007/suvlim/storage/pkg/types" yaml "gopkg.in/yaml.v2" "io/ioutil" + "path" + "runtime" ) // yaml.MapSlice @@ -58,10 +60,14 @@ func init() { load_config() } +func getCurrentFileDir() string { + _, fpath, _, _ := runtime.Caller(0) + return path.Dir(fpath) +} + func load_config() { - //var config ServerConfig - filename := "../conf/config.yaml" - source, err := ioutil.ReadFile(filename) + filePath := path.Join(getCurrentFileDir(), "config.yaml") + source, err := ioutil.ReadFile(filePath) if err != nil { panic(err) } diff --git a/conf/config.yaml b/conf/config.yaml index d5b85b439b..97034f8890 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -15,8 +15,8 @@ master: etcd: address: localhost - port: 0 - rootpath: a + port: 2379 + rootpath: suvlim segthreshold: 10000 timesync: diff --git a/core/src/dog_segment/SegmentNaive.cpp b/core/src/dog_segment/SegmentNaive.cpp index 1c16d778ae..a5bbdcd8e0 100644 --- a/core/src/dog_segment/SegmentNaive.cpp +++ b/core/src/dog_segment/SegmentNaive.cpp @@ -8,7 +8,6 @@ #include #include #include -#include namespace milvus::dog_segment { @@ -335,21 +334,31 @@ void merge_into(int64_t queries, int64_t topk, float *distances, int64_t *uids, const float *new_distances, const int64_t *new_uids) { for(int64_t qn = 0; qn < queries; ++qn) { auto base = qn * topk; - auto dst_dis = distances + base; - auto dst_uids = uids + base; + auto src2_dis = distances + base; + auto src2_uids = uids + base; - auto src_dis = new_distances + base; - auto src_uids = new_uids + base; + auto src1_dis = new_distances + base; + auto src1_uids = new_uids + base; - std::vector buf_dis(2*topk); - std::vector buf_uids(2*topk); + std::vector buf_dis(topk); + std::vector buf_uids(topk); - auto zip_src = tbb::make_zip_iterator(src_dis, src_uids); - auto zip_dst = tbb::make_zip_iterator(dst_dis, dst_uids); - auto zip_buf = tbb::make_zip_iterator(buf_dis.data(), buf_uids.data()); - auto fuck = zip_src + 1; - std::merge(zip_dst, zip_dst + topk, zip_src, zip_src + topk, zip_buf); - std::copy_n(zip_buf, topk, zip_dst); + auto it1 = 0; + auto it2 = 0; + + for(auto buf = 0; buf < topk; ++buf){ + if(src1_dis[it1] <= src2_dis[it2]) { + buf_dis[buf] = src1_dis[it1]; + buf_uids[buf] = src1_uids[it1]; + ++it1; + } else { + buf_dis[buf] = src2_dis[it2]; + buf_uids[buf] = src2_uids[it2]; + ++it2; + } + } + std::copy_n(buf_dis.data(), topk, src2_dis); + std::copy_n(buf_uids.data(), topk, src2_uids); } } diff --git a/core/unittest/test_indexing.cpp b/core/unittest/test_indexing.cpp index d09737f2aa..430fa53552 100644 --- a/core/unittest/test_indexing.cpp +++ b/core/unittest/test_indexing.cpp @@ -18,7 +18,6 @@ #include #include -#include using std::cin; using std::cout; @@ -59,22 +58,32 @@ void merge_into(int64_t queries, int64_t topk, float *distances, int64_t *uids, const float *new_distances, const int64_t *new_uids) { for(int64_t qn = 0; qn < queries; ++qn) { auto base = qn * topk; - auto dst_dis = distances + base; - auto dst_uids = uids + base; + auto src2_dis = distances + base; + auto src2_uids = uids + base; - auto src_dis = new_distances + base; - auto src_uids = new_uids + base; + auto src1_dis = new_distances + base; + auto src1_uids = new_uids + base; - std::vector buf_dis(2*topk); - std::vector buf_uids(2*topk); + std::vector buf_dis(topk); + std::vector buf_uids(topk); - auto zip_src = tbb::make_zip_iterator(src_dis, src_uids); - auto 
zip_dst = tbb::make_zip_iterator(dst_dis, dst_uids); - auto zip_buf = tbb::make_zip_iterator(buf_dis.data(), buf_uids.data()); - auto fuck = zip_src + 1; - std::merge(zip_dst, zip_dst + topk, zip_src, zip_src + topk, zip_buf); - std::copy_n(zip_buf, topk, zip_dst); - } + auto it1 = 0; + auto it2 = 0; + + for(auto buf = 0; buf < topk; ++buf){ + if(src1_dis[it1] <= src2_dis[it2]) { + buf_dis[buf] = src1_dis[it1]; + buf_uids[buf] = src1_uids[it1]; + ++it1; + } else { + buf_dis[buf] = src2_dis[it2]; + buf_uids[buf] = src2_uids[it2]; + ++it2; + } + } + std::copy_n(buf_dis.data(), topk, src2_dis); + std::copy_n(buf_uids.data(), topk, src2_uids); + } } diff --git a/pkg/master/kv/etcd_kv.go b/pkg/master/kv/etcd_kv.go index 8b4d87305a..dcdd065dd5 100644 --- a/pkg/master/kv/etcd_kv.go +++ b/pkg/master/kv/etcd_kv.go @@ -21,20 +21,43 @@ var ( errTxnFailed = errors.New("failed to commit transaction") ) -type etcdKVBase struct { +type EtcdKVBase struct { client *clientv3.Client rootPath string } // NewEtcdKVBase creates a new etcd kv. -func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase { - return &etcdKVBase{ +func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase { + return &EtcdKVBase{ client: client, rootPath: rootPath, } } -func (kv *etcdKVBase) Load(key string) (string, error) { +func (kv *EtcdKVBase) LoadWithPrefix(key string) ( []string, []string) { + key = path.Join(kv.rootPath, key) + println("in loadWithPrefix,", key) + resp, err := etcdutil.EtcdKVGet(kv.client, key,clientv3.WithPrefix()) + if err != nil { + return [] string {}, [] string {} + } + var keys []string + var values []string + for _,kvs := range resp.Kvs{ + //println(len(kvs.)) + if len(kvs.Key) <= 0{ + println("KKK") + continue + } + keys = append(keys, string(kvs.Key)) + values = append(values, string(kvs.Value)) + } + //println(keys) + //println(values) + return keys, values +} + +func (kv *EtcdKVBase) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) resp, err := etcdutil.EtcdKVGet(kv.client, key) @@ -49,7 +72,7 @@ func (kv *etcdKVBase) Load(key string) (string, error) { return string(resp.Kvs[0].Value), nil } -func (kv *etcdKVBase) Save(key, value string) error { +func (kv *EtcdKVBase) Save(key, value string) error { key = path.Join(kv.rootPath, key) txn := NewSlowLogTxn(kv.client) @@ -64,7 +87,7 @@ func (kv *etcdKVBase) Save(key, value string) error { return nil } -func (kv *etcdKVBase) Remove(key string) error { +func (kv *EtcdKVBase) Remove(key string) error { key = path.Join(kv.rootPath, key) txn := NewSlowLogTxn(kv.client) @@ -79,12 +102,18 @@ func (kv *etcdKVBase) Remove(key string) error { return nil } -func (kv *etcdKVBase) Watch(key string) clientv3.WatchChan { +func (kv *EtcdKVBase) Watch(key string) clientv3.WatchChan { key = path.Join(kv.rootPath, key) rch := kv.client.Watch(context.Background(), key) return rch } +func (kv *EtcdKVBase) WatchWithPrefix(key string) clientv3.WatchChan { + key = path.Join(kv.rootPath, key) + rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix()) + return rch +} + // SlowLogTxn wraps etcd transaction and log slow one. 
type SlowLogTxn struct { clientv3.Txn diff --git a/pkg/master/kv/kv.go b/pkg/master/kv/kv.go index 223fd2b9e2..0485299d10 100644 --- a/pkg/master/kv/kv.go +++ b/pkg/master/kv/kv.go @@ -7,4 +7,6 @@ type Base interface { Save(key, value string) error Remove(key string) error Watch(key string) clientv3.WatchChan + WatchWithPrefix(key string) clientv3.WatchChan + LoadWithPrefix(key string) ( []string, []string) } diff --git a/reader/main.go b/reader/main.go new file mode 100644 index 0000000000..14dabbe2a2 --- /dev/null +++ b/reader/main.go @@ -0,0 +1,26 @@ +package main + +import ( + reader "github.com/czs007/suvlim/reader/read_node" + "sync" +) + +func main() { + pulsarURL := "pulsar://localhost:6650" + + numOfQueryNode := 2 + + go reader.StartQueryNode(pulsarURL, numOfQueryNode, 0) + reader.StartQueryNode(pulsarURL, numOfQueryNode, 1) + + +} + +func main2() { + wg := sync.WaitGroup{} + //ctx, cancel := context.WithCancel(context.Background()) + //defer cancel() + wg.Add(1) + reader.StartQueryNode2() + wg.Wait() +} \ No newline at end of file diff --git a/reader/collection.go b/reader/read_node/collection.go similarity index 81% rename from reader/collection.go rename to reader/read_node/collection.go index 09991e9f9d..2d74a6cde0 100644 --- a/reader/collection.go +++ b/reader/read_node/collection.go @@ -2,9 +2,9 @@ package reader /* -#cgo CFLAGS: -I../core/include +#cgo CFLAGS: -I${SRCDIR}/../../core/include -#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib +#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib #include "collection_c.h" #include "partition_c.h" @@ -16,6 +16,7 @@ import "C" type Collection struct { CollectionPtr C.CCollection CollectionName string + CollectionID uint64 Partitions []*Partition } diff --git a/reader/index.go b/reader/read_node/index.go similarity index 100% rename from reader/index.go rename to reader/read_node/index.go diff --git a/reader/index_test.go b/reader/read_node/index_test.go similarity index 100% rename from reader/index_test.go rename to reader/read_node/index_test.go diff --git a/reader/read_node/meta.go b/reader/read_node/meta.go new file mode 100644 index 0000000000..ace763020a --- /dev/null +++ b/reader/read_node/meta.go @@ -0,0 +1,233 @@ +package reader + +import ( + "context" + "fmt" + "github.com/czs007/suvlim/pkg/master/mock" + "reflect" + "strconv" + "strings" + "sync" + "time" + "github.com/czs007/suvlim/conf" + "github.com/czs007/suvlim/pkg/master/kv" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" +) + +const ( + CollectonPrefix = "/collection/" + SegmentPrefix = "/segment/" +) + + +func GetCollectionObjId(key string) string { + prefix := conf.Config.Etcd.Rootpath + CollectonPrefix + return strings.TrimPrefix(key, prefix) +} + +func GetSegmentObjId(key string) string { + prefix := conf.Config.Etcd.Rootpath + SegmentPrefix + return strings.TrimPrefix(key, prefix) +} + +func isCollectionObj(key string) bool { + prefix := conf.Config.Etcd.Rootpath + CollectonPrefix + prefix = strings.TrimSpace(prefix) + println("prefix is :$", prefix) + index := strings.Index(key, prefix) + println("index is :", index) + return index == 0 +} + +func isSegmentObj(key string) bool { + prefix := conf.Config.Etcd.Rootpath + SegmentPrefix + prefix = strings.TrimSpace(prefix) + index := strings.Index(key, prefix) + return index == 0 +} + +func printCollectionStruct(obj *mock.Collection){ + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) + typeOfS := v.Type() + + for i := 0; i< 
v.NumField(); i++ { + if typeOfS.Field(i).Name == "GrpcMarshalString"{ + continue + } + fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface()) + } +} + +func printSegmentStruct(obj *mock.Segment){ + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) + typeOfS := v.Type() + + for i := 0; i< v.NumField(); i++ { + fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface()) + } +} + +func (node *QueryNode) processCollectionCreate(id string, value string) { + println(fmt.Sprintf("Create Collection:$%s$", id)) + collection, err := mock.JSON2Collection(value) + if err != nil { + println("error of json 2 collection") + println(err.Error()) + } + printCollectionStruct(collection) +} + +func (node *QueryNode) processSegmentCreate(id string, value string) { + println("Create Segment: ", id) + segment, err := mock.JSON2Segment(value) + if err != nil { + println("error of json 2 segment") + println(err.Error()) + } + printSegmentStruct(segment) +} + +func (node *QueryNode) processCreate(key string, msg string) { + println("process create", key, ":", msg) + if isCollectionObj(key){ + objID := GetCollectionObjId(key) + node.processCollectionCreate(objID, msg) + }else if isSegmentObj(key){ + objID := GetSegmentObjId(key) + node.processSegmentCreate(objID, msg) + }else { + println("can not process create msg:", key) + } +} + +func (node *QueryNode) processSegmentModify(id string, value string) { + println("Modify Segment: ", id) + + segment, err := mock.JSON2Segment(value) + if err != nil { + println("error of json 2 segment") + println(err.Error()) + } + printSegmentStruct(segment) +} + +func (node *QueryNode) processCollectionModify(id string, value string) { + println("Modify Collection: ", id) + collection, err := mock.JSON2Collection(value) + if err != nil { + println("error of json 2 collection") + println(err.Error()) + } + printCollectionStruct(collection) +} + +func (node *QueryNode) processModify(key string, msg string){ + println("process modify") + if isCollectionObj(key){ + objID := GetCollectionObjId(key) + node.processCollectionModify(objID, msg) + }else if isSegmentObj(key){ + objID := GetSegmentObjId(key) + node.processSegmentModify(objID, msg) + }else { + println("can not process modify msg:", key) + } +} + + +func (node *QueryNode) processSegmentDelete(id string){ + println("Delete segment: ", id) + +} +func (node *QueryNode) processCollectionDelete(id string){ + println("Delete collection: ", id) +} + +func (node *QueryNode) processDelete(key string){ + println("process delete") + if isCollectionObj(key){ + objID := GetCollectionObjId(key) + node.processCollectionDelete(objID) + }else if isSegmentObj(key){ + objID := GetSegmentObjId(key) + node.processSegmentDelete(objID) + }else { + println("can not process delete msg:", key) + } +} + +func (node *QueryNode) processResp(resp clientv3.WatchResponse) error { + err := resp.Err() + if err != nil { + return err + } + for _, ev := range resp.Events { + if ev.IsCreate() { + key := string(ev.Kv.Key) + msg := string(ev.Kv.Value) + node.processCreate(key, msg) + } else if ev.IsModify() { + key := string(ev.Kv.Key) + msg := string(ev.Kv.Value) + node.processModify(key, msg) + } else if ev.Type == mvccpb.DELETE { + key := string(ev.Kv.Key) + node.processDelete(key) + } else { + println("Unrecognized etcd msg!") + } + } + return nil +} + +func (node *QueryNode) loadCollections() error { + keys, values := node.kvBase.LoadWithPrefix(CollectonPrefix) + for i:= range keys{ + objID := 
GetCollectionObjId(keys[i]) + node.processCollectionCreate(objID, values[i]) + } + return nil +} +func (node *QueryNode) loadSegments() error { + keys, values := node.kvBase.LoadWithPrefix(SegmentPrefix) + for i:= range keys{ + objID := GetSegmentObjId(keys[i]) + node.processSegmentCreate(objID, values[i]) + } + return nil +} + +func (node *QueryNode) InitFromMeta() error { + //pass + etcdAddr := "http://" + etcdAddr += conf.Config.Etcd.Address + etcdPort := conf.Config.Etcd.Port + etcdAddr = etcdAddr + ":" + strconv.FormatInt(int64(etcdPort), 10) + cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{etcdAddr}, + DialTimeout: 5 * time.Second, + }) + defer cli.Close() + node.kvBase = kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + node.loadCollections() + node.loadSegments() + return nil +} + +func (node *QueryNode) RunMetaService(ctx context.Context, wg *sync.WaitGroup) { + node.InitFromMeta() + metaChan := node.kvBase.WatchWithPrefix("") + for { + select { + case <-ctx.Done(): + wg.Done() + println("DONE!!!!!!") + return + case resp := <-metaChan: + node.processResp(resp) + } + } +} \ No newline at end of file diff --git a/reader/partition.go b/reader/read_node/partition.go similarity index 82% rename from reader/partition.go rename to reader/read_node/partition.go index 7e5da29261..b887c1b5f1 100644 --- a/reader/partition.go +++ b/reader/read_node/partition.go @@ -2,9 +2,9 @@ package reader /* -#cgo CFLAGS: -I../core/include +#cgo CFLAGS: -I${SRCDIR}/../../core/include -#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib +#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib #include "collection_c.h" #include "partition_c.h" diff --git a/reader/query_node.go b/reader/read_node/query_node.go similarity index 94% rename from reader/query_node.go rename to reader/read_node/query_node.go index 3f761d7e94..7c09f641cd 100644 --- a/reader/query_node.go +++ b/reader/read_node/query_node.go @@ -2,9 +2,9 @@ package reader /* -#cgo CFLAGS: -I../core/include +#cgo CFLAGS: -I${SRCDIR}/../../core/include -#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib +#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib #include "collection_c.h" #include "partition_c.h" @@ -15,11 +15,14 @@ import "C" import ( "fmt" - msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" - "github.com/czs007/suvlim/reader/message_client" "sort" "sync" "sync/atomic" + + msgPb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/czs007/suvlim/pkg/master/kv" + "github.com/czs007/suvlim/reader/message_client" + //"github.com/stretchr/testify/assert" ) type InsertData struct { @@ -54,16 +57,17 @@ type QueryNodeDataBuffer struct { } type QueryNode struct { - QueryNodeId uint64 - Collections []*Collection - SegmentsMap map[int64]*Segment - messageClient *message_client.MessageClient + QueryNodeId uint64 + Collections []*Collection + SegmentsMap map[int64]*Segment + messageClient *message_client.MessageClient //mc *message_client.MessageClient queryNodeTimeSync *QueryNodeTime buffer QueryNodeDataBuffer deletePreprocessData DeletePreprocessData deleteData DeleteData insertData InsertData + kvBase *kv.EtcdKVBase } func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode { @@ -87,12 +91,12 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode { } return &QueryNode{ - QueryNodeId: queryNodeId, - Collections: nil, - SegmentsMap: segmentsMap, - messageClient: &mc, - 
queryNodeTimeSync: queryNodeTimeSync, - buffer: buffer, + QueryNodeId: queryNodeId, + Collections: nil, + SegmentsMap: segmentsMap, + messageClient: &mc, + queryNodeTimeSync: queryNodeTimeSync, + buffer: buffer, } } @@ -119,12 +123,12 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes } return &QueryNode{ - QueryNodeId: queryNodeId, - Collections: nil, - SegmentsMap: segmentsMap, - messageClient: mc, - queryNodeTimeSync: queryNodeTimeSync, - buffer: buffer, + QueryNodeId: queryNodeId, + Collections: nil, + SegmentsMap: segmentsMap, + messageClient: mc, + queryNodeTimeSync: queryNodeTimeSync, + buffer: buffer, } } @@ -173,7 +177,7 @@ func (node *QueryNode) DeleteCollection(collection *Collection) { //////////////////////////////////////////////////////////////////////////////////////////////////// func (node *QueryNode) PrepareBatchMsg() []int { - var msgLen= node.messageClient.PrepareBatchMsg() + var msgLen = node.messageClient.PrepareBatchMsg() return msgLen } @@ -189,7 +193,7 @@ func (node *QueryNode) InitQueryNodeCollection() { //////////////////////////////////////////////////////////////////////////////////////////////////// -func (node *QueryNode) RunInsertDelete(wg * sync.WaitGroup) { +func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { for { // TODO: get timeRange from message client var msgLen = node.PrepareBatchMsg() @@ -271,7 +275,7 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr } // 2. Remove invalid messages from buffer. - tmpInsertOrDeleteBuffer := make([]*msgPb.InsertOrDeleteMsg ,0) + tmpInsertOrDeleteBuffer := make([]*msgPb.InsertOrDeleteMsg, 0) for i, isValid := range node.buffer.validInsertDeleteBuffer { if isValid { tmpInsertOrDeleteBuffer = append(tmpInsertOrDeleteBuffer, node.buffer.InsertDeleteBuffer[i]) diff --git a/reader/query_node_time.go b/reader/read_node/query_node_time.go similarity index 100% rename from reader/query_node_time.go rename to reader/read_node/query_node_time.go diff --git a/reader/quety_node_test.go b/reader/read_node/quety_node_test.go similarity index 100% rename from reader/quety_node_test.go rename to reader/read_node/quety_node_test.go diff --git a/reader/reader.go b/reader/read_node/reader.go similarity index 70% rename from reader/reader.go rename to reader/read_node/reader.go index 8afe3edc76..82bf102068 100644 --- a/reader/reader.go +++ b/reader/read_node/reader.go @@ -1,9 +1,11 @@ package reader import ( - "github.com/czs007/suvlim/reader/message_client" + "context" "log" "sync" + + "github.com/czs007/suvlim/reader/message_client" ) func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) { @@ -32,3 +34,16 @@ func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) { wg.Wait() qn.Close() } + +func StartQueryNode2() { + ctx := context.Background() + qn := CreateQueryNode(0, 0, nil) + //qn.InitQueryNodeCollection() + wg := sync.WaitGroup{} + wg.Add(1) + //go qn.RunInsertDelete(&wg) + //go qn.RunSearch(&wg) + go qn.RunMetaService(ctx, &wg) + wg.Wait() + qn.Close() +} diff --git a/reader/reader_test.go b/reader/read_node/reader_test.go similarity index 100% rename from reader/reader_test.go rename to reader/read_node/reader_test.go diff --git a/reader/result.go b/reader/read_node/result.go similarity index 100% rename from reader/result.go rename to reader/read_node/result.go diff --git a/reader/result_test.go b/reader/read_node/result_test.go similarity index 100% rename from reader/result_test.go rename to 
reader/read_node/result_test.go diff --git a/reader/segment.go b/reader/read_node/segment.go similarity index 97% rename from reader/segment.go rename to reader/read_node/segment.go index 0aac20ae56..cfbbd2f4da 100644 --- a/reader/segment.go +++ b/reader/read_node/segment.go @@ -2,9 +2,9 @@ package reader /* -#cgo CFLAGS: -I../core/include +#cgo CFLAGS: -I${SRCDIR}/../../core/include -#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib +#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib #include "collection_c.h" #include "partition_c.h" diff --git a/reader/segment_service.go b/reader/read_node/segment_service.go similarity index 100% rename from reader/segment_service.go rename to reader/read_node/segment_service.go diff --git a/reader/segment_service_test.go b/reader/read_node/segment_service_test.go similarity index 100% rename from reader/segment_service_test.go rename to reader/read_node/segment_service_test.go diff --git a/reader/segment_test.go b/reader/read_node/segment_test.go similarity index 100% rename from reader/segment_test.go rename to reader/read_node/segment_test.go diff --git a/reader/util_functions.go b/reader/read_node/util_functions.go similarity index 100% rename from reader/util_functions.go rename to reader/read_node/util_functions.go diff --git a/reader/util_functions_test.go b/reader/read_node/util_functions_test.go similarity index 100% rename from reader/util_functions_test.go rename to reader/read_node/util_functions_test.go
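
A note on the conf/conf.go change above: config.yaml is now resolved relative to the source file via runtime.Caller(0) instead of the process working directory. A minimal standalone sketch of the same technique (hypothetical file, assuming a config.yaml sitting next to the Go source):

package main

import (
	"fmt"
	"io/ioutil"
	"path"
	"runtime"
)

// currentFileDir returns the directory containing this source file,
// independent of the process working directory.
func currentFileDir() string {
	_, fpath, _, _ := runtime.Caller(0)
	return path.Dir(fpath)
}

func main() {
	// Assumed layout: config.yaml sits beside this Go file.
	cfgPath := path.Join(currentFileDir(), "config.yaml")
	data, err := ioutil.ReadFile(cfgPath)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes from %s\n", len(data), cfgPath)
}

Because runtime.Caller reports the path recorded at compile time, this works when running from the source checkout; a deployed binary on another machine would still need an explicit config path.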
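
The merge_into rewrite in SegmentNaive.cpp and test_indexing.cpp replaces the tbb zip-iterator std::merge with an explicit two-pointer merge that emits only the first topk results per query. The same logic expressed as a self-contained Go sketch (hypothetical helper, not part of this change) makes the bounds invariant explicit: exactly topk elements are taken and each source row already holds topk sorted entries, so neither cursor can run past the end of its row.

// mergeInto merges the per-query top-k lists in newDist/newUIDs into dist/uids.
// Both inputs must be sorted ascending by distance within each query row.
func mergeInto(queries, topk int, dist []float32, uids []int64, newDist []float32, newUIDs []int64) {
	for q := 0; q < queries; q++ {
		base := q * topk
		bufDist := make([]float32, topk)
		bufUIDs := make([]int64, topk)
		i, j := 0, 0 // i walks the new row, j walks the existing row
		for k := 0; k < topk; k++ {
			// i+j == k < topk, so base+i and base+j stay inside the row.
			if newDist[base+i] <= dist[base+j] {
				bufDist[k], bufUIDs[k] = newDist[base+i], newUIDs[base+i]
				i++
			} else {
				bufDist[k], bufUIDs[k] = dist[base+j], uids[base+j]
				j++
			}
		}
		copy(dist[base:base+topk], bufDist)
		copy(uids[base:base+topk], bufUIDs)
	}
}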
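
The EtcdKVBase.LoadWithPrefix and WatchWithPrefix additions follow the usual clientv3 prefix pattern: one range Get with clientv3.WithPrefix() for the initial snapshot, then a prefix Watch for incremental updates (this is what RunMetaService in reader/read_node/meta.go drives). A minimal sketch of that pattern against a local etcd, assuming the localhost:2379 endpoint and suvlim root path from conf/config.yaml:

package main

import (
	"context"
	"fmt"
	"path"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	root := "suvlim" // rootpath from conf/config.yaml
	prefix := path.Join(root, "/collection/")

	// Initial snapshot: one range read over the prefix.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
	cancel()
	if err != nil {
		panic(err)
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("load %s -> %s\n", kv.Key, kv.Value)
	}

	// Incremental updates: watch the same prefix.
	for wresp := range cli.Watch(context.Background(), prefix, clientv3.WithPrefix()) {
		for _, ev := range wresp.Events {
			fmt.Printf("event %s %s -> %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

Keys come back with the root path still attached, which is why meta.go strips it with GetCollectionObjId/GetSegmentObjId before dispatching to the create/modify/delete handlers.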