feat: [ExternalTable Part1] Enable create external collection (#46886)

issue: #45881

- Add ExternalSource and ExternalSpec fields to collection schema
- Add ExternalField mapping for field schema to map external columns
- Implement ValidateExternalCollectionSchema() to enforce restrictions:
  - No primary key (virtual PK generated automatically)
  - No dynamic fields, partition keys, clustering keys, or auto ID
  - No text match or function features
  - All user fields must have external_field mapping
- Return virtual PK schema for external collections in
GetPrimaryFieldSchema()
- Skip primary key validation for external collections during creation
- Add comprehensive unit tests and integration tests
- Add design document and user guide

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Co-authored-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
wei liu 2026-01-13 10:15:27 +08:00 committed by GitHub
parent dcdd897809
commit b79a655f1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 2684 additions and 6 deletions

View File

@ -234,6 +234,7 @@ type Field struct {
DefaultValue *schemapb.ValueField
Nullable bool
StructSchema *StructSchema
ExternalField string // external field name - maps to column name in external source
}
// ProtoMessage generates corresponding FieldSchema
@ -253,6 +254,7 @@ func (f *Field) ProtoMessage() *schemapb.FieldSchema {
ElementType: schemapb.DataType(f.ElementType),
Nullable: f.Nullable,
DefaultValue: f.DefaultValue,
ExternalField: f.ExternalField,
}
}
@ -460,6 +462,12 @@ func (f *Field) WithStructSchema(schema *StructSchema) *Field {
return f
}
// WithExternalField sets ExternalField, the name of the column in the external
// source that this field maps to, and returns the same *Field so that calls
// can be chained. The value is stored as given; no validation happens here.
func (f *Field) WithExternalField(externalField string) *Field {
f.ExternalField = externalField
return f
}
// ReadProto parses FieldSchema
func (f *Field) ReadProto(p *schemapb.FieldSchema) *Field {
f.ID = p.GetFieldID()
@ -476,6 +484,7 @@ func (f *Field) ReadProto(p *schemapb.FieldSchema) *Field {
f.ElementType = FieldType(p.GetElementType())
f.DefaultValue = p.GetDefaultValue()
f.Nullable = p.GetNullable()
f.ExternalField = p.GetExternalField()
return f
}

View File

@ -65,6 +65,8 @@ type Schema struct {
Fields []*Field
EnableDynamicField bool
Functions []*Function
ExternalSource string // External data source (e.g., "s3://bucket/path")
ExternalSpec string // External source config (JSON)
pkField *Field
}
@ -96,6 +98,18 @@ func (s *Schema) WithDynamicFieldEnabled(dynamicEnabled bool) *Schema {
return s
}
// WithExternalSource sets ExternalSource, the location of the external data
// (e.g., "s3://bucket/path"), and returns the same *Schema so that calls can
// be chained. The string is stored as given; it is not parsed or validated here.
func (s *Schema) WithExternalSource(externalSource string) *Schema {
s.ExternalSource = externalSource
return s
}
// WithExternalSpec sets ExternalSpec, the external source configuration
// expressed as a JSON string (e.g., `{"format": "parquet"}`), and returns the
// same *Schema so that calls can be chained. The string is stored verbatim;
// it is not parsed or checked for valid JSON here.
func (s *Schema) WithExternalSpec(externalSpec string) *Schema {
s.ExternalSpec = externalSpec
return s
}
// WithField adds a field into schema and returns schema itself.
func (s *Schema) WithField(f *Field) *Schema {
if f.PrimaryKey {
@ -117,6 +131,8 @@ func (s *Schema) ProtoMessage() *schemapb.CollectionSchema {
Description: s.Description,
AutoID: s.AutoID,
EnableDynamicField: s.EnableDynamicField,
ExternalSource: s.ExternalSource,
ExternalSpec: s.ExternalSpec,
}
r.Fields = lo.FilterMap(s.Fields, func(field *Field, _ int) (*schemapb.FieldSchema, bool) {
if field.DataType == FieldTypeArray && field.ElementType == FieldTypeStruct {
@ -162,6 +178,8 @@ func (s *Schema) ReadProto(p *schemapb.CollectionSchema) *Schema {
s.Description = p.GetDescription()
s.CollectionName = p.GetName()
s.EnableDynamicField = p.GetEnableDynamicField()
s.ExternalSource = p.GetExternalSource()
s.ExternalSpec = p.GetExternalSpec()
// fields
s.Fields = make([]*Field, 0, len(p.GetFields()))
for _, fp := range p.GetFields() {

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,691 @@
# Milvus External Table User Guide
## 1. Overview
### 1.1 What is External Table
External Table (External Collection) is a special type of data collection in Milvus that allows users to directly access data stored in external storage systems (such as S3, HDFS, etc.) without copying the data into Milvus local storage.
This enables Milvus to serve as a query layer over existing data lakes while maintaining compatibility with standard Milvus query interfaces.
### 1.2 Core Benefits
- **Zero Data Copy**: Query data directly from external storage without an ETL process
- **Unified Query Interface**: Use standard Search/Query APIs to query external data
- **Vector Index Support**: Build vector indexes on external data for efficient similarity search
- **Data Lake Integration**: Seamlessly integrate with existing data lake infrastructure
### 1.3 Use Cases
- Large amounts of vector data already stored in S3 or other object storage
- Need to perform vector search on data lake data
- Want to maintain separation between data storage and query engine
- Need to query periodically updated external data
---
## 2. Quick Start
### 2.1 Create External Collection
An External Collection is created through the standard `CreateCollection` API by setting `external_source` in the schema.
#### Python SDK Example
```python
from pymilvus import MilvusClient, DataType
client = MilvusClient("http://localhost:19530")
# Define Schema
schema = client.create_schema()
# Add fields - must specify external_field to map to column names in external data source
schema.add_field(
field_name="text",
datatype=DataType.VARCHAR,
max_length=256,
external_field="source_text_column" # Maps to column name in external Parquet file
)
schema.add_field(
field_name="vector",
datatype=DataType.FLOAT_VECTOR,
dim=128,
external_field="embedding_column" # Maps to column name in external Parquet file
)
# Set external data source
schema.external_source = "s3://my-bucket/path/to/data"
schema.external_spec = '{"format": "parquet"}'
# Create Collection
client.create_collection(
collection_name="my_external_collection",
schema=schema
)
```
#### Go SDK Example
```go
package main
import (
"context"
"log"
"github.com/milvus-io/milvus/client/v2"
"github.com/milvus-io/milvus/client/v2/entity"
)
func main() {
ctx := context.Background()
// Connect to Milvus
cli, err := client.New(ctx, &client.ClientConfig{
Address: "localhost:19530",
})
if err != nil {
log.Fatal(err)
}
defer cli.Close(ctx)
// Define Schema with external source
schema := entity.NewSchema().
WithName("my_external_collection").
WithExternalSource("s3://my-bucket/path/to/data").
WithExternalSpec(`{"format": "parquet"}`).
WithField(entity.NewField().
WithName("text").
WithDataType(entity.FieldTypeVarChar).
WithMaxLength(256).
WithExternalField("source_text_column")). // Maps to external column
WithField(entity.NewField().
WithName("vector").
WithDataType(entity.FieldTypeFloatVector).
WithDim(128).
WithExternalField("embedding_column")) // Maps to external column
// Create Collection
err = cli.CreateCollection(ctx, client.NewCreateCollectionOption(
"my_external_collection",
schema,
))
if err != nil {
log.Fatal(err)
}
}
```
### 2.2 Field Mapping Rules
| Schema Parameter | Description | Example |
|-----------------|-------------|---------|
| `external_source` | External data source path | `s3://bucket/path` |
| `external_spec` | Data source configuration (JSON format) | `{"format": "parquet"}` |
| `external_field` | Maps field to external column name | Must be specified for each field |
**Note**: All user-defined fields must set `external_field` to map to column names in the external data source.
---
## 3. Supported Operations
### 3.1 Load Collection
```python
# Load External Collection into memory
client.load_collection("my_external_collection")
```
### 3.2 Vector Search
```python
# Execute vector search
results = client.search(
collection_name="my_external_collection",
data=[[0.1, 0.2, ...]], # Query vector
anns_field="vector",
limit=10,
output_fields=["text"]
)
```
### 3.3 Scalar Query
```python
# Execute scalar query
results = client.query(
collection_name="my_external_collection",
filter="text like 'hello%'",
output_fields=["text", "vector"],
limit=10
)
```
### 3.4 Create Index
```python
# Create index on vector field
index_params = client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="HNSW",
metric_type="L2",
params={"M": 16, "efConstruction": 200}
)
client.create_index(
collection_name="my_external_collection",
index_params=index_params
)
```
### 3.5 Drop Collection
```python
# Drop External Collection
client.drop_collection("my_external_collection")
```
---
## 4. Unsupported Operations
External Collection is **read-only**. The following operations are not supported:
| Operation | Status | Description |
|-----------|--------|-------------|
| Insert | Not Supported | Data must be modified at external source |
| Delete | Not Supported | Data must be modified at external source |
| Upsert | Not Supported | Data must be modified at external source |
| Import | Not Supported | Data comes directly from external source |
| Flush | Not Supported | No local data cache |
| Add Field | Not Supported | Schema is fixed after creation |
| Alter Field | Not Supported | Schema is fixed after creation |
| Create/Drop Partition | Not Supported | Partitions not supported |
| Manual Compaction | Not Supported | Not needed |
### 4.1 Schema Restrictions
When creating an External Collection, the following features cannot be used:
| Feature | Status | Reason |
|---------|--------|--------|
| Primary Key Field | Not Allowed | System auto-generates virtual PK |
| Dynamic Field | Not Allowed | Schema must be fixed |
| Partition Key | Not Allowed | External data partitioning not supported |
| Clustering Key | Not Allowed | No clustering compaction |
| Auto ID | Not Allowed | Uses virtual PK |
| Text Match | Not Allowed | Requires internal indexing |
| Namespace Field | Not Allowed | External isolation not supported |
| Functions | Not Allowed | Not yet supported; planned for a future release |
---
## 5. Data Updates
External table data refresh is **manually triggered** using the `RefreshExternalTable` API. This design gives you full control over when data synchronization occurs and allows you to track progress.
### 5.1 Refresh APIs
#### 5.1.1 RefreshExternalTable
Triggers a data refresh job for an external collection.
```python
# Basic refresh - re-scan current data source
response = client.refresh_external_table(
collection_name="my_external_collection"
)
job_id = response.job_id
print(f"Refresh job started: {job_id}")
# Refresh with updated data source path
response = client.refresh_external_table(
collection_name="my_external_collection",
external_source="s3://my-bucket/path/to/new_data",
external_spec='{"format": "parquet"}'
)
```
**Parameters:**
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `collection_name` | str | Yes | Name of the external collection |
| `external_source` | str | No | New external source path (optional) |
| `external_spec` | str | No | New external spec configuration (optional) |
**Returns:** `job_id` for tracking progress
#### 5.1.2 GetRefreshExternalTableProgress
Gets the current progress and status of a refresh job.
```python
# Get progress of a specific job
progress = client.get_refresh_external_table_progress(job_id="job_123456")
print(f"State: {progress.state}") # Pending/InProgress/Completed/Failed
print(f"Progress: {progress.progress}%")
print(f"New segments: {progress.new_segments}")
print(f"Dropped segments: {progress.dropped_segments}")
print(f"Kept segments: {progress.kept_segments}")
if progress.state == "Failed":
print(f"Error: {progress.reason}")
```
**Progress States:**
| State | Description |
|-------|-------------|
| `Pending` | Job is queued, waiting to execute |
| `InProgress` | Job is currently executing |
| `Completed` | Job completed successfully |
| `Failed` | Job failed with error |
#### 5.1.3 ListRefreshExternalTableJobs
Lists all refresh jobs for a collection.
```python
# List all jobs for a specific collection
jobs = client.list_refresh_external_table_jobs(
collection_name="my_external_collection",
limit=10
)
for job in jobs:
print(f"Job: {job.job_id}")
print(f" State: {job.state}")
print(f" Progress: {job.progress}%")
print(f" Started: {job.start_time}")
print(f" Source: {job.external_source}")
# List all external table refresh jobs across all collections
all_jobs = client.list_refresh_external_table_jobs()
```
### 5.2 Complete Refresh Workflow
```python
from pymilvus import MilvusClient
import time
client = MilvusClient("http://localhost:19530")
# Step 1: Trigger refresh
response = client.refresh_external_table(
collection_name="my_external_collection"
)
job_id = response.job_id
print(f"Refresh job started: {job_id}")
# Step 2: Poll for completion
while True:
progress = client.get_refresh_external_table_progress(job_id=job_id)
print(f"Progress: {progress.progress}% ({progress.state})")
if progress.state == "Completed":
print("Refresh completed successfully!")
print(f" New segments: {progress.new_segments}")
print(f" Dropped segments: {progress.dropped_segments}")
print(f" Kept segments: {progress.kept_segments}")
break
elif progress.state == "Failed":
print(f"Refresh failed: {progress.reason}")
break
time.sleep(5) # Poll every 5 seconds
# Step 3: Re-load collection to query refreshed data
client.load_collection("my_external_collection")
```
### 5.3 Incremental Update Strategy
The system uses a segment-level incremental update strategy:
1. **Keep**: Segments whose external fragments are unchanged remain intact
2. **Drop**: Segments whose corresponding external fragments are deleted/modified are removed
3. **Add**: New external fragments are organized into new segments
This strategy minimizes data reloading during updates.
**Note**: The current version does not support automatic detection of external data source changes. Users must manually trigger a refresh using `refresh_external_table`.
---
## 6. Supported Data Formats
| Format | Status | Description |
|--------|--------|-------------|
| Parquet | Supported | Apache Parquet format |
---
## 7. Storage Configuration
### 7.1 S3 Configuration Example
```python
schema.external_source = "s3://my-bucket/vector-data/"
schema.external_spec = '''
{
"format": "parquet"
}
'''
```
External Collections reuse the storage configuration from the Milvus configuration file (`minio.*` or `s3.*` configuration items).
---
## 8. Important Notes
1. **Immutable Schema**: Schema cannot be modified after creation. Plan carefully before creation.
2. **Read-Only Mode**: All data modifications must be done at the external data source.
3. **Manual Refresh**: Changes to the external data are picked up only after you manually trigger a refresh with the `refresh_external_table` API. Use `get_refresh_external_table_progress` to track progress.
4. **Field Mapping**: Each field must correctly map to column names in the external data source.
5. **Data Type Matching**: Ensure Milvus field types are compatible with external data column types.
6. **Re-load After Refresh**: After refresh job completes, call `load_collection` to make the updated data available for queries.
---
## 9. Complete Example
### 9.1 Python SDK Complete Example
```python
from pymilvus import MilvusClient, DataType
# Connect to Milvus
client = MilvusClient("http://localhost:19530")
# Create Schema
schema = client.create_schema()
# Add text field
schema.add_field(
field_name="title",
datatype=DataType.VARCHAR,
max_length=512,
external_field="doc_title"
)
# Add vector field
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768,
external_field="text_embedding"
)
# Configure external data source
schema.external_source = "s3://my-data-lake/documents/"
schema.external_spec = '{"format": "parquet"}'
# Create External Collection
client.create_collection(
collection_name="document_search",
schema=schema
)
# Create vector index
index_params = client.prepare_index_params()
index_params.add_index(
field_name="embedding",
index_type="HNSW",
metric_type="COSINE",
params={"M": 32, "efConstruction": 256}
)
client.create_index("document_search", index_params)
# ============================================
# Refresh data when external source changes
# ============================================
import time
# Step 1: Trigger refresh job
response = client.refresh_external_table(
collection_name="document_search",
external_source="s3://my-data-lake/documents/v1",
external_spec='{"format": "parquet"}',
)
job_id = response.job_id
print(f"Refresh job started: {job_id}")
# Step 2: Poll for completion
while True:
progress = client.get_refresh_external_table_progress(job_id=job_id)
print(f"Progress: {progress.progress}% ({progress.state})")
if progress.state == "Completed":
print("Refresh completed!")
break
elif progress.state == "Failed":
print(f"Refresh failed: {progress.reason}")
break
time.sleep(5)
# Step 3: Re-load collection to query refreshed data
client.load_collection("document_search")
# Now search will use the refreshed data
results = client.search(
collection_name="document_search",
data=[query_embedding],
anns_field="embedding",
limit=10,
output_fields=["title"]
)
# ============================================
# List all refresh jobs for this collection
# ============================================
jobs = client.list_refresh_external_table_jobs(
collection_name="document_search"
)
for job in jobs:
print(f"Job {job.job_id}: {job.state} ({job.progress}%)")
```
### 9.2 Go SDK Complete Example
```go
package main
import (
"context"
"fmt"
"log"
"github.com/milvus-io/milvus/client/v2"
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/client/v2/index"
)
func main() {
ctx := context.Background()
// Connect to Milvus
cli, err := client.New(ctx, &client.ClientConfig{
Address: "localhost:19530",
})
if err != nil {
log.Fatal(err)
}
defer cli.Close(ctx)
collectionName := "document_search"
// ============================================
// Create External Collection
// ============================================
// Define Schema with external source
schema := entity.NewSchema().
WithName(collectionName).
WithExternalSource("s3://my-data-lake/documents/").
WithExternalSpec(`{"format": "parquet"}`).
WithField(entity.NewField().
WithName("title").
WithDataType(entity.FieldTypeVarChar).
WithMaxLength(512).
WithExternalField("doc_title")).
WithField(entity.NewField().
WithName("embedding").
WithDataType(entity.FieldTypeFloatVector).
WithDim(768).
WithExternalField("text_embedding"))
// Create Collection
err = cli.CreateCollection(ctx, client.NewCreateCollectionOption(collectionName, schema))
if err != nil {
log.Fatal(err)
}
fmt.Println("External collection created successfully")
// ============================================
// Create Vector Index
// ============================================
indexTask, err := cli.CreateIndex(ctx, client.NewCreateIndexOption(
collectionName,
"embedding",
index.NewHNSWIndex(entity.COSINE, 32, 256),
))
if err != nil {
log.Fatal(err)
}
err = indexTask.Await(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println("Index created successfully")
// ============================================
// Load Collection
// ============================================
loadTask, err := cli.LoadCollection(ctx, client.NewLoadCollectionOption(collectionName))
if err != nil {
log.Fatal(err)
}
err = loadTask.Await(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println("Collection loaded successfully")
// ============================================
// Search
// ============================================
// Query embedding (replace with actual query vector)
queryEmbedding := make([]float32, 768)
for i := range queryEmbedding {
queryEmbedding[i] = 0.1
}
results, err := cli.Search(ctx, client.NewSearchOption(
collectionName,
10, // limit
[]entity.Vector{entity.FloatVector(queryEmbedding)},
).WithANNSField("embedding").WithOutputFields("title"))
if err != nil {
log.Fatal(err)
}
for _, result := range results {
for i := 0; i < result.ResultCount; i++ {
title, _ := result.Fields.GetColumn("title").Get(i)
fmt.Printf("Result %d: title=%v, score=%f\n", i, title, result.Scores[i])
}
}
// ============================================
// Drop Collection (cleanup)
// ============================================
err = cli.DropCollection(ctx, client.NewDropCollectionOption(collectionName))
if err != nil {
log.Fatal(err)
}
fmt.Println("Collection dropped successfully")
}
```
---
## 10. Future Plans (Roadmap)
The following features are planned for future releases:
### 10.1 Scalar Index Support
Support creating scalar indexes on external collections to accelerate filtering queries:
```python
# Future: Create scalar index on external collection
index_params.add_index(
field_name="category",
index_type="INVERTED"
)
```
### 10.2 Function Support
Support embedding functions and other built-in transformation functions for external collections:
```python
# Future: Use embedding function with external collection
schema.add_function(
name="text_to_vector",
function_type=FunctionType.EMBEDDING,
input_field="text",
output_field="vector",
params={"model": "text-embedding-3-small"}
)
```
### 10.3 Schema Evolution (Add/Drop Fields)
Support adding or removing fields from external collections after creation:
```python
# Future: Add new field to external collection
client.add_field(
collection_name="my_external_collection",
field_name="new_column",
datatype=DataType.VARCHAR,
max_length=128,
external_field="source_new_column"
)
# Future: Drop field from external collection
client.drop_field(
collection_name="my_external_collection",
field_name="old_column"
)
```
### 10.4 Additional Planned Features
| Feature | Description | Priority |
|---------|-------------|----------|
| **More Data Formats** | Support Apache Iceberg, Delta Lake, ORC formats | High |
| **Auto Data Sync** | Automatic detection of external data source changes with scheduled refresh | Low |
| **Partition Mapping** | Map external data partitions to Milvus partitions | Medium |
| **Text Match** | Support full-text search on external collections | Medium |
| **Cross-source Query** | Query across multiple external data sources | Low |
| **Change Data Capture** | Support CDC-based incremental updates | Low |
---
## 11. Related Documentation
- [Design Document: External Table](../design_docs/20260105-external_table.md)

View File

@ -149,7 +149,7 @@ func newWriteNode(
// pkfield is a immutable property of the collection, so we can get it from any schema
collSchema := config.metacache.GetSchema(0)
pkField, err := typeutil.GetPrimaryFieldSchema(collSchema)
if err != nil {
if err != nil && !typeutil.IsExternalCollection(collSchema) {
return nil, err
}

View File

@ -55,6 +55,8 @@ type Collection struct {
SchemaVersion int32
ShardInfos map[string]*ShardInfo
FileResourceIds []int64
ExternalSource string
ExternalSpec string
}
type ShardInfo struct {
@ -94,6 +96,8 @@ func (c *Collection) ShallowClone() *Collection {
SchemaVersion: c.SchemaVersion,
ShardInfos: c.ShardInfos,
FileResourceIds: c.FileResourceIds,
ExternalSource: c.ExternalSource,
ExternalSpec: c.ExternalSpec,
}
}
@ -132,6 +136,8 @@ func (c *Collection) Clone() *Collection {
SchemaVersion: c.SchemaVersion,
ShardInfos: shardInfos,
FileResourceIds: slices.Clone(c.FileResourceIds),
ExternalSource: c.ExternalSource,
ExternalSpec: c.ExternalSpec,
}
}
@ -180,6 +186,8 @@ func (c *Collection) ApplyUpdates(header *message.AlterCollectionMessageHeader,
c.Functions = UnmarshalFunctionModels(updates.Schema.Functions)
c.StructArrayFields = UnmarshalStructArrayFieldModels(updates.Schema.StructArrayFields)
c.SchemaVersion = updates.Schema.Version
c.ExternalSource = updates.Schema.ExternalSource
c.ExternalSpec = updates.Schema.ExternalSpec
}
}
}
@ -238,6 +246,8 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
SchemaVersion: coll.Schema.Version,
ShardInfos: shardInfos,
FileResourceIds: coll.Schema.GetFileResourceIds(),
ExternalSource: coll.Schema.ExternalSource,
ExternalSpec: coll.Schema.ExternalSpec,
}
}
@ -290,6 +300,8 @@ func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.Collectio
DbName: coll.DBName,
Version: coll.SchemaVersion,
FileResourceIds: coll.FileResourceIds,
ExternalSource: coll.ExternalSource,
ExternalSpec: coll.ExternalSpec,
}
if c.withFields {

View File

@ -25,6 +25,7 @@ type Field struct {
DefaultValue *schemapb.ValueField
ElementType schemapb.DataType
Nullable bool
ExternalField string
}
func (f *Field) Available() bool {
@ -49,6 +50,7 @@ func (f *Field) Clone() *Field {
DefaultValue: f.DefaultValue,
ElementType: f.ElementType,
Nullable: f.Nullable,
ExternalField: f.ExternalField,
}
}
@ -80,7 +82,8 @@ func (f *Field) Equal(other Field) bool {
proto.Equal(f.DefaultValue, other.DefaultValue) &&
f.ElementType == other.ElementType &&
f.IsFunctionOutput == other.IsFunctionOutput &&
f.Nullable == other.Nullable
f.Nullable == other.Nullable &&
f.ExternalField == other.ExternalField
}
func CheckFieldsEqual(fieldsA, fieldsB []*Field) bool {
@ -121,6 +124,7 @@ func MarshalFieldModel(field *Field) *schemapb.FieldSchema {
DefaultValue: proto.Clone(field.DefaultValue).(*schemapb.ValueField),
ElementType: field.ElementType,
Nullable: field.Nullable,
ExternalField: field.ExternalField,
}
}
@ -157,6 +161,7 @@ func UnmarshalFieldModel(fieldSchema *schemapb.FieldSchema) *Field {
DefaultValue: fieldSchema.DefaultValue,
ElementType: fieldSchema.ElementType,
Nullable: fieldSchema.Nullable,
ExternalField: fieldSchema.ExternalField,
}
}

View File

@ -210,6 +210,8 @@ func (node *CachedProxyServiceProvider) DescribeCollection(ctx context.Context,
Properties: c.schema.CollectionSchema.Properties,
Functions: c.schema.CollectionSchema.Functions,
DbName: c.schema.CollectionSchema.DbName,
ExternalSource: c.schema.CollectionSchema.ExternalSource,
ExternalSpec: c.schema.CollectionSchema.ExternalSpec,
}
// Restore struct field names from internal format (structName[fieldName]) to original format

View File

@ -418,6 +418,11 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
t.schema.AutoID = false
t.schema.DbName = t.GetDbName()
isExternalCollection := typeutil.IsExternalCollection(t.schema)
if err := typeutil.ValidateExternalCollectionSchema(t.schema); err != nil {
return err
}
disableCheck, err := common.IsDisableFuncRuntimeCheck(t.GetProperties()...)
if err != nil {
return err
@ -449,9 +454,11 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
return err
}
// validate primary key definition
if err := validatePrimaryKey(t.schema); err != nil {
return err
// validate primary key definition when needed
if !isExternalCollection {
if err := validatePrimaryKey(t.schema); err != nil {
return err
}
}
// validate dynamic field
@ -1007,6 +1014,8 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
t.result.Schema.Description = result.Schema.Description
t.result.Schema.AutoID = result.Schema.AutoID
t.result.Schema.EnableDynamicField = result.Schema.EnableDynamicField
t.result.Schema.ExternalSource = result.Schema.ExternalSource
t.result.Schema.ExternalSpec = result.Schema.ExternalSpec
t.result.CollectionID = result.CollectionID
t.result.VirtualChannelNames = result.VirtualChannelNames
t.result.PhysicalChannelNames = result.PhysicalChannelNames
@ -1038,6 +1047,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
ElementType: field.ElementType,
Nullable: field.Nullable,
IsFunctionOutput: field.IsFunctionOutput,
ExternalField: field.GetExternalField(),
}
}

View File

@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
@ -1727,6 +1728,147 @@ func TestCreateCollectionTask(t *testing.T) {
})
}
// TestCreateCollectionTaskExternalCollection checks that
// createCollectionTask.PreExecute accepts a well-formed external collection
// schema and returns an error for each schema variant exercised below
// (functions, dynamic field, primary key, partition key, clustering key,
// auto ID, text match, struct array field).
//
// The subtests share a single task and run in declaration order: the first
// subtest calls OnEnqueue, and every later subtest only swaps the marshaled
// schema on the request before calling PreExecute again.
func TestCreateCollectionTaskExternalCollection(t *testing.T) {
	mix := NewMixCoordMock()
	// Release the mock coordinator when the test ends, matching
	// TestHasCollectionTask in this file.
	defer mix.Close()

	ctx := context.Background()
	collectionName := "external_collection_" + funcutil.GenRandomStr()

	// buildExternalSchema returns a fresh, valid external schema on each call
	// so a subtest can mutate its copy without affecting the others.
	buildExternalSchema := func() *schemapb.CollectionSchema {
		return &schemapb.CollectionSchema{
			Name:           collectionName,
			ExternalSource: "s3://bucket/prefix",
			Fields: []*schemapb.FieldSchema{
				{
					FieldID:       1,
					Name:          "text_field",
					DataType:      schemapb.DataType_VarChar,
					ExternalField: "text_col",
					TypeParams: []*commonpb.KeyValuePair{
						{Key: common.MaxLengthKey, Value: "64"},
					},
				},
				{
					FieldID:       2,
					Name:          "vec_field",
					DataType:      schemapb.DataType_FloatVector,
					ExternalField: "vec_col",
					TypeParams: []*commonpb.KeyValuePair{
						{Key: common.DimKey, Value: "16"},
					},
				},
			},
		}
	}

	// marshal serializes a schema into the byte form carried by the request.
	marshal := func(schema *schemapb.CollectionSchema) []byte {
		bytes, err := proto.Marshal(schema)
		require.NoError(t, err)
		return bytes
	}

	task := &createCollectionTask{
		Condition: NewTaskCondition(ctx),
		CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
			CollectionName: collectionName,
			Schema:         marshal(buildExternalSchema()),
			ShardsNum:      common.DefaultShardsNum,
		},
		ctx:      ctx,
		mixCoord: mix,
	}

	t.Run("valid external schema", func(t *testing.T) {
		err := task.OnEnqueue()
		require.NoError(t, err)
		err = task.PreExecute(ctx)
		require.NoError(t, err)
	})

	t.Run("functions forbidden", func(t *testing.T) {
		schema := buildExternalSchema()
		schema.Functions = []*schemapb.FunctionSchema{{Name: "test_func"}}
		task.CreateCollectionRequest.Schema = marshal(schema)
		err := task.PreExecute(ctx)
		// require (not assert) so that a nil error stops the subtest here
		// instead of panicking on err.Error() below.
		require.Error(t, err)
		assert.Contains(t, err.Error(), "does not support functions")
	})

	t.Run("dynamic field forbidden", func(t *testing.T) {
		schema := buildExternalSchema()
		schema.EnableDynamicField = true
		task.CreateCollectionRequest.Schema = marshal(schema)
		err := task.PreExecute(ctx)
		assert.Error(t, err)
	})

	t.Run("primary key forbidden", func(t *testing.T) {
		schema := buildExternalSchema()
		schema.Fields[0].IsPrimaryKey = true
		task.CreateCollectionRequest.Schema = marshal(schema)
		err := task.PreExecute(ctx)
		assert.Error(t, err)
	})

	t.Run("partition key forbidden", func(t *testing.T) {
		schema := buildExternalSchema()
		schema.Fields[0].IsPartitionKey = true
		task.CreateCollectionRequest.Schema = marshal(schema)
		err := task.PreExecute(ctx)
		assert.Error(t, err)
	})

	t.Run("clustering key forbidden", func(t *testing.T) {
		schema := buildExternalSchema()
		schema.Fields[0].IsClusteringKey = true
		task.CreateCollectionRequest.Schema = marshal(schema)
		err := task.PreExecute(ctx)
		assert.Error(t, err)
	})

	t.Run("auto id forbidden", func(t *testing.T) {
		schema := buildExternalSchema()
		schema.Fields[0].AutoID = true
		task.CreateCollectionRequest.Schema = marshal(schema)
		err := task.PreExecute(ctx)
		assert.Error(t, err)
	})

	t.Run("text match forbidden", func(t *testing.T) {
		schema := buildExternalSchema()
		schema.Fields[0].TypeParams = append(schema.Fields[0].TypeParams, &commonpb.KeyValuePair{
			Key:   "enable_match",
			Value: "true",
		})
		task.CreateCollectionRequest.Schema = marshal(schema)
		err := task.PreExecute(ctx)
		assert.Error(t, err)
	})

	t.Run("struct field forbidden", func(t *testing.T) {
		schema := buildExternalSchema()
		schema.StructArrayFields = []*schemapb.StructArrayFieldSchema{
			{
				FieldID: 3,
				Name:    "struct_array_field",
				Fields: []*schemapb.FieldSchema{
					{
						FieldID:     4,
						Name:        "nested_array",
						DataType:    schemapb.DataType_Array,
						ElementType: schemapb.DataType_Int64,
						TypeParams: []*commonpb.KeyValuePair{
							{Key: common.MaxCapacityKey, Value: "10"},
						},
					},
				},
			},
		}
		task.CreateCollectionRequest.Schema = marshal(schema)
		err := task.PreExecute(ctx)
		assert.Error(t, err)
	})
}
func TestHasCollectionTask(t *testing.T) {
mixc := NewMixCoordMock()
defer mixc.Close()

View File

@ -722,7 +722,10 @@ func validatePrimaryKey(coll *schemapb.CollectionSchema) error {
}
}
if idx == -1 {
return errors.New("primary key is not specified")
// External collections may not have a primary key
if !typeutil.IsExternalCollection(coll) {
return errors.New("primary key is not specified")
}
}
for _, structArrayField := range coll.StructArrayFields {

View File

@ -177,6 +177,10 @@ func (t *createCollectionTask) validateSchema(ctx context.Context, schema *schem
return err
}
if err := typeutil.ValidateExternalCollectionSchema(schema); err != nil {
return err
}
if hasSystemFields(schema, []string{RowIDFieldName, TimeStampFieldName, MetaFieldName, NamespaceFieldName}) {
log.Ctx(ctx).Error("schema contains system field",
zap.String("RowIDFieldName", RowIDFieldName),
@ -317,6 +321,10 @@ func (t *createCollectionTask) handleNamespaceField(ctx context.Context, schema
return nil
}
if typeutil.IsExternalCollection(schema) {
return merr.WrapErrParameterInvalidMsg("external collection does not support namespace field")
}
if hasIsolation {
iso, err := common.IsPartitionKeyIsolationKvEnabled(t.Req.Properties...)
if err != nil {

View File

@ -901,6 +901,107 @@ func Test_createCollectionTask_validateSchema(t *testing.T) {
err := task.validateSchema(context.TODO(), schema)
assert.NoError(t, err)
})
t.Run("external schema valid case", func(t *testing.T) {
collectionName := funcutil.GenRandomStr()
task := createCollectionTask{
Req: &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
CollectionName: collectionName,
},
}
schema := &schemapb.CollectionSchema{
Name: collectionName,
ExternalSource: "s3://bucket/object",
Fields: []*schemapb.FieldSchema{
{
Name: "text_field",
DataType: schemapb.DataType_VarChar,
ExternalField: "text_col",
TypeParams: []*commonpb.KeyValuePair{
{Key: common.MaxLengthKey, Value: "64"},
},
},
{
Name: "vec_field",
DataType: schemapb.DataType_FloatVector,
ExternalField: "vec_col",
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "16"},
},
},
},
}
err := task.validateSchema(context.TODO(), schema)
assert.NoError(t, err)
})
t.Run("external schema reject primary key", func(t *testing.T) {
collectionName := funcutil.GenRandomStr()
task := createCollectionTask{
Req: &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
CollectionName: collectionName,
},
}
schema := &schemapb.CollectionSchema{
Name: collectionName,
ExternalSource: "s3://bucket/object",
Fields: []*schemapb.FieldSchema{
{
Name: "pk",
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
ExternalField: "pk_col",
},
{
Name: "vec_field",
DataType: schemapb.DataType_FloatVector,
ExternalField: "vec_col",
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "16"},
},
},
},
}
err := task.validateSchema(context.TODO(), schema)
assert.Error(t, err)
})
t.Run("external schema reject functions", func(t *testing.T) {
collectionName := funcutil.GenRandomStr()
task := createCollectionTask{
Req: &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
CollectionName: collectionName,
},
}
schema := &schemapb.CollectionSchema{
Name: collectionName,
ExternalSource: "s3://bucket/object",
Fields: []*schemapb.FieldSchema{
{
Name: "text_field",
DataType: schemapb.DataType_VarChar,
ExternalField: "text_col",
TypeParams: []*commonpb.KeyValuePair{
{Key: common.MaxLengthKey, Value: "64"},
},
},
{
Name: "vec_field",
DataType: schemapb.DataType_FloatVector,
ExternalField: "vec_col",
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "16"},
},
},
},
Functions: []*schemapb.FunctionSchema{{Name: "test_func"}},
}
err := task.validateSchema(context.TODO(), schema)
assert.Error(t, err)
})
}
func Test_createCollectionTask_prepareSchema(t *testing.T) {
@ -1790,6 +1891,53 @@ func TestNamespaceProperty(t *testing.T) {
err := task.handleNamespaceField(ctx, schema)
assert.Error(t, err)
})
t.Run("test namespace enabled with external collection", func(t *testing.T) {
// External collection is identified by having ExternalField set on fields
schema := &schemapb.CollectionSchema{
Name: collectionName,
ExternalSource: "s3://bucket/path",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "text",
DataType: schemapb.DataType_VarChar,
ExternalField: "text_col",
TypeParams: []*commonpb.KeyValuePair{
{Key: common.MaxLengthKey, Value: "256"},
},
},
{
FieldID: 101,
Name: "vector",
DataType: schemapb.DataType_FloatVector,
ExternalField: "vector_col",
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "1024"},
},
},
},
}
task := &createCollectionTask{
Req: &milvuspb.CreateCollectionRequest{
CollectionName: collectionName,
Properties: []*commonpb.KeyValuePair{
{
Key: common.NamespaceEnabledKey,
Value: "true",
},
},
},
header: &message.CreateCollectionMessageHeader{},
body: &message.CreateCollectionRequest{
CollectionSchema: schema,
},
}
err := task.handleNamespaceField(ctx, schema)
assert.Error(t, err)
})
}
func Test_validateMultiAnalyzerParams(t *testing.T) {

View File

@ -212,6 +212,8 @@ func newCollectionModel(header *message.CreateCollectionMessageHeader, body *mes
SchemaVersion: 0,
ShardInfos: shardInfos,
FileResourceIds: body.CollectionSchema.GetFileResourceIds(),
ExternalSource: body.CollectionSchema.ExternalSource,
ExternalSpec: body.CollectionSchema.ExternalSpec,
}
}

