From af32f442bb34cfa904aafec83f2f56cf236e9152 Mon Sep 17 00:00:00 2001 From: dragondriver Date: Tue, 9 Mar 2021 16:18:58 +0800 Subject: [PATCH] Split big insert message into several smaller Signed-off-by: dragondriver --- configs/milvus.yaml | 2 + go.mod | 1 + go.sum | 2 + internal/proxynode/paramtable.go | 44 +++++++++++++++++++- internal/proxynode/repack_func.go | 53 ++++++++++++++++++++++++- internal/proxynode/util.go | 41 +++++++++++++++++++ internal/proxynode/util_test.go | 38 ++++++++++++++++++ tests/python_test/entity/test_insert.py | 1 + tests/python_test/requirements.txt | 2 +- 9 files changed, 180 insertions(+), 4 deletions(-) create mode 100644 internal/proxynode/util.go create mode 100644 internal/proxynode/util_test.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 7c47b8f3f9..79bc724fec 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -38,6 +38,8 @@ minio: pulsar: address: localhost port: 6650 + rest-port: 18080 # keep same with pulsar container + maxMessageSize: 5242880 # 5 * 1024 * 1024 authentication: false user: user-default token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY diff --git a/go.mod b/go.mod index bfc36ba50a..765b12e9a5 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 + github.com/jarcoal/httpmock v1.0.8 github.com/klauspost/compress v1.10.11 // indirect github.com/minio/minio-go/v7 v7.0.5 github.com/mitchellh/mapstructure v1.1.2 diff --git a/go.sum b/go.sum index 1b0c2edf49..024c34d78b 100644 --- a/go.sum +++ b/go.sum @@ -177,6 +177,8 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jarcoal/httpmock v1.0.8 h1:8kI16SoO6LQKgPE7PvQuV+YuD/inwHd7fOOe2zMbo4k= +github.com/jarcoal/httpmock v1.0.8/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik= github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg= github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index d1a720f082..41b66d3b78 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -19,7 +19,9 @@ import ( ) const ( - StartParamsKey = "START_PARAMS" + StartParamsKey = "START_PARAMS" + PulsarMaxMessageSizeKey = "maxMessageSize" + SuggestPulsarMaxMessageSizeKey = 5 * 1024 * 1024 ) type ParamTable struct { @@ -55,7 +57,8 @@ type ParamTable struct { DefaultPartitionTag string DefaultIndexName string - Log log.Config + PulsarMaxMessageSize int + Log log.Config } var Params ParamTable @@ -154,6 +157,7 @@ func (pt *ParamTable) initParams() { pt.initDefaultIndexName() pt.initLogCfg() + pt.initPulsarMaxMessageSize() } func (pt *ParamTable) initPulsarAddress() { @@ -404,6 +408,42 @@ func (pt *ParamTable) initDefaultIndexName() { pt.DefaultIndexName = name } +func (pt *ParamTable) initPulsarMaxMessageSize() { + // pulsarHost, err := pt.Load("pulsar.address") + // if err != nil { + // panic(err) + // } + + // pulsarRestPort, err := pt.Load("pulsar.rest-port") + // if err != nil { + // panic(err) + // } + + // protocol := "http" + // url := "/admin/v2/brokers/configuration/runtime" + // runtimeConfig, err := GetPulsarConfig(protocol, pulsarHost, pulsarRestPort, url) + // if err != nil { + // panic(err) + // } + // maxMessageSizeStr := fmt.Sprintf("%v", runtimeConfig[PulsarMaxMessageSizeKey]) + // pt.PulsarMaxMessageSize, err = strconv.Atoi(maxMessageSizeStr) + // if err != nil { + // 
panic(err) + // } + + maxMessageSizeStr, err := pt.Load("pulsar.maxMessageSize") + if err != nil { + pt.PulsarMaxMessageSize = SuggestPulsarMaxMessageSizeKey + } else { + maxMessageSize, err := strconv.Atoi(maxMessageSizeStr) + if err != nil { + pt.PulsarMaxMessageSize = SuggestPulsarMaxMessageSizeKey + } else { + pt.PulsarMaxMessageSize = maxMessageSize + } + } +} + func (pt *ParamTable) initLogCfg() { pt.Log = log.Config{} format, err := pt.Load("log.format") diff --git a/internal/proxynode/repack_func.go b/internal/proxynode/repack_func.go index 9edd7d23b7..893c44e737 100644 --- a/internal/proxynode/repack_func.go +++ b/internal/proxynode/repack_func.go @@ -4,6 +4,7 @@ import ( "errors" "log" "sort" + "unsafe" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -170,6 +171,53 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg, return 0 } + factor := 10 + threshold := Params.PulsarMaxMessageSize / factor + log.Println("threshold of message size: ", threshold) + // not accurate + getSizeOfInsertMsg := func(msg *msgstream.InsertMsg) int { + // if real struct, call unsafe.Sizeof directly, + // if reference, dereference and then call unsafe.Sizeof, + // if slice, todo: a common function to calculate size of slice, + // if map, a little complicated + size := 0 + size += int(unsafe.Sizeof(msg.BeginTimestamp)) + size += int(unsafe.Sizeof(msg.EndTimestamp)) + size += int(unsafe.Sizeof(msg.HashValues)) + size += len(msg.HashValues) * 4 + size += int(unsafe.Sizeof(*msg.MsgPosition)) + size += int(unsafe.Sizeof(*msg.Base)) + size += int(unsafe.Sizeof(msg.DbName)) + size += int(unsafe.Sizeof(msg.CollectionName)) + size += int(unsafe.Sizeof(msg.PartitionName)) + size += int(unsafe.Sizeof(msg.DbID)) + size += int(unsafe.Sizeof(msg.CollectionID)) + size += int(unsafe.Sizeof(msg.PartitionID)) + size += int(unsafe.Sizeof(msg.SegmentID)) + size += int(unsafe.Sizeof(msg.ChannelID)) + size += 
int(unsafe.Sizeof(msg.Timestamps)) + size += int(unsafe.Sizeof(msg.RowIDs)) + size += len(msg.RowIDs) * 8 + for _, blob := range msg.RowData { + size += int(unsafe.Sizeof(blob.Value)) + size += len(blob.Value) + } + log.Println("size of insert message: ", size) + return size + } + // not accurate + // getSizeOfMsgPack := func(mp *msgstream.MsgPack) int { + // size := 0 + // for _, msg := range mp.Msgs { + // insertMsg, ok := msg.(*msgstream.InsertMsg) + // if !ok { + // log.Panic("only insert message is supported!") + // } + // size += getSizeOfInsertMsg(insertMsg) + // } + // return size + // } + for i, request := range tsMsgs { insertRequest := request.(*msgstream.InsertMsg) keys := hashKeys[i] @@ -214,10 +262,13 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg, InsertRequest: sliceRequest, } if together { // all rows with same hash value are accumulated to only one message + msgNums := len(result[key].Msgs) if len(result[key].Msgs) <= 0 { result[key].Msgs = append(result[key].Msgs, insertMsg) + } else if getSizeOfInsertMsg(result[key].Msgs[msgNums-1].(*msgstream.InsertMsg)) >= threshold { + result[key].Msgs = append(result[key].Msgs, insertMsg) } else { - accMsgs, _ := result[key].Msgs[0].(*msgstream.InsertMsg) + accMsgs, _ := result[key].Msgs[msgNums-1].(*msgstream.InsertMsg) accMsgs.Timestamps = append(accMsgs.Timestamps, ts) accMsgs.RowIDs = append(accMsgs.RowIDs, rowID) accMsgs.RowData = append(accMsgs.RowData, row) diff --git a/internal/proxynode/util.go b/internal/proxynode/util.go new file mode 100644 index 0000000000..a206c55a51 --- /dev/null +++ b/internal/proxynode/util.go @@ -0,0 +1,41 @@ +package proxynode + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "time" + + "github.com/zilliztech/milvus-distributed/internal/util/retry" +) + +func GetPulsarConfig(protocol, ip, port, url string) (map[string]interface{}, error) { + var resp *http.Response + var err error + + getResp := func() error { + log.Println("GET: ", 
protocol+"://"+ip+":"+port+url) + resp, err = http.Get(protocol + "://" + ip + ":" + port + url) + return err + } + + err = retry.Retry(10, time.Second, getResp) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + ret := make(map[string]interface{}) + err = json.Unmarshal(body, &ret) + if err != nil { + return nil, err + } + + return ret, nil +} diff --git a/internal/proxynode/util_test.go b/internal/proxynode/util_test.go new file mode 100644 index 0000000000..3c15405b66 --- /dev/null +++ b/internal/proxynode/util_test.go @@ -0,0 +1,38 @@ +package proxynode + +import ( + "fmt" + "net/http" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jarcoal/httpmock" +) + +func TestGetPulsarConfig(t *testing.T) { + httpmock.Activate() + defer httpmock.DeactivateAndReset() + + runtimeConfig := make(map[string]interface{}) + runtimeConfig[PulsarMaxMessageSizeKey] = strconv.FormatInt(5*1024*1024, 10) + + protocol := "http" + ip := "pulsar" + port := "18080" + url := "/admin/v2/brokers/configuration/runtime" + httpmock.RegisterResponder("GET", protocol+"://"+ip+":"+port+url, + func(req *http.Request) (*http.Response, error) { + return httpmock.NewJsonResponse(200, runtimeConfig) + }, + ) + + ret, err := GetPulsarConfig(protocol, ip, port, url) + assert.Equal(t, nil, err) + assert.Equal(t, len(ret), len(runtimeConfig)) + assert.Equal(t, len(ret), 1) + for key, value := range ret { + assert.Equal(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", runtimeConfig[key])) + } +} diff --git a/tests/python_test/entity/test_insert.py b/tests/python_test/entity/test_insert.py index 5ea16d28d6..59f12b161e 100644 --- a/tests/python_test/entity/test_insert.py +++ b/tests/python_test/entity/test_insert.py @@ -702,6 +702,7 @@ class TestInsertAsync: assert len(ids) == nb @pytest.mark.level(2) + @pytest.mark.tags("0331") def test_insert_async_long(self, connect, 
collection): ''' target: test insert vectors with different length of vectors diff --git a/tests/python_test/requirements.txt b/tests/python_test/requirements.txt index cebcd44ec7..3152140df2 100644 --- a/tests/python_test/requirements.txt +++ b/tests/python_test/requirements.txt @@ -2,7 +2,7 @@ grpcio==1.26.0 grpcio-tools==1.26.0 numpy==1.18.1 pytest-cov==2.8.1 -pymilvus-distributed==0.0.39 +pymilvus-distributed==0.0.40 sklearn==0.0 pytest==6.2.2 pytest-timeout==1.3.3