diff --git a/.env b/.env index c74af55f76..d8d30d7c3c 100644 --- a/.env +++ b/.env @@ -16,4 +16,5 @@ MINIO_ADDRESS=minio:9000 PULSAR_ADDRESS=pulsar://pulsar:6650 ETCD_ENDPOINTS=etcd:2379 AZURITE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite:10000/devstoreaccount1;" +ENABLE_GCP_NATIVE=ON diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 24257d183c..902af35e98 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -141,7 +141,7 @@ jobs: - name: Start Service shell: bash run: | - docker compose up -d azurite + docker compose up -d azurite gcpnative - name: UT run: | chmod +x build/builder.sh @@ -193,7 +193,7 @@ jobs: - name: Start Service shell: bash run: | - docker compose up -d pulsar etcd minio azurite + docker compose up -d pulsar etcd minio azurite gcpnative - name: UT run: | chmod +x build/builder.sh diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 4904f976de..929686e27f 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -306,7 +306,8 @@ ${CMAKE_EXTRA_ARGS} \ -DUSE_ASAN=${USE_ASAN} \ -DUSE_DYNAMIC_SIMD=${USE_DYNAMIC_SIMD} \ -DCPU_ARCH=${CPU_ARCH} \ - -DINDEX_ENGINE=${INDEX_ENGINE} " + -DINDEX_ENGINE=${INDEX_ENGINE} \ + -DENABLE_GCP_NATIVE=${ENABLE_GCP_NATIVE} " if [ -z "$BUILD_WITHOUT_AZURE" ]; then CMAKE_CMD=${CMAKE_CMD}"-DAZURE_BUILD_DIR=${AZURE_BUILD_DIR} \ -DVCPKG_TARGET_TRIPLET=${VCPKG_TARGET_TRIPLET} " diff --git a/configs/milvus.yaml b/configs/milvus.yaml index be3343f557..b7a2e6f55f 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -133,11 +133,17 @@ minio: # aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/latest/attach-an-instance-ram-role useIAM: false # Cloud Provider of S3. Supports: "aws", "gcp", "aliyun". + # Cloud Provider of Google Cloud Storage. Supports: "gcpnative". # You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio # You can use "gcp" for other cloud provider supports S3 API with signature v2 # You can use "aliyun" for other cloud provider uses virtual host style bucket + # You can use "gcpnative" for the Google Cloud Platform provider. Uses service account credentials + # for authentication. # When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now cloudProvider: aws + # The JSON content contains the gcs service account credentials. + # Used only for the "gcpnative" cloud provider. + gcpCredentialJSON: # Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws". # Leave it empty if you want to use AWS default endpoint iamEndpoint: diff --git a/deployments/docker/dev/docker-compose-apple-silicon.yml b/deployments/docker/dev/docker-compose-apple-silicon.yml index a74652f2fe..809fbf8da6 100644 --- a/deployments/docker/dev/docker-compose-apple-silicon.yml +++ b/deployments/docker/dev/docker-compose-apple-silicon.yml @@ -67,6 +67,12 @@ services: - "16686:16686" # frontent - "14268:14268" # jaeger.thirft + gcpnative: + image: fsouza/fake-gcs-server + command: -scheme http -public-host storage.gcs.127.0.0.1.nip.io:4443 + ports: + - "4443:4443" + networks: default: name: milvus_dev diff --git a/deployments/docker/dev/docker-compose.yml b/deployments/docker/dev/docker-compose.yml index b772b33964..f3a0eeff08 100644 --- a/deployments/docker/dev/docker-compose.yml +++ b/deployments/docker/dev/docker-compose.yml @@ -102,6 +102,12 @@ services: depends_on: - zookeeper + gcpnative: + image: fsouza/fake-gcs-server + command: -scheme http -public-host storage.gcs.127.0.0.1.nip.io:4443 + ports: + - "4443:4443" + networks: default: name: milvus_dev diff --git a/docker-compose.yml b/docker-compose.yml index 3e20108b71..d1ce9ace6d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,7 @@ services: MINIO_ADDRESS: ${MINIO_ADDRESS} CONAN_USER_HOME: /home/milvus AZURE_STORAGE_CONNECTION_STRING: ${AZURITE_CONNECTION_STRING} + ENABLE_GCP_NATIVE: ${ENABLE_GCP_NATIVE} volumes: &builder-volumes - .:/go/src/github.com/milvus-io/milvus:delegated - ${DOCKER_VOLUME_DIRECTORY:-.docker}/${IMAGE_ARCH}-${OS_NAME}-ccache:/ccache:delegated @@ -43,6 +44,7 @@ services: - minio - pulsar - azurite + - gcpnative # Command command: &builder-command > /bin/bash -c " @@ -71,6 +73,7 @@ services: MINIO_ADDRESS: ${MINIO_ADDRESS} CONAN_USER_HOME: /home/milvus AZURE_STORAGE_CONNECTION_STRING: ${AZURITE_CONNECTION_STRING} + ENABLE_GCP_NATIVE: ${ENABLE_GCP_NATIVE} volumes: &builder-volumes-gpu - .:/go/src/github.com/milvus-io/milvus:delegated - ${DOCKER_VOLUME_DIRECTORY:-.docker-gpu}/${OS_NAME}-ccache:/ccache:delegated @@ -83,6 +86,7 @@ services: - minio - pulsar - azurite + - gcpnative # Command command: &builder-command-gpu > /bin/bash -c " @@ -134,6 +138,13 @@ services: jaeger: image: jaegertracing/all-in-one:latest + gcpnative: + image: fsouza/fake-gcs-server + command: -scheme http -public-host storage.gcs.127.0.0.1.nip.io:4443 -external-url "http://storage.gcs.127.0.0.1.nip.io:4443" + hostname: storage.gcs.127.0.0.1.nip.io + ports: + - "4443:4443" + networks: default: name: milvus_dev diff --git a/go.mod b/go.mod index c9eacfcb45..0522e82e76 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( golang.org/x/crypto v0.25.0 golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 golang.org/x/net v0.27.0 - golang.org/x/oauth2 v0.20.0 + golang.org/x/oauth2 v0.21.0 golang.org/x/sync v0.7.0 golang.org/x/text v0.16.0 google.golang.org/grpc v1.65.0 @@ -75,7 +75,12 @@ require ( ) require ( + cloud.google.com/go v0.115.0 // indirect + cloud.google.com/go/auth v0.6.1 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect + cloud.google.com/go/iam v1.1.8 // indirect + cloud.google.com/go/storage v1.43.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.1 // indirect github.com/AthenZ/athenz v1.10.39 // indirect @@ -111,6 +116,7 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/expr-lang/expr v1.15.7 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect @@ -127,9 +133,13 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v2.0.8+incompatible // indirect + github.com/google/s2a-go v0.1.7 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/googleapis/gax-go/v2 v2.12.5 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect @@ -215,6 +225,8 @@ require ( go.etcd.io/etcd/client/v2 v2.305.5 // indirect go.etcd.io/etcd/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/raft/v3 v3.5.5 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.13.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect @@ -232,8 +244,9 @@ require ( golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gonum.org/v1/gonum v0.11.0 // indirect - google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/api v0.187.0 // indirect + google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index b68189bb4e..7f99553051 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,12 @@ cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmW cloud.google.com/go v0.78.0/go.mod h1:QjdrLG0uq+YwhjoVOLsS1t7TW8fs36kLs4XO5R5ECHg= cloud.google.com/go v0.79.0/go.mod h1:3bzgcEeQlzbuEAYu4mrWhKqWjmpprinYgKJLgKHnbb8= cloud.google.com/go v0.81.0/go.mod h1:mk/AM35KwGk/Nm2YSeZbxXdrNK3KZOYHmLkOqC2V6E0= +cloud.google.com/go v0.115.0 h1:CnFSK6Xo3lDYRoBKEcAtia6VSC837/ZkJuRduSFnr14= +cloud.google.com/go v0.115.0/go.mod h1:8jIM5vVgoAEoiVxQ/O4BFTfHqulPZgs/ufEzMcFMdWU= +cloud.google.com/go/auth v0.6.1 h1:T0Zw1XM5c1GlpN2HYr2s+m3vr1p2wy+8VN+Z1FKxW38= +cloud.google.com/go/auth v0.6.1/go.mod h1:eFHG7zDzbXHKmjJddFG/rBlcGp6t25SwRUiEQSlO4x4= +cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= +cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -29,6 +35,8 @@ cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1h cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= +cloud.google.com/go/iam v1.1.8 h1:r7umDwhj+BQyz0ScZMp4QrGXjSTI3ZINnpgU2nlB/K0= +cloud.google.com/go/iam v1.1.8/go.mod h1:GvE6lyMmfxXauzNq8NbgJbeVQNspG+tcdL/W8QO1+zE= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= @@ -38,6 +46,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= +cloud.google.com/go/storage v1.43.0 h1:CcxnSohZwizt4LCzQHWvBf1/kvtHUn7gk9QERXPyXFs= +cloud.google.com/go/storage v1.43.0/go.mod h1:ajvxEa7WmZS1PxvKRq4bq0tFT3vMd502JwstCcYv0Q0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= @@ -242,6 +252,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c= @@ -338,6 +350,7 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -415,12 +428,18 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= +github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= +github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= +github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBYGmXdxA= +github.com/googleapis/gax-go/v2 v2.12.5/go.mod h1:BUDKcWo+RaKq5SC9vVYL0wLADa3VcfswbOMMRmB9H3E= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= @@ -936,9 +955,13 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0/go.mod h1:E5NNboN0UqSAki0Atn9kVwaN7I+l25gGxDqBueo/74E= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/otel v1.0.1/go.mod h1:OPEOD4jIT2SlZPMmwT6FqZz2C0ZNdQqiWcoK6M0SNFU= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= @@ -1121,6 +1144,8 @@ golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1335,6 +1360,8 @@ google.golang.org/api v0.40.0/go.mod h1:fYKFpnQN0DsDSKRVRcQSDQNtqWPfM9i+zNPxepjR google.golang.org/api v0.41.0/go.mod h1:RkxM5lITDfTzmyKFPt+wGrCJbVfniCr2ool8kTBzRTU= google.golang.org/api v0.43.0/go.mod h1:nQsDGjRXMo4lvh5hP0TKqF244gqhGcr/YSIykhUk/94= google.golang.org/api v0.44.0/go.mod h1:EBOGZqzyhtvMDoxwS97ctnh0zUmYY6CxqXsc1AvkYD8= +google.golang.org/api v0.187.0 h1:Mxs7VATVC2v7CY+7Xwm4ndkX71hpElcvx0D1Ji/p1eo= +google.golang.org/api v0.187.0/go.mod h1:KIHlTc4x7N7gKKuVsdmfBXN13yEEWXWFURWY6SBp2gk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -1391,8 +1418,12 @@ google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+n google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ= google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY= +google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d h1:PksQg4dV6Sem3/HkBX+Ltq8T0ke0PKIRBNBatoDTVls= +google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:s7iA721uChleev562UJO2OYB0PPT9CMFjV+Ce7VJH5M= google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= +google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 h1:MuYw1wJzT+ZkybKfaOXKp5hJiZDn2iHaXRw0mRYdHSc= +google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4/go.mod h1:px9SlOOZBg1wM1zdnr8jEL4CNGUBZ+ZKYtNPApNQc4c= google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q= google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= diff --git a/internal/core/src/CMakeLists.txt b/internal/core/src/CMakeLists.txt index e53cb09c50..0c17d074bd 100644 --- a/internal/core/src/CMakeLists.txt +++ b/internal/core/src/CMakeLists.txt @@ -84,6 +84,10 @@ if(DEFINED AZURE_BUILD_DIR) set(LINK_TARGETS ${LINK_TARGETS} azure_blob_chunk_manager) endif() +if (ENABLE_GCP_NATIVE) + set(LINK_TARGETS ${LINK_TARGETS} gcp-native-storage) +endif() + target_link_libraries(milvus_core ${LINK_TARGETS}) install(TARGETS milvus_core DESTINATION "${CMAKE_INSTALL_LIBDIR}") diff --git a/internal/core/src/clustering/analyze_c.cpp b/internal/core/src/clustering/analyze_c.cpp index 8df1aec71b..bc2598f16e 100644 --- a/internal/core/src/clustering/analyze_c.cpp +++ b/internal/core/src/clustering/analyze_c.cpp @@ -45,6 +45,8 @@ get_storage_config(const milvus::proto::clustering::StorageConfig& config) { storage_config.region = config.region(); storage_config.useVirtualHost = config.use_virtual_host(); storage_config.requestTimeoutMs = config.request_timeout_ms(); + storage_config.gcp_credential_json = + std::string(config.gcpcredentialjson()); return storage_config; } diff --git a/internal/core/src/common/EasyAssert.h b/internal/core/src/common/EasyAssert.h index 8458d17e01..763bd65a13 100644 --- a/internal/core/src/common/EasyAssert.h +++ b/internal/core/src/common/EasyAssert.h @@ -65,6 +65,7 @@ enum ErrorCode { MemAllocateSizeNotMatch = 2035, MmapError = 2036, OutOfRange = 2037, + GcpNativeError = 2038, KnowhereError = 2100, // timeout or cancel related. diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index af751e5da1..bf19e0dac9 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -92,6 +92,7 @@ typedef struct CStorageConfig { bool useIAM; bool useVirtualHost; int64_t requestTimeoutMs; + const char* gcp_credential_json; } CStorageConfig; typedef struct CMmapConfig { diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index f0ff6c830b..f4f4613c72 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -103,6 +103,8 @@ get_storage_config(const milvus::proto::indexcgo::StorageConfig& config) { storage_config.region = config.region(); storage_config.useVirtualHost = config.use_virtual_host(); storage_config.requestTimeoutMs = config.request_timeout_ms(); + storage_config.gcp_credential_json = + std::string(config.gcpcredentialjson()); return storage_config; } @@ -563,6 +565,8 @@ NewBuildIndexInfo(CBuildIndexInfo* c_build_index_info, storage_config.region = c_storage_config.region; storage_config.useVirtualHost = c_storage_config.useVirtualHost; storage_config.requestTimeoutMs = c_storage_config.requestTimeoutMs; + storage_config.gcp_credential_json = + std::string(c_storage_config.gcp_credential_json); *c_build_index_info = build_index_info.release(); auto status = CStatus(); diff --git a/internal/core/src/storage/CMakeLists.txt b/internal/core/src/storage/CMakeLists.txt index 0d69386e08..b88211bb87 100644 --- a/internal/core/src/storage/CMakeLists.txt +++ b/internal/core/src/storage/CMakeLists.txt @@ -15,6 +15,12 @@ # limitations under the License. add_source_at_current_directory() + +if (ENABLE_GCP_NATIVE) + add_definitions(-DENABLE_GCP_NATIVE) + add_subdirectory(gcp-native-storage) +endif() + if (DEFINED AZURE_BUILD_DIR) add_definitions(-DAZURE_BUILD_DIR) include_directories(azure-blob-storage) diff --git a/internal/core/src/storage/Types.h b/internal/core/src/storage/Types.h index cde1484fd1..281c9341cf 100644 --- a/internal/core/src/storage/Types.h +++ b/internal/core/src/storage/Types.h @@ -101,6 +101,8 @@ struct StorageConfig { bool useIAM = false; bool useVirtualHost = false; int64_t requestTimeoutMs = 3000; + bool gcp_native_without_auth = false; + std::string gcp_credential_json = ""; std::string ToString() const { @@ -113,7 +115,9 @@ struct StorageConfig { << ", sslCACert=" << sslCACert.size() // only print cert length << ", useIAM=" << std::boolalpha << useIAM << ", useVirtualHost=" << std::boolalpha << useVirtualHost - << ", requestTimeoutMs=" << requestTimeoutMs << "]"; + << ", requestTimeoutMs=" << requestTimeoutMs + << ", gcp_native_without_auth=" << std::boolalpha + << gcp_native_without_auth << "]"; return ss.str(); } diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index ba2ef5103c..4efdd45d8c 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -29,6 +29,9 @@ #ifdef AZURE_BUILD_DIR #include "storage/azure/AzureChunkManager.h" #endif +#ifdef ENABLE_GCP_NATIVE +#include "storage/gcp-native-storage/GcpNativeChunkManager.h" +#endif #include "storage/ChunkManager.h" #include "storage/DiskFileManagerImpl.h" #include "storage/InsertData.h" @@ -59,6 +62,7 @@ enum class CloudProviderType : int8_t { ALIYUN = 3, AZURE = 4, TENCENTCLOUD = 5, + GCPNATIVE = 6, }; std::map CloudProviderType_Map = { @@ -66,7 +70,8 @@ std::map CloudProviderType_Map = { {"gcp", CloudProviderType::GCP}, {"aliyun", CloudProviderType::ALIYUN}, {"azure", CloudProviderType::AZURE}, - {"tencent", CloudProviderType::TENCENTCLOUD}}; + {"tencent", CloudProviderType::TENCENTCLOUD}, + {"gcpnative", CloudProviderType::GCPNATIVE}}; std::map ReadAheadPolicy_Map = { {"normal", MADV_NORMAL}, @@ -713,6 +718,12 @@ CreateChunkManager(const StorageConfig& storage_config) { case CloudProviderType::AZURE: { return std::make_shared(storage_config); } +#endif +#ifdef ENABLE_GCP_NATIVE + case CloudProviderType::GCPNATIVE: { + return std::make_shared( + storage_config); + } #endif default: { return std::make_shared(storage_config); diff --git a/internal/core/src/storage/gcp-native-storage/CMakeLists.txt b/internal/core/src/storage/gcp-native-storage/CMakeLists.txt new file mode 100644 index 0000000000..fc18260cf8 --- /dev/null +++ b/internal/core/src/storage/gcp-native-storage/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License + +cmake_minimum_required (VERSION 3.12) +set(CMAKE_CXX_STANDARD 17) + +add_library(gcp-native-storage OBJECT GcpNativeChunkManager.cpp GcpNativeClientManager.cpp) diff --git a/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.cpp b/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.cpp new file mode 100644 index 0000000000..6ef3d107a7 --- /dev/null +++ b/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.cpp @@ -0,0 +1,276 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include +#include "common/Consts.h" +#include "common/EasyAssert.h" +#include "log/Log.h" +#include "monitor/prometheus_client.h" +#include "storage/gcp-native-storage/GcpNativeChunkManager.h" + +namespace milvus { +namespace storage { + +GcpNativeChunkManager::GcpNativeChunkManager( + const StorageConfig& storage_config) + : default_bucket_name_(storage_config.bucket_name), + path_prefix_(storage_config.root_path) { + try { + client_ = std::make_unique( + storage_config.address, + storage_config.gcp_credential_json, + storage_config.useSSL, + storage_config.gcp_native_without_auth, + storage_config.requestTimeoutMs == 0 + ? DEFAULT_CHUNK_MANAGER_REQUEST_TIMEOUT_MS + : storage_config.requestTimeoutMs); + } catch (std::exception& err) { + ThrowGcpNativeError("GcpNativeChunkManager", + err, + "gcp native chunk manager client creation failed, " + "error: {}, configuration: {}", + err.what(), + storage_config.ToString()); + } +} + +GcpNativeChunkManager::~GcpNativeChunkManager() { +} + +uint64_t +GcpNativeChunkManager::Size(const std::string& filepath) { + return GetObjectSize(default_bucket_name_, filepath); +} + +bool +GcpNativeChunkManager::Exist(const std::string& filepath) { + return ObjectExists(default_bucket_name_, filepath); +} + +void +GcpNativeChunkManager::Remove(const std::string& filepath) { + DeleteObject(default_bucket_name_, filepath); +} + +std::vector +GcpNativeChunkManager::ListWithPrefix(const std::string& filepath) { + return ListObjects(default_bucket_name_, filepath); +} + +uint64_t +GcpNativeChunkManager::Read(const std::string& filepath, + void* buf, + uint64_t size) { + return GetObjectBuffer(default_bucket_name_, filepath, buf, size); +} + +void +GcpNativeChunkManager::Write(const std::string& filepath, + void* buf, + uint64_t size) { + PutObjectBuffer(default_bucket_name_, filepath, buf, size); +} + +bool +GcpNativeChunkManager::BucketExists(const std::string& bucket_name) { + bool res; + try { + res = client_->BucketExists(bucket_name); + } catch (std::exception& err) { + ThrowGcpNativeError( + "BucketExists", err, "params, bucket={}", bucket_name); + } + return res; +} + +std::vector +GcpNativeChunkManager::ListBuckets() { + std::vector res; + try { + res = client_->ListBuckets(); + } catch (std::exception& err) { + ThrowGcpNativeError("ListBuckets", err, "params"); + } + return res; +} + +bool +GcpNativeChunkManager::CreateBucket(const std::string& bucket_name) { + bool res; + try { + res = client_->CreateBucket(bucket_name); + } catch (std::exception& err) { + ThrowGcpNativeError( + "CreateBucket", err, "params, bucket={}", bucket_name); + } + return res; +} + +bool +GcpNativeChunkManager::DeleteBucket(const std::string& bucket_name) { + bool res; + try { + res = client_->DeleteBucket(bucket_name); + } catch (std::exception& err) { + ThrowGcpNativeError( + "DeleteBucket", err, "params, bucket={}", bucket_name); + } + return res; +} + +bool +GcpNativeChunkManager::ObjectExists(const std::string& bucket_name, + const std::string& object_name) { + bool res; + try { + auto start = std::chrono::system_clock::now(); + res = client_->ObjectExists(bucket_name, object_name); + monitor::internal_storage_request_latency_stat.Observe( + std::chrono::duration_cast( + std::chrono::system_clock::now() - start) + .count()); + monitor::internal_storage_op_count_stat_suc.Increment(); + } catch (std::exception& err) { + monitor::internal_storage_op_count_stat_fail.Increment(); + ThrowGcpNativeError("ObjectExists", + err, + "params, bucket={}, object={}", + bucket_name, + object_name); + } + return res; +} + +uint64_t +GcpNativeChunkManager::GetObjectSize(const std::string& bucket_name, + const std::string& object_name) { + uint64_t res; + try { + auto start = std::chrono::system_clock::now(); + res = client_->GetObjectSize(bucket_name, object_name); + monitor::internal_storage_request_latency_stat.Observe( + std::chrono::duration_cast( + std::chrono::system_clock::now() - start) + .count()); + monitor::internal_storage_op_count_stat_suc.Increment(); + } catch (std::exception& err) { + monitor::internal_storage_op_count_stat_fail.Increment(); + ThrowGcpNativeError("GetObjectSize", + err, + "params, bucket={}, object={}", + bucket_name, + object_name); + } + return res; +} + +bool +GcpNativeChunkManager::DeleteObject(const std::string& bucket_name, + const std::string& object_name) { + bool res; + try { + auto start = std::chrono::system_clock::now(); + res = client_->DeleteObject(bucket_name, object_name); + monitor::internal_storage_request_latency_remove.Observe( + std::chrono::duration_cast( + std::chrono::system_clock::now() - start) + .count()); + monitor::internal_storage_op_count_remove_suc.Increment(); + } catch (std::exception& err) { + monitor::internal_storage_op_count_remove_fail.Increment(); + ThrowGcpNativeError("DeleteObject", + err, + "params, bucket={}, object={}", + bucket_name, + object_name); + } + return res; +} + +bool +GcpNativeChunkManager::PutObjectBuffer(const std::string& bucket_name, + const std::string& object_name, + void* buf, + uint64_t size) { + bool res; + try { + auto start = std::chrono::system_clock::now(); + res = client_->PutObjectBuffer(bucket_name, object_name, buf, size); + monitor::internal_storage_request_latency_put.Observe( + std::chrono::duration_cast( + std::chrono::system_clock::now() - start) + .count()); + monitor::internal_storage_op_count_put_suc.Increment(); + monitor::internal_storage_kv_size_put.Observe(size); + } catch (std::exception& err) { + monitor::internal_storage_op_count_put_fail.Increment(); + ThrowGcpNativeError("PutObjectBuffer", + err, + "params, bucket={}, object={}", + bucket_name, + object_name); + } + return res; +} + +uint64_t +GcpNativeChunkManager::GetObjectBuffer(const std::string& bucket_name, + const std::string& object_name, + void* buf, + uint64_t size) { + uint64_t res; + try { + auto start = std::chrono::system_clock::now(); + res = client_->GetObjectBuffer(bucket_name, object_name, buf, size); + monitor::internal_storage_request_latency_get.Observe( + std::chrono::duration_cast( + std::chrono::system_clock::now() - start) + .count()); + monitor::internal_storage_op_count_get_suc.Increment(); + monitor::internal_storage_kv_size_get.Observe(size); + } catch (std::exception& err) { + monitor::internal_storage_op_count_get_fail.Increment(); + ThrowGcpNativeError("GetObjectBuffer", + err, + "params, bucket={}, object={}", + bucket_name, + object_name); + } + return res; +} + +std::vector +GcpNativeChunkManager::ListObjects(const std::string& bucket_name, + const std::string& prefix) { + std::vector res; + try { + auto start = std::chrono::system_clock::now(); + res = client_->ListObjects(bucket_name, prefix); + monitor::internal_storage_request_latency_list.Observe( + std::chrono::duration_cast( + std::chrono::system_clock::now() - start) + .count()); + monitor::internal_storage_op_count_list_suc.Increment(); + } catch (std::exception& err) { + monitor::internal_storage_op_count_list_fail.Increment(); + ThrowGcpNativeError("ListObjects", + err, + "params, bucket={}, prefix={}", + bucket_name, + prefix); + } + return res; +} + +} // namespace storage +} // namespace milvus diff --git a/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.h b/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.h new file mode 100644 index 0000000000..81d1b3327c --- /dev/null +++ b/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.h @@ -0,0 +1,151 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include +#include "storage/gcp-native-storage/GcpNativeClientManager.h" +#include "storage/ChunkManager.h" +#include "storage/Types.h" + +namespace milvus { +namespace storage { + +template +static SegcoreError +ThrowGcpNativeError(const std::string& func, + const std::exception& err, + const std::string& fmt_string, + Args&&... args) { + std::ostringstream oss; + const auto& message = fmt::format(fmt_string, std::forward(args)...); + oss << "Error in " << func << "[exception:" << err.what() + << ", params:" << message << "]"; + throw SegcoreError(GcpNativeError, oss.str()); +} + +/** + * @brief This GcpNativeChunkManager is used to interact with + * Google Cloud Storage (GCS) services using GcpNativeClientManager class. + */ +class GcpNativeChunkManager : public ChunkManager { + public: + explicit GcpNativeChunkManager(const StorageConfig& storage_config); + + GcpNativeChunkManager(const GcpNativeChunkManager&); + GcpNativeChunkManager& + operator=(const GcpNativeChunkManager&); + + virtual ~GcpNativeChunkManager(); + + public: + bool + Exist(const std::string& filepath) override; + + uint64_t + Size(const std::string& filepath) override; + + uint64_t + Read(const std::string& filepath, + uint64_t offset, + void* buf, + uint64_t len) override { + PanicInfo(NotImplemented, GetName() + "Read with offset not implement"); + } + + void + Write(const std::string& filepath, + uint64_t offset, + void* buf, + uint64_t len) override { + PanicInfo(NotImplemented, + GetName() + "Write with offset not implement"); + } + + uint64_t + Read(const std::string& filepath, void* buf, uint64_t len) override; + + void + Write(const std::string& filepath, void* buf, uint64_t len) override; + + std::vector + ListWithPrefix(const std::string& filepath) override; + + void + Remove(const std::string& filepath) override; + + std::string + GetName() const override { + return "GcpNativeChunkManager"; + } + + std::string + GetRootPath() const override { + return path_prefix_; + } + + public: + inline std::string + GetBucketName() { + return default_bucket_name_; + } + + inline void + SetBucketName(const std::string& bucket_name) { + default_bucket_name_ = bucket_name; + } + + bool + BucketExists(const std::string& bucket_name); + + bool + CreateBucket(const std::string& bucket_name); + + bool + DeleteBucket(const std::string& bucket_name); + + std::vector + ListBuckets(); + + bool + ObjectExists(const std::string& bucket_name, + const std::string& object_name); + uint64_t + GetObjectSize(const std::string& bucket_name, + const std::string& object_name); + bool + DeleteObject(const std::string& bucket_name, + const std::string& object_name); + bool + PutObjectBuffer(const std::string& bucket_name, + const std::string& object_name, + void* buf, + uint64_t size); + uint64_t + GetObjectBuffer(const std::string& bucket_name, + const std::string& object_name, + void* buf, + uint64_t size); + std::vector + ListObjects(const std::string& bucket_name, + const std::string& prefix = nullptr); + + private: + std::unique_ptr client_; + std::string default_bucket_name_; + std::string path_prefix_; +}; + +} // namespace storage +} // namespace milvus diff --git a/internal/core/src/storage/gcp-native-storage/GcpNativeClientManager.cpp b/internal/core/src/storage/gcp-native-storage/GcpNativeClientManager.cpp new file mode 100644 index 0000000000..a53a899b01 --- /dev/null +++ b/internal/core/src/storage/gcp-native-storage/GcpNativeClientManager.cpp @@ -0,0 +1,208 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include "google/cloud/storage/retry_policy.h" +#include "GcpNativeClientManager.h" + +namespace gcs = google::cloud::storage; + +namespace gcpnative { + +GcpNativeClientManager::GcpNativeClientManager( + const std::string& address, + const std::string& gcp_credential_json, + bool use_ssl, + bool gcp_native_without_auth, + int64_t request_timeout_ms) { + auto options = google::cloud::Options{}; + if (gcp_native_without_auth) { + // Create an anonymous credential for unit testing with GCS emulation. + auto credentials = + google::cloud::storage::oauth2::CreateAnonymousCredentials(); + options.set(std::move(credentials)); + } else { + if (gcp_credential_json.empty()) { + throw std::runtime_error( + "GCP native error: GCS service account credentials are " + "missing."); + } + auto credentials = + google::cloud::MakeServiceAccountCredentials(gcp_credential_json); + options.set(credentials); + } + if (request_timeout_ms > 0) { + options.set( + gcs::LimitedTimeRetryPolicy( + std::chrono::milliseconds(request_timeout_ms)) + .clone()); + } + if (!address.empty()) { + std::string complete_address = "http://"; + if (use_ssl) { + complete_address = "https://"; + } + complete_address += address; + options.set(complete_address); + } + + client_ = + std::make_unique(std::move(options)); +} + +GcpNativeClientManager::~GcpNativeClientManager() { +} + +bool +GcpNativeClientManager::BucketExists(const std::string& bucket_name) { + auto metadata = client_->GetBucketMetadata(bucket_name); + if (!metadata) { + if (metadata.status().code() == google::cloud::StatusCode::kNotFound) { + return false; + } else { + throw std::runtime_error( + GetGcpNativeError(std::move(metadata).status())); + } + } + return true; +} + +std::vector +GcpNativeClientManager::ListBuckets() { + std::vector buckets; + for (auto&& metadata : client_->ListBuckets()) { + if (!metadata) { + throw std::runtime_error( + GetGcpNativeError(std::move(metadata).status())); + } + + buckets.emplace_back(metadata->name()); + } + return buckets; +} + +bool +GcpNativeClientManager::CreateBucket(const std::string& bucket_name) { + if (BucketExists(bucket_name)) { + return false; + } + auto metadata = client_->CreateBucket(bucket_name, gcs::BucketMetadata()); + if (!metadata) { + throw std::runtime_error( + GetGcpNativeError(std::move(metadata).status())); + } + return true; +} + +bool +GcpNativeClientManager::DeleteBucket(const std::string& bucket_name) { + auto status = client_->DeleteBucket(bucket_name); + if (!status.ok()) { + if (status.code() == google::cloud::StatusCode::kNotFound) { + return false; + } else { + throw std::runtime_error(GetGcpNativeError(std::move(status))); + } + } + return true; +} + +bool +GcpNativeClientManager::ObjectExists(const std::string& bucket_name, + const std::string& object_name) { + auto metadata = client_->GetObjectMetadata(bucket_name, object_name); + if (!metadata) { + if (metadata.status().code() == google::cloud::StatusCode::kNotFound) { + return false; + } else { + throw std::runtime_error( + GetGcpNativeError(std::move(metadata).status())); + } + } + return true; +} + +int64_t +GcpNativeClientManager::GetObjectSize(const std::string& bucket_name, + const std::string& object_name) { + auto metadata = client_->GetObjectMetadata(bucket_name, object_name); + if (!metadata) { + throw std::runtime_error( + GetGcpNativeError(std::move(metadata).status())); + } + return metadata->size(); +} + +bool +GcpNativeClientManager::DeleteObject(const std::string& bucket_name, + const std::string& object_name) { + google::cloud::Status status = + client_->DeleteObject(bucket_name, object_name); + if (!status.ok()) { + if (status.code() == google::cloud::StatusCode::kNotFound) { + return false; + } else { + throw std::runtime_error(GetGcpNativeError(std::move(status))); + } + } + return true; +} + +bool +GcpNativeClientManager::PutObjectBuffer(const std::string& bucket_name, + const std::string& object_name, + void* buf, + uint64_t size) { + std::string buffer(reinterpret_cast(buf), size); + auto stream = client_->WriteObject(bucket_name, object_name); + stream << buffer; + stream.Close(); + if (stream.bad()) { + throw std::runtime_error(GetGcpNativeError(stream.metadata().status())); + } + return true; +} + +uint64_t +GcpNativeClientManager::GetObjectBuffer(const std::string& bucket_name, + const std::string& object_name, + void* buf, + uint64_t size) { + auto stream = client_->ReadObject(bucket_name, object_name); + if (stream.bad()) { + throw std::runtime_error(GetGcpNativeError(stream.status())); + } + + stream.read(reinterpret_cast(buf), size); + auto bytes_read = stream.gcount(); + stream.Close(); + + return bytes_read; +} + +std::vector +GcpNativeClientManager::ListObjects(const std::string& bucket_name, + const std::string& prefix) { + std::vector objects_vec; + for (auto&& metadata : + client_->ListObjects(bucket_name, gcs::Prefix(prefix))) { + if (!metadata) { + throw std::runtime_error( + GetGcpNativeError(std::move(metadata).status())); + } + objects_vec.emplace_back(metadata->name()); + } + + return objects_vec; +} + +} // namespace gcpnative diff --git a/internal/core/src/storage/gcp-native-storage/GcpNativeClientManager.h b/internal/core/src/storage/gcp-native-storage/GcpNativeClientManager.h new file mode 100644 index 0000000000..19e8eb8cd0 --- /dev/null +++ b/internal/core/src/storage/gcp-native-storage/GcpNativeClientManager.h @@ -0,0 +1,87 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include "google/cloud/storage/client.h" +#include "google/cloud/options.h" +#include "common/EasyAssert.h" + +namespace gcpnative { +/** + * @brief This GcpNativeClientManager is used to interact with + * Google Cloud Storage (GCS) services. + */ +class GcpNativeClientManager { + public: + explicit GcpNativeClientManager(const std::string& address, + const std::string& gcp_credential_json, + bool use_ssl, + bool gcp_native_without_auth, + int64_t request_timeout_ms = 0); + + GcpNativeClientManager(const GcpNativeClientManager&); + GcpNativeClientManager& + operator=(const GcpNativeClientManager&); + + ~GcpNativeClientManager(); + + public: + bool + BucketExists(const std::string& bucket_name); + bool + CreateBucket(const std::string& bucket_name); + bool + DeleteBucket(const std::string& bucket_name); + std::vector + ListBuckets(); + bool + ObjectExists(const std::string& bucket_name, + const std::string& object_name); + int64_t + GetObjectSize(const std::string& bucket_name, + const std::string& object_name); + bool + DeleteObject(const std::string& bucket_name, + const std::string& object_name); + bool + PutObjectBuffer(const std::string& bucket_name, + const std::string& object_name, + void* buf, + uint64_t size); + uint64_t + GetObjectBuffer(const std::string& bucket_name, + const std::string& object_name, + void* buf, + uint64_t size); + std::vector + ListObjects(const std::string& bucket_name, + const std::string& prefix = nullptr); + + private: + inline std::string + GetGcpNativeError(google::cloud::Status&& status) { + return GetGcpNativeError(status); + } + + inline std::string + GetGcpNativeError(const google::cloud::Status& status) { + return fmt::format("Gcp native error: {} ({})", + status.message(), + static_cast(status.code())); + } + + private: + std::unique_ptr client_; +}; + +} // namespace gcpnative diff --git a/internal/core/src/storage/storage_c.cpp b/internal/core/src/storage/storage_c.cpp index bab1e76164..d2305b9153 100644 --- a/internal/core/src/storage/storage_c.cpp +++ b/internal/core/src/storage/storage_c.cpp @@ -76,6 +76,8 @@ InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config) { storage_config.useVirtualHost = c_storage_config.useVirtualHost; storage_config.region = c_storage_config.region; storage_config.requestTimeoutMs = c_storage_config.requestTimeoutMs; + storage_config.gcp_credential_json = + std::string(c_storage_config.gcp_credential_json); milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init( storage_config); diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 25e2bca0ba..11799675fa 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -117,6 +117,14 @@ if (DEFINED AZURE_BUILD_DIR) include_directories("${AZURE_BUILD_DIR}/vcpkg_installed/${VCPKG_TARGET_TRIPLET}/include") endif() +if (ENABLE_GCP_NATIVE) + add_definitions(-DENABLE_GCP_NATIVE) + set(MILVUS_TEST_FILES + ${MILVUS_TEST_FILES} + test_gcp_native_chunk_manager.cpp + ) +endif() + if (LINUX) message( STATUS "Building Milvus Unit Test on Linux") option(USE_ASAN "Whether to use AddressSanitizer" OFF) diff --git a/internal/core/unittest/test_gcp_native_chunk_manager.cpp b/internal/core/unittest/test_gcp_native_chunk_manager.cpp new file mode 100644 index 0000000000..642085b8b8 --- /dev/null +++ b/internal/core/unittest/test_gcp_native_chunk_manager.cpp @@ -0,0 +1,283 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include + +#include "common/EasyAssert.h" +#include "storage/gcp-native-storage/GcpNativeChunkManager.h" +#include "storage/Util.h" + +using namespace std; +using namespace milvus; +using namespace milvus::storage; + +StorageConfig +get_default_storage_config() { + auto endpoint = "storage.gcs.127.0.0.1.nip.io:4443"; + auto access_key = ""; + auto access_value = ""; + auto root_path = "files"; + auto use_ssl = false; + auto ssl_ca_cert = ""; + auto iam_endpoint = ""; + auto bucket_name = "sample-bucket"; + bool use_iam = false; + bool gcp_native_without_auth = true; + + return StorageConfig{ + endpoint, + bucket_name, + access_key, + access_value, + root_path, + "remote", // storage_type + "gcpnative", // cloud_provider_type + iam_endpoint, + "error", // log_level + "", // region + use_ssl, + ssl_ca_cert, + use_iam, + false, // useVirtualHost + 1000, // requestTimeoutMs + gcp_native_without_auth, + "" // gcp_credential_json + }; +} + +class GcpNativeChunkManagerTest : public testing::Test { + public: + GcpNativeChunkManagerTest() { + } + ~GcpNativeChunkManagerTest() { + } + + virtual void + SetUp() { + configs_ = get_default_storage_config(); + chunk_manager_ = make_unique(configs_); + } + + protected: + std::unique_ptr chunk_manager_; + StorageConfig configs_; +}; + +TEST_F(GcpNativeChunkManagerTest, BasicFunctions) { + EXPECT_TRUE(chunk_manager_->GetName() == "GcpNativeChunkManager"); + EXPECT_TRUE(chunk_manager_->GetRootPath() == configs_.root_path); + EXPECT_TRUE(chunk_manager_->GetBucketName() == configs_.bucket_name); +} + +TEST_F(GcpNativeChunkManagerTest, BucketPositive) { + string test_bucket_name = "bucket-not-exist"; + EXPECT_EQ(chunk_manager_->BucketExists(test_bucket_name), false); + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + EXPECT_EQ(chunk_manager_->BucketExists(test_bucket_name), true); + vector buckets = chunk_manager_->ListBuckets(); + EXPECT_TRUE(std::find(buckets.begin(), buckets.end(), test_bucket_name) != + buckets.end()); + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); + EXPECT_EQ(chunk_manager_->BucketExists(test_bucket_name), false); +} + +TEST_F(GcpNativeChunkManagerTest, BucketNegtive) { + string test_bucket_name = configs_.bucket_name; + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), false); + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + // Create an already existing bucket. + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), false); + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); +} + +TEST_F(GcpNativeChunkManagerTest, ObjectExist) { + string test_bucket_name = configs_.bucket_name; + string obj_path = "1/3"; + if (!chunk_manager_->BucketExists(test_bucket_name)) { + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + } + EXPECT_EQ(chunk_manager_->Exist(obj_path), false); + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); +} + +TEST_F(GcpNativeChunkManagerTest, WritePositive) { + string test_bucket_name = configs_.bucket_name; + EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name); + if (!chunk_manager_->BucketExists(test_bucket_name)) { + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + } + uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; + string path = "1"; + chunk_manager_->Write(path, data, sizeof(data)); + EXPECT_EQ(chunk_manager_->Exist(path), true); + EXPECT_EQ(chunk_manager_->Size(path), 5); + + int datasize = 10000; + uint8_t* bigdata = new uint8_t[datasize]; + srand((unsigned)time(NULL)); + for (int i = 0; i < datasize; ++i) { + bigdata[i] = rand() % 256; + } + chunk_manager_->Write(path, bigdata, datasize); + EXPECT_EQ(chunk_manager_->Size(path), datasize); + delete[] bigdata; + + chunk_manager_->Remove(path); + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); +} + +TEST_F(GcpNativeChunkManagerTest, ReadPositive) { + string test_bucket_name = configs_.bucket_name; + chunk_manager_->SetBucketName(test_bucket_name); + EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name); + if (!chunk_manager_->BucketExists(test_bucket_name)) { + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + } + + uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; + string path = "1/4/6"; + chunk_manager_->Write(path, data, sizeof(data)); + EXPECT_EQ(chunk_manager_->Exist(path), true); + EXPECT_EQ(chunk_manager_->Size(path), sizeof(data)); + + uint8_t readdata[20] = {0}; + EXPECT_EQ(chunk_manager_->Read(path, readdata, sizeof(data)), sizeof(data)); + EXPECT_EQ(readdata[0], 0x17); + EXPECT_EQ(readdata[1], 0x32); + EXPECT_EQ(readdata[2], 0x45); + EXPECT_EQ(readdata[3], 0x34); + EXPECT_EQ(readdata[4], 0x23); + + EXPECT_EQ(chunk_manager_->Read(path, readdata, 3), 3); + EXPECT_EQ(readdata[0], 0x17); + EXPECT_EQ(readdata[1], 0x32); + EXPECT_EQ(readdata[2], 0x45); + + uint8_t dataWithNULL[] = {0x17, 0x32, 0x00, 0x34, 0x23}; + chunk_manager_->Write(path, dataWithNULL, sizeof(dataWithNULL)); + EXPECT_EQ(chunk_manager_->Exist(path), true); + EXPECT_EQ(chunk_manager_->Size(path), sizeof(dataWithNULL)); + EXPECT_EQ(chunk_manager_->Read(path, readdata, sizeof(dataWithNULL)), + sizeof(dataWithNULL)); + EXPECT_EQ(readdata[0], 0x17); + EXPECT_EQ(readdata[1], 0x32); + EXPECT_EQ(readdata[2], 0x00); + EXPECT_EQ(readdata[3], 0x34); + EXPECT_EQ(readdata[4], 0x23); + + chunk_manager_->Remove(path); + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); +} + +TEST_F(GcpNativeChunkManagerTest, ReadNotExist) { + string test_bucket_name = configs_.bucket_name; + chunk_manager_->SetBucketName(test_bucket_name); + EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name); + + if (!chunk_manager_->BucketExists(test_bucket_name)) { + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + } + string path = "1/5/8"; + uint8_t readdata[20] = {0}; + EXPECT_THROW( + try { + chunk_manager_->Read(path, readdata, sizeof(readdata)); + } catch (SegcoreError& e) { + EXPECT_TRUE(std::string(e.what()).find("Not Found") != + string::npos); + throw e; + }, + SegcoreError); + + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); +} + +TEST_F(GcpNativeChunkManagerTest, RemovePositive) { + string test_bucket_name = configs_.bucket_name; + chunk_manager_->SetBucketName(test_bucket_name); + EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name); + + if (!chunk_manager_->BucketExists(test_bucket_name)) { + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + } + uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; + string path = "1/7/8"; + chunk_manager_->Write(path, data, sizeof(data)); + + EXPECT_EQ(chunk_manager_->Exist(path), true); + chunk_manager_->Remove(path); + EXPECT_EQ(chunk_manager_->Exist(path), false); + + // test double deleted + chunk_manager_->Remove(path); + EXPECT_EQ(chunk_manager_->Exist(path), false); + + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); +} + +TEST_F(GcpNativeChunkManagerTest, ListWithPrefixPositive) { + string test_bucket_name = configs_.bucket_name; + chunk_manager_->SetBucketName(test_bucket_name); + EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name); + + if (!chunk_manager_->BucketExists(test_bucket_name)) { + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + } + + string path1 = "1/7/8"; + string path2 = "1/7/4"; + string path3 = "1/4/8"; + uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; + chunk_manager_->Write(path1, data, sizeof(data)); + chunk_manager_->Write(path2, data, sizeof(data)); + chunk_manager_->Write(path3, data, sizeof(data)); + + vector objs = chunk_manager_->ListWithPrefix("1/7"); + EXPECT_EQ(objs.size(), 2); + std::sort(objs.begin(), objs.end()); + EXPECT_EQ(objs[0], "1/7/4"); + EXPECT_EQ(objs[1], "1/7/8"); + + objs = chunk_manager_->ListWithPrefix("//1/7"); + EXPECT_EQ(objs.size(), 0); + + objs = chunk_manager_->ListWithPrefix("1"); + EXPECT_EQ(objs.size(), 3); + std::sort(objs.begin(), objs.end()); + EXPECT_EQ(objs[0], "1/4/8"); + EXPECT_EQ(objs[1], "1/7/4"); + EXPECT_EQ(objs[2], "1/7/8"); + + chunk_manager_->Remove(path1); + chunk_manager_->Remove(path2); + chunk_manager_->Remove(path3); + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); +} + +TEST_F(GcpNativeChunkManagerTest, ListWithPrefixNegative) { + string test_bucket_name = configs_.bucket_name; + chunk_manager_->SetBucketName(test_bucket_name); + EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name); + + if (!chunk_manager_->BucketExists(test_bucket_name)) { + EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true); + } + + string path = "1/4/8"; + uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; + chunk_manager_->Write(path, data, sizeof(data)); + vector objs = chunk_manager_->ListWithPrefix("1/6"); + EXPECT_EQ(objs.size(), 0); + chunk_manager_->Remove(path); + EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true); +} diff --git a/internal/core/unittest/test_remote_chunk_manager.cpp b/internal/core/unittest/test_remote_chunk_manager.cpp index a21d0ed17f..b9d58d2348 100644 --- a/internal/core/unittest/test_remote_chunk_manager.cpp +++ b/internal/core/unittest/test_remote_chunk_manager.cpp @@ -89,6 +89,12 @@ TEST_F(RemoteChunkManagerTest, BasicFunctions) { EXPECT_TRUE(the_chunk_manager_->GetName() == "AzureChunkManager"); #endif +#ifdef ENABLE_GCP_NATIVE + configs_.cloud_provider = "gcpnative"; + the_chunk_manager_ = CreateChunkManager(configs_); + EXPECT_TRUE(the_chunk_manager_->GetName() == "GcpNativeChunkManager"); +#endif + configs_.cloud_provider = ""; } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 1712a9c5d6..94816931c4 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -330,20 +330,21 @@ func createStorageConfig() *indexpb.StorageConfig { } } else { storageConfig = &indexpb.StorageConfig{ - Address: Params.MinioCfg.Address.GetValue(), - AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(), - SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(), - UseSSL: Params.MinioCfg.UseSSL.GetAsBool(), - SslCACert: Params.MinioCfg.SslCACert.GetValue(), - BucketName: Params.MinioCfg.BucketName.GetValue(), - RootPath: Params.MinioCfg.RootPath.GetValue(), - UseIAM: Params.MinioCfg.UseIAM.GetAsBool(), - IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(), - StorageType: Params.CommonCfg.StorageType.GetValue(), - Region: Params.MinioCfg.Region.GetValue(), - UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(), - CloudProvider: Params.MinioCfg.CloudProvider.GetValue(), - RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(), + Address: Params.MinioCfg.Address.GetValue(), + AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(), + SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(), + UseSSL: Params.MinioCfg.UseSSL.GetAsBool(), + SslCACert: Params.MinioCfg.SslCACert.GetValue(), + BucketName: Params.MinioCfg.BucketName.GetValue(), + RootPath: Params.MinioCfg.RootPath.GetValue(), + UseIAM: Params.MinioCfg.UseIAM.GetAsBool(), + IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(), + StorageType: Params.CommonCfg.StorageType.GetValue(), + Region: Params.MinioCfg.Region.GetValue(), + UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(), + CloudProvider: Params.MinioCfg.CloudProvider.GetValue(), + RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(), + GcpCredentialJSON: Params.MinioCfg.GcpCredentialJSON.GetValue(), } } diff --git a/internal/indexnode/chunk_mgr_factory.go b/internal/indexnode/chunk_mgr_factory.go index c68035d74d..839a05c714 100644 --- a/internal/indexnode/chunk_mgr_factory.go +++ b/internal/indexnode/chunk_mgr_factory.go @@ -39,6 +39,7 @@ func (m *chunkMgrFactory) NewChunkManager(ctx context.Context, config *indexpb.S storage.RequestTimeout(config.GetRequestTimeoutMs()), storage.Region(config.GetRegion()), storage.CreateBucket(true), + storage.GcpCredentialJSON(config.GetGcpCredentialJSON()), ) return chunkManagerFactory.NewPersistentStorageChunkManager(ctx) } diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index c2f6ae65ab..39bb694ba0 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -209,20 +209,21 @@ func (s *IndexNodeSuite) SetupTest() { s.NoError(err) s.storageConfig = &indexpb.StorageConfig{ - Address: Params.MinioCfg.Address.GetValue(), - AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(), - SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(), - UseSSL: Params.MinioCfg.UseSSL.GetAsBool(), - SslCACert: Params.MinioCfg.SslCACert.GetValue(), - BucketName: Params.MinioCfg.BucketName.GetValue(), - RootPath: Params.MinioCfg.RootPath.GetValue(), - UseIAM: Params.MinioCfg.UseIAM.GetAsBool(), - IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(), - StorageType: Params.CommonCfg.StorageType.GetValue(), - Region: Params.MinioCfg.Region.GetValue(), - UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(), - CloudProvider: Params.MinioCfg.CloudProvider.GetValue(), - RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(), + Address: Params.MinioCfg.Address.GetValue(), + AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(), + SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(), + UseSSL: Params.MinioCfg.UseSSL.GetAsBool(), + SslCACert: Params.MinioCfg.SslCACert.GetValue(), + BucketName: Params.MinioCfg.BucketName.GetValue(), + RootPath: Params.MinioCfg.RootPath.GetValue(), + UseIAM: Params.MinioCfg.UseIAM.GetAsBool(), + IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(), + StorageType: Params.CommonCfg.StorageType.GetValue(), + Region: Params.MinioCfg.Region.GetValue(), + UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(), + CloudProvider: Params.MinioCfg.CloudProvider.GetValue(), + RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(), + GcpCredentialJSON: Params.MinioCfg.GcpCredentialJSON.GetValue(), } var ( diff --git a/internal/indexnode/task_analyze.go b/internal/indexnode/task_analyze.go index 156c0a7922..796373cccc 100644 --- a/internal/indexnode/task_analyze.go +++ b/internal/indexnode/task_analyze.go @@ -92,20 +92,21 @@ func (at *analyzeTask) Execute(ctx context.Context) error { log.Info("Begin to build analyze task") storageConfig := &clusteringpb.StorageConfig{ - Address: at.req.GetStorageConfig().GetAddress(), - AccessKeyID: at.req.GetStorageConfig().GetAccessKeyID(), - SecretAccessKey: at.req.GetStorageConfig().GetSecretAccessKey(), - UseSSL: at.req.GetStorageConfig().GetUseSSL(), - BucketName: at.req.GetStorageConfig().GetBucketName(), - RootPath: at.req.GetStorageConfig().GetRootPath(), - UseIAM: at.req.GetStorageConfig().GetUseIAM(), - IAMEndpoint: at.req.GetStorageConfig().GetIAMEndpoint(), - StorageType: at.req.GetStorageConfig().GetStorageType(), - UseVirtualHost: at.req.GetStorageConfig().GetUseVirtualHost(), - Region: at.req.GetStorageConfig().GetRegion(), - CloudProvider: at.req.GetStorageConfig().GetCloudProvider(), - RequestTimeoutMs: at.req.GetStorageConfig().GetRequestTimeoutMs(), - SslCACert: at.req.GetStorageConfig().GetSslCACert(), + Address: at.req.GetStorageConfig().GetAddress(), + AccessKeyID: at.req.GetStorageConfig().GetAccessKeyID(), + SecretAccessKey: at.req.GetStorageConfig().GetSecretAccessKey(), + UseSSL: at.req.GetStorageConfig().GetUseSSL(), + BucketName: at.req.GetStorageConfig().GetBucketName(), + RootPath: at.req.GetStorageConfig().GetRootPath(), + UseIAM: at.req.GetStorageConfig().GetUseIAM(), + IAMEndpoint: at.req.GetStorageConfig().GetIAMEndpoint(), + StorageType: at.req.GetStorageConfig().GetStorageType(), + UseVirtualHost: at.req.GetStorageConfig().GetUseVirtualHost(), + Region: at.req.GetStorageConfig().GetRegion(), + CloudProvider: at.req.GetStorageConfig().GetCloudProvider(), + RequestTimeoutMs: at.req.GetStorageConfig().GetRequestTimeoutMs(), + SslCACert: at.req.GetStorageConfig().GetSslCACert(), + GcpCredentialJSON: at.req.GetStorageConfig().GetGcpCredentialJSON(), } numRowsMap := make(map[int64]int64) diff --git a/internal/indexnode/task_index.go b/internal/indexnode/task_index.go index 2b0961fd50..b9077272e2 100644 --- a/internal/indexnode/task_index.go +++ b/internal/indexnode/task_index.go @@ -248,20 +248,21 @@ func (it *indexBuildTask) Execute(ctx context.Context) error { } storageConfig := &indexcgopb.StorageConfig{ - Address: it.req.GetStorageConfig().GetAddress(), - AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(), - SecretAccessKey: it.req.GetStorageConfig().GetSecretAccessKey(), - UseSSL: it.req.GetStorageConfig().GetUseSSL(), - BucketName: it.req.GetStorageConfig().GetBucketName(), - RootPath: it.req.GetStorageConfig().GetRootPath(), - UseIAM: it.req.GetStorageConfig().GetUseIAM(), - IAMEndpoint: it.req.GetStorageConfig().GetIAMEndpoint(), - StorageType: it.req.GetStorageConfig().GetStorageType(), - UseVirtualHost: it.req.GetStorageConfig().GetUseVirtualHost(), - Region: it.req.GetStorageConfig().GetRegion(), - CloudProvider: it.req.GetStorageConfig().GetCloudProvider(), - RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(), - SslCACert: it.req.GetStorageConfig().GetSslCACert(), + Address: it.req.GetStorageConfig().GetAddress(), + AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(), + SecretAccessKey: it.req.GetStorageConfig().GetSecretAccessKey(), + UseSSL: it.req.GetStorageConfig().GetUseSSL(), + BucketName: it.req.GetStorageConfig().GetBucketName(), + RootPath: it.req.GetStorageConfig().GetRootPath(), + UseIAM: it.req.GetStorageConfig().GetUseIAM(), + IAMEndpoint: it.req.GetStorageConfig().GetIAMEndpoint(), + StorageType: it.req.GetStorageConfig().GetStorageType(), + UseVirtualHost: it.req.GetStorageConfig().GetUseVirtualHost(), + Region: it.req.GetStorageConfig().GetRegion(), + CloudProvider: it.req.GetStorageConfig().GetCloudProvider(), + RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(), + SslCACert: it.req.GetStorageConfig().GetSslCACert(), + GcpCredentialJSON: it.req.GetStorageConfig().GetGcpCredentialJSON(), } optFields := make([]*indexcgopb.OptionalFieldInfo, 0, len(it.req.GetOptionalScalarFields())) diff --git a/internal/proto/clustering.proto b/internal/proto/clustering.proto index 02292798d3..6c41cb3b8c 100644 --- a/internal/proto/clustering.proto +++ b/internal/proto/clustering.proto @@ -20,6 +20,7 @@ message StorageConfig { string cloud_provider = 12; int64 request_timeout_ms = 13; string sslCACert = 14; + string GcpCredentialJSON = 15; } message InsertFiles { diff --git a/internal/proto/index_cgo_msg.proto b/internal/proto/index_cgo_msg.proto index 4973dd20d8..92e98100f3 100644 --- a/internal/proto/index_cgo_msg.proto +++ b/internal/proto/index_cgo_msg.proto @@ -48,6 +48,7 @@ message StorageConfig { string cloud_provider = 12; int64 request_timeout_ms = 13; string sslCACert = 14; + string GcpCredentialJSON = 15; } // Synchronously modify OptionalFieldInfo in index_coord.proto file diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 6ba6facb9f..7377954eba 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -218,6 +218,7 @@ message StorageConfig { string cloud_provider = 12; int64 request_timeout_ms = 13; string sslCACert = 14; + string GcpCredentialJSON = 15; } // Synchronously modify OptionalFieldInfo in index_cgo_msg.proto file diff --git a/internal/storage/factory.go b/internal/storage/factory.go index 207793e356..f0fe8df10b 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -31,7 +31,8 @@ func NewChunkManagerFactoryWithParam(params *paramtable.ComponentParam) *ChunkMa UseVirtualHost(params.MinioCfg.UseVirtualHost.GetAsBool()), Region(params.MinioCfg.Region.GetValue()), RequestTimeout(params.MinioCfg.RequestTimeoutMs.GetAsInt64()), - CreateBucket(true)) + CreateBucket(true), + GcpCredentialJSON(params.MinioCfg.GcpCredentialJSON.GetValue())) } func NewChunkManagerFactory(persistentStorage string, opts ...Option) *ChunkManagerFactory { diff --git a/internal/storage/gcp_native_object_storage.go b/internal/storage/gcp_native_object_storage.go new file mode 100644 index 0000000000..614a5cf74f --- /dev/null +++ b/internal/storage/gcp_native_object_storage.go @@ -0,0 +1,308 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "encoding/json" + "fmt" + "io" + + "cloud.google.com/go/storage" + "github.com/cockroachdb/errors" + "go.uber.org/zap" + "golang.org/x/oauth2/google" + "google.golang.org/api/googleapi" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/retry" +) + +type GcpNativeObjectStorage struct { + client *storage.Client +} + +func newGcpNativeObjectStorageWithConfig(ctx context.Context, c *config) (*GcpNativeObjectStorage, error) { + var client *storage.Client + var err error + + var opts []option.ClientOption + var projectId string + if c.address != "" { + completeAddress := "http://" + if c.useSSL { + completeAddress = "https://" + } + completeAddress = completeAddress + c.address + "/storage/v1/" + opts = append(opts, option.WithEndpoint(completeAddress)) + } + if c.gcpNativeWithoutAuth { + opts = append(opts, option.WithoutAuthentication()) + } else { + creds, err := google.CredentialsFromJSON(ctx, []byte(c.gcpCredentialJSON), storage.ScopeReadWrite) + if err != nil { + return nil, err + } + projectId, err = getProjectId(c.gcpCredentialJSON) + if err != nil { + return nil, err + } + opts = append(opts, option.WithCredentials(creds)) + } + + client, err = storage.NewClient(ctx, opts...) + if err != nil { + return nil, err + } + + if c.bucketName == "" { + return nil, merr.WrapErrParameterInvalidMsg("invalid empty bucket name") + } + // Check bucket validity + checkBucketFn := func() error { + bucket := client.Bucket(c.bucketName) + _, err := bucket.Attrs(ctx) + if err == storage.ErrBucketNotExist && c.createBucket { + log.Info("gcs bucket does not exist, create bucket.", zap.String("bucket name", c.bucketName)) + err = client.Bucket(c.bucketName).Create(ctx, projectId, nil) + if err != nil { + return err + } + return nil + } + return err + } + err = retry.Do(ctx, checkBucketFn, retry.Attempts(CheckBucketRetryAttempts)) + if err != nil { + return nil, err + } + return &GcpNativeObjectStorage{client: client}, nil +} + +func (gcs *GcpNativeObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, + offset int64, size int64, +) (FileReader, error) { + bucket := gcs.client.Bucket(bucketName) + _, err := bucket.Attrs(ctx) + if err != nil { + return nil, checkObjectStorageError(objectName, err) + } + + obj := bucket.Object(objectName) + _, err = obj.Attrs(ctx) + if err != nil { + return nil, checkObjectStorageError(objectName, err) + } + var reader *storage.Reader + if offset == 0 && size == 0 { + reader, err = obj.NewReader(ctx) + } else { + reader, err = obj.NewRangeReader(ctx, offset, size) + } + + if err != nil { + return nil, checkObjectStorageError(objectName, err) + } + return &GcsReader{reader: reader, obj: obj}, nil +} + +func (gcs *GcpNativeObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, + reader io.Reader, objectSize int64, +) error { + obj := gcs.client.Bucket(bucketName).Object(objectName) + writer := obj.NewWriter(ctx) + _, err := io.Copy(writer, reader) + if err != nil { + return checkObjectStorageError(objectName, err) + } + err = writer.Close() + if err != nil { + return checkObjectStorageError(objectName, err) + } + return nil +} + +func (gcs *GcpNativeObjectStorage) StatObject(ctx context.Context, bucketName, + objectName string, +) (int64, error) { + obj := gcs.client.Bucket(bucketName).Object(objectName) + attrs, err := obj.Attrs(ctx) + if err != nil { + return 0, checkObjectStorageError(objectName, err) + } + return attrs.Size, nil +} + +func (gcs *GcpNativeObjectStorage) WalkWithObjects(ctx context.Context, + bucketName string, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc, +) error { + query := &storage.Query{ + Prefix: prefix, + } + if !recursive { + query.Delimiter = "/" + } + + it := gcs.client.Bucket(bucketName).Objects(ctx, query) + for { + objAttrs, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return checkObjectStorageError(prefix, err) + } + if objAttrs.Prefix != "" { + continue + } + if !walkFunc(&ChunkObjectInfo{FilePath: objAttrs.Name, ModifyTime: objAttrs.Updated}) { + return nil + } + } + return nil +} + +func (gcs *GcpNativeObjectStorage) RemoveObject(ctx context.Context, bucketName, prefix string) error { + bucket := gcs.client.Bucket(bucketName) + query := &storage.Query{Prefix: prefix} + it := bucket.Objects(ctx, query) + + for { + objAttrs, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return checkObjectStorageError(prefix, err) + } + + obj := bucket.Object(objAttrs.Name) + if err := obj.Delete(ctx); err != nil { + return checkObjectStorageError(objAttrs.Name, err) + } + } + + return nil +} + +func (gcs *GcpNativeObjectStorage) DeleteBucket(ctx context.Context, bucketName string) error { + bucket := gcs.client.Bucket(bucketName) + + err := gcs.RemoveObject(ctx, bucketName, "") + if err != nil { + return err + } + + err = bucket.Delete(ctx) + if err != nil { + return err + } + return nil +} + +type GcsReader struct { + reader *storage.Reader + obj *storage.ObjectHandle + position int64 +} + +func (gcsReader *GcsReader) Read(p []byte) (n int, err error) { + n, err = gcsReader.reader.Read(p) + if err != nil { + return n, err + } + gcsReader.position = gcsReader.position + int64(n) + return n, nil +} + +func (gcsReader *GcsReader) Close() error { + return gcsReader.reader.Close() +} + +func (gcsReader *GcsReader) ReadAt(p []byte, off int64) (n int, err error) { + reader, err := gcsReader.obj.NewRangeReader(context.Background(), off, int64(len(p))) + if err != nil { + return 0, err + } + defer reader.Close() + return io.ReadFull(reader, p) +} + +func (gcsReader *GcsReader) Seek(offset int64, whence int) (int64, error) { + var newOffset int64 + + switch whence { + case io.SeekStart: + newOffset = offset + case io.SeekCurrent: + newOffset = gcsReader.position + offset + case io.SeekEnd: + objectAttrs, err := gcsReader.obj.Attrs(context.Background()) + if err != nil { + return 0, err + } + newOffset = objectAttrs.Size + offset + default: + return 0, merr.WrapErrIoFailedReason("invalid whence") + } + + if newOffset < 0 { + return 0, merr.WrapErrIoFailedReason("negative offset") + } + + // Reset the underlying reader to the new offset + newReader, err := gcsReader.obj.NewRangeReader(context.Background(), newOffset, -1) + if err != nil { + if gErr, ok := err.(*googleapi.Error); ok { + if gErr.Code == 416 { + newReader, _ = gcsReader.obj.NewRangeReader(context.Background(), 0, 0) + } + } else { + return 0, err + } + } + + if gcsReader.reader != nil { + if err := gcsReader.reader.Close(); err != nil { + return 0, err + } + } + + // Update the reader and the current position + gcsReader.reader = newReader + gcsReader.position = newOffset + return newOffset, nil +} + +func getProjectId(gcpCredentialJSON string) (string, error) { + if gcpCredentialJSON == "" { + return "", errors.New("the JSON string is empty") + } + var data map[string]interface{} + if err := json.Unmarshal([]byte(gcpCredentialJSON), &data); err != nil { + return "", errors.New("failed to parse Google Cloud credentials as JSON") + } + propertyValue, ok := data["project_id"] + projectId := fmt.Sprintf("%v", propertyValue) + if !ok { + return "", errors.New("projectId doesn't exist") + } + return projectId, nil +} diff --git a/internal/storage/gcp_native_object_storage_test.go b/internal/storage/gcp_native_object_storage_test.go new file mode 100644 index 0000000000..56e7928b2c --- /dev/null +++ b/internal/storage/gcp_native_object_storage_test.go @@ -0,0 +1,387 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGcpNativeObjectStorage(t *testing.T) { + ctx := context.Background() + bucketName := "test-bucket" + config := config{ + address: "storage.gcs.127.0.0.1.nip.io:4443", + bucketName: bucketName, + createBucket: true, + useIAM: false, + cloudProvider: "gcpnative", + useSSL: false, + gcpNativeWithoutAuth: true, + } + + t.Run("test initialize", func(t *testing.T) { + var err error + config.bucketName = "" + _, err = newGcpNativeObjectStorageWithConfig(ctx, &config) + assert.Error(t, err) + config.bucketName = bucketName + _, err = newGcpNativeObjectStorageWithConfig(ctx, &config) + assert.Equal(t, err, nil) + }) + + t.Run("test load", func(t *testing.T) { + testCM, err := newGcpNativeObjectStorageWithConfig(ctx, &config) + assert.Equal(t, err, nil) + defer testCM.DeleteBucket(ctx, config.bucketName) + + prepareTests := []struct { + key string + value []byte + }{ + {"abc", []byte("123")}, + {"abcd", []byte("1234")}, + {"key_1", []byte("111")}, + {"key_2", []byte("222")}, + {"key_3", []byte("333")}, + } + + for _, test := range prepareTests { + err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), + int64(len(test.value))) + require.NoError(t, err) + } + + loadTests := []struct { + isvalid bool + loadKey string + expectedValue []byte + + description string + }{ + {true, "abc", []byte("123"), "load valid key abc"}, + {true, "abcd", []byte("1234"), "load valid key abcd"}, + {true, "key_1", []byte("111"), "load valid key key_1"}, + {true, "key_2", []byte("222"), "load valid key key_2"}, + {true, "key_3", []byte("333"), "load valid key key_3"}, + {false, "key_not_exist", []byte(""), "load invalid key key_not_exist"}, + {false, "/", []byte(""), "load leading slash"}, + } + + for _, test := range loadTests { + t.Run(test.description, func(t *testing.T) { + if test.isvalid { + got, err := testCM.GetObject(ctx, config.bucketName, test.loadKey, 0, 1024) + assert.NoError(t, err) + contentData, err := io.ReadAll(got) + assert.NoError(t, err) + assert.Equal(t, len(contentData), len(test.expectedValue)) + assert.Equal(t, test.expectedValue, contentData) + statSize, err := testCM.StatObject(ctx, config.bucketName, test.loadKey) + assert.NoError(t, err) + assert.Equal(t, statSize, int64(len(contentData))) + _, err = testCM.GetObject(ctx, config.bucketName, test.loadKey, 1, 1023) + assert.NoError(t, err) + } else { + got, err := testCM.GetObject(ctx, config.bucketName, test.loadKey, 0, 1024) + assert.Error(t, err) + assert.Empty(t, got) + } + }) + } + + loadWithPrefixTests := []struct { + isvalid bool + prefix string + expectedValue [][]byte + + description string + }{ + {true, "abc", [][]byte{[]byte("123"), []byte("1234")}, "load with valid prefix abc"}, + {true, "key_", [][]byte{[]byte("111"), []byte("222"), []byte("333")}, "load with valid prefix key_"}, + {true, "prefix", [][]byte{}, "load with valid but not exist prefix prefix"}, + } + + for _, test := range loadWithPrefixTests { + t.Run(test.description, func(t *testing.T) { + gotk, _, err := listAllObjectsWithPrefixAtBucket(ctx, testCM, config.bucketName, + test.prefix, false) + assert.NoError(t, err) + assert.Equal(t, len(test.expectedValue), len(gotk)) + for _, key := range gotk { + err := testCM.RemoveObject(ctx, config.bucketName, key) + assert.NoError(t, err) + } + }) + } + }) + + t.Run("test list", func(t *testing.T) { + testCM, err := newGcpNativeObjectStorageWithConfig(ctx, &config) + assert.Equal(t, err, nil) + defer testCM.DeleteBucket(ctx, config.bucketName) + + prepareTests := []struct { + valid bool + key string + value []byte + }{ + {false, "abc/", []byte("123")}, + {true, "abc/d/", []byte("1234")}, + {false, "abc/d/e", []byte("12345")}, + {true, "abc/e/d", []byte("12354")}, + {true, "key_/1/1", []byte("111")}, + {true, "key_/1/2/", []byte("222")}, + {false, "key_/1/2/3", []byte("333")}, + {true, "key_/2/3", []byte("333")}, + {true, "key_/test.txt", []byte("333")}, + } + + for _, test := range prepareTests { + err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), + int64(len(test.value))) + require.Nil(t, err) + if !test.valid { + err := testCM.RemoveObject(ctx, config.bucketName, test.key) + require.Nil(t, err) + } + } + + insertWithPrefixTests := []struct { + recursive bool + prefix string + expectedValue []string + }{ + {true, "abc/", []string{"abc/e/d"}}, + {true, "key_/", []string{"key_/1/1", "key_/2/3", "key_/test.txt"}}, + {false, "abc/", []string{}}, + {false, "key_/", []string{"key_/test.txt"}}, + } + + for _, test := range insertWithPrefixTests { + t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) { + gotk, _, err := listAllObjectsWithPrefixAtBucket(ctx, testCM, config.bucketName, + test.prefix, test.recursive) + assert.NoError(t, err) + assert.Equal(t, len(test.expectedValue), len(gotk)) + for _, key := range gotk { + assert.Contains(t, test.expectedValue, key) + } + }) + } + }) +} + +func TestGcpNativeReadFile(t *testing.T) { + ctx := context.Background() + bucketName := "test-bucket" + c := &config{ + address: "storage.gcs.127.0.0.1.nip.io:4443", + bucketName: bucketName, + createBucket: true, + useIAM: false, + cloudProvider: "gcpnative", + useSSL: false, + gcpNativeWithoutAuth: true, + } + rcm, err := NewRemoteChunkManager(ctx, c) + + t.Run("Read", func(t *testing.T) { + filePath := "test-Read" + data := []byte("Test data for Read.") + + err = rcm.Write(ctx, filePath, data) + assert.NoError(t, err) + defer rcm.Remove(ctx, filePath) + + reader, err := rcm.Reader(ctx, filePath) + assert.NoError(t, err) + + buffer := make([]byte, 4) + n, err := reader.Read(buffer) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "Test", string(buffer)) + + buffer = make([]byte, 6) + n, err = reader.Read(buffer) + assert.NoError(t, err) + assert.Equal(t, 6, n) + assert.Equal(t, " data ", string(buffer)) + + buffer = make([]byte, 40) + n, err = reader.Read(buffer) + assert.Error(t, err) + assert.Equal(t, 9, n) + assert.Equal(t, "for Read.", string(buffer[:9])) + }) + + t.Run("ReadAt", func(t *testing.T) { + filePath := "test-ReadAt" + data := []byte("Test data for ReadAt.") + + err = rcm.Write(ctx, filePath, data) + assert.NoError(t, err) + defer rcm.Remove(ctx, filePath) + + reader, err := rcm.Reader(ctx, filePath) + assert.NoError(t, err) + + buffer := make([]byte, 4) + n, err := reader.ReadAt(buffer, 5) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "data", string(buffer)) + + buffer = make([]byte, 4) + n, err = reader.Read(buffer) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "Test", string(buffer)) + + buffer = make([]byte, 4) + n, err = reader.ReadAt(buffer, 20) + assert.Error(t, err) + assert.Equal(t, 1, n) + assert.Equal(t, ".", string(buffer[:1])) + + buffer = make([]byte, 4) + n, err = reader.ReadAt(buffer, 25) + assert.Error(t, err) + assert.Equal(t, 0, n) + }) + + t.Run("Seek start", func(t *testing.T) { + filePath := "test-SeekStart" + data := []byte("Test data for Seek start.") + + err = rcm.Write(ctx, filePath, data) + assert.NoError(t, err) + defer rcm.Remove(ctx, filePath) + + reader, err := rcm.Reader(ctx, filePath) + assert.NoError(t, err) + + offset, err := reader.Seek(10, io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(10), offset) + + buffer := make([]byte, 4) + n, err := reader.Read(buffer) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "for ", string(buffer)) + + offset, err = reader.Seek(40, io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(40), offset) + + buffer = make([]byte, 4) + n, err = reader.Read(buffer) + assert.Error(t, err) + assert.Equal(t, 0, n) + }) + + t.Run("Seek current", func(t *testing.T) { + filePath := "test-SeekCurrent" + data := []byte("Test data for Seek current.") + + err = rcm.Write(ctx, filePath, data) + assert.NoError(t, err) + defer rcm.Remove(ctx, filePath) + + reader, err := rcm.Reader(ctx, filePath) + assert.NoError(t, err) + + buffer := make([]byte, 4) + n, err := reader.Read(buffer) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "Test", string(buffer)) + + offset, err := reader.Seek(10, io.SeekCurrent) + assert.NoError(t, err) + assert.Equal(t, int64(14), offset) + + buffer = make([]byte, 4) + n, err = reader.Read(buffer) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "Seek", string(buffer)) + + offset, err = reader.Seek(40, io.SeekCurrent) + assert.NoError(t, err) + assert.Equal(t, int64(58), offset) + + buffer = make([]byte, 4) + n, err = reader.Read(buffer) + assert.Error(t, err) + assert.Equal(t, 0, n) + }) + + t.Run("Seek end", func(t *testing.T) { + filePath := "test-SeekEnd" + data := []byte("Test data for Seek end.") + + err = rcm.Write(ctx, filePath, data) + assert.NoError(t, err) + defer rcm.Remove(ctx, filePath) + + reader, err := rcm.Reader(ctx, filePath) + assert.NoError(t, err) + + buffer := make([]byte, 4) + n, err := reader.Read(buffer) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "Test", string(buffer)) + + offset, err := reader.Seek(10, io.SeekEnd) + assert.NoError(t, err) + assert.Equal(t, int64(33), offset) + + buffer = make([]byte, 4) + n, err = reader.Read(buffer) + assert.Error(t, err) + assert.Equal(t, 0, n) + + offset, err = reader.Seek(10, 3) // Invalid whence + assert.Error(t, err) + assert.Equal(t, int64(0), offset) + }) + + t.Run("Close", func(t *testing.T) { + filePath := "test-Close" + data := []byte("Test data for Close.") + + err = rcm.Write(ctx, filePath, data) + assert.NoError(t, err) + defer rcm.Remove(ctx, filePath) + + reader, err := rcm.Reader(ctx, filePath) + assert.NoError(t, err) + + err = reader.Close() + assert.NoError(t, err) + }) +} diff --git a/internal/storage/options.go b/internal/storage/options.go index 14c8f08845..f85028ef66 100644 --- a/internal/storage/options.go +++ b/internal/storage/options.go @@ -2,20 +2,22 @@ package storage // Option for setting params used by chunk manager client. type config struct { - address string - bucketName string - accessKeyID string - secretAccessKeyID string - useSSL bool - sslCACert string - createBucket bool - rootPath string - useIAM bool - cloudProvider string - iamEndpoint string - useVirtualHost bool - region string - requestTimeoutMs int64 + address string + bucketName string + accessKeyID string + secretAccessKeyID string + useSSL bool + sslCACert string + createBucket bool + rootPath string + useIAM bool + cloudProvider string + iamEndpoint string + useVirtualHost bool + region string + requestTimeoutMs int64 + gcpCredentialJSON string + gcpNativeWithoutAuth bool // used for Unit Testing } func newDefaultConfig() *config { @@ -108,3 +110,9 @@ func RequestTimeout(requestTimeoutMs int64) Option { c.requestTimeoutMs = requestTimeoutMs } } + +func GcpCredentialJSON(gcpCredentialJSON string) Option { + return func(c *config) { + c.gcpCredentialJSON = gcpCredentialJSON + } +} diff --git a/internal/storage/remote_chunk_manager.go b/internal/storage/remote_chunk_manager.go index b3a34d5117..f510fe2e50 100644 --- a/internal/storage/remote_chunk_manager.go +++ b/internal/storage/remote_chunk_manager.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "io" + "net/http" "strings" "github.com/Azure/azure-sdk-for-go/sdk/azcore" @@ -29,6 +30,7 @@ import ( "go.uber.org/zap" "golang.org/x/exp/mmap" "golang.org/x/sync/errgroup" + "google.golang.org/api/googleapi" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -38,11 +40,12 @@ import ( ) const ( - CloudProviderGCP = "gcp" - CloudProviderAWS = "aws" - CloudProviderAliyun = "aliyun" - CloudProviderAzure = "azure" - CloudProviderTencent = "tencent" + CloudProviderGCP = "gcp" + CloudProviderGCPNative = "gcpnative" + CloudProviderAWS = "aws" + CloudProviderAliyun = "aliyun" + CloudProviderAzure = "azure" + CloudProviderTencent = "tencent" ) // ChunkObjectWalkFunc is the callback function for walking objects. @@ -78,6 +81,8 @@ func NewRemoteChunkManager(ctx context.Context, c *config) (*RemoteChunkManager, var err error if c.cloudProvider == CloudProviderAzure { client, err = newAzureObjectStorageWithConfig(ctx, c) + } else if c.cloudProvider == CloudProviderGCPNative { + client, err = newGcpNativeObjectStorageWithConfig(ctx, c) } else { client, err = newMinioObjectStorageWithConfig(ctx, c) } @@ -403,6 +408,11 @@ func checkObjectStorageError(fileName string, err error) error { return merr.WrapErrIoKeyNotFound(fileName, err.Error()) } return merr.WrapErrIoFailed(fileName, err) + case *googleapi.Error: + if err.Code == http.StatusNotFound { + return merr.WrapErrIoKeyNotFound(fileName, err.Error()) + } + return merr.WrapErrIoFailed(fileName, err) } if err == io.ErrUnexpectedEOF { return merr.WrapErrIoUnexpectEOF(fileName, err) diff --git a/internal/util/indexcgowrapper/build_index_info.go b/internal/util/indexcgowrapper/build_index_info.go index 72523a840e..755a7b93d2 100644 --- a/internal/util/indexcgowrapper/build_index_info.go +++ b/internal/util/indexcgowrapper/build_index_info.go @@ -52,6 +52,7 @@ func NewBuildIndexInfo(config *indexpb.StorageConfig) (*BuildIndexInfo, error) { cRegion := C.CString(config.Region) cCloudProvider := C.CString(config.CloudProvider) cSslCACert := C.CString(config.SslCACert) + cGcpCredentialJSON := C.CString(config.GcpCredentialJSON) defer C.free(unsafe.Pointer(cAddress)) defer C.free(unsafe.Pointer(cBucketName)) defer C.free(unsafe.Pointer(cAccessKey)) @@ -62,21 +63,23 @@ func NewBuildIndexInfo(config *indexpb.StorageConfig) (*BuildIndexInfo, error) { defer C.free(unsafe.Pointer(cRegion)) defer C.free(unsafe.Pointer(cCloudProvider)) defer C.free(unsafe.Pointer(cSslCACert)) + defer C.free(unsafe.Pointer(cGcpCredentialJSON)) storageConfig := C.CStorageConfig{ - address: cAddress, - bucket_name: cBucketName, - access_key_id: cAccessKey, - access_key_value: cAccessValue, - root_path: cRootPath, - storage_type: cStorageType, - iam_endpoint: cIamEndPoint, - cloud_provider: cCloudProvider, - useSSL: C.bool(config.UseSSL), - sslCACert: cSslCACert, - useIAM: C.bool(config.UseIAM), - region: cRegion, - useVirtualHost: C.bool(config.UseVirtualHost), - requestTimeoutMs: C.int64_t(config.RequestTimeoutMs), + address: cAddress, + bucket_name: cBucketName, + access_key_id: cAccessKey, + access_key_value: cAccessValue, + root_path: cRootPath, + storage_type: cStorageType, + iam_endpoint: cIamEndPoint, + cloud_provider: cCloudProvider, + useSSL: C.bool(config.UseSSL), + sslCACert: cSslCACert, + useIAM: C.bool(config.UseIAM), + region: cRegion, + useVirtualHost: C.bool(config.UseVirtualHost), + requestTimeoutMs: C.int64_t(config.RequestTimeoutMs), + gcp_credential_json: cGcpCredentialJSON, } status := C.NewBuildIndexInfo(&cBuildIndexInfo, storageConfig) diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index a43a16e2dc..88a4f4ae5a 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -138,6 +138,7 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error { cLogLevel := C.CString(params.MinioCfg.LogLevel.GetValue()) cRegion := C.CString(params.MinioCfg.Region.GetValue()) cSslCACert := C.CString(params.MinioCfg.SslCACert.GetValue()) + cGcpCredentialJSON := C.CString(params.MinioCfg.GcpCredentialJSON.GetValue()) defer C.free(unsafe.Pointer(cAddress)) defer C.free(unsafe.Pointer(cBucketName)) defer C.free(unsafe.Pointer(cAccessKey)) @@ -149,22 +150,24 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error { defer C.free(unsafe.Pointer(cRegion)) defer C.free(unsafe.Pointer(cCloudProvider)) defer C.free(unsafe.Pointer(cSslCACert)) + defer C.free(unsafe.Pointer(cGcpCredentialJSON)) storageConfig := C.CStorageConfig{ - address: cAddress, - bucket_name: cBucketName, - access_key_id: cAccessKey, - access_key_value: cAccessValue, - root_path: cRootPath, - storage_type: cStorageType, - iam_endpoint: cIamEndPoint, - cloud_provider: cCloudProvider, - useSSL: C.bool(params.MinioCfg.UseSSL.GetAsBool()), - sslCACert: cSslCACert, - useIAM: C.bool(params.MinioCfg.UseIAM.GetAsBool()), - log_level: cLogLevel, - region: cRegion, - useVirtualHost: C.bool(params.MinioCfg.UseVirtualHost.GetAsBool()), - requestTimeoutMs: C.int64_t(params.MinioCfg.RequestTimeoutMs.GetAsInt64()), + address: cAddress, + bucket_name: cBucketName, + access_key_id: cAccessKey, + access_key_value: cAccessValue, + root_path: cRootPath, + storage_type: cStorageType, + iam_endpoint: cIamEndPoint, + cloud_provider: cCloudProvider, + useSSL: C.bool(params.MinioCfg.UseSSL.GetAsBool()), + sslCACert: cSslCACert, + useIAM: C.bool(params.MinioCfg.UseIAM.GetAsBool()), + log_level: cLogLevel, + region: cRegion, + useVirtualHost: C.bool(params.MinioCfg.UseVirtualHost.GetAsBool()), + requestTimeoutMs: C.int64_t(params.MinioCfg.RequestTimeoutMs.GetAsInt64()), + gcp_credential_json: cGcpCredentialJSON, } status := C.InitRemoteChunkManagerSingleton(storageConfig) diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index d38d7f8988..01258842b8 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -1115,6 +1115,7 @@ type MinioConfig struct { RootPath ParamItem `refreshable:"false"` UseIAM ParamItem `refreshable:"false"` CloudProvider ParamItem `refreshable:"false"` + GcpCredentialJSON ParamItem `refreshable:"false"` IAMEndpoint ParamItem `refreshable:"false"` LogLevel ParamItem `refreshable:"false"` Region ParamItem `refreshable:"false"` @@ -1255,16 +1256,29 @@ aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/lates p.CloudProvider = ParamItem{ Key: "minio.cloudProvider", DefaultValue: DefaultMinioCloudProvider, - Version: "2.2.0", + Version: "2.4.1", Doc: `Cloud Provider of S3. Supports: "aws", "gcp", "aliyun". +Cloud Provider of Google Cloud Storage. Supports: "gcpnative". You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio You can use "gcp" for other cloud provider supports S3 API with signature v2 You can use "aliyun" for other cloud provider uses virtual host style bucket +You can use "gcpnative" for the Google Cloud Platform provider. Uses service account credentials +for authentication. When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now`, Export: true, } p.CloudProvider.Init(base.mgr) + p.GcpCredentialJSON = ParamItem{ + Key: "minio.gcpCredentialJSON", + Version: "2.4.1", + DefaultValue: "", + Doc: `The JSON content contains the gcs service account credentials. +Used only for the "gcpnative" cloud provider.`, + Export: true, + } + p.GcpCredentialJSON.Init(base.mgr) + p.IAMEndpoint = ParamItem{ Key: "minio.iamEndpoint", DefaultValue: DefaultMinioIAMEndpoint, diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index 60eaf9edfc..e213b1aed8 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -204,6 +204,8 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, Params.IAMEndpoint.GetValue(), "") + assert.Equal(t, Params.GcpCredentialJSON.GetValue(), "") + t.Logf("Minio BucketName = %s", Params.BucketName.GetValue()) t.Logf("Minio rootpath = %s", Params.RootPath.GetValue()) diff --git a/scripts/core_build.sh b/scripts/core_build.sh index dee16e02b0..4b78c0480d 100755 --- a/scripts/core_build.sh +++ b/scripts/core_build.sh @@ -101,6 +101,7 @@ USE_ASAN="OFF" USE_DYNAMIC_SIMD="ON" USE_OPENDAL="OFF" INDEX_ENGINE="KNOWHERE" +: "${ENABLE_GCP_NATIVE:="OFF"}" while getopts "p:d:t:s:f:n:i:y:a:x:o:ulrcghzmebZ" arg; do case $arg in @@ -256,7 +257,8 @@ ${CMAKE_EXTRA_ARGS} \ -DUSE_DYNAMIC_SIMD=${USE_DYNAMIC_SIMD} \ -DCPU_ARCH=${CPU_ARCH} \ -DUSE_OPENDAL=${USE_OPENDAL} \ --DINDEX_ENGINE=${INDEX_ENGINE} " +-DINDEX_ENGINE=${INDEX_ENGINE} \ +-DENABLE_GCP_NATIVE=${ENABLE_GCP_NATIVE} " if [ -z "$BUILD_WITHOUT_AZURE" ]; then CMAKE_CMD=${CMAKE_CMD}"-DAZURE_BUILD_DIR=${AZURE_BUILD_DIR} \ -DVCPKG_TARGET_TRIPLET=${VCPKG_TARGET_TRIPLET} "