Summary: Original source http://www.iocoder.cn/Sharding-JDBC/jdbc-implement-and-read-write-splitting/ 「芋道源码」. Reposting is welcome; please keep this summary. Thanks!
This article is mainly based on the Sharding-JDBC 1.5.0 release.
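1. Overview
This article covers how Sharding-JDBC implements JDBC and read-write splitting. Why discuss the two together? With a client that connects directly to the database, read-write splitting is mainly achieved by obtaining different connections for the read databases and the write database, so it sits naturally next to the JDBC Connection implementation.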
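To make the idea concrete before reading the real code, here is a minimal sketch of choosing a data source by statement type. Everything in it is an assumption made for illustration: the class `ReadWriteRouter`, the data source names, the `SELECT` prefix check, and the round-robin choice among read databases are hypothetical and are not taken from Sharding-JDBC.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical router: decides which data source name a statement should use.
class ReadWriteRouter {

    private final String master;
    private final List<String> slaves;
    private final AtomicInteger counter = new AtomicInteger();

    ReadWriteRouter(final String master, final List<String> slaves) {
        this.master = master;
        this.slaves = slaves;
    }

    String route(final String sql) {
        // Naive classification (assumption): only statements starting with SELECT count as reads.
        boolean isRead = sql.trim().regionMatches(true, 0, "SELECT", 0, 6);
        if (!isRead || slaves.isEmpty()) {
            return master;
        }
        // Round-robin over the read databases; floorMod keeps the index non-negative on overflow.
        return slaves.get(Math.floorMod(counter.getAndIncrement(), slaves.size()));
    }
}
```

A caller would then fetch a connection from the data source with the returned name, which is why connection handling and read-write splitting end up in the same place.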
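OK, let's first look at how Sharding-JDBC officially defines and positions itself:
Sharding-JDBC is positioned as a lightweight Java framework. It connects to the database directly from the client and is delivered as a jar, with no middle layer, no extra deployment, and no other dependencies. DBAs do not need to change how they operate the databases. It can be understood as an enhanced JDBC driver, and the cost of migrating legacy code is close to zero.
As this shows, Sharding-JDBC implements the JDBC specification so that the layers above get transparent access to sharded databases and tables. 😈 Black magic? In fact, the database connection pools we use every day rely on the same approach to provide pooling without the upper layers noticing. The same approach can even be used to access Lucene, MongoDB, and so on.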
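That was a digression. Now let's look at the structure of the Sharding-JDBC `jdbc` package:
From the `core` package, we can see that the classes fall into four kinds of objects we know very well:
- Datasource
- Connection
- Statement
- ResultSet
The implementation hierarchy is: JDBC interface <=(inherits)== `unsupported` abstract class <=(inherits)== `adapter` abstract class <=(inherits)== `core` class.
Order of this article's content
Query flow, classes analyzed:
Read-write splitting, classes analyzed: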
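2. unsupported package
The abstract classes in the `unsupported` package declare the operations that a data object does not support: every method simply does `throw new SQLFeatureNotSupportedException()`.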
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
public abstract class AbstractUnsupportedGeneratedKeysResultSet extends AbstractUnsupportedOperationResultSet {

    @Override
    public boolean getBoolean(final int columnIndex) throws SQLException {
        throw new SQLFeatureNotSupportedException("getBoolean");
    }

    // ... other similar methods omitted
}

public abstract class AbstractUnsupportedOperationConnection extends WrapperAdapter implements Connection {

    @Override
    public final CallableStatement prepareCall(final String sql) throws SQLException {
        throw new SQLFeatureNotSupportedException("prepareCall");
    }

    // ... other similar methods omitted
}
</pre>
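The layering can be tried out in isolation. The sketch below assumes a hypothetical two-method interface, `TinyResultSet`, standing in for a large JDBC interface such as `ResultSet`; none of these names exist in Sharding-JDBC. The base class rejects everything, and the concrete class overrides only what it can support.

```java
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

// Hypothetical stand-in for a large JDBC interface.
interface TinyResultSet {
    boolean getBoolean(int columnIndex) throws SQLException;

    long getLong(int columnIndex) throws SQLException;
}

// "unsupported" layer: every method is rejected by default.
abstract class AbstractUnsupportedTinyResultSet implements TinyResultSet {

    @Override
    public boolean getBoolean(final int columnIndex) throws SQLException {
        throw new SQLFeatureNotSupportedException("getBoolean");
    }

    @Override
    public long getLong(final int columnIndex) throws SQLException {
        throw new SQLFeatureNotSupportedException("getLong");
    }
}

// Concrete layer: overrides only the operation it actually supports.
final class SingleKeyResultSet extends AbstractUnsupportedTinyResultSet {

    private final long key;

    SingleKeyResultSet(final long key) {
        this.key = key;
    }

    @Override
    public long getLong(final int columnIndex) {
        return key;
    }
}
```

The benefit of this design is that the concrete classes stay short: anything they do not override fails loudly with the method name instead of silently returning a default.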
<h1 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 28px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3. adapter package</h1>
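The **abstract** classes in the `adapter` package implement the methods that are **unrelated** to sharding.
**To make sections 4 and 5 easier to follow, this section includes a relatively large amount of code.**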
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.1 WrapperAdapter</h2>
WrapperAdapter is the adapter class for the JDBC Wrapper interface.
**It implements the following two methods of the Wrapper interface**:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
@Override
public final &lt;T&gt; T unwrap(final Class&lt;T&gt; iface) throws SQLException {
    if (isWrapperFor(iface)) {
        return (T) this;
    }
    throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
}

@Override
public final boolean isWrapperFor(final Class&lt;?&gt; iface) throws SQLException {
    return iface.isInstance(this);
}
</pre>
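To see the two `Wrapper` methods in action, here is a small self-contained sketch. `DemoWrapper` and the marker interface `Auditable` are hypothetical names introduced only for this example; the method bodies follow the same instance-check approach as the excerpt above.

```java
import java.sql.SQLException;
import java.sql.Wrapper;

// Hypothetical interface, used only to have something to unwrap to.
interface Auditable {
    String auditName();
}

class DemoWrapper implements Wrapper, Auditable {

    @Override
    public String auditName() {
        return "demo";
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T unwrap(final Class<T> iface) throws SQLException {
        // Only hand out "this" when it really is an instance of the requested type.
        if (isWrapperFor(iface)) {
            return (T) this;
        }
        throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
    }

    @Override
    public boolean isWrapperFor(final Class<?> iface) {
        return iface.isInstance(this);
    }
}
```

Calling `unwrap(Auditable.class)` returns the object itself, while `unwrap(Runnable.class)` fails with a `SQLException`, because `DemoWrapper` does not implement `Runnable`.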
**It provides subclasses with `#recordMethodInvocation()` to record method calls and `#replayMethodsInvocation()` to replay the recorded calls**:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
/**
 * Recorded method invocations
 */
private final Collection&lt;JdbcMethodInvocation&gt; jdbcMethodInvocations = new ArrayList&lt;&gt;();

/**
 * Record a method invocation.
 *
 * @param targetClass target class
 * @param methodName method name
 * @param argumentTypes argument types
 * @param arguments arguments
 */
public final void recordMethodInvocation(final Class&lt;?&gt; targetClass, final String methodName, final Class&lt;?&gt;[] argumentTypes, final Object[] arguments) {
    try {
        jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
    } catch (final NoSuchMethodException ex) {
        throw new ShardingJdbcException(ex);
    }
}

/**
 * Replay the recorded method invocations.
 *
 * @param target target object
 */
public final void replayMethodsInvocation(final Object target) {
    for (JdbcMethodInvocation each : jdbcMethodInvocations) {
        each.invoke(target);
    }
}
</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
What are these two methods for? Take `#setAutoCommit()` of AbstractConnectionAdapter, discussed below: when there is no database connection yet, the call is recorded first; once a database connection has been obtained, it is replayed:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">
// AbstractConnectionAdapter.java
@Override
public final void setAutoCommit(final boolean autoCommit) throws SQLException {
    this.autoCommit = autoCommit;
    if (getConnections().isEmpty()) { // no database connection yet: record the method call
        recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
        return;
    }
    for (Connection each : getConnections()) {
        each.setAutoCommit(autoCommit);
    }
}
</pre>
</li>
</ul>
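The record-then-replay idea can be reproduced with plain reflection. In the sketch below, `MethodRecorder` is a hypothetical class written for this example, and `AtomicBoolean` merely stands in for a real connection whose auto-commit flag has to be set after the connection finally exists.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class MethodRecorder {

    // One recorded call: the reflective method plus the arguments to pass later.
    private static final class Invocation {
        private final Method method;
        private final Object[] arguments;

        Invocation(final Method method, final Object[] arguments) {
            this.method = method;
            this.arguments = arguments;
        }
    }

    private final List<Invocation> invocations = new ArrayList<>();

    void record(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
        try {
            invocations.add(new Invocation(targetClass.getMethod(methodName, argumentTypes), arguments));
        } catch (final NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }

    void replay(final Object target) {
        for (Invocation each : invocations) {
            try {
                each.method.invoke(target, each.arguments);
            } catch (final IllegalAccessException | InvocationTargetException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    public static void main(final String[] args) {
        MethodRecorder recorder = new MethodRecorder();
        // No target exists yet, so the call is only recorded.
        recorder.record(AtomicBoolean.class, "set", new Class<?>[] {boolean.class}, new Object[] {false});
        // Later the target appears and the recorded call is applied to it.
        AtomicBoolean lateTarget = new AtomicBoolean(true);
        recorder.replay(lateTarget);
        System.out.println(lateTarget.get());
    }
}
```

The point of the design is that callers may configure a logical connection before any physical connection has been opened; the settings are applied lazily to whichever connections are created afterwards.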
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
JdbcMethodInvocation is a utility class that invokes JDBC-related methods through reflection:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">
public class JdbcMethodInvocation {

    /**
     * Method
     */
    @Getter
    private final Method method;
    /**
     * Method arguments
     */
    @Getter
    private final Object[] arguments;

    /**
     * Invoke the method.
     *
     * @param target target object
     */
    public void invoke(final Object target) {
        try {
            method.invoke(target, arguments); // reflective call
        } catch (final IllegalAccessException | InvocationTargetException ex) {
            throw new ShardingJdbcException("Invoke jdbc method exception", ex);
        }
    }
}
</pre>
</li>
</ul>
**It provides subclasses with `#throwSQLExceptionIfNecessary()` to throw a chain of exceptions**:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
protected void throwSQLExceptionIfNecessary(final Collection&lt;SQLException&gt; exceptions) throws SQLException {
    if (exceptions.isEmpty()) { // nothing collected: do not throw
        return;
    }
    SQLException ex = new SQLException();
    for (SQLException each : exceptions) {
        ex.setNextException(each); // exception chain
    }
    throw ex;
}
</pre>
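On the caller's side, the chained exceptions can be read back with `SQLException#getNextException()`. The sketch below copies the chaining logic into a hypothetical `ExceptionChainDemo` class (the class name, the helper `messagesOf`, and the messages are assumptions for illustration) and walks the chain, skipping the empty head exception.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class ExceptionChainDemo {

    // Same chaining approach as the excerpt: an empty head with every failure appended to it.
    static void throwSQLExceptionIfNecessary(final Collection<SQLException> exceptions) throws SQLException {
        if (exceptions.isEmpty()) {
            return;
        }
        SQLException ex = new SQLException();
        for (SQLException each : exceptions) {
            ex.setNextException(each); // appended to the end of the chain
        }
        throw ex;
    }

    // Collects the messages of the chained exceptions, in the order they were added.
    static List<String> messagesOf(final SQLException head) {
        List<String> result = new ArrayList<>();
        for (SQLException each = head.getNextException(); null != each; each = each.getNextException()) {
            result.add(each.getMessage());
        }
        return result;
    }
}
```

Note that the head exception carries no message of its own, so code that only logs `ex.getMessage()` would lose the real causes; walking the chain is what recovers them.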
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.2 AbstractDataSourceAdapter</h2>
AbstractDataSourceAdapter is the data source adapter class.
Refer to its source code directly.
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.3 AbstractConnectionAdapter</h2>
AbstractConnectionAdapter is the database connection adapter class.
Let's look at how the **transaction**-related methods, which people care about most, are implemented.
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
/**
 * Whether auto-commit is enabled
 */
private boolean autoCommit = true;

/**
 * Get the connections.
 *
 * @return connections
 */
protected abstract Collection&lt;Connection&gt; getConnections();

@Override
public final boolean getAutoCommit() throws SQLException {
    return autoCommit;
}

@Override
public final void setAutoCommit(final boolean autoCommit) throws SQLException {
    this.autoCommit = autoCommit;
    if (getConnections().isEmpty()) { // no database connection yet: record the method call
        recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
        return;
    }
    for (Connection each : getConnections()) {
        each.setAutoCommit(autoCommit);
    }
}
</pre>
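<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
When `#setAutoCommit()` is called, it actually sets the `autoCommit` property of the Connections it holds
</li>
<li>
`#getConnections()` depends on sharding, so it is only declared abstract and left for subclasses to implement
</li>
</ul>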
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
@Override
public final void commit() throws SQLException {
    for (Connection each : getConnections()) {
        each.commit();
    }
}

@Override
public final void rollback() throws SQLException {
    Collection&lt;SQLException&gt; exceptions = new LinkedList&lt;&gt;();
    for (Connection each : getConnections()) {
        try {
            each.rollback();
        } catch (final SQLException ex) {
            exceptions.add(ex);
        }
    }
    throwSQLExceptionIfNecessary(exceptions);
}
</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
When `#commit()` or `#rollback()` is called, it actually calls the same method on the Connections it holds
</li>
<li>
When an exception occurs, `#commit()` and `#rollback()` behave differently: `#commit()` stops at the first SQLException, while `#rollback()` continues with the remaining connections and throws the collected exceptions at the end. The author does not yet know the reason and will update this after checking.
<ul style="list-style-type: circle;" class="list-paddingleft-2">
<li>
`#commit()` should be changed to handle exceptions the same way as `#rollback()`. The code is as follows:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">
@Override
public final void commit() throws SQLException {
    Collection&lt;SQLException&gt; exceptions = new LinkedList&lt;&gt;();
    for (Connection each : getConnections()) {
        try {
            each.commit();
        } catch (final SQLException ex) {
            exceptions.add(ex);
        }
    }
    throwSQLExceptionIfNecessary(exceptions);
}
</pre>
</li>
</ul>
</li>
</ul>
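The practical difference between stopping at the first failure and collecting failures is easy to demonstrate. The sketch below assumes a hypothetical one-method interface `Committable` in place of `java.sql.Connection`, and a helper `commitAll` that returns the collected exceptions instead of throwing, purely to keep the example short.

```java
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;

// Hypothetical stand-in for java.sql.Connection, reduced to the one method needed here.
interface Committable {
    void commit() throws SQLException;
}

class CommitAllDemo {

    // Attempts to commit every connection, even when an earlier one fails,
    // and returns the failures that were collected along the way.
    static Collection<SQLException> commitAll(final Collection<? extends Committable> connections) {
        Collection<SQLException> exceptions = new LinkedList<>();
        for (Committable each : connections) {
            try {
                each.commit();
            } catch (final SQLException ex) {
                exceptions.add(ex);
            }
        }
        return exceptions;
    }
}
```

With three connections where the second one fails, all three commits are still attempted and exactly one exception is reported. This does not make the commit atomic across databases; it only guarantees that one failing connection does not prevent the others from being tried.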
The code for the transaction isolation level and the read-only flag is as follows:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
/**
 * Read-only flag
 */
private boolean readOnly = true;
/**
 * Transaction isolation level
 */
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;

@Override
public final void setReadOnly(final boolean readOnly) throws SQLException {
    this.readOnly = readOnly;
    if (getConnections().isEmpty()) {
        recordMethodInvocation(Connection.class, "setReadOnly", new Class[] {boolean.class}, new Object[] {readOnly});
        return;
    }
    for (Connection each : getConnections()) {
        each.setReadOnly(readOnly);
    }
}

@Override
public final void setTransactionIsolation(final int level) throws SQLException {
    transactionIsolation = level;
    if (getConnections().isEmpty()) {
        recordMethodInvocation(Connection.class, "setTransactionIsolation", new Class[] {int.class}, new Object[] {level});
        return;
    }
    for (Connection each : getConnections()) {
        each.setTransactionIsolation(level);
    }
}
</pre>
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.4 AbstractStatementAdapter</h2>
AbstractStatementAdapter is the adapter class for static statement objects.
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
@Override
public final int getUpdateCount() throws SQLException {
    long result = 0;
    boolean hasResult = false;
    for (Statement each : getRoutedStatements()) {
        if (each.getUpdateCount() &gt; -1) {
            hasResult = true;
        }
        result += each.getUpdateCount();
    }
    if (result &gt; Integer.MAX_VALUE) {
        result = Integer.MAX_VALUE;
    }
    return hasResult ? Long.valueOf(result).intValue() : -1;
}

/**
 * Get the collection of routed static statement objects.
 *
 * @return collection of routed static statement objects
 */
protected abstract Collection&lt;? extends Statement&gt; getRoutedStatements();
</pre>
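<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
`#getUpdateCount()` calls the Statements it holds and sums up their update counts
</li>
<li>
`#getRoutedStatements()` depends on sharding, so it is only declared abstract and left for subclasses to implement
</li>
</ul>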
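The aggregation rule in `#getUpdateCount()` can be checked without any database. The sketch below lifts the arithmetic into a hypothetical static helper, `mergeUpdateCounts`, that takes the per-statement counts directly; it mirrors the excerpt as written, including the fact that a `-1` from one statement is still added to the sum when another statement does report a count.

```java
class UpdateCountDemo {

    // Sums per-statement update counts; -1 means "no update count" for that statement.
    static int mergeUpdateCounts(final int... counts) {
        long result = 0;
        boolean hasResult = false;
        for (int each : counts) {
            if (each > -1) {
                hasResult = true;
            }
            result += each;
        }
        // Clamp so the long sum still fits into the int required by the JDBC signature.
        if (result > Integer.MAX_VALUE) {
            result = Integer.MAX_VALUE;
        }
        return hasResult ? (int) result : -1;
    }
}
```

For example, counts of 2 and 3 merge to 5, two statements without a count merge to -1, and a sum beyond `Integer.MAX_VALUE` is clamped rather than overflowing.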
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.5 AbstractPreparedStatementAdapter</h2>
AbstractPreparedStatementAdapter is the adapter class for prepared statement objects.
**`#recordSetParameter()` implements the setting of placeholder parameters**:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
/**
 * Recorded set-parameter method invocations
 */
private final List&lt;SetParameterMethodInvocation&gt; setParameterMethodInvocations = new LinkedList&lt;&gt;();
/**
 * Parameters
 */
@Getter
private final List&lt;Object&gt; parameters = new ArrayList&lt;&gt;();

@Override
public final void setInt(final int parameterIndex, final int x) throws SQLException {
    setParameter(parameterIndex, x);
    recordSetParameter("setInt", new Class[]{int.class, int.class}, parameterIndex, x);
}

/**
 * Record a placeholder parameter.
 *
 * @param parameterIndex position of the placeholder parameter
 * @param value parameter value
 */
private void setParameter(final int parameterIndex, final Object value) {
    if (parameters.size() == parameterIndex - 1) {
        parameters.add(value);
        return;
    }
    for (int i = parameters.size(); i &lt;= parameterIndex - 1; i++) { // fill earlier, not yet set positions with null
        parameters.add(null);
    }
    parameters.set(parameterIndex - 1, value);
}

/**
 * Record a set-parameter method invocation.
 *
 * @param methodName method name, for example setInt or setLong
 * @param argumentTypes argument types
 * @param arguments arguments
 */
private void recordSetParameter(final String methodName, final Class[] argumentTypes, final Object... arguments) {
    try {
        setParameterMethodInvocations.add(new SetParameterMethodInvocation(PreparedStatement.class.getMethod(methodName, argumentTypes), arguments, arguments[1]));
    } catch (final NoSuchMethodException ex) {
        throw new ShardingJdbcException(ex);
    }
}

/**
 * Replay the recorded set-parameter method invocations.
 *
 * @param preparedStatement prepared statement object
 */
protected void replaySetParameter(final PreparedStatement preparedStatement) {
    addParameters();
    for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
        updateParameterValues(each, parameters.get(each.getIndex() - 1)); // the same position may be set several times with different values, so refresh it
        each.invoke(preparedStatement);
    }
}

/**
 * When a distributed primary key is used, the generated key is appended to parameters. parameters then holds more entries than setParameterMethodInvocations, so a SetParameterMethodInvocation has to be created for the distributed primary key.
 */
private void addParameters() {
    for (int i = setParameterMethodInvocations.size(); i &lt; parameters.size(); i++) {
        recordSetParameter("setObject", new Class[]{int.class, Object.class}, i + 1, parameters.get(i));
    }
}

private void updateParameterValues(final SetParameterMethodInvocation setParameterMethodInvocation, final Object value) {
    if (!Objects.equals(setParameterMethodInvocation.getValue(), value)) {
        setParameterMethodInvocation.changeValueArgument(value); // update the placeholder parameter
    }
}
</pre>
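The padding behaviour of `#setParameter()` is the least obvious part, so here it is as a standalone sketch. `ParameterListDemo` is a hypothetical class that keeps only the parameter list; the method body follows the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

class ParameterListDemo {

    private final List<Object> parameters = new ArrayList<>();

    // parameterIndex is 1-based, as in JDBC.
    void setParameter(final int parameterIndex, final Object value) {
        // Common case: parameters are set in order, so the value is simply appended.
        if (parameters.size() == parameterIndex - 1) {
            parameters.add(value);
            return;
        }
        // Out-of-order case: pad with null up to and including the target slot, then overwrite it.
        for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
            parameters.add(null);
        }
        parameters.set(parameterIndex - 1, value);
    }

    List<Object> getParameters() {
        return parameters;
    }
}
```

Setting index 3 first yields `[null, null, "c"]`; setting index 1 afterwards overwrites the first slot without growing the list, and setting index 4 then takes the append path.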
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
The logic is similar to `#recordMethodInvocation()` and `#replayMethodsInvocation()` of `WrapperAdapter`; please read the code comments **carefully**
</li>
<li>
SetParameterMethodInvocation extends JdbcMethodInvocation. It is a utility class that invokes parameter-setting methods through reflection:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">
public final class SetParameterMethodInvocation extends JdbcMethodInvocation {

    /**
     * Position
     */
    @Getter
    private final int index;
    /**
     * Parameter value
     */
    @Getter
    private final Object value;

    /**
     * Set the parameter value.
     *
     * @param value parameter value
     */
    public void changeValueArgument(final Object value) {
        getArguments()[1] = value;
    }
}
</pre>
</li>
</ul>
<h2 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 24px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">3.6 AbstractResultSetAdapter</h2>
AbstractResultSetAdapter is the adapter class for the proxied result set.
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
public abstract class AbstractResultSetAdapter extends AbstractUnsupportedOperationResultSet {

    /**
     * Collection of result sets
     */
    @Getter
    private final List&lt;ResultSet&gt; resultSets;

    @Override
    // TODO should return sharding statement in future
    public final Statement getStatement() throws SQLException {
        return getResultSets().get(0).getStatement();
    }

    @Override
    public final ResultSetMetaData getMetaData() throws SQLException {
        return getResultSets().get(0).getMetaData();
    }

    @Override
    public int findColumn(final String columnLabel) throws SQLException {
        return getResultSets().get(0).findColumn(columnLabel);
    }

    // ... other methods omitted
}
</pre>
<h1 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 28px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">4. Insert flow</h1>
Example code for an insert that uses a **distributed primary key**:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
// This is only an example; in production, take care of exception handling and closing resources
String sql = "INSERT INTO t_order(uid, nickname, pid) VALUES (1, '2', ?)";
DataSource dataSource = new ShardingDataSource(shardingRule);
Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); // Statement.RETURN_GENERATED_KEYS is required to get the primary key back
ps.setLong(1, 100);
ps.executeUpdate();
ResultSet rs = ps.getGeneratedKeys();
if (rs.next()) {
    System.out.println("id:" + rs.getLong(1));
}
</pre>
**When `#executeUpdate()` is called, the internal process is as follows**:
<img style="box-sizing: border-box;border-width: 2px;border-style: solid;border-color: rgb(238, 238, 238);border-radius: 6px;" class=" aligncenter" src="https://www.javazhiyin.com/wp-content/uploads/2018/08/java1-1534475531.jpeg" alt="Database middleware Sharding-JDBC source code analysis: JDBC implementation and read-write splitting" title="Database middleware Sharding-JDBC source code analysis: JDBC implementation and read-write splitting">
Isn't it **completely transparent** to the upper layer?! Let's see how it is implemented internally.
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
// ShardingPreparedStatement.java
@Override
public int executeUpdate() throws SQLException {
    try {
        Collection&lt;PreparedStatementUnit&gt; preparedStatementUnits = route();
        return new PreparedStatementExecutor(
                getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeUpdate();
    } finally {
        clearBatch();
    }
}
</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
`#route()` performs the sharding routing and returns a collection of prepared statement execution units (PreparedStatementUnit).
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">
public final class PreparedStatementUnit implements BaseStatementUnit {
    /**
     * SQL execution unit
     */
    private final SQLExecutionUnit sqlExecutionUnit;
    /**
     * Prepared statement object
     */
    private final PreparedStatement statement;
}
</pre>
</li>
</ul>
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
`#executeUpdate()` calls the executor engine to execute **multiple** prepared statement objects **in parallel**. During execution, the prepared statement object (PreparedStatement) is what finally gets called. Let's look at an example:
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">
// PreparedStatementExecutor.java
public int executeUpdate() {
    Context context = MetricsContext.start("ShardingPreparedStatement-executeUpdate");
    try {
        List&lt;Integer&gt; results = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters, new ExecuteCallback&lt;Integer&gt;() {

            @Override
            public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                // Calls PreparedStatement#executeUpdate()
                return ((PreparedStatement) baseStatementUnit.getStatement()).executeUpdate();
            }
        });
        return accumulate(results);
    // ... the rest of the method is omitted
</pre>
</li>
</ul>
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
// ShardingPreparedStatement.java
private Collection&lt;PreparedStatementUnit&gt; route() throws SQLException {
    Collection&lt;PreparedStatementUnit&gt; result = new LinkedList&lt;&gt;();
    // Route
    setRouteResult(routingEngine.route(getParameters()));
    // Iterate over the SQL execution units
    for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) {
        SQLType sqlType = getRouteResult().getSqlStatement().getType();
        Collection&lt;PreparedStatement&gt; preparedStatements;
        // Create the actual PreparedStatement
        if (SQLType.DDL == sqlType) {
            preparedStatements = generatePreparedStatementForDDL(each);
        } else {
            preparedStatements = Collections.singletonList(generatePreparedStatement(each));
        }
        getRoutedStatements().addAll(preparedStatements);
        // Replay the placeholder parameter settings onto the PreparedStatement
        for (PreparedStatement preparedStatement : preparedStatements) {
            replaySetParameter(preparedStatement);
            result.add(new PreparedStatementUnit(each, preparedStatement));
        }
    }
    return result;
}

/**
 * Create a PreparedStatement.
 *
 * @param sqlExecutionUnit SQL execution unit
 * @return PreparedStatement
 * @throws SQLException when a JDBC operation fails
 */
private PreparedStatement generatePreparedStatement(final SQLExecutionUnit sqlExecutionUnit) throws SQLException {
    Optional&lt;GeneratedKey&gt; generatedKey = getGeneratedKey();
    // Get the connection
    Connection connection = getShardingConnection().getConnection(sqlExecutionUnit.getDataSource(), getRouteResult().getSqlStatement().getType());
    // Returning the generated key was declared
    if (isReturnGeneratedKeys() || isReturnGeneratedKeys() && generatedKey.isPresent()) {
        return connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS);
    }
    return connection.prepareStatement(sqlExecutionUnit.getSql(), getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}
</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
调用 `#generatePreparedStatement()` 创建 PreparedStatement,后调用 `#replaySetParameter()` 回放设置占位符参数到 PreparedStatement
</li>
<li>
当 **声明返回主键** 时,即 `#isReturnGeneratedKeys()` 返回 `true` 时,调用 `connection.prepareStatement(sqlExecutionUnit.getSql(),RETURN_GENERATED_KEYS)`。为什么该方法会返回 `true`?上文例子 `conn.prepareStatement(sql,Statement.RETURN_GENERATED_KEYS)`
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; "></pre>
**声明返回主键**后,插入执行完成,我们调用 `#getGeneratedKeys()` 可以获得主键 :
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; "></pre>
</li>
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
<li>
`// ShardingStatement.java`
</li>
<li>
`@Override`
</li>
<li>
`public ResultSet getGeneratedKeys() throws SQLException {`
</li>
<li>
` Optional&lt;GeneratedKey&gt; generatedKey = getGeneratedKey();`
</li>
<li>
` // Distributed primary key`
</li>
<li>
` if (generatedKey.isPresent() && returnGeneratedKeys) {`
</li>
<li>
` return new GeneratedKeysResultSet(routeResult.getGeneratedKeys().iterator(), generatedKey.get().getColumn(), this);`
</li>
<li>
` }`
</li>
<li>
` // Database auto-increment key`
</li>
<li>
` if (1 == getRoutedStatements().size()) {`
</li>
<li>
` return getRoutedStatements().iterator().next().getGeneratedKeys();`
</li>
<li>
` }`
</li>
<li>
` return new GeneratedKeysResultSet();`
</li>
<li>
`}`
</li>
<li>
`// ShardingConnection.java`
</li>
<li>
`@Override`
</li>
<li>
`public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {`
</li>
<li>
` return new ShardingPreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);`
</li>
<li>
`}`
</li>
<li>
``
</li>
<li>
`// ShardingPreparedStatement.java`
</li>
<li>
`public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int autoGeneratedKeys) {`
</li>
<li>
` this(shardingConnection, sql);`
</li>
<li>
` if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {`
</li>
<li>
` markReturnGeneratedKeys();`
</li>
<li>
` }`
</li>
<li>
`}`
</li>
<li>
`protected final void markReturnGeneratedKeys() {`
</li>
<li>
` returnGeneratedKeys = true;`
</li>
<li>
`}`
</li>
</ol>
<li>
Call `ShardingConnection#getConnection()` to obtain the **real** database connection ( Connection ) for this PreparedStatement:
</li>
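<ul style="list-style-type: circle;" class="list-paddingleft-2">
<li>
Call `#getCachedConnection()` to try to obtain an **already cached** database connection; if the cache has none, the newly obtained connection is **cached**
</li>
<li>
Obtain the **real** data source ( DataSource ) from the DataSourceRule configured in ShardingRule
</li>
<li>
MasterSlaveDataSource wraps a **master-slave** data source; it is covered in a later section
</li>
<li>
Call `#replayMethodsInvocation()` to replay the recorded Connection method calls
</li>
</ul>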
<ol class="linenums list-paddingleft-2" style="list-style-type: none;">
<li>
`// ShardingConnection.java`
</li>
<li>
`/**`
</li>
<li>
` * Gets the database connection for the given data source name.`
</li>
<li>
` * `
</li>
<li>
` * @param dataSourceName data source name`
</li>
<li>
` * @param sqlType SQL statement type`
</li>
<li>
` * @return database connection`
</li>
<li>
` * @throws SQLException SQL exception`
</li>
<li>
` */`
</li>
<li>
`public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {`
</li>
<li>
` // Look up the connection in the connection cache`
</li>
<li>
` Optional&lt;Connection&gt; connection = getCachedConnection(dataSourceName, sqlType);`
</li>
<li>
` if (connection.isPresent()) {`
</li>
<li>
` return connection.get();`
</li>
<li>
` }`
</li>
<li>
` Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName));`
</li>
<li>
` //`
</li>
<li>
` DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);`
</li>
<li>
` Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName);`
</li>
<li>
` String realDataSourceName;`
</li>
<li>
` if (dataSource instanceof MasterSlaveDataSource) {`
</li>
<li>
` dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);`
</li>
<li>
` realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);`
</li>
<li>
` } else {`
</li>
<li>
` realDataSourceName = dataSourceName;`
</li>
<li>
` }`
</li>
<li>
` Connection result = dataSource.getConnection();`
</li>
<li>
` MetricsContext.stop(metricsContext);`
</li>
<li>
` // Add to the connection cache`
</li>
<li>
` connectionMap.put(realDataSourceName, result);`
</li>
<li>
` // Replay the recorded Connection methods`
</li>
<li>
` replayMethodsInvocation(result);`
</li>
<li>
` return result;`
</li>
<li>
`}`
</li>
<li>
``
</li>
<li>
`private Optional&lt;Connection&gt; getCachedConnection(final String dataSourceName, final SQLType sqlType) {`
</li>
<li>
` String key = connectionMap.containsKey(dataSourceName) ? dataSourceName : MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);`
</li>
<li>
` return Optional.fromNullable(connectionMap.get(key));`
</li>
<li>
`}`
</li>
</ol>
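That covers most of the insert implementation. Because the analysis drilled down through the code layer by layer, walking back up from the bottom once more should make the overall picture clearer.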
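The cache-then-replay behaviour of `ShardingConnection#getConnection()` described above can be tried out in isolation. The following is only a minimal sketch under stated assumptions, not Sharding-JDBC code: `ConnectionCacheSketch`, `FakeConnection`, `recordedInvocations` and `openedCount` are hypothetical names, and a plain object stands in for a real JDBC Connection so that the example runs without a database.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ConnectionCacheSketch {

    /** Hypothetical stand-in for a real JDBC Connection. */
    static final class FakeConnection {
        final String dataSourceName;
        boolean autoCommit = true;

        FakeConnection(final String dataSourceName) {
            this.dataSourceName = dataSourceName;
        }
    }

    // Cache of opened connections, keyed by the real data source name.
    private final Map<String, FakeConnection> connectionMap = new HashMap<>();
    // Settings recorded on the logical connection, replayed on every new physical one.
    private final List<Consumer<FakeConnection>> recordedInvocations = new ArrayList<>();
    // Counts how many physical connections were actually opened.
    int openedCount = 0;

    void setAutoCommit(final boolean autoCommit) {
        // Record the call for connections opened later ...
        recordedInvocations.add(connection -> connection.autoCommit = autoCommit);
        // ... and apply it to the connections that are already open.
        for (FakeConnection each : connectionMap.values()) {
            each.autoCommit = autoCommit;
        }
    }

    FakeConnection getConnection(final String dataSourceName) {
        FakeConnection cached = connectionMap.get(dataSourceName);
        if (cached != null) {
            return cached;
        }
        FakeConnection result = new FakeConnection(dataSourceName);
        openedCount++;
        connectionMap.put(dataSourceName, result);
        // Replay the recorded settings on the freshly opened connection.
        for (Consumer<FakeConnection> each : recordedInvocations) {
            each.accept(result);
        }
        return result;
    }

    public static void main(final String[] args) {
        ConnectionCacheSketch sketch = new ConnectionCacheSketch();
        sketch.setAutoCommit(false);
        FakeConnection first = sketch.getConnection("ds_0");
        FakeConnection second = sketch.getConnection("ds_0");
        System.out.println(first == second);      // same cached instance
        System.out.println(first.autoCommit);     // replayed setting
        System.out.println(sketch.openedCount);   // physical connections opened
    }
}
```

The point of recording the calls is that the logical connection does not know in advance which physical data sources a statement will be routed to, so settings such as auto-commit have to be applied lazily when each real connection is first opened.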
<h1 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 28px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">5. Query Flow</h1>
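Looking only at the JDBC implementation in the `core` package, the query flow `#executeQuery()` is largely the same as `#execute()`; the differences lie in **execution** and in **merging multiple result sets**.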
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
@Override
public ResultSet executeQuery() throws SQLException {
    ResultSet result;
    try {
        // Route
        Collection&lt;PreparedStatementUnit&gt; preparedStatementUnits = route();
        // Execute
        List&lt;ResultSet&gt; resultSets = new PreparedStatementExecutor(
                getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
        // Merge the results
        result = new ShardingResultSet(resultSets, new MergeEngine(
                getShardingConnection().getShardingContext().getDatabaseType(), resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge());
    } finally {
        clearBatch();
    }
    // Set the current result set
    setCurrentResultSet(result);
    return result;
}
</pre>
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
**SQL execution**: if you are interested, see "Sharding-JDBC Source Code Analysis: SQL Execution"
</li>
<li>
**Result merging**: if you are interested, see "Sharding-JDBC Source Code Analysis: Result Merging"
</li>
<li>
After result merging via `#merge()` completes, a sharding result set ( ShardingResultSet ) is created:
</li>
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
<li>
`public final class ShardingResultSet extends AbstractResultSetAdapter {`
</li>
<li>
` /**`
</li>
<li>
` * Merged result set`
</li>
<li>
` */`
</li>
<li>
` private final ResultSetMerger mergeResultSet;`
<pre class="prettyprint linenums" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">`<code style="box-sizing: border-box;margin-left: -20px;display: flex;overflow: initial;line-height: 12px;word-wrap: normal;border-width: 0px;border-style: initial;border-color: initial;font-size: 10px;font-family: inherit !important;">@Override`
`public int getInt(final int columnIndex) throws SQLException {`
`    Object result = mergeResultSet.getValue(columnIndex, int.class);`
`    wasNull = null == result;`
`    return (int) ResultSetUtil.convertValue(result, int.class);`
`}`
``
`@Override`
`public int getInt(final String columnLabel) throws SQLException {`
`    Object result = mergeResultSet.getValue(columnLabel, int.class);`
`    wasNull = null == result;`
`    return (int) ResultSetUtil.convertValue(result, int.class);`
`}`
``
`// .... other similar getXXXX() methods omitted`
`}`
</code></pre>
</li>
</ol>
<h1 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 28px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">6. Read-Write Splitting</h1>
Recommended reading first: "Official Documentation: Read-Write Splitting"
When you need read-write splitting, replace the data source configured for ShardingRule, switching it from ShardingDataSource to MasterSlaveDataSource. Let's look at what MasterSlaveDataSource does and how it is implemented.
It supports read-write splitting with one master and multiple slaves, and can be combined with database and table sharding.
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
// MasterSlaveDataSourceFactory.java
public final class MasterSlaveDataSourceFactory {
    /**
     * Creates a read-write splitting data source.
     *
     * @param name name of the read-write splitting data source
     * @param masterDataSource master data source
     * @param slaveDataSource slave data source
     * @param otherSlaveDataSources additional slave data sources
     * @return read-write splitting data source
     */
    public static DataSource createDataSource(final String name, final DataSource masterDataSource, final DataSource slaveDataSource, final DataSource... otherSlaveDataSources) {
        return new MasterSlaveDataSource(name, masterDataSource, Lists.asList(slaveDataSource, otherSlaveDataSources));
    }
}

// MasterSlaveDataSource.java
public final class MasterSlaveDataSource extends AbstractDataSourceAdapter {
    /**
     * Data source name
     */
    private final String name;
    /**
     * Master data source
     */
    @Getter
    private final DataSource masterDataSource;
    /**
     * Slave data sources
     */
    @Getter
    private final List&lt;DataSource&gt; slaveDataSources;
}
</pre>
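**Within the same thread and the same database connection, once a write has occurred, all subsequent reads go to the master, which guarantees data consistency.**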
<pre class="prettyprint linenums prettyprinted" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;color: rgb(80, 97, 109);font-size: 10px;line-height: 12px; ; ; ; ; ; ">
// ShardingConnection.java
public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {
    // .... part of the code omitted
    String realDataSourceName;
    if (dataSource instanceof MasterSlaveDataSource) { // read-write splitting
        dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
        realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
    } else {
        realDataSourceName = dataSourceName;
    }
    Connection result = dataSource.getConnection();
    // .... part of the code omitted
}

// MasterSlaveDataSource.java
/**
 * Flag marking whether the current thread has performed a DML operation
 */
private static final ThreadLocal&lt;Boolean&gt; DML_FLAG = new ThreadLocal&lt;Boolean&gt;() {

    @Override
    protected Boolean initialValue() {
        return false;
    }
};
/**
 * Slave load-balancing strategy
 */
private final SlaveLoadBalanceStrategy slaveLoadBalanceStrategy = new RoundRobinSlaveLoadBalanceStrategy();

/**
 * Gets the master or a slave data source.
 *
 * @param sqlType SQL type
 * @return master or slave data source
 */
public DataSource getDataSource(final SQLType sqlType) {
    if (isMasterRoute(sqlType)) {
        DML_FLAG.set(true);
        return masterDataSource;
    }
    return slaveLoadBalanceStrategy.getDataSource(name, slaveDataSources);
}

private static boolean isMasterRoute(final SQLType sqlType) {
    return SQLType.DQL != sqlType || DML_FLAG.get() || HintManagerHolder.isMasterRouteOnly();
}
</pre>
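The routing decision made by `#isMasterRoute()` above can be exercised without any database. What follows is a hedged, minimal sketch rather than the library's code: `MasterRouteSketch`, `SqlType`, `MASTER_ROUTE_ONLY`, `reset()` and `routeOnNewThread()` are hypothetical names, strings stand in for the data sources, and a second thread-local replaces `HintManagerHolder`.

```java
public class MasterRouteSketch {

    /** Hypothetical stand-in for Sharding-JDBC's SQLType. */
    enum SqlType { DQL, DML, DDL }

    // Per-thread flag: has this thread already been routed to the master?
    private static final ThreadLocal<Boolean> DML_FLAG = ThreadLocal.withInitial(() -> false);
    // Hypothetical replacement for HintManagerHolder.isMasterRouteOnly().
    private static final ThreadLocal<Boolean> MASTER_ROUTE_ONLY = ThreadLocal.withInitial(() -> false);

    static void setMasterRouteOnly() {
        MASTER_ROUTE_ONLY.set(true);
    }

    static void reset() {
        DML_FLAG.remove();
        MASTER_ROUTE_ONLY.remove();
    }

    /** Returns "master" or "slave" using the same three conditions as the article's code. */
    static String route(final SqlType sqlType) {
        if (SqlType.DQL != sqlType || DML_FLAG.get() || MASTER_ROUTE_ONLY.get()) {
            DML_FLAG.set(true);
            return "master";
        }
        return "slave";
    }

    /** Routes on a fresh thread, which starts with its own, unset flags. */
    static String routeOnNewThread(final SqlType sqlType) {
        String[] holder = new String[1];
        Thread worker = new Thread(() -> holder[0] = route(sqlType));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holder[0];
    }

    public static void main(final String[] args) {
        reset();
        System.out.println(route(SqlType.DQL));            // slave: nothing written yet
        System.out.println(route(SqlType.DML));            // master: not a query
        System.out.println(route(SqlType.DQL));            // master: this thread has written
        System.out.println(routeOnNewThread(SqlType.DQL)); // slave: other thread, fresh flag
    }
}
```

Note that in this sketch, as in the code shown in the article, any master route sets the flag, so a read forced to the master by a hint also pins later reads on that thread to the master until the flag is cleared.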
<ul style="list-style-type: square;" class="list-paddingleft-2">
<li>
When the data source obtained by ShardingConnection is a MasterSlaveDataSource, it calls `MasterSlaveDataSource#getDataSource()` to obtain the **real** data source
</li>
<li>
`#isMasterRoute()` decides whether to go to the **master**. The master is used in the following three cases:
</li>
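<ul style="list-style-type: circle;" class="list-paddingleft-2">
<li>
Non-query statements, i.e. the SQL type is not DQL
</li>
<li>
The **current** thread has already accessed the master: tracked through the thread-local variable `DML_FLAG` (it is declared `static`, so the flag is per thread rather than per data source)
</li>
<li>
Forced master routing: the application calls `HintManager.getInstance().setMasterRouteOnly()`
</li>
</ul>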
<li>
When a slave is accessed, one slave is chosen through the load-balancing strategy ( SlaveLoadBalanceStrategy )
</li>
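<ul style="list-style-type: circle;" class="list-paddingleft-2">
<li>
MasterSlaveDataSource uses RoundRobinSlaveLoadBalanceStrategy by default; configuring a different strategy is not supported yet
</li>
<li>
RoundRobinSlaveLoadBalanceStrategy is a round-robin load-balancing strategy: **requests are spread evenly across the slave nodes, and removing a failed data source is not supported yet**
</li>
</ul>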
</ul><ol class="linenums list-paddingleft-2" style="list-style-type: none;">
<li>
`// SlaveLoadBalanceStrategy.java`
</li>
<li>
`public interface SlaveLoadBalanceStrategy {`
<pre class="prettyprint linenums" style=" box-sizing: border-box;margin-top: 0px;margin-bottom: 0px;padding: 8px 0px 6px;background-color: rgb(241, 239, 238);border-radius: 0px;overflow-y: auto;font-size: 10px;line-height: 12px; ; ; ; ; ; ">`<code style="box-sizing: border-box;margin-left: -20px;display: flex;overflow: initial;line-height: 12px;word-wrap: normal;border-width: 0px;border-style: initial;border-color: initial;font-size: 10px;font-family: inherit !important;">/**`
` * Gets a slave data source according to the load-balancing strategy.`
` * `
` * @param name name of the read-write splitting data source`
` * @param slaveDataSources list of slave data sources`
` * @return the selected slave data source`
` */`
`DataSource getDataSource(String name, List&lt;DataSource&gt; slaveDataSources);`
`}`
``
`// RoundRobinSlaveLoadBalanceStrategy.java`
`private static final ConcurrentHashMap&lt;String, AtomicInteger&gt; COUNT_MAP = new ConcurrentHashMap&lt;&gt;();`
``
`@Override`
`public DataSource getDataSource(final String name, final List&lt;DataSource&gt; slaveDataSources) {`
`    AtomicInteger count = COUNT_MAP.containsKey(name) ? COUNT_MAP.get(name) : new AtomicInteger(0);`
`    COUNT_MAP.putIfAbsent(name, count);`
`    count.compareAndSet(slaveDataSources.size(), 0);`
`    return slaveDataSources.get(count.getAndIncrement() % slaveDataSources.size());`
`}`
</code></pre>
</li>
</ol>
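To see the round-robin selection in action without real data sources, here is a small self-contained sketch. It is an illustration under stated assumptions, not the library's implementation: `RoundRobinSketch` and `pick()` are hypothetical names, strings stand in for `DataSource` objects, and the counter handling uses `computeIfAbsent` plus `Math.floorMod` instead of the `compareAndSet` reset shown above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {

    // One counter per logical (master-slave) data source name.
    private static final ConcurrentHashMap<String, AtomicInteger> COUNT_MAP = new ConcurrentHashMap<>();

    /** Picks the next slave for the given logical data source name, in rotation. */
    static <T> T pick(final String name, final List<T> slaves) {
        AtomicInteger count = COUNT_MAP.computeIfAbsent(name, key -> new AtomicInteger(0));
        // floorMod keeps the index non-negative even if the counter overflows.
        return slaves.get(Math.floorMod(count.getAndIncrement(), slaves.size()));
    }

    public static void main(final String[] args) {
        List<String> slaves = Arrays.asList("slave_0", "slave_1", "slave_2");
        for (int i = 0; i < 4; i++) {
            // Cycles slave_0, slave_1, slave_2 and then wraps back to slave_0.
            System.out.println(pick("demo_ds", slaves));
        }
    }
}
```

Keeping the counters in a map keyed by name means each logical data source rotates through its own slaves independently of the others.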
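<h1 style=" box-sizing: border-box;margin-top: 1.5rem;margin-bottom: 1rem;color: rgb(21, 153, 87);line-height: 1.35;font-size: 28px;white-space: normal; ; ; ; ; ; ; ; ; ; ; ; ; ">666. Easter Egg</h1>
No Easter egg
No Easter
No
N
Next up: "Distributed Transactions (Part 1): Best-Effort Delivery". Hop in, the ride is about to leave.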