Check builder images

Signed-off-by: quicksilver <zhifeng.zhang@zilliz.com>
This commit is contained in:
quicksilver 2020-11-17 18:36:03 +08:00 committed by yefu.chen
parent 4f04be4a28
commit 72ed65d7f8
28 changed files with 493 additions and 796 deletions

View File

@ -59,5 +59,7 @@ jobs:
run: | run: |
cd ${GITHUB_WORKSPACE}/deployments/docker && docker-compose up -d cd ${GITHUB_WORKSPACE}/deployments/docker && docker-compose up -d
- name: Build and UnitTest - name: Build and UnitTest
env:
CHECK_BUILDER: "1"
run: | run: |
./build/builder.sh ./build/builder.sh

View File

@ -18,6 +18,9 @@ if [ "${1-}" = "gdbserver" ]; then
chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}" chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}"
docker-compose pull --ignore-pull-failures gdbserver docker-compose pull --ignore-pull-failures gdbserver
if [ "${CHECK_BUILDER:-}" == "1" ]; then
docker-compose build gdbserver
fi
docker-compose up -d gdbserver docker-compose up -d gdbserver
exit 0 exit 0
fi fi
@ -39,6 +42,9 @@ mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-cache"
chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}" chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}"
docker-compose pull --ignore-pull-failures ubuntu docker-compose pull --ignore-pull-failures ubuntu
if [ "${CHECK_BUILDER:-}" == "1" ]; then
docker-compose build ubuntu
fi
docker-compose run --rm -u "$uid:$gid" ubuntu "$@" docker-compose run --rm -u "$uid:$gid" ubuntu "$@"
popd popd

View File

@ -27,8 +27,8 @@ func main() {
sig = <-sc sig = <-sc
cancel() cancel()
}() }()
pulsarAddress, _ := reader.Params.PulsarAddress()
reader.StartQueryNode(ctx) reader.StartQueryNode(ctx, pulsarAddress)
switch sig { switch sig {
case syscall.SIGTERM: case syscall.SIGTERM:

View File

@ -167,7 +167,7 @@ func (t *describeCollectionTask) Ts() (Timestamp, error) {
if t.req == nil { if t.req == nil {
return 0, errors.New("null request") return 0, errors.New("null request")
} }
return t.req.Timestamp, nil return Timestamp(t.req.Timestamp), nil
} }
func (t *describeCollectionTask) Execute() error { func (t *describeCollectionTask) Execute() error {
@ -199,7 +199,7 @@ func (t *showCollectionsTask) Ts() (Timestamp, error) {
if t.req == nil { if t.req == nil {
return 0, errors.New("null request") return 0, errors.New("null request")
} }
return t.req.Timestamp, nil return Timestamp(t.req.Timestamp), nil
} }
func (t *showCollectionsTask) Execute() error { func (t *showCollectionsTask) Execute() error {

View File

@ -132,6 +132,17 @@ func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
return mt.client.Save("/segment/"+strconv.FormatInt(seg.SegmentID, 10), string(segBytes)) return mt.client.Save("/segment/"+strconv.FormatInt(seg.SegmentID, 10), string(segBytes))
} }
// mt.ddLock.Lock() before call this function
func (mt *metaTable) deleteSegmentMeta(segID UniqueID) error {
_, ok := mt.segID2Meta[segID]
if ok {
delete(mt.segID2Meta, segID)
}
return mt.client.Remove("/segment/" + strconv.FormatInt(segID, 10))
}
// mt.ddLock.Lock() before call this function // mt.ddLock.Lock() before call this function
func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIDs []UniqueID) error { func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIDs []UniqueID) error {
segIDStrs := make([]string, 0, len(segIDs)) segIDStrs := make([]string, 0, len(segIDs))

View File

@ -73,7 +73,7 @@ func TestMetaTable_Collection(t *testing.T) {
Name: "coll2", Name: "coll2",
}, },
CreateTime: 0, CreateTime: 0,
SegmentIDs: []UniqueID{}, SegmentIDs: []UniqueID{1},
PartitionTags: []string{"1"}, PartitionTags: []string{"1"},
} }
segID1 := pb.SegmentMeta{ segID1 := pb.SegmentMeta{
@ -121,16 +121,11 @@ func TestMetaTable_Collection(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
err = meta.AddSegment(&segID3) err = meta.AddSegment(&segID3)
assert.Nil(t, err) assert.Nil(t, err)
getColMeta, err := meta.GetCollectionByName("coll5") getColMeta, err := meta.GetCollectionByName(colMeta.Schema.Name)
assert.NotNil(t, err)
assert.Nil(t, getColMeta)
getColMeta, err = meta.GetCollectionByName(colMeta.Schema.Name)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 3, len(getColMeta.SegmentIDs)) assert.Equal(t, 3, len(getColMeta.SegmentIDs))
err = meta.DeleteCollection(colMeta.ID) err = meta.DeleteCollection(colMeta.ID)
assert.Nil(t, err) assert.Nil(t, err)
err = meta.DeleteCollection(500)
assert.NotNil(t, err)
hasCollection = meta.HasCollection(colMeta.ID) hasCollection = meta.HasCollection(colMeta.ID)
assert.False(t, hasCollection) assert.False(t, hasCollection)
_, err = meta.GetSegmentByID(segID1.SegmentID) _, err = meta.GetSegmentByID(segID1.SegmentID)
@ -198,14 +193,10 @@ func TestMetaTable_DeletePartition(t *testing.T) {
} }
err = meta.AddCollection(&colMeta) err = meta.AddCollection(&colMeta)
assert.Nil(t, err) assert.Nil(t, err)
err = meta.AddPartition(500, "p1")
assert.NotNil(t, err)
err = meta.AddPartition(colMeta.ID, "p1") err = meta.AddPartition(colMeta.ID, "p1")
assert.Nil(t, err) assert.Nil(t, err)
err = meta.AddPartition(colMeta.ID, "p2") err = meta.AddPartition(colMeta.ID, "p2")
assert.Nil(t, err) assert.Nil(t, err)
err = meta.AddPartition(colMeta.ID, "p2")
assert.NotNil(t, err)
err = meta.AddSegment(&segID1) err = meta.AddSegment(&segID1)
assert.Nil(t, err) assert.Nil(t, err)
err = meta.AddSegment(&segID2) err = meta.AddSegment(&segID2)
@ -218,8 +209,6 @@ func TestMetaTable_DeletePartition(t *testing.T) {
assert.Equal(t, 3, len(afterCollMeta.SegmentIDs)) assert.Equal(t, 3, len(afterCollMeta.SegmentIDs))
err = meta.DeletePartition(100, "p1") err = meta.DeletePartition(100, "p1")
assert.Nil(t, err) assert.Nil(t, err)
err = meta.DeletePartition(500, "p1")
assert.NotNil(t, err)
afterCollMeta, err = meta.GetCollectionByName("coll1") afterCollMeta, err = meta.GetCollectionByName("coll1")
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(afterCollMeta.PartitionTags)) assert.Equal(t, 1, len(afterCollMeta.PartitionTags))
@ -296,16 +285,12 @@ func TestMetaTable_Segment(t *testing.T) {
assert.Equal(t, &segMeta, getSegMeta) assert.Equal(t, &segMeta, getSegMeta)
err = meta.CloseSegment(segMeta.SegmentID, Timestamp(11), 111) err = meta.CloseSegment(segMeta.SegmentID, Timestamp(11), 111)
assert.Nil(t, err) assert.Nil(t, err)
err = meta.CloseSegment(1000, Timestamp(11), 111)
assert.NotNil(t, err)
getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID) getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, getSegMeta.NumRows, int64(111)) assert.Equal(t, getSegMeta.NumRows, int64(111))
assert.Equal(t, getSegMeta.CloseTime, uint64(11)) assert.Equal(t, getSegMeta.CloseTime, uint64(11))
err = meta.DeleteSegment(segMeta.SegmentID) err = meta.DeleteSegment(segMeta.SegmentID)
assert.Nil(t, err) assert.Nil(t, err)
err = meta.DeleteSegment(1000)
assert.NotNil(t, err)
getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID) getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID)
assert.Nil(t, getSegMeta) assert.Nil(t, getSegMeta)
assert.NotNil(t, err) assert.NotNil(t, err)

