diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 6dfc33db40..9161af08c8 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -349,6 +349,11 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni if err != nil { return err } + // get memory size of buffer data + fieldMemorySize := make(map[int64]int) + for fieldID, fieldData := range data.buffer.Data { + fieldMemorySize[fieldID] = fieldData.GetMemorySize() + } // encode data and convert output data inCodec := storage.NewInsertCodec(meta) @@ -385,7 +390,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni TimestampFrom: 0, //TODO TimestampTo: 0, //TODO, LogPath: key, - LogSize: int64(len(blob.Value)), + LogSize: int64(fieldMemorySize[fieldID]), } field2Logidx[fieldID] = logidx } diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index cc3e27c80a..b3cb9b546a 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -467,6 +467,12 @@ func (it *IndexBuildTask) buildIndex(ctx context.Context) ([]*storage.Blob, erro } it.tr.Record("index serialize done") + // use serialized size before encoding + it.serializedSize = 0 + for _, blob := range indexBlobs { + it.serializedSize += uint64(len(blob.Value)) + } + // early release index for gc, and we can ensure that Delete is idempotent. if err := it.index.Delete(); err != nil { log.Error("IndexNode IndexBuildTask Execute CIndexDelete failed", @@ -498,10 +504,6 @@ func (it *IndexBuildTask) buildIndex(ctx context.Context) ([]*storage.Blob, erro func (it *IndexBuildTask) saveIndex(ctx context.Context, blobs []*storage.Blob) error { blobCnt := len(blobs) - it.serializedSize = 0 - for i := range blobs { - it.serializedSize += uint64(len(blobs[i].Value)) - } getSavePathByKey := func(key string) string { return path.Join(Params.IndexNodeCfg.IndexStorageRootPath, strconv.Itoa(int(it.req.IndexBuildID)), strconv.Itoa(int(it.req.Version)),