Seata源码—6.Seata AT模式的数据源代理一

news2025/5/19 7:45:05

大纲

1.Seata的Resource资源接口源码

2.Seata数据源连接池代理的实现源码

3.Client向Server发起注册RM的源码

4.Client向Server注册RM时的交互源码

5.数据源连接代理与SQL句柄代理的初始化源码

6.Seata基于SQL句柄代理执行SQL的源码

7.执行SQL语句前取消自动提交事务的源码

8.执行SQL语句前后构建数据镜像的源码

9.构建全局锁的key和UndoLog数据的源码

10.Seata Client发起分支事务注册的源码

11.Seata Server处理分支事务注册请求的源码

12.将UndoLog写入到数据库与提交事务的源码

13.通过全局锁重试策略组件执行事务的提交

14.注册分支事务时获取全局锁的入口源码

15.Seata Server获取全局锁的具体逻辑源码

16.全局锁和分支事务及本地事务总结

17.提交全局事务以及提交各分支事务的源码

18.全局事务回滚的过程源码

1.Seata的Resource资源接口源码

数据源代理DataSourceProxy不仅实现了Seata的Resource资源接口,同时还继承了实现了SeataDataSourceProxy接口的抽象类AbstractDataSourceProxy。

由于SeataDataSourceProxy接口又继承自JDK提供的DataSource接口,所以通过数据源连接池DataSource接口的方法,可以获取数据源的连接。

注意:这里的数据源==数据库。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    ...
}

public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {
    ...
}

public interface SeataDataSourceProxy extends DataSource {
    ...
}

public interface DataSource extends CommonDataSource, Wrapper {
    //获取数据源连接
    Connection getConnection() throws SQLException;
    Connection getConnection(String username, String password) throws SQLException;
}

Seata的Resource资源接口有三个方法:

一.getResourceGroupId()方法用来获取资源分组

比如主从节点同属一个分组。

二.getResourceId()方法用来获取数据源ID

比如数据源连接URL可作为数据源ID。

三.getBranchType()方法用来获取分支事务类型

比如类型有:AT、TCC、SAGA、XA。

//Resource that can be managed by Resource Manager and involved into global transaction.
//资源是由RM资源管理组件来负责管理的
//RM资源管理器组件会负责把一个个的资源纳入到全局事务里去
//比如RM可以管理数据库资源,把一个数据库本地事务纳入到全局事务里去
public interface Resource {
    //Get the resource group id.
    //e.g. master and slave data-source should be with the same resource group id.
    //获取到资源分组ID
    //主从架构的数据源关联到同一个资源分组ID
    //比如MySQL部署了主从架构,主节点和从节点是两个数据源,但是关联到一个分组ID
    String getResourceGroupId();

    //Get the resource id.
    //e.g. url of a data-source could be the id of the db data-source resource.
    //比如数据源连接URL可以作为数据源的ID
    String getResourceId();

    //get resource type, AT, TCC, SAGA and XA
    //branchType表示分支事务类型:AT、TCC、SAGA、XA
    BranchType getBranchType();
}

2.Seata数据源连接池代理的实现源码

(1)Seata的数据源连接池代理接口SeataDataSourceProxy

(2)Seata的数据源连接池代理抽象类AbstractDataSourceProxy

(3)Seata的数据源连接池代理DataSourceProxy的变量和初始化

(1)Seata的数据源连接池代理接口SeataDataSourceProxy

SeataDataSourceProxy数据源代理在继承DataSource数据源连接池的基础上,增加了两个方法:一个是获取代理的目标数据源连接池的方法,一个是获取代理的目标数据源连接池对应的分支事务类型的方法。

public interface SeataDataSourceProxy extends DataSource {
    //Gets target data source. 
    //获取代理的目标数据源连接池
    DataSource getTargetDataSource();
    //Gets branch type. 
    //获取代理的目标数据源连接池对应的分支事务类型
    BranchType getBranchType();
}

(2)Seata的数据源连接池代理抽象类AbstractDataSourceProxy

AbstractDataSourceProxy抽象类的主要工作是封装代理的目标数据源连接池targetDataSource。

//The type Abstract data source proxy.
//AbstractDataSourceProxy主要的工作就是:
//封装了代理的目标数据源连接池targetDataSource
public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {
    //The Target data source.
    //代理目标的连接池,可以通过targetDataSource来获取连接
    protected DataSource targetDataSource;

    //Instantiates a new Abstract data source proxy.
    public AbstractDataSourceProxy(){ }

    //Instantiates a new Abstract data source proxy.
    public AbstractDataSourceProxy(DataSource targetDataSource) {
        this.targetDataSource = targetDataSource;
    }

    //Gets target data source.
    @Override
    public DataSource getTargetDataSource() {
        return targetDataSource;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return targetDataSource.unwrap(iface);
    }

    //判断目标连接池targetDataSource是否包装了指定的接口iface
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return targetDataSource.isWrapperFor(iface);
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return targetDataSource.getLogWriter();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        targetDataSource.setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        targetDataSource.setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return targetDataSource.getLoginTimeout();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return targetDataSource.getParentLogger();
    }
}

(3)Seata的数据源连接池代理DataSourceProxy的变量和初始化

