From 818cf3ffa05412e2c579a25e2a3fa5fb77fe41be Mon Sep 17 00:00:00 2001
From: dragondriver
Date: Thu, 30 Sep 2021 17:57:01 +0800
Subject: [PATCH] Split blob into several string rows when index file is large
 (#8919)

Signed-off-by: dragondriver

---
 internal/storage/data_codec.go      | 55 +++++++++++++++++++++--------
 internal/storage/data_codec_test.go |  4 +++
 2 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index 16d23b6d91..b44da2c9aa 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -36,6 +36,14 @@ const (
 	IndexParamsFile = "indexParams"
 )
 
+// When the blob of an index file is too large, we can split the blob into several rows.
+// Fortunately, the blob has no other semantics that differ from other binlog types,
+// so we simply assemble these rows back into a whole blob when deserializing the index binlog.
+// num rows = math.Ceil(len(blob) / maxLengthPerRowOfIndexFile)
+// Past-version index files contain only a single string row, which is a subset case of splitting into several rows,
+// so splitting the index file won't introduce any incompatibility with past versions.
+const maxLengthPerRowOfIndexFile = 4 * 1024 * 1024 + type ( UniqueID = typeutil.UniqueID FieldID = typeutil.UniqueID @@ -820,9 +828,17 @@ func (codec *IndexFileBinlogCodec) Serialize( return nil, err } - err = eventWriter.AddOneStringToPayload(string(datas[pos].Value)) - if err != nil { - return nil, err + length := (len(datas[pos].Value) + maxLengthPerRowOfIndexFile - 1) / maxLengthPerRowOfIndexFile + for i := 0; i < length; i++ { + start := i * maxLengthPerRowOfIndexFile + end := (i + 1) * maxLengthPerRowOfIndexFile + if end > len(datas[pos].Value) { + end = len(datas[pos].Value) + } + err = eventWriter.AddOneStringToPayload(string(datas[pos].Value[start:end])) + if err != nil { + return nil, err + } } eventWriter.SetEventTimestamp(ts, ts) @@ -854,9 +870,17 @@ func (codec *IndexFileBinlogCodec) Serialize( } params, _ := json.Marshal(indexParams) - err = eventWriter.AddOneStringToPayload(string(params)) - if err != nil { - return nil, err + length := (len(params) + maxLengthPerRowOfIndexFile - 1) / maxLengthPerRowOfIndexFile + for i := 0; i < length; i++ { + start := i * maxLengthPerRowOfIndexFile + end := (i + 1) * maxLengthPerRowOfIndexFile + if end > len(params) { + end = len(params) + } + err = eventWriter.AddOneStringToPayload(string(params[start:end])) + if err != nil { + return nil, err + } } eventWriter.SetEventTimestamp(ts, ts) @@ -963,6 +987,7 @@ func (codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) ( return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err } + var content []byte for i := 0; i < length; i++ { singleString, err := eventReader.GetOneStringFromPayload(i) if err != nil { @@ -971,14 +996,16 @@ func (codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) ( return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err } - if key == "indexParams" { - _ = json.Unmarshal([]byte(singleString), &indexParams) - } else { - datas = append(datas, &Blob{ - Key: key, - Value: []byte(singleString), - }) - } + content = append(content, []byte(singleString)...) 
+ } + + if key == "indexParams" { + _ = json.Unmarshal(content, &indexParams) + } else { + datas = append(datas, &Blob{ + Key: key, + Value: content, + }) } } } diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 939c82413e..fef6427c6a 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -373,6 +373,10 @@ func TestIndexFileBinlogCodec(t *testing.T) { Key: "ivf2", Value: []byte{4, 5, 6}, }, + { + Key: "large", + Value: []byte(funcutil.RandomString(maxLengthPerRowOfIndexFile + 1)), + }, } codec := NewIndexFileBinlogCodec()