mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Make write binlog in parallel (#26325)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
709352f96c
commit
fbb5d32cb6
@ -27,6 +27,7 @@ import (
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
@ -670,7 +671,15 @@ func (t *flushBufferInsertTask) flushInsertData() error {
|
||||
defer cancel()
|
||||
if t.ChunkManager != nil && len(t.data) > 0 {
|
||||
tr := timerecord.NewTimeRecorder("insertData")
|
||||
err := t.MultiWrite(ctx, t.data)
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
for key, data := range t.data {
|
||||
key := key
|
||||
data := data
|
||||
group.Go(func() error {
|
||||
return t.Write(ctx, key, data)
|
||||
})
|
||||
}
|
||||
err := group.Wait()
|
||||
metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
if err == nil {
|
||||
for _, d := range t.data {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user