View File

@ -1078,6 +1078,8 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName str
EnableDynamicField: collInfo.EnableDynamicField,
Properties: collInfo.Properties,
FileResourceIds: collInfo.FileResourceIds,
ExternalSource: collInfo.ExternalSource,
ExternalSpec: collInfo.ExternalSpec,
}
resp.CollectionID = collInfo.CollectionID
resp.VirtualChannelNames = collInfo.VirtualChannelNames

View File

@ -1705,6 +1705,21 @@ func GetAllFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldSche
return all
}
// IsExternalCollection reports whether schema describes an external collection.
// A schema is treated as external as soon as any of its fields carries a
// non-empty ExternalField mapping. ExternalSource is not consulted, because it
// may be empty for an external collection.
func IsExternalCollection(schema *schemapb.CollectionSchema) bool {
	if schema != nil {
		for _, f := range schema.GetFields() {
			if len(f.GetExternalField()) > 0 {
				return true
			}
		}
	}
	return false
}
// GetVectorFieldSchemas get vector fields schema from collection schema.
func GetVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldSchema {
ret := make([]*schemapb.FieldSchema, 0)
@ -1745,6 +1760,57 @@ func GetPrimaryFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSc
return nil, errors.New("primary field is not found")
}
// ValidateExternalCollectionSchema rejects schema features that external
// collections do not support. Schemas that IsExternalCollection does not
// classify as external are accepted without further checks.
//
// Collection-level checks run first (functions, dynamic field, struct array
// fields), followed by per-field checks in declaration order. The first
// violation found is returned; nil means the schema is acceptable.
func ValidateExternalCollectionSchema(schema *schemapb.CollectionSchema) error {
	if !IsExternalCollection(schema) {
		return nil
	}

	collName := schema.GetName()

	// Collection-wide features that are not available for external collections.
	switch {
	case len(schema.GetFunctions()) > 0:
		return fmt.Errorf("external collection %s does not support functions", collName)
	case schema.GetEnableDynamicField():
		return fmt.Errorf("external collection %s does not support dynamic field", collName)
	case len(schema.GetStructArrayFields()) > 0:
		return fmt.Errorf("external collection %s does not support struct fields", collName)
	}

	for _, f := range schema.GetFields() {
		fieldName := f.GetName()

		// RowID and Timestamp are system fields and are exempt from the checks below.
		if fieldName == common.RowIDFieldName || fieldName == common.TimeStampFieldName {
			continue
		}

		// Case order matters: it decides which error is reported when a field
		// violates more than one rule.
		switch {
		case f.GetIsPrimaryKey():
			return fmt.Errorf("external collection %s does not support primary key field %s", collName, fieldName)
		case f.GetIsPartitionKey():
			return fmt.Errorf("external collection %s does not support partition key field %s", collName, fieldName)
		case f.GetIsClusteringKey():
			return fmt.Errorf("external collection %s does not support clustering key field %s", collName, fieldName)
		case f.GetAutoID():
			return fmt.Errorf("external collection %s does not support auto id on field %s", collName, fieldName)
		case CreateFieldSchemaHelper(f).EnableMatch():
			return fmt.Errorf("external collection %s does not support text match on field %s", collName, fieldName)
		case f.GetExternalField() == "":
			// Every user field must name the external column it maps to.
			return fmt.Errorf("field '%s' in external collection %s must have external_field mapping", fieldName, collName)
		}
	}

	return nil
}
func IsFieldSparseFloatVector(schema *schemapb.CollectionSchema, fieldID int64) bool {
fieldSchema := GetField(schema, fieldID)
return fieldSchema != nil && IsSparseFloatVectorType(fieldSchema.DataType)

View File

@ -4976,3 +4976,145 @@ func TestSchemaHelper_CanRetrieveRawFieldData(t *testing.T) {
}
assert.False(t, helper.CanRetrieveRawFieldData(orphanField))
}
func TestIsExternalCollection(t *testing.T) {
// nil schema
assert.False(t, IsExternalCollection(nil))
// empty schema with no fields
schema := &schemapb.CollectionSchema{}
assert.False(t, IsExternalCollection(schema))
// schema with fields but no ExternalField set
schema.Fields = []*schemapb.FieldSchema{
{Name: "field1", ExternalField: ""},
}
assert.False(t, IsExternalCollection(schema))
// schema with ExternalSource but no ExternalField set
schema.ExternalSource = "s3://bucket/path"
assert.False(t, IsExternalCollection(schema))
// schema with ExternalField set (empty ExternalSource is allowed)
schema.ExternalSource = ""
schema.Fields = []*schemapb.FieldSchema{
{Name: "field1", ExternalField: "ext_field1"},
}
assert.True(t, IsExternalCollection(schema))
// schema with both ExternalSource and ExternalField set
schema.ExternalSource = "s3://bucket/path"
assert.True(t, IsExternalCollection(schema))
}
// TestValidateExternalCollectionSchema exercises ValidateExternalCollectionSchema:
// a non-external schema is skipped, each unsupported feature is rejected when
// applied to an otherwise valid external schema, and the unmodified external
// schema passes.
func TestValidateExternalCollectionSchema(t *testing.T) {
	// newExternalSchema returns a fresh, valid external schema so each subtest
	// can mutate its own copy.
	newExternalSchema := func() *schemapb.CollectionSchema {
		textField := &schemapb.FieldSchema{
			Name:          "text",
			DataType:      schemapb.DataType_VarChar,
			ExternalField: "text_col",
			TypeParams: []*commonpb.KeyValuePair{
				{Key: common.MaxLengthKey, Value: "32"},
			},
		}
		vecField := &schemapb.FieldSchema{
			Name:          "vec",
			DataType:      schemapb.DataType_FloatVector,
			ExternalField: "vec_col",
			TypeParams: []*commonpb.KeyValuePair{
				{Key: common.DimKey, Value: "16"},
			},
		}
		return &schemapb.CollectionSchema{
			Name:           "external",
			ExternalSource: "s3://bucket/path",
			Fields:         []*schemapb.FieldSchema{textField, vecField},
		}
	}

	t.Run("non external schema skipped", func(t *testing.T) {
		// No field has ExternalField set, so the schema is not external.
		regular := &schemapb.CollectionSchema{
			Name: "regular_collection",
			Fields: []*schemapb.FieldSchema{
				{
					Name:     "text",
					DataType: schemapb.DataType_VarChar,
					TypeParams: []*commonpb.KeyValuePair{
						{Key: common.MaxLengthKey, Value: "32"},
					},
				},
			},
		}
		assert.NoError(t, ValidateExternalCollectionSchema(regular))
	})

	// Each case applies one unsupported setting; wantMsg, when non-empty, must
	// appear in the returned error text.
	rejected := []struct {
		name    string
		mutate  func(s *schemapb.CollectionSchema)
		wantMsg string
	}{
		{
			name: "functions disabled",
			mutate: func(s *schemapb.CollectionSchema) {
				s.Functions = []*schemapb.FunctionSchema{{Name: "test_func"}}
			},
			wantMsg: "does not support functions",
		},
		{
			name: "dynamic field disabled",
			mutate: func(s *schemapb.CollectionSchema) {
				s.EnableDynamicField = true
			},
		},
		{
			name: "struct fields disabled",
			mutate: func(s *schemapb.CollectionSchema) {
				s.StructArrayFields = []*schemapb.StructArrayFieldSchema{
					{Name: "struct_field", Fields: []*schemapb.FieldSchema{{Name: "nested", DataType: schemapb.DataType_Array}}},
				}
			},
		},
		{
			name: "primary key disabled",
			mutate: func(s *schemapb.CollectionSchema) {
				s.Fields[0].IsPrimaryKey = true
			},
		},
		{
			name: "partition key disabled",
			mutate: func(s *schemapb.CollectionSchema) {
				s.Fields[0].IsPartitionKey = true
			},
		},
		{
			name: "clustering key disabled",
			mutate: func(s *schemapb.CollectionSchema) {
				s.Fields[0].IsClusteringKey = true
			},
		},
		{
			name: "auto id disabled",
			mutate: func(s *schemapb.CollectionSchema) {
				s.Fields[0].AutoID = true
			},
		},
		{
			name: "text match disabled",
			mutate: func(s *schemapb.CollectionSchema) {
				s.Fields[0].TypeParams = append(s.Fields[0].TypeParams, &commonpb.KeyValuePair{
					Key:   "enable_match",
					Value: "true",
				})
			},
		},
		{
			name: "external_field mapping required",
			mutate: func(s *schemapb.CollectionSchema) {
				s.Fields[0].ExternalField = ""
			},
			wantMsg: "must have external_field mapping",
		},
	}

	for _, tc := range rejected {
		t.Run(tc.name, func(t *testing.T) {
			schema := newExternalSchema()
			tc.mutate(schema)

			err := ValidateExternalCollectionSchema(schema)
			assert.Error(t, err)
			if tc.wantMsg != "" {
				assert.Contains(t, err.Error(), tc.wantMsg)
			}
		})
	}

	t.Run("valid schema passes", func(t *testing.T) {
		assert.NoError(t, ValidateExternalCollectionSchema(newExternalSchema()))
	})
}

