diff --git a/internal/storage/utils.go b/internal/storage/utils.go new file mode 100644 index 0000000000..e5d43f98f1 --- /dev/null +++ b/internal/storage/utils.go @@ -0,0 +1,66 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package storage + +import ( + "bytes" + "encoding/binary" + "fmt" + + "github.com/milvus-io/milvus/internal/kv" +) + +// EstimateMemorySize get approximate memory size of a binlog file. +// normal binlog file, error = nil; +// key not exist, size = 0, error = nil; +// key not in binlog format, size = (a not accurate number), error != nil; +// failed to read event reader, size = (a not accurate number), error != nil; +func EstimateMemorySize(kv kv.DataKV, key string) (int64, error) { + total := int64(0) + + header := &baseEventHeader{} + headerSize := binary.Size(header) + + startPos := binary.Size(MagicNumber) + endPos := startPos + headerSize + + for { + headerContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos)) + if err != nil { + // case 1: key not exist, total = 0; + // case 2: all events have been read, total = (length of all events); + // whatever the case is, the return value is reasonable. + return total, nil + } + + buffer := bytes.NewBuffer(headerContent) + + header, err := readEventHeader(buffer) + if err != nil { + // FIXME(dragondriver): should we return 0 here? + return total, fmt.Errorf("failed to read event reader: %v", err) + } + + if header.EventLength <= 0 || header.NextPosition < int32(endPos) { + // key not in binlog format + // FIXME(dragondriver): should we return 0 here? + return total, fmt.Errorf("key not in binlog format") + } + + total += int64(header.EventLength) + // startPos = startPos + int(header.EventLength) + // || + // \/ + startPos = int(header.NextPosition) + endPos = startPos + headerSize + } +} diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go new file mode 100644 index 0000000000..5a0c2f47d1 --- /dev/null +++ b/internal/storage/utils_test.go @@ -0,0 +1,212 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package storage + +import ( + "bytes" + "crypto/rand" + "encoding/binary" + "testing" + + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/uniquegenerator" + + "github.com/milvus-io/milvus/internal/kv" + + "github.com/stretchr/testify/assert" + + memkv "github.com/milvus-io/milvus/internal/kv/mem" +) + +type mockLessHeaderDataKV struct { +} + +func (kv *mockLessHeaderDataKV) Load(key string) (string, error) { + panic("implement me") +} + +func (kv *mockLessHeaderDataKV) MultiLoad(keys []string) ([]string, error) { + panic("implement me") +} + +func (kv *mockLessHeaderDataKV) LoadWithPrefix(key string) ([]string, []string, error) { + panic("implement me") +} + +func (kv *mockLessHeaderDataKV) Save(key, value string) error { + panic("implement me") +} + +func (kv *mockLessHeaderDataKV) MultiSave(kvs map[string]string) error { + panic("implement me") +} + +func (kv *mockLessHeaderDataKV) Remove(key string) error { + panic("implement me") +} + +func (kv *mockLessHeaderDataKV) MultiRemove(keys []string) error { + panic("implement me") +} + +func (kv *mockLessHeaderDataKV) RemoveWithPrefix(key string) error { + panic("implement me") +} + +func (kv *mockLessHeaderDataKV) Close() { +} + +func (kv *mockLessHeaderDataKV) LoadPartial(key string, start, end int64) ([]byte, error) { + header := &baseEventHeader{} + + headerSize := binary.Size(header) + mockSize := headerSize - 1 + + ret := make([]byte, mockSize) + _, _ = rand.Read(ret) + return ret, nil +} + +func newMockLessHeaderDataKV() *mockLessHeaderDataKV { + return &mockLessHeaderDataKV{} +} + +type mockWrongHeaderDataKV struct { +} + +func (kv *mockWrongHeaderDataKV) Load(key string) (string, error) { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) MultiLoad(keys []string) ([]string, error) { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) LoadWithPrefix(key string) ([]string, []string, error) { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) Save(key, value string) error { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) MultiSave(kvs map[string]string) error { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) Remove(key string) error { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) MultiRemove(keys []string) error { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) RemoveWithPrefix(key string) error { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) Close() { + panic("implement me") +} + +func (kv *mockWrongHeaderDataKV) LoadPartial(key string, start, end int64) ([]byte, error) { + header := &baseEventHeader{} + + header.EventLength = -1 + header.NextPosition = -1 + + buffer := bytes.Buffer{} + _ = binary.Write(&buffer, binary.LittleEndian, header) + + return buffer.Bytes(), nil +} + +func newMockWrongHeaderDataKV() kv.DataKV { + return &mockWrongHeaderDataKV{} +} + +func TestEstimateMemorySize(t *testing.T) { + memoryKV := memkv.NewMemoryKV() + defer memoryKV.Close() + + key := "TestEstimateMemorySize" + + var size int64 + var err error + + // key not in memoryKV + size, err = EstimateMemorySize(memoryKV, key) + assert.NoError(t, err) + assert.Zero(t, size) + + // normal binlog key, for example, index binlog + indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + collectionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + partitionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + segmentID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + fieldID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + indexName := funcutil.GenRandomStr() + indexID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + indexParams := make(map[string]string) + indexParams["index_type"] = "IVF_FLAT" + datas := []*Blob{ + { + Key: "ivf1", + Value: []byte{1, 2, 3}, + }, + { + Key: "ivf2", + Value: []byte{4, 5, 6}, + }, + { + Key: "large", + Value: []byte(funcutil.RandomString(maxLengthPerRowOfIndexFile + 1)), + }, + } + + codec := NewIndexFileBinlogCodec() + defer codec.Close() + + serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas) + assert.Nil(t, err) + + for _, blob := range serializedBlobs { + err = memoryKV.Save(blob.Key, string(blob.Value)) + assert.Nil(t, err) + + size, err = EstimateMemorySize(memoryKV, blob.Key) + assert.Nil(t, err) + assert.Equal(t, size+int64(binary.Size(MagicNumber)), int64(len(blob.Value))) + } +} + +// cover case that failed to read event header +func TestEstimateMemorySize_less_header(t *testing.T) { + mockKV := newMockLessHeaderDataKV() + + key := "TestEstimateMemorySize_less_header" + + _, err := EstimateMemorySize(mockKV, key) + assert.Error(t, err) +} + +// cover case that file not in binlog format +func TestEstimateMemorySize_not_in_binlog_format(t *testing.T) { + mockKV := newMockWrongHeaderDataKV() + + key := "TestEstimateMemorySize_not_in_binlog_format" + + _, err := EstimateMemorySize(mockKV, key) + assert.Error(t, err) +}