sharding中怎么執(zhí)行jdbc,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括肅寧網(wǎng)站建設(shè)、肅寧網(wǎng)站制作、肅寧網(wǎng)頁制作以及肅寧網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,肅寧網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到肅寧省份的部分城市,未來相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
內(nèi)存限制模式:使用此模式的前提是,ShardingSphere對一次操作所耗費(fèi)的數(shù)據(jù)庫連接數(shù)量不做限制。如果實(shí)際執(zhí)行的SQL需要對某數(shù)據(jù)庫實(shí)例中的200張表做操作,則對每張表創(chuàng)建一個(gè)新的數(shù)據(jù)庫連接,并通過多線程的方式并發(fā)處理,以達(dá)成執(zhí)行效率最大化。并且在SQL滿足條件情況下,優(yōu)先選擇流式歸并,以防止出現(xiàn)內(nèi)存溢出或避免頻繁垃圾回收情況
連接限制模式:使用此模式的前提是,ShardingSphere嚴(yán)格控制對一次操作所耗費(fèi)的數(shù)據(jù)庫連接數(shù)量。如果實(shí)際執(zhí)行的SQL需要對某數(shù)據(jù)庫實(shí)例中的200張表做操作,那么只會(huì)創(chuàng)建唯一的數(shù)據(jù)庫連接,并對其200張表串行處理。如果一次操作中的分片散落在不同的數(shù)據(jù)庫,仍然采用多線程處理對不同庫的操作,但每個(gè)庫的每次操作仍然只創(chuàng)建一個(gè)唯一的數(shù)據(jù)庫連接。這樣即可以防止對一次請求對數(shù)據(jù)庫連接占用過多所帶來的問題。該模式始終選擇內(nèi)存歸并
case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2一個(gè)簡單查詢語句,來分析ss大致如何來執(zhí)行sql,根據(jù)分片改寫后的sql,應(yīng)該是demo_ds_slave_0:SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2 來執(zhí)行
1.初始化PreparedStatementExecutor#init,封裝Statement執(zhí)行單元
public final class PreparedStatementExecutor extends AbstractStatementExecutor { @Getter private final boolean returnGeneratedKeys; public PreparedStatementExecutor( final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) { super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection); this.returnGeneratedKeys = returnGeneratedKeys; } /** * Initialize executor. * * @param routeResult route result * @throws SQLException SQL exception */ public void init(final SQLRouteResult routeResult) throws SQLException { setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement()); //添加路由單元,即數(shù)據(jù)源對應(yīng)的sql單元 getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); //緩存statement、參數(shù) cacheStatements(); } private Collection> obtainExecuteGroups(final Collection routeUnits) throws SQLException { //執(zhí)行封裝Statement執(zhí)行單元 return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override public List getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize); } @Override public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode); } }); } @SuppressWarnings("MagicConstant") private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException { return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()); } ... ... }
2.執(zhí)行封裝Statement執(zhí)行單元getSqlExecutePrepareTemplate().getExecuteUnitGroups
@RequiredArgsConstructor public final class SQLExecutePrepareTemplate { private final int maxConnectionsSizePerQuery; /** * Get execute unit groups. * * @param routeUnits route units * @param callback SQL execute prepare callback * @return statement execute unit groups * @throws SQLException SQL exception */ public Collection> getExecuteUnitGroups(final Collection routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { return getSynchronizedExecuteUnitGroups(routeUnits, callback); } private Collection > getSynchronizedExecuteUnitGroups( final Collection routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { //數(shù)據(jù)源對應(yīng)sql單元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?] Map > sqlUnitGroups = getSQLUnitGroups(routeUnits); Collection > result = new LinkedList<>(); for (Entry > entry : sqlUnitGroups.entrySet()) { //添加分片執(zhí)行組 result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback)); } return result; } private Map > getSQLUnitGroups(final Collection routeUnits) { Map > result = new LinkedHashMap<>(routeUnits.size(), 1); for (RouteUnit each : routeUnits) { if (!result.containsKey(each.getDataSourceName())) { result.put(each.getDataSourceName(), new LinkedList ()); } result.get(each.getDataSourceName()).add(each.getSqlUnit()); } return result; } private List > getSQLExecuteGroups( final String dataSourceName, final List sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List > result = new LinkedList<>(); //在maxConnectionSizePerQuery允許的范圍內(nèi),當(dāng)一個(gè)連接需要執(zhí)行的請求數(shù)量大于1時(shí),意味著當(dāng)前的數(shù)據(jù)庫連接無法持有相應(yīng)的數(shù)據(jù)結(jié)果集,則必須采用內(nèi)存歸并; //反之,當(dāng)一個(gè)連接需要執(zhí)行的請求數(shù)量等于1時(shí),意味著當(dāng)前的數(shù)據(jù)庫連接可以持有相應(yīng)的數(shù)據(jù)結(jié)果集,則可以采用流式歸并 //TODO 場景:在不分庫只分表的情況下,會(huì)存在一個(gè)數(shù)據(jù)源對應(yīng)多個(gè)sql單元的情況 //計(jì)算所需要的分區(qū)大小 int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1); //按照分區(qū)大小進(jìn)行分區(qū) //事例: //sqlUnits = [1, 2, 3, 4, 5] //desiredPartitionSize = 2 //則結(jié)果為:[[1, 2], [3,4], [5]] List > sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize); //maxConnectionsSizePerQuery該參數(shù)表示一次查詢時(shí)每個(gè)數(shù)據(jù)庫所允許使用的最大連接數(shù) //根據(jù)maxConnectionsSizePerQuery來判斷使用連接模式 //CONNECTION_STRICTLY 連接限制模式 //MEMORY_STRICTLY 內(nèi)存限制模式 ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; //獲取分區(qū)大小的連接 List
connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); int count = 0; //遍歷分區(qū),將分區(qū)好的sql單元放到指定連接執(zhí)行 for (List each : sqlUnitPartitions) { result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; } private ShardingExecuteGroup getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, final String dataSourceName, final List sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException { List result = new LinkedList<>(); //遍歷sql單元 for (SQLUnit each : sqlUnitGroup) { //回調(diào),創(chuàng)建statement執(zhí)行單元 result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode)); } //封裝成分片執(zhí)行組 return new ShardingExecuteGroup<>(result); } }
1.執(zhí)行查詢sql
public final class PreparedStatementExecutor extends AbstractStatementExecutor { ... ... /** * Execute query. * * @return result set list * @throws SQLException SQL exception */ public ListexecuteQuery() throws SQLException { //獲取當(dāng)前是否異常值 final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); //創(chuàng)建回調(diào)實(shí)例 //執(zhí)行SQLExecuteCallback的execute方法 SQLExecuteCallback executeCallback = new SQLExecuteCallback (getDatabaseType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(statement, connectionMode); } }; return executeCallback(executeCallback); } ... ... protected final List executeCallback(final SQLExecuteCallback executeCallback) throws SQLException { List result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback); //執(zhí)行完后刷新分片元數(shù)據(jù),比如創(chuàng)建表、修改表etc. refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement); return result; } ... ... }
2.通過線程池分組執(zhí)行,并回調(diào)callback
@RequiredArgsConstructor public abstract class SQLExecuteCallbackimplements ShardingGroupExecuteCallback { //數(shù)據(jù)庫類型 private final DatabaseType databaseType; //是否異常 private final boolean isExceptionThrown; @Override public final Collection execute(final Collection statementExecuteUnits, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException { Collection result = new LinkedList<>(); //遍歷statement執(zhí)行單元 for (StatementExecuteUnit each : statementExecuteUnits) { //執(zhí)行添加返回結(jié)果T result.add(execute0(each, isTrunkThread, shardingExecuteDataMap)); } return result; } private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException { //設(shè)置當(dāng)前線程是否異常 ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); //根據(jù)url獲取數(shù)據(jù)源元數(shù)據(jù) DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL()); //sql執(zhí)行鉤子 SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook(); try { sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap); //執(zhí)行sql T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode()); sqlExecutionHook.finishSuccess(); return result; } catch (final SQLException ex) { sqlExecutionHook.finishFailure(ex); ExecutorExceptionHandler.handleException(ex); return null; } } protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException; }
3.執(zhí)行executeSQL,調(diào)用第三步的callback中的executeSQL,封裝ResultSet
public final class PreparedStatementExecutor extends AbstractStatementExecutor { ... ... private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException { PreparedStatement preparedStatement = (PreparedStatement) statement; ResultSet resultSet = preparedStatement.executeQuery(); ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule(); //緩存resultSet getResultSets().add(resultSet); //判斷ConnectionMode //如果是MEMORY_STRICTLY,使用流式StreamQueryResult;否則使用內(nèi)存MemoryQueryResult return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule) : new MemoryQueryResult(resultSet, shardingRule); } ... ... }
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。