diff --git a/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py b/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py
index c8b0a14dff..29125e4a75 100644
--- a/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py
+++ b/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py
@@ -58,23 +58,29 @@ class MilvusCDCPerformance:
'target_count': []
}
self.cdc_events = []
- self.cdc_pause_start_time = None
- self.cdc_pause_end_time = None
- self.sync_completion_time = None
- self.catch_up_count = None
- self.catch_up_time = None
+ self.cdc_metrics = {
+ 'pause_start': time.time(),
+ 'pause_end': time.time(),
+ 'time_to_90_percent': 0,
+ 'time_to_99_percent': 0
+ }
+ # self.cdc_pause_start_time = None
+ # self.cdc_pause_end_time = None
+ # self.sync_completion_time = None
+ # self.catch_up_count = None
+ # self.catch_up_time = None
- def create_interactive_plot(self, data, count_data, cdc_events):
+ def create_interactive_plot(self, data, count_data, cdc_metrics):
# Convert timestamps to datetime
data['datetime'] = pd.to_datetime(data['timestamp'], unit='s')
count_data['datetime'] = pd.to_datetime(count_data['timestamp'], unit='s')
# Create a figure with subplots
- fig = make_subplots(rows=3, cols=1,
+ fig = make_subplots(rows=4, cols=1,
subplot_titles=("Source and Target Collection Count over Time",
"Insert and Sync Throughput over Time",
"Latency over Time"),
- vertical_spacing=0.1)
+ vertical_spacing=0.2)
# Plot 1: Source and Target Collection Count
fig.add_trace(go.Scatter(x=count_data['datetime'], y=count_data['source_count'],
@@ -83,36 +89,39 @@ class MilvusCDCPerformance:
fig.add_trace(go.Scatter(x=count_data['datetime'], y=count_data['target_count'],
mode='lines', name='Target Count'),
row=1, col=1)
- # Add CDC pause and resume events as annotations
- for event in cdc_events:
- event_time = pd.to_datetime(event['timestamp'], unit='s')
- color = "red" if event['action'] == 'pause' else "green"
- fig.add_annotation(
- x=event_time,
- y=1,
- yref="paper",
- text=f"CDC {event['action'].capitalize()}
{event_time.strftime('%Y-%m-%d %H:%M:%S')}",
- showarrow=True,
- arrowhead=2,
- arrowsize=1,
- arrowwidth=2,
- arrowcolor=color,
- ax=0,
- ay=-60,
- bordercolor=color,
- borderwidth=2,
- borderpad=4,
- bgcolor="white",
- opacity=0.8,
- font=dict(size=10),
- align="center",
- row=1, col=1
- )
- if self.sync_completion_time:
- fig.add_annotation(x=pd.to_datetime(self.sync_completion_time, unit='s'), y=1, yref="paper",
- text=f"Sync Completion (99.9%)
Catch-up: {self.catch_up_count:,} entities in {self.catch_up_time:.2f} seconds",
- showarrow=True, arrowhead=2, arrowsize=1, arrowwidth=2,
- arrowcolor="blue", ax=0, ay=-60, row=1, col=1)
+ # # Add CDC pause and resume events as annotations
+ # for event in cdc_events:
+ # event_time = pd.to_datetime(event['timestamp'], unit='s')
+ # color = "red" if event['action'] == 'pause' else "green"
+ # fig.add_annotation(
+ # x=event_time,
+ # y=1,
+ # yref="paper",
+ # text=f"CDC {event['action'].capitalize()}
{event_time.strftime('%Y-%m-%d %H:%M:%S')}",
+ # showarrow=True,
+ # arrowhead=2,
+ # arrowsize=1,
+ # arrowwidth=2,
+ # arrowcolor=color,
+ # ax=0,
+ # ay=-60,
+ # bordercolor=color,
+ # borderwidth=2,
+ # borderpad=4,
+ # bgcolor="white",
+ # opacity=0.8,
+ # font=dict(size=10),
+ # align="center",
+ # row=1, col=1
+ # )
+ # if self.sync_completion_time:
+ # fig.add_annotation(x=pd.to_datetime(self.sync_completion_time, unit='s'), y=1, yref="paper",
+ # text=f"Sync Completion (99.9%)
Catch-up: {self.catch_up_count:,} entities in {self.catch_up_time:.2f} seconds",
+ # showarrow=True, arrowhead=2, arrowsize=1, arrowwidth=2,
+ # arrowcolor="blue", ax=0, ay=-60, row=1, col=1)
+
+
+
# Plot 2: Insert and Sync Throughput
fig.add_trace(go.Scatter(x=data['datetime'], y=data['insert_throughput'],
mode='lines', name='Insert Throughput'),
@@ -121,19 +130,23 @@ class MilvusCDCPerformance:
mode='lines', name='Sync Throughput'),
row=2, col=1)
- # Plot 3: Latency
- # fig.add_trace(go.Scatter(x=data['datetime'], y=data['avg_latency'],
- # mode='lines', name='Average Latency'),
- # row=3, col=1)
fig.add_trace(go.Scatter(x=data['datetime'], y=data['real_time_latency'],
mode='lines', name='Real-time Latency'),
row=3, col=1)
- #
- # # Calculate and plot p99 latency
- # p99_latency = [np.percentile(data['avg_latency'][:i + 1], 99) for i in range(len(data['avg_latency']))]
- # fig.add_trace(go.Scatter(x=data['datetime'], y=p99_latency,
- # mode='lines', name='P99 Latency'),
- # row=3, col=1)
+
+ # Plot 4: CDC Performance Metrics Table
+ fig.add_trace(go.Table(
+ header=dict(values=['Metric', 'Value'],
+ fill_color='paleturquoise',
+ align='left'),
+ cells=dict(values=[['CDC Pause Start', 'CDC Pause End', 'Time to 90% Recovery', 'Time to 99% Recovery'],
+ [cdc_metrics['pause_start'],
+ cdc_metrics['pause_end'],
+ cdc_metrics['time_to_90_percent'],
+ cdc_metrics['time_to_99_percent']]],
+ fill_color='lavender',
+ align='left')
+ ), row=4, col=1)
# Update layout
fig.update_layout(height=1200, width=1000, title_text="Milvus CDC Performance Metrics")
@@ -245,11 +258,11 @@ class MilvusCDCPerformance:
def pause_and_resume_cdc_tasks(self, duration):
time.sleep(duration / 3)
- self.cdc_pause_start_time = time.time()
+ self.cdc_metrics['pause_start'] = datetime.now().strftime("%Y%m%d%H%M%S")
self.pause_cdc_tasks()
time.sleep(duration / 3)
self.resume_cdc_tasks()
- self.cdc_pause_end_time = time.time()
+ self.cdc_metrics['pause_end'] = datetime.now().strftime("%Y%m%d%H%M%S")
def setup_collections(self):
self.resume_cdc_tasks()
@@ -385,18 +398,30 @@ class MilvusCDCPerformance:
self.count_series_data['source_count'].append(self.source_count)
self.count_series_data['target_count'].append(self.target_count)
logger.debug(f"sync progress {self.target_count}/{self.source_count} {progress:.2f}%")
- # 检查同步是否接近完成(99.9%)
- if progress >= 99.9 and self.cdc_pause_end_time is not None and self.sync_completion_time is None:
- self.sync_completion_time = time.time()
+ # 检查同步是否接近完成(99%)
+ if progress >= 90 and time.time() > self.cdc_metrics['pause_end'] and self.cdc_metrics['time_to_90_percent'] == 0:
# 计算 catch-up time
- self.catch_up_time = self.sync_completion_time - self.cdc_pause_end_time
+ catchup_time = time.time() - self.cdc_metrics['pause_end']
# 计算catch-up count: 当前的target count - count_series_data中第一个大于cdc_pause_end_time的target_count
for i in range(len(self.count_series_data['timestamp'])):
- if self.count_series_data['timestamp'][i] > self.cdc_pause_end_time:
- self.catch_up_count = self.target_count - self.count_series_data['target_count'][i]
+ if self.count_series_data['timestamp'][i] > self.cdc_metrics['pause_end']:
+ catch_up_count = self.target_count - self.count_series_data['target_count'][i]
+
break
- logger.info(f"Sync nearly completed (99.9%) at {datetime.fromtimestamp(self.sync_completion_time)}"
- f" with {self.catch_up_count:,} entities caught up in {self.catch_up_time:.2f} seconds")
+ self.cdc_metrics['time_to_90_percent'] = f"catch up {catch_up_count} entities in {catchup_time:.2f} seconds"
+
+ # 检查同步是否接近完成(99%)
+ if progress >= 99 and time.time() > self.cdc_metrics['pause_end'] and self.cdc_metrics['time_to_99_percent'] == 0:
+ # 计算 catch-up time
+ catchup_time = time.time() - self.cdc_metrics['pause_end']
+ # 计算catch-up count: 当前的target count - count_series_data中第一个大于cdc_pause_end_time的target_count
+ for i in range(len(self.count_series_data['timestamp'])):
+ if self.count_series_data['timestamp'][i] > self.cdc_metrics['pause_end']:
+ catch_up_count = self.target_count - self.count_series_data['target_count'][i]
+
+ break
+ self.cdc_metrics['time_to_99_percent'] = f"catch up {catch_up_count} entities in {catchup_time:.2f} seconds"
+
except Exception as e:
logger.error(f"Count failed: {e}")
time.sleep(0.01) # Query interval