系列文章目录
前言
在开始spring事务管理的源码分析之前,我们先自己尝试简单实现一下事务管理,实现事务的传递
一、事务的使用
有了spring之后,事务的使用变得简单,但是封装得也更深,功能也更复杂,也更难以理解。所以我们要先回顾一下最基础的使用,由浅入深
1. jdbc使用事务
我们最开始使用事务是通过jdbc的方式,这也是最直接的方式,跟在数据库使用事务没什么区别
先贴一下代码:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class TransactionExample {
public static void main(String[] args) {
// 数据库连接信息
String url = "jdbc:mysql://localhost:3306/your_database_name?serverTimezone=UTC";
String user = "your_username";
String password = "your_password";
Connection conn = null;
PreparedStatement pstmt1 = null;
PreparedStatement pstmt2 = null;
try {
// 加载JDBC驱动
Class.forName("com.mysql.cj.jdbc.Driver");
// 获取数据库连接
conn = DriverManager.getConnection(url, user, password);
// 将自动提交设置为false
conn.setAutoCommit(false);
// 从Alice的账户转出500
String sql1 = "UPDATE user_account SET balance = balance - 500 WHERE name = 'Alice'";
pstmt1 = conn.prepareStatement(sql1);
pstmt1.executeUpdate();
// 模拟异常,用于测试事务回滚
// if (true) throw new SQLException("模拟异常,触发回滚");
// 向Bob的账户转入500
String sql2 = "UPDATE user_account SET balance = balance + 500 WHERE name = 'Bob'";
pstmt2 = conn.prepareStatement(sql2);
pstmt2.executeUpdate();
// 提交事务
conn.commit();
System.out.println("事务处理成功!");
} catch (Exception e) {
e.printStackTrace();
// 发生异常,回滚事务
if (conn != null) {
try {
conn.rollback();
System.out.println("事务回滚!");
} catch (SQLException ex) {
ex.printStackTrace();
}
}
} finally {
// 关闭资源
try {
if (pstmt1 != null) pstmt1.close();
if (pstmt2 != null) pstmt2.close();
if (conn != null) conn.close();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
}
}
过程为
- 获取数据库连接
- 将连接的提交方式从自动提交改为手动提交(开启事务,为什么不需要Begin,是因为mysql改为手动提交后,执行增删改语句,就会隐式的开启事务,直到commit或者rollback)
- 执行sql
- commit提交事务
- 如果遇到错误,rollback回滚事务
这个过程与你在数据库连接工具中直接操作数据库事务一样,很好理解。但是每次都进行获取连接,关闭连接,开启事务,提交事务,回滚事务这些与业务无关的操作,代码既繁琐,又不利于维护。
所以我们需要将获取连接,关闭连接,开启事务,提交事务,回滚事务这些与业务无关的操作抽取出来,形成一个事务管理器。
2.自己实现事务管理器
2.1 简单实现
先将获取连接、关闭连接抽取出来
public class ConnectionUtil {
private static final String DB_DRIVER = "com.mysql.jdbc.Driver";
private static final String DB_URL = "jdbc:mysql://localhost:3306/j1910";
private static final String USER = "root";
private static final String PASS = "root";
/**获取连接*/
public static Connection getConnection() {
try {
Class.forName(DB_DRIVER);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
Connection connection = null;
try {
connection = DriverManager.getConnection(DB_URL, USER, PASS);
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return connection;
}
/**关闭连接*/
public static void closeConnection(Connection conn) {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
// 记录关闭连接时的异常,通常不会抛出
e.printStackTrace();
}
}
}
}
再来将开启事务、提交事务和回滚事务抽取到MyTransactionManager,并提供事务调用的方法doWithTransaction。开启事务、提交事务和回滚事务都是私有的,暴露出来的是事务调用的方法。我希望每次事务调用都是独立的,所以每次都应该重新获取连接,而不能将连接保存起来
public class MyTransactionManager{
/**获取连接*/
private Connection getConnection() {
return ConnectionUtil.getConnection();
}
/**关闭连接*/
private void closeConnection(Connection connection) {
ConnectionUtil.closeConnection(connection);
}
/**开启事务*/
private void startTransaction(Connection connection) {
try {
connection.setAutoCommit(false);
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
/**提交事务*/
private void commit() {
try {
getConnection().commit();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
/**回滚事务*/
private void rollback() {
try {
getConnection().rollback();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
/**执行事务*/
public void doWithTransaction(SqlAction action) {
//获取连接
Connection connection = getConnection();
//开启事务
startTransaction(connection);
try {
//执行sql
action.execute(connection);
//提交事务
commit();
} catch (Exception e) {
e.printStackTrace();
//回滚事务
rollback();
}finally {
closeConnection(connection);
}
}
/**函数式接口,调用的时候实现*/
interface SqlAction {
void execute(Connection connection);
}
}
测试一下,其中QueryRunner是org.apache.commons.dbutils中的工具类,是对PrepareStatemet的封装,简化了sql执行操作。测试结果因为int i = 1/0报错,所以回滚了
public class MainTest {
public static void main(String[] args) {
MyTransactionManager transactionManager = new MyTransactionManager();
transactionManager.doWithTransaction(connection -> {
QueryRunner queryRunner = new QueryRunner();
try {
queryRunner.update(connection,"update student set stu_name = ? where stu_id = ?","lisi",7);
int i = 1/0;
queryRunner.update(connection,"update student set stu_name = ? where stu_id = ?","wangwu",8);
} catch (SQLException throwables) {
//把错误抛出去
throw new RuntimeException(throwables);
}
});
}
}
但是这里还有一个问题,比如两个使用事务的方法,方法A和方法B,方法A调用了方法B,即使方法B失败回滚了,方法A还是能提交成功,因为两个方法使用的是不同的数据库连接
public void A(){
transactionManager.doWithTransaction(connection -> {
executeSql;//数据库操作
B()//调用了B方法
}
}
public void B(){
transactionManager.doWithTransaction(connection -> {
数据库操作
}
}
那如果我希望在事务A调用事务B的时候,使用的是同一个事务,那怎么办呢?那自然需要有个地方去维护数据库连接,最理想的方式就是用ThreadLocal去保存连接了。
2.2 新的实现
但是这样就能解决问题了吗?也不能,因为如果事务A调用事务B时,事务B成功了提交事务时,因为是同一个数据库连接,也会把事务A已完成的操作提交了,假如事务A后面又出错,想回滚也回滚不了了。所以我们应该在事务提交或者回滚前判断一下是不是前面已经有事务,如果有就不提交也不回滚了,只是给这个连接标记一下是否应该回滚,所以我们写一个新的事务管理器MyNewTransactionManager,同时也要增加几个组件,功能如下:
- ConnectionHolder:对数据库连接的封装,让他具有标记是否回滚的功能
- ConnectionSyncManager:用于存放ConnectionHolder到ThreadLocal
- TransactionObject:事务对象,判断是否是新事务
- MyNewTransactionManager:管理事务(开启、提交、回滚事务)
- MyTransactionTemplate:提供事务调用的方法(从MyNewTransactionManager中抽取出来,这样MyNewTransactionManager只需要管理事务)
代码如下:
public class ConnectionHolder {
private Connection connection;
/**是否需要回滚*/
private boolean rollback = false;
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public boolean isRollback() {
return rollback;
}
public void setRollback(boolean rollback) {
this.rollback = rollback;
}
}
public class ConnectionSyncManager {
private static ThreadLocal<ConnectionHolder> THREAD_CONNECTIONS = new ThreadLocal<>();
public static ConnectionHolder getConnectionHolder() {
return THREAD_CONNECTIONS.get();
}
public static void setConnectionHolder(ConnectionHolder connection) {
THREAD_CONNECTIONS.set(connection);
}
}
public class MyNewTransactionManager {
/**新建连接*/
private Connection getConnection() {
return ConnectionUtil.getConnection();
}
/**关闭连接*/
private void closeConnection(Connection connection) {
ConnectionUtil.closeConnection(connection);
}
/**开启事务*/
public void startTransaction(TransactionObject transactionObject) {
//如果已经有连接了,说明之前开启过事务了,就不用重复开启了,如果没有连接,就新建连接,开启事务
ConnectionHolder connectionHolder = transactionObject.getConnectionHolder();
if (connectionHolder == null) {
connectionHolder = new ConnectionHolder();
connectionHolder.setConnection(getConnection());//新建连接
transactionObject.setNewConnection(true);//标记为新的事务
transactionObject.setConnectionHolder(connectionHolder);//维护连接
ConnectionSyncManager.setConnectionHolder(connectionHolder);//将新连接放到同步器中
//开启事务
try {
connectionHolder.getConnection().setAutoCommit(false);
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}else {
transactionObject.setNewConnection(false);
}
}
/**提交事务*/
public void commit(TransactionObject transactionObject) {
//如果是新的事务,处理事务。如果不是新的事务,不处理
if (transactionObject.isNewConnection()) {
try {
ConnectionHolder connectionHolder = transactionObject.getConnectionHolder();
//如果connectionHolder中rollback为true则回滚
if (connectionHolder.isRollback()) {
rollback(transactionObject);
}else {//rollback不为true提交事务
connectionHolder.getConnection().commit();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
/**回滚事务*/
public void rollback(TransactionObject transactionObject) {
//如果是新的事务,直接回滚。如果不是新的事务,将ConnectionHolder的rollback标记为true
if (transactionObject.isNewConnection()) {
try {
transactionObject.getConnectionHolder().getConnection().rollback();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}else {
transactionObject.getConnectionHolder().setRollback(true);
}
}
/**事务完成后关闭连接*/
public void completeTransaction(TransactionObject transactionObject) {
//如果是新的事务,关闭连接
if (transactionObject.isNewConnection()) {
closeConnection(transactionObject.getConnectionHolder().getConnection());
}
}
/**获取事务对象*/
public TransactionObject getTransactionObject() {
//每个事务都新建一个事务对象
TransactionObject transactionObject = new TransactionObject();
//先从ConnectionSyncManager里面找连接
ConnectionHolder connectionHolder = ConnectionSyncManager.getConnectionHolder();
transactionObject.setConnectionHolder(connectionHolder);
return transactionObject;
}
/**事务对象,每次事务调用就生成一个事务对象*/
public class TransactionObject {
private ConnectionHolder connectionHolder;
/**是不是新创建的连接,用来判断是否前面已经有事务了*/
private boolean newConnection;
public ConnectionHolder getConnectionHolder() {
return connectionHolder;
}
public void setConnectionHolder(ConnectionHolder connectionHolder) {
this.connectionHolder = connectionHolder;
}
public boolean isNewConnection() {
return newConnection;
}
public void setNewConnection(boolean newConnection) {
this.newConnection = newConnection;
}
}
}```
```java
public class MyTransactionTemplate {
private MyNewTransactionManager transactionManager;
public MyTransactionTemplate(MyNewTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
/**执行事务*/
public void doWithTransaction(SqlAction action) {
//获取事务
MyNewTransactionManager.TransactionObject transactionObject = transactionManager.getTransactionObject();
//开启事务
transactionManager.startTransaction(transactionObject);
try {
//执行sql
action.execute(transactionObject.getConnectionHolder().getConnection());
//提交事务
transactionManager.commit(transactionObject);
} catch (Exception e) {
e.printStackTrace();
//回滚事务
transactionManager.rollback(transactionObject);
}finally {
transactionManager.completeTransaction(transactionObject);
}
}
/**函数式接口,调用的时候实现*/
interface SqlAction {
void execute(Connection connection);
}
}
好了,我们来测试一下吧。为了代码简便,我们直接new了MyNewTransactionManager和transactionTemplate,调用ServiceA方法时直接传入(如果不嫌麻烦,可以将MyNewTransactionManager和transactionTemplate给spring管理,然后注入到ServiceA和ServiceB中,篇幅有限,我就不贴了)
public class MainTest2 {
public static void main(String[] args) {
MyNewTransactionManager transactionManager = new MyNewTransactionManager();
MyTransactionTemplate transactionTemplate = new MyTransactionTemplate(transactionManager);
ServiceA(transactionTemplate);
}
public static void ServiceA(MyTransactionTemplate transactionTemplate) {
transactionTemplate.doWithTransaction(connection -> {
QueryRunner queryRunner = new QueryRunner();
try {
queryRunner.update(connection,"update student set stu_name = ? where stu_id = ?","zhangsan",7);
ServiceB(transactionTemplate);
} catch (SQLException throwables) {
//把错误抛出去
throw new RuntimeException(throwables);
}
});
}
public static void ServiceB(MyTransactionTemplate transactionTemplate) {
transactionTemplate.doWithTransaction(connection -> {
QueryRunner queryRunner = new QueryRunner();
try {
queryRunner.update(connection,"update student set stu_name = ? where stu_id = ?","lisi",8);
int i = 1/0;
} catch (SQLException throwables) {
//把错误抛出去
throw new RuntimeException(throwables);
}
});
}
}
结果,因为ServiceB方法报错后给ConnectionHolder设置了回滚标记,最后整体都回滚了
到这里,其实已经把spring的事务管理器DataSourceTransactionManager的基本实现过程展示出来了(可以认为是对PROPAGATION_REQUIRED这种传播方式的实现),为避免篇幅过长,下一篇我们再来看看DataSourceTransactionManager是怎样实现事务管理的吧
总结
- 我们循序渐进,手写了MyNewTransactionManager ,完成了事务方法A调用事务方法B使用同一个事务,还原了事务传递的关键步骤:
1)使用相同的数据库连接(放到ThreadLocal中)
2)需要一个标识(newConnection)判断是否是新的事务,并根据这个标识决定是真正提交、回滚,还是只标记全局的状态(rollbackOnly)
3)需要一个全局的状态(rollbackOnly),用于真正提交、回滚