##Statement cancelled due to timeout or client request 异常
Caused by: com.mysql.jdbc.exceptions.MySQLTimeoutException: Statement cancelled due to timeout or client request
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1932)
at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1251)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:3461)
at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_execute(FilterEventAdapter.java:440)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:3459)
at com.alibaba.druid.filter.FilterAdapter.preparedStatement_execute(FilterAdapter.java:1081)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:3459)
at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_execute(FilterEventAdapter.java:440)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:3459)
at com.alibaba.druid.proxy.jdbc.PreparedStatementProxyImpl.execute(PreparedStatementProxyImpl.java:167)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:497)
at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.ibatis.logging.jdbc.PreparedStatementLogger.invoke(PreparedStatementLogger.java:59)
at com.sun.proxy.$Proxy60.execute(Unknown Source)
at org.apache.ibatis.executor.statement.PreparedStatementHandler.query(PreparedStatementHandler.java:63)
at org.apache.ibatis.executor.statement.RoutingStatementHandler.query(RoutingStatementHandler.java:79)
at org.apache.ibatis.executor.ReuseExecutor.doQuery(ReuseExecutor.java:60)
at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:324)
at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:156)
at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:136)
at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:63)
at com.sun.proxy.$Proxy58.query(Unknown Source)
at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:63)
at com.sun.proxy.$Proxy58.query(Unknown Source)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:148)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:141)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:136)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:433)
##5.1.48mysql驱动包 sql 语句超时设置
/**
* Sets the queryTimeout limit
*
* @param seconds
* -
* the new query timeout limit in seconds
*
* @exception SQLException
* if a database access error occurs
*/
public void setQueryTimeout(int seconds) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
if (seconds < 0) {
throw SQLError.createSQLException(Messages.getString("Statement.21"), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor());
}
this.timeoutInMillis = seconds * 1000;
}
}
自定义的 CancelTask
类,它继承自 TimerTask
类,用于在 Java 应用程序中取消 MySQL 数据库中的查询任务。这个类是 JDBC 驱动程序的一部分,用于处理查询超时的情况。以下是代码的主要功能和流程:
-
构造函数 (
CancelTask(StatementImpl cancellee)
):- 接收一个
StatementImpl
对象作为参数,这个对象代表需要被取消的 SQL 语句。 - 复制连接的属性到
origConnProps
属性集合中。 - 保存原始连接的 URL 和 ID。
- 接收一个
-
run 方法:
- 这个方法是
TimerTask
的核心,当定时器触发时会调用此方法。 - 创建一个新的线程
cancelThread
来执行取消操作,以避免阻塞定时器线程。
- 这个方法是
-
cancelThread 线程:
- 在这个线程中,尝试获取物理连接
physicalConn
。 - 如果连接支持超时后关闭(
getQueryTimeoutKillsConnection()
返回true
),则标记 SQL 语句为已取消,并关闭连接。 - 如果连接不支持,尝试使用原始连接的属性和 URL 重新建立连接,并执行
KILL QUERY
命令来取消查询。 - 如果在尝试重新连接时捕获到
NullPointerException
,则忽略,因为这意味着连接已经关闭,查询已经超时。
- 在这个线程中,尝试获取物理连接
-
异常处理:
- 捕获
SQLException
和NullPointerException
,并在caughtWhileCancelling
变量中保存SQLException
。 - 在
finally
块中,关闭cancelStmt
和cancelConn
,并清理CancelTask
对象的引用。
- 捕获
这个类的主要目的是在查询超时时提供一个机制来取消正在执行的 SQL 查询。这是 JDBC 驱动程序中的一个高级特性,允许应用程序在查询执行时间过长时中断查询,以避免资源长时间占用。这种机制对于需要处理大量数据或执行复杂查询的应用程序尤其重要,因为它可以帮助提高应用程序的响应性和资源利用率。
##执行查询语句,设置超时任务timeoutTask = new CancelTask(this);
timeoutInMillis毫秒后,调度任务
locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
private boolean executeInternal(String sql, boolean returnGeneratedKeys) throws SQLException {
MySQLConnection locallyScopedConn = checkClosed();
synchronized (locallyScopedConn.getConnectionMutex()) {
checkClosed();
checkNullOrEmptyQuery(sql);
resetCancelledState();
implicitlyCloseAllOpenResults();
if (sql.charAt(0) == '/') {
if (sql.startsWith(PING_MARKER)) {
doPingInstead();
return true;
}
}
char firstNonWsChar = StringUtils.firstAlphaCharUc(sql, findStartOfStatement(sql));
boolean maybeSelect = firstNonWsChar == 'S';
this.retrieveGeneratedKeys = returnGeneratedKeys;
this.lastQueryIsOnDupKeyUpdate = returnGeneratedKeys && firstNonWsChar == 'I' && containsOnDuplicateKeyInString(sql);
if (!maybeSelect && locallyScopedConn.isReadOnly()) {
throw SQLError.createSQLException(Messages.getString("Statement.27") + Messages.getString("Statement.28"), SQLError.SQL_STATE_ILLEGAL_ARGUMENT,
getExceptionInterceptor());
}
boolean readInfoMsgState = locallyScopedConn.isReadInfoMsgEnabled();
if (returnGeneratedKeys && firstNonWsChar == 'R') {
// If this is a 'REPLACE' query, we need to be able to parse the 'info' message returned from the server to determine the actual number of keys
// generated.
locallyScopedConn.setReadInfoMsgEnabled(true);
}
try {
setupStreamingTimeout(locallyScopedConn);
if (this.doEscapeProcessing) {
Object escapedSqlResult = EscapeProcessor.escapeSQL(sql, locallyScopedConn.serverSupportsConvertFn(), locallyScopedConn);
if (escapedSqlResult instanceof String) {
sql = (String) escapedSqlResult;
} else {
sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql;
}
}
CachedResultSetMetaData cachedMetaData = null;
ResultSetInternalMethods rs = null;
this.batchedGeneratedKeys = null;
if (useServerFetch()) {
rs = createResultSetUsingServerFetch(sql);
} else {
CancelTask timeoutTask = null;
String oldCatalog = null;
try {
if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
timeoutTask = new CancelTask(this);
locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
}
if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
oldCatalog = locallyScopedConn.getCatalog();
locallyScopedConn.setCatalog(this.currentCatalog);
}
//
// Check if we have cached metadata for this query...
//
Field[] cachedFields = null;
if (locallyScopedConn.getCacheResultSetMetadata()) {
cachedMetaData = locallyScopedConn.getCachedMetaData(sql);
if (cachedMetaData != null) {
cachedFields = cachedMetaData.fields;
}
}
//
// Only apply max_rows to selects
//
locallyScopedConn.setSessionMaxRows(maybeSelect ? this.maxRows : -1);
statementBegins();
rs = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency, createStreamingResultSet(),
this.currentCatalog, cachedFields);
if (timeoutTask != null) {
if (timeoutTask.caughtWhileCancelling != null) {
throw timeoutTask.caughtWhileCancelling;
}
timeoutTask.cancel();
timeoutTask = null;
}
synchronized (this.cancelTimeoutMutex) {
if (this.wasCancelled) {
SQLException cause = null;
if (this.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();
}
resetCancelledState();
throw cause;
}
}
} finally {
if (timeoutTask != null) {
timeoutTask.cancel();
locallyScopedConn.getCancelTimer().purge();
}
if (oldCatalog != null) {
locallyScopedConn.setCatalog(oldCatalog);
}
}
}
if (rs != null) {
this.lastInsertId = rs.getUpdateID();
this.results = rs;
rs.setFirstCharOfQuery(firstNonWsChar);
if (rs.reallyResult()) {
if (cachedMetaData != null) {
locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData, this.results);
} else {
if (this.connection.getCacheResultSetMetadata()) {
locallyScopedConn.initializeResultsMetadataFromCache(sql, null /* will be created */, this.results);
}
}
}
}
return ((rs != null) && rs.reallyResult());
} finally {
locallyScopedConn.setReadInfoMsgEnabled(readInfoMsgState);
this.statementExecuting.set(false);
}
}
}
##发送 cancelStmt.execute("KILL QUERY " + physicalConn.getId()); 给mysql服务端
class CancelTask extends TimerTask {
SQLException caughtWhileCancelling = null;
StatementImpl toCancel;
Properties origConnProps = null;
String origConnURL = "";
long origConnId = 0;
CancelTask(StatementImpl cancellee) throws SQLException {
this.toCancel = cancellee;
this.origConnProps = new Properties();
Properties props = StatementImpl.this.connection.getProperties();
Enumeration<?> keys = props.propertyNames();
while (keys.hasMoreElements()) {
String key = keys.nextElement().toString();
this.origConnProps.setProperty(key, props.getProperty(key));
}
this.origConnURL = StatementImpl.this.connection.getURL();
this.origConnId = StatementImpl.this.connection.getId();
}
@Override
public void run() {
Thread cancelThread = new Thread() {
@Override
public void run() {
Connection cancelConn = null;
java.sql.Statement cancelStmt = null;
try {
MySQLConnection physicalConn = StatementImpl.this.physicalConnection.get();
if (physicalConn != null) {
if (physicalConn.getQueryTimeoutKillsConnection()) {
CancelTask.this.toCancel.wasCancelled = true;
CancelTask.this.toCancel.wasCancelledByTimeout = true;
physicalConn.realClose(false, false, true,
new MySQLStatementCancelledException(Messages.getString("Statement.ConnectionKilledDueToTimeout")));
} else {
synchronized (StatementImpl.this.cancelTimeoutMutex) {
if (CancelTask.this.origConnURL.equals(physicalConn.getURL())) {
// All's fine
cancelConn = physicalConn.duplicate();
cancelStmt = cancelConn.createStatement();
cancelStmt.execute("KILL QUERY " + physicalConn.getId());
} else {
try {
cancelConn = (Connection) DriverManager.getConnection(CancelTask.this.origConnURL, CancelTask.this.origConnProps);
cancelStmt = cancelConn.createStatement();
cancelStmt.execute("KILL QUERY " + CancelTask.this.origConnId);
} catch (NullPointerException npe) {
// Log this? "Failed to connect to " + origConnURL + " and KILL query"
}
}
CancelTask.this.toCancel.wasCancelled = true;
CancelTask.this.toCancel.wasCancelledByTimeout = true;
}
}
}
} catch (SQLException sqlEx) {
CancelTask.this.caughtWhileCancelling = sqlEx;
} catch (NullPointerException npe) {
// Case when connection closed while starting to cancel.
// We can't easily synchronize this, because then one thread can't cancel() a running query.
// Ignore, we shouldn't re-throw this, because the connection's already closed, so the statement has been timed out.
} finally {
if (cancelStmt != null) {
try {
cancelStmt.close();
} catch (SQLException sqlEx) {
throw new RuntimeException(sqlEx.toString());
}
}
if (cancelConn != null) {
try {
cancelConn.close();
} catch (SQLException sqlEx) {
throw new RuntimeException(sqlEx.toString());
}
}
CancelTask.this.toCancel = null;
CancelTask.this.origConnProps = null;
CancelTask.this.origConnURL = null;
}
}
};
cancelThread.start();
}
}
##mysql8处理KILL QUERY C++源码
THD
类的 awake
方法,并向其传递了一个参数,这个参数决定了是只杀死查询(THD::KILL_QUERY
)还是关闭整个连接(THD::KILL_CONNECTION
)。awake
方法的作用是发送一个信号给目标线程,使其停止当前正在执行的操作。
- 如果
only_kill_query
参数为true
,则传递THD::KILL_QUERY
,这会导致目标线程停止当前的查询操作。 - 如果
only_kill_query
参数为false
,则传递THD::KILL_CONNECTION
,这会导致目标线程关闭整个连接,包括所有查询。
这个方法是线程间通信的一种方式,用于安全地中断另一个线程的执行。在 MySQL 服务器中,这是处理 KILL
命令的核心部分,允许管理员或有权限的用户终止长时间运行的查询或释放资源。
/**
kill on thread.
@param thd Thread class
@param id Thread id
@param only_kill_query Should it kill the query or the connection
@note
This is written such that we have a short lock on LOCK_thd_list
*/
static uint kill_one_thread(THD *thd, my_thread_id id, bool only_kill_query) {
uint error = ER_NO_SUCH_THREAD;
Find_thd_with_id find_thd_with_id(id);
DBUG_TRACE;
DBUG_PRINT("enter", ("id=%u only_kill=%d", id, only_kill_query));
DEBUG_SYNC(thd, "kill_thd_begin");
THD_ptr tmp = Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
Security_context *sctx = thd->security_context();
if (tmp) {
/*
If we're SUPER, we can KILL anything, including system-threads.
No further checks.
KILLer: thd->m_security_ctx->user could in theory be NULL while
we're still in "unauthenticated" state. This is a theoretical
case (the code suggests this could happen, so we play it safe).
KILLee: tmp->m_security_ctx->user will be NULL for system threads.
We need to check so Jane Random User doesn't crash the server
when trying to kill a) system threads or b) unauthenticated users'
threads (Bug#43748).
If user of both killer and killee are non-NULL, proceed with
slayage if both are string-equal.
*/
if (sctx->check_access(SUPER_ACL) ||
sctx->has_global_grant(STRING_WITH_LEN("CONNECTION_ADMIN")).first ||
sctx->user_matches(tmp->security_context())) {
/*
Process the kill:
if thread is not already undergoing any kill connection.
Killer must have SYSTEM_USER privilege iff killee has the same privilege
privilege
*/
if (tmp->killed != THD::KILL_CONNECTION) {
if (tmp->is_system_user() && !thd->is_system_user()) {
error = ER_KILL_DENIED_ERROR;
} else {
tmp->awake(only_kill_query ? THD::KILL_QUERY : THD::KILL_CONNECTION);
error = 0;
}
} else
error = 0;
} else
error = ER_KILL_DENIED_ERROR;
}
DEBUG_SYNC(thd, "kill_thd_end");
DBUG_PRINT("exit", ("%d", error));
return error;
}
/*
kills a thread and sends response
SYNOPSIS
sql_kill()
thd Thread class
id Thread id
only_kill_query Should it kill the query or the connection
*/
static void sql_kill(THD *thd, my_thread_id id, bool only_kill_query) {
uint error;
if (!(error = kill_one_thread(thd, id, only_kill_query))) {
if (!thd->killed) my_ok(thd);
} else
my_error(error, MYF(0), id);
}
##gdb调用栈
(gdb) b sql_kill
#0 kill_one_thread (thd=0x73e42003e450, id=444, only_kill_query=true) at /home/yym/mysql8/mysql-8.1.0/sql/sql_parse.cc:6518
#1 0x00006040ed8ae76a in sql_kill (thd=0x73e42003e450, id=444, only_kill_query=true) at /home/yym/mysql8/mysql-8.1.0/sql/sql_parse.cc:6582
#2 0x00006040ed8a71e1 in mysql_execute_command (thd=0x73e42003e450, first_level=true) at /home/yym/mysql8/mysql-8.1.0/sql/sql_parse.cc:4306
#3 0x00006040ed8aacb3 in dispatch_sql_command (thd=0x73e42003e450, parser_state=0x73e5594f79f0) at /home/yym/mysql8/mysql-8.1.0/sql/sql_parse.cc:5447
#4 0x00006040ed8a00d7 in dispatch_command (thd=0x73e42003e450, com_data=0x73e5594f8340, command=COM_QUERY) at /home/yym/mysql8/mysql-8.1.0/sql/sql_parse.cc:2112
#5 0x00006040ed89df77 in do_command (thd=0x73e42003e450) at /home/yym/mysql8/mysql-8.1.0/sql/sql_parse.cc:1459
#6 0x00006040edaf5835 in handle_connection (arg=0x6040f65a0060) at /home/yym/mysql8/mysql-8.1.0/sql/conn_handler/connection_handler_per_thread.cc:303
#7 0x00006040efa34bdc in pfs_spawn_thread (arg=0x6040f66eb480) at /home/yym/mysql8/mysql-8.1.0/storage/perfschema/pfs.cc:3043
#8 0x000073e569094ac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#9 0x000073e569126850 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81