Flush Collection
The Flush operation makes sure that data has been written into persistent storage. This document introduces how the Flush operation works in Milvus 2.0. The following figure shows the execution flow of Flush.
- First, the SDK sends a Flush request to Proxy via gRPC. The proto is defined as follows:
service MilvusService {
...
rpc Flush(FlushRequest) returns (FlushResponse) {}
...
}
message FlushRequest {
common.MsgBase base = 1;
string db_name = 2;
repeated string collection_names = 3;
}
message FlushResponse{
common.Status status = 1;
string db_name = 2;
map<string, schema.LongArray> coll_segIDs = 3;
}
- On receiving the Flush request, Proxy wraps the request into a FlushTask and pushes the task into the DdTaskQueue queue. Proxy then calls the task's WaitToFinish method to wait until the task has finished.
type task interface {
TraceCtx() context.Context
ID() UniqueID // return ReqID
SetID(uid UniqueID) // set ReqID
Name() string
Type() commonpb.MsgType
BeginTs() Timestamp
EndTs() Timestamp
SetTs(ts Timestamp)
OnEnqueue() error
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
}
type FlushTask struct {
Condition
*milvuspb.FlushRequest
ctx context.Context
dataCoord types.DataCoord
result *milvuspb.FlushResponse
}
- There is a background service in Proxy that takes the FlushTask from DdTaskQueue and executes it in three phases:
  - PreExecute: FlushTask does nothing in this phase and returns directly.
  - Execute: Proxy sends a Flush request to DataCoord via gRPC and waits for the response. The proto is defined as follows:
service DataCoord {
...
rpc Flush(FlushRequest) returns (FlushResponse) {}
...
}
message FlushRequest {
common.MsgBase base = 1;
int64 dbID = 2;
int64 collectionID = 4;
}
message FlushResponse {
common.Status status = 1;
int64 dbID = 2;
int64 collectionID = 3;
repeated int64 segmentIDs = 4;
}
  - PostExecute: FlushTask does nothing in this phase and returns directly.
- After receiving the Flush request from Proxy, DataCoord calls SealAllSegments to seal all the growing segments that belong to this Collection, and no longer allocates new IDs for these segments. DataCoord then sends a response to Proxy that contains the IDs of all the sealed segments.
- In Milvus 2.0, Flush is an asynchronous operation. When the SDK receives the Flush response, it only means that DataCoord has sealed these segments. Two problems remain to be solved:
  - The sealed segments might still be in memory and not yet written into persistent storage.
  - DataCoord no longer allocates new IDs for these sealed segments, but how can we make sure that all the allocated IDs have been consumed by DataNode?
- For the first problem, the SDK should periodically send a GetSegmentInfo request to DataCoord until all the sealed segments are in the Flushed state. The proto is defined as follows:
service DataCoord {
...
rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
...
}
message GetSegmentInfoRequest {
common.MsgBase base = 1;
repeated int64 segmentIDs = 2;
}
message GetSegmentInfoResponse {
common.Status status = 1;
repeated SegmentInfo infos = 2;
}
message SegmentInfo {
int64 ID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
string insert_channel = 4;
int64 num_of_rows = 5;
common.SegmentState state = 6;
internal.MsgPosition dml_position = 7;
int64 max_row_num = 8;
uint64 last_expire_time = 9;
internal.MsgPosition start_position = 10;
}
enum SegmentState {
SegmentStateNone = 0;
NotExist = 1;
Growing = 2;
Sealed = 3;
Flushed = 4;
Flushing = 5;
}
- For the second problem, DataNode reports a timestamp to DataCoord every time it consumes a package from MsgStream. The proto is defined as follows:
message DataNodeTtMsg {
common.MsgBase base = 1;
string channel_name = 2;
uint64 timestamp = 3;
}
- There is a background service, startDataNodeTsLoop, in DataCoord that processes DataNodeTtMsg messages.
  - First, DataCoord extracts channel_name from DataNodeTtMsg and picks out all the sealed segments attached to this channel_name.
  - It then compares the timestamp at which each segment entered the Sealed state with DataNodeTtMsg.timestamp. If DataNodeTtMsg.timestamp is greater, all the IDs that belong to that segment have been consumed by DataNode, so it is safe to notify DataNode to write that segment into persistent storage. The proto is defined as follows:
service DataNode {
...
rpc FlushSegments(FlushSegmentsRequest) returns(common.Status) {}
...
}
message FlushSegmentsRequest {
common.MsgBase base = 1;
int64 dbID = 2;
int64 collectionID = 3;
repeated int64 segmentIDs = 4;
}
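The timestamp comparison that DataCoord performs before sending FlushSegments can be sketched as a pure function. This is a sketch under assumptions: sealedSegment, its SealedTs field and flushableSegments are hypothetical names, and how DataCoord actually records the time a segment was sealed is not described in this document.

```go
package main

import "fmt"

// DataNodeTtMsg mirrors the fields of the proto message shown earlier (base omitted).
type DataNodeTtMsg struct {
	ChannelName string
	Timestamp   uint64
}

// sealedSegment is a hypothetical record; SealedTs is the assumed timestamp
// at which the segment entered the Sealed state.
type sealedSegment struct {
	ID            int64
	InsertChannel string
	SealedTs      uint64
}

// flushableSegments returns the IDs of sealed segments on the message's channel
// whose seal timestamp is strictly less than the timestamp reported by DataNode.
func flushableSegments(msg DataNodeTtMsg, sealed []sealedSegment) []int64 {
	var ids []int64
	for _, s := range sealed {
		if s.InsertChannel != msg.ChannelName {
			continue // segment is attached to a different channel
		}
		if msg.Timestamp > s.SealedTs {
			ids = append(ids, s.ID)
		}
	}
	return ids
}

func main() {
	sealed := []sealedSegment{
		{ID: 1, InsertChannel: "ch-0", SealedTs: 100},
		{ID: 2, InsertChannel: "ch-0", SealedTs: 200},
		{ID: 3, InsertChannel: "ch-1", SealedTs: 50},
	}
	msg := DataNodeTtMsg{ChannelName: "ch-0", Timestamp: 150}
	fmt.Println(flushableSegments(msg, sealed)) // prints [1]
}
```

The returned IDs would populate segmentIDs in a FlushSegmentsRequest. Segment 2 stays pending until a later DataNodeTtMsg on the same channel carries a timestamp greater than 200.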
