diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 0e2197725a..6a1690e6b4 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -35,7 +35,7 @@ const ( maxWatchDuration = 20 * time.Second ) -// ChannelManager manages the allocation and the balance of channels between datanodes +// ChannelManager manages the allocation and the balance between channels and data nodes. type ChannelManager struct { mu sync.RWMutex h Handler @@ -54,7 +54,7 @@ type channel struct { CollectionID UniqueID } -// ChannelManagerOpt is to set optional parameters in channel manager +// ChannelManagerOpt is to set optional parameters in channel manager. type ChannelManagerOpt func(c *ChannelManager) func withFactory(f ChannelPolicyFactory) ChannelManagerOpt { @@ -69,7 +69,7 @@ func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt { return func(c *ChannelManager) { c.msgstreamFactory = f } } -// NewChannelManager returns a new ChannelManager +// NewChannelManager creates and returns a new ChannelManager instance. func NewChannelManager( kv kv.TxnKV, h Handler, @@ -97,38 +97,43 @@ func NewChannelManager( return c, nil } -// Startup adjusts the channel store according to current cluster states +// Startup adjusts the channel store according to current cluster states. func (c *ChannelManager) Startup(nodes []int64) error { channels := c.store.GetNodesChannels() - olds := make([]int64, 0, len(channels)) + // Retrieve the current old nodes. + oNodes := make([]int64, 0, len(channels)) for _, c := range channels { - olds = append(olds, c.NodeID) + oNodes = append(oNodes, c.NodeID) } - newOnLines := c.getNewOnLines(nodes, olds) + // Add new online nodes to the cluster. + newOnLines := c.getNewOnLines(nodes, oNodes) for _, n := range newOnLines { if err := c.AddNode(n); err != nil { return err } } - offlines := c.getOffLines(nodes, olds) - for _, n := range offlines { + // Remove new offline nodes from the cluster. + offLines := c.getOffLines(nodes, oNodes) + for _, n := range offLines { if err := c.DeleteNode(n); err != nil { return err } } + // Unwatch and drop channel with drop flag. c.unwatchDroppedChannels() log.Debug("cluster start up", zap.Any("nodes", nodes), - zap.Any("olds", olds), + zap.Any("oNodes", oNodes), zap.Int64s("new onlines", newOnLines), - zap.Int64s("offLines", offlines)) + zap.Int64s("offLines", offLines)) return nil } +// unwatchDroppedChannels removes drops channel that are marked to drop. func (c *ChannelManager) unwatchDroppedChannels() { nodeChannels := c.store.GetNodesChannels() for _, nodeChannel := range nodeChannels { @@ -146,6 +151,7 @@ func (c *ChannelManager) unwatchDroppedChannels() { } } +// NOT USED. func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) { timer := time.NewTicker(bgCheckInterval) for { @@ -168,7 +174,7 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) { log.Debug("channel manager bg check reassign", zap.Array("updates", updates)) for _, update := range updates { if update.Type == Add { - c.fillChannelPosition(update) + c.fillChannelWatchInfo(update) } } @@ -181,6 +187,7 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) { } } +// getNewOnLines returns a list of new online node ids in `curr` but not in `old`. func (c *ChannelManager) getNewOnLines(curr []int64, old []int64) []int64 { mold := make(map[int64]struct{}) ret := make([]int64, 0, len(curr)) @@ -195,6 +202,7 @@ func (c *ChannelManager) getNewOnLines(curr []int64, old []int64) []int64 { return ret } +// getOffLines returns a list of new offline node ids in `old` but not in `curr`. func (c *ChannelManager) getOffLines(curr []int64, old []int64) []int64 { mcurr := make(map[int64]struct{}) ret := make([]int64, 0, len(old)) @@ -209,7 +217,7 @@ func (c *ChannelManager) getOffLines(curr []int64, old []int64) []int64 { return ret } -// AddNode adds a new node in cluster +// AddNode adds a new node to cluster and reassigns the node - channel mapping. func (c *ChannelManager) AddNode(nodeID int64) error { c.mu.Lock() defer c.mu.Unlock() @@ -221,15 +229,15 @@ func (c *ChannelManager) AddNode(nodeID int64) error { zap.Int64("registered node", nodeID), zap.Array("updates", updates)) - for _, v := range updates { - if v.Type == Add { - c.fillChannelPosition(v) + for _, op := range updates { + if op.Type == Add { + c.fillChannelWatchInfo(op) } } return c.store.Update(updates) } -// DeleteNode deletes the node from the cluster +// DeleteNode deletes the node from the cluster. func (c *ChannelManager) DeleteNode(nodeID int64) error { c.mu.Lock() defer c.mu.Unlock() @@ -239,7 +247,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error { return nil } - c.tryToUnsubscribe(nodeChannelInfo) + c.unsubAttempt(nodeChannelInfo) updates := c.deregisterPolicy(c.store, nodeID) log.Debug("deregister node", @@ -248,7 +256,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error { for _, v := range updates { if v.Type == Add { - c.fillChannelPosition(v) + c.fillChannelWatchInfo(v) } } if err := c.store.Update(updates); err != nil { @@ -258,8 +266,9 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error { return err } -func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) { - if nodeChannelInfo == nil { +// unsubAttempt attempts to unsubscribe node-channel info from the channel. +func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) { + if ncInfo == nil { return } @@ -268,32 +277,33 @@ func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) { return } - nodeID := nodeChannelInfo.NodeID - for _, ch := range nodeChannelInfo.Channels { - subscriptionName := subscriptionGenerator(ch.CollectionID, nodeID) - err := c.unsubscribe(subscriptionName, ch.Name) + nodeID := ncInfo.NodeID + for _, ch := range ncInfo.Channels { + subName := buildSubName(ch.CollectionID, nodeID) + err := c.unsubscribe(subName, ch.Name) if err != nil { - log.Warn("failed to unsubcribe topic", zap.String("subscription name", subscriptionName), zap.String("channel name", ch.Name)) + log.Warn("failed to unsubscribe topic", zap.String("subscription name", subName), zap.String("channel name", ch.Name)) } } } -func subscriptionGenerator(collectionID int64, nodeID int64) string { +// buildSubName generates a subscription name by concatenating DataNodeSubName, node ID and collection ID. +func buildSubName(collectionID int64, nodeID int64) string { return fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, nodeID, collectionID) } -func (c *ChannelManager) unsubscribe(subscriptionName string, channel string) error { +func (c *ChannelManager) unsubscribe(subName string, channel string) error { msgStream, err := c.msgstreamFactory.NewMsgStream(context.TODO()) if err != nil { return err } - msgStream.AsConsumer([]string{channel}, subscriptionName) + msgStream.AsConsumer([]string{channel}, subName) msgStream.Close() return nil } -// Watch try to add the channel to cluster. If the channel already exists, do nothing +// Watch tries to add the channel to cluster. Watch is a no op if the channel already exists. func (c *ChannelManager) Watch(ch *channel) error { c.mu.Lock() defer c.mu.Unlock() @@ -308,7 +318,7 @@ func (c *ChannelManager) Watch(ch *channel) error { for _, v := range updates { if v.Type == Add { - c.fillChannelPosition(v) + c.fillChannelWatchInfo(v) } } err := c.store.Update(updates) @@ -322,19 +332,20 @@ func (c *ChannelManager) Watch(ch *channel) error { return nil } -func (c *ChannelManager) fillChannelPosition(update *ChannelOp) { - for _, ch := range update.Channels { - vchan := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID) +// fillChannelWatchInfo updates the channel op by filling in channel watch info. +func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) { + for _, ch := range op.Channels { + vcInfo := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID) info := &datapb.ChannelWatchInfo{ - Vchan: vchan, + Vchan: vcInfo, StartTs: time.Now().Unix(), State: datapb.ChannelWatchState_Uncomplete, } - update.ChannelWatchInfos = append(update.ChannelWatchInfos, info) + op.ChannelWatchInfos = append(op.ChannelWatchInfos, info) } } -// GetChannels gets channels info of registered nodes +// GetChannels gets channels info of registered nodes. func (c *ChannelManager) GetChannels() []*NodeChannelInfo { c.mu.RLock() defer c.mu.RUnlock() @@ -342,15 +353,15 @@ func (c *ChannelManager) GetChannels() []*NodeChannelInfo { return c.store.GetNodesChannels() } -// GetBuffer gets buffer channels -func (c *ChannelManager) GetBuffer() *NodeChannelInfo { +// GetBufferChannels gets buffer channels. +func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo { c.mu.RLock() defer c.mu.RUnlock() return c.store.GetBufferChannelInfo() } -// Match checks whether nodeID and channel match +// Match checks and returns whether the node ID and channel match. func (c *ChannelManager) Match(nodeID int64, channel string) bool { c.mu.RLock() defer c.mu.RUnlock() @@ -368,7 +379,7 @@ func (c *ChannelManager) Match(nodeID int64, channel string) bool { return false } -// FindWatcher finds the datanode watching the provided channel +// FindWatcher finds the datanode watching the provided channel. func (c *ChannelManager) FindWatcher(channel string) (int64, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -392,7 +403,7 @@ func (c *ChannelManager) FindWatcher(channel string) (int64, error) { return 0, errChannelNotWatched } -// RemoveChannel removes the channel from channel manager +// RemoveChannel removes the channel from channel manager. func (c *ChannelManager) RemoveChannel(channelName string) error { c.mu.Lock() defer c.mu.Unlock() @@ -405,6 +416,7 @@ func (c *ChannelManager) RemoveChannel(channelName string) error { return c.remove(nodeID, ch) } +// remove deletes the nodeID-channel pair from data store. func (c *ChannelManager) remove(nodeID int64, ch *channel) error { var op ChannelOpSet op.Delete(nodeID, []*channel{ch}) diff --git a/internal/datacoord/channel_manager_factory.go b/internal/datacoord/channel_manager_factory.go index c227bf4d45..d9791c91c8 100644 --- a/internal/datacoord/channel_manager_factory.go +++ b/internal/datacoord/channel_manager_factory.go @@ -21,17 +21,17 @@ import ( "stathat.com/c/consistent" ) -// ChannelPolicyFactory is the abstract factory to create policies for channel manager +// ChannelPolicyFactory is the abstract factory that creates policies for channel manager. type ChannelPolicyFactory interface { - // NewRegisterPolicy create a new register policy + // NewRegisterPolicy creates a new register policy. NewRegisterPolicy() RegisterPolicy - // NewDeregisterPolicy create a new dereigster policy + // NewDeregisterPolicy creates a new deregister policy. NewDeregisterPolicy() DeregisterPolicy - // NewAssignPolicy create a new channel assign policy + // NewAssignPolicy creates a new channel assign policy. NewAssignPolicy() ChannelAssignPolicy - // NewReassignPolicy create a new channel reassign policy + // NewReassignPolicy creates a new channel reassign policy. NewReassignPolicy() ChannelReassignPolicy - // NewBgChecker create a new bakcground checker + // NewBgChecker creates a new background checker. NewBgChecker() ChannelBGChecker } @@ -40,27 +40,27 @@ type ChannelPolicyFactoryV1 struct { kv kv.TxnKV } -// NewChannelPolicyFactoryV1 helper function creates a Channel policy factory v1 from kv +// NewChannelPolicyFactoryV1 helper function creates a Channel policy factory v1 from kv. func NewChannelPolicyFactoryV1(kv kv.TxnKV) *ChannelPolicyFactoryV1 { return &ChannelPolicyFactoryV1{kv: kv} } -// NewRegisterPolicy implementing ChannelPolicyFactory returns BufferChannelAssignPolicy +// NewRegisterPolicy implementing ChannelPolicyFactory returns BufferChannelAssignPolicy. func (f *ChannelPolicyFactoryV1) NewRegisterPolicy() RegisterPolicy { return AvgAssignRegisterPolicy } -// NewDeregisterPolicy implementing ChannelPolicyFactory returns AvgAssignUnregisteredChannels +// NewDeregisterPolicy implementing ChannelPolicyFactory returns AvgAssignUnregisteredChannels. func (f *ChannelPolicyFactoryV1) NewDeregisterPolicy() DeregisterPolicy { return AvgAssignUnregisteredChannels } -// NewAssignPolicy implementing ChannelPolicyFactory returns AverageAssignPolicy +// NewAssignPolicy implementing ChannelPolicyFactory returns AverageAssignPolicy. func (f *ChannelPolicyFactoryV1) NewAssignPolicy() ChannelAssignPolicy { return AverageAssignPolicy } -// NewReassignPolicy implementing ChannelPolicyFactory returns AvarageReassginPolicy +// NewReassignPolicy implementing ChannelPolicyFactory returns AverageReassignPolicy. func (f *ChannelPolicyFactoryV1) NewReassignPolicy() ChannelReassignPolicy { return AverageReassignPolicy } diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 1d08bf86b3..725ad00dff 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -31,23 +31,20 @@ import ( const ( bufferID = math.MinInt64 - delimeter = "/" + delimiter = "/" maxOperationsPerTxn = 128 ) var errUnknownOpType = errors.New("unknown operation type") -// ChannelOpType type alias uses int8 stands for Channel operation type type ChannelOpType int8 const ( - // Add const value for Add Channel operation type Add ChannelOpType = iota - // Delete const value for Delete Channel operation type Delete ) -//ChannelOp is the operation to update the channel store +// ChannelOp is an individual ADD or DELETE operation to the channel store. type ChannelOp struct { Type ChannelOpType NodeID int64 @@ -55,10 +52,10 @@ type ChannelOp struct { ChannelWatchInfos []*datapb.ChannelWatchInfo } -// ChannelOpSet contains some channel update operations +// ChannelOpSet is a set of channel operations. type ChannelOpSet []*ChannelOp -// Add adds the operation which maps channels to node +// Add appends a single operation to add the mapping between a node and channels. func (cos *ChannelOpSet) Add(id int64, channels []*channel) { *cos = append(*cos, &ChannelOp{ NodeID: id, @@ -67,7 +64,7 @@ func (cos *ChannelOpSet) Add(id int64, channels []*channel) { }) } -// Delete removes the mapping between channels and node +// Delete appends a single operation to remove the mapping between a node and channels. func (cos *ChannelOpSet) Delete(id int64, channels []*channel) { *cos = append(*cos, &ChannelOp{ NodeID: id, @@ -76,48 +73,49 @@ func (cos *ChannelOpSet) Delete(id int64, channels []*channel) { }) } -// ROChannelStore is the read only channel store from which user can read the mapping between channels and node +// ROChannelStore is a read only channel store for channels and nodes. type ROChannelStore interface { - // GetNode gets the channel info of node + // GetNode returns the channel info of a specific node. GetNode(nodeID int64) *NodeChannelInfo - // GetChannels gets all channel infos + // GetChannels returns info of all channels. GetChannels() []*NodeChannelInfo - // GetNodesChannels gets the channels assigned to real nodes + // GetNodesChannels returns the channels that are assigned to nodes. GetNodesChannels() []*NodeChannelInfo - // GetBufferChannelInfo gets the unassigned channels + // GetBufferChannelInfo gets the unassigned channels. GetBufferChannelInfo() *NodeChannelInfo - // GetNodes gets all nodes id in store + // GetNodes gets all node ids in store. GetNodes() []int64 } -// RWChannelStore is the read write channel store which maintains the mapping between channels and node +// RWChannelStore is the read write channel store for channels and nodes. type RWChannelStore interface { ROChannelStore - // Reload restores the buffer channels and node-channels mapping form kv + // Reload restores the buffer channels and node-channels mapping form kv. Reload() error - // Add creates a new node-channels mapping, but no channels are assigned to this node + // Add creates a new node-channels mapping, with no channels assigned to the node. Add(nodeID int64) - // Delete removes nodeID and returns the channels + // Delete removes nodeID and returns its channels. Delete(nodeID int64) ([]*channel, error) - // Update applies the operations in ChannelOpSet + // Update applies the operations in ChannelOpSet. Update(op ChannelOpSet) error } +// ChannelStore must satisfy RWChannelStore. var _ RWChannelStore = (*ChannelStore)(nil) -// ChannelStore maintains the mapping relationship between channel and datanode +// ChannelStore maintains a mapping between channels and data nodes. type ChannelStore struct { - store kv.TxnKV - channelsInfo map[int64]*NodeChannelInfo + store kv.TxnKV // A kv store with (NodeChannelKey) -> (ChannelWatchInfos) information. + channelsInfo map[int64]*NodeChannelInfo // A map of (nodeID) -> (NodeChannelInfo). } -// NodeChannelInfo is the mapping between channels and node +// NodeChannelInfo stores the nodeID and its channels. type NodeChannelInfo struct { NodeID int64 Channels []*channel } -// NewChannelStore creates a new ChannelStore +// NewChannelStore creates and returns a new ChannelStore. func NewChannelStore(kv kv.TxnKV) *ChannelStore { c := &ChannelStore{ store: kv, @@ -130,7 +128,7 @@ func NewChannelStore(kv kv.TxnKV) *ChannelStore { return c } -// Reload restores the buffer channels and node-channels mapping from kv +// Reload restores the buffer channels and node-channels mapping from kv. func (c *ChannelStore) Reload() error { keys, values, err := c.store.LoadWithPrefix(Params.DataCoordCfg.ChannelWatchSubPath) if err != nil { @@ -139,27 +137,28 @@ func (c *ChannelStore) Reload() error { for i := 0; i < len(keys); i++ { k := keys[i] v := values[i] - nodeID, err := parseNodeID(k) + nodeID, err := parseNodeKey(k) if err != nil { return err } - temp := &datapb.ChannelWatchInfo{} - if err := proto.Unmarshal([]byte(v), temp); err != nil { + cw := &datapb.ChannelWatchInfo{} + if err := proto.Unmarshal([]byte(v), cw); err != nil { return err } c.Add(nodeID) channel := &channel{ - Name: temp.GetVchan().GetChannelName(), - CollectionID: temp.GetVchan().GetCollectionID(), + Name: cw.GetVchan().GetChannelName(), + CollectionID: cw.GetVchan().GetCollectionID(), } c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel) } return nil } -// Add creates a new node-channels mapping, but no channels are assigned to this node +// Add creates a new node-channels mapping for the given node, and assigns no channels to it. +// Returns immediately if the node's already in the channel. func (c *ChannelStore) Add(nodeID int64) { if _, ok := c.channelsInfo[nodeID]; ok { return @@ -171,7 +170,7 @@ func (c *ChannelStore) Add(nodeID int64) { } } -// Update applies the operations in opSet +// Update applies the channel operations in opSet. func (c *ChannelStore) Update(opSet ChannelOpSet) error { totalChannelNum := 0 for _, op := range opSet { @@ -180,8 +179,8 @@ func (c *ChannelStore) Update(opSet ChannelOpSet) error { if totalChannelNum <= maxOperationsPerTxn { return c.update(opSet) } - // split opset to many txn; same channel's operations should be executed in one txn. - channelsOpSet := make(map[string]ChannelOpSet) + // Split opset into multiple txn. Operations on the same channel must be executed in one txn. + perChOps := make(map[string]ChannelOpSet) for _, op := range opSet { for i, ch := range op.Channels { chOp := &ChannelOp{ @@ -192,14 +191,14 @@ func (c *ChannelStore) Update(opSet ChannelOpSet) error { if op.Type == Add { chOp.ChannelWatchInfos = []*datapb.ChannelWatchInfo{op.ChannelWatchInfos[i]} } - channelsOpSet[ch.Name] = append(channelsOpSet[ch.Name], chOp) + perChOps[ch.Name] = append(perChOps[ch.Name], chOp) } } - // execute a txn per 128 operations + // Execute a txn for every 128 operations. count := 0 operations := make([]*ChannelOp, 0, maxOperationsPerTxn) - for _, opset := range channelsOpSet { + for _, opset := range perChOps { if count+len(opset) > maxOperationsPerTxn { if err := c.update(operations); err != nil { return err @@ -216,28 +215,33 @@ func (c *ChannelStore) Update(opSet ChannelOpSet) error { return c.update(operations) } +// update applies the ADD/DELETE operations to the current channel store. func (c *ChannelStore) update(opSet ChannelOpSet) error { + // Update ChannelStore's kv store. if err := c.txn(opSet); err != nil { return err } - for _, v := range opSet { - switch v.Type { + // Update node id -> channel mapping. + for _, op := range opSet { + switch op.Type { case Add: - c.channelsInfo[v.NodeID].Channels = append(c.channelsInfo[v.NodeID].Channels, v.Channels...) + // Append target channels to channel store. + c.channelsInfo[op.NodeID].Channels = append(c.channelsInfo[op.NodeID].Channels, op.Channels...) case Delete: - filter := make(map[string]struct{}) - for _, ch := range v.Channels { - filter[ch.Name] = struct{}{} + // Remove target channels from channel store. + del := make(map[string]struct{}) + for _, ch := range op.Channels { + del[ch.Name] = struct{}{} } - origin := c.channelsInfo[v.NodeID].Channels - res := make([]*channel, 0, len(origin)) - for _, ch := range origin { - if _, ok := filter[ch.Name]; !ok { - res = append(res, ch) + prev := c.channelsInfo[op.NodeID].Channels + curr := make([]*channel, 0, len(prev)) + for _, ch := range prev { + if _, ok := del[ch.Name]; !ok { + curr = append(curr, ch) } } - c.channelsInfo[v.NodeID].Channels = res + c.channelsInfo[op.NodeID].Channels = curr default: return errUnknownOpType } @@ -245,7 +249,7 @@ func (c *ChannelStore) update(opSet ChannelOpSet) error { return nil } -// GetChannels gets all channel infos +// GetChannels returns information of all channels. func (c *ChannelStore) GetChannels() []*NodeChannelInfo { ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo)) for _, info := range c.channelsInfo { @@ -254,19 +258,18 @@ func (c *ChannelStore) GetChannels() []*NodeChannelInfo { return ret } -// GetNodesChannels gets the channels assigned to real nodes +// GetNodesChannels returns the channels assigned to real nodes. func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo { ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo)) for id, info := range c.channelsInfo { - if id == bufferID { - continue + if id != bufferID { + ret = append(ret, info) } - ret = append(ret, info) } return ret } -// GetBufferChannelInfo gets the unassigned channels +// GetBufferChannelInfo returns all unassigned channels. func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo { for id, info := range c.channelsInfo { if id == bufferID { @@ -276,7 +279,7 @@ func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo { return nil } -// GetNode gets the channel info of node +// GetNode returns the channel info of a given node. func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo { for id, info := range c.channelsInfo { if id == nodeID { @@ -286,7 +289,7 @@ func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo { return nil } -// Delete remove the nodeID and returns its channels +// Delete removes the given node from the channel store and returns its channels. func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) { for id, info := range c.channelsInfo { if id == nodeID { @@ -300,36 +303,37 @@ func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) { return nil, nil } -// GetNodes gets all nodes id in store +// GetNodes returns a slice of all nodes ids in the current channel store. func (c *ChannelStore) GetNodes() []int64 { ids := make([]int64, 0, len(c.channelsInfo)) for id := range c.channelsInfo { - if id == bufferID { - continue + if id != bufferID { + ids = append(ids, id) } - ids = append(ids, id) } return ids } +// remove deletes kv pairs from the kv store where keys have given nodeID as prefix. func (c *ChannelStore) remove(nodeID int64) error { - k := buildNodeKey(nodeID) + k := buildKeyPrefix(nodeID) return c.store.RemoveWithPrefix(k) } +// txn updates the channelStore's kv store with the given channel ops. func (c *ChannelStore) txn(opSet ChannelOpSet) error { saves := make(map[string]string) var removals []string - for _, update := range opSet { - for i, c := range update.Channels { - k := buildChannelKey(update.NodeID, c.Name) - switch update.Type { + for _, op := range opSet { + for i, ch := range op.Channels { + k := buildNodeChannelKey(op.NodeID, ch.Name) + switch op.Type { case Add: - val, err := proto.Marshal(update.ChannelWatchInfos[i]) + info, err := proto.Marshal(op.ChannelWatchInfos[i]) if err != nil { return err } - saves[k] = string(val) + saves[k] = string(info) case Delete: removals = append(removals, k) default: @@ -340,26 +344,30 @@ func (c *ChannelStore) txn(opSet ChannelOpSet) error { return c.store.MultiSaveAndRemove(saves, removals) } -func buildChannelKey(nodeID int64, channel string) string { - return fmt.Sprintf("%s%s%d%s%s", Params.DataCoordCfg.ChannelWatchSubPath, delimeter, nodeID, delimeter, channel) +// buildNodeChannelKey generates a key for kv store, where the key is a concatenation of ChannelWatchSubPath, nodeID and channel name. +func buildNodeChannelKey(nodeID int64, chName string) string { + return fmt.Sprintf("%s%s%d%s%s", Params.DataCoordCfg.ChannelWatchSubPath, delimiter, nodeID, delimiter, chName) } -func buildNodeKey(nodeID int64) string { - return fmt.Sprintf("%s%s%d", Params.DataCoordCfg.ChannelWatchSubPath, delimeter, nodeID) +// buildKeyPrefix generates a key *prefix* for kv store, where the key prefix is a concatenation of ChannelWatchSubPath and nodeID. +func buildKeyPrefix(nodeID int64) string { + return fmt.Sprintf("%s%s%d", Params.DataCoordCfg.ChannelWatchSubPath, delimiter, nodeID) } -func parseNodeID(key string) (int64, error) { - s := strings.Split(key, delimeter) +// parseNodeKey validates a given node key, then extracts and returns the corresponding node id on success. +func parseNodeKey(key string) (int64, error) { + s := strings.Split(key, delimiter) if len(s) < 2 { - return -1, fmt.Errorf("wrong channel key in etcd %s", key) + return -1, fmt.Errorf("wrong node key in etcd %s", key) } return strconv.ParseInt(s[len(s)-2], 10, 64) } -// ChannelOpTypeNames implements zap log marshaler for ChannelOpSet +// ChannelOpTypeNames implements zap log marshaller for ChannelOpSet. var ChannelOpTypeNames = []string{"Add", "Delete"} -// MarshalLogObject implements the interface ObjectMarshaler +// TODO: NIT: ObjectMarshaler -> ObjectMarshaller +// MarshalLogObject implements the interface ObjectMarshaler. func (cu *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddString("type", ChannelOpTypeNames[cu.Type]) enc.AddInt64("nodeID", cu.NodeID) @@ -376,7 +384,8 @@ func (cu *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error { return nil } -// MarshalLogArray implements the interface of ArrayMarshaler of zap +// TODO: NIT: ArrayMarshaler -> ArrayMarshaller +// MarshalLogArray implements the interface of ArrayMarshaler of zap. func (cos ChannelOpSet) MarshalLogArray(enc zapcore.ArrayEncoder) error { for _, o := range cos { enc.AppendObject(o) diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 73271184fa..9d8030d1da 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -41,7 +41,7 @@ func NewCluster(sessionManager *SessionManager, channelManager *ChannelManager) return c } -// Startup inits the cluster +// Startup inits the cluster with the given data nodes. func (c *Cluster) Startup(nodes []*NodeInfo) error { for _, node := range nodes { c.sessionManager.AddSession(node) diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go index ca3953910b..594add265c 100644 --- a/internal/datacoord/cluster_test.go +++ b/internal/datacoord/cluster_test.go @@ -186,7 +186,7 @@ func TestRegister(t *testing.T) { } err = cluster.Register(info) assert.Nil(t, err) - bufferChannels := channelManager.GetBuffer() + bufferChannels := channelManager.GetBufferChannels() assert.Empty(t, bufferChannels.Channels) nodeChannels := channelManager.GetChannels() assert.EqualValues(t, 1, len(nodeChannels)) @@ -297,7 +297,7 @@ func TestUnregister(t *testing.T) { assert.Nil(t, err) channels := channelManager.GetChannels() assert.Empty(t, channels) - channel := channelManager.GetBuffer() + channel := channelManager.GetBufferChannels() assert.NotNil(t, channel) assert.EqualValues(t, 1, len(channel.Channels)) assert.EqualValues(t, "ch_1", channel.Channels[0].Name) @@ -344,7 +344,7 @@ func TestWatchIfNeeded(t *testing.T) { channels := channelManager.GetChannels() assert.Empty(t, channels) - channel := channelManager.GetBuffer() + channel := channelManager.GetBufferChannels() assert.NotNil(t, channel) assert.EqualValues(t, "ch1", channel.Channels[0].Name) }) @@ -423,7 +423,7 @@ func TestConsistentHashPolicy(t *testing.T) { hash.Remove("3") err = cluster.UnRegister(nodeInfo3) assert.Nil(t, err) - bufferChannels := channelManager.GetBuffer() + bufferChannels := channelManager.GetBufferChannels() assert.EqualValues(t, 3, len(bufferChannels.Channels)) } diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index e4d8a47b9f..64d6deb367 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -374,7 +374,7 @@ func EmptyReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) Cha return nil } -// AverageReassignPolicy is a reassign policy that evenly assign channels +// AverageReassignPolicy is a reassigning policy that evenly assign channels func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet { channels := store.GetNodesChannels() filterMap := make(map[int64]struct{}) @@ -444,7 +444,7 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker { Channels: make([]*channel, 0), } for _, c := range ch.Channels { - k := buildChannelKey(ch.NodeID, c.Name) + k := buildNodeChannelKey(ch.NodeID, c.Name) v, err := kv.Load(k) if err != nil { return nil, err diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index c03ff410fa..02d514a8e8 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -377,7 +377,7 @@ func TestBgCheckWithMaxWatchDuration(t *testing.T) { getKv := func(watchInfos []*watch) kv.TxnKV { kv := memkv.NewMemoryKV() for _, info := range watchInfos { - k := buildChannelKey(info.nodeID, info.name) + k := buildNodeChannelKey(info.nodeID, info.name) v, _ := proto.Marshal(info.info) kv.Save(k, string(v)) } diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 2744ef220f..2c75e6bd8c 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -185,7 +185,7 @@ func defaultFlushPolicy() flushPolicy { return flushPolicyV1 } -// newSegmentManager should be the only way to retrieve SegmentManager +// newSegmentManager should be the only way to retrieve SegmentManager. func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *SegmentManager { manager := &SegmentManager{ meta: meta, @@ -237,7 +237,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID segments = append(segments, segment) } - // apply allocate policy + // Apply allocation policy. maxCountPerSegment, err := s.estimateMaxNumOfRows(collectionID) if err != nil { return nil, err diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index b7e890a6b1..1c997ba791 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -597,7 +597,7 @@ func (s *Server) startWatchService(ctx context.Context) { go s.watchService(ctx) } -// watchService watchs services +// watchService watches services. func (s *Server) watchService(ctx context.Context) { defer logutil.LogPanic() defer s.serverLoopWg.Done() @@ -793,6 +793,8 @@ func (s *Server) stopServerLoop() { // return fmt.Errorf("can not find channel %s", channelName) //} +// loadCollectionFromRootCoord communicates with RootCoord and asks for collection information. +// collection information will be added to server meta info. func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error { resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b7b7894810..2ae48e7b93 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -97,7 +97,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F return resp, nil } -// AssignSegmentID applies for segment ids and make allocation for records +// AssignSegmentID applies for segment ids and make allocation for records. func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { if s.isClosed() { return &datapb.AssignSegmentIDResponse{ @@ -117,6 +117,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI zap.String("channelName", r.GetChannelName()), zap.Uint32("count", r.GetCount())) + // Load the collection info from Root Coordinator, if it is not found in server meta. if s.meta.GetCollection(r.GetCollectionID()) == nil { err := s.loadCollectionFromRootCoord(ctx, r.GetCollectionID()) if err != nil { @@ -125,18 +126,19 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI } } + // Add the channel to cluster for watching. s.cluster.Watch(r.ChannelName, r.CollectionID) - allocations, err := s.segmentManager.AllocSegment(ctx, + // Have segment manager allocate and return the segment allocation info. + segAlloc, err := s.segmentManager.AllocSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count)) if err != nil { log.Warn("failed to alloc segment", zap.Any("request", r), zap.Error(err)) continue } + log.Debug("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", segAlloc)) - log.Debug("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", allocations)) - - for _, allocation := range allocations { + for _, allocation := range segAlloc { result := &datapb.SegmentIDAssignment{ SegID: allocation.SegmentID, ChannelName: r.ChannelName, @@ -892,7 +894,7 @@ func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState return } -// WatchChannels notifies DataCoord to watch vchannels of a collection +// WatchChannels notifies DataCoord to watch vchannels of a collection. func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) { log.Debug("receive watch channels request", zap.Any("channels", req.GetChannelNames())) resp := &datapb.WatchChannelsResponse{ diff --git a/internal/kv/kv.go b/internal/kv/kv.go index ac11dc731c..7a9b242773 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -59,7 +59,7 @@ type ValueKV interface { Load(key string) (Value, error) } -// BaseKV contains base operations of kv. Include save, load and remove. +// BaseKV contains basic kv operations, including save, load and remove. type BaseKV interface { Load(key string) (string, error) MultiLoad(keys []string) ([]string, error) @@ -88,7 +88,7 @@ type TxnKV interface { MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error } -// MetaKv is TxnKV for meta data. It should save data with lease. +// MetaKv is TxnKV for metadata. It should save data with lease. type MetaKv interface { TxnKV GetPath(key string) string diff --git a/internal/proxy/segment.go b/internal/proxy/segment.go index 838ed48799..3c0a128ed6 100644 --- a/internal/proxy/segment.go +++ b/internal/proxy/segment.go @@ -329,22 +329,22 @@ func (sa *segIDAssigner) syncSegments() (bool, error) { var errMsg string now := time.Now() success := true - for _, info := range resp.SegIDAssignments { - if info.Status.GetErrorCode() != commonpb.ErrorCode_Success { - log.Debug("proxy", zap.String("SyncSegment Error", info.Status.Reason)) - errMsg += info.Status.Reason + for _, segAssign := range resp.SegIDAssignments { + if segAssign.Status.GetErrorCode() != commonpb.ErrorCode_Success { + log.Debug("proxy", zap.String("SyncSegment Error", segAssign.Status.Reason)) + errMsg += segAssign.Status.Reason errMsg += "\n" success = false continue } - assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName) + assign, err := sa.getAssign(segAssign.CollectionID, segAssign.PartitionID, segAssign.ChannelName) segInfo2 := &segInfo{ - segID: info.SegID, - count: info.Count, - expireTime: info.ExpireTime, + segID: segAssign.SegID, + count: segAssign.Count, + expireTime: segAssign.ExpireTime, } if err != nil { - colInfos, ok := sa.assignInfos[info.CollectionID] + colInfos, ok := sa.assignInfos[segAssign.CollectionID] if !ok { colInfos = list.New() } @@ -352,13 +352,13 @@ func (sa *segIDAssigner) syncSegments() (bool, error) { segInfos.PushBack(segInfo2) assign = &assignInfo{ - collID: info.CollectionID, - partitionID: info.PartitionID, - channelName: info.ChannelName, + collID: segAssign.CollectionID, + partitionID: segAssign.PartitionID, + channelName: segAssign.ChannelName, segInfos: segInfos, } colInfos.PushBack(assign) - sa.assignInfos[info.CollectionID] = colInfos + sa.assignInfos[segAssign.CollectionID] = colInfos } else { assign.segInfos.PushBack(segInfo2) }