适用场景:
主要用于帮助DBA自动化很多日常工作,包括:
- 数据库状态监控
- 性能问题诊断
- 日志分析
- 自动巡检
- 问题告警
系统截图:
main.py
from flask import Blueprint, render_template, request, flash, redirect, url_for
from flask_login import login_required
from app.models.datasource import DataSource
from app import db
# Blueprint that groups the home page and the data source management routes
bp = Blueprint('main', __name__)
@bp.route('/')
@login_required
def index():
    """Render the home page listing every stored data source.

    Requires an authenticated user.
    """
    return render_template('index.html', datasources=DataSource.query.all())
@bp.route('/datasource', methods=['GET', 'POST'])
@login_required
def datasource_list():
    """Render the data source list page.

    Both GET and POST are routed here and handled the same way: all data
    sources are loaded and passed to the list template.
    """
    all_sources = DataSource.query.all()
    template_name = 'datasource/list.html'
    return render_template(template_name, datasources=all_sources)
@bp.route('/datasource/add', methods=['GET', 'POST'])
@login_required
def datasource_add():
    """Show the data source form (GET) or create a new data source (POST).

    On a successful POST the new record is committed and the user is
    redirected to the data source list. If anything fails (missing form
    field, non-numeric port, database error) the session is rolled back, the
    error is flashed, and the form is rendered again.
    """
    if request.method == 'POST':
        try:
            datasource = DataSource(
                name=request.form['name'],
                host=request.form['host'],
                port=int(request.form['port']),
                database=request.form['database'],
                username=request.form['username'],
                # NOTE(review): the password is passed to the model as received;
                # confirm whether the model encrypts it before storage.
                password=request.form['password'],
            )
            db.session.add(datasource)
            db.session.commit()
            flash('数据源添加成功', 'success')
            return redirect(url_for('main.datasource_list'))
        except Exception as e:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back; it also discards the pending, half-added object.
            db.session.rollback()
            flash(f'添加失败: {str(e)}', 'danger')
    return render_template('datasource/form.html')
@bp.route('/datasource/edit/<int:id>', methods=['GET', 'POST'])
@login_required
def datasource_edit(id):
    """Show the edit form (GET) or update an existing data source (POST).

    Responds with 404 when no data source has the given id. On a successful
    POST the changes are committed and the user is redirected to the list.
    On failure the session is rolled back so the partially applied changes
    are discarded, the error is flashed, and the form is rendered again.
    """
    datasource = DataSource.query.get_or_404(id)
    if request.method == 'POST':
        try:
            datasource.name = request.form['name']
            datasource.host = request.form['host']
            datasource.port = int(request.form['port'])
            datasource.database = request.form['database']
            datasource.username = request.form['username']
            # Update the password only when a new one was supplied; a missing
            # or empty field keeps the stored password.
            new_password = request.form.get('password')
            if new_password:
                datasource.password = new_password
            db.session.commit()
            flash('数据源更新成功', 'success')
            return redirect(url_for('main.datasource_list'))
        except Exception as e:
            # Undo the in-memory attribute changes and reset the failed
            # transaction before the form is rendered again.
            db.session.rollback()
            flash(f'更新失败: {str(e)}', 'danger')
    return render_template('datasource/form.html', datasource=datasource)
@bp.route('/datasource/delete/<int:id>')
@login_required
def datasource_delete(id):
    """Delete the data source with the given id and return to the list page.

    Responds with 404 when the data source does not exist. The outcome is
    reported through a flash message; on failure the session is rolled back.
    """
    # NOTE(review): this destructive action is reachable via GET, so a crafted
    # link can trigger it. Consider moving it to POST with CSRF protection.
    datasource = DataSource.query.get_or_404(id)
    try:
        db.session.delete(datasource)
        db.session.commit()
        flash('数据源删除成功', 'success')
    except Exception as e:
        # Reset the failed transaction so the session stays usable
        db.session.rollback()
        flash(f'删除失败: {str(e)}', 'danger')
    return redirect(url_for('main.datasource_list'))
@bp.route('/datasource/toggle/<int:id>')
@login_required
def datasource_toggle(id):
    """Flip the ``is_active`` flag of a data source and return to the list page.

    Responds with 404 when the data source does not exist. The outcome is
    reported through a flash message; on failure the session is rolled back
    so the flag keeps its stored value.
    """
    datasource = DataSource.query.get_or_404(id)
    try:
        datasource.is_active = not datasource.is_active
        db.session.commit()
        flash('状态更新成功', 'success')
    except Exception as e:
        # Discard the unsaved flag change and reset the failed transaction
        db.session.rollback()
        flash(f'更新失败: {str(e)}', 'danger')
    return redirect(url_for('main.datasource_list'))
monitor.py
from flask import Blueprint, render_template, jsonify
from flask_login import login_required
from app.models.datasource import DataSource
from app.services.monitor_service import MonitorService
# Blueprint for the monitoring page and its JSON API, mounted under /monitor
bp = Blueprint('monitor', __name__, url_prefix='/monitor')
@bp.route('/status/<int:id>')
@login_required
def status(id):
    """Render the monitoring page for one data source.

    Responds with 404 when no data source has the given id; otherwise the
    data source and its status report are passed to the template.
    """
    source = DataSource.query.get_or_404(id)
    report = MonitorService.get_database_status(id)
    context = {'datasource': source, 'status': report}
    return render_template('monitor/status.html', **context)
@bp.route('/api/status/<int:id>')
@login_required
def api_status(id):
    """Return the status report of one data source as JSON.

    Responds with HTTP 404 and a JSON error body when the data source does
    not exist, instead of a ``null`` payload with status 200.
    """
    status = MonitorService.get_database_status(id)
    if status is None:
        # get_database_status returns None for an unknown data source id
        return jsonify({'error': '数据源不存在'}), 404
    return jsonify(status)
