diff --git a/go.mod b/go.mod index 27e4c5bc06..0b27e21ebb 100644 --- a/go.mod +++ b/go.mod @@ -65,9 +65,11 @@ require ( require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 require ( + github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000 github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.9.1 + github.com/zeebo/xxh3 v1.0.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -209,7 +211,6 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect - github.com/zeebo/xxh3 v1.0.2 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/v2 v2.305.5 // indirect @@ -250,6 +251,7 @@ replace ( github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd github.com/expr-lang/expr => github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5 github.com/go-kit/kit => github.com/go-kit/kit v0.1.0 + github.com/greatroar/blobloom => github.com/weiliu1031/blobloom v0.0.0-20240530105622-1e0e104a7160 // github.com/milvus-io/milvus-storage/go => ../milvus-storage/go github.com/milvus-io/milvus/pkg => ./pkg github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1 diff --git a/go.sum b/go.sum index 25be847bbe..b5f2c76e8e 100644 --- a/go.sum +++ b/go.sum @@ -56,12 +56,14 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0/go.mod h1:OQeznEEkTZ9Orh github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0 h1:Ma67P/GGprNwsslzEH6+Kb8nybI8jpDTm4Wmzu2ReK8= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0/go.mod h1:c+Lifp3EDEamAkPVzMooRNOK6CZjNSdEnf1A7jsI9u4= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 h1:nVocQV40OQne5613EeLayJiRAJuKlBGy+m22qWG+WRg= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0/go.mod h1:7QJP7dr2wznCMeqIrhMgWGf7XpAQnVrJqDm9nvV3Cu4= github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 h1:OBhqkivkhkMqLPymWEppkm7vgPQY2XsHoEkaMQ0AdZY= github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0/go.mod h1:kgDmCTgBzIEPFElEF+FK0SdjAor06dRq2Go927dnQ6o= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak= +github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno= github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo= @@ -169,6 +171,7 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA= github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= @@ -215,6 +218,7 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8 github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= +github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -235,6 +239,7 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go. github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= +github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0= github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= @@ -245,6 +250,7 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+ne github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= +github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= @@ -257,6 +263,7 @@ github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03D github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= +github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -290,7 +297,6 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -300,6 +306,7 @@ github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AE github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -338,6 +345,7 @@ github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGw github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= +github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -401,6 +409,7 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -517,7 +526,6 @@ github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYb github.com/kataras/neffos v0.0.14/go.mod h1:8lqADm8PnbeFfL7CLXh1WHw53dG27MC3pgi2R1rmoTE= github.com/kataras/pio v0.0.2/go.mod h1:hAoW0t9UmXi4R5Oyq5Z4irTbaTsOemSrDGUtaTl7Dro= github.com/kataras/sitemap v0.0.5/go.mod h1:KY2eugMKiPwsJgx7+U103YZehfvNGOXURubcGyk0Bz8= -github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -548,7 +556,9 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06 h1:vN4d3jSss3ExzUn2cE0WctxztfOgiKvMKnDrydBsg00= +github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06/go.mod h1:++9BgZujZd4v0ZTZCb5iPsaomXdZWyxotIAh1IiDm44= github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b h1:xYEM2oBUhBEhQjrV+KJ9lEWDWYZoNVZUaBF++Wyljq4= +github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b/go.mod h1:V0HF/ZBlN86HqewcDC/cVxMmYDiRukWjSrgKLUAn9Js= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y= @@ -556,6 +566,7 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 h1:IVlcvV0CjvfBYYod5ePe89l+3LBAl//6n9kJ9Vr2i0k= +github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76/go.mod h1:Iu9BHUvTh8/KpbuSoKx/CaJEdJvFxSverxIy7I+nq7s= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= @@ -582,6 +593,7 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.8 h1:3tS41NlGYSmhhe/8fhGRzc+z3AYCw1Fe1WAyLuujKs0= +github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= @@ -653,6 +665,7 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.1 h1:b3iUnf1v+ppJiOfNX4yxxqfWKMQPZR5yoh8urCTFX88= +github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= @@ -715,6 +728,7 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -842,6 +856,7 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865 h1:LcUqBlKC4j15LhT303yQDX/XxyHG4haEQqbHgZZA4SY= github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865/go.mod h1:r5r4xbfxSaeR04b166HGsBa/R4U3SueirEUpXGuw+Q0= github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= +github.com/thoas/go-funk v0.9.1/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= @@ -879,6 +894,8 @@ github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBn github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/weiliu1031/blobloom v0.0.0-20240530105622-1e0e104a7160 h1:x7cclCOEtr9zSzSZhwB7mhz/tFNHsILh6XewGTmJKk0= +github.com/weiliu1031/blobloom v0.0.0-20240530105622-1e0e104a7160/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -901,6 +918,7 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -971,6 +989,7 @@ go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnw go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= @@ -1458,6 +1477,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v1 v1.0.0/go.mod h1:CxwszS/Xz1C49Ucd2i6Zil5UToP1EmyrFhKaMVbg1mk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= @@ -1514,3 +1534,4 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go index 7785e68754..80b7bc0578 100644 --- a/internal/datanode/metacache/bloom_filter_set.go +++ b/internal/datanode/metacache/bloom_filter_set.go @@ -19,10 +19,10 @@ package metacache import ( "sync" - "github.com/bits-and-blooms/bloom/v3" "github.com/samber/lo" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -76,8 +76,9 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error { if bfs.current == nil { bfs.current = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(bfs.batchSize, - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + PkFilter: bloomfilter.NewBloomFilterWithType(bfs.batchSize, + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()), } } diff --git a/internal/datanode/syncmgr/storage_serializer.go b/internal/datanode/syncmgr/storage_serializer.go index 784f349940..35c0789adf 100644 --- a/internal/datanode/syncmgr/storage_serializer.go +++ b/internal/datanode/syncmgr/storage_serializer.go @@ -205,6 +205,7 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B FieldID: s.pkField.GetFieldID(), MaxPk: pks.MaxPK, MinPk: pks.MinPK, + BFType: pks.PkFilter.Type(), BF: pks.PkFilter, PkType: int64(s.pkField.GetDataType()), } diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index 675f48fe92..412ff5d95c 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -5,7 +5,6 @@ import ( "fmt" "sync" - "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/atomic" @@ -20,6 +19,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -383,7 +383,10 @@ type inData struct { func (id *inData) generatePkStats() { id.batchBF = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(uint(id.rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + PkFilter: bloomfilter.NewBloomFilterWithType( + uint(id.rowNum), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()), } for _, ids := range id.pkField { diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 3990e63ba1..02dce93ecb 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -526,7 +526,8 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID { segmentPks, segmentTss := segment.DeleteRecords() for i, pk := range segmentPks { - if candidate.MayPkExist(pk) { + lc := storage.NewLocationsCache(pk) + if candidate.MayPkExist(lc) { pks = append(pks, pk) tss = append(tss, segmentTss[i]) } @@ -637,7 +638,8 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context, continue } for i, pk := range record.DeleteData.Pks { - if candidate.MayPkExist(pk) { + lc := storage.NewLocationsCache(pk) + if candidate.MayPkExist(lc) { deleteData.Append(pk, record.DeleteData.Tss[i]) } } @@ -733,7 +735,8 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position } for idx, pk := range storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys()) { - if candidate.MayPkExist(pk) { + lc := storage.NewLocationsCache(pk) + if candidate.MayPkExist(lc) { result.Pks = append(result.Pks, pk) result.Tss = append(result.Tss, dmsg.Timestamps[idx]) } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 50665425aa..1a17f41812 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - bloom "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" @@ -41,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -258,12 +258,8 @@ func (s *DelegatorDataSuite) TestProcessDelete() { ms.EXPECT().Indexes().Return(nil) ms.EXPECT().RowNum().Return(info.GetNumOfRows()) ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) - ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool { - return pk.EQ(storage.NewInt64PrimaryKey(10)) - }) - ms.EXPECT().GetHashFuncNum().Return(1) - ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { - return pk.EQ(storage.NewInt64PrimaryKey(10)) + ms.EXPECT().MayPkExist(mock.Anything).RunAndReturn(func(lc *storage.LocationsCache) bool { + return lc.GetPk().EQ(storage.NewInt64PrimaryKey(10)) }) return ms }) @@ -272,8 +268,9 @@ func (s *DelegatorDataSuite) TestProcessDelete() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -528,8 +525,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -686,8 +685,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -880,10 +881,6 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool { return pk.EQ(storage.NewInt64PrimaryKey(10)) }) - ms.EXPECT().GetHashFuncNum().Return(1) - ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { - return pk.EQ(storage.NewInt64PrimaryKey(10)) - }) return ms }) }, nil) @@ -891,8 +888,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 4d51b1145d..2dcd9ac5e0 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -99,10 +99,6 @@ func (s *DelegatorSuite) SetupTest() { ms.EXPECT().Indexes().Return(nil) ms.EXPECT().RowNum().Return(info.GetNumOfRows()) ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) - ms.EXPECT().GetHashFuncNum().Return(1) - ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool { - return pk.EQ(storage.NewInt64PrimaryKey(10)) - }) return ms }) }, nil) diff --git a/internal/querynodev2/pkoracle/bloom_filter_set.go b/internal/querynodev2/pkoracle/bloom_filter_set.go index 608bb656ef..88f5602ebf 100644 --- a/internal/querynodev2/pkoracle/bloom_filter_set.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set.go @@ -17,15 +17,14 @@ package pkoracle import ( - "context" "sync" - bloom "github.com/bits-and-blooms/bloom/v3" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -41,68 +40,25 @@ type BloomFilterSet struct { segType commonpb.SegmentState currentStat *storage.PkStatistics historyStats []*storage.PkStatistics - - kHashFunc uint } // MayPkExist returns whether any bloom filters returns positive. -func (s *BloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool { +func (s *BloomFilterSet) MayPkExist(lc *storage.LocationsCache) bool { s.statsMutex.RLock() defer s.statsMutex.RUnlock() - if s.currentStat != nil && s.currentStat.PkExist(pk) { + if s.currentStat != nil && s.currentStat.TestLocationCache(lc) { return true } // for sealed, if one of the stats shows it exist, then we have to check it for _, historyStat := range s.historyStats { - if historyStat.PkExist(pk) { + if historyStat.TestLocationCache(lc) { return true } } return false } -func (s *BloomFilterSet) TestLocations(pk storage.PrimaryKey, locs []uint64) bool { - log := log.Ctx(context.TODO()).WithRateGroup("BloomFilterSet.TestLocations", 1, 60) - s.statsMutex.RLock() - defer s.statsMutex.RUnlock() - - if s.currentStat != nil { - k := s.currentStat.PkFilter.K() - if k > uint(len(locs)) { - log.RatedWarn(30, "locations num is less than hash func num, return false positive result", - zap.Int("locationNum", len(locs)), - zap.Uint("hashFuncNum", k), - zap.Int64("segmentID", s.segmentID)) - return true - } - - if s.currentStat.TestLocations(pk, locs[:k]) { - return true - } - } - - // for sealed, if one of the stats shows it exist, then we have to check it - for _, historyStat := range s.historyStats { - k := historyStat.PkFilter.K() - if k > uint(len(locs)) { - log.RatedWarn(30, "locations num is less than hash func num, return false positive result", - zap.Int("locationNum", len(locs)), - zap.Uint("hashFuncNum", k), - zap.Int64("segmentID", s.segmentID)) - return true - } - if historyStat.TestLocations(pk, locs[:k]) { - return true - } - } - return false -} - -func (s *BloomFilterSet) GetHashFuncNum() uint { - return s.kHashFunc -} - // ID implement candidate. func (s *BloomFilterSet) ID() int64 { return s.segmentID @@ -124,13 +80,12 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { defer s.statsMutex.Unlock() if s.currentStat == nil { - m, k := bloom.EstimateParameters(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) - if k > s.kHashFunc { - s.kHashFunc = k - } + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.New(m, k), + PkFilter: bf, } } @@ -157,9 +112,6 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { s.statsMutex.Lock() defer s.statsMutex.Unlock() - if stats.PkFilter.K() > s.kHashFunc { - s.kHashFunc = stats.PkFilter.K() - } s.historyStats = append(s.historyStats, stats) } diff --git a/internal/querynodev2/pkoracle/bloom_filter_set_test.go b/internal/querynodev2/pkoracle/bloom_filter_set_test.go index 0384d3faa7..9aaa8f0a08 100644 --- a/internal/querynodev2/pkoracle/bloom_filter_set_test.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set_test.go @@ -41,10 +41,9 @@ func TestInt64Pk(t *testing.T) { bfs.UpdateBloomFilter(pks) for i := 0; i < batchSize; i++ { - locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) - ret1 := bfs.TestLocations(pks[i], locations) - ret2 := bfs.MayPkExist(pks[i]) - assert.Equal(t, ret1, ret2) + lc := storage.NewLocationsCache(pks[i]) + ret := bfs.MayPkExist(lc) + assert.True(t, ret) } assert.Equal(t, int64(1), bfs.ID()) @@ -66,10 +65,9 @@ func TestVarCharPk(t *testing.T) { bfs.UpdateBloomFilter(pks) for i := 0; i < batchSize; i++ { - locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) - ret1 := bfs.TestLocations(pks[i], locations) - ret2 := bfs.MayPkExist(pks[i]) - assert.Equal(t, ret1, ret2) + lc := storage.NewLocationsCache(pks[i]) + ret := bfs.MayPkExist(lc) + assert.True(t, ret) } } @@ -91,29 +89,8 @@ func TestHistoricalStat(t *testing.T) { bfs.currentStat = nil for i := 0; i < batchSize; i++ { - locations := storage.Locations(pks[i], bfs.GetHashFuncNum()) - ret1 := bfs.TestLocations(pks[i], locations) - ret2 := bfs.MayPkExist(pks[i]) - assert.Equal(t, ret1, ret2) - } -} - -func TestHashFuncNum(t *testing.T) { - paramtable.Init() - batchSize := 100 - pks := make([]storage.PrimaryKey, 0) - for i := 0; i < batchSize; i++ { - pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10)) - pks = append(pks, pk) - } - - bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) - bfs.UpdateBloomFilter(pks) - - for i := 0; i < batchSize; i++ { - // pass locations more then hash func num in bf - locations := storage.Locations(pks[i], bfs.GetHashFuncNum()+3) - ret1 := bfs.TestLocations(pks[i], locations) - assert.True(t, ret1) + lc := storage.NewLocationsCache(pks[i]) + ret := bfs.MayPkExist(lc) + assert.True(t, ret) } } diff --git a/internal/querynodev2/pkoracle/candidate.go b/internal/querynodev2/pkoracle/candidate.go index e5f051e5f1..c115a5a0c1 100644 --- a/internal/querynodev2/pkoracle/candidate.go +++ b/internal/querynodev2/pkoracle/candidate.go @@ -26,9 +26,7 @@ import ( // Candidate is the interface for pk oracle candidate. type Candidate interface { // MayPkExist checks whether primary key could exists in this candidate. - MayPkExist(pk storage.PrimaryKey) bool - TestLocations(pk storage.PrimaryKey, locs []uint64) bool - GetHashFuncNum() uint + MayPkExist(lc *storage.LocationsCache) bool ID() int64 Partition() int64 diff --git a/internal/querynodev2/pkoracle/key.go b/internal/querynodev2/pkoracle/key.go index 9845b5e065..6600398798 100644 --- a/internal/querynodev2/pkoracle/key.go +++ b/internal/querynodev2/pkoracle/key.go @@ -28,20 +28,11 @@ type candidateKey struct { } // MayPkExist checks whether primary key could exists in this candidate. -func (k candidateKey) MayPkExist(pk storage.PrimaryKey) bool { +func (k candidateKey) MayPkExist(lc *storage.LocationsCache) bool { // always return true to prevent miuse return true } -func (k candidateKey) TestLocations(pk storage.PrimaryKey, locs []uint64) bool { - // always return true to prevent miuse - return true -} - -func (k candidateKey) GetHashFuncNum() uint { - return 0 -} - // ID implements Candidate. func (k candidateKey) ID() int64 { return k.segmentID diff --git a/internal/querynodev2/pkoracle/pk_oracle.go b/internal/querynodev2/pkoracle/pk_oracle.go index 4d686503ec..a700fe3066 100644 --- a/internal/querynodev2/pkoracle/pk_oracle.go +++ b/internal/querynodev2/pkoracle/pk_oracle.go @@ -19,10 +19,8 @@ package pkoracle import ( "fmt" - "sync" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -43,30 +41,11 @@ var _ PkOracle = (*pkOracle)(nil) // pkOracle implementation. type pkOracle struct { candidates *typeutil.ConcurrentMap[string, candidateWithWorker] - - hashFuncNumMutex sync.RWMutex - maxHashFuncNum uint -} - -func (pko *pkOracle) GetMaxHashFuncNum() uint { - pko.hashFuncNumMutex.RLock() - defer pko.hashFuncNumMutex.RUnlock() - return pko.maxHashFuncNum -} - -func (pko *pkOracle) TryUpdateHashFuncNum(newValue uint) { - pko.hashFuncNumMutex.Lock() - defer pko.hashFuncNumMutex.Unlock() - if newValue > pko.maxHashFuncNum { - pko.maxHashFuncNum = newValue - } } // Get implements PkOracle. func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error) { var result []int64 - var locations []uint64 - pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { for _, filter := range filters { if !filter(candidate) { @@ -74,15 +53,8 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i } } - if locations == nil { - locations = storage.Locations(pk, pko.GetMaxHashFuncNum()) - if len(locations) == 0 { - log.Warn("pkOracle: no location found for pk") - return true - } - } - - if candidate.TestLocations(pk, locations) { + lc := storage.NewLocationsCache(pk) + if candidate.MayPkExist(lc) { result = append(result, candidate.ID()) } return true @@ -97,7 +69,6 @@ func (pko *pkOracle) candidateKey(candidate Candidate, workerID int64) string { // Register register candidate func (pko *pkOracle) Register(candidate Candidate, workerID int64) error { - pko.TryUpdateHashFuncNum(candidate.GetHashFuncNum()) pko.candidates.Insert(pko.candidateKey(candidate, workerID), candidateWithWorker{ Candidate: candidate, workerID: workerID, @@ -108,7 +79,6 @@ func (pko *pkOracle) Register(candidate Candidate, workerID int64) error { // Remove removes candidate from pko. func (pko *pkOracle) Remove(filters ...CandidateFilter) error { - max := uint(0) pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { for _, filter := range filters { if !filter(candidate) { @@ -116,14 +86,9 @@ func (pko *pkOracle) Remove(filters ...CandidateFilter) error { } } pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID)) - if candidate.GetHashFuncNum() > max { - max = candidate.GetHashFuncNum() - } - return true }) - pko.TryUpdateHashFuncNum(max) return nil } diff --git a/internal/querynodev2/segments/bloom_filter_set.go b/internal/querynodev2/segments/bloom_filter_set.go deleted file mode 100644 index b07713961c..0000000000 --- a/internal/querynodev2/segments/bloom_filter_set.go +++ /dev/null @@ -1,101 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segments - -import ( - "sync" - - bloom "github.com/bits-and-blooms/bloom/v3" - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - storage "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type bloomFilterSet struct { - statsMutex sync.RWMutex - currentStat *storage.PkStatistics - historyStats []*storage.PkStatistics -} - -func newBloomFilterSet() *bloomFilterSet { - return &bloomFilterSet{} -} - -// MayPkExist returns whether any bloom filters returns positive. -func (s *bloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool { - s.statsMutex.RLock() - defer s.statsMutex.RUnlock() - if s.currentStat != nil && s.currentStat.PkExist(pk) { - return true - } - - // for sealed, if one of the stats shows it exist, then we have to check it - for _, historyStat := range s.historyStats { - if historyStat.PkExist(pk) { - return true - } - } - return false -} - -// UpdateBloomFilter updates currentStats with provided pks. -func (s *bloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { - s.statsMutex.Lock() - defer s.statsMutex.Unlock() - - if s.currentStat == nil { - s.initCurrentStat() - } - - buf := make([]byte, 8) - for _, pk := range pks { - s.currentStat.UpdateMinMax(pk) - switch pk.Type() { - case schemapb.DataType_Int64: - int64Value := pk.(*storage.Int64PrimaryKey).Value - common.Endian.PutUint64(buf, uint64(int64Value)) - s.currentStat.PkFilter.Add(buf) - case schemapb.DataType_VarChar: - stringValue := pk.(*storage.VarCharPrimaryKey).Value - s.currentStat.PkFilter.AddString(stringValue) - default: - log.Error("failed to update bloomfilter", zap.Any("PK type", pk.Type())) - panic("failed to update bloomfilter") - } - } -} - -// AddHistoricalStats add loaded historical stats. -func (s *bloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { - s.statsMutex.Lock() - defer s.statsMutex.Unlock() - - s.historyStats = append(s.historyStats, stats) -} - -// initCurrentStat initialize currentStats if nil. -// Note: invoker shall acquire statsMutex lock first. -func (s *bloomFilterSet) initCurrentStat() { - s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), - } -} diff --git a/internal/querynodev2/segments/bloom_filter_set_test.go b/internal/querynodev2/segments/bloom_filter_set_test.go deleted file mode 100644 index 9bf95a1ff9..0000000000 --- a/internal/querynodev2/segments/bloom_filter_set_test.go +++ /dev/null @@ -1,91 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segments - -import ( - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type BloomFilterSetSuite struct { - suite.Suite - - intPks []int64 - stringPks []string - set *bloomFilterSet -} - -func (suite *BloomFilterSetSuite) SetupTest() { - suite.intPks = []int64{1, 2, 3} - suite.stringPks = []string{"1", "2", "3"} - paramtable.Init() - suite.set = newBloomFilterSet() -} - -func (suite *BloomFilterSetSuite) TestInt64PkBloomFilter() { - pks, err := storage.GenInt64PrimaryKeys(suite.intPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func (suite *BloomFilterSetSuite) TestStringPkBloomFilter() { - pks, err := storage.GenVarcharPrimaryKeys(suite.stringPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func (suite *BloomFilterSetSuite) TestHistoricalBloomFilter() { - pks, err := storage.GenVarcharPrimaryKeys(suite.stringPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } - - old := suite.set.currentStat - suite.set.currentStat = nil - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.False(exist) - } - - suite.set.AddHistoricalStats(old) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func TestBloomFilterSet(t *testing.T) { - suite.Run(t, &BloomFilterSetSuite{}) -} diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index 3121d0ca45..e31d1b5181 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -246,47 +246,6 @@ func (_c *MockSegment_ExistIndex_Call) RunAndReturn(run func(int64) bool) *MockS return _c } -// GetHashFuncNum provides a mock function with given fields: -func (_m *MockSegment) GetHashFuncNum() uint { - ret := _m.Called() - - var r0 uint - if rf, ok := ret.Get(0).(func() uint); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(uint) - } - - return r0 -} - -// MockSegment_GetHashFuncNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHashFuncNum' -type MockSegment_GetHashFuncNum_Call struct { - *mock.Call -} - -// GetHashFuncNum is a helper method to define mock.On call -func (_e *MockSegment_Expecter) GetHashFuncNum() *MockSegment_GetHashFuncNum_Call { - return &MockSegment_GetHashFuncNum_Call{Call: _e.mock.On("GetHashFuncNum")} -} - -func (_c *MockSegment_GetHashFuncNum_Call) Run(run func()) *MockSegment_GetHashFuncNum_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockSegment_GetHashFuncNum_Call) Return(_a0 uint) *MockSegment_GetHashFuncNum_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockSegment_GetHashFuncNum_Call) RunAndReturn(run func() uint) *MockSegment_GetHashFuncNum_Call { - _c.Call.Return(run) - return _c -} - // GetIndex provides a mock function with given fields: fieldID func (_m *MockSegment) GetIndex(fieldID int64) *IndexedFieldInfo { ret := _m.Called(fieldID) @@ -752,13 +711,13 @@ func (_c *MockSegment_LoadInfo_Call) RunAndReturn(run func() *querypb.SegmentLoa return _c } -// MayPkExist provides a mock function with given fields: pk -func (_m *MockSegment) MayPkExist(pk storage.PrimaryKey) bool { - ret := _m.Called(pk) +// MayPkExist provides a mock function with given fields: lc +func (_m *MockSegment) MayPkExist(lc *storage.LocationsCache) bool { + ret := _m.Called(lc) var r0 bool - if rf, ok := ret.Get(0).(func(storage.PrimaryKey) bool); ok { - r0 = rf(pk) + if rf, ok := ret.Get(0).(func(*storage.LocationsCache) bool); ok { + r0 = rf(lc) } else { r0 = ret.Get(0).(bool) } @@ -772,14 +731,14 @@ type MockSegment_MayPkExist_Call struct { } // MayPkExist is a helper method to define mock.On call -// - pk storage.PrimaryKey -func (_e *MockSegment_Expecter) MayPkExist(pk interface{}) *MockSegment_MayPkExist_Call { - return &MockSegment_MayPkExist_Call{Call: _e.mock.On("MayPkExist", pk)} +// - lc *storage.LocationsCache +func (_e *MockSegment_Expecter) MayPkExist(lc interface{}) *MockSegment_MayPkExist_Call { + return &MockSegment_MayPkExist_Call{Call: _e.mock.On("MayPkExist", lc)} } -func (_c *MockSegment_MayPkExist_Call) Run(run func(pk storage.PrimaryKey)) *MockSegment_MayPkExist_Call { +func (_c *MockSegment_MayPkExist_Call) Run(run func(lc *storage.LocationsCache)) *MockSegment_MayPkExist_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(storage.PrimaryKey)) + run(args[0].(*storage.LocationsCache)) }) return _c } @@ -789,7 +748,7 @@ func (_c *MockSegment_MayPkExist_Call) Return(_a0 bool) *MockSegment_MayPkExist_ return _c } -func (_c *MockSegment_MayPkExist_Call) RunAndReturn(run func(storage.PrimaryKey) bool) *MockSegment_MayPkExist_Call { +func (_c *MockSegment_MayPkExist_Call) RunAndReturn(run func(*storage.LocationsCache) bool) *MockSegment_MayPkExist_Call { _c.Call.Return(run) return _c } @@ -1453,49 +1412,6 @@ func (_c *MockSegment_StartPosition_Call) RunAndReturn(run func() *msgpb.MsgPosi return _c } -// TestLocations provides a mock function with given fields: pk, loc -func (_m *MockSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool { - ret := _m.Called(pk, loc) - - var r0 bool - if rf, ok := ret.Get(0).(func(storage.PrimaryKey, []uint64) bool); ok { - r0 = rf(pk, loc) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// MockSegment_TestLocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TestLocations' -type MockSegment_TestLocations_Call struct { - *mock.Call -} - -// TestLocations is a helper method to define mock.On call -// - pk storage.PrimaryKey -// - loc []uint64 -func (_e *MockSegment_Expecter) TestLocations(pk interface{}, loc interface{}) *MockSegment_TestLocations_Call { - return &MockSegment_TestLocations_Call{Call: _e.mock.On("TestLocations", pk, loc)} -} - -func (_c *MockSegment_TestLocations_Call) Run(run func(pk storage.PrimaryKey, loc []uint64)) *MockSegment_TestLocations_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(storage.PrimaryKey), args[1].([]uint64)) - }) - return _c -} - -func (_c *MockSegment_TestLocations_Call) Return(_a0 bool) *MockSegment_TestLocations_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockSegment_TestLocations_Call) RunAndReturn(run func(storage.PrimaryKey, []uint64) bool) *MockSegment_TestLocations_Call { - _c.Call.Return(run) - return _c -} - // Type provides a mock function with given fields: func (_m *MockSegment) Type() commonpb.SegmentState { ret := _m.Called() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 3382b4373a..b4291850bc 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -186,16 +186,8 @@ func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { // MayPkExist returns true if the given PK exists in the PK range and being positive through the bloom filter, // false otherwise, // may returns true even the PK doesn't exist actually -func (s *baseSegment) MayPkExist(pk storage.PrimaryKey) bool { - return s.bloomFilterSet.MayPkExist(pk) -} - -func (s *baseSegment) TestLocations(pk storage.PrimaryKey, loc []uint64) bool { - return s.bloomFilterSet.TestLocations(pk, loc) -} - -func (s *baseSegment) GetHashFuncNum() uint { - return s.bloomFilterSet.GetHashFuncNum() +func (s *baseSegment) MayPkExist(lc *storage.LocationsCache) bool { + return s.bloomFilterSet.MayPkExist(lc) } // ResourceUsageEstimate returns the estimated resource usage of the segment. diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 9ed9d4df90..f439d0f818 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -83,9 +83,7 @@ type Segment interface { // Bloom filter related UpdateBloomFilter(pks []storage.PrimaryKey) - MayPkExist(pk storage.PrimaryKey) bool - TestLocations(pk storage.PrimaryKey, loc []uint64) bool - GetHashFuncNum() uint + MayPkExist(lc *storage.LocationsCache) bool // Read operations Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error) diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index 138fed79b7..a1930159d4 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -226,7 +226,8 @@ func (suite *SegmentLoaderSuite) TestLoadMultipleSegments() { // Won't load bloom filter with sealed segments for _, segment := range segments { for pk := 0; pk < 100; pk++ { - exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk))) + lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk))) + exist := segment.MayPkExist(lc) suite.Require().False(exist) } } @@ -260,7 +261,8 @@ func (suite *SegmentLoaderSuite) TestLoadMultipleSegments() { // Should load bloom filter with growing segments for _, segment := range segments { for pk := 0; pk < 100; pk++ { - exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk))) + lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk))) + exist := segment.MayPkExist(lc) suite.True(exist) } } @@ -351,7 +353,8 @@ func (suite *SegmentLoaderSuite) TestLoadBloomFilter() { for _, bf := range bfs { for pk := 0; pk < 100; pk++ { - exist := bf.MayPkExist(storage.NewInt64PrimaryKey(int64(pk))) + lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk))) + exist := bf.MayPkExist(lc) suite.Require().True(exist) } } @@ -404,7 +407,8 @@ func (suite *SegmentLoaderSuite) TestLoadDeltaLogs() { if pk == 1 || pk == 2 { continue } - exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk))) + lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk))) + exist := segment.MayPkExist(lc) suite.Require().True(exist) } } @@ -457,7 +461,8 @@ func (suite *SegmentLoaderSuite) TestLoadDupDeltaLogs() { if pk == 1 || pk == 2 { continue } - exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk))) + lc := storage.NewLocationsCache(storage.NewInt64PrimaryKey(int64(pk))) + exist := segment.MayPkExist(lc) suite.Require().True(exist) } diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index d4f1855ab4..464df07e7a 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -188,14 +188,6 @@ func (suite *SegmentSuite) TestHasRawData() { suite.True(has) } -func (suite *SegmentSuite) TestLocation() { - pk := storage.NewInt64PrimaryKey(100) - locations := storage.Locations(pk, suite.sealed.GetHashFuncNum()) - ret1 := suite.sealed.TestLocations(pk, locations) - ret2 := suite.sealed.MayPkExist(pk) - suite.Equal(ret1, ret2) -} - func (suite *SegmentSuite) TestCASVersion() { segment := suite.sealed diff --git a/internal/storage/field_stats.go b/internal/storage/field_stats.go index a26e8aa9e1..87d6e9acf7 100644 --- a/internal/storage/field_stats.go +++ b/internal/storage/field_stats.go @@ -20,10 +20,12 @@ import ( "encoding/json" "fmt" - "github.com/bits-and-blooms/bloom/v3" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -31,12 +33,13 @@ import ( // FieldStats contains statistics data for any column // todo: compatible to PrimaryKeyStats type FieldStats struct { - FieldID int64 `json:"fieldID"` - Type schemapb.DataType `json:"type"` - Max ScalarFieldValue `json:"max"` // for scalar field - Min ScalarFieldValue `json:"min"` // for scalar field - BF *bloom.BloomFilter `json:"bf"` // for scalar field - Centroids []VectorFieldValue `json:"centroids"` // for vector field + FieldID int64 `json:"fieldID"` + Type schemapb.DataType `json:"type"` + Max ScalarFieldValue `json:"max"` // for scalar field + Min ScalarFieldValue `json:"min"` // for scalar field + BFType bloomfilter.BFType `json:"bfType"` // for scalar field + BF bloomfilter.BloomFilterInterface `json:"bf"` // for scalar field + Centroids []VectorFieldValue `json:"centroids"` // for vector field } // UnmarshalJSON unmarshal bytes to FieldStats @@ -141,12 +144,22 @@ func (stats *FieldStats) UnmarshalJSON(data []byte) error { } } - if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { - stats.BF = &bloom.BloomFilter{} - err = stats.BF.UnmarshalJSON(*bfMessage) + bfType := bloomfilter.BasicBF + if bfTypeMessage, ok := messageMap["bfType"]; ok && bfTypeMessage != nil { + err := json.Unmarshal(*bfTypeMessage, &bfType) if err != nil { return err } + stats.BFType = bfType + } + + if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { + bf, err := bloomfilter.UnmarshalJSON(*bfMessage, bfType) + if err != nil { + log.Warn("Failed to unmarshal bloom filter, use AlwaysTrueBloomFilter instead of return err", zap.Error(err)) + bf = bloomfilter.AlwaysTrueBloomFilter + } + stats.BF = bf } } else { stats.initCentroids(data, stats.Type) @@ -161,12 +174,12 @@ func (stats *FieldStats) UnmarshalJSON(data []byte) error { func (stats *FieldStats) initCentroids(data []byte, dataType schemapb.DataType) { type FieldStatsAux struct { - FieldID int64 `json:"fieldID"` - Type schemapb.DataType `json:"type"` - Max json.RawMessage `json:"max"` - Min json.RawMessage `json:"min"` - BF *bloom.BloomFilter `json:"bf"` - Centroids []json.RawMessage `json:"centroids"` + FieldID int64 `json:"fieldID"` + Type schemapb.DataType `json:"type"` + Max json.RawMessage `json:"max"` + Min json.RawMessage `json:"min"` + BF bloomfilter.BloomFilterInterface `json:"bf"` + Centroids []json.RawMessage `json:"centroids"` } // Unmarshal JSON into the auxiliary struct var aux FieldStatsAux @@ -361,10 +374,15 @@ func NewFieldStats(fieldID int64, pkType schemapb.DataType, rowNum int64) (*Fiel Type: pkType, }, nil } + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() return &FieldStats{ FieldID: fieldID, Type: pkType, - BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(rowNum), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), }, nil } @@ -391,11 +409,17 @@ func (sw *FieldStatsWriter) GenerateList(stats []*FieldStats) error { // GenerateByData writes data from @msgs with @fieldID to @buffer func (sw *FieldStatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs ...FieldData) error { statsList := make([]*FieldStats, 0) + + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() for _, msg := range msgs { stats := &FieldStats{ FieldID: fieldID, Type: pkType, - BF: bloom.NewWithEstimates(uint(msg.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(msg.RowNum()), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), } stats.UpdateByMsgs(msg) diff --git a/internal/storage/field_stats_test.go b/internal/storage/field_stats_test.go index e169902bf9..f04155ac2d 100644 --- a/internal/storage/field_stats_test.go +++ b/internal/storage/field_stats_test.go @@ -20,12 +20,13 @@ import ( "encoding/json" "testing" - "github.com/bits-and-blooms/bloom/v3" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestFieldStatsUpdate(t *testing.T) { @@ -373,7 +374,7 @@ func TestFieldStatsWriter_UpgradePrimaryKey(t *testing.T) { FieldID: common.RowIDField, Min: 1, Max: 9, - BF: bloom.NewWithEstimates(100000, 0.05), + BF: bloomfilter.NewBloomFilterWithType(100000, 0.05, paramtable.Get().CommonCfg.BloomFilterType.GetValue()), } b := make([]byte, 8) @@ -574,8 +575,9 @@ func TestFieldStatsUnMarshal(t *testing.T) { assert.Error(t, err) err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": \"b\"}")) assert.Error(t, err) + // return AlwaysTrueBloomFilter when deserialize bloom filter failed. err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": 1, \"bf\": \"2\"}")) - assert.Error(t, err) + assert.NoError(t, err) }) t.Run("succeed", func(t *testing.T) { diff --git a/internal/storage/pk_statistics.go b/internal/storage/pk_statistics.go index ae5c549f65..7d4b21e2ef 100644 --- a/internal/storage/pk_statistics.go +++ b/internal/storage/pk_statistics.go @@ -19,18 +19,18 @@ package storage import ( "fmt" - "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" ) // pkStatistics contains pk field statistic information type PkStatistics struct { - PkFilter *bloom.BloomFilter // bloom filter of pk inside a segment - MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment - MaxPK PrimaryKey // maximal pk value, same above + PkFilter bloomfilter.BloomFilterInterface // bloom filter of pk inside a segment + MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment + MaxPK PrimaryKey // maximal pk value, same above } // update set pk min/max value if input value is beyond former range. @@ -109,16 +109,16 @@ func (st *PkStatistics) PkExist(pk PrimaryKey) bool { } // Locations returns a list of hash locations representing a data item. -func Locations(pk PrimaryKey, k uint) []uint64 { +func Locations(pk PrimaryKey, k uint, bfType bloomfilter.BFType) []uint64 { switch pk.Type() { case schemapb.DataType_Int64: buf := make([]byte, 8) int64Pk := pk.(*Int64PrimaryKey) common.Endian.PutUint64(buf, uint64(int64Pk.Value)) - return bloom.Locations(buf, k) + return bloomfilter.Locations(buf, k, bfType) case schemapb.DataType_VarChar: varCharPk := pk.(*VarCharPrimaryKey) - return bloom.Locations([]byte(varCharPk.Value), k) + return bloomfilter.Locations([]byte(varCharPk.Value), k, bfType) default: // TODO:: } @@ -147,7 +147,7 @@ func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool { } // check bf first, TestLocation just do some bitset compute, cost is cheaper - if !st.PkFilter.TestLocations(lc.Locations(st.PkFilter.K())) { + if !st.PkFilter.TestLocations(lc.Locations(st.PkFilter.K(), st.PkFilter.Type())) { return false } @@ -158,18 +158,30 @@ func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool { // LocationsCache is a helper struct caching pk bloom filter locations. // Note that this helper is not concurrent safe and shall be used in same goroutine. type LocationsCache struct { - pk PrimaryKey - k uint - locations []uint64 + pk PrimaryKey + basicBFLocations []uint64 + blockBFLocations []uint64 } -func (lc *LocationsCache) Locations(k uint) []uint64 { - if k > lc.k { - lc.k = k - lc.locations = Locations(lc.pk, lc.k) - } +func (lc *LocationsCache) GetPk() PrimaryKey { + return lc.pk +} - return lc.locations[:k] +func (lc *LocationsCache) Locations(k uint, bfType bloomfilter.BFType) []uint64 { + switch bfType { + case bloomfilter.BasicBF: + if int(k) > len(lc.basicBFLocations) { + lc.basicBFLocations = Locations(lc.pk, k, bfType) + } + return lc.basicBFLocations[:k] + case bloomfilter.BlockedBF: + if int(k) > len(lc.blockBFLocations) { + lc.blockBFLocations = Locations(lc.pk, k, bfType) + } + return lc.blockBFLocations[:k] + default: + return nil + } } func NewLocationsCache(pk PrimaryKey) *LocationsCache { diff --git a/internal/storage/stats.go b/internal/storage/stats.go index 7914e04b80..75da19ab5e 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -20,9 +20,10 @@ import ( "encoding/json" "fmt" - "github.com/bits-and-blooms/bloom/v3" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" @@ -31,13 +32,14 @@ import ( // PrimaryKeyStats contains statistics data for pk column type PrimaryKeyStats struct { - FieldID int64 `json:"fieldID"` - Max int64 `json:"max"` // useless, will delete - Min int64 `json:"min"` // useless, will delete - BF *bloom.BloomFilter `json:"bf"` - PkType int64 `json:"pkType"` - MaxPk PrimaryKey `json:"maxPk"` - MinPk PrimaryKey `json:"minPk"` + FieldID int64 `json:"fieldID"` + Max int64 `json:"max"` // useless, will delete + Min int64 `json:"min"` // useless, will delete + BFType bloomfilter.BFType `json:"bfType"` + BF bloomfilter.BloomFilterInterface `json:"bf"` + PkType int64 `json:"pkType"` + MaxPk PrimaryKey `json:"maxPk"` + MinPk PrimaryKey `json:"minPk"` } // UnmarshalJSON unmarshal bytes to PrimaryKeyStats @@ -110,12 +112,22 @@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error { } } - if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { - stats.BF = &bloom.BloomFilter{} - err = stats.BF.UnmarshalJSON(*bfMessage) + bfType := bloomfilter.BasicBF + if bfTypeMessage, ok := messageMap["bfType"]; ok && bfTypeMessage != nil { + err := json.Unmarshal(*bfTypeMessage, &bfType) if err != nil { return err } + stats.BFType = bfType + } + + if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { + bf, err := bloomfilter.UnmarshalJSON(*bfMessage, bfType) + if err != nil { + log.Warn("Failed to unmarshal bloom filter, use AlwaysTrueBloomFilter instead of return err", zap.Error(err)) + bf = bloomfilter.AlwaysTrueBloomFilter + } + stats.BF = bf } return nil @@ -189,10 +201,16 @@ func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) if rowNum <= 0 { return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum) } + + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() return &PrimaryKeyStats{ FieldID: fieldID, PkType: pkType, - BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(rowNum), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), }, nil } @@ -228,10 +246,15 @@ func (sw *StatsWriter) Generate(stats *PrimaryKeyStats) error { // GenerateByData writes Int64Stats or StringStats from @msgs with @fieldID to @buffer func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs FieldData) error { + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() stats := &PrimaryKeyStats{ FieldID: fieldID, PkType: int64(pkType), - BF: bloom.NewWithEstimates(uint(msgs.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(msgs.RowNum()), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), } stats.UpdateByMsgs(msgs) diff --git a/internal/storage/stats_test.go b/internal/storage/stats_test.go index 709f49697f..cccd3d9f9e 100644 --- a/internal/storage/stats_test.go +++ b/internal/storage/stats_test.go @@ -20,12 +20,13 @@ import ( "encoding/json" "testing" - "github.com/bits-and-blooms/bloom/v3" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestStatsWriter_Int64PrimaryKey(t *testing.T) { @@ -124,11 +125,13 @@ func TestStatsWriter_UpgradePrimaryKey(t *testing.T) { Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}, } + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() stats := &PrimaryKeyStats{ FieldID: common.RowIDField, Min: 1, Max: 9, - BF: bloom.NewWithEstimates(100000, 0.05), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType(100000, 0.05, bfType), } b := make([]byte, 8) @@ -174,3 +177,30 @@ func TestDeserializeEmptyStats(t *testing.T) { _, err := DeserializeStats([]*Blob{blob}) assert.NoError(t, err) } + +func TestMarshalStats(t *testing.T) { + stat, err := NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 100000) + assert.NoError(t, err) + + for i := 0; i < 10000; i++ { + stat.Update(NewInt64PrimaryKey(int64(i))) + } + + sw := &StatsWriter{} + sw.GenerateList([]*PrimaryKeyStats{stat}) + bytes := sw.GetBuffer() + + sr := &StatsReader{} + sr.SetBuffer(bytes) + stat1, err := sr.GetPrimaryKeyStatsList() + assert.NoError(t, err) + assert.Equal(t, 1, len(stat1)) + assert.Equal(t, stat.Min, stat1[0].Min) + assert.Equal(t, stat.Max, stat1[0].Max) + + for i := 0; i < 10000; i++ { + b := make([]byte, 8) + common.Endian.PutUint64(b, uint64(i)) + assert.True(t, stat1[0].BF.Test(b)) + } +} diff --git a/internal/util/bloomfilter/bloom_filter.go b/internal/util/bloomfilter/bloom_filter.go new file mode 100644 index 0000000000..778597844e --- /dev/null +++ b/internal/util/bloomfilter/bloom_filter.go @@ -0,0 +1,297 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package bloomfilter + +import ( + "encoding/json" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/cockroachdb/errors" + "github.com/greatroar/blobloom" + "github.com/pingcap/log" + "github.com/zeebo/xxh3" + "go.uber.org/zap" +) + +type BFType int + +var AlwaysTrueBloomFilter = &alwaysTrueBloomFilter{} + +const ( + UnsupportedBFName = "Unsupported BloomFilter" + BlockBFName = "BlockedBloomFilter" + BasicBFName = "BasicBloomFilter" + AlwaysTrueBFName = "AlwaysTrueBloomFilter" +) + +const ( + UnsupportedBF BFType = iota + 1 + AlwaysTrueBF // empty bloom filter + BasicBF + BlockedBF +) + +var bfNames = map[BFType]string{ + BasicBF: BlockBFName, + BlockedBF: BasicBFName, + AlwaysTrueBF: AlwaysTrueBFName, + UnsupportedBF: UnsupportedBFName, +} + +func (t BFType) String() string { + return bfNames[t] +} + +func BFTypeFromString(name string) BFType { + switch name { + case BasicBFName: + return BasicBF + case BlockBFName: + return BlockedBF + case AlwaysTrueBFName: + return AlwaysTrueBF + default: + return UnsupportedBF + } +} + +type BloomFilterInterface interface { + Type() BFType + Cap() uint + K() uint + Add(data []byte) + AddString(data string) + Test(data []byte) bool + TestString(data string) bool + TestLocations(locs []uint64) bool + MarshalJSON() ([]byte, error) + UnmarshalJSON(data []byte) error +} + +type basicBloomFilter struct { + inner *bloom.BloomFilter + k uint +} + +func newBasicBloomFilter(capacity uint, fp float64) *basicBloomFilter { + inner := bloom.NewWithEstimates(capacity, fp) + return &basicBloomFilter{ + inner: inner, + k: inner.K(), + } +} + +func (b *basicBloomFilter) Type() BFType { + return BasicBF +} + +func (b *basicBloomFilter) Cap() uint { + return b.inner.Cap() +} + +func (b *basicBloomFilter) K() uint { + return b.k +} + +func (b *basicBloomFilter) Add(data []byte) { + b.inner.Add(data) +} + +func (b *basicBloomFilter) AddString(data string) { + b.inner.AddString(data) +} + +func (b *basicBloomFilter) Test(data []byte) bool { + return b.inner.Test(data) +} + +func (b *basicBloomFilter) TestString(data string) bool { + return b.inner.TestString(data) +} + +func (b *basicBloomFilter) TestLocations(locs []uint64) bool { + return b.inner.TestLocations(locs[:b.k]) +} + +func (b basicBloomFilter) MarshalJSON() ([]byte, error) { + return b.inner.MarshalJSON() +} + +func (b *basicBloomFilter) UnmarshalJSON(data []byte) error { + inner := &bloom.BloomFilter{} + inner.UnmarshalJSON(data) + b.inner = inner + b.k = inner.K() + return nil +} + +// impl Blocked Bloom filter with blobloom and xxh3 hash +type blockedBloomFilter struct { + inner *blobloom.Filter + k uint +} + +func newBlockedBloomFilter(capacity uint, fp float64) *blockedBloomFilter { + inner := blobloom.NewOptimized(blobloom.Config{ + Capacity: uint64(capacity), + FPRate: fp, + }) + return &blockedBloomFilter{ + inner: inner, + k: inner.K(), + } +} + +func (b *blockedBloomFilter) Type() BFType { + return BlockedBF +} + +func (b *blockedBloomFilter) Cap() uint { + return uint(b.inner.NumBits()) +} + +func (b *blockedBloomFilter) K() uint { + return b.k +} + +func (b *blockedBloomFilter) Add(data []byte) { + loc := xxh3.Hash(data) + b.inner.Add(loc) +} + +func (b *blockedBloomFilter) AddString(data string) { + h := xxh3.HashString(data) + b.inner.Add(h) +} + +func (b *blockedBloomFilter) Test(data []byte) bool { + loc := xxh3.Hash(data) + return b.inner.Has(loc) +} + +func (b *blockedBloomFilter) TestString(data string) bool { + h := xxh3.HashString(data) + return b.inner.Has(h) +} + +func (b *blockedBloomFilter) TestLocations(locs []uint64) bool { + return b.inner.TestLocations(locs) +} + +func (b blockedBloomFilter) MarshalJSON() ([]byte, error) { + return b.inner.MarshalJSON() +} + +func (b *blockedBloomFilter) UnmarshalJSON(data []byte) error { + inner := &blobloom.Filter{} + inner.UnmarshalJSON(data) + b.inner = inner + b.k = inner.K() + + return nil +} + +// always true bloom filter is used when deserialize stat log failed. +// Notice: add item to empty bloom filter is not permitted. and all Test Func will return false positive. +type alwaysTrueBloomFilter struct{} + +func (b *alwaysTrueBloomFilter) Type() BFType { + return AlwaysTrueBF +} + +func (b *alwaysTrueBloomFilter) Cap() uint { + return 0 +} + +func (b *alwaysTrueBloomFilter) K() uint { + return 0 +} + +func (b *alwaysTrueBloomFilter) Add(data []byte) { +} + +func (b *alwaysTrueBloomFilter) AddString(data string) { +} + +func (b *alwaysTrueBloomFilter) Test(data []byte) bool { + return true +} + +func (b *alwaysTrueBloomFilter) TestString(data string) bool { + return true +} + +func (b *alwaysTrueBloomFilter) TestLocations(locs []uint64) bool { + return true +} + +func (b *alwaysTrueBloomFilter) MarshalJSON() ([]byte, error) { + return []byte{}, nil +} + +func (b *alwaysTrueBloomFilter) UnmarshalJSON(data []byte) error { + return nil +} + +func NewBloomFilterWithType(capacity uint, fp float64, typeName string) BloomFilterInterface { + bfType := BFTypeFromString(typeName) + switch bfType { + case BlockedBF: + return newBlockedBloomFilter(capacity, fp) + case BasicBF: + return newBasicBloomFilter(capacity, fp) + default: + log.Info("unsupported bloom filter type, using block bloom filter", zap.String("type", typeName)) + return newBlockedBloomFilter(capacity, fp) + } +} + +func UnmarshalJSON(data []byte, bfType BFType) (BloomFilterInterface, error) { + switch bfType { + case BlockedBF: + bf := &blockedBloomFilter{} + err := json.Unmarshal(data, bf) + if err != nil { + return nil, errors.Wrap(err, "failed to unmarshal blocked bloom filter") + } + return bf, nil + case BasicBF: + bf := &basicBloomFilter{} + err := json.Unmarshal(data, bf) + if err != nil { + return nil, errors.Wrap(err, "failed to unmarshal blocked bloom filter") + } + return bf, nil + case AlwaysTrueBF: + return AlwaysTrueBloomFilter, nil + default: + return nil, errors.Errorf("unsupported bloom filter type: %d", bfType) + } +} + +func Locations(data []byte, k uint, bfType BFType) []uint64 { + switch bfType { + case BasicBF: + return bloom.Locations(data, k) + case BlockedBF: + return blobloom.Locations(xxh3.Hash(data), k) + case AlwaysTrueBF: + return nil + default: + log.Info("unsupported bloom filter type, using block bloom filter", zap.String("type", bfType.String())) + return nil + } +} diff --git a/internal/util/bloomfilter/bloom_filter_test.go b/internal/util/bloomfilter/bloom_filter_test.go new file mode 100644 index 0000000000..5774d205b9 --- /dev/null +++ b/internal/util/bloomfilter/bloom_filter_test.go @@ -0,0 +1,220 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package bloomfilter + +import ( + "fmt" + "testing" + "time" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-storage/go/common/log" +) + +func TestPerformance(t *testing.T) { + capacity := 1000000 + fpr := 0.001 + + keys := make([][]byte, 0) + for i := 0; i < capacity; i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", i))) + } + + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + start1 := time.Now() + for _, key := range keys { + bf1.Add(key) + } + log.Info("Block BF construct time", zap.Duration("time", time.Since(start1))) + data, err := bf1.MarshalJSON() + assert.NoError(t, err) + log.Info("Block BF size", zap.Int("size", len(data))) + + start2 := time.Now() + for _, key := range keys { + bf1.Test(key) + } + log.Info("Block BF Test cost", zap.Duration("time", time.Since(start2))) + + bf2 := newBasicBloomFilter(uint(capacity), fpr) + start3 := time.Now() + for _, key := range keys { + bf2.Add(key) + } + log.Info("Basic BF construct time", zap.Duration("time", time.Since(start3))) + data, err = bf2.MarshalJSON() + assert.NoError(t, err) + log.Info("Basic BF size", zap.Int("size", len(data))) + + start4 := time.Now() + for _, key := range keys { + bf2.Test(key) + } + log.Info("Basic BF Test cost", zap.Duration("time", time.Since(start4))) +} + +func TestPerformance_MultiBF(t *testing.T) { + capacity := 100000 + fpr := 0.001 + + keys := make([][]byte, 0) + for i := 0; i < capacity; i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + bfNum := 100 + bfs1 := make([]*blockedBloomFilter, 0) + start1 := time.Now() + for i := 0; i < bfNum; i++ { + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + for _, key := range keys { + bf1.Add(key) + } + bfs1 = append(bfs1, bf1) + } + + log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 := time.Now() + for _, key := range keys { + locations := Locations(key, bfs1[0].K(), BlockedBF) + for i := 0; i < bfNum; i++ { + bfs1[i].TestLocations(locations) + } + } + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + bfs2 := make([]*basicBloomFilter, 0) + start1 = time.Now() + for i := 0; i < bfNum; i++ { + bf2 := newBasicBloomFilter(uint(capacity), fpr) + for _, key := range keys { + bf2.Add(key) + } + bfs2 = append(bfs2, bf2) + } + + log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 = time.Now() + for _, key := range keys { + locations := Locations(key, bfs1[0].K(), BasicBF) + for i := 0; i < bfNum; i++ { + bfs2[i].TestLocations(locations) + } + } + log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3))) +} + +func TestPerformance_Capacity(t *testing.T) { + fpr := 0.001 + + for _, capacity := range []int64{100, 1000, 10000, 100000, 1000000} { + keys := make([][]byte, 0) + for i := 0; i < int(capacity); i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + start1 := time.Now() + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + for _, key := range keys { + bf1.Add(key) + } + + log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + testKeys := make([][]byte, 0) + for i := 0; i < 10000; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + start3 := time.Now() + for _, key := range testKeys { + locations := Locations(key, bf1.K(), bf1.Type()) + bf1.TestLocations(locations) + } + _, k := bloom.EstimateParameters(uint(capacity), fpr) + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3)), zap.Int("k", int(k)), zap.Int64("capacity", capacity)) + } +} + +func TestMarshal(t *testing.T) { + capacity := 200000 + fpr := 0.001 + + keys := make([][]byte, 0) + for i := 0; i < capacity; i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", i))) + } + + // test basic bf + basicBF := newBasicBloomFilter(uint(capacity), fpr) + for _, key := range keys { + basicBF.Add(key) + } + data, err := basicBF.MarshalJSON() + assert.NoError(t, err) + basicBF2, err := UnmarshalJSON(data, BasicBF) + assert.NoError(t, err) + assert.Equal(t, basicBF.Type(), basicBF2.Type()) + + for _, key := range keys { + assert.True(t, basicBF2.Test(key)) + } + + // test block bf + blockBF := newBlockedBloomFilter(uint(capacity), fpr) + for _, key := range keys { + blockBF.Add(key) + } + data, err = blockBF.MarshalJSON() + assert.NoError(t, err) + blockBF2, err := UnmarshalJSON(data, BlockedBF) + assert.NoError(t, err) + assert.Equal(t, blockBF.Type(), blockBF.Type()) + for _, key := range keys { + assert.True(t, blockBF2.Test(key)) + } + + // test compatible with bits-and-blooms/bloom + bf := bloom.NewWithEstimates(uint(capacity), fpr) + for _, key := range keys { + bf.Add(key) + } + data, err = bf.MarshalJSON() + assert.NoError(t, err) + bf2, err := UnmarshalJSON(data, BasicBF) + assert.NoError(t, err) + for _, key := range keys { + assert.True(t, bf2.Test(key)) + } + + // test empty bloom filter + emptyBF := AlwaysTrueBloomFilter + for _, key := range keys { + bf.Add(key) + } + data, err = emptyBF.MarshalJSON() + assert.NoError(t, err) + emptyBF2, err := UnmarshalJSON(data, AlwaysTrueBF) + assert.NoError(t, err) + for _, key := range keys { + assert.True(t, emptyBF2.Test(key)) + } +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 85067e3bad..590c7df33d 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -245,6 +245,7 @@ type commonConfig struct { TTMsgEnabled ParamItem `refreshable:"true"` TraceLogMode ParamItem `refreshable:"true"` BloomFilterSize ParamItem `refreshable:"true"` + BloomFilterType ParamItem `refreshable:"true"` MaxBloomFalsePositive ParamItem `refreshable:"true"` PanicWhenPluginFail ParamItem `refreshable:"false"` } @@ -735,6 +736,15 @@ like the old password verification when updating the credential`, } p.BloomFilterSize.Init(base.mgr) + p.BloomFilterType = ParamItem{ + Key: "common.bloomFilterType", + Version: "2.4.3", + DefaultValue: "BlockedBloomFilter", + Doc: "bloom filter type, support BasicBloomFilter and BlockedBloomFilter", + Export: true, + } + p.BloomFilterType.Init(base.mgr) + p.MaxBloomFalsePositive = ParamItem{ Key: "common.maxBloomFalsePositive", Version: "2.3.2", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 1b4719efe3..34e6d409c8 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -528,6 +528,7 @@ func TestCachedParam(t *testing.T) { assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint()) assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint()) + assert.Equal(t, "BlockedBloomFilter", params.CommonCfg.BloomFilterType.GetValue()) assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) diff --git a/tests/integration/bloomfilter/bloom_filter_test.go b/tests/integration/bloomfilter/bloom_filter_test.go new file mode 100644 index 0000000000..595ecdd025 --- /dev/null +++ b/tests/integration/bloomfilter/bloom_filter_test.go @@ -0,0 +1,196 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bloomfilter + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +type BloomFilterTestSuit struct { + integration.MiniClusterSuite +} + +func (s *BloomFilterTestSuit) SetupSuite() { + paramtable.Init() + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1") + + // disable compaction + paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") + + s.Require().NoError(s.SetupEmbedEtcd()) +} + +func (s *BloomFilterTestSuit) TearDownSuite() { + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) + s.MiniClusterSuite.TearDownSuite() +} + +func (s *BloomFilterTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const ( + dim = 128 + dbName = "" + ) + + schema := integration.ConstructSchema(collectionName, dim, true) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: int32(channelNum), + }) + s.NoError(err) + s.True(merr.Ok(createCollectionStatus)) + + log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + s.NoError(err) + s.True(merr.Ok(showCollectionsResp.Status)) + log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) + + for i := 0; i < segmentNum; i++ { + // change bf type in real time + if i%2 == 0 { + paramtable.Get().Save(paramtable.Get().CommonCfg.BloomFilterType.Key, "BasicBloomFilter") + } else { + paramtable.Get().Save(paramtable.Get().CommonCfg.BloomFilterType.Key, "BlockedBloomFilter") + } + + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, segmentRowNum, dim) + hashKeys := integration.GenerateHashKeys(segmentRowNum) + insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(segmentRowNum), + }) + s.NoError(err) + s.True(merr.Ok(insertResult.Status)) + + if segmentDeleteNum > 0 { + if segmentDeleteNum > segmentRowNum { + segmentDeleteNum = segmentRowNum + } + + pks := insertResult.GetIDs().GetIntId().GetData()[:segmentDeleteNum] + log.Info("========================delete expr==================", + zap.Int("length of pk", len(pks)), + ) + + expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ",")) + + deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collectionName, + Expr: expr, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(deleteResp.GetStatus())) + s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt()) + } + + // flush + flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + s.NoError(err) + segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] + ids := segmentIDs.GetData() + s.Require().NotEmpty(segmentIDs) + s.Require().True(has) + flushTs, has := flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + } + + // create index + createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2), + }) + s.NoError(err) + s.True(merr.Ok(createIndexStatus)) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + + for i := 1; i < replica; i++ { + s.Cluster.AddQueryNode() + } + + // load + loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + ReplicaNumber: int32(replica), + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) + s.True(merr.Ok(loadStatus)) + s.WaitForLoad(ctx, collectionName) + log.Info("initCollection Done") +} + +func (s *BloomFilterTestSuit) TestLoadAndQuery() { + name := "test_balance_" + funcutil.GenRandomStr() + s.initCollection(name, 1, 2, 10, 2000, 500) + + ctx := context.Background() + queryResult, err := s.Cluster.Proxy.Query(ctx, &milvuspb.QueryRequest{ + DbName: "", + CollectionName: name, + Expr: "", + OutputFields: []string{"count(*)"}, + }) + if !merr.Ok(queryResult.GetStatus()) { + log.Warn("searchResult fail reason", zap.String("reason", queryResult.GetStatus().GetReason())) + } + s.NoError(err) + s.True(merr.Ok(queryResult.GetStatus())) + numEntities := queryResult.FieldsData[0].GetScalars().GetLongData().Data[0] + s.Equal(numEntities, int64(15000)) +} + +func TestBloomFilter(t *testing.T) { + suite.Run(t, new(BloomFilterTestSuit)) +}