From ec9ccd8bd15350a3fde0dec49000c8df32689dc6 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 19 Aug 2021 15:06:12 +0800 Subject: [PATCH] Fix DropCollection not processed by datanode (#7151) Signed-off-by: Congqi Xia --- internal/rootcoord/dml_channels.go | 53 ++++++++++-------------------- internal/rootcoord/task.go | 5 +++ internal/rootcoord/timeticksync.go | 2 +- 3 files changed, 23 insertions(+), 37 deletions(-) diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 18abbc2484..e376ffcba7 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -20,37 +20,28 @@ import ( "go.uber.org/zap" ) -type dmlStream struct { - msgStream msgstream.MsgStream - valid bool -} - type dmlChannels struct { core *Core lock sync.RWMutex - dml map[string]*dmlStream + dml map[string]msgstream.MsgStream } func newDMLChannels(c *Core) *dmlChannels { return &dmlChannels{ core: c, lock: sync.RWMutex{}, - dml: make(map[string]*dmlStream), + dml: make(map[string]msgstream.MsgStream), } } -func (d *dmlChannels) GetNumChannles() int { +// GetNumChannels get current dml channel count +func (d *dmlChannels) GetNumChannels() int { d.lock.RLock() defer d.lock.RUnlock() - count := 0 - for _, ds := range d.dml { - if ds.valid { - count++ - } - } - return count + return len(d.dml) } +// ListChannels lists all dml channel names func (d *dmlChannels) ListChannels() []string { d.lock.RLock() defer d.lock.RUnlock() @@ -63,6 +54,7 @@ func (d *dmlChannels) ListChannels() []string { } +// Produce produces msg pack into specified channel func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error { d.lock.Lock() defer d.lock.Unlock() @@ -71,16 +63,13 @@ func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error { if !ok { return fmt.Errorf("channel %s not exist", name) } - if err := ds.msgStream.Produce(pack); err != nil { + if err := ds.Produce(pack); err != nil { return err } - if !ds.valid { - ds.msgStream.Close() - delete(d.dml, name) - } return nil } +// Broadcast broadcasts msg pack into specified channel func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error { d.lock.Lock() defer d.lock.Unlock() @@ -89,16 +78,13 @@ func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error { if !ok { return fmt.Errorf("channel %s not exist", name) } - if err := ds.msgStream.Broadcast(pack); err != nil { + if err := ds.Broadcast(pack); err != nil { return err } - if !ds.valid { - ds.msgStream.Close() - delete(d.dml, name) - } return nil } +// BroadcastAll invoke broadcast with provided msg pack in all channels specified func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error { d.lock.Lock() defer d.lock.Unlock() @@ -108,17 +94,14 @@ func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) e if !ok { return fmt.Errorf("channel %s not exist", ch) } - if err := ds.msgStream.Broadcast(pack); err != nil { + if err := ds.Broadcast(pack); err != nil { return err } - if !ds.valid { - ds.msgStream.Close() - delete(d.dml, ch) - } } return nil } +// AddProducerChannels add named channels as producer func (d *dmlChannels) AddProducerChannels(names ...string) { d.lock.Lock() defer d.lock.Unlock() @@ -133,22 +116,20 @@ func (d *dmlChannels) AddProducerChannels(names ...string) { continue } ms.AsProducer([]string{name}) - d.dml[name] = &dmlStream{ - msgStream: ms, - valid: true, - } + d.dml[name] = ms } } } +// RemoveProducerChannels removes specified channels func (d *dmlChannels) RemoveProducerChannels(names ...string) { d.lock.Lock() defer d.lock.Unlock() for _, name := range names { - //log.Debug("delete dml channel", zap.String("channel name", name)) if ds, ok := d.dml[name]; ok { - ds.valid = false + ds.Close() + delete(d.dml, name) } } } diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index 2c74b00305..111e302982 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -286,6 +286,11 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) + for _, chanName := range collMeta.PhysicalChannelNames { + // send tt into deleted channels to tell data_node to clear flowgragh + t.core.chanTimeTick.SendChannelTimeTick(chanName, ts) + } + // remove dml channel after send dd msg t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...) return nil diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index facea9124b..d9e2ed3f68 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -292,7 +292,7 @@ func (t *timetickSync) GetProxyNum() int { // GetChanNum return the num of channel func (t *timetickSync) GetChanNum() int { - return t.core.dmlChannels.GetNumChannles() + return t.core.dmlChannels.GetNumChannels() } func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp {