add cdc event

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2024-08-12 14:56:04 +08:00
parent f09c310cf4
commit 83c3fc0914

View File

@ -18,7 +18,7 @@ logger.remove()
logger.add(sink=sys.stdout, level="DEBUG")
def create_interactive_plot(data, count_data):
def create_interactive_plot(data, count_data, cdc_events):
# Convert timestamps to datetime
data['datetime'] = pd.to_datetime(data['timestamp'], unit='s')
count_data['datetime'] = pd.to_datetime(count_data['timestamp'], unit='s')
@ -37,6 +37,11 @@ def create_interactive_plot(data, count_data):
fig.add_trace(go.Scatter(x=count_data['datetime'], y=count_data['target_count'],
mode='lines', name='Target Count'),
row=1, col=1)
# Add CDC pause and resume events
for event in cdc_events:
event_time = pd.to_datetime(event['timestamp'], unit='s')
fig.add_vline(x=event_time, line_dash="dash", line_color="red" if event['action'] == 'pause' else "green",
annotation_text=f"CDC {event['action'].capitalize()}", row=1, col=1)
# Plot 2: Insert and Sync Throughput
fig.add_trace(go.Scatter(x=data['datetime'], y=data['insert_throughput'],
@ -114,6 +119,7 @@ class MilvusCDCPerformance:
'source_count': [],
'target_count': []
}
self.cdc_events = []
def report_realtime_metrics(self):
current_time = time.time()
@ -158,7 +164,7 @@ class MilvusCDCPerformance:
def plot_time_series_data(self):
df = pd.DataFrame(self.time_series_data)
count_df = pd.DataFrame(self.count_series_data)
fig = create_interactive_plot(df, count_df)
fig = create_interactive_plot(df, count_df, self.cdc_events)
pio.write_html(fig, file='milvus_cdc_performance.html', auto_open=True)
logger.info("Interactive performance plot saved as 'milvus_cdc_performance.html'")
@ -184,6 +190,7 @@ class MilvusCDCPerformance:
result = response.json()
logger.info(f"Pause CDC task {task_id} response: {result}")
self.cdc_paused = True
self.cdc_events.append({'timestamp': time.time(), 'action': 'pause'}) # Record pause event
logger.info("All CDC tasks paused")
def resume_cdc_tasks(self):
@ -199,6 +206,7 @@ class MilvusCDCPerformance:
result = response.json()
logger.info(f"Resume CDC task {task_id} response: {result}")
self.cdc_paused = False
self.cdc_events.append({'timestamp': time.time(), 'action': 'resume'}) # Record resume event
logger.info("All CDC tasks resumed")
def pause_and_resume_cdc_tasks(self, duration):