初始化数据源连接池代理DataSourceProxy的具体逻辑是:首先从目标数据库连接池dataSource中获取一个数据库连接,然后根据这个数据库连接Connection去初始化jdbcUrl和dbType,接着根据数据库连接地址jdbcUrl初始化resourceId,然后把当前数据库连接池代理DataSourceProxy作为一个资源注册到默认的RM即DefaultResourceManager里去,最后设置RootContext上下文即线程本地变量副本中的分支事务类型。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxy.class);
    //默认资源分组ID
    private static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT";
    //Enable the table meta checker,默认是不启用的
    private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);
    //Table meta checker interval,默认是60s
    private static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance().getLong(ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);
    //资源组ID,比如MySQL部署了主从架构,主节点和从节点是两个数据源,但是关联到一个分组ID
    private String resourceGroupId;
    //代理的目标数据源连接url,这个数据源连接url也可以作为resourceId
    private String jdbcUrl;
    //数据源ID,比如数据库连接url就可以作为一个数据源ID
    private String resourceId;
    //数据源类型
    private String dbType;
    //数据源连接用户名
    private String userName;
    //定时调度的线程池,定时检查表里的元数据
    private final ScheduledExecutorService tableMetaExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("tableMetaChecker", 1, true));

    //Instantiates a new Data source proxy.
    public DataSourceProxy(DataSource targetDataSource) {
        this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
    }

    //Instantiates a new Data source proxy.
    //@param targetDataSource the target data source
    //@param resourceGroupId  the resource group id
    public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        this.targetDataSource = targetDataSource;
        init(targetDataSource, resourceGroupId);
    }

    //初始化数据源连接池代理DataSourceProxy
    private void init(DataSource dataSource, String resourceGroupId) {
        //资源分组ID
        this.resourceGroupId = resourceGroupId;

        //从目标数据库连接池dataSource中,获取一个数据库连接
        try (Connection connection = dataSource.getConnection()) {
            //获取数据库连接connection里的元数据的连接url
            jdbcUrl = connection.getMetaData().getURL();
            //根据连接url获取到数据库类型
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                //如果数据库类型等于oracle,则需要获取数据库连接connection的元数据的用户名
                userName = connection.getMetaData().getUserName();
            } else if (JdbcConstants.MARIADB.equals(dbType)) {
                //如果数据库类型等于mariadb,则需要对数据库类型进行赋值为MySQL
                dbType = JdbcConstants.MYSQL;
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }

        //初始化资源ID,也就是获取数据库连接url来初始化resourceID
        initResourceId();

        //把当前数据库连接池代理,作为一个资源,注册到默认的RM里,也就是DefaultResourceManager
        DefaultResourceManager.get().registerResource(this);

        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            tableMetaExecutor.scheduleAtFixedRate(() -> {
                try (Connection connection = dataSource.getConnection()) {
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                      
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }

        //Set the default branch type to 'AT' in the RootContext.
        //设置RootContext上下文,即线程本地变量副本中的分支事务类型
        RootContext.setDefaultBranchType(this.getBranchType());
    }
    
    private void initResourceId() {
        if (JdbcConstants.POSTGRESQL.equals(dbType)) {
            initPGResourceId();
        } else if (JdbcConstants.ORACLE.equals(dbType) && userName != null) {
            initDefaultResourceId();
            resourceId = resourceId + "/" + userName;
        } else if (JdbcConstants.MYSQL.equals(dbType)) {
            initMysqlResourceId();
        } else {
            initDefaultResourceId();
        }
    }
    
    private void initMysqlResourceId() {
        String startsWith = "jdbc:mysql:loadbalance://";
        if (jdbcUrl.startsWith(startsWith)) {
            String url;
            if (jdbcUrl.contains("?")) {
                url = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));
            } else {
                url = jdbcUrl;
            }
            resourceId = url.replace(",", "|");
        } else {
            initDefaultResourceId();
        }
    }
    ...
}

3.Client向Server发起注册RM的源码

初始化数据源连接池代理DataSourceProxy时,会将数据库连接池代理作为资源,注册到DefaultResourceManager资源管理器中。

而初始化DefaultResourceManager时,会通过SPI机制加载所有的ResourceManager。

因此在执行DataSourceProxy的init()方法进行初始化时,由于会调用DefaultResourceManager的registerResource()方法,所以最后会执行到DataSourceManager的registerResource()方法。

在DataSourceManager的registerResource()方法中,首先会把数据源连接池代理DataSourceProxy放入一个Map中进行缓存,然后通过RmNettyRemotingClient构造一个注册RM的请求把数据源连接池代理DataSourceProxy作为资源注册到Seata Server中。

public class DefaultResourceManager implements ResourceManager {
    //all resource managers
    protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();
    
    private static class SingletonHolder {
        private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
    }
    
    //Get resource manager.
    public static DefaultResourceManager get() {
        return SingletonHolder.INSTANCE;
    }
    
    private DefaultResourceManager() {
        initResourceManagers();
    }
    
    protected void initResourceManagers() {
        //init all resource managers
        //通过SPI加载所有的ResourceManager资源管理器
        //比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXA
        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
        if (CollectionUtils.isNotEmpty(allResourceManagers)) {
            for (ResourceManager rm : allResourceManagers) {
                resourceManagers.put(rm.getBranchType(), rm);
            }
        }
    }
    
    @Override
    public void registerResource(Resource resource) {
        getResourceManager(resource.getBranchType()).registerResource(resource);
    }
    
    public ResourceManager getResourceManager(BranchType branchType) {
        ResourceManager rm = resourceManagers.get(branchType);
        if (rm == null) {
            throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
        }
        return rm;
    }
    ...
}

//The type Data source manager.
//DataSourceManager是AT模式下的资源管理器
public class DataSourceManager extends AbstractResourceManager {
    //异步化worker
    private final AsyncWorker asyncWorker = new AsyncWorker(this);
    //RM负责管理的一些resource资源
    private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
    ...
    
    @Override
    public void registerResource(Resource resource) {
        DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
        //根据资源ID和数据源代理,把数据源连接池代理DataSourceProxy放入到map里去
        dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
        super.registerResource(dataSourceProxy);
    }
    ...
}

public abstract class AbstractResourceManager implements ResourceManager {
    ...
    @Override
    public void registerResource(Resource resource) {
        //通过RmNettyRemotingClient把RM注册到Seata Server中
        RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
    }
    ...
}

4.Client向Server注册RM时的交互源码

(1)Client异步发送注册RM的请求给Server

(2)Server收到注册RM的请求后的处理及异步响应

(1)Client异步发送注册RM的请求给Server