View File

@ -0,0 +1,316 @@
package testcases
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
"github.com/milvus-io/milvus/tests/go_client/common"
hp "github.com/milvus-io/milvus/tests/go_client/testcases/helper"
)
// TestCreateExternalCollection creates an external collection with one scalar
// and one vector field, then checks through HasCollection, DescribeCollection
// and ListCollections that the collection exists and that the external source,
// external spec and per-field external mappings are reported back as given.
func TestCreateExternalCollection(t *testing.T) {
	const (
		externalSource = "s3://test-bucket/data/"
		externalSpec   = `{"format": "parquet"}`
	)

	ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
	mc := hp.CreateDefaultMilvusClient(ctx, t)

	collName := common.GenRandomString("external", 6)

	// Field definitions, each mapped to a column of the external source.
	textDef := entity.NewField().
		WithName("text").
		WithDataType(entity.FieldTypeVarChar).
		WithMaxLength(256).
		WithExternalField("text_column")
	embeddingDef := entity.NewField().
		WithName("embedding").
		WithDataType(entity.FieldTypeFloatVector).
		WithDim(common.DefaultDim).
		WithExternalField("embedding_column")

	schema := entity.NewSchema().
		WithName(collName).
		WithExternalSource(externalSource).
		WithExternalSpec(externalSpec).
		WithField(textDef).
		WithField(embeddingDef)

	// Creation is expected to succeed.
	err := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema))
	common.CheckErr(t, err, true)

	// The collection must be visible afterwards.
	exists, err := mc.HasCollection(ctx, client.NewHasCollectionOption(collName))
	common.CheckErr(t, err, true)
	require.True(t, exists)

	// The described schema must carry the external settings.
	desc, err := mc.DescribeCollection(ctx, client.NewDescribeCollectionOption(collName))
	common.CheckErr(t, err, true)

	require.Equal(t, externalSource, desc.Schema.ExternalSource)
	require.Equal(t, externalSpec, desc.Schema.ExternalSpec)
	require.Len(t, desc.Schema.Fields, 2)

	// requireField asserts that the named field exists with the expected data
	// type and external column mapping.
	requireField := func(name string, wantType entity.FieldType, wantExternal string) {
		field := findFieldByName(desc.Schema.Fields, name)
		require.NotNil(t, field)
		require.Equal(t, wantType, field.DataType)
		require.Equal(t, wantExternal, field.ExternalField)
	}
	requireField("text", entity.FieldTypeVarChar, "text_column")
	requireField("embedding", entity.FieldTypeFloatVector, "embedding_column")

	// The collection must also show up in the listing.
	names, err := mc.ListCollections(ctx, client.NewListCollectionOption())
	common.CheckErr(t, err, true)
	require.Contains(t, names, collName)
}
// TestCreateExternalCollectionMissingExternalField verifies that creating an
// external collection is rejected when one of its fields has no external_field
// mapping.
func TestCreateExternalCollectionMissingExternalField(t *testing.T) {
	ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
	mc := hp.CreateDefaultMilvusClient(ctx, t)

	collName := common.GenRandomString("external", 6)

	// The scalar field deliberately omits WithExternalField().
	unmappedText := entity.NewField().
		WithName("text").
		WithDataType(entity.FieldTypeVarChar).
		WithMaxLength(256)
	mappedEmbedding := entity.NewField().
		WithName("embedding").
		WithDataType(entity.FieldTypeFloatVector).
		WithDim(common.DefaultDim).
		WithExternalField("embedding_column")

	schema := entity.NewSchema().
		WithName(collName).
		WithExternalSource("s3://test-bucket/data/").
		WithExternalSpec(`{"format": "parquet"}`).
		WithField(unmappedText).
		WithField(mappedEmbedding)

	// Creation is expected to fail with the missing-mapping error.
	createErr := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema))
	common.CheckErr(t, createErr, false, "must have external_field mapping")
}
// TestCreateExternalCollectionWithPrimaryKey verifies that creating an
// external collection is rejected when a field is declared as primary key.
func TestCreateExternalCollectionWithPrimaryKey(t *testing.T) {
	ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
	mc := hp.CreateDefaultMilvusClient(ctx, t)

	collName := common.GenRandomString("external", 6)

	// A user-declared primary key is not allowed on external collections.
	pkDef := entity.NewField().
		WithName("id").
		WithDataType(entity.FieldTypeInt64).
		WithIsPrimaryKey(true).
		WithExternalField("id_column")
	embeddingDef := entity.NewField().
		WithName("embedding").
		WithDataType(entity.FieldTypeFloatVector).
		WithDim(common.DefaultDim).
		WithExternalField("embedding_column")

	schema := entity.NewSchema().
		WithName(collName).
		WithExternalSource("s3://test-bucket/data/").
		WithExternalSpec(`{"format": "parquet"}`).
		WithField(pkDef).
		WithField(embeddingDef)

	// Creation is expected to fail with the primary-key error.
	createErr := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema))
	common.CheckErr(t, createErr, false, "does not support primary key")
}
// TestCreateExternalCollectionWithDynamicField verifies that creating an
// external collection is rejected when the dynamic field is enabled.
func TestCreateExternalCollectionWithDynamicField(t *testing.T) {
	ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
	mc := hp.CreateDefaultMilvusClient(ctx, t)

	collName := common.GenRandomString("external", 6)

	textDef := entity.NewField().
		WithName("text").
		WithDataType(entity.FieldTypeVarChar).
		WithMaxLength(256).
		WithExternalField("text_column")
	embeddingDef := entity.NewField().
		WithName("embedding").
		WithDataType(entity.FieldTypeFloatVector).
		WithDim(common.DefaultDim).
		WithExternalField("embedding_column")

	// Enabling the dynamic field is not allowed on external collections.
	schema := entity.NewSchema().
		WithName(collName).
		WithExternalSource("s3://test-bucket/data/").
		WithExternalSpec(`{"format": "parquet"}`).
		WithDynamicFieldEnabled(true).
		WithField(textDef).
		WithField(embeddingDef)

	// Creation is expected to fail with the dynamic-field error.
	createErr := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema))
	common.CheckErr(t, createErr, false, "does not support dynamic field")
}
// TestCreateExternalCollectionWithAutoID verifies that creating an external
// collection is rejected when a field has auto ID enabled.
func TestCreateExternalCollectionWithAutoID(t *testing.T) {
	ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
	mc := hp.CreateDefaultMilvusClient(ctx, t)

	collName := common.GenRandomString("external", 6)

	// Auto ID is not allowed on external collections.
	autoIDDef := entity.NewField().
		WithName("id").
		WithDataType(entity.FieldTypeInt64).
		WithIsAutoID(true).
		WithExternalField("id_column")
	embeddingDef := entity.NewField().
		WithName("embedding").
		WithDataType(entity.FieldTypeFloatVector).
		WithDim(common.DefaultDim).
		WithExternalField("embedding_column")

	schema := entity.NewSchema().
		WithName(collName).
		WithExternalSource("s3://test-bucket/data/").
		WithExternalSpec(`{"format": "parquet"}`).
		WithField(autoIDDef).
		WithField(embeddingDef)

	// Creation is expected to fail with the auto-id error.
	createErr := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema))
	common.CheckErr(t, createErr, false, "does not support auto id")
}
// TestCreateExternalCollectionWithPartitionKey verifies that creating an
// external collection is rejected when a field is declared as partition key.
func TestCreateExternalCollectionWithPartitionKey(t *testing.T) {
	ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
	mc := hp.CreateDefaultMilvusClient(ctx, t)

	collName := common.GenRandomString("external", 6)

	// A partition key is not allowed on external collections.
	partitionKeyDef := entity.NewField().
		WithName("category").
		WithDataType(entity.FieldTypeVarChar).
		WithMaxLength(256).
		WithIsPartitionKey(true).
		WithExternalField("category_column")
	embeddingDef := entity.NewField().
		WithName("embedding").
		WithDataType(entity.FieldTypeFloatVector).
		WithDim(common.DefaultDim).
		WithExternalField("embedding_column")

	schema := entity.NewSchema().
		WithName(collName).
		WithExternalSource("s3://test-bucket/data/").
		WithExternalSpec(`{"format": "parquet"}`).
		WithField(partitionKeyDef).
		WithField(embeddingDef)

	// Creation is expected to fail with the partition-key error.
	createErr := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema))
	common.CheckErr(t, createErr, false, "does not support partition key")
}
// TestCreateExternalCollectionMultipleVectorFields creates an external
// collection with one scalar field plus a float vector and a binary vector
// field, then checks via DescribeCollection that the external source, the
// field count and both vector fields' types and external mappings are
// reported back as given.
func TestCreateExternalCollectionMultipleVectorFields(t *testing.T) {
	const externalSource = "s3://test-bucket/data/"

	ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
	mc := hp.CreateDefaultMilvusClient(ctx, t)

	collName := common.GenRandomString("external", 6)

	textDef := entity.NewField().
		WithName("text").
		WithDataType(entity.FieldTypeVarChar).
		WithMaxLength(256).
		WithExternalField("text_column")
	denseDef := entity.NewField().
		WithName("dense_embedding").
		WithDataType(entity.FieldTypeFloatVector).
		WithDim(common.DefaultDim).
		WithExternalField("dense_embedding_column")
	binaryDef := entity.NewField().
		WithName("binary_embedding").
		WithDataType(entity.FieldTypeBinaryVector).
		WithDim(common.DefaultDim).
		WithExternalField("binary_embedding_column")

	schema := entity.NewSchema().
		WithName(collName).
		WithExternalSource(externalSource).
		WithExternalSpec(`{"format": "parquet"}`).
		WithField(textDef).
		WithField(denseDef).
		WithField(binaryDef)

	// Creation is expected to succeed.
	err := mc.CreateCollection(ctx, client.NewCreateCollectionOption(collName, schema))
	common.CheckErr(t, err, true)

	desc, err := mc.DescribeCollection(ctx, client.NewDescribeCollectionOption(collName))
	common.CheckErr(t, err, true)

	require.Equal(t, externalSource, desc.Schema.ExternalSource)
	require.Len(t, desc.Schema.Fields, 3)

	// requireVector asserts that the named vector field exists with the
	// expected data type and external column mapping.
	requireVector := func(name string, wantType entity.FieldType, wantExternal string) {
		field := findFieldByName(desc.Schema.Fields, name)
		require.NotNil(t, field)
		require.Equal(t, wantType, field.DataType)
		require.Equal(t, wantExternal, field.ExternalField)
	}
	requireVector("dense_embedding", entity.FieldTypeFloatVector, "dense_embedding_column")
	requireVector("binary_embedding", entity.FieldTypeBinaryVector, "binary_embedding_column")
}
// findFieldByName returns the first entry of fields whose Name equals name,
// or nil when no entry matches.
func findFieldByName(fields []*entity.Field, name string) *entity.Field {
	for i := range fields {
		if fields[i].Name == name {
			return fields[i]
		}
	}
	return nil
}