mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Load binlog for different field in parallel (#17005)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
ea2c395a38
commit
338edcd3cc
@ -310,15 +310,29 @@ func (loader *segmentLoader) loadFiledBinlogData(segment *Segment, fieldBinlogs
|
|||||||
|
|
||||||
segmentType := segment.getType()
|
segmentType := segment.getType()
|
||||||
iCodec := storage.InsertCodec{}
|
iCodec := storage.InsertCodec{}
|
||||||
blobs := make([]*storage.Blob, 0)
|
|
||||||
|
// change all field bin log loading into concurrent
|
||||||
|
loadFutures := make([]*concurrency.Future, 0)
|
||||||
for _, fieldBinlog := range fieldBinlogs {
|
for _, fieldBinlog := range fieldBinlogs {
|
||||||
fieldBlobs, err := loader.loadFieldBinlogs(fieldBinlog)
|
futures := loader.loadFieldBinlogsAsync(fieldBinlog)
|
||||||
if err != nil {
|
loadFutures = append(loadFutures, futures...)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
blobs = append(blobs, fieldBlobs...)
|
|
||||||
|
// wait for async load result
|
||||||
|
blobs := make([]*storage.Blob, len(loadFutures))
|
||||||
|
for index, future := range loadFutures {
|
||||||
|
if !future.OK() {
|
||||||
|
return future.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blob := future.Value().(*storage.Blob)
|
||||||
|
blobs[index] = blob
|
||||||
|
}
|
||||||
|
log.Info("log field binlogs done",
|
||||||
|
zap.Int64("collection", segment.collectionID),
|
||||||
|
zap.Int64("segment", segment.segmentID),
|
||||||
|
zap.Any("field", fieldBinlogs))
|
||||||
|
|
||||||
_, _, insertData, err := iCodec.Deserialize(blobs)
|
_, _, insertData, err := iCodec.Deserialize(blobs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(err.Error())
|
log.Warn(err.Error())
|
||||||
@ -350,12 +364,8 @@ func (loader *segmentLoader) loadFiledBinlogData(segment *Segment, fieldBinlogs
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load binlogs concurrently into memory from KV storage
|
// Load binlogs concurrently into memory from KV storage asyncly
|
||||||
func (loader *segmentLoader) loadFieldBinlogs(field *datapb.FieldBinlog) ([]*storage.Blob, error) {
|
func (loader *segmentLoader) loadFieldBinlogsAsync(field *datapb.FieldBinlog) []*concurrency.Future {
|
||||||
log.Debug("load field binlogs",
|
|
||||||
zap.Int64("fieldID", field.FieldID),
|
|
||||||
zap.Int("len(binlogs)", len(field.Binlogs)))
|
|
||||||
|
|
||||||
futures := make([]*concurrency.Future, 0, len(field.Binlogs))
|
futures := make([]*concurrency.Future, 0, len(field.Binlogs))
|
||||||
for i := range field.Binlogs {
|
for i := range field.Binlogs {
|
||||||
path := field.Binlogs[i].GetLogPath()
|
path := field.Binlogs[i].GetLogPath()
|
||||||
@ -374,21 +384,7 @@ func (loader *segmentLoader) loadFieldBinlogs(field *datapb.FieldBinlog) ([]*sto
|
|||||||
|
|
||||||
futures = append(futures, future)
|
futures = append(futures, future)
|
||||||
}
|
}
|
||||||
|
return futures
|
||||||
blobs := make([]*storage.Blob, 0, len(field.Binlogs))
|
|
||||||
for _, future := range futures {
|
|
||||||
if !future.OK() {
|
|
||||||
return nil, future.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
blob := future.Value().(*storage.Blob)
|
|
||||||
blobs = append(blobs, blob)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debug("log field binlogs done",
|
|
||||||
zap.Int64("fieldID", field.FieldID))
|
|
||||||
|
|
||||||
return blobs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (loader *segmentLoader) loadIndexedFieldData(segment *Segment, vecFieldInfos map[int64]*IndexedFieldInfo) error {
|
func (loader *segmentLoader) loadIndexedFieldData(segment *Segment, vecFieldInfos map[int64]*IndexedFieldInfo) error {
|
||||||
@ -587,7 +583,7 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(blobs) == 0 {
|
if len(blobs) == 0 {
|
||||||
log.Info("there are no delta logs saved with segment", zap.Any("segmentID", segment.segmentID))
|
log.Info("there are no delta logs saved with segment, skip loading delete record", zap.Any("segmentID", segment.segmentID))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, _, deltaData, err := dCodec.Deserialize(blobs)
|
_, _, deltaData, err := dCodec.Deserialize(blobs)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user