From eb557b289b52ce50ae67b12a806df1c133362c8d Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Sat, 8 May 2021 15:24:12 +0800 Subject: [PATCH] Add docs in package datanode (#5117) Signed-off-by: yangxuan --- build/ci/jenkins/pod/krte.yaml | 2 +- go.mod | 3 ++ internal/datanode/binlog_meta.go | 29 ++++++++------ internal/datanode/collection.go | 1 + internal/datanode/collection_replica.go | 10 +++++ internal/datanode/data_node.go | 50 +++++++++++++++++++++---- internal/datanode/meta_service.go | 3 ++ 7 files changed, 77 insertions(+), 21 deletions(-) diff --git a/build/ci/jenkins/pod/krte.yaml b/build/ci/jenkins/pod/krte.yaml index 3159a09f42..07a31b91a9 100644 --- a/build/ci/jenkins/pod/krte.yaml +++ b/build/ci/jenkins/pod/krte.yaml @@ -20,7 +20,7 @@ spec: ephemeral-storage: "20Gi" requests: cpu: "4" - memory: 8Gi + memory: 12Gi ephemeral-storage: "20Gi" volumeMounts: - mountPath: /docker-graph diff --git a/go.mod b/go.mod index b795768eb8..121105ae0e 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,9 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/antonmedv/expr v1.8.9 github.com/apache/pulsar-client-go v0.4.0 + github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect + github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect + github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect diff --git a/internal/datanode/binlog_meta.go b/internal/datanode/binlog_meta.go index 2b44aca763..4eb56ed9b8 100644 --- a/internal/datanode/binlog_meta.go +++ b/internal/datanode/binlog_meta.go @@ -20,9 +20,10 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" ) -// ddl binlog meta key: +// binlogMeta persists binlog paths into etcd. +// ddl binlog etcd meta key: // ${prefix}/${collectionID}/${idx} -// segment binlog meta key: +// segment binlog etcd meta key: // ${prefix}/${segmentID}/${fieldID}/${idx} type binlogMeta struct { client kv.TxnKV // etcd kv @@ -37,8 +38,9 @@ func NewBinlogMeta(kv kv.TxnKV, idAllocator allocatorInterface) (*binlogMeta, er return mt, nil } -// if alloc is true, the returned keys will have a generated-unique ID at the end. -// if alloc is false, the returned keys will only consist of provided ids. +// genKey gives a valid key string for lists of UniqueIDs: +// if alloc is true, the returned keys will have a generated-unique ID at the end. +// if alloc is false, the returned keys will only consist of provided ids. func (bm *binlogMeta) genKey(alloc bool, ids ...UniqueID) (key string, err error) { if alloc { idx, err := bm.idAllocator.allocID() @@ -57,22 +59,25 @@ func (bm *binlogMeta) genKey(alloc bool, ids ...UniqueID) (key string, err error return } +// SaveSegmentBinlogMetaTxn stores all fields' binlog paths of a segment in a transaction. +// segment binlog etcd meta key: +// ${prefix}/${segmentID}/${fieldID}/${idx} func (bm *binlogMeta) SaveSegmentBinlogMetaTxn(segmentID UniqueID, field2Path map[UniqueID]string) error { - kvs := make(map[string]string, len(field2Path)) + etcdKey2binlogPath := make(map[string]string, len(field2Path)) for fieldID, p := range field2Path { key, err := bm.genKey(true, segmentID, fieldID) if err != nil { return err } - v := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{ + binlogPath := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{ FieldID: fieldID, BinlogPath: p, }) - kvs[path.Join(Params.SegFlushMetaSubPath, key)] = v + etcdKey2binlogPath[path.Join(Params.SegFlushMetaSubPath, key)] = binlogPath } - return bm.client.MultiSave(kvs) + return bm.client.MultiSave(etcdKey2binlogPath) } func (bm *binlogMeta) getFieldBinlogMeta(segmentID UniqueID, @@ -123,21 +128,21 @@ func (bm *binlogMeta) getSegmentBinlogMeta(segmentID UniqueID) (metas []*datapb. return } +// SaveDDLBinlogMetaTxn stores timestamp and ddl binlog path pair into etcd in a transaction. // ddl binlog meta key: // ${prefix}/${collectionID}/${idx} -// --- DDL --- func (bm *binlogMeta) SaveDDLBinlogMetaTxn(collID UniqueID, tsPath string, ddlPath string) error { - k, err := bm.genKey(true, collID) + uniqueKey, err := bm.genKey(true, collID) if err != nil { return err } - v := proto.MarshalTextString(&datapb.DDLBinlogMeta{ + binlogPathPair := proto.MarshalTextString(&datapb.DDLBinlogMeta{ DdlBinlogPath: ddlPath, TsBinlogPath: tsPath, }) - return bm.client.Save(path.Join(Params.DDLFlushMetaSubPath, k), v) + return bm.client.Save(path.Join(Params.DDLFlushMetaSubPath, uniqueKey), binlogPathPair) } func (bm *binlogMeta) getDDLBinlogMete(collID UniqueID) (metas []*datapb.DDLBinlogMeta, err error) { diff --git a/internal/datanode/collection.go b/internal/datanode/collection.go index d6925738b0..fc0151cb23 100644 --- a/internal/datanode/collection.go +++ b/internal/datanode/collection.go @@ -17,6 +17,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/schemapb" ) +// Collection stuct is the data structure of collections in data node replica. type Collection struct { schema *schemapb.CollectionSchema id UniqueID diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index 79c3d5234b..bb09e39945 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -44,6 +44,7 @@ type Replica interface { getSegmentByID(segmentID UniqueID) (*Segment, error) } +// Segment is the data structure of segments in data node replica. type Segment struct { segmentID UniqueID collectionID UniqueID @@ -60,6 +61,8 @@ type Segment struct { channelName string } +// CollectionSegmentReplica is the data replication of persistent data in datanode. +// It implements `Replica` interface. type CollectionSegmentReplica struct { mu sync.RWMutex segments map[UniqueID]*Segment @@ -89,6 +92,9 @@ func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Se } +// `addSegment` add a new segment into replica when data node see the segment +// for the first time in insert channels. It sets the startPosition of a segment, and +// flags `isNew=true` func (replica *CollectionSegmentReplica) addSegment( segmentID UniqueID, collID UniqueID, @@ -179,6 +185,7 @@ func (replica *CollectionSegmentReplica) setEndPosition(segmentID UniqueID, endP return fmt.Errorf("There's no segment %v", segmentID) } +// `updateStatistics` updates the number of rows of a segment in replica. func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error { replica.mu.Lock() defer replica.mu.Unlock() @@ -193,6 +200,9 @@ func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, nu return fmt.Errorf("There's no segment %v", segmentID) } +// `getSegmentStatisticsUpdates` gives current segment's statistics updates. +// if the segment's flag `isNew` is true, updates will contain a valid start position. +// if the segment's flag `isFlushed` is true, updates will contain a valid end position. func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) { replica.mu.Lock() defer replica.mu.Unlock() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 2f7cbc7782..f681d03d3c 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -9,6 +9,9 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. +// Package datanode implements data persistence logic. +// +// Data node persists definition language (ddl) strings and insert logs into persistent storage like minIO/S3. package datanode import ( @@ -37,6 +40,22 @@ const ( RPCConnectionTimeout = 30 * time.Second ) +// DataNode struct communicates with outside services and unioun all +// services of data node. +// +// DataNode struct implements `types.Component`, `types.DataNode` interfaces. +// `dataSyncService` controls flowgraph in datanode. +// `metaService` initialize collections from master service when data node starts. +// `masterService` holds a grpc client of master service. +// `dataService` holds a grpc client of data service. +// +// `NodeID` is unique to each data node. +// +// `State` is current statement of this data node, indicating whether it's healthy. +// +// `flushChan` transfer flush messages from data service to flowgraph of data node. +// +// `replica` holds replications of persistent data, including collections and segments. type DataNode struct { ctx context.Context cancel context.CancelFunc @@ -59,6 +78,7 @@ type DataNode struct { msFactory msgstream.Factory } +// NewDataNode will return a DataNode with abnormal state. func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { rand.Seed(time.Now().UnixNano()) ctx2, cancel2 := context.WithCancel(ctx) @@ -79,6 +99,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { return node } +// SetMasterServiceInterface sets master service's grpc client, error is returned if repeatedly set. func (node *DataNode) SetMasterServiceInterface(ms types.MasterService) error { switch { case ms == nil, node.masterService != nil: @@ -89,6 +110,7 @@ func (node *DataNode) SetMasterServiceInterface(ms types.MasterService) error { } } +// SetDataServiceInterface sets data service's grpc client, error is returned if repeatedly set. func (node *DataNode) SetDataServiceInterface(ds types.DataService) error { switch { case ds == nil, node.dataService != nil: @@ -99,7 +121,16 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error { } } -// Suppose dataservice is in INITIALIZING +// Init function supposes data service is in INITIALIZING state. +// +// In Init process, data node will register itself to data service with its node id +// and address. Therefore, `SetDataServiceInterface()` must be called before this func. +// Registering return several channel names data node need. +// +// After registering, data node will wait until data service calls `WatchDmChannels` +// for `RPCConnectionTimeout` ms. +// +// At last, data node initializes its `dataSyncService` and `metaService`. func (node *DataNode) Init() error { ctx := context.Background() @@ -121,13 +152,6 @@ func (node *DataNode) Init() error { return fmt.Errorf("Receive error when registering data node, msg: %s", resp.Status.Reason) } - select { - case <-time.After(RPCConnectionTimeout): - return errors.New("Get DmChannels failed in 30 seconds") - case <-node.watchDm: - log.Debug("insert channel names set") - } - for _, kv := range resp.InitParams.StartParams { switch kv.Key { case "DDChannelName": @@ -143,6 +167,13 @@ func (node *DataNode) Init() error { } } + select { + case <-time.After(RPCConnectionTimeout): + return errors.New("Get DmChannels failed in 30 seconds") + case <-node.watchDm: + log.Debug("insert channel names set") + } + replica := newReplica() var alloc allocatorInterface = newAllocator(node.masterService) @@ -158,6 +189,7 @@ func (node *DataNode) Init() error { return nil } +// Start `metaService` and `dataSyncService` and update state to HEALTHY func (node *DataNode) Start() error { node.metaService.init() go node.dataSyncService.start() @@ -169,6 +201,7 @@ func (node *DataNode) UpdateStateCode(code internalpb.StateCode) { node.State.Store(code) } +// WatchDmChannels set insert channel names data node subscribs to. func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -205,6 +238,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo return states, nil } +// FlushSegments packs flush messages into flowgraph through flushChan. func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs))) ids := make([]UniqueID, 0) diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index 734b0f74ae..6e5f72f484 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -26,6 +26,9 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" ) +// metaService initialize replica collections in data node from master service. +// Initializing replica collections happens on data node starting. It depends on +// a healthy master service and a valid master service grpc client. type metaService struct { ctx context.Context replica Replica