From f85271cf3f599ad6a927aaeaba6b23e95abd1c8f Mon Sep 17 00:00:00 2001 From: dragondriver Date: Tue, 12 Oct 2021 17:00:34 +0800 Subject: [PATCH] Estimate memory size by descriptor event (#9688) Signed-off-by: dragondriver --- internal/storage/binlog_reader.go | 15 +- internal/storage/event_writer.go | 14 ++ internal/storage/event_writer_test.go | 20 ++ internal/storage/utils.go | 64 +++++ internal/storage/utils_test.go | 321 ++++++++++++++++++++++++++ 5 files changed, 422 insertions(+), 12 deletions(-) diff --git a/internal/storage/binlog_reader.go b/internal/storage/binlog_reader.go index 99beb997f9..f3e4f6d667 100644 --- a/internal/storage/binlog_reader.go +++ b/internal/storage/binlog_reader.go @@ -13,10 +13,6 @@ package storage import ( "bytes" - "encoding/binary" - "fmt" - "strconv" - "errors" ) @@ -47,14 +43,9 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) { } func (reader *BinlogReader) readMagicNumber() (int32, error) { - if err := binary.Read(reader.buffer, binary.LittleEndian, &reader.magicNumber); err != nil { - return -1, err - } - if reader.magicNumber != MagicNumber { - return -1, fmt.Errorf("parse magic number failed, expected: %s, actual: %s", strconv.Itoa(int(MagicNumber)), strconv.Itoa(int(reader.magicNumber))) - } - - return reader.magicNumber, nil + var err error + reader.magicNumber, err = readMagicNumber(reader.buffer) + return reader.magicNumber, err } func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) { diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go index 3568ee2029..2b369a47e0 100644 --- a/internal/storage/event_writer.go +++ b/internal/storage/event_writer.go @@ -15,7 +15,9 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" + "strconv" "github.com/milvus-io/milvus/internal/proto/schemapb" ) @@ -77,6 +79,18 @@ func (event *descriptorEvent) Write(buffer io.Writer) error { return nil } +func readMagicNumber(buffer io.Reader) (int32, error) { + var
magicNumber int32 + if err := binary.Read(buffer, binary.LittleEndian, &magicNumber); err != nil { + return -1, err + } + if magicNumber != MagicNumber { + return -1, fmt.Errorf("parse magic number failed, expected: %s, actual: %s", strconv.Itoa(int(MagicNumber)), strconv.Itoa(int(magicNumber))) + } + + return magicNumber, nil +} + func ReadDescriptorEvent(buffer io.Reader) (*descriptorEvent, error) { header, err := readDescriptorEventHeader(buffer) if err != nil { diff --git a/internal/storage/event_writer_test.go b/internal/storage/event_writer_test.go index c055c91874..5772328772 100644 --- a/internal/storage/event_writer_test.go +++ b/internal/storage/event_writer_test.go @@ -85,3 +85,23 @@ func TestEventWriter(t *testing.T) { err = insertEvent.Close() assert.Nil(t, err) } + +func TestReadMagicNumber(t *testing.T) { + var err error + buf := bytes.Buffer{} + + // empty buffer, so the read must fail + _, err = readMagicNumber(&buf) + assert.Error(t, err) + + // value read is not the magic number + _ = binary.Write(&buf, binary.LittleEndian, MagicNumber+1) + _, err = readMagicNumber(&buf) + assert.Error(t, err) + + // valid magic number + _ = binary.Write(&buf, binary.LittleEndian, MagicNumber) + num, err := readMagicNumber(&buf) + assert.NoError(t, err) + assert.Equal(t, MagicNumber, num) +} diff --git a/internal/storage/utils.go b/internal/storage/utils.go index f99595eea0..f0ed4d178c 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -15,6 +15,7 @@ import ( "bytes" "encoding/binary" "fmt" + "strconv" "github.com/milvus-io/milvus/internal/kv" ) @@ -64,3 +65,66 @@ func GetBinlogSize(kv kv.DataKV, key string) (int64, error) { endPos = startPos + headerSize } } + +// EstimateMemorySize returns the approximate memory size of a binlog file, taken from the originalSizeKey entry of its descriptor event extras.
+// 1, key does not exist, size = 0, error != nil; +// 2, failed to read event header, size = 0, error != nil; +// 3, invalid event length (<= 0), size = 0, error != nil; +// 4, failed to load or read descriptor event, size = 0, error != nil; +// 5, originalSizeKey not in extras, size = 0, error != nil; +// 6, value of originalSizeKey is not an integer, size = 0, error != nil; +// 7, normal binlog with an original size, returns that size, error = nil; +func EstimateMemorySize(kv kv.DataKV, key string) (int64, error) { + total := int64(0) + + header := &eventHeader{} + headerSize := binary.Size(header) + + startPos := binary.Size(MagicNumber) + endPos := startPos + headerSize + + // load only the event header bytes that follow the magic number + headerContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos)) + if err != nil { + return total, err + } + + buffer := bytes.NewBuffer(headerContent) + + header, err = readEventHeader(buffer) + if err != nil { + return total, err + } + + if header.EventLength <= 0 { + return total, fmt.Errorf("key %v not in binlog format", key) + } + + desc := &descriptorEvent{} + endPos = startPos + int(header.EventLength) + descContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos)) + if err != nil { + return total, err + } + + buffer = bytes.NewBuffer(descContent) + + desc, err = ReadDescriptorEvent(buffer) + if err != nil { + return total, err + } + + sizeStr, ok := desc.Extras[originalSizeKey] + if !ok { + return total, fmt.Errorf("key %v not in extra information", originalSizeKey) + } + + size, err := strconv.Atoi(fmt.Sprintf("%v", sizeStr)) + if err != nil { + return total, fmt.Errorf("%v not in valid format, value: %v", originalSizeKey, sizeStr) + } + + total = int64(size) + + return total, nil +} diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go index 3a438fda79..b507e2f2d2 100644 --- a/internal/storage/utils_test.go +++ b/internal/storage/utils_test.go @@ -15,6 +15,9 @@ import ( "bytes" "crypto/rand" "encoding/binary" + "encoding/json" + "errors" + "fmt" "testing"
"github.com/milvus-io/milvus/internal/util/funcutil" @@ -210,3 +213,321 @@ func TestGetBinlogSize_not_in_binlog_format(t *testing.T) { _, err := GetBinlogSize(mockKV, key) assert.Error(t, err) } + +func TestEstimateMemorySize(t *testing.T) { + memoryKV := memkv.NewMemoryKV() + defer memoryKV.Close() + + key := "TestEstimateMemorySize" + + var size int64 + var err error + + // key was never saved to memoryKV, so the estimate must fail + _, err = EstimateMemorySize(memoryKV, key) + assert.Error(t, err) + + // normal binlog keys, using a serialized index binlog as the example + indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + collectionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + partitionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + segmentID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + fieldID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + indexName := funcutil.GenRandomStr() + indexID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + indexParams := make(map[string]string) + indexParams["index_type"] = "IVF_FLAT" + datas := []*Blob{ + { + Key: "ivf1", + Value: []byte{1, 2, 3}, + }, + { + Key: "ivf2", + Value: []byte{4, 5, 6}, + }, + { + Key: "large", + Value: []byte(funcutil.RandomString(maxLengthPerRowOfIndexFile + 1)), + }, + } + + codec := NewIndexFileBinlogCodec() + defer codec.Close() + + serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas) + assert.Nil(t, err) + + for _, blob := range serializedBlobs { + err = memoryKV.Save(blob.Key, string(blob.Value)) + assert.Nil(t, err) + + buf := bytes.NewBuffer(blob.Value) + desc := &descriptorEvent{} + + _, _ = readMagicNumber(buf) + desc, _ = ReadDescriptorEvent(buf) + + size, err = EstimateMemorySize(memoryKV, blob.Key) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("%v",
desc.Extras[originalSizeKey]), fmt.Sprintf("%v", size)) + } +} + +// covers the case where reading the event header fails +func TestEstimateMemorySize_less_header(t *testing.T) { + mockKV := newMockLessHeaderDataKV() + + key := "TestEstimateMemorySize_less_header" + + _, err := EstimateMemorySize(mockKV, key) + assert.Error(t, err) +} + +// covers the case where the file is not in binlog format +func TestEstimateMemorySize_not_in_binlog_format(t *testing.T) { + mockKV := newMockWrongHeaderDataKV() + + key := "TestEstimateMemorySize_not_in_binlog_format" + + _, err := EstimateMemorySize(mockKV, key) + assert.Error(t, err) +} + +type mockFailedToGetDescDataKV struct { +} + +func (kv *mockFailedToGetDescDataKV) Load(key string) (string, error) { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) MultiLoad(keys []string) ([]string, error) { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) LoadWithPrefix(key string) ([]string, []string, error) { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) Save(key, value string) error { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) MultiSave(kvs map[string]string) error { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) Remove(key string) error { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) MultiRemove(keys []string) error { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) RemoveWithPrefix(key string) error { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) Close() { + panic("implement me") +} + +func (kv *mockFailedToGetDescDataKV) LoadPartial(key string, start, end int64) ([]byte, error) { + header := &eventHeader{} + header.EventLength = 20 + headerSize := binary.Size(header) + + if end-start > int64(headerSize) { + return nil, errors.New("mock failed to get desc data") + } + + buf := bytes.Buffer{} + _ = binary.Write(&buf, binary.LittleEndian, header) + return buf.Bytes(), nil +} + +func
newMockFailedToGetDescDataKV() *mockFailedToGetDescDataKV { + return &mockFailedToGetDescDataKV{} +} + +// covers the case where loading the descriptor event content fails +func TestEstimateMemorySize_failed_to_load_desc(t *testing.T) { + mockKV := newMockFailedToGetDescDataKV() + + key := "TestEstimateMemorySize_failed_to_load_desc" + + _, err := EstimateMemorySize(mockKV, key) + assert.Error(t, err) +} + +type mockLessDescDataKV struct { +} + +func (kv *mockLessDescDataKV) Load(key string) (string, error) { + panic("implement me") +} + +func (kv *mockLessDescDataKV) MultiLoad(keys []string) ([]string, error) { + panic("implement me") +} + +func (kv *mockLessDescDataKV) LoadWithPrefix(key string) ([]string, []string, error) { + panic("implement me") +} + +func (kv *mockLessDescDataKV) Save(key, value string) error { + panic("implement me") +} + +func (kv *mockLessDescDataKV) MultiSave(kvs map[string]string) error { + panic("implement me") +} + +func (kv *mockLessDescDataKV) Remove(key string) error { + panic("implement me") +} + +func (kv *mockLessDescDataKV) MultiRemove(keys []string) error { + panic("implement me") +} + +func (kv *mockLessDescDataKV) RemoveWithPrefix(key string) error { + panic("implement me") +} + +func (kv *mockLessDescDataKV) Close() { + panic("implement me") +} + +func (kv *mockLessDescDataKV) LoadPartial(key string, start, end int64) ([]byte, error) { + header := &baseEventHeader{} + header.EventLength = 20 + + buffer := bytes.Buffer{} + _ = binary.Write(&buffer, binary.LittleEndian, header) + + // only the header is returned, without any event data + return buffer.Bytes(), nil + + /* + desc := &descriptorEvent{} + desc.ExtraLength = 2 + desc.ExtraBytes = []byte{1, 2} + buffer := bytes.Buffer{} + _ = binary.Write(&buffer, binary.LittleEndian, desc) + // extra not in json format + return buffer.Bytes(), nil + */ +} + +func newMockLessDescDataKV() *mockLessDescDataKV { + return &mockLessDescDataKV{} +} + +func TestEstimateMemorySize_less_desc_data(t *testing.T) { + mockKV :=
newMockLessDescDataKV() + + key := "TestEstimateMemorySize_less_desc_data" + + _, err := EstimateMemorySize(mockKV, key) + assert.Error(t, err) +} + +type mockOriginalSizeDataKV struct { + impl func(key string, start, end int64) ([]byte, error) +} + +func (kv *mockOriginalSizeDataKV) Load(key string) (string, error) { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) MultiLoad(keys []string) ([]string, error) { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) LoadWithPrefix(key string) ([]string, []string, error) { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) Save(key, value string) error { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) MultiSave(kvs map[string]string) error { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) Remove(key string) error { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) MultiRemove(keys []string) error { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) RemoveWithPrefix(key string) error { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) Close() { + panic("implement me") +} + +func (kv *mockOriginalSizeDataKV) LoadPartial(key string, start, end int64) ([]byte, error) { + if kv.impl != nil { + return kv.impl(key, start, end) + } + return nil, nil +} + +func newMockOriginalSizeDataKV() *mockOriginalSizeDataKV { + return &mockOriginalSizeDataKV{} +} + +func TestEstimateMemorySize_no_original_size(t *testing.T) { + mockKV := newMockOriginalSizeDataKV() + mockKV.impl = func(key string, start, end int64) ([]byte, error) { + desc := &descriptorEvent{} + desc.descriptorEventHeader.EventLength = 20 + desc.descriptorEventData = *newDescriptorEventData() + extra := make(map[string]interface{}) + extra["key"] = "value" + extraBytes, _ := json.Marshal(extra) + desc.ExtraBytes = extraBytes + desc.ExtraLength = int32(len(extraBytes)) + buf := bytes.Buffer{} + _ = desc.descriptorEventHeader.Write(&buf) + _ =
desc.descriptorEventData.Write(&buf) + return buf.Bytes(), nil + } + + key := "TestEstimateMemorySize_no_original_size" + + _, err := EstimateMemorySize(mockKV, key) + assert.Error(t, err) +} + +func TestEstimateMemorySize_cannot_convert_original_size_to_int(t *testing.T) { + mockKV := newMockOriginalSizeDataKV() + mockKV.impl = func(key string, start, end int64) ([]byte, error) { + desc := &descriptorEvent{} + desc.descriptorEventHeader.EventLength = 20 + desc.descriptorEventData = *newDescriptorEventData() + extra := make(map[string]interface{}) + extra[originalSizeKey] = "value" + extraBytes, _ := json.Marshal(extra) + desc.ExtraBytes = extraBytes + desc.ExtraLength = int32(len(extraBytes)) + buf := bytes.Buffer{} + _ = desc.descriptorEventHeader.Write(&buf) + _ = desc.descriptorEventData.Write(&buf) + return buf.Bytes(), nil + } + + key := "TestEstimateMemorySize_cannot_convert_original_size_to_int" + + _, err := EstimateMemorySize(mockKV, key) + assert.Error(t, err) +}