Add interfaces with other modules

Signed-off-by: sunby <bingyi.sun@zilliz.com>
sunby 2021-01-22 19:43:27 +08:00 committed by yefu.chen
parent 92e3c519d9
commit 2f7319cdbb
12 changed files with 706 additions and 152 deletions

View File

@@ -1,23 +1,54 @@
 package dataservice

+import (
+    "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
+    "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+    "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
+)
+
 type allocator interface {
     allocTimestamp() (Timestamp, error)
     allocID() (UniqueID, error)
 }

 type allocatorImpl struct {
-    // TODO call allocate functions in client.go in master service
+    masterClient *masterservice.GrpcClient
 }

-// TODO implements
-func newAllocatorImpl() *allocatorImpl {
-    return nil
+func newAllocatorImpl(masterClient *masterservice.GrpcClient) *allocatorImpl {
+    return &allocatorImpl{
+        masterClient: masterClient,
+    }
 }

 func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
-    return 0, nil
+    resp, err := allocator.masterClient.AllocTimestamp(&masterpb.TsoRequest{
+        Base: &commonpb.MsgBase{
+            MsgType:   commonpb.MsgType_kShowCollections,
+            MsgID:     -1, // todo add msg id
+            Timestamp: 0,  // todo
+            SourceID:  -1, // todo
+        },
+        Count: 1,
+    })
+    if err != nil {
+        return 0, err
+    }
+    return resp.Timestamp, nil
 }

 func (allocator *allocatorImpl) allocID() (UniqueID, error) {
-    return 0, nil
+    resp, err := allocator.masterClient.AllocID(&masterpb.IDRequest{
+        Base: &commonpb.MsgBase{
+            MsgType:   commonpb.MsgType_kShowCollections,
+            MsgID:     -1, // todo add msg id
+            Timestamp: 0,  // todo
+            SourceID:  -1, // todo
+        },
+        Count: 1,
+    })
+    if err != nil {
+        return 0, err
+    }
+    return resp.ID, nil
 }
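The value of keeping allocator as a two-method interface is that the gRPC-backed allocatorImpl can be swapped for an in-memory fake in tests, which is exactly what the mock.go file later in this commit does. A minimal, self-contained sketch of the same pattern follows; the simplified int64/uint64 types and the fakeAllocator name are illustrative stand-ins, not the repository's.

package main

import (
    "fmt"
    "sync/atomic"
)

// Same shape as the dataservice allocator contract: callers only ask for
// IDs and timestamps, so the master-backed implementation stays replaceable.
type allocator interface {
    allocTimestamp() (uint64, error)
    allocID() (int64, error)
}

// fakeAllocator stands in for allocatorImpl and hands out a counter.
type fakeAllocator struct{ cnt int64 }

func (f *fakeAllocator) allocTimestamp() (uint64, error) {
    return uint64(atomic.AddInt64(&f.cnt, 1)), nil
}

func (f *fakeAllocator) allocID() (int64, error) {
    return atomic.AddInt64(&f.cnt, 1), nil
}

func main() {
    var a allocator = &fakeAllocator{}
    id, _ := a.allocID()
    ts, _ := a.allocTimestamp()
    fmt.Println("id:", id, "ts:", ts)
}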

View File

@@ -0,0 +1,127 @@
package dataservice
import (
"log"
"sort"
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/distributed/datanode"
)
type (
dataNode struct {
id int64
address struct {
ip string
port int64
}
client *datanode.Client
channelNum int
}
dataNodeCluster struct {
mu sync.RWMutex
finishCh chan struct{}
nodes []*dataNode
}
)
func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster {
return &dataNodeCluster{
finishCh: finishCh,
nodes: make([]*dataNode, 0),
}
}
func (c *dataNodeCluster) Register(ip string, port int64, id int64) {
c.mu.Lock()
defer c.mu.Unlock()
if !c.checkDataNodeNotExist(ip, port) {
c.nodes = append(c.nodes, &dataNode{
id: id,
address: struct {
ip string
port int64
}{ip: ip, port: port},
channelNum: 0,
})
}
if len(c.nodes) == Params.DataNodeNum {
close(c.finishCh)
}
}
func (c *dataNodeCluster) checkDataNodeNotExist(ip string, port int64) bool {
for _, node := range c.nodes {
if node.address.ip == ip || node.address.port == port {
return false
}
}
return true
}
func (c *dataNodeCluster) GetNumOfNodes() int {
return len(c.nodes)
}
func (c *dataNodeCluster) GetNodeIDs() []int64 {
c.mu.RLock()
defer c.mu.RUnlock()
ret := make([]int64, len(c.nodes))
for _, node := range c.nodes {
ret = append(ret, node.id)
}
return ret
}
func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) {
c.mu.Lock()
defer c.mu.Unlock()
sort.Slice(c.nodes, func(i, j int) bool { return c.nodes[i].channelNum < c.nodes[j].channelNum })
for i, group := range groups {
err := c.nodes[i%len(c.nodes)].client.WatchDmChannels(&datapb.WatchDmChannelRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeCollection,
MsgID: -1, // todo
Timestamp: 0, // todo
SourceID: -1, // todo
},
ChannelNames: group,
})
if err != nil {
log.Println(err.Error())
continue
}
}
}
func (c *dataNodeCluster) GetDataNodeStates() ([]*internalpb2.ComponentInfo, error) {
c.mu.RLock()
defer c.mu.RUnlock()
ret := make([]*internalpb2.ComponentInfo, 0)
for _, node := range c.nodes {
states, err := node.client.GetComponentStates(nil)
if err != nil {
log.Println(err.Error())
continue
}
ret = append(ret, states.State)
}
return ret, nil
}
func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) {
c.mu.RLock()
defer c.mu.RUnlock()
for _, node := range c.nodes {
if err := node.client.FlushSegments(request); err != nil {
log.Println(err.Error())
continue
}
}
}
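WatchInsertChannels above spreads channel groups across registered data nodes after sorting them by how many channels each already serves. A self-contained sketch of that balancing idea is below; the node/assign names are illustrative, and unlike the committed loop this sketch also bumps the per-node channel count after each assignment.

package main

import (
    "fmt"
    "sort"
)

type node struct {
    id         int64
    channelNum int
}

// assign hands each channel group to a node, starting from the least loaded
// one and cycling round-robin, mirroring the sort-then-modulo loop in
// dataNodeCluster.WatchInsertChannels.
func assign(nodes []*node, groups [][]string) map[int64][][]string {
    sort.Slice(nodes, func(i, j int) bool { return nodes[i].channelNum < nodes[j].channelNum })
    out := make(map[int64][][]string)
    for i, g := range groups {
        n := nodes[i%len(nodes)]
        out[n.id] = append(out[n.id], g)
        n.channelNum += len(g)
    }
    return out
}

func main() {
    nodes := []*node{{id: 1, channelNum: 3}, {id: 2, channelNum: 0}}
    groups := [][]string{{"insert-channel-0"}, {"insert-channel-1"}, {"insert-channel-2"}}
    fmt.Println(assign(nodes, groups))
}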

View File

@@ -8,16 +8,12 @@ import (
     "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
     "github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
-    "github.com/zilliztech/milvus-distributed/internal/util/typeutil"

     "github.com/golang/protobuf/proto"
     "github.com/zilliztech/milvus-distributed/internal/errors"
     "github.com/zilliztech/milvus-distributed/internal/kv"
 )

 type (
-    UniqueID  = typeutil.UniqueID
-    Timestamp = typeutil.Timestamp
     errSegmentNotFound struct {
         segmentID UniqueID
     }
@@ -33,7 +29,6 @@ type (
         client      kv.TxnBase                       // client of a reliable kv service, i.e. etcd client
         collID2Info map[UniqueID]*collectionInfo     // collection id to collection info
         segID2Info  map[UniqueID]*datapb.SegmentInfo // segment id to segment info
         allocator   allocator
         ddLock      sync.RWMutex
     }

View File

@@ -0,0 +1,48 @@
package dataservice
import (
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
)
func newMemoryMeta(allocator allocator) (*meta, error) {
memoryKV := memkv.NewMemoryKV()
return newMeta(memoryKV, allocator)
}
type MockAllocator struct {
cnt int64
}
func (m *MockAllocator) allocTimestamp() (Timestamp, error) {
val := atomic.AddInt64(&m.cnt, 1)
phy := time.Now().UnixNano() / int64(time.Millisecond)
ts := tsoutil.ComposeTS(phy, val)
return ts, nil
}
func (m *MockAllocator) allocID() (UniqueID, error) {
val := atomic.AddInt64(&m.cnt, 1)
return val, nil
}
func newMockAllocator() *MockAllocator {
return &MockAllocator{}
}
func NewTestSchema() *schemapb.CollectionSchema {
return &schemapb.CollectionSchema{
Name: "test",
Description: "schema for test used",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{FieldID: 1, Name: "field1", IsPrimaryKey: false, Description: "field no.1", DataType: schemapb.DataType_STRING},
{FieldID: 2, Name: "field2", IsPrimaryKey: false, Description: "field no.2", DataType: schemapb.DataType_VECTOR_FLOAT},
},
}
}
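The mock allocator builds timestamps with tsoutil.ComposeTS, i.e. a TSO-style hybrid timestamp: physical milliseconds in the high bits and a logical counter in the low bits. A rough standalone sketch of that layout follows; the 18-bit logical width is the usual TSO convention and is an assumption here, not something stated in this diff.

package main

import (
    "fmt"
    "time"
)

const logicalBits = 18 // assumed logical-counter width (TSO convention)

// composeTS packs a physical millisecond clock and a logical counter into
// one ordered uint64, analogous to tsoutil.ComposeTS.
func composeTS(physicalMs, logical int64) uint64 {
    return uint64(physicalMs)<<logicalBits | uint64(logical)
}

// parseTS splits the packed value back into its two components.
func parseTS(ts uint64) (physicalMs, logical int64) {
    return int64(ts >> logicalBits), int64(ts & ((1 << logicalBits) - 1))
}

func main() {
    phy := time.Now().UnixNano() / int64(time.Millisecond)
    ts := composeTS(phy, 42)
    p, l := parseTS(ts)
    fmt.Println(ts, p == phy, l == 42)
}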

View File

@@ -9,6 +9,9 @@ type ParamTable struct {
     Address string
     Port    int

+    NodeID int64
+
+    MasterAddress string
     EtcdAddress  string
     MetaRootPath string
@@ -25,7 +28,7 @@ type ParamTable struct {
     InsertChannelNumPerCollection int64
     StatisticsChannelName         string
     TimeTickChannelName           string
-    DataNodeNum int64
+    DataNodeNum int
 }

 var Params ParamTable
@@ -42,6 +45,7 @@ func (p *ParamTable) Init() {
     // set members
     p.initAddress()
     p.initPort()
+    p.NodeID = 1 // todo
     p.initEtcdAddress()
     p.initMetaRootPath()
@@ -51,6 +55,12 @@ func (p *ParamTable) Init() {
     p.initSegmentSize()
     p.initSegmentSizeFactor()
     p.initDefaultRecordSize()
+    p.initSegIDAssignExpiration()
+    p.initInsertChannelPrefixName()
+    p.initInsertChannelNumPerCollection()
+    p.initStatisticsChannelName()
+    p.initTimeTickChannelName()
+    p.initDataNodeNum()
 }

 func (p *ParamTable) initAddress() {
@@ -115,3 +125,28 @@ func (p *ParamTable) initSegmentSizeFactor() {
 func (p *ParamTable) initDefaultRecordSize() {
     p.DefaultRecordSize = p.ParseInt64("master.segment.defaultSizePerRecord")
 }
+
+// TODO read from config/env
+func (p *ParamTable) initSegIDAssignExpiration() {
+    p.SegIDAssignExpiration = 3000 // ms
+}
+
+func (p *ParamTable) initInsertChannelPrefixName() {
+    p.InsertChannelPrefixName = "insert-channel-"
+}
+
+func (p *ParamTable) initInsertChannelNumPerCollection() {
+    p.InsertChannelNumPerCollection = 4
+}
+
+func (p *ParamTable) initStatisticsChannelName() {
+    p.StatisticsChannelName = "dataservice-statistics-channel"
+}
+
+func (p *ParamTable) initTimeTickChannelName() {
+    p.TimeTickChannelName = "dataservice-timetick-channel"
+}
+
+func (p *ParamTable) initDataNodeNum() {
+    p.DataNodeNum = 2
+}
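These new initializers are hard-coded and carry a TODO to read from config or the environment. One possible way to honor that TODO later is sketched below; the DATA_SERVICE_DATANODE_NUM variable name is hypothetical and not part of this commit.

package main

import (
    "fmt"
    "os"
    "strconv"
)

// intFromEnv returns an environment variable parsed as an int, falling back
// to def when the variable is unset or malformed.
func intFromEnv(key string, def int) int {
    if v, ok := os.LookupEnv(key); ok {
        if n, err := strconv.Atoi(v); err == nil {
            return n
        }
    }
    return def
}

func main() {
    dataNodeNum := intFromEnv("DATA_SERVICE_DATANODE_NUM", 2) // hypothetical name
    fmt.Println("DataNodeNum =", dataNodeNum)
}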

View File

@@ -2,10 +2,13 @@ package dataservice

 import (
     "fmt"
+    "log"
     "strconv"
     "sync"
     "time"

+    "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
     "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
     "github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
@@ -26,7 +29,7 @@ func (err errRemainInSufficient) Error() string {
 // segmentAllocator is used to allocate rows for segments and record the allocations.
 type segmentAllocator interface {
     // OpenSegment add the segment to allocator and set it allocatable
-    OpenSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, cRange channelGroup) error
+    OpenSegment(segmentInfo *datapb.SegmentInfo) error
     // AllocSegment allocate rows and record the allocation.
     AllocSegment(collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
     // GetSealedSegments get all sealed segment.
@@ -37,6 +40,8 @@ type segmentAllocator interface {
     DropSegment(segmentID UniqueID)
     // ExpireAllocations check all allocations' expire time and remove the expired allocation.
     ExpireAllocations(timeTick Timestamp) error
+    // SealAllSegments get all opened segment ids of collection. return success and failed segment ids
+    SealAllSegments(collectionID UniqueID) (bool, []UniqueID)
     // IsAllocationsExpired check all allocations of segment expired.
     IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error)
 }
@@ -50,7 +55,7 @@ type (
         sealed         bool
         lastExpireTime Timestamp
         allocations    []*allocation
-        cRange         channelGroup
+        channelGroup   channelGroup
     }
     allocation struct {
         rowNums int
@@ -67,9 +72,9 @@ type (
     }
 )

-func newSegmentAssigner(metaTable *meta, allocator allocator) (*segmentAllocatorImpl, error) {
+func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl, error) {
     segmentAllocator := &segmentAllocatorImpl{
-        mt:                    metaTable,
+        mt:                    meta,
         segments:              make(map[UniqueID]*segmentStatus),
         segmentExpireDuration: Params.SegIDAssignExpiration,
         segmentThreshold:      Params.SegmentSize * 1024 * 1024,
@@ -79,22 +84,22 @@ func newSegmentAssigner(metaTable *meta, allocator allocator) (*segmentAllocator
     return segmentAllocator, nil
 }

-func (allocator *segmentAllocatorImpl) OpenSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, cRange channelGroup) error {
-    if _, ok := allocator.segments[segmentID]; ok {
-        return fmt.Errorf("segment %d already exist", segmentID)
+func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
+    if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
+        return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID)
     }
-    totalRows, err := allocator.estimateTotalRows(collectionID)
+    totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID)
     if err != nil {
         return err
     }
-    allocator.segments[segmentID] = &segmentStatus{
-        id:             segmentID,
-        collectionID:   collectionID,
-        partitionID:    partitionID,
+    allocator.segments[segmentInfo.SegmentID] = &segmentStatus{
+        id:             segmentInfo.SegmentID,
+        collectionID:   segmentInfo.CollectionID,
+        partitionID:    segmentInfo.PartitionID,
         total:          totalRows,
         sealed:         false,
         lastExpireTime: 0,
-        cRange:         cRange,
+        channelGroup:   segmentInfo.InsertChannels,
     }
     return nil
 }
@@ -106,7 +111,7 @@ func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID,
     for _, segStatus := range allocator.segments {
         if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID ||
-            !segStatus.cRange.Contains(channelName) {
+            !segStatus.channelGroup.Contains(channelName) {
             continue
         }
         var success bool
@@ -240,3 +245,24 @@ func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID,
     }
     return status.lastExpireTime <= ts, nil
 }
+
+func (allocator *segmentAllocatorImpl) SealAllSegments(collectionID UniqueID) (bool, []UniqueID) {
+    allocator.mu.Lock()
+    defer allocator.mu.Unlock()
+    failed := make([]UniqueID, 0)
+    success := true
+    for _, status := range allocator.segments {
+        if status.collectionID == collectionID {
+            if status.sealed {
+                continue
+            }
+            if err := allocator.mt.SealSegment(status.id); err != nil {
+                log.Printf("seal segment error: %s", err.Error())
+                failed = append(failed, status.id)
+                success = false
+            }
+            status.sealed = true
+        }
+    }
+    return success, failed
+}
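SealAllSegments walks every open segment of a collection, persists the sealed flag through the meta table, and reports which segments failed so that Flush can decide whether the whole operation succeeded. Below is a compact, self-contained sketch of that collect-failures-but-keep-going pattern; the types and names are illustrative, and unlike the committed code the sketch skips marking a segment sealed when persistence fails.

package main

import (
    "errors"
    "fmt"
)

type segment struct {
    id     int64
    collID int64
    sealed bool
}

// sealAll seals every segment of a collection, recording the IDs that could
// not be persisted instead of aborting on the first error.
func sealAll(segs []*segment, collID int64, persist func(int64) error) (bool, []int64) {
    ok, failed := true, []int64{}
    for _, s := range segs {
        if s.collID != collID || s.sealed {
            continue
        }
        if err := persist(s.id); err != nil {
            failed = append(failed, s.id)
            ok = false
            continue
        }
        s.sealed = true
    }
    return ok, failed
}

func main() {
    segs := []*segment{{id: 1, collID: 7}, {id: 2, collID: 7}, {id: 3, collID: 8}}
    persist := func(id int64) error {
        if id == 2 {
            return errors.New("kv write failed")
        }
        return nil
    }
    ok, failed := sealAll(segs, 7, persist)
    fmt.Println(ok, failed)
}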

View File

@@ -0,0 +1,129 @@
package dataservice
import (
"math"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestAllocSegment(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := NewTestSchema()
collID, err := mockAllocator.allocID()
err = meta.AddCollection(&collectionInfo{
ID: collID,
Schema: schema,
})
assert.Nil(t, err)
segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c1", "c2"})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
err = segAllocator.OpenSegment(segmentInfo)
assert.Nil(t, err)
cases := []struct {
collectionID UniqueID
partitionID UniqueID
channelName string
requestRows int
expectResult bool
}{
{collID, 100, "c1", 100, true},
{collID + 1, 100, "c1", 100, false},
{collID, 101, "c1", 100, false},
{collID, 100, "c3", 100, false},
{collID, 100, "c1", math.MaxInt64, false},
}
for _, c := range cases {
id, count, expireTime, err := segAllocator.AllocSegment(c.collectionID, c.partitionID, c.channelName, c.requestRows)
if c.expectResult {
assert.Nil(t, err)
assert.EqualValues(t, c.requestRows, count)
assert.NotEqualValues(t, 0, id)
assert.NotEqualValues(t, 0, expireTime)
} else {
assert.NotNil(t, err)
}
}
}
func TestSealSegment(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := NewTestSchema()
collID, err := mockAllocator.allocID()
err = meta.AddCollection(&collectionInfo{
ID: collID,
Schema: schema,
})
assert.Nil(t, err)
var lastSegID UniqueID
for i := 0; i < 10; i++ {
segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c" + strconv.Itoa(i)})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
err = segAllocator.OpenSegment(segmentInfo)
assert.Nil(t, err)
lastSegID = segmentInfo.SegmentID
}
err = segAllocator.SealSegment(lastSegID)
assert.Nil(t, err)
success, ids := segAllocator.SealAllSegments(collID)
assert.True(t, success)
assert.EqualValues(t, 0, len(ids))
sealedSegments, err := segAllocator.GetSealedSegments()
assert.Nil(t, err)
assert.EqualValues(t, 10, sealedSegments)
}
func TestExpireSegment(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := NewTestSchema()
collID, err := mockAllocator.allocID()
err = meta.AddCollection(&collectionInfo{
ID: collID,
Schema: schema,
})
assert.Nil(t, err)
segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c1", "c2"})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
err = segAllocator.OpenSegment(segmentInfo)
assert.Nil(t, err)
id1, _, _, err := segAllocator.AllocSegment(collID, 100, "c1", 10)
assert.Nil(t, err)
time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)
ts, err := mockAllocator.allocTimestamp()
assert.Nil(t, err)
err = segAllocator.ExpireAllocations(ts)
assert.Nil(t, err)
expired, err := segAllocator.IsAllocationsExpired(id1, ts)
assert.Nil(t, err)
assert.True(t, expired)
assert.EqualValues(t, 0, len(segAllocator.segments[id1].allocations))
}

View File

@@ -4,7 +4,12 @@ import (
     "context"
     "fmt"
     "log"
-    "sync"
+    "time"

+    "github.com/zilliztech/milvus-distributed/internal/msgstream"
+    "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
+    "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
     "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@@ -19,6 +24,8 @@ import (
     "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )

+const role = "dataservice"
+
 type DataService interface {
     typeutil.Service
     RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
@@ -38,15 +45,8 @@ type DataService interface {
 }

 type (
-    datanode struct {
-        nodeID  int64
-        address struct {
-            ip   string
-            port int64
-        }
-        // todo add client
-    }
+    UniqueID  = typeutil.UniqueID
+    Timestamp = typeutil.Timestamp
+
     Server struct {
         ctx   context.Context
         state internalpb2.StateCode
@@ -56,40 +56,70 @@ type (
         statsHandler     *statsHandler
         insertChannelMgr *insertChannelManager
         allocator        allocator
+        cluster          *dataNodeCluster
         msgProducer      *timesync.MsgProducer
-        nodeIDCounter    int64
-        nodes            []*datanode
         registerFinishCh chan struct{}
-        registerMu       sync.RWMutex
+        masterClient     *masterservice.GrpcClient
+        ttMsgStream      msgstream.MsgStream
     }
 )

 func CreateServer(ctx context.Context) (*Server, error) {
+    ch := make(chan struct{})
     return &Server{
         ctx:              ctx,
         state:            internalpb2.StateCode_INITIALIZING,
         insertChannelMgr: newInsertChannelManager(),
-        nodeIDCounter:    0,
-        nodes:            make([]*datanode, 0),
-        registerFinishCh: make(chan struct{}),
+        registerFinishCh: ch,
+        cluster:          newDataNodeCluster(ch),
     }, nil
 }

 func (s *Server) Init() error {
     Params.Init()
-    s.allocator = newAllocatorImpl()
+    return nil
+}
+
+func (s *Server) Start() error {
+    if err := s.connectMaster(); err != nil {
+        return err
+    }
+    s.allocator = newAllocatorImpl(s.masterClient)
     if err := s.initMeta(); err != nil {
         return err
     }
     s.statsHandler = newStatsHandler(s.meta)
-    segAllocator, err := newSegmentAssigner(s.meta, s.allocator)
+    segAllocator, err := newSegmentAllocator(s.meta, s.allocator)
     if err != nil {
         return err
     }
     s.segAllocator = segAllocator
+    s.waitDataNodeRegister()
+    if err = s.loadMetaFromMaster(); err != nil {
+        return err
+    }
     if err = s.initMsgProducer(); err != nil {
         return err
     }
+    s.state = internalpb2.StateCode_HEALTHY
+    log.Println("start success")
+    return nil
+}
+
+func (s *Server) connectMaster() error {
+    log.Println("connecting to master")
+    master, err := masterservice.NewGrpcClient(Params.MasterAddress, 30*time.Second)
+    if err != nil {
+        return err
+    }
+    if err = master.Init(nil); err != nil {
+        return err
+    }
+    if err = master.Start(); err != nil {
+        return err
+    }
+    s.masterClient = master
+    log.Println("connect to master success")
     return nil
 }
@@ -107,37 +137,109 @@ func (s *Server) initMeta() error {
     return nil
 }

+func (s *Server) waitDataNodeRegister() {
+    log.Println("waiting data node to register")
+    <-s.registerFinishCh
+    log.Println("all data nodes register")
+}
+
 func (s *Server) initMsgProducer() error {
     // todo ttstream and peerids
-    timeTickBarrier := timesync.NewHardTimeTickBarrier(nil, nil)
-    // todo add watchers
-    producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier)
+    s.ttMsgStream = pulsarms.NewPulsarTtMsgStream(s.ctx, 1024)
+    s.ttMsgStream.Start()
+    timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
+    dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
+    producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher)
     if err != nil {
         return err
     }
     s.msgProducer = producer
-    return nil
-}
-
-func (s *Server) Start() error {
-    s.waitDataNodeRegister()
-    // todo add load meta from master
     s.msgProducer.Start(s.ctx)
     return nil
 }

-func (s *Server) waitDataNodeRegister() {
-    <-s.registerFinishCh
+func (s *Server) loadMetaFromMaster() error {
+    log.Println("loading collection meta from master")
+    collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
+        Base: &commonpb.MsgBase{
+            MsgType:   commonpb.MsgType_kShowCollections,
+            MsgID:     -1, // todo add msg id
+            Timestamp: 0,  // todo
+            SourceID:  -1, // todo
+        },
+        DbName: "",
+    })
+    if err != nil {
+        return err
+    }
+    for _, collectionName := range collections.CollectionNames {
+        collection, err := s.masterClient.DescribeCollection(&milvuspb.DescribeCollectionRequest{
+            Base: &commonpb.MsgBase{
+                MsgType:   commonpb.MsgType_kDescribeCollection,
+                MsgID:     -1, // todo
+                Timestamp: 0,  // todo
+                SourceID:  -1, // todo
+            },
+            DbName:         "",
+            CollectionName: collectionName,
+        })
+        if err != nil {
+            log.Println(err.Error())
+            continue
+        }
+        partitions, err := s.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{
+            Base: &commonpb.MsgBase{
+                MsgType:   commonpb.MsgType_kShowPartitions,
+                MsgID:     -1, // todo
+                Timestamp: 0,  // todo
+                SourceID:  -1, // todo
+            },
+            DbName:         "",
+            CollectionName: collectionName,
+            CollectionID:   collection.CollectionID,
+        })
+        if err != nil {
+            log.Println(err.Error())
+            continue
+        }
+        err = s.meta.AddCollection(&collectionInfo{
+            ID:         collection.CollectionID,
+            Schema:     collection.Schema,
+            partitions: partitions.PartitionIDs,
+        })
+        if err != nil {
+            log.Println(err.Error())
+            continue
+        }
+    }
+    log.Println("load collection meta from master complete")
+    return nil
 }

 func (s *Server) Stop() error {
+    s.ttMsgStream.Close()
     s.msgProducer.Close()
     return nil
 }

 func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
-    // todo foreach datanode, call GetServiceStates
-    return nil, nil
+    resp := &internalpb2.ComponentStates{
+        State: &internalpb2.ComponentInfo{
+            NodeID:    Params.NodeID,
+            Role:      role,
+            StateCode: s.state,
+        },
+        Status: &commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+        },
+    }
+    dataNodeStates, err := s.cluster.GetDataNodeStates()
+    if err != nil {
+        resp.Status.Reason = err.Error()
+        return resp, nil
+    }
+    resp.SubcomponentStates = dataNodeStates
+    resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
+    return resp, nil
 }

 func (s *Server) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
@@ -159,45 +261,27 @@ func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
 }

 func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
-    s.registerMu.Lock()
-    defer s.registerMu.Unlock()
-    resp := &datapb.RegisterNodeResponse{
-        Status: &commonpb.Status{
-            ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
-        },
-    }
-    if !s.checkDataNodeNotExist(req.Address.Ip, req.Address.Port) {
-        resp.Status.Reason = fmt.Sprintf("data node with address %s exist", req.Address.String())
-        return resp, nil
-    }
-    s.nodeIDCounter++
-    s.nodes = append(s.nodes, &datanode{
-        nodeID: s.nodeIDCounter,
-        address: struct {
-            ip   string
-            port int64
-        }{ip: req.Address.Ip, port: req.Address.Port},
-    })
-    if s.nodeIDCounter == Params.DataNodeNum {
-        close(s.registerFinishCh)
-    }
-    resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
+    s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID)
     // add init params
-    return resp, nil
-}
-
-func (s *Server) checkDataNodeNotExist(ip string, port int64) bool {
-    for _, node := range s.nodes {
-        if node.address.ip == ip || node.address.port == port {
-            return false
-        }
-    }
-    return true
+    return &datapb.RegisterNodeResponse{
+        Status: &commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_SUCCESS,
+        },
+    }, nil
 }

 func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) {
-    // todo call datanode flush
-    return nil, nil
+    success, fails := s.segAllocator.SealAllSegments(req.CollectionID)
+    log.Printf("sealing failed segments: %v", fails)
+    if !success {
+        return &commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+            Reason:    fmt.Sprintf("flush failed, %d segment can not be sealed", len(fails)),
+        }, nil
+    }
+    return &commonpb.Status{
+        ErrorCode: commonpb.ErrorCode_SUCCESS,
+    }, nil
 }

 func (s *Server) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
@@ -264,7 +348,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
     if err = s.meta.AddSegment(segmentInfo); err != nil {
         return err
     }
-    if err = s.segAllocator.OpenSegment(collectionID, partitionID, segmentInfo.SegmentID, segmentInfo.InsertChannels); err != nil {
+    if err = s.segAllocator.OpenSegment(segmentInfo); err != nil {
         return err
     }
     return nil
@@ -310,19 +394,19 @@ func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalp
         resp.Values = ret
         return resp, nil
     }
-    channelGroups, err := s.insertChannelMgr.AllocChannels(req.CollectionID, len(s.nodes))
+    channelGroups, err := s.insertChannelMgr.AllocChannels(req.CollectionID, s.cluster.GetNumOfNodes())
     if err != nil {
         resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
         resp.Status.Reason = err.Error()
         return resp, nil
     }

     channels := make([]string, Params.InsertChannelNumPerCollection)
     for _, group := range channelGroups {
-        for _, c := range group {
-            channels = append(channels, c)
-        }
+        channels = append(channels, group...)
     }
-    // todo datanode watch dm channels
+    s.cluster.WatchInsertChannels(channelGroups)
     resp.Values = channels
     return resp, nil
 }
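Start now blocks in waitDataNodeRegister until the cluster has seen Params.DataNodeNum registrations; the synchronization is nothing more than a channel that dataNodeCluster.Register closes once the expected count is reached. A self-contained sketch of that close-once barrier, stripped of the gRPC plumbing, follows; the registry type and its names are illustrative.

package main

import (
    "fmt"
    "sync"
)

// registry closes done exactly once after `expected` registrations, letting a
// caller block until the cluster is fully assembled.
type registry struct {
    mu       sync.Mutex
    expected int
    count    int
    done     chan struct{}
}

func newRegistry(expected int) *registry {
    return &registry{expected: expected, done: make(chan struct{})}
}

func (r *registry) register() {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.count++
    if r.count == r.expected {
        close(r.done)
    }
}

func main() {
    r := newRegistry(2)
    go r.register()
    go r.register()
    <-r.done // analogous to Server.waitDataNodeRegister
    fmt.Println("all data nodes registered")
}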

View File

@@ -3,6 +3,9 @@ package dataservice
 import (
     "log"

+    "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+    "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
+
     "golang.org/x/net/context"

     "github.com/zilliztech/milvus-distributed/internal/msgstream"
@@ -14,6 +17,8 @@ type (
         msgQueue chan *msgstream.TimeTickMsg
     }
     dataNodeTimeTickWatcher struct {
+        meta      *meta
+        cluster   *dataNodeCluster
         allocator segmentAllocator
         msgQueue  chan *msgstream.TimeTickMsg
     }
@@ -30,7 +35,7 @@ func (watcher *proxyTimeTickWatcher) StartBackgroundLoop(ctx context.Context) {
     for {
         select {
         case <-ctx.Done():
-            log.Println("proxy time tick watcher clsoed")
+            log.Println("proxy time tick watcher closed")
             return
         case msg := <-watcher.msgQueue:
             if err := watcher.allocator.ExpireAllocations(msg.Base.Timestamp); err != nil {
@@ -44,9 +49,11 @@ func (watcher *proxyTimeTickWatcher) Watch(msg *msgstream.TimeTickMsg) {
     watcher.msgQueue <- msg
 }

-func newDataNodeTimeTickWatcher(allocator segmentAllocator) *dataNodeTimeTickWatcher {
+func newDataNodeTimeTickWatcher(meta *meta, allocator segmentAllocator, cluster *dataNodeCluster) *dataNodeTimeTickWatcher {
     return &dataNodeTimeTickWatcher{
+        meta:      meta,
         allocator: allocator,
+        cluster:   cluster,
         msgQueue:  make(chan *msgstream.TimeTickMsg, 1),
     }
 }
@@ -74,7 +81,21 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context)
                 continue
             }
             if expired {
-                // TODO: flush segment
+                segmentInfo, err := watcher.meta.GetSegment(id)
+                if err != nil {
+                    log.Println(err.Error())
+                    continue
+                }
+                watcher.cluster.FlushSegment(&datapb.FlushSegRequest{
+                    Base: &commonpb.MsgBase{
+                        MsgType:   commonpb.MsgType_kShowCollections,
+                        MsgID:     -1, // todo add msg id
+                        Timestamp: 0,  // todo
+                        SourceID:  -1, // todo
+                    },
+                    CollectionID: segmentInfo.CollectionID,
+                    SegmentIDs:   []int64{segmentInfo.SegmentID},
+                })
                 watcher.allocator.DropSegment(id)
             }
         }

View File

@@ -2,6 +2,10 @@ package dataservice

 import (
     "context"
+    "log"
+    "net"
+
+    "google.golang.org/grpc"

     "github.com/zilliztech/milvus-distributed/internal/dataservice"
@@ -14,6 +18,46 @@ type Service struct {
 type Service struct {
     server *dataservice.Server
+    ctx        context.Context
+    cancel     context.CancelFunc
+    grpcServer *grpc.Server
 }
+
+func NewGrpcService() {
+    s := &Service{}
+    var err error
+    s.ctx, s.cancel = context.WithCancel(context.Background())
+    s.server, err = dataservice.CreateServer(s.ctx)
+    if err != nil {
+        log.Fatalf("create server error: %s", err.Error())
+        return
+    }
+    s.grpcServer = grpc.NewServer()
+    datapb.RegisterDataServiceServer(s.grpcServer, s)
+    lis, err := net.Listen("tcp", "localhost:11111") // todo address
+    if err != nil {
+        log.Fatal(err.Error())
+        return
+    }
+    if err = s.grpcServer.Serve(lis); err != nil {
+        log.Fatal(err.Error())
+        return
+    }
+}
+
+func (s *Service) Init() error {
+    return s.server.Init()
+}
+
+func (s *Service) Start() error {
+    return s.server.Start()
+}
+
+func (s *Service) Stop() error {
+    err := s.server.Stop()
+    s.grpcServer.GracefulStop()
+    s.cancel()
+    return err
+}

 func (s *Service) RegisterNode(ctx context.Context, request *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {

View File

@@ -1,6 +1,7 @@
 package memkv

 import (
+    "strings"
     "sync"

     "github.com/google/btree"
@@ -110,7 +111,19 @@ func (kv *MemoryKV) MultiSaveAndRemove(saves map[string]string, removals []strin
 // todo
 func (kv *MemoryKV) LoadWithPrefix(key string) ([]string, []string, error) {
-    panic("implement me")
+    kv.Lock()
+    defer kv.Unlock()
+
+    keys := make([]string, 0)
+    values := make([]string, 0)
+    kv.tree.Ascend(func(i btree.Item) bool {
+        if strings.HasPrefix(i.(memoryKVItem).key, key) {
+            keys = append(keys, i.(memoryKVItem).key)
+            values = append(values, i.(memoryKVItem).value)
+        }
+        return true
+    })
+    return keys, values, nil
 }

 func (kv *MemoryKV) Close() {
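The new LoadWithPrefix walks the whole B-tree and filters with strings.HasPrefix. If that ever shows up in profiles, the non-generic google/btree API also offers AscendGreaterOrEqual, which can start the scan at the prefix itself and stop at the first non-matching key. A sketch of that variant follows; the kvItem type is local to the example, not MemoryKV's item type.

package main

import (
    "fmt"
    "strings"

    "github.com/google/btree"
)

type kvItem struct{ key, value string }

func (a kvItem) Less(b btree.Item) bool { return a.key < b.(kvItem).key }

// loadWithPrefix starts at the first key >= prefix and stops as soon as a key
// no longer matches, instead of visiting every item in the tree.
func loadWithPrefix(tree *btree.BTree, prefix string) (keys, values []string) {
    tree.AscendGreaterOrEqual(kvItem{key: prefix}, func(i btree.Item) bool {
        it := i.(kvItem)
        if !strings.HasPrefix(it.key, prefix) {
            return false // past the prefix range; stop the scan
        }
        keys = append(keys, it.key)
        values = append(values, it.value)
        return true
    })
    return
}

func main() {
    tree := btree.New(2)
    for _, k := range []string{"a/1", "a/2", "b/1"} {
        tree.ReplaceOrInsert(kvItem{key: k, value: "v-" + k})
    }
    k, v := loadWithPrefix(tree, "a/")
    fmt.Println(k, v)
}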

View File

@@ -38,7 +38,7 @@ type (
     }
 )

-func NewSoftTimeTickBarrier(ttStream *ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
+func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
     if len(peerIds) <= 0 {
         log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n")
         return nil
@@ -46,7 +46,7 @@ func NewSoftTimeTickBarrier(ttStream *ms.MsgStream, peerIds []UniqueID, minTtInt
     sttbarrier := softTimeTickBarrier{}
     sttbarrier.minTtInterval = minTtInterval
-    sttbarrier.ttStream = *ttStream
+    sttbarrier.ttStream = ttStream
     sttbarrier.outTt = make(chan Timestamp, 1024)
     sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp)
     for _, id := range peerIds {
@@ -86,7 +86,9 @@ func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
         case <-ctx.Done():
             log.Printf("[TtBarrierStart] %s\n", ctx.Err())
             return
-        case ttmsgs := <-ttBarrier.ttStream.Chan():
+        default:
+        }
+        ttmsgs := ttBarrier.ttStream.Consume()
         if len(ttmsgs.Msgs) > 0 {
             for _, timetickmsg := range ttmsgs.Msgs {
                 ttmsg := timetickmsg.(*ms.TimeTickMsg)
@@ -111,7 +113,6 @@ func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
                 }
             }
         }
-        }
     }
 }

 func (ttBarrier *softTimeTickBarrier) minTimestamp() Timestamp {
@@ -145,10 +146,11 @@ func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
         case <-ctx.Done():
             log.Printf("[TtBarrierStart] %s\n", ctx.Err())
             return
-        case ttmsgs := <-ttBarrier.ttStream.Chan():
+        default:
+        }
+        ttmsgs := ttBarrier.ttStream.Consume()
         if len(ttmsgs.Msgs) > 0 {
             for _, timetickmsg := range ttmsgs.Msgs {
                 // Suppose ttmsg.Timestamp from stream is always larger than the previous one,
                 // that `ttmsg.Timestamp > oldT`
                 ttmsg := timetickmsg.(*ms.TimeTickMsg)
@@ -174,7 +176,6 @@ func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
                 }
             }
         }
-        }
     }
 }

 func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
@@ -187,14 +188,14 @@ func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
     return tempMin
 }

-func NewHardTimeTickBarrier(ttStream *ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
+func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
     if len(peerIds) <= 0 {
         log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!")
         return nil
     }

     sttbarrier := hardTimeTickBarrier{}
-    sttbarrier.ttStream = *ttStream
+    sttbarrier.ttStream = ttStream
     sttbarrier.outTt = make(chan Timestamp, 1024)
     sttbarrier.peer2Tt = make(map[UniqueID]Timestamp)
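Both barriers now take the stream by interface value and poll ctx.Done() with a default branch before calling ttStream.Consume(), instead of selecting on Chan(). A small self-contained sketch of that loop shape against a toy stream interface follows; the names are illustrative, and like the committed code it only observes cancellation between blocking Consume calls.

package main

import (
    "context"
    "fmt"
    "time"
)

type msgPack struct{ Msgs []int }

// stream stands in for msgstream.MsgStream: Consume blocks until a pack is available.
type stream interface{ Consume() *msgPack }

type fakeStream struct{ ch chan *msgPack }

func (f *fakeStream) Consume() *msgPack { return <-f.ch }

// loop mirrors the barrier's new structure: check for cancellation with a
// non-blocking select, then block on Consume for the next pack.
func loop(ctx context.Context, s stream) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("barrier loop:", ctx.Err())
            return
        default:
        }
        pack := s.Consume()
        if pack != nil && len(pack.Msgs) > 0 {
            fmt.Println("got", len(pack.Msgs), "time tick msgs")
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    fs := &fakeStream{ch: make(chan *msgPack, 1)}
    fs.ch <- &msgPack{Msgs: []int{1, 2}}
    go loop(ctx, fs)
    time.Sleep(50 * time.Millisecond)
    cancel()
    fs.ch <- &msgPack{} // unblock Consume so the goroutine can observe cancellation
    time.Sleep(50 * time.Millisecond)
}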