View File

@ -1,302 +0,0 @@
package master
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
func TestMaster_CreateCollectionTask(t *testing.T) {
req := internalpb.CreateCollectionRequest{
MsgType: internalpb.MsgType_kCreateCollection,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
Schema: nil,
}
var collectionTask task = &createCollectionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kCreateCollection, collectionTask.Type())
ts, err := collectionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
collectionTask = &createCollectionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
ts, err = collectionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = collectionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_DropCollectionTask(t *testing.T) {
req := internalpb.DropCollectionRequest{
MsgType: internalpb.MsgType_kDropPartition,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
CollectionName: nil,
}
var collectionTask task = &dropCollectionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kDropPartition, collectionTask.Type())
ts, err := collectionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
collectionTask = &dropCollectionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
ts, err = collectionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = collectionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_HasCollectionTask(t *testing.T) {
req := internalpb.HasCollectionRequest{
MsgType: internalpb.MsgType_kHasCollection,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
CollectionName: nil,
}
var collectionTask task = &hasCollectionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kHasCollection, collectionTask.Type())
ts, err := collectionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
collectionTask = &hasCollectionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
ts, err = collectionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = collectionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_ShowCollectionTask(t *testing.T) {
req := internalpb.ShowCollectionRequest{
MsgType: internalpb.MsgType_kShowCollections,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
}
var collectionTask task = &showCollectionsTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kShowCollections, collectionTask.Type())
ts, err := collectionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
collectionTask = &showCollectionsTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
ts, err = collectionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = collectionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_DescribeCollectionTask(t *testing.T) {
req := internalpb.DescribeCollectionRequest{
MsgType: internalpb.MsgType_kDescribeCollection,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
CollectionName: nil,
}
var collectionTask task = &describeCollectionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kDescribeCollection, collectionTask.Type())
ts, err := collectionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
collectionTask = &describeCollectionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
ts, err = collectionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = collectionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_CreatePartitionTask(t *testing.T) {
req := internalpb.CreatePartitionRequest{
MsgType: internalpb.MsgType_kCreatePartition,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
PartitionName: nil,
}
var partitionTask task = &createPartitionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kCreatePartition, partitionTask.Type())
ts, err := partitionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
partitionTask = &createPartitionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
ts, err = partitionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = partitionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_DropPartitionTask(t *testing.T) {
req := internalpb.DropPartitionRequest{
MsgType: internalpb.MsgType_kDropPartition,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
PartitionName: nil,
}
var partitionTask task = &dropPartitionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kDropPartition, partitionTask.Type())
ts, err := partitionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
partitionTask = &dropPartitionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
ts, err = partitionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = partitionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_HasPartitionTask(t *testing.T) {
req := internalpb.HasPartitionRequest{
MsgType: internalpb.MsgType_kHasPartition,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
PartitionName: nil,
}
var partitionTask task = &hasPartitionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kHasPartition, partitionTask.Type())
ts, err := partitionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
partitionTask = &hasPartitionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
ts, err = partitionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = partitionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_DescribePartitionTask(t *testing.T) {
req := internalpb.DescribePartitionRequest{
MsgType: internalpb.MsgType_kDescribePartition,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
PartitionName: nil,
}
var partitionTask task = &describePartitionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kDescribePartition, partitionTask.Type())
ts, err := partitionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
partitionTask = &describePartitionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
ts, err = partitionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = partitionTask.Execute()
assert.NotNil(t, err)
}
func TestMaster_ShowPartitionTask(t *testing.T) {
req := internalpb.ShowPartitionRequest{
MsgType: internalpb.MsgType_kShowPartitions,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
}
var partitionTask task = &showPartitionTask{
req: &req,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kShowPartitions, partitionTask.Type())
ts, err := partitionTask.Ts()
assert.Equal(t, uint64(11), ts)
assert.Nil(t, err)
partitionTask = &showPartitionTask{
req: nil,
baseTask: baseTask{},
}
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
ts, err = partitionTask.Ts()
assert.Equal(t, uint64(0), ts)
assert.NotNil(t, err)
err = partitionTask.Execute()
assert.NotNil(t, err)
}

View File

