diff --git a/go.mod b/go.mod index 0cdea61d83..966209b153 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.17.47 github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.23.0 github.com/bits-and-blooms/bitset v1.10.0 + github.com/bytedance/mockey v1.2.14 github.com/bytedance/sonic v1.13.2 github.com/cenkalti/backoff/v4 v4.2.1 github.com/cockroachdb/redact v1.1.3 @@ -164,6 +165,7 @@ require ( github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.5 // indirect + github.com/gopherjs/gopherjs v1.12.80 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect @@ -176,6 +178,7 @@ require ( github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -218,6 +221,8 @@ require ( github.com/shirou/gopsutil/v3 v3.23.7 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/smartystreets/assertions v1.2.0 // indirect + github.com/smartystreets/goconvey v1.7.2 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.6.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -260,7 +265,7 @@ require ( go.opentelemetry.io/otel/sdk v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/automaxprocs v1.5.3 // indirect - golang.org/x/arch v0.3.0 // indirect + golang.org/x/arch v0.11.0 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/term v0.30.0 // indirect diff --git a/go.sum b/go.sum index b843613ba2..589d130a23 100644 --- a/go.sum +++ b/go.sum @@ -174,6 +174,8 @@ github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= +github.com/bytedance/mockey v1.2.14 h1:KZaFgPdiUwW+jOWFieo3Lr7INM1P+6adO3hxZhDswY8= +github.com/bytedance/mockey v1.2.14/go.mod h1:1BPHF9sol5R1ud/+0VEHGQq/+i2lN+GTsr3O2Q9IENY= github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ= github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= @@ -507,6 +509,8 @@ github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBY github.com/googleapis/gax-go/v2 v2.12.5/go.mod h1:BUDKcWo+RaKq5SC9vVYL0wLADa3VcfswbOMMRmB9H3E= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gopherjs/gopherjs v1.12.80 h1:aC68NT6VK715WeUapxcPSFq/a3gZdS32HdtghdOIgAo= +github.com/gopherjs/gopherjs v1.12.80/go.mod h1:d55Q4EjGQHeJVms+9LGtXul6ykz5Xzx1E1gaXQXdimY= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -599,6 +603,7 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/juju/ansiterm v0.0.0-20160907234532-b99631de12cf/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU= github.com/juju/clock v0.0.0-20190205081909-9c5c9712527c/go.mod h1:nD0vlnrUjcjJhqN5WuCWZyzfd5AHZAC9/ajvbSx69xA= @@ -777,6 +782,8 @@ github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5Vgl github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= +github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U= @@ -888,6 +895,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/rogpeppe/go-internal v1.0.1-alpha.1/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= @@ -922,7 +930,10 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk= +github.com/shurcooL/httpfs v0.0.0-20181222201310-74dc9339e414/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/shurcooL/vfsgen v0.0.0-20180915214035-33ae1944be3f/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -931,8 +942,12 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v1.1.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= +github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= +github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= @@ -946,6 +961,7 @@ github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI= github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo= @@ -954,6 +970,7 @@ github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRM github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= +github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= @@ -1161,9 +1178,10 @@ go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= -golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.11.0 h1:KXV8WWKCXm6tRpLirl2szsO5j/oOODwZf4hATmGVNs4= +golang.org/x/arch v0.11.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.0.0-20180214000028-650f4a345ab4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20180807104621-f027049dab0a/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -1313,6 +1331,7 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20180807162357-acbc56fc7007/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1429,6 +1448,7 @@ golang.org/x/tools v0.0.0-20181221001348-537d06c36207/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190308142131-b40df0fb21c3/go.mod h1:25r3+/G6/xytQM8iWZKq3Hn0kr0rgFKPUNVEL/dr3z4= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go index cb437e03b9..76a3cb6a4d 100644 --- a/internal/querynodev2/delegator/distribution.go +++ b/internal/querynodev2/delegator/distribution.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -68,6 +69,8 @@ type channelQueryView struct { loadedRatio *atomic.Float64 // loaded ratio of current query view, set serviceable to true if loadedRatio == 1.0 unloadedSealedSegments []SegmentEntry // workerID -> -1 + + syncedByCoord bool // if the query view is synced by coord } func NewChannelQueryView(growings []int64, sealedSegmentRowCount map[int64]int64, partitions []int64, version int64) *channelQueryView { @@ -85,7 +88,16 @@ func (q *channelQueryView) GetVersion() int64 { } func (q *channelQueryView) Serviceable() bool { - return q.loadedRatio.Load() >= 1.0 + dataReady := q.loadedRatio.Load() >= 1.0 + // for now, we only support collection level target(data view), so we need to wait for the query view is synced by coord + // incase of delegator become serviceable before current target is ready when memory is not enough. + // if current target is not ready, segment on delegator will be released at any time, serviceable state is not reliable. + // Note: after we support channel level target(data view), we can remove this flag + viewReady := q.syncedByCoord + + // if partial result is enabled, we can skip the viewReady check + enablePartialResult := paramtable.Get().QueryNodeCfg.PartialResultRequiredDataRatio.GetAsFloat() < 1.0 + return dataReady && (viewReady || enablePartialResult) } func (q *channelQueryView) GetLoadedRatio() float64 { @@ -378,6 +390,7 @@ func (d *distribution) SyncTargetVersion(action *querypb.SyncAction, partitions partitions: typeutil.NewUniqueSet(partitions...), version: action.GetTargetVersion(), loadedRatio: atomic.NewFloat64(0), + syncedByCoord: true, } sealedSet := typeutil.NewUniqueSet(action.GetSealedInTarget()...) diff --git a/internal/querynodev2/delegator/distribution_test.go b/internal/querynodev2/delegator/distribution_test.go index 42dda02e5a..74fbaccd15 100644 --- a/internal/querynodev2/delegator/distribution_test.go +++ b/internal/querynodev2/delegator/distribution_test.go @@ -20,11 +20,13 @@ import ( "testing" "time" + "github.com/bytedance/mockey" "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) type DistributionSuite struct { @@ -33,6 +35,7 @@ type DistributionSuite struct { } func (s *DistributionSuite) SetupTest() { + paramtable.Init() s.dist = NewDistribution("channel-1", NewChannelQueryView(nil, nil, nil, initialTargetVersion)) } @@ -860,8 +863,10 @@ func TestDistribution_UpdateServiceable(t *testing.T) { } } dist.updateServiceable("test") - assert.True(t, dist.Serviceable()) assert.Equal(t, float64(1), dist.queryView.GetLoadedRatio()) + assert.False(t, dist.Serviceable()) + dist.queryView.syncedByCoord = true + assert.True(t, dist.Serviceable()) } func TestDistribution_SyncTargetVersion(t *testing.T) { @@ -936,3 +941,199 @@ func TestDistribution_MarkOfflineSegments(t *testing.T) { assert.Equal(t, unreadableTargetVersion, dist.sealedSegments[1].Version) assert.Equal(t, unreadableTargetVersion, dist.sealedSegments[2].Version) } + +// TestChannelQueryView_SyncedByCoord tests the syncedByCoord field functionality +func TestChannelQueryView_SyncedByCoord(t *testing.T) { + mockey.PatchConvey("TestChannelQueryView_SyncedByCoord", t, func() { + // Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization + mockey.Mock(mockey.GetMethod(¶mtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build() + + growings := []int64{1, 2, 3} + sealedWithRowCount := map[int64]int64{4: 100, 5: 200, 6: 300} + partitions := []int64{7, 8, 9} + version := int64(10) + + t.Run("new channelQueryView has syncedByCoord false", func(t *testing.T) { + view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version) + assert.False(t, view.syncedByCoord, "New channelQueryView should have syncedByCoord = false") + }) + + t.Run("syncedByCoord can be set manually", func(t *testing.T) { + view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version) + + // Initially false + assert.False(t, view.syncedByCoord) + + // Set to true + view.syncedByCoord = true + assert.True(t, view.syncedByCoord) + + // Set back to false + view.syncedByCoord = false + assert.False(t, view.syncedByCoord) + }) + }) +} + +// TestDistribution_SyncTargetVersionSetsSyncedByCoord tests that SyncTargetVersion sets syncedByCoord +func TestDistribution_SyncTargetVersionSetsSyncedByCoord(t *testing.T) { + mockey.PatchConvey("TestDistribution_SyncTargetVersionSetsSyncedByCoord", t, func() { + // Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization + mockey.Mock(mockey.GetMethod(¶mtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build() + + channelName := "test_channel" + growings := []int64{1, 2, 3} + sealedWithRowCount := map[int64]int64{4: 100, 5: 200, 6: 300} + partitions := []int64{7, 8, 9} + version := int64(10) + + t.Run("SyncTargetVersion sets syncedByCoord to true", func(t *testing.T) { + view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version) + dist := NewDistribution(channelName, view) + + // Initially syncedByCoord should be false + assert.False(t, dist.queryView.syncedByCoord) + + // Create sync action + action := &querypb.SyncAction{ + GrowingInTarget: []int64{1, 2}, + SealedSegmentRowCount: map[int64]int64{4: 100, 5: 100}, + TargetVersion: version + 1, + } + + // Sync the view + dist.SyncTargetVersion(action, partitions) + + // Verify syncedByCoord is set to true after sync + assert.True(t, dist.queryView.syncedByCoord, "SyncTargetVersion should set syncedByCoord to true") + assert.Equal(t, action.GetTargetVersion(), dist.queryView.version) + }) + + t.Run("multiple SyncTargetVersion calls maintain syncedByCoord true", func(t *testing.T) { + view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version) + dist := NewDistribution(channelName, view) + + // First sync + action1 := &querypb.SyncAction{ + GrowingInTarget: []int64{1, 2}, + SealedSegmentRowCount: map[int64]int64{4: 100}, + TargetVersion: version + 1, + } + dist.SyncTargetVersion(action1, partitions) + assert.True(t, dist.queryView.syncedByCoord) + + // Second sync + action2 := &querypb.SyncAction{ + GrowingInTarget: []int64{1, 2, 3}, + SealedSegmentRowCount: map[int64]int64{4: 100, 5: 200}, + TargetVersion: version + 2, + } + dist.SyncTargetVersion(action2, partitions) + assert.True(t, dist.queryView.syncedByCoord, "syncedByCoord should remain true after multiple syncs") + assert.Equal(t, action2.GetTargetVersion(), dist.queryView.version) + }) + + t.Run("SyncTargetVersion creates new queryView with syncedByCoord true", func(t *testing.T) { + view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version) + dist := NewDistribution(channelName, view) + + // Store reference to original view + originalView := dist.queryView + assert.False(t, originalView.syncedByCoord) + + // Sync should create new queryView + action := &querypb.SyncAction{ + GrowingInTarget: []int64{1, 2}, + SealedSegmentRowCount: map[int64]int64{4: 100, 5: 100}, + TargetVersion: version + 1, + } + dist.SyncTargetVersion(action, partitions) + + // Verify new queryView is created with syncedByCoord = true + newView := dist.queryView + assert.NotSame(t, originalView, newView, "SyncTargetVersion should create new queryView") + assert.True(t, newView.syncedByCoord, "New queryView should have syncedByCoord = true") + assert.False(t, originalView.syncedByCoord, "Original queryView should remain unchanged") + }) + }) +} + +// TestDistribution_ServiceableWithSyncedByCoord tests serviceable logic considering syncedByCoord +func TestDistribution_ServiceableWithSyncedByCoord(t *testing.T) { + mockey.PatchConvey("TestDistribution_ServiceableWithSyncedByCoord", t, func() { + // Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization + mockey.Mock(mockey.GetMethod(¶mtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build() + + channelName := "test_channel" + growings := []int64{1, 2} + sealedWithRowCount := map[int64]int64{4: 100, 5: 100} + partitions := []int64{7, 8} + version := int64(10) + + t.Run("distribution becomes serviceable after sync and full load", func(t *testing.T) { + view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version) + dist := NewDistribution(channelName, view) + + // Initially not serviceable + assert.False(t, dist.Serviceable()) + + // Add all segments to make it fully loaded + for _, id := range growings { + dist.growingSegments[id] = SegmentEntry{ + SegmentID: id, + } + } + for id := range sealedWithRowCount { + dist.sealedSegments[id] = SegmentEntry{ + SegmentID: id, + Offline: false, + } + } + + // Sync target version to set syncedByCoord = true + action := &querypb.SyncAction{ + GrowingInTarget: growings, + SealedSegmentRowCount: sealedWithRowCount, + TargetVersion: version + 1, + } + dist.SyncTargetVersion(action, partitions) + + // Update serviceable to calculate loaded ratio + dist.updateServiceable("test") + + // Should be serviceable now + assert.True(t, dist.Serviceable()) + assert.Equal(t, float64(1), dist.queryView.GetLoadedRatio()) + assert.True(t, dist.queryView.syncedByCoord) + }) + + t.Run("distribution not serviceable without sync even with full load", func(t *testing.T) { + view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version) + dist := NewDistribution(channelName, view) + + // Add all segments to make it fully loaded + for _, id := range growings { + dist.growingSegments[id] = SegmentEntry{ + SegmentID: id, + } + } + for id := range sealedWithRowCount { + dist.sealedSegments[id] = SegmentEntry{ + SegmentID: id, + Offline: false, + } + } + + // Update serviceable to calculate loaded ratio but don't sync + dist.updateServiceable("test") + + // Should not be serviceable without sync (assuming partial result is disabled by default) + // The exact behavior depends on paramtable configuration, but we test the basic structure + assert.Equal(t, float64(1), dist.queryView.GetLoadedRatio(), "Load ratio should be 1.0") + assert.False(t, dist.queryView.syncedByCoord, "Should not be synced by coord") + + // Note: The actual serviceable result depends on partial result configuration + // We focus on testing that the fields are set correctly + }) + }) +}