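This article is based on the Sharding-JDBC 1.5.0 release.
1. Overview
This article covers how Sharding-JDBC implements JDBC and read/write splitting. Why discuss the two together? When a client connects to the database directly, read/write splitting is achieved mainly by obtaining different connections for the read and write databases, which sits right alongside the JDBC Connection.
OK, let's first look at how the Sharding-JDBC project officially defines and positions itself.
As we can see, Sharding-JDBC implements the JDBC specification to give the upper layer transparent access to sharded databases and tables. 😈 Black magic? In fact, the database connection pools we use every day take the same approach to provide pooling without the upper layer noticing. The same approach could even be used to access Lucene, MongoDB, and so on.
That was a digression. Now let's look at Sharding-JDBC.
The implementation hierarchy is as follows: JDBC interface =(inherits)==
abstract class =(inherits)==
abstract class =(inherits)==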
2. unsupported package
<h1 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 28px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3. adapter package</h1>
The **abstract** classes in the `adapter` package implement the methods that are **unrelated** to database and table sharding.
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.1 WrapperAdapter</h2>
WrapperAdapter is the adapter class for the JDBC Wrapper interface.
**It implements the following two methods of the Wrapper interface**:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">@Override
public final &lt;T&gt; T unwrap(final Class&lt;T&gt; iface) throws SQLException {
    if (isWrapperFor(iface)) {
        return (T) this;
    }
    throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
}

@Override
public final boolean isWrapperFor(final Class&lt;?&gt; iface) throws SQLException {
    return iface.isInstance(this);
}</pre>
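To make the contract concrete, here is a minimal, self-contained sketch of the same `unwrap` / `isWrapperFor` logic. The class name `WrapperSketch` and the helper `canUnwrap` are hypothetical and exist only for this illustration; they are not part of Sharding-JDBC.

```java
import java.sql.SQLException;
import java.sql.Wrapper;

// Hypothetical class for illustration; it mirrors the two methods shown above.
class WrapperSketch implements Wrapper {

    @SuppressWarnings("unchecked")
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (isWrapperFor(iface)) {
            return (T) this;
        }
        throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) {
        return iface.isInstance(this);
    }

    // Helper (an assumption of this sketch): reports whether unwrap succeeds for the given type.
    static boolean canUnwrap(Class<?> iface) {
        try {
            new WrapperSketch().unwrap(iface);
            return true;
        } catch (SQLException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canUnwrap(Wrapper.class));  // the object is a Wrapper
        System.out.println(canUnwrap(Runnable.class)); // the object is not a Runnable
    }
}
```

Callers typically check `isWrapperFor` first, or catch the `SQLException` as the helper does here.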
**It provides subclasses with `#recordMethodInvocation()` to record method calls and `#replayMethodsInvocation()` to replay the recorded calls**:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">/**
 * Recorded method invocations
 */
private final Collection&lt;JdbcMethodInvocation&gt; jdbcMethodInvocations = new ArrayList&lt;&gt;();

/**
 * Record a method invocation.
 *
 * @param targetClass target class
 * @param methodName method name
 * @param argumentTypes argument types
 * @param arguments arguments
 */
public final void recordMethodInvocation(final Class&lt;?&gt; targetClass, final String methodName, final Class&lt;?&gt;[] argumentTypes, final Object[] arguments) {
    try {
        jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
    } catch (final NoSuchMethodException ex) {
        throw new ShardingJdbcException(ex);
    }
}

/**
 * Replay the recorded method invocations.
 *
 * @param target target object
 */
public final void replayMethodsInvocation(final Object target) {
    for (JdbcMethodInvocation each : jdbcMethodInvocations) {
        each.invoke(target);
    }
}</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
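What are these two methods for? Take `#setAutoCommit()` of AbstractConnectionAdapter, discussed below: when no database connection exists yet, the call is recorded first; once a connection has been obtained, the call is replayed on it: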
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`// AbstractConnectionAdapter.java`
`public final void setAutoCommit(final boolean autoCommit) throws SQLException {`
`    this.autoCommit = autoCommit;`
`    if (getConnections().isEmpty()) { // No connection yet: record the method call`
`        recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});`
`        return;`
`    }`
`    for (Connection each : getConnections()) {`
`        each.setAutoCommit(autoCommit);`
`    }`
`}`
<ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`public class JdbcMethodInvocation {`
`    /**`
`     * Method`
`     */`
`    @Getter`
`    private final Method method;`
`    /**`
`     * Method arguments`
`     */`
`    @Getter`
`    private final Object[] arguments;`
`    /**`
`     * Invoke the method.`
`     *`
`     * @param target target object`
`     */`
`    public void invoke(final Object target) {`
`        try {`
`            method.invoke(target, arguments); // Reflective call`
`        } catch (final IllegalAccessException | InvocationTargetException ex) {`
`            throw new ShardingJdbcException("Invoke jdbc method exception", ex);`
`        }`
`    }`
`}`
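The record-then-replay idea can be tried outside of JDBC. The following is a minimal sketch, assuming a plain `java.util.List` as the replay target instead of a `Connection`; the class `RecordReplaySketch` and its nested `Invocation` type are hypothetical stand-ins for `WrapperAdapter` and `JdbcMethodInvocation`, and `IllegalStateException` replaces `ShardingJdbcException` to keep the example free of dependencies.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for WrapperAdapter's record/replay facility.
class RecordReplaySketch {

    /** One recorded call: the reflective method plus its arguments. */
    private static final class Invocation {
        private final Method method;
        private final Object[] arguments;

        Invocation(Method method, Object[] arguments) {
            this.method = method;
            this.arguments = arguments;
        }
    }

    private final List<Invocation> invocations = new ArrayList<>();

    /** Resolve the method up front and remember it together with its arguments. */
    void record(Class<?> targetClass, String methodName, Class<?>[] argumentTypes, Object[] arguments) {
        try {
            invocations.add(new Invocation(targetClass.getMethod(methodName, argumentTypes), arguments));
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /** Apply every recorded call, in recording order, to the target that now exists. */
    void replay(Object target) {
        for (Invocation each : invocations) {
            try {
                each.method.invoke(target, each.arguments);
            } catch (IllegalAccessException | InvocationTargetException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    static List<Object> demo() {
        RecordReplaySketch recorder = new RecordReplaySketch();
        // No target exists yet, so the calls are only recorded.
        recorder.record(List.class, "add", new Class<?>[] {Object.class}, new Object[] {"setAutoCommit(false)"});
        recorder.record(List.class, "add", new Class<?>[] {Object.class}, new Object[] {"setReadOnly(true)"});
        // The target appears later; replay brings it up to date.
        List<Object> target = new ArrayList<>();
        recorder.replay(target);
        return target;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Resolving the `Method` at record time means a misspelled method name fails immediately rather than later during replay.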
**It provides subclasses with `#throwSQLExceptionIfNecessary()` to throw a chain of exceptions**:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">protected void throwSQLExceptionIfNecessary(final Collection&lt;SQLException&gt; exceptions) throws SQLException {
    if (exceptions.isEmpty()) { // Nothing collected: do not throw
        return;
    }
    SQLException ex = new SQLException();
    for (SQLException each : exceptions) {
        ex.setNextException(each); // Build the exception chain
    }
    throw ex;
}</pre>
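As a small illustration of how a caller could read such a chain back, the sketch below reuses the same chaining logic and then walks the chain with `SQLException#getNextException()`. The class `ExceptionChainSketch` and the method `chainedMessages` are hypothetical names introduced only for this example.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical class for illustration only.
class ExceptionChainSketch {

    /** Same logic as the listing above: an empty head exception with the collected ones chained behind it. */
    static void throwIfNecessary(Collection<SQLException> exceptions) throws SQLException {
        if (exceptions.isEmpty()) {
            return;
        }
        SQLException ex = new SQLException();
        for (SQLException each : exceptions) {
            ex.setNextException(each); // appended to the end of the chain
        }
        throw ex;
    }

    /** Walk the chain and collect the messages, skipping the empty head. */
    static List<String> chainedMessages(Collection<SQLException> exceptions) {
        List<String> messages = new ArrayList<>();
        try {
            throwIfNecessary(exceptions);
        } catch (SQLException head) {
            for (SQLException each = head.getNextException(); each != null; each = each.getNextException()) {
                messages.add(each.getMessage());
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(chainedMessages(Arrays.asList(
                new SQLException("ds_0 rollback failed"), new SQLException("ds_1 rollback failed"))));
    }
}
```

Note that the head exception itself carries no message, so callers that only log `getMessage()` of the thrown exception would see nothing useful; they need to walk the chain.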
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.2 AbstractDataSourceAdapter</h2>
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.3 AbstractConnectionAdapter</h2>
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">/**
 * Whether auto-commit is enabled
 */
private boolean autoCommit = true;

/**
 * Get the connections.
 *
 * @return connections
 */
protected abstract Collection&lt;Connection&gt; getConnections();

@Override
public final boolean getAutoCommit() throws SQLException {
    return autoCommit;
}

@Override
public final void setAutoCommit(final boolean autoCommit) throws SQLException {
    this.autoCommit = autoCommit;
    if (getConnections().isEmpty()) { // No connection yet: record the method call
        recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
        return;
    }
    for (Connection each : getConnections()) {
        each.setAutoCommit(autoCommit);
    }
}</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
When `#setAutoCommit()` is called, it actually sets the `autoCommit` property on the Connections it holds.
`#getConnections()` depends on sharding, so it is left abstract for subclasses to implement.
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">@Override
public final void commit() throws SQLException {
    for (Connection each : getConnections()) {
        each.commit();
    }
}

@Override
public final void rollback() throws SQLException {
    Collection&lt;SQLException&gt; exceptions = new LinkedList&lt;&gt;();
    for (Connection each : getConnections()) {
        try {
            each.rollback();
        } catch (final SQLException ex) {
            exceptions.add(ex);
        }
    }
    throwSQLExceptionIfNecessary(exceptions);
}</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
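When `#commit()` or `#rollback()` is called, it actually calls the corresponding method on each Connection it holds.
When an exception occurs, `#commit()` and `#rollback()` behave differently: `#commit()` stops at the first failure, while `#rollback()` continues and collects the exceptions. The author does not yet know the reason and will update this article after checking.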
<ul style="list-style-type: circle;" class="list-paddingleft-2">
`#commit()` should be changed to handle exceptions the same way as `#rollback()`. The code is as follows:
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`public final void commit() throws SQLException {`
`    Collection<SQLException> exceptions = new LinkedList<>();`
`    for (Connection each : getConnections()) {`
`        try {`
`            each.commit();`
`        } catch (final SQLException ex) {`
`            exceptions.add(ex);`
`        }`
`    }`
`    throwSQLExceptionIfNecessary(exceptions);`
`}`
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">/**
 * Read-only flag
 */
private boolean readOnly = true;
/**
 * Transaction isolation level
 */
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;

@Override
public final void setReadOnly(final boolean readOnly) throws SQLException {
    this.readOnly = readOnly;
    if (getConnections().isEmpty()) {
        recordMethodInvocation(Connection.class, "setReadOnly", new Class[] {boolean.class}, new Object[] {readOnly});
        return;
    }
    for (Connection each : getConnections()) {
        each.setReadOnly(readOnly);
    }
}

@Override
public final void setTransactionIsolation(final int level) throws SQLException {
    transactionIsolation = level;
    if (getConnections().isEmpty()) {
        recordMethodInvocation(Connection.class, "setTransactionIsolation", new Class[] {int.class}, new Object[] {level});
        return;
    }
    for (Connection each : getConnections()) {
        each.setTransactionIsolation(level);
    }
}</pre>
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.4 AbstractStatementAdapter</h2>
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">@Override
public final int getUpdateCount() throws SQLException {
    long result = 0;
    boolean hasResult = false;
    for (Statement each : getRoutedStatements()) {
        if (each.getUpdateCount() &gt; -1) {
            hasResult = true;
        }
        result += each.getUpdateCount();
    }
    if (result &gt; Integer.MAX_VALUE) {
        result = Integer.MAX_VALUE;
    }
    return hasResult ? Long.valueOf(result).intValue() : -1;
}

/**
 * Get the collection of routed statement objects.
 *
 * @return collection of routed statement objects
 */
protected abstract Collection&lt;? extends Statement&gt; getRoutedStatements();</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
`#getUpdateCount()` sums the update counts of the Statements it holds.
`#getRoutedStatements()` depends on sharding, so it is left abstract for subclasses to implement.
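The aggregation rule in `#getUpdateCount()` can be checked with plain integers. The sketch below mirrors the listing's arithmetic, with each `int` standing in for one routed Statement's `getUpdateCount()`; the class and method names are hypothetical. Reading the listing literally, a `-1` from one statement is still added to the sum when another statement reports a real count, and the sketch keeps that behavior rather than correcting it.

```java
// Hypothetical class for illustration; each int stands in for one routed Statement's update count.
class UpdateCountSketch {

    static int sumUpdateCounts(int... counts) {
        long result = 0;
        boolean hasResult = false;
        for (int each : counts) {
            if (each > -1) {
                hasResult = true; // at least one statement reported an update count
            }
            result += each;
        }
        if (result > Integer.MAX_VALUE) {
            result = Integer.MAX_VALUE; // clamp instead of overflowing
        }
        return hasResult ? (int) result : -1;
    }

    public static void main(String[] args) {
        System.out.println(sumUpdateCounts(2, 3));                 // two shards updated rows
        System.out.println(sumUpdateCounts(-1, -1));               // no shard has an update count
        System.out.println(sumUpdateCounts(Integer.MAX_VALUE, 1)); // clamped
    }
}
```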
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.5 AbstractPreparedStatementAdapter</h2>
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">/**
 * Recorded parameter-setting method invocations
 */
private final List&lt;SetParameterMethodInvocation&gt; setParameterMethodInvocations = new LinkedList&lt;&gt;();
/**
 * Parameters
 */
@Getter
private final List&lt;Object&gt; parameters = new ArrayList&lt;&gt;();

@Override
public final void setInt(final int parameterIndex, final int x) throws SQLException {
    setParameter(parameterIndex, x);
    recordSetParameter("setInt", new Class[]{int.class, int.class}, parameterIndex, x);
}

/**
 * Record a placeholder parameter.
 *
 * @param parameterIndex position of the placeholder parameter
 * @param value parameter value
 */
private void setParameter(final int parameterIndex, final Object value) {
    if (parameters.size() == parameterIndex - 1) {
        parameters.add(value);
        return;
    }
    for (int i = parameters.size(); i &lt;= parameterIndex - 1; i++) { // Pad the positions not yet set with null
        parameters.add(null);
    }
    parameters.set(parameterIndex - 1, value);
}

/**
 * Record a parameter-setting method invocation.
 *
 * @param methodName method name, for example setInt or setLong
 * @param argumentTypes argument types
 * @param arguments arguments
 */
private void recordSetParameter(final String methodName, final Class[] argumentTypes, final Object... arguments) {
    try {
        setParameterMethodInvocations.add(new SetParameterMethodInvocation(PreparedStatement.class.getMethod(methodName, argumentTypes), arguments, arguments[1]));
    } catch (final NoSuchMethodException ex) {
        throw new ShardingJdbcException(ex);
    }
}

/**
 * Replay the recorded parameter-setting method invocations.
 *
 * @param preparedStatement prepared statement object
 */
protected void replaySetParameter(final PreparedStatement preparedStatement) {
    addParameters();
    for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
        updateParameterValues(each, parameters.get(each.getIndex() - 1)); // The same position may be set several times with different values, so refresh it
        each.invoke(preparedStatement);
    }
}

/**
 * When a distributed primary key is used, the generated key is appended to parameters, so parameters holds more
 * entries than setParameterMethodInvocations; a SetParameterMethodInvocation must be created for that key.
 */
private void addParameters() {
    for (int i = setParameterMethodInvocations.size(); i &lt; parameters.size(); i++) {
        recordSetParameter("setObject", new Class[]{int.class, Object.class}, i + 1, parameters.get(i));
    }
}

private void updateParameterValues(final SetParameterMethodInvocation setParameterMethodInvocation, final Object value) {
    if (!Objects.equals(setParameterMethodInvocation.getValue(), value)) {
        setParameterMethodInvocation.changeValueArgument(value); // Update the placeholder argument
    }
}</pre>
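The null-padding in `#setParameter()` is easy to get wrong, so here is a minimal sketch that isolates just that method and exercises it with out-of-order indexes. The class name `ParameterPaddingSketch` and the `demo` method are hypothetical; the body of `setParameter` follows the listing above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class for illustration; setParameter follows the listing above.
class ParameterPaddingSketch {

    private final List<Object> parameters = new ArrayList<>();

    /** parameterIndex is 1-based, as in JDBC; the backing list is 0-based. */
    void setParameter(int parameterIndex, Object value) {
        if (parameters.size() == parameterIndex - 1) {
            parameters.add(value); // the next free slot: simply append
            return;
        }
        for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
            parameters.add(null); // pad every slot up to and including the target
        }
        parameters.set(parameterIndex - 1, value);
    }

    List<Object> getParameters() {
        return parameters;
    }

    static List<Object> demo() {
        ParameterPaddingSketch sketch = new ParameterPaddingSketch();
        sketch.setParameter(3, "c"); // slots 1 and 2 are padded with null
        sketch.setParameter(1, "a"); // an already existing slot is overwritten
        return sketch.getParameters();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Setting index 3 first leaves positions 1 and 2 as `null` until they are set, which is why the replay step later reads the current value from `parameters` instead of trusting the value captured at record time.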
<ul style="list-style-type: square;" class="list-paddingleft-2">
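The logic is similar to `WrapperAdapter`'s `#recordMethodInvocation()` and `#replayMethodsInvocation()`; please read the code comments **carefully**.
SetParameterMethodInvocation extends JdbcMethodInvocation; it is a utility class that invokes parameter-setting methods via reflection: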
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`public final class SetParameterMethodInvocation extends JdbcMethodInvocation {`
<pre class="prettyprint linenums" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">`<code style="box-sizing: border-box;margin-left: -20px;display: flex;overflow: initial;line-height: 12px;word-wrap: normal;border-width: 0px;border-style: initial;border-color: initial;font-size: 10px;font-family: inherit !important;">/**`
<h1 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 28px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">4. Insert flow</h1>
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">// This code is only an example; in production, take care of exception handling and closing resources
String sql = "INSERT INTO t_order(uid, nickname, pid) VALUES (1, '2', ?)";
DataSource dataSource = new ShardingDataSource(shardingRule);
Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); // Statement.RETURN_GENERATED_KEYS is required to get the primary key back
ps.setLong(1, 100);
ps.executeUpdate();
ResultSet rs = ps.getGeneratedKeys();
if (rs.next()) {
    System.out.println("id:" + rs.getLong(1));
}</pre>
**Calling `#executeUpdate()` triggers the following internal process**:
<img style="box-sizing: border-box;border-width: 2px;border-style: solid;border-color: rgb(238, 238, 238);border-radius: 6px;" class=" aligncenter" src="https://www.javazhiyin.com/wp-content/uploads/2018/08/java1-1534475531.jpeg" src="https://www.javazhiyin.com/wp-content/themes/mnews/images/post-loading.gif" alt="Database middleware Sharding-JDBC source code analysis: JDBC implementation and read/write splitting" title="Database middleware Sharding-JDBC source code analysis: JDBC implementation and read/write splitting">
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">// ShardingPreparedStatement.java
@Override
public int executeUpdate() throws SQLException {
    try {
        Collection&lt;PreparedStatementUnit&gt; preparedStatementUnits = route();
        return new PreparedStatementExecutor(
                getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeUpdate();
    } finally {
        clearBatch();
    }
}</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
`#route()` performs sharding routing and returns a collection of prepared statement execution units (PreparedStatementUnit).
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`public final class PreparedStatementUnit implements BaseStatementUnit {`
`    /**`
`     * SQL execution unit`
`     */`
`    private final SQLExecutionUnit sqlExecutionUnit;`
`    /**`
`     * Prepared statement object`
`     */`
`    private final PreparedStatement statement;`
`}`
`#executeUpdate()` calls the execution engine to execute **multiple** prepared statement objects **in parallel**. Execution ultimately calls the prepared statement object (PreparedStatement). Let's look at an example:
<ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`// PreparedStatementExecutor.java`
`public int executeUpdate() {`
` Context context = MetricsContext.start("ShardingPreparedStatement-executeUpdate");`
` try {`
`        List<Integer> results = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters, new ExecuteCallback<Integer>() {`
<pre class="prettyprint linenums" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">`<code style="box-sizing: border-box;margin-left: -20px;display: flex;overflow: initial;line-height: 12px;word-wrap: normal;border-width: 0px;border-style: initial;border-color: initial;font-size: 10px;font-family: inherit !important;"> @Override`
` @Override`
` public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {`
`                // Call PreparedStatement#executeUpdate()`
` return ((PreparedStatement) baseStatementUnit.getStatement()).executeUpdate();`
` }`
` });`
` return accumulate(results);`
<ul style="list-style-type: square;" class="list-paddingleft-2">
Call `#generatePreparedStatement()` to create the PreparedStatement, then call `#replaySetParameter()` to replay the placeholder parameters onto it.
When **returning generated keys is declared**, that is, when `#isReturnGeneratedKeys()` returns `true`, `connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS)` is called. Why would that method return `true`? Because the earlier example calls `conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)`.
Once **returning generated keys is declared** and the insert has finished, we can call `#getGeneratedKeys()` to obtain the primary key:
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`// ShardingStatement.java`
`public ResultSet getGeneratedKeys() throws SQLException {`
`    Optional<GeneratedKey> generatedKey = getGeneratedKey();`
`    // Distributed primary key`
`    if (generatedKey.isPresent() && returnGeneratedKeys) {`
`        return new GeneratedKeysResultSet(routeResult.getGeneratedKeys().iterator(), generatedKey.get().getColumn(), this);`
`    }`
`    // Database auto-increment`
`    if (1 == getRoutedStatements().size()) {`
`        return getRoutedStatements().iterator().next().getGeneratedKeys();`
`    }`
`    return new GeneratedKeysResultSet();`
`}`
`// ShardingConnection.java`
`public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {`
`    return new ShardingPreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);`
`}`
`// ShardingPreparedStatement.java`
`public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int autoGeneratedKeys) {`
`    this(shardingConnection, sql);`
`    if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {`
`        markReturnGeneratedKeys();`
`    }`
`}`
`protected final void markReturnGeneratedKeys() {`
`    returnGeneratedKeys = true;`
`}`
Call `ShardingConnection#getConnection()` to obtain the **real** database connection (Connection) for this PreparedStatement:
<ul style="list-style-type: circle;" class="list-paddingleft-2">
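Call `#getCachedConnection()` to try to obtain an **already cached** database connection; if the cache has none, the connection is **cached** once it has been obtained.
Get the **real** data source (DataSource) from the DataSourceRule configured in the ShardingRule.
MasterSlaveDataSource wraps the **master/slave** data sources; we cover it in the next subsection.
Call `#replayMethodsInvocation()` to replay the recorded Connection methods.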
<ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`// ShardingConnection.java`
`/**`
` * Get the database connection for the given data source name.`
` *`
` * @param dataSourceName data source name`
` * @param sqlType SQL statement type`
` * @return database connection`
` * @throws SQLException SQL exception`
` */`
`public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {`
`    // Try the connection cache first`
`    Optional<Connection> connection = getCachedConnection(dataSourceName, sqlType);`
`    if (connection.isPresent()) {`
`        return connection.get();`
`    }`
`    Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName));`
`    // Look up the data source by name`
`    DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);`
`    Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName);`
`    String realDataSourceName;`
`    if (dataSource instanceof MasterSlaveDataSource) {`
`        dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);`
`        realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);`
`    } else {`
`        realDataSourceName = dataSourceName;`
`    }`
`    Connection result = dataSource.getConnection();`
`    MetricsContext.stop(metricsContext);`
`    // Add to the connection cache`
`    connectionMap.put(realDataSourceName, result);`
`    // Replay the recorded Connection methods`
`    replayMethodsInvocation(result);`
`    return result;`
`}`
`private Optional<Connection> getCachedConnection(final String dataSourceName, final SQLType sqlType) {`
`    String key = connectionMap.containsKey(dataSourceName) ? dataSourceName : MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);`
`    return Optional.fromNullable(connectionMap.get(key));`
`}`
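The cache-then-replay shape of `#getConnection()` can be sketched without a real database. The example below is a simplification under stated assumptions: it ignores the master/slave name resolution entirely, uses a generic type `C` in place of `Connection`, and takes the "open a connection" and "replay recorded calls" steps as plain functions. All names (`ConnectionCacheSketch`, `opener`, `replayer`) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified sketch: one cached connection per data source name.
class ConnectionCacheSketch<C> {

    private final Map<String, C> connectionMap = new HashMap<>();
    private final Function<String, C> opener;  // stands in for dataSource.getConnection()
    private final Consumer<C> replayer;        // stands in for replayMethodsInvocation(result)

    ConnectionCacheSketch(Function<String, C> opener, Consumer<C> replayer) {
        this.opener = opener;
        this.replayer = replayer;
    }

    C getConnection(String dataSourceName) {
        C cached = connectionMap.get(dataSourceName);
        if (cached != null) {
            return cached; // cache hit: nothing is opened or replayed again
        }
        C result = opener.apply(dataSourceName);
        connectionMap.put(dataSourceName, result);
        replayer.accept(result); // bring the fresh connection up to date with earlier settings
        return result;
    }

    /** Returns how many connections were actually opened for three lookups over two names. */
    static int demoOpenCount() {
        int[] opened = {0};
        ConnectionCacheSketch<StringBuilder> cache = new ConnectionCacheSketch<>(
                name -> {
                    opened[0]++;
                    return new StringBuilder(name);
                },
                conn -> conn.append(":autoCommit=false"));
        cache.getConnection("ds_0");
        cache.getConnection("ds_0"); // served from the cache
        cache.getConnection("ds_1");
        return opened[0];
    }

    public static void main(String[] args) {
        System.out.println(demoOpenCount());
    }
}
```

Replaying only on a cache miss matters: settings such as auto-commit are applied once to each new connection, while later setter calls reach existing connections directly through the adapter's loop.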
<h1 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 28px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">5. Query flow</h1>
Looking only at the JDBC implementation in the `core` package, the query flow `#executeQuery()` is largely the same as `#execute()`; the differences lie in **execution** and in **merging multiple result sets**.
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">@Override
public ResultSet executeQuery() throws SQLException {
    ResultSet result;
    try {
        // Route
        Collection&lt;PreparedStatementUnit&gt; preparedStatementUnits = route();
        // Execute
        List&lt;ResultSet&gt; resultSets = new PreparedStatementExecutor(
                getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
        // Merge the results
        result = new ShardingResultSet(resultSets, new MergeEngine(
                getShardingConnection().getShardingContext().getDatabaseType(), resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge());
    } finally {
        clearBatch();
    }
    // Set the current result set
    setCurrentResultSet(result);
    return result;
}</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
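**SQL execution**: interested readers can see "Sharding-JDBC Source Code Analysis: SQL Execution".
**Result merging**: interested readers can see "Sharding-JDBC Source Code Analysis: Result Merging".
After the result merge `#merge()` completes, the sharding result set (ShardingResultSet) is created.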
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
`public final class ShardingResultSet extends AbstractResultSetAdapter {`
` /**`
`     * Merged result set`
` */`
` private final ResultSetMerger mergeResultSet;`
<pre class="prettyprint linenums" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">`<code style="box-sizing: border-box;margin-left: -20px;display: flex;overflow: initial;line-height: 12px;word-wrap: normal;border-width: 0px;border-style: initial;border-color: initial;font-size: 10px;font-family: inherit !important;">@Override`
`/**`
` * Select a slave data source according to the load-balancing strategy.`
` *`
` * @param name name of the read/write-splitting data source`
` * @param slaveDataSources list of slave data sources`
` * @return the selected slave data source`
` */`
`DataSource getDataSource(String name, List<DataSource> slaveDataSources);`
`private static final ConcurrentHashMap<String, AtomicInteger> COUNT_MAP = new ConcurrentHashMap<>();`
`public DataSource getDataSource(final String name, final List<DataSource> slaveDataSources) {`
`    AtomicInteger count = COUNT_MAP.containsKey(name) ? COUNT_MAP.get(name) : new AtomicInteger(0);`
`    COUNT_MAP.putIfAbsent(name, count);`
`    count.compareAndSet(slaveDataSources.size(), 0);`
`    return slaveDataSources.get(count.getAndIncrement() % slaveDataSources.size());`
`}`
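For readers who want to see the round-robin selection in isolation, here is a minimal sketch that uses strings in place of `DataSource` objects. It is a variation rather than the library's code: it uses `computeIfAbsent` to create the counter and `Math.floorMod` to keep the index non-negative, instead of the `putIfAbsent` / `compareAndSet` reset shown above. The names `RoundRobinSketch`, `select`, and `demo_ds` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variation on the round-robin strategy; strings stand in for slave DataSources.
class RoundRobinSketch {

    // One counter per logical read/write-splitting data source name.
    private static final ConcurrentHashMap<String, AtomicInteger> COUNT_MAP = new ConcurrentHashMap<>();

    static <T> T select(String name, List<T> slaves) {
        AtomicInteger count = COUNT_MAP.computeIfAbsent(name, key -> new AtomicInteger(0));
        // floorMod keeps the index in range even after the int counter wraps to negative values.
        return slaves.get(Math.floorMod(count.getAndIncrement(), slaves.size()));
    }

    static String demo() {
        List<String> slaves = Arrays.asList("slave_0", "slave_1");
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            order.append(select("demo_ds", slaves)).append(' ');
        }
        return order.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping a separate counter per name means each logical data source cycles through its own slaves independently of the others.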
666. Easter egg