mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
add cdc test
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
parent
88c6910061
commit
9341b99a27
@ -44,6 +44,11 @@ class MilvusCDCPerformance:
|
||||
'sync_throughput': [],
|
||||
'avg_latency': []
|
||||
}
|
||||
self.count_series_data = {
|
||||
'timestamp': [],
|
||||
'source_count': [],
|
||||
'target_count': []
|
||||
}
|
||||
|
||||
def report_realtime_metrics(self):
|
||||
current_time = time.time()
|
||||
@ -83,8 +88,15 @@ class MilvusCDCPerformance:
|
||||
time.sleep(interval)
|
||||
|
||||
def plot_time_series_data(self):
|
||||
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
|
||||
|
||||
fig, (ax0, ax1, ax2) = plt.subplots(3, 1, figsize=(12, 10))
|
||||
# Plot count
|
||||
ax0.plot(self.count_series_data['timestamp'], self.count_series_data['source_count'], label='Source Count')
|
||||
ax0.plot(self.count_series_data['timestamp'], self.count_series_data['target_count'], label='Target Count')
|
||||
ax0.set_xlabel('Time (seconds)')
|
||||
ax0.set_ylabel('Entity Count')
|
||||
ax0.set_title('Source and Target Collection Count over Time')
|
||||
ax0.legend()
|
||||
ax0.grid(True)
|
||||
# Plot throughput
|
||||
ax1.plot(self.time_series_data['timestamp'], self.time_series_data['insert_throughput'],
|
||||
label='Insert Throughput')
|
||||
@ -232,40 +244,57 @@ class MilvusCDCPerformance:
|
||||
time.sleep(0.01) # Query interval
|
||||
|
||||
def continuous_count(self):
|
||||
|
||||
def count_target(self):
|
||||
try:
|
||||
t0 = time.time()
|
||||
results = self.target_collection.query(
|
||||
expr="",
|
||||
output_fields=["count(*)"],
|
||||
timeout=5
|
||||
)
|
||||
tt = time.time() - t0
|
||||
self.target_count = results[0]['count(*)']
|
||||
except Exception as e:
|
||||
logger.error(f"Target count failed: {e}")
|
||||
self.target_count = self.last_target_count
|
||||
|
||||
def count_source(self):
|
||||
try:
|
||||
t0 = time.time()
|
||||
results = self.source_collection.query(
|
||||
expr="",
|
||||
output_fields=["count(*)"],
|
||||
timeout=5
|
||||
)
|
||||
tt = time.time() - t0
|
||||
self.source_count = results[0]['count(*)']
|
||||
except Exception as e:
|
||||
logger.error(f"Source count failed: {e}")
|
||||
self.source_count = self.last_source_count
|
||||
previous_count = self.target_collection.query(
|
||||
expr="",
|
||||
output_fields=["count(*)"],
|
||||
)[0]['count(*)']
|
||||
while not self.stop_query:
|
||||
try:
|
||||
try:
|
||||
t0 = time.time()
|
||||
results = self.target_collection.query(
|
||||
expr="",
|
||||
output_fields=["count(*)"],
|
||||
)
|
||||
tt = time.time() - t0
|
||||
target_count = results[0]['count(*)']
|
||||
except Exception as e:
|
||||
logger.error(f"Count failed: {e}")
|
||||
|
||||
target_count = self.last_target_count
|
||||
try:
|
||||
t0 = time.time()
|
||||
results = self.source_collection.query(
|
||||
expr="",
|
||||
output_fields=["count(*)"],
|
||||
)
|
||||
tt = time.time() - t0
|
||||
source_count = results[0]['count(*)']
|
||||
except Exception as e:
|
||||
logger.error(f"Count failed: {e}")
|
||||
source_count = self.last_source_count
|
||||
progress = (target_count / source_count) * 100 if source_count > 0 else 0
|
||||
self.source_count = source_count
|
||||
self.target_count = target_count
|
||||
self.sync_count = target_count - previous_count
|
||||
logger.debug(f"sync progress {target_count}/{source_count} {progress:.2f}%")
|
||||
thread1 = threading.Thread(target=count_target)
|
||||
thread2 = threading.Thread(target=count_source)
|
||||
|
||||
thread1.start()
|
||||
thread2.start()
|
||||
|
||||
thread1.join()
|
||||
thread2.join()
|
||||
|
||||
progress = (self.target_count / self.source_count) * 100 if self.source_count > 0 else 0
|
||||
|
||||
self.sync_count = self.target_count - previous_count
|
||||
self.count_series_data['timestamp'].append(time.time())
|
||||
self.count_series_data['source_count'].append(self.source_count)
|
||||
self.count_series_data['target_count'].append(self.target_count)
|
||||
logger.debug(f"sync progress {self.target_count}/{self.source_count} {progress:.2f}%")
|
||||
except Exception as e:
|
||||
logger.error(f"Count failed: {e}")
|
||||
time.sleep(0.01) # Query interval
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user