public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
    ...
    //Register new db key.
    public void registerResource(String resourceGroupId, String resourceId) {
        //Resource registration cannot be performed until the RM client is initialized
        if (StringUtils.isBlank(transactionServiceGroup)) {
            return;
        }

        if (getClientChannelManager().getChannels().isEmpty()) {
            getClientChannelManager().reconnect(transactionServiceGroup);
            return;
        }

        synchronized (getClientChannelManager().getChannels()) {
            //向每一个Server发起注册
            for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {
                String serverAddress = entry.getKey();
                Channel rmChannel = entry.getValue();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("will register resourceId:{}", resourceId);
                }
                sendRegisterMessage(serverAddress, rmChannel, resourceId);
            }
        }
    }
    
    public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {
        RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
        message.setResourceIds(resourceId);
        try {
            //异步发送注册RM的请求
            super.sendAsyncRequest(channel, message);
        } catch (FrameworkException e) {
            if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {
                getClientChannelManager().releaseChannel(channel, serverAddress);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove not writable channel:{}", channel);
                }
            } else {
                LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);
            }
        }
    }
    ...
}

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    @Override
    public void sendAsyncRequest(Channel channel, Object msg) {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
            return;
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
            ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
            : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
        if (rpcMessage.getBody() instanceof MergeMessage) {
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());
        }
        super.sendAsync(channel, rpcMessage);
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //rpc async request.
    protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
        channelWritableCheck(channel, rpcMessage.getBody());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                destroyChannel(future.channel());
            }
        });
    }
    ...
}

(2)Server收到注册RM的请求后的处理及异步响应

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //Rpc message processing.
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
            //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class RegRmProcessor implements RemotingProcessor {
    ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        onRegRmMessage(ctx, rpcMessage);
    }

    private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
        //获取请求的发送地址
        String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        boolean isSuccess = false;
        String errorInfo = StringUtils.EMPTY;
        try {
            if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
                //通过Channel管理组件ChannelManager,注册RM网络连接
                ChannelManager.registerRMChannel(message, ctx.channel());
                Version.putChannelVersion(ctx.channel(), message.getVersion());
                isSuccess = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("RM checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
                }
            } else {
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.warn("RM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
                }
            }
        } catch (Exception exx) {
            isSuccess = false;
            errorInfo = exx.getMessage();
            LOGGER.error("RM register fail, error message:{}", errorInfo);
        }
        RegisterRMResponse response = new RegisterRMResponse(isSuccess);
        if (StringUtils.isNotEmpty(errorInfo)) {
            response.setMsg(errorInfo);
        }
        //返回响应给客户端
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
        if (isSuccess && LOGGER.isInfoEnabled()) {
            LOGGER.info("RM register success,message:{},channel:{},client version:{}", message, ctx.channel(), message.getVersion());
        }
    }
    ...
}

public class ChannelManager {
    ...
    public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel) throws IncompatibleVersionException {
        Version.checkVersion(resourceManagerRequest.getVersion());
        Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
        RpcContext rpcContext;
        if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
            rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
                resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
                resourceManagerRequest.getResourceIds(), channel);
            rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
        } else {
            rpcContext = IDENTIFIED_CHANNELS.get(channel);
            rpcContext.addResources(dbkeySet);
        }
        if (dbkeySet == null || dbkeySet.isEmpty()) { 
            return; 
        }
        for (String resourceId : dbkeySet) {
            String clientIp;
            ConcurrentMap<Integer, RpcContext> portMap =
                CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
                .computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());

            rpcContext.holdInResourceManagerChannels(resourceId, portMap);
            updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
        }
    }
    ...
}

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    @Override
    public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {
        Channel clientChannel = channel;
        if (!(msg instanceof HeartbeatMessage)) {
            clientChannel = ChannelManager.getSameClientChannel(channel);
        }
        if (clientChannel != null) {
            RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
                ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE : ProtocolConstants.MSGTYPE_RESPONSE);
            super.sendAsync(clientChannel, rpcMsg);
        } else {
            throw new RuntimeException("channel is error.");
        }
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //rpc async request.
    protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
        channelWritableCheck(channel, rpcMessage.getBody());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                destroyChannel(future.channel());
            }
        });
    }
    ...
}

5.数据源连接代理与SQL句柄代理的初始化源码

(1)数据库操作的三剑客之连接、句柄和结果

(2)数据源连接代理的初始化

(3)数据源连接代理对SQL进行预编译

(4)SQL句柄代理的初始化

(5)SQL句柄代理执行SQL

(1)数据库操作的三剑客之连接、句柄和结果

Seata Client或者Seata Server进行数据库操作的大致流程如下所示:

public class LogStoreDataBaseDAO implements LogStore {
    //The Log store data source. 数据源连接池
    protected DataSource logStoreDataSource = null;
    ...
    @Override
    public GlobalTransactionDO queryGlobalTransactionDO(long transactionId) {
        String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByTransactionId(globalTable);
        Connection conn = null;//连接
        PreparedStatement ps = null;//句柄
        ResultSet rs = null;//结果
        try {
            //1.从数据源连接池中获取数据源连接
            conn = logStoreDataSource.getConnection();
            conn.setAutoCommit(true);
            //2.对sql语句进行预编译
            ps = conn.prepareStatement(sql);
            ps.setLong(1, transactionId);
            //3.执行sql语句
            rs = ps.executeQuery();
            if (rs.next()) {
                return convertGlobalTransactionDO(rs);
            } else {
                return null;
            }
        } catch (SQLException e) {
            throw new DataAccessException(e);
        } finally {
            IOUtil.close(rs, ps, conn);
        }
    }
    ...
}

(2)数据源连接代理的初始化

Seata Client或者Seata Server进行数据库操作时,首先会通过数据库连接池代理DataSourceProxy获取数据库连接,也就是会通过DataSourceProxy的getConnection()方法获取数据源连接代理ConnectionProxy,其中就会根据获取到的一个数据源连接Connection初始化一个数据源连接代理ConnectionProxy。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    ...
    @Override
    public ConnectionProxy getConnection() throws SQLException {
        //从目标数据源连接池中获取一个数据库连接,然后封装到ConnectionProxy数据源连接代理中,并进行返回
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        //从目标数据源连接池中获取一个数据库连接,然后封装到ConnectionProxy数据源连接代理中,并进行返回
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }
    ...
}

public class ConnectionProxy extends AbstractConnectionProxy {
    //Instantiates a new Connection proxy.
    public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
        super(dataSourceProxy, targetConnection);
    }
    ...
}

public abstract class AbstractConnectionProxy implements Connection {
    //The Data source proxy. 数据源连接池代理
    protected DataSourceProxy dataSourceProxy;
    //The Target connection. 目标数据源连接
    protected Connection targetConnection;
    //Instantiates a new Abstract connection proxy.
    public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
        this.dataSourceProxy = dataSourceProxy;
        this.targetConnection = targetConnection;
    }
    ...
}

