update cdc plot

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2024-08-15 10:28:55 +08:00
parent 8a099da5bd
commit 1cf0dde353

View File

@ -58,23 +58,29 @@ class MilvusCDCPerformance:
'target_count': []
}
self.cdc_events = []
self.cdc_pause_start_time = None
self.cdc_pause_end_time = None
self.sync_completion_time = None
self.catch_up_count = None
self.catch_up_time = None
self.cdc_metrics = {
'pause_start': time.time(),
'pause_end': time.time(),
'time_to_90_percent': 0,
'time_to_99_percent': 0
}
# self.cdc_pause_start_time = None
# self.cdc_pause_end_time = None
# self.sync_completion_time = None
# self.catch_up_count = None
# self.catch_up_time = None
def create_interactive_plot(self, data, count_data, cdc_events):
def create_interactive_plot(self, data, count_data, cdc_metrics):
# Convert timestamps to datetime
data['datetime'] = pd.to_datetime(data['timestamp'], unit='s')
count_data['datetime'] = pd.to_datetime(count_data['timestamp'], unit='s')
# Create a figure with subplots
fig = make_subplots(rows=3, cols=1,
fig = make_subplots(rows=4, cols=1,
subplot_titles=("Source and Target Collection Count over Time",
"Insert and Sync Throughput over Time",
"Latency over Time"),
vertical_spacing=0.1)
vertical_spacing=0.2)
# Plot 1: Source and Target Collection Count
fig.add_trace(go.Scatter(x=count_data['datetime'], y=count_data['source_count'],
@ -83,36 +89,39 @@ class MilvusCDCPerformance:
fig.add_trace(go.Scatter(x=count_data['datetime'], y=count_data['target_count'],
mode='lines', name='Target Count'),
row=1, col=1)
# Add CDC pause and resume events as annotations
for event in cdc_events:
event_time = pd.to_datetime(event['timestamp'], unit='s')
color = "red" if event['action'] == 'pause' else "green"
fig.add_annotation(
x=event_time,
y=1,
yref="paper",
text=f"CDC {event['action'].capitalize()}<br>{event_time.strftime('%Y-%m-%d %H:%M:%S')}",
showarrow=True,
arrowhead=2,
arrowsize=1,
arrowwidth=2,
arrowcolor=color,
ax=0,
ay=-60,
bordercolor=color,
borderwidth=2,
borderpad=4,
bgcolor="white",
opacity=0.8,
font=dict(size=10),
align="center",
row=1, col=1
)
if self.sync_completion_time:
fig.add_annotation(x=pd.to_datetime(self.sync_completion_time, unit='s'), y=1, yref="paper",
text=f"Sync Completion (99.9%)<br>Catch-up: {self.catch_up_count:,} entities in {self.catch_up_time:.2f} seconds",
showarrow=True, arrowhead=2, arrowsize=1, arrowwidth=2,
arrowcolor="blue", ax=0, ay=-60, row=1, col=1)
# # Add CDC pause and resume events as annotations
# for event in cdc_events:
# event_time = pd.to_datetime(event['timestamp'], unit='s')
# color = "red" if event['action'] == 'pause' else "green"
# fig.add_annotation(
# x=event_time,
# y=1,
# yref="paper",
# text=f"CDC {event['action'].capitalize()}<br>{event_time.strftime('%Y-%m-%d %H:%M:%S')}",
# showarrow=True,
# arrowhead=2,
# arrowsize=1,
# arrowwidth=2,
# arrowcolor=color,
# ax=0,
# ay=-60,
# bordercolor=color,
# borderwidth=2,
# borderpad=4,
# bgcolor="white",
# opacity=0.8,
# font=dict(size=10),
# align="center",
# row=1, col=1
# )
# if self.sync_completion_time:
# fig.add_annotation(x=pd.to_datetime(self.sync_completion_time, unit='s'), y=1, yref="paper",
# text=f"Sync Completion (99.9%)<br>Catch-up: {self.catch_up_count:,} entities in {self.catch_up_time:.2f} seconds",
# showarrow=True, arrowhead=2, arrowsize=1, arrowwidth=2,
# arrowcolor="blue", ax=0, ay=-60, row=1, col=1)
# Plot 2: Insert and Sync Throughput
fig.add_trace(go.Scatter(x=data['datetime'], y=data['insert_throughput'],
mode='lines', name='Insert Throughput'),
@ -121,19 +130,23 @@ class MilvusCDCPerformance:
mode='lines', name='Sync Throughput'),
row=2, col=1)
# Plot 3: Latency
# fig.add_trace(go.Scatter(x=data['datetime'], y=data['avg_latency'],
# mode='lines', name='Average Latency'),
# row=3, col=1)
fig.add_trace(go.Scatter(x=data['datetime'], y=data['real_time_latency'],
mode='lines', name='Real-time Latency'),
row=3, col=1)
#
# # Calculate and plot p99 latency
# p99_latency = [np.percentile(data['avg_latency'][:i + 1], 99) for i in range(len(data['avg_latency']))]
# fig.add_trace(go.Scatter(x=data['datetime'], y=p99_latency,
# mode='lines', name='P99 Latency'),
# row=3, col=1)
# Plot 4: CDC Performance Metrics Table
fig.add_trace(go.Table(
header=dict(values=['Metric', 'Value'],
fill_color='paleturquoise',
align='left'),
cells=dict(values=[['CDC Pause Start', 'CDC Pause End', 'Time to 90% Recovery', 'Time to 99% Recovery'],
[cdc_metrics['pause_start'],
cdc_metrics['pause_end'],
cdc_metrics['time_to_90_percent'],
cdc_metrics['time_to_99_percent']]],
fill_color='lavender',
align='left')
), row=4, col=1)
# Update layout
fig.update_layout(height=1200, width=1000, title_text="Milvus CDC Performance Metrics")
@ -245,11 +258,11 @@ class MilvusCDCPerformance:
def pause_and_resume_cdc_tasks(self, duration):
time.sleep(duration / 3)
self.cdc_pause_start_time = time.time()
self.cdc_metrics['pause_start'] = datetime.now().strftime("%Y%m%d%H%M%S")
self.pause_cdc_tasks()
time.sleep(duration / 3)
self.resume_cdc_tasks()
self.cdc_pause_end_time = time.time()
self.cdc_metrics['pause_end'] = datetime.now().strftime("%Y%m%d%H%M%S")
def setup_collections(self):
self.resume_cdc_tasks()
@ -385,18 +398,30 @@ class MilvusCDCPerformance:
self.count_series_data['source_count'].append(self.source_count)
self.count_series_data['target_count'].append(self.target_count)
logger.debug(f"sync progress {self.target_count}/{self.source_count} {progress:.2f}%")
# 检查同步是否接近完成99.9%
if progress >= 99.9 and self.cdc_pause_end_time is not None and self.sync_completion_time is None:
self.sync_completion_time = time.time()
# 检查同步是否接近完成99%
if progress >= 90 and time.time() > self.cdc_metrics['pause_end'] and self.cdc_metrics['time_to_90_percent'] == 0:
# 计算 catch-up time
self.catch_up_time = self.sync_completion_time - self.cdc_pause_end_time
catchup_time = time.time() - self.cdc_metrics['pause_end']
# 计算catch-up count: 当前的target count - count_series_data中第一个大于cdc_pause_end_time的target_count
for i in range(len(self.count_series_data['timestamp'])):
if self.count_series_data['timestamp'][i] > self.cdc_pause_end_time:
self.catch_up_count = self.target_count - self.count_series_data['target_count'][i]
if self.count_series_data['timestamp'][i] > self.cdc_metrics['pause_end']:
catch_up_count = self.target_count - self.count_series_data['target_count'][i]
break
logger.info(f"Sync nearly completed (99.9%) at {datetime.fromtimestamp(self.sync_completion_time)}"
f" with {self.catch_up_count:,} entities caught up in {self.catch_up_time:.2f} seconds")
self.cdc_metrics['time_to_90_percent'] = f"catch up {catch_up_count} entities in {catchup_time:.2f} seconds"
# 检查同步是否接近完成99%
if progress >= 99 and time.time() > self.cdc_metrics['pause_end'] and self.cdc_metrics['time_to_99_percent'] == 0:
# 计算 catch-up time
catchup_time = time.time() - self.cdc_metrics['pause_end']
# 计算catch-up count: 当前的target count - count_series_data中第一个大于cdc_pause_end_time的target_count
for i in range(len(self.count_series_data['timestamp'])):
if self.count_series_data['timestamp'][i] > self.cdc_metrics['pause_end']:
catch_up_count = self.target_count - self.count_series_data['target_count'][i]
break
self.cdc_metrics['time_to_99_percent'] = f"catch up {catch_up_count} entities in {catchup_time:.2f} seconds"
except Exception as e:
logger.error(f"Count failed: {e}")
time.sleep(0.01) # Query interval