diff --git a/internal/datacoord/allocator.go b/internal/datacoord/allocator.go
index b42e8b373f..2e6922eab1 100644
--- a/internal/datacoord/allocator.go
+++ b/internal/datacoord/allocator.go
@@ -8,6 +8,7 @@
 // Unless required by applicable law or agreed to in writing, software distributed under the License
 // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 // or implied. See the License for the specific language governing permissions and limitations under the License.
+
 package datacoord
 
 import (
diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go
index 56d09f9a76..92149570bd 100644
--- a/internal/datacoord/cluster.go
+++ b/internal/datacoord/cluster.go
@@ -28,38 +28,55 @@ import (
 	"go.uber.org/zap"
 )
 
+// clusterPrefix const for kv prefix storing DataNodeInfo
 const clusterPrefix = "cluster-prefix/"
+
+// clusterBuffer const for kv key storing buffer channels (not assigned yet)
 const clusterBuffer = "cluster-buffer"
+
+// nodeEventChBufferSize magic number for Event Channel buffer size
 const nodeEventChBufferSize = 1024
 
+// eventTimeout magic number for event timeout
 const eventTimeout = 5 * time.Second
 
+// EventType enum for events
 type EventType int
 
 const (
-	Register      EventType = 1
-	UnRegister    EventType = 2
-	WatchChannel  EventType = 3
+	// Register EventType const for data node registration
+	Register EventType = 1
+	// UnRegister EventType const for data node unregistration
+	UnRegister EventType = 2
+	// WatchChannel EventType const for a channel that needs to be watched
+	WatchChannel EventType = 3
+	// FlushSegments EventType const for flushing specified segments
 	FlushSegments EventType = 4
 )
 
+// NodeEventType enum for node events
 type NodeEventType int
 
 const (
-	Watch NodeEventType = 0
-	Flush NodeEventType = 1
+	// Watch NodeEventType const for assigning a channel to a datanode for watching
+	Watch NodeEventType = 1
+	// Flush NodeEventType const for flushing specified segments
+	Flush NodeEventType = 2
 )
 
+// Event is an event wrapper that contains an EventType and the related parameter
 type Event struct {
 	Type EventType
 	Data interface{}
 }
 
+// WatchChannelParams parameter struct for Watch event
 type WatchChannelParams struct {
 	Channel      string
 	CollectionID UniqueID
 }
 
+// Cluster handles all DataNode life-cycle and functional events
 type Cluster struct {
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -75,33 +92,42 @@ type Cluster struct {
 	eventCh chan *Event
 }
 
+// ClusterOption helper function used when creating a Cluster
 type ClusterOption func(c *Cluster)
 
+// withRegisterPolicy helper function setting registerPolicy
 func withRegisterPolicy(p dataNodeRegisterPolicy) ClusterOption {
 	return func(c *Cluster) { c.registerPolicy = p }
 }
 
+// withUnregistorPolicy helper function setting unregisterPolicy
 func withUnregistorPolicy(p dataNodeUnregisterPolicy) ClusterOption {
 	return func(c *Cluster) { c.unregisterPolicy = p }
 }
 
+// withAssignPolicy helper function setting assignPolicy
 func withAssignPolicy(p channelAssignPolicy) ClusterOption {
 	return func(c *Cluster) { c.assignPolicy = p }
 }
 
+// defaultRegisterPolicy returns default registerPolicy
 func defaultRegisterPolicy() dataNodeRegisterPolicy {
 	return newAssignBufferRegisterPolicy()
 }
 
+// defaultUnregisterPolicy returns default unregisterPolicy
 func defaultUnregisterPolicy() dataNodeUnregisterPolicy {
 	return randomAssignRegisterFunc
 }
 
+// defaultAssignPolicy returns default assignPolicy
 func defaultAssignPolicy() channelAssignPolicy {
 	return newBalancedAssignPolicy()
 }
 
 // NewCluster creates a cluster with provided components
+// triggers loadFromKV to load previous meta from KV if it exists
+// returns error when loadFromKV fails
 func NewCluster(ctx context.Context, kv kv.TxnKV, store ClusterStore,
 	posProvider positionProvider, opts ...ClusterOption) (*Cluster, error) {
 	ctx, cancel := context.WithCancel(ctx)
@@ -122,13 +148,16 @@ func NewCluster(ctx context.Context, kv kv.TxnKV, store ClusterStore,
 		opt(c)
 	}
 
-	if err := c.loadFromKv(); err != nil {
+	if err := c.loadFromKV(); err != nil {
 		return nil, err
 	}
 	return c, nil
 }
 
-func (c *Cluster) loadFromKv() error {
+// loadFromKV loads pre-stored kv meta
+// keys starting with clusterPrefix stand for DataNodeInfos
+// the value bound to key clusterBuffer stands for channels not assigned yet
+func (c *Cluster) loadFromKV() error {
 	_, values, err := c.kv.LoadWithPrefix(clusterPrefix)
 	if err != nil {
 		return err
@@ -157,6 +186,9 @@ func (c *Cluster) loadFromKv() error {
 	return nil
 }
 
+// Flush triggers Flush event
+// puts the Event into the buffered channel
+// returning from this function does not guarantee the event has been processed
 func (c *Cluster) Flush(segments []*datapb.SegmentInfo) {
 	c.eventCh <- &Event{
 		Type: FlushSegments,
@@ -164,6 +196,9 @@
 	}
 }
 
+// Register triggers Register event
+// puts the Event into the buffered channel
+// returning from this function does not guarantee the event has been processed
 func (c *Cluster) Register(node *NodeInfo) {
 	c.eventCh <- &Event{
 		Type: Register,
@@ -171,6 +206,9 @@
 	}
 }
 
+// UnRegister triggers UnRegister event
+// puts the Event into the buffered channel
+// returning from this function does not guarantee the event has been processed
 func (c *Cluster) UnRegister(node *NodeInfo) {
 	c.eventCh <- &Event{
 		Type: UnRegister,
@@ -178,6 +216,9 @@
 	}
 }
 
+// Watch triggers Watch event
+// puts the Event into the buffered channel
+// returning from this function does not guarantee the event has been processed
 func (c *Cluster) Watch(channel string, collectionID UniqueID) {
 	c.eventCh <- &Event{
 		Type: WatchChannel,
@@ -188,6 +229,7 @@
 	}
 }
 
+// handleNodeEvent worker loop that handles all node events
 func (c *Cluster) handleNodeEvent() {
 	defer c.wg.Done()
 	for {
@@ -212,6 +254,7 @@
 	}
 }
 
+// handleEvent worker loop that handles all events belonging to the specified DataNode
 func (c *Cluster) handleEvent(node *NodeInfo) {
 	log.Debug("start handle event", zap.Any("node", node))
 	ctx := node.ctx
@@ -267,6 +310,8 @@
 	}
 }
 
+// getOrCreateClient gets the types.DataNode client for the specified data node
+// if not connected yet, try to connect first
 func (c *Cluster) getOrCreateClient(ctx context.Context, id UniqueID) (types.DataNode, error) {
 	c.mu.Lock()
 	node := c.nodes.GetNode(id)
@@ -289,6 +334,7 @@
 	return cli, nil
 }
 
+// parseChannelsFromReq extracts channel names from the request
 func parseChannelsFromReq(req *datapb.WatchDmChannelsRequest) []string {
 	channels := make([]string, 0, len(req.GetVchannels()))
 	for _, vc := range req.GetVchannels() {
@@ -297,6 +343,8 @@
 	return channels
 }
 
+// createClient creates a types.DataNode client for the specified address
+// needs to be deprecated, since this function hard-codes DataNode to be a grpc client
 func createClient(ctx context.Context, addr string) (types.DataNode, error) {
 	cli, err := grpcdatanodeclient.NewClient(ctx, addr)
 	if err != nil {
@@ -327,6 +375,8 @@ func (c *Cluster) Startup(nodes []*NodeInfo) {
 	}
 }
 
+// updateCluster updates the nodes list
+// separates them into a new nodes list and an offline nodes list
 func (c *Cluster) updateCluster(nodes []*NodeInfo) (newNodes []*NodeInfo, offlines []*NodeInfo) {
 	var onCnt, offCnt float64
 	currentOnline := make(map[int64]struct{})
@@ -352,6 +402,8 @@
 	return
 }
 
+// handleRegister handles register logic
+// applies the register policy and saves the result into the kv store
 func (c *Cluster) handleRegister(n *NodeInfo) {
 	c.mu.Lock()
 	cNodes := c.nodes.GetNodes()
@@ -374,6 +426,8 @@
 	}
 }
 
+// handleUnRegister handles datanode unregister logic
+// applies the unregisterPolicy and stores the results into the kv store
 func (c *Cluster) handleUnRegister(n *NodeInfo) {
 	c.mu.Lock()
 	node := c.nodes.GetNode(n.Info.GetVersion())
@@ -409,6 +463,8 @@
 	}
 }
 
+// handleWatchChannel handles watch channel logic
+// applies the assignPolicy and saves the results into the kv store
 func (c *Cluster) handleWatchChannel(channel string, collectionID UniqueID) {
 	c.mu.Lock()
 	cNodes := c.nodes.GetNodes()
@@ -432,6 +488,8 @@
 	}
 }
 
+// handleFlush handles flush logic
+// finds the corresponding data nodes and triggers Node Events
 func (c *Cluster) handleFlush(segments []*datapb.SegmentInfo) {
 	m := make(map[string]map[UniqueID][]UniqueID) // channel-> map[collectionID]segmentIDs
 	for _, seg := range segments {
@@ -477,6 +535,8 @@
 	}
 }
 
+// watch handles watch logic
+// finds the corresponding data nodes and triggers Node Events
 func (c *Cluster) watch(n *NodeInfo) {
 	channelNames := make([]string, 0)
 	uncompletes := make([]vchannel, 0, len(n.Info.Channels))
@@ -547,12 +607,14 @@ func (c *Cluster) txnSaveNodesAndBuffer(nodes []*NodeInfo, buffer []*datapb.Chan
 	return c.kv.MultiSave(data)
 }
 
+// GetNodes returns all nodes info in Cluster
 func (c *Cluster) GetNodes() []*NodeInfo {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	return c.nodes.GetNodes()
 }
 
+// Close disposes all nodes' resources
 func (c *Cluster) Close() {
 	c.cancel()
 	c.wg.Wait()
diff --git a/internal/datacoord/cluster_store.go b/internal/datacoord/cluster_store.go
index 07c7dd0370..76c9defe7b 100644
--- a/internal/datacoord/cluster_store.go
+++ b/internal/datacoord/cluster_store.go
@@ -8,6 +8,7 @@
 // Unless required by applicable law or agreed to in writing, software distributed under the License
 // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 // or implied. See the License for the specific language governing permissions and limitations under the License.
+
 package datacoord
 
 import (
diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go
index 217da62b2c..817768766b 100644
--- a/internal/datacoord/cluster_test.go
+++ b/internal/datacoord/cluster_test.go
@@ -60,12 +60,12 @@ func spyWatchPolicy(ch chan interface{}) channelAssignPolicy {
 }
 
 // a mock kv that always fail when LoadWithPrefix
-type loadPrefixFailKv struct {
+type loadPrefixFailKV struct {
 	kv.TxnKV
 }
 
 // LoadWithPrefix override behavior
-func (kv *loadPrefixFailKv) LoadWithPrefix(key string) ([]string, []string, error) {
+func (kv *loadPrefixFailKV) LoadWithPrefix(key string) ([]string, []string, error) {
 	return []string{}, []string{}, errors.New("mocked fail")
 }
 
@@ -93,7 +93,7 @@ func TestClusterCreate(t *testing.T) {
 	assert.EqualValues(t, "localhost:8080", dataNodes[0].Info.GetAddress())
 
 	t.Run("loadKv Fails", func(t *testing.T) {
-		fkv := &loadPrefixFailKv{TxnKV: memKv}
+		fkv := &loadPrefixFailKV{TxnKV: memKv}
 		cluster, err := NewCluster(context.TODO(), fkv, spyClusterStore, dummyPosProvider{})
 		assert.NotNil(t, err)
 		assert.Nil(t, cluster)
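Reviewer note: the ClusterOption helpers documented in this patch follow the functional-options pattern, where defaults are set in the constructor and callers (such as the tests above) override only the policies they care about. Below is a minimal, self-contained sketch of that pattern; the names (miniCluster, newMiniCluster, the string-based registerPolicy) are hypothetical stand-ins, not the actual datacoord types, and the real NewCluster also takes the kv store, ClusterStore and positionProvider shown in the diff.

```go
package main

import "fmt"

// registerPolicy is a stand-in for the dataNodeRegisterPolicy interface used in the patch.
type registerPolicy string

// miniCluster is a stripped-down stand-in for datacoord.Cluster.
type miniCluster struct {
	registerPolicy registerPolicy
}

// clusterOption mirrors ClusterOption: a function that mutates the cluster being built.
type clusterOption func(c *miniCluster)

// withRegisterPolicy mirrors the patch's withRegisterPolicy helper.
func withRegisterPolicy(p registerPolicy) clusterOption {
	return func(c *miniCluster) { c.registerPolicy = p }
}

// newMiniCluster sets defaults first and then applies caller-supplied options,
// the same order NewCluster follows in the patch.
func newMiniCluster(opts ...clusterOption) *miniCluster {
	c := &miniCluster{registerPolicy: "assign-buffer"} // default, like defaultRegisterPolicy()
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newMiniCluster().registerPolicy)                                  // assign-buffer
	fmt.Println(newMiniCluster(withRegisterPolicy("round-robin")).registerPolicy) // round-robin
}
```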
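The comments added to Flush/Register/UnRegister/Watch also stress that these methods only enqueue an Event into the buffered channel; the worker loops (handleNodeEvent/handleEvent) process it asynchronously, so returning does not mean the event has been handled. The sketch below shows that producer/worker shape under simplified, assumed names; unlike the real Cluster, which cancels a context in Close, this toy version shuts down by closing the channel.

```go
package main

import (
	"fmt"
	"sync"
)

type eventType int

const flushSegments eventType = 1

// event mirrors the shape of the patch's Event wrapper: a type plus a payload.
type event struct {
	typ  eventType
	data interface{}
}

// toyCluster enqueues events; a single worker goroutine processes them later.
type toyCluster struct {
	eventCh chan *event
	wg      sync.WaitGroup
}

func newToyCluster() *toyCluster {
	c := &toyCluster{eventCh: make(chan *event, 1024)} // buffered, like nodeEventChBufferSize
	c.wg.Add(1)
	go c.handleEvents()
	return c
}

// Flush returns as soon as the event is queued; processing happens asynchronously.
func (c *toyCluster) Flush(segmentIDs []int64) {
	c.eventCh <- &event{typ: flushSegments, data: segmentIDs}
}

// handleEvents drains the channel until it is closed.
func (c *toyCluster) handleEvents() {
	defer c.wg.Done()
	for ev := range c.eventCh {
		fmt.Printf("processing event %d: %v\n", ev.typ, ev.data)
	}
}

// Close stops the worker after all queued events have been drained.
func (c *toyCluster) Close() {
	close(c.eventCh)
	c.wg.Wait()
}

func main() {
	c := newToyCluster()
	c.Flush([]int64{1, 2, 3}) // enqueued, not necessarily processed yet
	c.Close()                 // waits for the worker to finish
}
```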