(3)数据源连接代理对SQL进行预编译

数据源连接代理ConnectionProxy在进行数据库操作时,获取到数据库连接Connection之后,就需要对要执行的SQL进行预编译,也就是会调用AbstractConnectionProxy的prepareStatement()方法。

public abstract class AbstractConnectionProxy implements Connection {
    ...
    //对SQL进行预编译
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        //support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        //如果是AT模式
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(
                        getTargetConnection(),
                        sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId()
                    );
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql, pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        //返回一个SQL句柄代理
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }
    ...
}

(4)SQL句柄代理的初始化

SQL句柄代理PreparedStatementProxy的初始化主要是设置目标SQL、目标句柄和数据源连接代理。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {
    //Instantiates a new Prepared statement proxy.
    public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {
        super(connectionProxy, targetStatement, targetSQL);
    }
    ...
}

public abstract class AbstractPreparedStatementProxy extends StatementProxy<PreparedStatement> implements PreparedStatement {
    protected Map<Integer, ArrayList<Object>> parameters;
    
    private void initParameterHolder() {
        this.parameters = new HashMap<>();
    }
    
    //Instantiates a new Abstract prepared statement proxy.
    public AbstractPreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {
        super(connectionProxy, targetStatement, targetSQL);
        initParameterHolder();
    }
    ...
}

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
    //Instantiates a new Statement proxy.
    public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement, String targetSQL) throws SQLException {
        super(connectionWrapper, targetStatement, targetSQL);
    }
    ...
}

public abstract class AbstractStatementProxy<T extends Statement> implements Statement {
    //The Connection proxy.
    protected AbstractConnectionProxy connectionProxy;
    //The Target statement.
    protected T targetStatement;
    //The Target sql.
    protected String targetSQL;
    ...
    //Instantiates a new Abstract statement proxy.
    public AbstractStatementProxy(AbstractConnectionProxy connectionProxy, T targetStatement, String targetSQL) throws SQLException {
        this.connectionProxy = connectionProxy;
        this.targetStatement = targetStatement;
        this.targetSQL = targetSQL;
    }
    ...
}

(5)SQL句柄代理执行SQL

从数据源连接池中获取到数据源连接,以及对SQL语句进行预编译后,就可以调用SQL句柄代理PreparedStatementProxy的executeQuery()等方法执行SQL语句。

6.Seata基于SQL句柄代理执行SQL的源码

(1)Spring的JdbcTemplate操作数据库的三剑客

(2)基于SQL句柄代理执行SQL的流程

(1)Spring的JdbcTemplate操作数据库的三剑客

连接、句柄和结果。

@Disabled
public class LocalTransactionWithGlobalLockDataSourceBasicTest {
    private static ClassPathXmlApplicationContext context;
    private static JdbcTemplate jdbcTemplate;
    
    @BeforeAll
    public static void before() {
        context = new ClassPathXmlApplicationContext("basic-test-context.xml");
        jdbcTemplate = (JdbcTemplate) context.getBean("jdbcTemplate");
    }
    
    @Test
    public void testInsert() {
        RootContext.bindGlobalLockFlag();
        jdbcTemplate.update("insert into user0 (id, name, gmt) values (?, ?, ?)", new Object[]{2, "xxx", new Date()});
    }
    ...
}

public class JdbcTemplate extends JdbcAccessor implements JdbcOperations {
    ...
    @Override
    public int update(String sql, @Nullable Object... args) throws DataAccessException {
        return update(sql, newArgPreparedStatementSetter(args));
    }
    
    @Override
    public int update(String sql, @Nullable PreparedStatementSetter pss) throws DataAccessException {
        return update(new SimplePreparedStatementCreator(sql), pss);
    }
    
    protected int update(final PreparedStatementCreator psc, @Nullable final PreparedStatementSetter pss) throws DataAccessException {
        logger.debug("Executing prepared SQL update");
        return updateCount(execute(psc, ps -> {
            try {
                if (pss != null) {
                    pss.setValues(ps);
                }
                //PreparedStatement执行SQL
                int rows = ps.executeUpdate();
                if (logger.isTraceEnabled()) {
                    logger.trace("SQL update affected " + rows + " rows");
                }
                return rows;
            } finally {
                if (pss instanceof ParameterDisposer) {
                    ((ParameterDisposer) pss).cleanupParameters();
                }
            }
        }, true));
    }
    
    @Nullable
    private <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action, boolean closeResources) throws DataAccessException {
        Assert.notNull(psc, "PreparedStatementCreator must not be null");
        Assert.notNull(action, "Callback object must not be null");
        if (logger.isDebugEnabled()) {
            String sql = getSql(psc);
            logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
        }
        //1.获取连接
        Connection con = DataSourceUtils.getConnection(obtainDataSource());
        PreparedStatement ps = null;
        try {
            //2.创建句柄
            ps = psc.createPreparedStatement(con);
            applyStatementSettings(ps);
            //3.执行SQL的结果
            T result = action.doInPreparedStatement(ps);
            handleWarnings(ps);
            return result;
        } catch (SQLException ex) {
            if (psc instanceof ParameterDisposer) {
                ((ParameterDisposer) psc).cleanupParameters();
            }
            String sql = getSql(psc);
            psc = null;
            JdbcUtils.closeStatement(ps);
            ps = null;
            DataSourceUtils.releaseConnection(con, getDataSource());
            con = null;
            throw translateException("PreparedStatementCallback", sql, ex);
        } finally {
            if (closeResources) {
                if (psc instanceof ParameterDisposer) {
                    ((ParameterDisposer) psc).cleanupParameters();
                }
                JdbcUtils.closeStatement(ps);
                DataSourceUtils.releaseConnection(con, getDataSource());
            }
        }
    }
    ...
}

(2)基于SQL句柄代理执行SQL的流程

SQL句柄代理PreparedStatementProxy在调用execute()方法执行SQL时,就会调用到ExecuteTemplate执行模版的execute()方法。

而ExecuteTemplate执行模版的execute()方法,如果发现不需要全局锁 + 没有开启全局事务,那么就普通执行本地事务。否则,最终就会调用到BaseTransactionalExecutor的excute()方法。