@ -20,19 +20,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
) )
/* type container interface {
* collectionReplica contains a in-memory local copy of persistent collections.
* In common cases, the system has multiple query nodes. Data of a collection will be
* distributed across all the available query nodes, and each query node's collectionReplica
* will maintain its own share (only part of the collection).
* Every replica tracks a value called tSafe which is the maximum timestamp that the replica
* is up-to-date.
*/
type collectionReplica interface {
// tSafe
getTSafe() Timestamp
setTSafe(t Timestamp)
// collection // collection
getCollectionNum() int getCollectionNum() int
addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error
@ -56,51 +44,36 @@ type collectionReplica interface {
hasSegment(segmentID UniqueID) bool hasSegment(segmentID UniqueID) bool
} }
type collectionReplicaImpl struct { // TODO: rename
tSafeMu sync.Mutex type colSegContainer struct {
tSafe Timestamp
mu sync.RWMutex mu sync.RWMutex
collections []*Collection collections []*Collection
segments map[UniqueID]*Segment segments map[UniqueID]*Segment
} }
//----------------------------------------------------------------------------------------------------- tSafe
func (colReplica *collectionReplicaImpl) getTSafe() Timestamp {
colReplica.tSafeMu.Lock()
defer colReplica.tSafeMu.Unlock()
return colReplica.tSafe
}
func (colReplica *collectionReplicaImpl) setTSafe(t Timestamp) {
colReplica.tSafeMu.Lock()
colReplica.tSafe = t
colReplica.tSafeMu.Unlock()
}
//----------------------------------------------------------------------------------------------------- collection //----------------------------------------------------------------------------------------------------- collection
func (colReplica *collectionReplicaImpl) getCollectionNum() int { func (container *colSegContainer) getCollectionNum() int {
colReplica.mu.RLock() container.mu.RLock()
defer colReplica.mu.RUnlock() defer container.mu.RUnlock()
return len(colReplica.collections) return len(container.collections)
} }
func (colReplica *collectionReplicaImpl) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error { func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error {
colReplica.mu.Lock() container.mu.Lock()
defer colReplica.mu.Unlock() defer container.mu.Unlock()
var newCollection = newCollection(collMeta, colMetaBlob) var newCollection = newCollection(collMeta, colMetaBlob)
colReplica.collections = append(colReplica.collections, newCollection) container.collections = append(container.collections, newCollection)
return nil return nil
} }
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error { func (container *colSegContainer) removeCollection(collectionID UniqueID) error {
collection, err := colReplica.getCollectionByID(collectionID) collection, err := container.getCollectionByID(collectionID)
colReplica.mu.Lock() container.mu.Lock()
defer colReplica.mu.Unlock() defer container.mu.Unlock()
if err != nil { if err != nil {
return err return err
@ -109,11 +82,11 @@ func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID)
deleteCollection(collection) deleteCollection(collection)
tmpCollections := make([]*Collection, 0) tmpCollections := make([]*Collection, 0)
for _, col := range colReplica.collections { for _, col := range container.collections {
if col.ID() == collectionID { if col.ID() == collectionID {
for _, p := range *col.Partitions() { for _, p := range *col.Partitions() {
for _, s := range *p.Segments() { for _, s := range *p.Segments() {
delete(colReplica.segments, s.ID()) delete(container.segments, s.ID())
} }
} }
} else { } else {
@ -121,15 +94,15 @@ func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID)
} }
} }
colReplica.collections = tmpCollections container.collections = tmpCollections
return nil return nil
} }
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) { func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Collection, error) {
colReplica.mu.RLock() container.mu.RLock()
defer colReplica.mu.RUnlock() defer container.mu.RUnlock()
for _, collection := range colReplica.collections { for _, collection := range container.collections {
if collection.ID() == collectionID { if collection.ID() == collectionID {
return collection, nil return collection, nil
} }
@ -138,11 +111,11 @@ func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID
return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10)) return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
} }
func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) { func (container *colSegContainer) getCollectionByName(collectionName string) (*Collection, error) {
colReplica.mu.RLock() container.mu.RLock()
defer colReplica.mu.RUnlock() defer container.mu.RUnlock()
for _, collection := range colReplica.collections { for _, collection := range container.collections {
if collection.Name() == collectionName { if collection.Name() == collectionName {
return collection, nil return collection, nil
} }
@ -152,14 +125,14 @@ func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName stri
} }
//----------------------------------------------------------------------------------------------------- partition //----------------------------------------------------------------------------------------------------- partition
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error { func (container *colSegContainer) addPartition(collectionID UniqueID, partitionTag string) error {
collection, err := colReplica.getCollectionByID(collectionID) collection, err := container.getCollectionByID(collectionID)
if err != nil { if err != nil {
return err return err
} }
colReplica.mu.Lock() container.mu.Lock()
defer colReplica.mu.Unlock() defer container.mu.Unlock()
var newPartition = newPartition(partitionTag) var newPartition = newPartition(partitionTag)
@ -167,20 +140,20 @@ func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, par
return nil return nil
} }
func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error { func (container *colSegContainer) removePartition(collectionID UniqueID, partitionTag string) error {
collection, err := colReplica.getCollectionByID(collectionID) collection, err := container.getCollectionByID(collectionID)
if err != nil { if err != nil {
return err return err
} }
colReplica.mu.Lock() container.mu.Lock()
defer colReplica.mu.Unlock() defer container.mu.Unlock()
var tmpPartitions = make([]*Partition, 0) var tmpPartitions = make([]*Partition, 0)
for _, p := range *collection.Partitions() { for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag { if p.Tag() == partitionTag {
for _, s := range *p.Segments() { for _, s := range *p.Segments() {
delete(colReplica.segments, s.ID()) delete(container.segments, s.ID())
} }
} else { } else {
tmpPartitions = append(tmpPartitions, p) tmpPartitions = append(tmpPartitions, p)
@ -191,14 +164,14 @@ func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID,
return nil return nil
} }
func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) { func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
collection, err := colReplica.getCollectionByID(collectionID) collection, err := container.getCollectionByID(collectionID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
colReplica.mu.RLock() container.mu.RLock()
defer colReplica.mu.RUnlock() defer container.mu.RUnlock()
for _, p := range *collection.Partitions() { for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag { if p.Tag() == partitionTag {
@ -210,17 +183,17 @@ func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID
} }
//----------------------------------------------------------------------------------------------------- segment //----------------------------------------------------------------------------------------------------- segment
func (colReplica *collectionReplicaImpl) getSegmentNum() int { func (container *colSegContainer) getSegmentNum() int {
colReplica.mu.RLock() container.mu.RLock()
defer colReplica.mu.RUnlock() defer container.mu.RUnlock()
return len(colReplica.segments) return len(container.segments)
} }
func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats { func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSegStats {
var statisticData = make([]*internalpb.SegmentStats, 0) var statisticData = make([]*internalpb.SegmentStats, 0)
for segmentID, segment := range colReplica.segments { for segmentID, segment := range container.segments {
currentMemSize := segment.getMemSize() currentMemSize := segment.getMemSize()
segment.lastMemSize = currentMemSize segment.lastMemSize = currentMemSize
segmentNumOfRows := segment.getRowCount() segmentNumOfRows := segment.getRowCount()
@ -242,36 +215,36 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.Quer
} }
} }
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error { func (container *colSegContainer) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error {
collection, err := colReplica.getCollectionByID(collectionID) collection, err := container.getCollectionByID(collectionID)
if err != nil { if err != nil {
return err return err
} }
partition, err := colReplica.getPartitionByTag(collectionID, partitionTag) partition, err := container.getPartitionByTag(collectionID, partitionTag)
if err != nil { if err != nil {
return err return err
} }
colReplica.mu.Lock() container.mu.Lock()
defer colReplica.mu.Unlock() defer container.mu.Unlock()
var newSegment = newSegment(collection, segmentID) var newSegment = newSegment(collection, segmentID)
colReplica.segments[segmentID] = newSegment container.segments[segmentID] = newSegment
*partition.Segments() = append(*partition.Segments(), newSegment) *partition.Segments() = append(*partition.Segments(), newSegment)
return nil return nil
} }
func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error { func (container *colSegContainer) removeSegment(segmentID UniqueID) error {
colReplica.mu.Lock() container.mu.Lock()
defer colReplica.mu.Unlock() defer container.mu.Unlock()
var targetPartition *Partition var targetPartition *Partition
var segmentIndex = -1 var segmentIndex = -1
for _, col := range colReplica.collections { for _, col := range container.collections {
for _, p := range *col.Partitions() { for _, p := range *col.Partitions() {
for i, s := range *p.Segments() { for i, s := range *p.Segments() {
if s.ID() == segmentID { if s.ID() == segmentID {
@ -282,7 +255,7 @@ func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error
} }
} }
delete(colReplica.segments, segmentID) delete(container.segments, segmentID)
if targetPartition != nil && segmentIndex > 0 { if targetPartition != nil && segmentIndex > 0 {
targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...) targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...)
@ -291,11 +264,11 @@ func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error
return nil return nil
} }
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) { func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment, error) {
colReplica.mu.RLock() container.mu.RLock()
defer colReplica.mu.RUnlock() defer container.mu.RUnlock()
targetSegment, ok := colReplica.segments[segmentID] targetSegment, ok := container.segments[segmentID]
if !ok { if !ok {
return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10)) return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
@ -304,11 +277,11 @@ func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Se
return targetSegment, nil return targetSegment, nil
} }
func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool { func (container *colSegContainer) hasSegment(segmentID UniqueID) bool {
colReplica.mu.RLock() container.mu.RLock()
defer colReplica.mu.RUnlock() defer container.mu.RUnlock()
_, ok := colReplica.segments[segmentID] _, ok := container.segments[segmentID]
return ok return ok
} }

View File

@ -15,7 +15,8 @@ import (
//----------------------------------------------------------------------------------------------------- collection //----------------------------------------------------------------------------------------------------- collection
func TestColSegContainer_addCollection(t *testing.T) { func TestColSegContainer_addCollection(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
fieldVec := schemapb.FieldSchema{ fieldVec := schemapb.FieldSchema{
@ -58,19 +59,20 @@ func TestColSegContainer_addCollection(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
} }
func TestColSegContainer_removeCollection(t *testing.T) { func TestColSegContainer_removeCollection(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(0) collectionID := UniqueID(0)
@ -114,24 +116,25 @@ func TestColSegContainer_removeCollection(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.replica).removeCollection(collectionID) err = (*node.container).removeCollection(collectionID)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, (*node.replica).getCollectionNum(), 0) assert.Equal(t, (*node.container).getCollectionNum(), 0)
} }
func TestColSegContainer_getCollectionByID(t *testing.T) { func TestColSegContainer_getCollectionByID(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
fieldVec := schemapb.FieldSchema{ fieldVec := schemapb.FieldSchema{
@ -174,17 +177,17 @@ func TestColSegContainer_getCollectionByID(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
targetCollection, err := (*node.replica).getCollectionByID(UniqueID(0)) targetCollection, err := (*node.container).getCollectionByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, targetCollection) assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
@ -193,7 +196,8 @@ func TestColSegContainer_getCollectionByID(t *testing.T) {
func TestColSegContainer_getCollectionByName(t *testing.T) { func TestColSegContainer_getCollectionByName(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
fieldVec := schemapb.FieldSchema{ fieldVec := schemapb.FieldSchema{
@ -236,17 +240,17 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
targetCollection, err := (*node.replica).getCollectionByName("collection0") targetCollection, err := (*node.container).getCollectionByName("collection0")
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, targetCollection) assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
@ -256,7 +260,8 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
//----------------------------------------------------------------------------------------------------- partition //----------------------------------------------------------------------------------------------------- partition
func TestColSegContainer_addPartition(t *testing.T) { func TestColSegContainer_addPartition(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(0) collectionID := UniqueID(0)
@ -300,20 +305,20 @@ func TestColSegContainer_addPartition(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID) assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags { for _, tag := range collectionMeta.PartitionTags {
err := (*node.replica).addPartition(collectionID, tag) err := (*node.container).addPartition(collectionID, tag)
assert.NoError(t, err) assert.NoError(t, err)
partition, err := (*node.replica).getPartitionByTag(collectionID, tag) partition, err := (*node.container).getPartitionByTag(collectionID, tag)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, "default") assert.Equal(t, partition.partitionTag, "default")
} }
@ -321,7 +326,8 @@ func TestColSegContainer_addPartition(t *testing.T) {
func TestColSegContainer_removePartition(t *testing.T) { func TestColSegContainer_removePartition(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(0) collectionID := UniqueID(0)
@ -366,30 +372,31 @@ func TestColSegContainer_removePartition(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID) assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags { for _, tag := range collectionMeta.PartitionTags {
err := (*node.replica).addPartition(collectionID, tag) err := (*node.container).addPartition(collectionID, tag)
assert.NoError(t, err) assert.NoError(t, err)
partition, err := (*node.replica).getPartitionByTag(collectionID, tag) partition, err := (*node.container).getPartitionByTag(collectionID, tag)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, partitionTag) assert.Equal(t, partition.partitionTag, partitionTag)
err = (*node.replica).removePartition(collectionID, partitionTag) err = (*node.container).removePartition(collectionID, partitionTag)
assert.NoError(t, err) assert.NoError(t, err)
} }
} }
func TestColSegContainer_getPartitionByTag(t *testing.T) { func TestColSegContainer_getPartitionByTag(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(0) collectionID := UniqueID(0)
@ -433,20 +440,20 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID) assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags { for _, tag := range collectionMeta.PartitionTags {
err := (*node.replica).addPartition(collectionID, tag) err := (*node.container).addPartition(collectionID, tag)
assert.NoError(t, err) assert.NoError(t, err)
partition, err := (*node.replica).getPartitionByTag(collectionID, tag) partition, err := (*node.container).getPartitionByTag(collectionID, tag)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, "default") assert.Equal(t, partition.partitionTag, "default")
assert.NotNil(t, partition) assert.NotNil(t, partition)
@ -456,7 +463,8 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
//----------------------------------------------------------------------------------------------------- segment //----------------------------------------------------------------------------------------------------- segment
func TestColSegContainer_addSegment(t *testing.T) { func TestColSegContainer_addSegment(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(0) collectionID := UniqueID(0)
@ -500,24 +508,24 @@ func TestColSegContainer_addSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err) assert.NoError(t, err)
const segmentNum = 3 const segmentNum = 3
for i := 0; i < segmentNum; i++ { for i := 0; i < segmentNum; i++ {
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err) assert.NoError(t, err)
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i)) assert.Equal(t, targetSeg.segmentID, UniqueID(i))
} }
@ -525,7 +533,8 @@ func TestColSegContainer_addSegment(t *testing.T) {
func TestColSegContainer_removeSegment(t *testing.T) { func TestColSegContainer_removeSegment(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(0) collectionID := UniqueID(0)
@ -569,34 +578,35 @@ func TestColSegContainer_removeSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err) assert.NoError(t, err)
const segmentNum = 3 const segmentNum = 3
for i := 0; i < segmentNum; i++ { for i := 0; i < segmentNum; i++ {
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err) assert.NoError(t, err)
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i)) assert.Equal(t, targetSeg.segmentID, UniqueID(i))
err = (*node.replica).removeSegment(UniqueID(i)) err = (*node.container).removeSegment(UniqueID(i))
assert.NoError(t, err) assert.NoError(t, err)
} }
} }
func TestColSegContainer_getSegmentByID(t *testing.T) { func TestColSegContainer_getSegmentByID(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(0) collectionID := UniqueID(0)
@ -640,24 +650,24 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err) assert.NoError(t, err)
const segmentNum = 3 const segmentNum = 3
for i := 0; i < segmentNum; i++ { for i := 0; i < segmentNum; i++ {
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err) assert.NoError(t, err)
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i)) assert.Equal(t, targetSeg.segmentID, UniqueID(i))
} }
@ -665,7 +675,8 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
func TestColSegContainer_hasSegment(t *testing.T) { func TestColSegContainer_hasSegment(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(0) collectionID := UniqueID(0)
@ -709,29 +720,29 @@ func TestColSegContainer_hasSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err) assert.NoError(t, err)
const segmentNum = 3 const segmentNum = 3
for i := 0; i < segmentNum; i++ { for i := 0; i < segmentNum; i++ {
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err) assert.NoError(t, err)
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i)) assert.Equal(t, targetSeg.segmentID, UniqueID(i))
hasSeg := (*node.replica).hasSegment(UniqueID(i)) hasSeg := (*node.container).hasSegment(UniqueID(i))
assert.Equal(t, hasSeg, true) assert.Equal(t, hasSeg, true)
hasSeg = (*node.replica).hasSegment(UniqueID(i + 100)) hasSeg = (*node.container).hasSegment(UniqueID(i + 100))
assert.Equal(t, hasSeg, false) assert.Equal(t, hasSeg, false)
} }
} }

View File

@ -13,7 +13,8 @@ import (
func TestCollection_Partitions(t *testing.T) { func TestCollection_Partitions(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
fieldVec := schemapb.FieldSchema{ fieldVec := schemapb.FieldSchema{
@ -56,18 +57,18 @@ func TestCollection_Partitions(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags { for _, tag := range collectionMeta.PartitionTags {
err := (*node.replica).addPartition(collection.ID(), tag) err := (*node.container).addPartition(collection.ID(), tag)
assert.NoError(t, err) assert.NoError(t, err)
} }

View File

@ -4,23 +4,33 @@ import (
"context" "context"
"log" "log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
) )
type dataSyncService struct { type dataSyncService struct {
ctx context.Context ctx context.Context
fg *flowgraph.TimeTickedFlowGraph pulsarURL string
fg *flowgraph.TimeTickedFlowGraph
replica *collectionReplica // input streams
dmStream *msgstream.MsgStream
// ddStream *msgstream.MsgStream
// k2sStream *msgstream.MsgStream
node *QueryNode
} }
func newDataSyncService(ctx context.Context, replica *collectionReplica) *dataSyncService { func newDataSyncService(ctx context.Context, node *QueryNode, pulsarURL string) *dataSyncService {
return &dataSyncService{ return &dataSyncService{
ctx: ctx, ctx: ctx,
fg: nil, pulsarURL: pulsarURL,
fg: nil,
replica: replica, dmStream: nil,
node: node,
} }
} }
@ -31,6 +41,7 @@ func (dsService *dataSyncService) start() {
func (dsService *dataSyncService) close() { func (dsService *dataSyncService) close() {
dsService.fg.Close() dsService.fg.Close()
(*dsService.dmStream).Close()
} }
func (dsService *dataSyncService) initNodes() { func (dsService *dataSyncService) initNodes() {
@ -38,10 +49,10 @@ func (dsService *dataSyncService) initNodes() {
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
var dmStreamNode Node = newDmInputNode(dsService.ctx) var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.pulsarURL)
var filterDmNode Node = newFilteredDmNode() var filterDmNode Node = newFilteredDmNode()
var insertNode Node = newInsertNode(dsService.replica) var insertNode Node = newInsertNode(dsService.node.container)
var serviceTimeNode Node = newServiceTimeNode(dsService.replica) var serviceTimeNode Node = newServiceTimeNode(dsService.node)
dsService.fg.AddNode(&dmStreamNode) dsService.fg.AddNode(&dmStreamNode)
dsService.fg.AddNode(&filterDmNode) dsService.fg.AddNode(&filterDmNode)
@ -79,4 +90,21 @@ func (dsService *dataSyncService) initNodes() {
if err != nil { if err != nil {
log.Fatal("set edges failed in node:", serviceTimeNode.Name()) log.Fatal("set edges failed in node:", serviceTimeNode.Name())
} }
dsService.setDmStream(&dmStreamNode)
} }
func (dsService *dataSyncService) setDmStream(node *Node) {
if (*node).IsInputNode() {
inStream, ok := (*node).(*InputNode)
dsService.dmStream = inStream.InStream()
if !ok {
log.Fatal("Invalid inputNode")
}
} else {
log.Fatal("stream set failed")
}
}
func (dsService *dataSyncService) setDdStream(node *Node) {}
func (dsService *dataSyncService) setK2sStream(node *Node) {}

View File

@ -19,7 +19,6 @@ import (
// NOTE: start pulsar before test // NOTE: start pulsar before test
func TestManipulationService_Start(t *testing.T) { func TestManipulationService_Start(t *testing.T) {
Params.Init()
var ctx context.Context var ctx context.Context
if closeWithDeadline { if closeWithDeadline {
@ -33,7 +32,7 @@ func TestManipulationService_Start(t *testing.T) {
// init query node // init query node
pulsarURL := "pulsar://localhost:6650" pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0, pulsarURL)
// init meta // init meta
collectionName := "collection0" collectionName := "collection0"
@ -77,20 +76,20 @@ func TestManipulationService_Start(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err) assert.NoError(t, err)
segmentID := UniqueID(0) segmentID := UniqueID(0)
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
// test data generate // test data generate
@ -169,7 +168,7 @@ func TestManipulationService_Start(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
// dataSync // dataSync
node.dataSyncService = newDataSyncService(node.ctx, node.replica) node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
go node.dataSyncService.start() go node.dataSyncService.start()
node.Close() node.Close()

View File

@ -10,7 +10,7 @@ import (
type insertNode struct { type insertNode struct {
BaseNode BaseNode
replica *collectionReplica container *container
} }
type InsertData struct { type InsertData struct {
@ -58,13 +58,13 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...) insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
// check if segment exists, if not, create this segment // check if segment exists, if not, create this segment
if !(*iNode.replica).hasSegment(task.SegmentID) { if !(*iNode.container).hasSegment(task.SegmentID) {
collection, err := (*iNode.replica).getCollectionByName(task.CollectionName) collection, err := (*iNode.container).getCollectionByName(task.CollectionName)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
continue continue
} }
err = (*iNode.replica).addSegment(task.SegmentID, task.PartitionTag, collection.ID()) err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID())
if err != nil { if err != nil {
log.Println(err) log.Println(err)
continue continue
@ -74,7 +74,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
// 2. do preInsert // 2. do preInsert
for segmentID := range insertData.insertRecords { for segmentID := range insertData.insertRecords {
var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID) var targetSegment, err = (*iNode.container).getSegmentByID(segmentID)
if err != nil { if err != nil {
log.Println("preInsert failed") log.Println("preInsert failed")
// TODO: add error handling // TODO: add error handling
@ -102,7 +102,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
} }
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) { func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID) var targetSegment, err = (*iNode.container).getSegmentByID(segmentID)
if err != nil { if err != nil {
log.Println("cannot find segment:", segmentID) log.Println("cannot find segment:", segmentID)
// TODO: add error handling // TODO: add error handling
@ -125,13 +125,13 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
wg.Done() wg.Done()
} }
func newInsertNode(replica *collectionReplica) *insertNode { func newInsertNode(container *container) *insertNode {
baseNode := BaseNode{} baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism) baseNode.SetMaxParallelism(maxParallelism)
return &insertNode{ return &insertNode{
BaseNode: baseNode, BaseNode: baseNode,
replica: replica, container: container,
} }
} }

View File

@ -2,28 +2,22 @@ package reader
import ( import (
"context" "context"
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
) )
func newDmInputNode(ctx context.Context) *flowgraph.InputNode { func newDmInputNode(ctx context.Context, pulsarURL string) *flowgraph.InputNode {
const ( const (
receiveBufSize = 1024 receiveBufSize = 1024
pulsarBufSize = 1024 pulsarBufSize = 1024
) )
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
consumeChannels := []string{"insert"} consumeChannels := []string{"insert"}
consumeSubName := "insertSub" consumeSubName := "insertSub"
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarCient(msgStreamURL) insertStream.SetPulsarCient(pulsarURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

View File

@ -6,7 +6,7 @@ import (
type serviceTimeNode struct { type serviceTimeNode struct {
BaseNode BaseNode
replica *collectionReplica node *QueryNode
} }
func (stNode *serviceTimeNode) Name() string { func (stNode *serviceTimeNode) Name() string {
@ -28,17 +28,17 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
} }
// update service time // update service time
(*stNode.replica).setTSafe(serviceTimeMsg.timeRange.timestampMax) stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax)
return nil return nil
} }
func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode { func newServiceTimeNode(node *QueryNode) *serviceTimeNode {
baseNode := BaseNode{} baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism) baseNode.SetMaxParallelism(maxParallelism)
return &serviceTimeNode{ return &serviceTimeNode{
BaseNode: baseNode, BaseNode: baseNode,
replica: replica, node: node,
} }
} }

View File

@ -24,12 +24,12 @@ const (
) )
type metaService struct { type metaService struct {
ctx context.Context ctx context.Context
kvBase *kv.EtcdKV kvBase *kv.EtcdKV
replica *collectionReplica container *container
} }
func newMetaService(ctx context.Context, replica *collectionReplica) *metaService { func newMetaService(ctx context.Context, container *container) *metaService {
ETCDAddr, err := Params.EtcdAddress() ETCDAddr, err := Params.EtcdAddress()
if err != nil { if err != nil {
panic(err) panic(err)
@ -46,9 +46,9 @@ func newMetaService(ctx context.Context, replica *collectionReplica) *metaServic
}) })
return &metaService{ return &metaService{
ctx: ctx, ctx: ctx,
kvBase: kv.NewEtcdKV(cli, ETCDRootPath), kvBase: kv.NewEtcdKV(cli, ETCDRootPath),
replica: replica, container: container,
} }
} }
@ -164,12 +164,12 @@ func (mService *metaService) processCollectionCreate(id string, value string) {
col := mService.collectionUnmarshal(value) col := mService.collectionUnmarshal(value)
if col != nil { if col != nil {
err := (*mService.replica).addCollection(col, value) err := (*mService.container).addCollection(col, value)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
for _, partitionTag := range col.PartitionTags { for _, partitionTag := range col.PartitionTags {
err = (*mService.replica).addPartition(col.ID, partitionTag) err = (*mService.container).addPartition(col.ID, partitionTag)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
@ -187,7 +187,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) {
// TODO: what if seg == nil? We need to notify master and return rpc request failed // TODO: what if seg == nil? We need to notify master and return rpc request failed
if seg != nil { if seg != nil {
err := (*mService.replica).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID) err := (*mService.container).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
@ -216,7 +216,7 @@ func (mService *metaService) processSegmentModify(id string, value string) {
} }
if seg != nil { if seg != nil {
targetSegment, err := (*mService.replica).getSegmentByID(seg.SegmentID) targetSegment, err := (*mService.container).getSegmentByID(seg.SegmentID)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
@ -251,7 +251,7 @@ func (mService *metaService) processSegmentDelete(id string) {
log.Println("Cannot parse segment id:" + id) log.Println("Cannot parse segment id:" + id)
} }
err = (*mService.replica).removeSegment(segmentID) err = (*mService.container).removeSegment(segmentID)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
@ -266,7 +266,7 @@ func (mService *metaService) processCollectionDelete(id string) {
log.Println("Cannot parse collection id:" + id) log.Println("Cannot parse collection id:" + id)
} }
err = (*mService.replica).removeCollection(collectionID) err = (*mService.container).removeCollection(collectionID)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return

View File

@ -27,8 +27,9 @@ func TestMetaService_start(t *testing.T) {
} }
// init query node // init query node
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
(*node.metaService).start() (*node.metaService).start()
} }
@ -186,8 +187,9 @@ func TestMetaService_processCollectionCreate(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
id := "0" id := "0"
value := `schema: < value := `schema: <
@ -215,10 +217,10 @@ func TestMetaService_processCollectionCreate(t *testing.T) {
node.metaService.processCollectionCreate(id, value) node.metaService.processCollectionCreate(id, value)
collectionNum := (*node.replica).getCollectionNum() collectionNum := (*node.container).getCollectionNum()
assert.Equal(t, collectionNum, 1) assert.Equal(t, collectionNum, 1)
collection, err := (*node.replica).getCollectionByName("test") collection, err := (*node.container).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
} }
@ -231,8 +233,9 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
collectionName := "collection0" collectionName := "collection0"
fieldVec := schemapb.FieldSchema{ fieldVec := schemapb.FieldSchema{
@ -275,10 +278,10 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
colMetaBlob, err := proto.Marshal(&collectionMeta) colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err) assert.NoError(t, err)
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err) assert.NoError(t, err)
err = (*node.replica).addPartition(UniqueID(0), "default") err = (*node.container).addPartition(UniqueID(0), "default")
assert.NoError(t, err) assert.NoError(t, err)
id := "0" id := "0"
@ -290,7 +293,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
(*node.metaService).processSegmentCreate(id, value) (*node.metaService).processSegmentCreate(id, value)
s, err := (*node.replica).getSegmentByID(UniqueID(0)) s, err := (*node.container).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0)) assert.Equal(t, s.segmentID, UniqueID(0))
} }
@ -303,8 +306,9 @@ func TestMetaService_processCreate(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
key1 := "by-dev/collection/0" key1 := "by-dev/collection/0"
msg1 := `schema: < msg1 := `schema: <
@ -331,10 +335,10 @@ func TestMetaService_processCreate(t *testing.T) {
` `
(*node.metaService).processCreate(key1, msg1) (*node.metaService).processCreate(key1, msg1)
collectionNum := (*node.replica).getCollectionNum() collectionNum := (*node.container).getCollectionNum()
assert.Equal(t, collectionNum, 1) assert.Equal(t, collectionNum, 1)
collection, err := (*node.replica).getCollectionByName("test") collection, err := (*node.container).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
@ -346,7 +350,7 @@ func TestMetaService_processCreate(t *testing.T) {
` `
(*node.metaService).processCreate(key2, msg2) (*node.metaService).processCreate(key2, msg2)
s, err := (*node.replica).getSegmentByID(UniqueID(0)) s, err := (*node.container).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0)) assert.Equal(t, s.segmentID, UniqueID(0))
} }
@ -359,8 +363,9 @@ func TestMetaService_processSegmentModify(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
collectionName := "collection0" collectionName := "collection0"
fieldVec := schemapb.FieldSchema{ fieldVec := schemapb.FieldSchema{
@ -403,10 +408,10 @@ func TestMetaService_processSegmentModify(t *testing.T) {
colMetaBlob, err := proto.Marshal(&collectionMeta) colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err) assert.NoError(t, err)
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err) assert.NoError(t, err)
err = (*node.replica).addPartition(UniqueID(0), "default") err = (*node.container).addPartition(UniqueID(0), "default")
assert.NoError(t, err) assert.NoError(t, err)
id := "0" id := "0"
@ -417,7 +422,7 @@ func TestMetaService_processSegmentModify(t *testing.T) {
` `
(*node.metaService).processSegmentCreate(id, value) (*node.metaService).processSegmentCreate(id, value)
s, err := (*node.replica).getSegmentByID(UniqueID(0)) s, err := (*node.container).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0)) assert.Equal(t, s.segmentID, UniqueID(0))
@ -429,7 +434,7 @@ func TestMetaService_processSegmentModify(t *testing.T) {
// TODO: modify segment for testing processCollectionModify // TODO: modify segment for testing processCollectionModify
(*node.metaService).processSegmentModify(id, newValue) (*node.metaService).processSegmentModify(id, newValue)
seg, err := (*node.replica).getSegmentByID(UniqueID(0)) seg, err := (*node.container).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0)) assert.Equal(t, seg.segmentID, UniqueID(0))
} }
@ -442,8 +447,9 @@ func TestMetaService_processCollectionModify(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
id := "0" id := "0"
value := `schema: < value := `schema: <
@ -470,10 +476,10 @@ func TestMetaService_processCollectionModify(t *testing.T) {
` `
(*node.metaService).processCollectionCreate(id, value) (*node.metaService).processCollectionCreate(id, value)
collectionNum := (*node.replica).getCollectionNum() collectionNum := (*node.container).getCollectionNum()
assert.Equal(t, collectionNum, 1) assert.Equal(t, collectionNum, 1)
collection, err := (*node.replica).getCollectionByName("test") collection, err := (*node.container).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
@ -502,7 +508,7 @@ func TestMetaService_processCollectionModify(t *testing.T) {
` `
(*node.metaService).processCollectionModify(id, newValue) (*node.metaService).processCollectionModify(id, newValue)
collection, err = (*node.replica).getCollectionByName("test") collection, err = (*node.container).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
} }
@ -515,8 +521,9 @@ func TestMetaService_processModify(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
key1 := "by-dev/collection/0" key1 := "by-dev/collection/0"
msg1 := `schema: < msg1 := `schema: <
@ -543,10 +550,10 @@ func TestMetaService_processModify(t *testing.T) {
` `
(*node.metaService).processCreate(key1, msg1) (*node.metaService).processCreate(key1, msg1)
collectionNum := (*node.replica).getCollectionNum() collectionNum := (*node.container).getCollectionNum()
assert.Equal(t, collectionNum, 1) assert.Equal(t, collectionNum, 1)
collection, err := (*node.replica).getCollectionByName("test") collection, err := (*node.container).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
@ -558,7 +565,7 @@ func TestMetaService_processModify(t *testing.T) {
` `
(*node.metaService).processCreate(key2, msg2) (*node.metaService).processCreate(key2, msg2)
s, err := (*node.replica).getSegmentByID(UniqueID(0)) s, err := (*node.container).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0)) assert.Equal(t, s.segmentID, UniqueID(0))
@ -588,7 +595,7 @@ func TestMetaService_processModify(t *testing.T) {
` `
(*node.metaService).processModify(key1, msg3) (*node.metaService).processModify(key1, msg3)
collection, err = (*node.replica).getCollectionByName("test") collection, err = (*node.container).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
@ -600,7 +607,7 @@ func TestMetaService_processModify(t *testing.T) {
// TODO: modify segment for testing processCollectionModify // TODO: modify segment for testing processCollectionModify
(*node.metaService).processModify(key2, msg4) (*node.metaService).processModify(key2, msg4)
seg, err := (*node.replica).getSegmentByID(UniqueID(0)) seg, err := (*node.container).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0)) assert.Equal(t, seg.segmentID, UniqueID(0))
} }
@ -613,8 +620,9 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
collectionName := "collection0" collectionName := "collection0"
fieldVec := schemapb.FieldSchema{ fieldVec := schemapb.FieldSchema{
@ -657,10 +665,10 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
colMetaBlob, err := proto.Marshal(&collectionMeta) colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err) assert.NoError(t, err)
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err) assert.NoError(t, err)
err = (*node.replica).addPartition(UniqueID(0), "default") err = (*node.container).addPartition(UniqueID(0), "default")
assert.NoError(t, err) assert.NoError(t, err)
id := "0" id := "0"
@ -671,12 +679,12 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
` `
(*node.metaService).processSegmentCreate(id, value) (*node.metaService).processSegmentCreate(id, value)
seg, err := (*node.replica).getSegmentByID(UniqueID(0)) seg, err := (*node.container).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0)) assert.Equal(t, seg.segmentID, UniqueID(0))
(*node.metaService).processSegmentDelete("0") (*node.metaService).processSegmentDelete("0")
mapSize := (*node.replica).getSegmentNum() mapSize := (*node.container).getSegmentNum()
assert.Equal(t, mapSize, 0) assert.Equal(t, mapSize, 0)
} }
@ -688,8 +696,9 @@ func TestMetaService_processCollectionDelete(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
id := "0" id := "0"
value := `schema: < value := `schema: <
@ -716,15 +725,15 @@ func TestMetaService_processCollectionDelete(t *testing.T) {
` `
(*node.metaService).processCollectionCreate(id, value) (*node.metaService).processCollectionCreate(id, value)
collectionNum := (*node.replica).getCollectionNum() collectionNum := (*node.container).getCollectionNum()
assert.Equal(t, collectionNum, 1) assert.Equal(t, collectionNum, 1)
collection, err := (*node.replica).getCollectionByName("test") collection, err := (*node.container).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
(*node.metaService).processCollectionDelete(id) (*node.metaService).processCollectionDelete(id)
collectionNum = (*node.replica).getCollectionNum() collectionNum = (*node.container).getCollectionNum()
assert.Equal(t, collectionNum, 0) assert.Equal(t, collectionNum, 0)
} }
@ -736,8 +745,9 @@ func TestMetaService_processDelete(t *testing.T) {
defer cancel() defer cancel()
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
key1 := "by-dev/collection/0" key1 := "by-dev/collection/0"
msg1 := `schema: < msg1 := `schema: <
@ -764,10 +774,10 @@ func TestMetaService_processDelete(t *testing.T) {
` `
(*node.metaService).processCreate(key1, msg1) (*node.metaService).processCreate(key1, msg1)
collectionNum := (*node.replica).getCollectionNum() collectionNum := (*node.container).getCollectionNum()
assert.Equal(t, collectionNum, 1) assert.Equal(t, collectionNum, 1)
collection, err := (*node.replica).getCollectionByName("test") collection, err := (*node.container).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
@ -779,15 +789,15 @@ func TestMetaService_processDelete(t *testing.T) {
` `
(*node.metaService).processCreate(key2, msg2) (*node.metaService).processCreate(key2, msg2)
seg, err := (*node.replica).getSegmentByID(UniqueID(0)) seg, err := (*node.container).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0)) assert.Equal(t, seg.segmentID, UniqueID(0))
(*node.metaService).processDelete(key1) (*node.metaService).processDelete(key1)
collectionsSize := (*node.replica).getCollectionNum() collectionsSize := (*node.container).getCollectionNum()
assert.Equal(t, collectionsSize, 0) assert.Equal(t, collectionsSize, 0)
mapSize := (*node.replica).getSegmentNum() mapSize := (*node.container).getSegmentNum()
assert.Equal(t, mapSize, 0) assert.Equal(t, mapSize, 0)
} }
@ -805,8 +815,9 @@ func TestMetaService_processResp(t *testing.T) {
} }
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
metaChan := (*node.metaService).kvBase.WatchWithPrefix("") metaChan := (*node.metaService).kvBase.WatchWithPrefix("")
@ -832,8 +843,9 @@ func TestMetaService_loadCollections(t *testing.T) {
} }
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
err2 := (*node.metaService).loadCollections() err2 := (*node.metaService).loadCollections()
assert.Nil(t, err2) assert.Nil(t, err2)
@ -853,8 +865,9 @@ func TestMetaService_loadSegments(t *testing.T) {
} }
// init metaService // init metaService
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node.metaService = newMetaService(ctx, node.replica) node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
err2 := (*node.metaService).loadSegments() err2 := (*node.metaService).loadSegments()
assert.Nil(t, err2) assert.Nil(t, err2)

View File

@ -1,25 +0,0 @@
package reader
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable_QueryNodeID(t *testing.T) {
Params.InitParamTable()
id := Params.QueryNodeID()
assert.Equal(t, id, 0)
}
func TestParamTable_TopicStart(t *testing.T) {
Params.InitParamTable()
topicStart := Params.TopicStart()
assert.Equal(t, topicStart, 0)
}
func TestParamTable_TopicEnd(t *testing.T) {
Params.InitParamTable()
topicEnd := Params.TopicEnd()
assert.Equal(t, topicEnd, 128)
}

View File

@ -16,23 +16,6 @@ func (p *ParamTable) InitParamTable() {
p.Init() p.Init()
} }
func (p *ParamTable) PulsarAddress() (string, error) {
url, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
return "pulsar://" + url, nil
}
func (p *ParamTable) QueryNodeID() int {
queryNodeID, _ := p.Load("reader.clientid")
id, err := strconv.Atoi(queryNodeID)
if err != nil {
panic(err)
}
return id
}
func (p *ParamTable) TopicStart() int { func (p *ParamTable) TopicStart() int {
topicStart, _ := p.Load("reader.topicstart") topicStart, _ := p.Load("reader.topicstart")
topicStartNum, err := strconv.Atoi(topicStart) topicStartNum, err := strconv.Atoi(topicStart)

View File

@ -13,7 +13,8 @@ import (
func TestPartition_Segments(t *testing.T) { func TestPartition_Segments(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
collectionName := "collection0" collectionName := "collection0"
fieldVec := schemapb.FieldSchema{ fieldVec := schemapb.FieldSchema{
@ -56,17 +57,17 @@ func TestPartition_Segments(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags { for _, tag := range collectionMeta.PartitionTags {
err := (*node.replica).addPartition(collection.ID(), tag) err := (*node.container).addPartition(collection.ID(), tag)
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -77,7 +78,7 @@ func TestPartition_Segments(t *testing.T) {
const segmentNum = 3 const segmentNum = 3
for i := 0; i < segmentNum; i++ { for i := 0; i < segmentNum; i++ {
err := (*node.replica).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID()) err := (*node.container).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID())
assert.NoError(t, err) assert.NoError(t, err)
} }

View File

@ -14,14 +14,18 @@ import "C"
import ( import (
"context" "context"
"sync"
) )
type QueryNode struct { type QueryNode struct {
ctx context.Context ctx context.Context
QueryNodeID uint64 QueryNodeID uint64
pulsarURL string
replica *collectionReplica tSafe tSafe
container *container
dataSyncService *dataSyncService dataSyncService *dataSyncService
metaService *metaService metaService *metaService
@ -29,21 +33,36 @@ type QueryNode struct {
statsService *statsService statsService *statsService
} }
func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { type tSafe interface {
getTSafe() Timestamp
setTSafe(t Timestamp)
}
type serviceTime struct {
tSafeMu sync.Mutex
time Timestamp
}
func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode {
segmentsMap := make(map[int64]*Segment) segmentsMap := make(map[int64]*Segment)
collections := make([]*Collection, 0) collections := make([]*Collection, 0)
var replica collectionReplica = &collectionReplicaImpl{ var container container = &colSegContainer{
collections: collections, collections: collections,
segments: segmentsMap, segments: segmentsMap,
} }
var tSafe tSafe = &serviceTime{}
return &QueryNode{ return &QueryNode{
ctx: ctx, ctx: ctx,
QueryNodeID: queryNodeID, QueryNodeID: queryNodeID,
pulsarURL: pulsarURL,
replica: &replica, tSafe: tSafe,
container: &container,
dataSyncService: nil, dataSyncService: nil,
metaService: nil, metaService: nil,
@ -53,10 +72,10 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
} }
func (node *QueryNode) Start() { func (node *QueryNode) Start() {
node.dataSyncService = newDataSyncService(node.ctx, node.replica) node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
node.searchService = newSearchService(node.ctx, node.replica) node.searchService = newSearchService(node.ctx, node, node.pulsarURL)
node.metaService = newMetaService(node.ctx, node.replica) node.metaService = newMetaService(node.ctx, node.container)
node.statsService = newStatsService(node.ctx, node.replica) node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
go node.dataSyncService.start() go node.dataSyncService.start()
// go node.searchService.start() // go node.searchService.start()
@ -67,3 +86,15 @@ func (node *QueryNode) Start() {
func (node *QueryNode) Close() { func (node *QueryNode) Close() {
// TODO: close services // TODO: close services
} }
func (st *serviceTime) getTSafe() Timestamp {
st.tSafeMu.Lock()
defer st.tSafeMu.Unlock()
return st.time
}
func (st *serviceTime) setTSafe(t Timestamp) {
st.tSafeMu.Lock()
st.time = t
st.tSafeMu.Unlock()
}

View File

@ -23,6 +23,10 @@ func TestQueryNode_start(t *testing.T) {
ctx = context.Background() ctx = context.Background()
} }
node := NewQueryNode(ctx, 0) pulsarAddr, err := Params.PulsarAddress()
if err != nil {
panic(err)
}
node := NewQueryNode(ctx, 0, "pulsar://"+pulsarAddr)
node.Start() node.Start()
} }

View File

@ -8,8 +8,8 @@ func Init() {
Params.Init() Params.Init()
} }
func StartQueryNode(ctx context.Context) { func StartQueryNode(ctx context.Context, pulsarURL string) {
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0, pulsarURL)
node.Start() node.Start()
} }

View File

@ -5,7 +5,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"log"
"sort" "sort"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -20,7 +19,7 @@ type searchService struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
replica *collectionReplica node *QueryNode
searchMsgStream *msgstream.MsgStream searchMsgStream *msgstream.MsgStream
searchResultMsgStream *msgstream.MsgStream searchResultMsgStream *msgstream.MsgStream
} }
@ -32,29 +31,24 @@ type SearchResult struct {
ResultDistances []float32 ResultDistances []float32
} }
func newSearchService(ctx context.Context, replica *collectionReplica) *searchService { func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *searchService {
const ( const (
//TODO:: read config file //TODO:: read config file
receiveBufSize = 1024 receiveBufSize = 1024
pulsarBufSize = 1024 pulsarBufSize = 1024
) )
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
consumeChannels := []string{"search"} consumeChannels := []string{"search"}
consumeSubName := "subSearch" consumeSubName := "subSearch"
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream.SetPulsarCient(msgStreamURL) searchStream.SetPulsarCient(pulsarURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var inputStream msgstream.MsgStream = searchStream var inputStream msgstream.MsgStream = searchStream
producerChannels := []string{"searchResult"} producerChannels := []string{"searchResult"}
searchResultStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) searchResultStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchResultStream.SetPulsarCient(msgStreamURL) searchResultStream.SetPulsarCient(pulsarURL)
searchResultStream.CreatePulsarProducers(producerChannels) searchResultStream.CreatePulsarProducers(producerChannels)
var outputStream msgstream.MsgStream = searchResultStream var outputStream msgstream.MsgStream = searchResultStream
@ -63,7 +57,7 @@ func newSearchService(ctx context.Context, replica *collectionReplica) *searchSe
ctx: searchServiceCtx, ctx: searchServiceCtx,
cancel: searchServiceCancel, cancel: searchServiceCancel,
replica: replica, node: node,
searchMsgStream: &inputStream, searchMsgStream: &inputStream,
searchResultMsgStream: &outputStream, searchResultMsgStream: &outputStream,
} }
@ -126,7 +120,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error {
} }
collectionName := query.CollectionName collectionName := query.CollectionName
partitionTags := query.PartitionTags partitionTags := query.PartitionTags
collection, err := (*ss.replica).getCollectionByName(collectionName) collection, err := (*ss.node.container).getCollectionByName(collectionName)
if err != nil { if err != nil {
return err return err
} }
@ -156,7 +150,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error {
// 3. Do search in all segments // 3. Do search in all segments
for _, partitionTag := range partitionTags { for _, partitionTag := range partitionTags {
partition, err := (*ss.replica).getPartitionByTag(collectionID, partitionTag) partition, err := (*ss.node.container).getPartitionByTag(collectionID, partitionTag)
if err != nil { if err != nil {
return err return err
} }

View File

@ -21,13 +21,12 @@ import (
) )
func TestSearch_Search(t *testing.T) { func TestSearch_Search(t *testing.T) {
Params.Init()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// init query node // init query node
pulsarURL := "pulsar://localhost:6650" pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0, pulsarURL)
// init meta // init meta
collectionName := "collection0" collectionName := "collection0"
@ -71,20 +70,20 @@ func TestSearch_Search(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err) assert.NoError(t, err)
segmentID := UniqueID(0) segmentID := UniqueID(0)
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
// test data generate // test data generate
@ -163,7 +162,7 @@ func TestSearch_Search(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
// dataSync // dataSync
node.dataSyncService = newDataSyncService(node.ctx, node.replica) node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
go node.dataSyncService.start() go node.dataSyncService.start()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
@ -234,7 +233,7 @@ func TestSearch_Search(t *testing.T) {
err = searchMsgStream.Produce(&msgPackSearch) err = searchMsgStream.Produce(&msgPackSearch)
assert.NoError(t, err) assert.NoError(t, err)
node.searchService = newSearchService(node.ctx, node.replica) node.searchService = newSearchService(node.ctx, node, node.pulsarURL)
go node.searchService.start() go node.searchService.start()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)

View File

@ -13,17 +13,21 @@ import (
) )
type statsService struct { type statsService struct {
ctx context.Context ctx context.Context
statsStream *msgstream.MsgStream pulsarURL string
replica *collectionReplica
msgStream *msgstream.MsgStream
container *container
} }
func newStatsService(ctx context.Context, replica *collectionReplica) *statsService { func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService {
return &statsService{ return &statsService{
ctx: ctx, ctx: ctx,
statsStream: nil, pulsarURL: pulsarURL,
replica: replica, msgStream: nil,
container: container,
} }
} }
@ -34,20 +38,16 @@ func (sService *statsService) start() {
) )
// start pulsar // start pulsar
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
producerChannels := []string{"statistic"} producerChannels := []string{"statistic"}
statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize) statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize)
statsStream.SetPulsarCient(msgStreamURL) statsStream.SetPulsarCient(sService.pulsarURL)
statsStream.CreatePulsarProducers(producerChannels) statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream var statsMsgStream msgstream.MsgStream = statsStream
sService.statsStream = &statsMsgStream sService.msgStream = &statsMsgStream
(*sService.statsStream).Start() (*sService.msgStream).Start()
// start service // start service
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms") fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
@ -62,7 +62,7 @@ func (sService *statsService) start() {
} }
func (sService *statsService) sendSegmentStatistic() { func (sService *statsService) sendSegmentStatistic() {
statisticData := (*sService.replica).getSegmentStatistics() statisticData := (*sService.container).getSegmentStatistics()
// fmt.Println("Publish segment statistic") // fmt.Println("Publish segment statistic")
// fmt.Println(statisticData) // fmt.Println(statisticData)
@ -80,7 +80,7 @@ func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSeg
var msgPack = msgstream.MsgPack{ var msgPack = msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg}, Msgs: []msgstream.TsMsg{msg},
} }
err := (*sService.statsStream).Produce(&msgPack) err := (*sService.msgStream).Produce(&msgPack)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }

View File

@ -15,87 +15,6 @@ import (
// NOTE: start pulsar before test // NOTE: start pulsar before test
func TestStatsService_start(t *testing.T) { func TestStatsService_start(t *testing.T) {
Params.Init()
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
// init query node
node := NewQueryNode(ctx, 0)
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err)
// start stats service
node.statsService = newStatsService(node.ctx, node.replica)
node.statsService.start()
}
// NOTE: start pulsar before test
func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
Params.Init()
var ctx context.Context var ctx context.Context
if closeWithDeadline { if closeWithDeadline {
@ -109,7 +28,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
// init query node // init query node
pulsarURL := "pulsar://localhost:6650" pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0, pulsarURL)
// init meta // init meta
collectionName := "collection0" collectionName := "collection0"
@ -153,20 +72,100 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta) collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob) assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err) assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName) collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err) assert.NoError(t, err)
segmentID := UniqueID(0) segmentID := UniqueID(0)
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err)
// start stats service
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
node.statsService.start()
}
// NOTE: start pulsar before test
func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
// init query node
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
const receiveBufSize = 1024 const receiveBufSize = 1024
@ -179,9 +178,9 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
var statsMsgStream msgstream.MsgStream = statsStream var statsMsgStream msgstream.MsgStream = statsStream
node.statsService = newStatsService(node.ctx, node.replica) node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
node.statsService.statsStream = &statsMsgStream node.statsService.msgStream = &statsMsgStream
(*node.statsService.statsStream).Start() (*node.statsService.msgStream).Start()
// send stats // send stats
node.statsService.sendSegmentStatistic() node.statsService.sendSegmentStatistic()

View File

@ -2,7 +2,6 @@ package flowgraph
import ( import (
"context" "context"
"log"
"sync" "sync"
"github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/errors"
@ -69,15 +68,6 @@ func (fg *TimeTickedFlowGraph) Start() {
func (fg *TimeTickedFlowGraph) Close() { func (fg *TimeTickedFlowGraph) Close() {
for _, v := range fg.nodeCtx { for _, v := range fg.nodeCtx {
// close message stream
if (*v.node).IsInputNode() {
inStream, ok := (*v.node).(*InputNode)
if !ok {
log.Fatal("Invalid inputNode")
}
(*inStream.inStream).Close()
}
// close input channels
v.Close() v.Close()
} }
} }