diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index c72054ed20..b921f9e713 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -27,6 +27,7 @@ import ( "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -670,7 +671,15 @@ func (t *flushBufferInsertTask) flushInsertData() error { defer cancel() if t.ChunkManager != nil && len(t.data) > 0 { tr := timerecord.NewTimeRecorder("insertData") - err := t.MultiWrite(ctx, t.data) + group, ctx := errgroup.WithContext(ctx) + for key, data := range t.data { + key := key + data := data + group.Go(func() error { + return t.Write(ctx, key, data) + }) + } + err := group.Wait() metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) if err == nil { for _, d := range t.data {