在BaseTransactionalExecutor的excute()方法中,首先会从线程本地变量副本中获取xid,然后再执行SQL。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {
    ...
    @Override
    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }
    
    @Override
    public ResultSet executeQuery() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
    }
    
    @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
    }
    ...
}

public class ExecuteTemplate {
    ...
    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }
    
    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
        //如果发现不需要全局锁,而且没有开启AT模式下的全局事务,那么就普通执行本地事务
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            //Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        //获取到DB的类型
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        //通过SPI机制加载InsertExecutor
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                            case JdbcConstants.MARIADB:
                                executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            //比如下面最终会调用BaseTransactionalExecutor.excute()方法
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
    ...
}

@LoadLevel(name = JdbcConstants.MYSQL, scope = Scope.PROTOTYPE)
public class MySQLInsertExecutor extends BaseInsertExecutor implements Defaultable {
    ...
    //Instantiates a new Abstract dml base executor.
    public MySQLInsertExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }
    ...
}

public abstract class BaseInsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> implements InsertExecutor<T> {
    ...
    public BaseInsertExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }
    ...
}

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    ...
    public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }
    
    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        //判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
    ...
}

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    //The Statement proxy.
    protected StatementProxy<S> statementProxy;
    //The Statement callback.
    protected StatementCallback<T, S> statementCallback;
    //The Sql recognizer.
    protected SQLRecognizer sqlRecognizer;
    ...
    public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
        SQLRecognizer sqlRecognizer) {
        this.statementProxy = statementProxy;
        this.statementCallback = statementCallback;
        this.sqlRecognizer = sqlRecognizer;
    }
    ...
    @Override
    public T execute(Object... args) throws Throwable {
        //获取xid
        String xid = RootContext.getXID();
        if (xid != null) {
            statementProxy.getConnectionProxy().bind(xid);
        }
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }
    
    //Do execute object.
    protected abstract T doExecute(Object... args) throws Throwable;
    ...
}

7.执行SQL语句前取消自动提交事务的源码

执行ExecuteTemplate执行模版的execute()方法时,最终会调用到BaseTransactionalExecutor基础事务执行器的excute()方法。

执行BaseTransactionalExecutor的execute()方法时,又会执行到AbstractDMLBaseExecutor的doExecute()方法。该方法会判断目标数据库连接是否会自动提交本地事务,默认情况下本地事务都是自动提交的。如果是,则取消自动提交本地事务。

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    //The Statement proxy.
    protected StatementProxy<S> statementProxy;
    //The Statement callback.
    protected StatementCallback<T, S> statementCallback;
    //The Sql recognizer.
    protected SQLRecognizer sqlRecognizer;
    ...
    public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
        SQLRecognizer sqlRecognizer) {
        this.statementProxy = statementProxy;
        this.statementCallback = statementCallback;
        this.sqlRecognizer = sqlRecognizer;
    }
    ...
    @Override
    public T execute(Object... args) throws Throwable {
        //获取xid
        String xid = RootContext.getXID();
        if (xid != null) {
            statementProxy.getConnectionProxy().bind(xid);
        }
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }
    
    //Do execute object.
    protected abstract T doExecute(Object... args) throws Throwable;
    ...
}

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    ...
    public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }
    
    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        //判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
    ...
}

public abstract class AbstractConnectionProxy implements Connection {
    ...
    @Override
    public boolean getAutoCommit() throws SQLException {
        //判断目标数据库连接是否是自动提交,默认情况是都是自动提交的
        return targetConnection.getAutoCommit();
    }
    ...
}

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    ...
    //Execute auto commit true t.
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            //修改自动提交事务的设置,此时需要阻止自动提交事务
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);//执行SQL语句
                connectionProxy.commit();//手动提交本地事务
                return result;
            });
        } catch (Exception e) {
            //when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }
    ...
}

public class ConnectionProxy extends AbstractConnectionProxy {
    private final ConnectionContext context = new ConnectionContext();
    ...
    //change connection autoCommit to false by seata
    public void changeAutoCommit() throws SQLException {
        getContext().setAutoCommitChanged(true);
        setAutoCommit(false);
    }
    
    //Gets context.
    public ConnectionContext getContext() {
        return context;
    }
    
    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {
            //change autocommit from false to true, we should commit() first according to JDBC spec.
            doCommit();
        }
        //把目标数据源连接的自动提交事务设置为false
        targetConnection.setAutoCommit(autoCommit);
    }
    ...
}

8.执行SQL语句前后构建数据镜像的源码

(1)AbstractDMLBaseExecutor的doExecute()方法的执行流程

(2)以UpdateExecuto为例构建前后镜像

(1)AbstractDMLBaseExecutor的doExecute()方法的执行流程

一.首先设置数据源连接阻止其自动提交事务

二.根据目标SQL语句构建beforeImage前镜像

三.执行目标SQL语句(但还没提交其对应的事务)

四.根据beforeImage前镜像构建afterImage后镜像

五.根据前镜像和后镜像构建UndoLog数据

六.手动提交数据源连接代理的事务

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    ...
    //Execute auto commit true t.
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            //修改数据源连接的自动提交事务的设置,此时需要阻止自动提交事务
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);//执行SQL语句
                connectionProxy.commit();//手动提交本地事务
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }
    
    //Execute auto commit false t.
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //根据目标SQL语句构建beforeImage,表示目标SQL执行前的数据镜像
        TableRecords beforeImage = beforeImage();
        //接下来真正去执行这条SQL语句,但是此时本地事务还不会提交
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        int updateCount = statementProxy.getUpdateCount();
        if (updateCount > 0) {
            //根据beforeImage构建afterImage,表示目标SQL执行后的数据镜像
            TableRecords afterImage = afterImage(beforeImage);
            //根据beforeImage和afterImage准备undoLog数据到数据源连接代理中
            prepareUndoLog(beforeImage, afterImage);
        }
        return result;
    }
    ...
}

(2)以UpdateExecutor为例构建前后镜像

public class TableRecords implements java.io.Serializable {
    //表的元数据
    private transient TableMeta tableMeta;
    //表的名称
    private String tableName;
    //表的多行数据
    private List<Row> rows = new ArrayList<Row>();
    ...
}

