6.2 KiB
Timesync -- All The things you should know
Time Synchronization is the kernel part of Milvus 2.0, it affects all components of the system. This document describes the detailed desgin of Time Synchronization.
There are 2 kinds of events in Milvus 2.0:
- DDL events
- create collection
- drop collection
- create partition
- drop partition
- DML events
- insert
- search
- etc
All events have a Timestamp to indicate when this event occurs.
Suppose there are two users, u1 and u2. They connect to Milvus, and do following operations at respective timestamps.
| ts | u1 | u2 |
|---|---|---|
| t0 | create Collection C0 | - |
| t2 | - | search on C0 |
| t5 | insert A1 into C0 | - |
| t7 | - | search on C0 |
| t10 | insert A2 | - |
| t12 | - | search on C0 |
| t15 | delete A1 from C0 | - |
| t17 | - | search on C0 |
Ideally, u2 expects C0 is empty at t2, and could only sees A1 at t7; while u2 could see both A1 and A2 at t12, but only see A2 at t17.
It's easy to achieve this in a single-node database. But for a Distributed System, such like Milvus, it's a little difficult, following problems needs to be solved.
- If
u1andu2are on different nodes, and their time clock is not synchronized. To give an extreme example, suppose that the time ofu2is 24 hours later thanu1, then all the operations ofu1can't been seen byu2until next day. - Network latency. If
u2starts theSearch on C0att17, then how to guarantee that all theeventsbeforet17have been processed. If the events ofdelete A1 from C0has been delayed due to the network latency, then it would lead to incorrect state:u2would see bothA1andA2att17.
Time synchronization system is used to solve the above problems.
Timestamp Oracle(TSO)
Like TiKV, Milvus 2.0 provides TSO service, all the events must alloc timestamp from TSO,not use local timestamp, so the first problem can be solved.
TSO is provided by RootCoord component, clients could alloc one or more timestamp in a single request, the proto is defined as following.
service RootCoord {
...
rpc AllocTimestamp(AllocTimestampRequest) returns (AllocTimestampResponse) {}
...
}
message AllocTimestampRequest {
common.MsgBase base = 1;
uint32 count = 3;
}
message AllocTimestampResponse {
common.Status status = 1;
uint64 timestamp = 2;
uint32 count = 3;
}
Timestamp is with type uint64, containing physical and logical parts.
This is the format of Timestamp
In an AllocTimestamp request, if AllocTimestampRequest.count is greater than 1, AllocTimestampResponse.timestamp indicates the first available timestamp in the response.
Time Synchronization
To understand the Time Synchronization better, let's introduce the data operation of Milvus 2.0 briefly.
Taking Insert Operation as an example.
- User can configure lots of
Proxyto achieve load balancing, inMilvus 2.0 - User can use
SDKto connect to anyProxy - When
ProxyreceivesInsertRequest fromSDK, it splitsInsertMsginto differentMsgStreamaccording to the hash value ofPrimary Key - Each
InsertMsgwould be assigned with aTimestampbefore sending to theMsgStream
Note:
MsgStreamis the wrapper of message queue, the default message queue inMilvus 2.0ispulsar
Based on above information, we can know that the MsgStream have the following characteristics:
- In
MsgStream,InsertMsgfrom the sameProxymust be incremented in timestamp - In
MsgStream,InsertMsgfrom differentProxyhave no relationship in timestamp
The following figure shows an example of InsertMsg in MsgStream, the snippet contains 5 InsertMsg, 3 of them from Proxy1 and others from Proxy2.
The 3 InsertMsg from Proxy1 are incremented in timestamp, and the 2 InsertMsg from Proxy2 are also incremented in timestamps, but there is no relationship between Proxy1 and Proxy2.
So the second problem has turned into this: after reading a message from MsgStream, how to make sure that all the messages with smaller timestamp have been consumed ?
For example, when read a message with timestamp 110 produced by Proxy2, but the message with timestamp 80 produced by Proxy1, is still in the MsgStream, how to handle this situation ?
The following graph shows the core logic of Time Synchronization System in Milvus 2.0, it should solve the second problem.
- Each
Proxywill periodically reports its latest timestamp of everyMsgStreamtoRootCoord, the default interval is200ms - For each
Msgstream,Rootcoordfinds the minimum timestamp of allProxyon thisMsgstream, and inserts this minimum timestamp into theMsgstream - When the consumer reads the timestamp inserted by the
RootCoordon theMsgStream, it indicates that all messages with smaller timestamp have been consumed, so all actions that depend on this timestamp can be executed safely - The message inserted by
RootCoordintoMsgStreamis type ofTimeTick
This is the Proto that used by Proxy to report timestamp to RootCoord:
service RootCoord {
...
rpc UpdateChannelTimeTick(internal.ChannelTimeTickMsg) returns (common.Status) {}
...
}
message ChannelTimeTickMsg {
common.MsgBase base = 1;
repeated string channelNames = 2;
repeated uint64 timestamps = 3;
uint64 default_timestamp = 4;
}
After inserting Timetick, the Msgstream should look like this:

MsgStream will process the messages in batches according to TimeTick, and ensure that the output messages meet the requirements of timestamp. For more details, please refer to the MsgStream design detail.



