diff --git a/client/entity/collection.go b/client/entity/collection.go index f30cc05f59..cd4acc5030 100644 --- a/client/entity/collection.go +++ b/client/entity/collection.go @@ -33,6 +33,9 @@ type Collection struct { ConsistencyLevel ConsistencyLevel ShardNum int32 Properties map[string]string + + // collection update timestamp, usually used for internal change detection + UpdateTimestamp uint64 } // Partition represent partition meta in Milvus diff --git a/client/milvusclient/collection.go b/client/milvusclient/collection.go index 69f5da3dc2..94ad0830b9 100644 --- a/client/milvusclient/collection.go +++ b/client/milvusclient/collection.go @@ -95,6 +95,7 @@ func (c *Client) DescribeCollection(ctx context.Context, option DescribeCollecti ConsistencyLevel: entity.ConsistencyLevel(resp.ConsistencyLevel), ShardNum: resp.GetShardsNum(), Properties: entity.KvPairsMap(resp.GetProperties()), + UpdateTimestamp: resp.GetUpdateTimestamp(), } collection.Name = collection.Schema.CollectionName return nil diff --git a/client/milvusclient/common.go b/client/milvusclient/common.go index ea3e8e6027..7b9eb6ae2d 100644 --- a/client/milvusclient/common.go +++ b/client/milvusclient/common.go @@ -2,9 +2,14 @@ package milvusclient import ( "context" + "math" + + "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/client/v2/entity" "github.com/milvus-io/milvus/pkg/v2/util/conc" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/retry" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -32,6 +37,11 @@ func (c *CollectionCache) GetCollection(ctx context.Context, collName string) (* return coll, err } +// Evict removes the collection cache related to the provided collection name. +func (c *CollectionCache) Evict(collName string) { + c.collections.Remove(collName) +} + // Reset clears all cached info, used when client switching env. func (c *CollectionCache) Reset() { c.collections = typeutil.NewConcurrentMap[string, *entity.Collection]() @@ -47,3 +57,24 @@ func NewCollectionCache(fetcher func(context.Context, string) (*entity.Collectio func (c *Client) getCollection(ctx context.Context, collName string) (*entity.Collection, error) { return c.collCache.GetCollection(ctx, collName) } + +func (c *Client) retryIfSchemaError(ctx context.Context, collName string, work func(ctx context.Context) (uint64, error)) error { + var lastTs uint64 = math.MaxUint64 + return retry.Handle(ctx, func() (bool, error) { + ts, err := work(ctx) + if err != nil { + // if schema error + if errors.Is(err, merr.ErrCollectionSchemaMismatch) { + sameTs := ts == lastTs + lastTs = ts + if !sameTs { + c.collCache.Evict(collName) + } + // retry if not same ts + return !sameTs, err + } + return false, err + } + return false, nil + }) +} diff --git a/client/milvusclient/common_test.go b/client/milvusclient/common_test.go new file mode 100644 index 0000000000..4128cedea6 --- /dev/null +++ b/client/milvusclient/common_test.go @@ -0,0 +1,83 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package milvusclient + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/pkg/v2/util/merr" +) + +type CommonSuite struct { + MockSuiteBase +} + +func (s *CommonSuite) TestRetryIfSchemaError() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.Run("normal_no_error", func() { + counter := atomic.Int32{} + err := s.client.retryIfSchemaError(ctx, "test_coll", func(ctx context.Context) (uint64, error) { + counter.Add(1) + return 10, nil + }) + s.NoError(err) + s.EqualValues(1, counter.Load()) + }) + + s.Run("other_error", func() { + counter := atomic.Int32{} + err := s.client.retryIfSchemaError(ctx, "test_coll", func(ctx context.Context) (uint64, error) { + counter.Add(1) + return 10, merr.WrapErrServiceInternal("mocked") + }) + s.Error(err) + s.EqualValues(1, counter.Load()) + }) + + s.Run("transient_schema_err", func() { + counter := atomic.Int32{} + err := s.client.retryIfSchemaError(ctx, "test_coll", func(ctx context.Context) (uint64, error) { + epoch := counter.Load() + counter.Add(1) + if epoch == 0 { + return 10, merr.WrapErrCollectionSchemaMisMatch("mocked") + } + return 11, nil + }) + s.NoError(err) + s.EqualValues(2, counter.Load()) + }) + + s.Run("consistent_schema_err", func() { + counter := atomic.Int32{} + err := s.client.retryIfSchemaError(ctx, "test_coll", func(ctx context.Context) (uint64, error) { + counter.Add(1) + return 10, merr.WrapErrCollectionSchemaMisMatch("mocked") + }) + s.Error(err) + s.EqualValues(2, counter.Load()) + }) +} + +func TestCommonFunc(t *testing.T) { + suite.Run(t, new(CommonSuite)) +} diff --git a/client/milvusclient/database_example_test.go b/client/milvusclient/database_example_test.go new file mode 100644 index 0000000000..d3fa1bd399 --- /dev/null +++ b/client/milvusclient/database_example_test.go @@ -0,0 +1,80 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// nolint +package milvusclient_test + +import ( + "context" + "log" + + "github.com/milvus-io/milvus/client/v2/milvusclient" +) + +func ExampleClient_CreateDatabase() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dbName := `test_db` + cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{ + Address: milvusAddr, + }) + if err != nil { + // handle err + } + + err = cli.CreateDatabase(ctx, milvusclient.NewCreateDatabaseOption(dbName)) + if err != nil { + // handle err + } +} + +func ExampleClient_CreateDatabase_withProperties() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dbName := `test_db_2` + cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{ + Address: milvusAddr, + }) + if err != nil { + // handle err + } + + err = cli.CreateDatabase(ctx, milvusclient.NewCreateDatabaseOption(dbName).WithProperty("database.replica.number", 3)) + if err != nil { + // handle err + } +} + +func ExampleClient_DescribeDatabase() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dbName := `test_db` + cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{ + Address: milvusAddr, + }) + if err != nil { + // handle err + } + + db, err := cli.DescribeDatabase(ctx, milvusclient.NewDescribeDatabaseOption(dbName)) + if err != nil { + // handle err + } + log.Println(db) +} diff --git a/client/milvusclient/write.go b/client/milvusclient/write.go index 863d52f790..f4dc640a76 100644 --- a/client/milvusclient/write.go +++ b/client/milvusclient/write.go @@ -18,6 +18,7 @@ package milvusclient import ( "context" + "math" "google.golang.org/grpc" @@ -33,32 +34,34 @@ type InsertResult struct { func (c *Client) Insert(ctx context.Context, option InsertOption, callOptions ...grpc.CallOption) (InsertResult, error) { result := InsertResult{} - collection, err := c.getCollection(ctx, option.CollectionName()) - if err != nil { - return result, err - } - req, err := option.InsertRequest(collection) - if err != nil { - return result, err - } - - err = c.callService(func(milvusService milvuspb.MilvusServiceClient) error { - resp, err := milvusService.Insert(ctx, req, callOptions...) - - err = merr.CheckRPCCall(resp, err) + err := c.retryIfSchemaError(ctx, option.CollectionName(), func(ctx context.Context) (uint64, error) { + collection, err := c.getCollection(ctx, option.CollectionName()) if err != nil { - return err + return math.MaxUint64, err + } + req, err := option.InsertRequest(collection) + if err != nil { + return collection.UpdateTimestamp, err } - result.InsertCount = resp.GetInsertCnt() - result.IDs, err = column.IDColumns(collection.Schema, resp.GetIDs(), 0, -1) - if err != nil { - return err - } + return collection.UpdateTimestamp, c.callService(func(milvusService milvuspb.MilvusServiceClient) error { + resp, err := milvusService.Insert(ctx, req, callOptions...) - // write back pks if needed - // pks values shall be written back to struct if receiver field exists - return option.WriteBackPKs(collection.Schema, result.IDs) + err = merr.CheckRPCCall(resp, err) + if err != nil { + return err + } + + result.InsertCount = resp.GetInsertCnt() + result.IDs, err = column.IDColumns(collection.Schema, resp.GetIDs(), 0, -1) + if err != nil { + return err + } + + // write back pks if needed + // pks values shall be written back to struct if receiver field exists + return option.WriteBackPKs(collection.Schema, result.IDs) + }) }) return result, err } @@ -89,25 +92,27 @@ type UpsertResult struct { func (c *Client) Upsert(ctx context.Context, option UpsertOption, callOptions ...grpc.CallOption) (UpsertResult, error) { result := UpsertResult{} - collection, err := c.getCollection(ctx, option.CollectionName()) - if err != nil { - return result, err - } - req, err := option.UpsertRequest(collection) - if err != nil { - return result, err - } - err = c.callService(func(milvusService milvuspb.MilvusServiceClient) error { - resp, err := milvusService.Upsert(ctx, req, callOptions...) - if err = merr.CheckRPCCall(resp, err); err != nil { - return err - } - result.UpsertCount = resp.GetUpsertCnt() - result.IDs, err = column.IDColumns(collection.Schema, resp.GetIDs(), 0, -1) + err := c.retryIfSchemaError(ctx, option.CollectionName(), func(ctx context.Context) (uint64, error) { + collection, err := c.getCollection(ctx, option.CollectionName()) if err != nil { - return err + return math.MaxUint64, err } - return nil + req, err := option.UpsertRequest(collection) + if err != nil { + return collection.UpdateTimestamp, err + } + return collection.UpdateTimestamp, c.callService(func(milvusService milvuspb.MilvusServiceClient) error { + resp, err := milvusService.Upsert(ctx, req, callOptions...) + if err = merr.CheckRPCCall(resp, err); err != nil { + return err + } + result.UpsertCount = resp.GetUpsertCnt() + result.IDs, err = column.IDColumns(collection.Schema, resp.GetIDs(), 0, -1) + if err != nil { + return err + } + return nil + }) }) return result, err } diff --git a/client/milvusclient/write_options.go b/client/milvusclient/write_options.go index fd5ac604c7..eadcc6c284 100644 --- a/client/milvusclient/write_options.go +++ b/client/milvusclient/write_options.go @@ -257,10 +257,11 @@ func (opt *columnBasedDataOption) InsertRequest(coll *entity.Collection) (*milvu return nil, err } return &milvuspb.InsertRequest{ - CollectionName: opt.collName, - PartitionName: opt.partitionName, - FieldsData: fieldsData, - NumRows: uint32(rowNum), + CollectionName: opt.collName, + PartitionName: opt.partitionName, + FieldsData: fieldsData, + NumRows: uint32(rowNum), + SchemaTimestamp: coll.UpdateTimestamp, }, nil } @@ -270,10 +271,11 @@ func (opt *columnBasedDataOption) UpsertRequest(coll *entity.Collection) (*milvu return nil, err } return &milvuspb.UpsertRequest{ - CollectionName: opt.collName, - PartitionName: opt.partitionName, - FieldsData: fieldsData, - NumRows: uint32(rowNum), + CollectionName: opt.collName, + PartitionName: opt.partitionName, + FieldsData: fieldsData, + NumRows: uint32(rowNum), + SchemaTimestamp: coll.UpdateTimestamp, }, nil }