public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
    private static final Configuration CONFIG = ConfigurationFactory.getInstance();
    private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);
    
    //Instantiates a new Update executor.
    public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    protected TableRecords beforeImage() throws SQLException {
        ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
        TableMeta tmeta = getTableMeta();
        //根据主键ID值拼接一个SQL语句,查询这条数据更新前的镜像
        String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
        return buildTableRecords(tmeta, selectSQL, paramAppenderList);
    }

    private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
        SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
        List<String> updateColumns = recognizer.getUpdateColumns();
        StringBuilder prefix = new StringBuilder("SELECT ");
        StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
        String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
        String orderByCondition = buildOrderCondition(recognizer, paramAppenderList);
        String limitCondition = buildLimitCondition(recognizer, paramAppenderList);
        if (StringUtils.isNotBlank(whereCondition)) {
            suffix.append(WHERE).append(whereCondition);
        }
        if (StringUtils.isNotBlank(orderByCondition)) {
            suffix.append(" ").append(orderByCondition);
        }
        if (StringUtils.isNotBlank(limitCondition)) {
            suffix.append(" ").append(limitCondition);
        }
        suffix.append(" FOR UPDATE");
        StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
        if (ONLY_CARE_UPDATE_COLUMNS) {
            if (!containsPK(updateColumns)) {
                selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
            }
            for (String columnName : updateColumns) {
                selectSQLJoin.add(columnName);
            }

            //The on update xxx columns will be auto update by db, so it's also the actually updated columns
            List<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();
            onUpdateColumns.removeAll(updateColumns);
            for (String onUpdateColumn : onUpdateColumns) {
                selectSQLJoin.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));
            }
        } else {
            for (String columnName : tableMeta.getAllColumns().keySet()) {
                selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
            }
        }
        return selectSQLJoin.toString();
    }

    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        TableMeta tmeta = getTableMeta();
        if (beforeImage == null || beforeImage.size() == 0) {
            return TableRecords.empty(getTableMeta());
        }
        String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
        ResultSet rs = null;
        try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
            SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
            rs = pst.executeQuery();
            return TableRecords.buildRecords(tmeta, rs);
        } finally {
            IOUtil.close(rs);
        }
    }

    private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {
        StringBuilder prefix = new StringBuilder("SELECT ");
        String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(tableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType());
        String suffix = " FROM " + getFromTableInSQL() + " WHERE " + whereSql;
        StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
        if (ONLY_CARE_UPDATE_COLUMNS) {
            SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
            List<String> updateColumns = recognizer.getUpdateColumns();
            if (!containsPK(updateColumns)) {
                selectSQLJoiner.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
            }
            for (String columnName : updateColumns) {
                selectSQLJoiner.add(columnName);
            }

            //The on update xxx columns will be auto update by db, so it's also the actually updated columns
            List<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();
            onUpdateColumns.removeAll(updateColumns);
            for (String onUpdateColumn : onUpdateColumns) {
                selectSQLJoiner.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));
            }
        } else {
            for (String columnName : tableMeta.getAllColumns().keySet()) {
                selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));
            }
        }
        return selectSQLJoiner.toString();
    }
}

9.构建全局锁的key和UndoLog数据的源码

(1)prepareUndoLog()方法会构建全局锁的key和UndoLog数据

(2)构建全局锁的key的源码

(3)构建UndoLog数据的源码

(1)prepareUndoLog()方法会构建全局锁的key和UndoLog数据

在基础事务执行器BaseTransactionalExecutor的prepareUndoLog()方法中,会构建全局锁的key和构建UndoLog数据,并把它们设置到数据源连接代理ConnectionProxy中。

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    ...
    //prepare undo log.
    //@param beforeImage the before image
    //@param afterImage  the after image
    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
            return;
        }
        if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
            if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
            }
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;

        //构建全局锁的key
        //比如更新了一批数据,那么需要针对这批数据的主键ID,来构建这批数据的全局锁的key
        String lockKeys = buildLockKey(lockKeyRecords);

        if (null != lockKeys) {
            //将全局锁key设置到数据源连接代理ConnectionProxy中
            connectionProxy.appendLockKey(lockKeys);
            //构建UndoLog
            SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
            //将UndoLog设置到数据源连接代理ConnectionProxy中
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }
    ...
}

(2)构建全局锁的key的源码

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    ...
    //build lockKey
    //@param rowsIncludingPK the records
    //@return the string as local key. the local key example(multi pk): "t_user:1_a,2_b"
    protected String buildLockKey(TableRecords rowsIncludingPK) {
        if (rowsIncludingPK.size() == 0) {
            return null;
        }

        //构建出来的全局锁的key形式为:table_name:id_11001
        StringBuilder sb = new StringBuilder();
        sb.append(rowsIncludingPK.getTableMeta().getTableName());
        sb.append(":");
        int filedSequence = 0;
        //pksRows指的是,更新的每一行数据主键字段和主键的值
        List<Map<String, Field>> pksRows = rowsIncludingPK.pkRows();
        //获取到主键字段名称,主键可能是联合主键,主键字段的名称可能有多个
        List<String> primaryKeysOnlyName = getTableMeta().getPrimaryKeyOnlyName();
        //rowMap就是一行数据,rowMap中的key是字段名称,value是字段值
        for (Map<String, Field> rowMap : pksRows) {
            int pkSplitIndex = 0;
            //遍历和提取这行数据里多个主键字段的名称
            for (String pkName : primaryKeysOnlyName) {
                if (pkSplitIndex > 0) {
                    sb.append("_");
                }
                //获取到多个主键字段的value,然后拼接在一起
                sb.append(rowMap.get(pkName).getValue());
                pkSplitIndex++;
            }
            filedSequence++;
            if (filedSequence < pksRows.size()) {
                sb.append(",");
            }
        }

        //最终拼成的key形如:table_name:1101_aadd,table_name:xxxx_xxx
        return sb.toString();
    }
    ...
}

