From 338edcd3cc77d103ccf91e94d04b65f9ee2f0efa Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Sun, 15 May 2022 16:11:54 +0800 Subject: [PATCH] Load binlog for different field in parallel (#17005) Signed-off-by: xiaofan-luan --- internal/querynode/segment_loader.go | 52 +++++++++++++--------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 5a2c18a90b..9fefe6c7f0 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -310,15 +310,29 @@ func (loader *segmentLoader) loadFiledBinlogData(segment *Segment, fieldBinlogs segmentType := segment.getType() iCodec := storage.InsertCodec{} - blobs := make([]*storage.Blob, 0) + + // change all field bin log loading into concurrent + loadFutures := make([]*concurrency.Future, 0) for _, fieldBinlog := range fieldBinlogs { - fieldBlobs, err := loader.loadFieldBinlogs(fieldBinlog) - if err != nil { - return err - } - blobs = append(blobs, fieldBlobs...) + futures := loader.loadFieldBinlogsAsync(fieldBinlog) + loadFutures = append(loadFutures, futures...) 
} + // wait for async load result + blobs := make([]*storage.Blob, len(loadFutures)) + for index, future := range loadFutures { + if !future.OK() { + return future.Err() + } + + blob := future.Value().(*storage.Blob) + blobs[index] = blob + } + log.Info("log field binlogs done", + zap.Int64("collection", segment.collectionID), + zap.Int64("segment", segment.segmentID), + zap.Any("field", fieldBinlogs)) + _, _, insertData, err := iCodec.Deserialize(blobs) if err != nil { log.Warn(err.Error()) @@ -350,12 +364,8 @@ func (loader *segmentLoader) loadFiledBinlogData(segment *Segment, fieldBinlogs } } -// Load binlogs concurrently into memory from KV storage -func (loader *segmentLoader) loadFieldBinlogs(field *datapb.FieldBinlog) ([]*storage.Blob, error) { - log.Debug("load field binlogs", - zap.Int64("fieldID", field.FieldID), - zap.Int("len(binlogs)", len(field.Binlogs))) - +// Load binlogs concurrently into memory from KV storage asynchronously +func (loader *segmentLoader) loadFieldBinlogsAsync(field *datapb.FieldBinlog) []*concurrency.Future { futures := make([]*concurrency.Future, 0, len(field.Binlogs)) for i := range field.Binlogs { path := field.Binlogs[i].GetLogPath() @@ -374,21 +384,7 @@ func (loader *segmentLoader) loadFieldBinlogs(field *datapb.FieldBinlog) ([]*sto futures = append(futures, future) } - - blobs := make([]*storage.Blob, 0, len(field.Binlogs)) - for _, future := range futures { - if !future.OK() { - return nil, future.Err() - } - - blob := future.Value().(*storage.Blob) - blobs = append(blobs, blob) - } - - log.Debug("log field binlogs done", - zap.Int64("fieldID", field.FieldID)) - - return blobs, nil + return futures } func (loader *segmentLoader) loadIndexedFieldData(segment *Segment, vecFieldInfos map[int64]*IndexedFieldInfo) error { @@ -587,7 +583,7 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb } } if len(blobs) == 0 { - log.Info("there are no delta logs saved with segment", zap.Any("segmentID", 
segment.segmentID)) + log.Info("there are no delta logs saved with segment, skip loading delete record", zap.Any("segmentID", segment.segmentID)) return nil } _, _, deltaData, err := dCodec.Deserialize(blobs)