monitor_service.py
# import pyodbc # 暂时注释掉
from app.models.datasource import DataSource
import datetime
class MonitorService:
    """Provides status and performance reports for configured data sources."""

    @staticmethod
    def get_database_status(datasource_id):
        """Return a simulated status report for the given data source.

        Every value except ``basic_info.name`` (taken from the data source's
        database name) and the alert timestamp (current local time) is a
        hard-coded placeholder.

        Args:
            datasource_id: Primary key of the data source to report on.

        Returns:
            A dict with the sections ``basic_info``, ``size_info``,
            ``performance``, ``io_stats``, ``availability`` and ``alerts``,
            or ``None`` when the data source does not exist.
        """
        datasource = DataSource.query.get(datasource_id)
        if not datasource:
            return None
        # Simulated, richer database status information
        return {
            'basic_info': {
                'database_id': 1,
                'name': datasource.database,
                'state': 'ONLINE',
                'recovery_model': 'FULL',
                'compatibility_level': '150',
                'collation': 'Chinese_PRC_CI_AS',
                'created_time': '2023-01-01 08:00:00',
                'last_backup_time': '2024-03-10 03:00:00'
            },
            'size_info': {
                'data_size': '1024 MB',
                'log_size': '256 MB',
                'total_size': '1280 MB',
                'data_space_used': 75.5,  # percent
                'log_space_used': 45.2,  # percent
                'unallocated_space': '512 MB'
            },
            'performance': {
                'cpu_usage': 35.5,  # percent
                'memory_usage': 4096,  # MB
                'buffer_cache_hit': 98.5,  # percent
                'page_life_expectancy': 1200,  # seconds
                'batch_requests': 450,  # per second
                'user_connections': 85,
                'active_transactions': 12,
                'blocked_processes': 0,
                'deadlocks': 0,
                'lock_waits': 2
            },
            'io_stats': {
                'reads_per_sec': 250,
                'writes_per_sec': 120,
                'io_pending': 0,
                'io_stall_ms': 150,
                'read_latency_ms': 3,
                'write_latency_ms': 5
            },
            'availability': {
                'uptime': '15 days 6 hours',
                'last_restart': '2024-02-25 00:00:00',
                'failovers_last_24h': 0,
                'mirror_status': 'Not Configured'
            },
            'alerts': [
                {
                    'type': 'warning',
                    'message': '数据文件空间使用率超过75%',
                    'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                }
            ]
        }

    @staticmethod
    def _odbc_escape(value):
        """Return *value* as text that is safe inside an ODBC connection string.

        Values containing ``;``, ``{`` or ``}``, or leading/trailing
        whitespace, are wrapped in braces with every ``}`` doubled. Any other
        value is returned unchanged, so ordinary settings produce the same
        connection string as before.
        """
        text = str(value)
        needs_braces = any(ch in text for ch in ';{}') or text != text.strip()
        if not needs_braces:
            return text
        return '{' + text.replace('}', '}}') + '}'

    @staticmethod
    def get_database_status_real(datasource_id):
        """Query a SQL Server instance for the status of the given data source.

        Args:
            datasource_id: Primary key of the data source to inspect.

        Returns:
            ``None`` when the data source does not exist. Otherwise a dict
            with a ``status`` section (``database_id``, ``name``, ``state``,
            ``recovery_model``, ``size``) and a ``performance`` section
            (``cpu_time``, ``worker_time``, ``physical_reads``,
            ``logical_writes``, ``logical_reads``) describing the cached
            query with the highest total worker time; the performance values
            are ``None`` when the plan cache is empty. Any failure (driver
            missing, connection error, database not found on the server) is
            reported as ``{'error': <message>}`` instead of being raised.
        """
        datasource = DataSource.query.get(datasource_id)
        if not datasource:
            return None

        escape = MonitorService._odbc_escape
        server = f"{datasource.host},{datasource.port}"
        # Stored settings originate from user input, so escape each value to
        # keep ';' or braces from corrupting or extending the string.
        conn_str = (
            "DRIVER={ODBC Driver 17 for SQL Server};"
            f"SERVER={escape(server)};"
            f"DATABASE={escape(datasource.database)};"
            f"UID={escape(datasource.username)};"
            f"PWD={escape(datasource.password)}"
        )

        conn = None
        try:
            # Imported lazily: the module-level pyodbc import is commented
            # out, so the package is only required when this method runs.
            import pyodbc

            conn = pyodbc.connect(conn_str)
            cursor = conn.cursor()

            # Database state. sys.databases has no size column; file sizes
            # live in sys.master_files as counts of 8 KB pages.
            cursor.execute("""
                SELECT
                    d.database_id,
                    d.name,
                    d.state_desc,
                    d.recovery_model_desc,
                    total_size = CAST(SUM(CAST(mf.size AS bigint)) * 8 / 1024 AS varchar(20)) + ' MB'
                FROM sys.databases AS d
                JOIN sys.master_files AS mf ON mf.database_id = d.database_id
                WHERE d.name = ?
                GROUP BY d.database_id, d.name, d.state_desc, d.recovery_model_desc
            """, datasource.database)
            status = cursor.fetchone()
            if status is None:
                return {'error': f'database {datasource.database!r} not found on server'}

            # Performance counters of the most CPU-expensive cached query.
            # NOTE(review): sys.dm_exec_query_stats has no cpu_time column;
            # last_worker_time (CPU time of the latest execution) is reported
            # under the existing 'cpu_time' key. Confirm this is the intended
            # metric.
            cursor.execute("""
                SELECT TOP 1
                    last_worker_time AS cpu_time,
                    total_worker_time,
                    total_physical_reads,
                    total_logical_writes,
                    total_logical_reads
                FROM sys.dm_exec_query_stats
                ORDER BY total_worker_time DESC
            """)
            performance = cursor.fetchone()
            if performance is None:
                # Empty plan cache: keep the report shape, without values
                performance = (None,) * 5

            return {
                'status': {
                    'database_id': status[0],
                    'name': status[1],
                    'state': status[2],
                    'recovery_model': status[3],
                    'size': status[4]
                },
                'performance': {
                    'cpu_time': performance[0],
                    'worker_time': performance[1],
                    'physical_reads': performance[2],
                    'logical_writes': performance[3],
                    'logical_reads': performance[4]
                }
            }
        except Exception as e:
            return {'error': str(e)}
        finally:
            if conn is not None:
                conn.close()