(3)构建UndoLog数据的源码

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    ...
    //build a SQLUndoLog
    //@param beforeImage the before image
    //@param afterImage  the after image
    protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
        SQLType sqlType = sqlRecognizer.getSQLType();
        String tableName = sqlRecognizer.getTableName();

        SQLUndoLog sqlUndoLog = new SQLUndoLog();
        sqlUndoLog.setSqlType(sqlType);//SQL的类型可能为insert、update、delete
        sqlUndoLog.setTableName(tableName);//表的名称
        sqlUndoLog.setBeforeImage(beforeImage);//SQL执行前的数据镜像
        sqlUndoLog.setAfterImage(afterImage);//SQL执行后的数据镜像
        return sqlUndoLog;
    }
    ...
}

public class SQLUndoLog implements java.io.Serializable {
    private SQLType sqlType;
    private String tableName;
    private TableRecords beforeImage;
    private TableRecords afterImage;
    ...
}

10.Seata Client发起分支事务注册的源码

(1)ConnectionProxy.commit()提交事务

(2)ConnectionProxy.register()注册分支事务

(1)ConnectionProxy.commit()提交事务

执行数据源连接代理ConnectionProxy的commit()方法提交事务的时候,首先会先调用数据源连接代理ConnectionProxy的register()方法注册分支事务。

public class ConnectionProxy extends AbstractConnectionProxy {
    private final ConnectionContext context = new ConnectionContext();
    ...
    @Override
    public void commit() throws SQLException {
        try {
            //通过全局锁重试策略组件来执行本地事务的提交
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
    
    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    private void processLocalCommitWithGlobalLocks() throws SQLException {
        //检查全局锁keys
        checkLock(context.buildLockKeys());
        try {
            //目标数据源连接提交事务
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            //注册分支事务
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //目标数据源连接提交事务
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
    ...
}

(2)ConnectionProxy.register()注册分支事务

执行数据源连接代理ConnectionProxy的register()方法注册分支事务的时候,会调用资源管理器DefaultResourceManager的branchRegister()方法,然后会继续调用AbstractResourceManager的branchRegister()方法来注册分支事务。

在AbstractResourceManager的branchRegister()方法中,首先会构造分支事务注册请求,然后通过RmNettyRemotingClient将分支事务注册请求发送给Seata Server。

//The type Connection proxy.
//数据源连接代理
public class ConnectionProxy extends AbstractConnectionProxy {
    private final ConnectionContext context = new ConnectionContext();
    ...
    private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        //分支事务注册
        Long branchId = DefaultResourceManager.get().branchRegister(
            BranchType.AT,//事务类型
            getDataSourceProxy().getResourceId(),//资源id,资源是已经注册过了的
            null,
            context.getXid(),
            context.getApplicationData(),
            context.buildLockKeys()//注册分支事物时带上全局锁keys
        );
        context.setBranchId(branchId);
    }
    ...
}

public class DefaultResourceManager implements ResourceManager {
    protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();
    
    private static class SingletonHolder {
        private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
    }
    
    public static DefaultResourceManager get() {
        return SingletonHolder.INSTANCE;
    }
    
    private DefaultResourceManager() {
        initResourceManagers();
    }
    
    protected void initResourceManagers() {
        //通过SPI加载所有的ResourceManager资源管理器
        //比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXA
        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
        if (CollectionUtils.isNotEmpty(allResourceManagers)) {
            for (ResourceManager rm : allResourceManagers) {
                resourceManagers.put(rm.getBranchType(), rm);
            }
        }
    }
    
    //注册分支事务
    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);
    }
    
    public ResourceManager getResourceManager(BranchType branchType) {
        ResourceManager rm = resourceManagers.get(branchType);
        if (rm == null) {
            throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
        }
        return rm;
    }
    ...
}

public abstract class AbstractResourceManager implements ResourceManager {
    ...
    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            request.setXid(xid);//xid是全局事务id
            request.setLockKey(lockKeys);//这次分支事务要更新数据全局锁key
            request.setResourceId(resourceId);//分支事务对应的资源id
            request.setBranchType(branchType);//分支事务类型
            request.setApplicationData(applicationData);//应用数据

            BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
            if (response.getResultCode() == ResultCode.Failed) {
                throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
            }
            return response.getBranchId();
        } catch (TimeoutException toe) {
            throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
        } catch (RuntimeException rex) {
            throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
        }
    }
    ...
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2379128.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机科技笔记: 容错计算机设计05 n模冗余系统 TMR 三模冗余系统

NMR&#xff08;N-Modular Redundancy&#xff0c;N 模冗余&#xff09;是一种通用的容错设计架构&#xff0c;通过引入 N 个冗余模块&#xff08;N ≥ 3 且为奇数&#xff09;&#xff0c;并采用多数投票机制&#xff0c;来提升系统的容错能力与可靠性。单个模块如果可靠性小于…

Spring Boot 与 RabbitMQ 的深度集成实践(一)

引言 ** 在当今的分布式系统架构中&#xff0c;随着业务复杂度的不断提升以及系统规模的持续扩张&#xff0c;如何实现系统组件之间高效、可靠的通信成为了关键问题。消息队列作为一种重要的中间件技术&#xff0c;应运而生并发挥着举足轻重的作用。 消息队列的核心价值在于其…

黑马程序员2024新版C++笔记 第2章 语句

1.if逻辑判断语句 语法主体&#xff1a; if(要执行的判断&#xff0c;结果是bool型){判断结果是true会执行的代码; } 2.AI大模型辅助编程 在Clion中搜索并安装对应插件&#xff1a; 右上角齿轮点击后找到插件(TRONGYI LINGMA和IFLYCODE)安装后重启ide即可。 重启后会有通义登…

前端动画库 Anime.js 的V4 版本,兼容 Vue、React

前端动画库 Anime.js 更新了 V4 版本&#xff0c;并对其官网进行了全面更新&#xff0c;增加了许多令人惊艳的效果&#xff0c;尤其是时间轴动画效果&#xff0c;让开发者可以更精确地控制动画节奏。 这一版本的发布不仅带来了全新的模块化 API 和显著的性能提升&#xff0c;还…

用 PyTorch 从零实现简易GPT(Transformer 模型)

用 PyTorch 从零实现简易GPT&#xff08;Transformer 模型&#xff09; 本文将结合示例代码&#xff0c;通俗易懂地拆解大模型&#xff08;Transformer&#xff09;从数据预处理到推理预测的核心组件与流程&#xff0c;并通过 Mermaid 流程图直观展示整体架构。文章结构分为四…

【通用大模型】Serper API 详解:搜索引擎数据获取的核心工具

Serper API 详解&#xff1a;搜索引擎数据获取的核心工具 一、Serper API 的定义与核心功能二、技术架构与核心优势2.1 技术实现原理2.2 对比传统方案的突破性优势 三、典型应用场景与代码示例3.1 SEO 监控系统3.2 竞品广告分析 四、使用成本与配额策略五、开发者注意事项六、替…

Spring3+Vue3项目中的知识点——JWT

全称&#xff1a;JOSN Web Token 定义了一种简洁的、自包含的格式&#xff0c;用于通信双方以json数据格式的安全传输信息 组成&#xff1a; 第一部分&#xff1a;Header&#xff08;头&#xff09;&#xff0c;记录令牌类型、签名算法等。 第二部分&#xff1a;Payload&am…

python3GUI--智慧交通分析平台:By:PyQt5+YOLOv8(详细介绍)

文章目录 一&#xff0e;前言二&#xff0e;效果预览1.目标识别与检测2.可视化展示1.车流量统计2. 目标类别占比3. 拥堵情况展示4.目标数量可视化 3.控制台4.核心内容区1.目标检测参数2.帧转QPixmap3.数据管理 5.项目结构 三&#xff0e;总结 平台规定gif最大5M&#xff0c;所以…

Linux任务管理与守护进程

一、任务管理 &#xff08;一&#xff09;进程组、作业、会话概念 &#xff08;1&#xff09;进程组概念&#xff1a;进程组是由一个或多个进程组成的集合&#xff0c;这些进程在某些方面具有关联性。在操作系统中&#xff0c;进程组是用于对进程进行分组管理的一种机制。每个…

C#里与嵌入式系统W5500网络通讯(2)

在嵌入式代码里,需要从嵌入式的MCU访问W5500芯片。 这个是通过SPI通讯来实现的,所以要先连接SPI的硬件通讯线路。 接着下来,就是怎么样访问这个芯片了。 要访问这个芯片,需要通过SPI来发送数据,而发送数据又要有一定的约定格式, 于是芯片厂商就定义下面的通讯格式: …

EMQX开源版安装指南:Linux/Windows全攻略

EMQX开源版安装教程-linux/windows 因最近自己需要使用MQTT&#xff0c;需要搭建一个MQTT服务器&#xff0c;所以想到了很久以前用到的EMQX。但是当时的EMQX使用的是开源版的&#xff0c;在官网可以直接下载。而现在再次打开官网时发现怎么也找不大开源版本了&#xff0c;所以…

【计算机视觉】OpenCV实战项目:GraspPicture 项目深度解析:基于图像分割的抓取点检测系统

GraspPicture 项目深度解析&#xff1a;基于图像分割的抓取点检测系统 一、项目概述项目特点 二、项目运行方式与执行步骤&#xff08;一&#xff09;环境准备&#xff08;二&#xff09;项目结构&#xff08;三&#xff09;执行步骤 三、重要逻辑代码解析&#xff08;一&#…

MySQL 数据库备份与还原

作者&#xff1a;IvanCodes 日期&#xff1a;2025年5月18日 专栏&#xff1a;MySQL教程 思维导图 备份 (Backup) 与 冗余 (Redundancy) 的核心区别: &#x1f3af; 备份是指创建数据的副本并将其存储在不同位置或介质&#xff0c;主要目的是在发生数据丢失、损坏或逻辑错误时进…

Kubernetes控制平面组件:Kubelet详解(四):gRPC 与 CRI gRPC实现

云原生学习路线导航页&#xff08;持续更新中&#xff09; kubernetes学习系列快捷链接 Kubernetes架构原则和对象设计&#xff08;一&#xff09;Kubernetes架构原则和对象设计&#xff08;二&#xff09;Kubernetes架构原则和对象设计&#xff08;三&#xff09;Kubernetes控…

javax.servlet.Filter 介绍-笔记

1.javax.servlet.Filter 简介 javax.servlet.Filter 是 Java Servlet API 中的一个核心接口&#xff0c;用于在请求到达目标资源&#xff08;如 Servlet 或 JSP&#xff09;之前或响应返回给客户端之前执行预处理或后处理操作。它常用于实现与业务逻辑无关的通用功能&#xff…

Win 11开始菜单图标变成白色怎么办?

在使用windows 11的过程中&#xff0c;有时候开始菜单的某些程序图标变成白色的文件形式&#xff0c;但是程序可以正常打开&#xff0c;这个如何解决呢&#xff1f; 这通常是由于快捷方式出了问题&#xff0c;下面跟着操作步骤来解决吧。 1、右键有问题的软件&#xff0c;打开…

入门OpenTelemetry——应用自动埋点

埋点 什么是埋点 埋点&#xff0c;本质就是在你的应用程序里&#xff0c;在重要位置插入采集代码&#xff0c;比如&#xff1a; 收集请求开始和结束的时间收集数据库查询时间收集函数调用链路信息收集异常信息 这些埋点数据&#xff08;Trace、Metrics、Logs&#xff09;被…

C语言链表的操作

初学 初学C语言时&#xff0c;对于链表节点的定义一般是这样的&#xff1a; typedef struct node {int data;struct node *next; } Node; 向链表中添加节点&#xff1a; void addNode(Node **head, int data) {Node *newNode (Node*)malloc(sizeof(Node));newNode->dat…

芯片生态链深度解析(二):基础设备篇——人类精密制造的“巅峰对决”

【开篇&#xff1a;设备——芯片工业的“剑与盾”】 当ASML的EUV光刻机以每秒5万次激光脉冲在硅片上雕刻出0.13nm精度的电路&#xff08;相当于在月球表面精准定位一枚二维码&#xff09;&#xff0c;当国产28nm光刻机在华虹产线实现“从0到1”的突破&#xff0c;这场精密制造…

C语言指针深入详解(二):const修饰指针、野指针、assert断言、指针的使用和传址调用

目录 一、const修饰指针 &#xff08;一&#xff09;const修饰变量 &#xff08;二&#xff09;const 修饰指针变量 二、野指针 &#xff08;一&#xff09;野指针成因 1、指针未初始化 2、指针越界访问 3、指针指向的空间释放 &#xff08;二&#xff09;如何规避野指…