真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

sharding中怎么執(zhí)行jdbc

sharding中怎么執(zhí)行jdbc,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括肅寧網(wǎng)站建設(shè)、肅寧網(wǎng)站制作、肅寧網(wǎng)頁制作以及肅寧網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,肅寧網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到肅寧省份的部分城市,未來相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

  • 內(nèi)存限制模式:使用此模式的前提是,ShardingSphere對一次操作所耗費(fèi)的數(shù)據(jù)庫連接數(shù)量不做限制。如果實(shí)際執(zhí)行的SQL需要對某數(shù)據(jù)庫實(shí)例中的200張表做操作,則對每張表創(chuàng)建一個(gè)新的數(shù)據(jù)庫連接,并通過多線程的方式并發(fā)處理,以達(dá)成執(zhí)行效率最大化。并且在SQL滿足條件情況下,優(yōu)先選擇流式歸并,以防止出現(xiàn)內(nèi)存溢出或避免頻繁垃圾回收情況

  • 連接限制模式:使用此模式的前提是,ShardingSphere嚴(yán)格控制對一次操作所耗費(fèi)的數(shù)據(jù)庫連接數(shù)量。如果實(shí)際執(zhí)行的SQL需要對某數(shù)據(jù)庫實(shí)例中的200張表做操作,那么只會(huì)創(chuàng)建唯一的數(shù)據(jù)庫連接,并對其200張表串行處理。如果一次操作中的分片散落在不同的數(shù)據(jù)庫,仍然采用多線程處理對不同庫的操作,但每個(gè)庫的每次操作仍然只創(chuàng)建一個(gè)唯一的數(shù)據(jù)庫連接。這樣即可以防止對一次請求對數(shù)據(jù)庫連接占用過多所帶來的問題。該模式始終選擇內(nèi)存歸并

case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2一個(gè)簡單查詢語句,來分析ss大致如何來執(zhí)行sql,根據(jù)分片改寫后的sql,應(yīng)該是demo_ds_slave_0:SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2 來執(zhí)行

準(zhǔn)備階段

1.初始化PreparedStatementExecutor#init,封裝Statement執(zhí)行單元

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    @Getter
    private final boolean returnGeneratedKeys;

    public PreparedStatementExecutor(
            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }

    /**
     * Initialize executor.
     *
     * @param routeResult route result
     * @throws SQLException SQL exception
     */
    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement());
        //添加路由單元,即數(shù)據(jù)源對應(yīng)的sql單元
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        //緩存statement、參數(shù)
        cacheStatements();
    }

    private Collection> obtainExecuteGroups(final Collection routeUnits) throws SQLException {
        //執(zhí)行封裝Statement執(zhí)行單元
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {

            @Override
            public List getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }

   ... ...   
}

2.執(zhí)行封裝Statement執(zhí)行單元getSqlExecutePrepareTemplate().getExecuteUnitGroups

@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {

    private final int maxConnectionsSizePerQuery;

    /**
     * Get execute unit groups.
     *
     * @param routeUnits route units
     * @param callback SQL execute prepare callback
     * @return statement execute unit groups
     * @throws SQLException SQL exception
     */
    public Collection> getExecuteUnitGroups(final Collection routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        return getSynchronizedExecuteUnitGroups(routeUnits, callback);
    }

    private Collection> getSynchronizedExecuteUnitGroups(
            final Collection routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        //數(shù)據(jù)源對應(yīng)sql單元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?]
        Map> sqlUnitGroups = getSQLUnitGroups(routeUnits);
        Collection> result = new LinkedList<>();

        for (Entry> entry : sqlUnitGroups.entrySet()) {
            //添加分片執(zhí)行組
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
        }
        return result;
    }

    private Map> getSQLUnitGroups(final Collection routeUnits) {
        Map> result = new LinkedHashMap<>(routeUnits.size(), 1);
        for (RouteUnit each : routeUnits) {
            if (!result.containsKey(each.getDataSourceName())) {
                result.put(each.getDataSourceName(), new LinkedList());
            }
            result.get(each.getDataSourceName()).add(each.getSqlUnit());
        }
        return result;
    }

    private List> getSQLExecuteGroups(
            final String dataSourceName, final List sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        List> result = new LinkedList<>();
        //在maxConnectionSizePerQuery允許的范圍內(nèi),當(dāng)一個(gè)連接需要執(zhí)行的請求數(shù)量大于1時(shí),意味著當(dāng)前的數(shù)據(jù)庫連接無法持有相應(yīng)的數(shù)據(jù)結(jié)果集,則必須采用內(nèi)存歸并;
        //反之,當(dāng)一個(gè)連接需要執(zhí)行的請求數(shù)量等于1時(shí),意味著當(dāng)前的數(shù)據(jù)庫連接可以持有相應(yīng)的數(shù)據(jù)結(jié)果集,則可以采用流式歸并

        //TODO 場景:在不分庫只分表的情況下,會(huì)存在一個(gè)數(shù)據(jù)源對應(yīng)多個(gè)sql單元的情況

        //計(jì)算所需要的分區(qū)大小
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        //按照分區(qū)大小進(jìn)行分區(qū)
        //事例:
        //sqlUnits = [1, 2, 3, 4, 5]
        //desiredPartitionSize = 2
        //則結(jié)果為:[[1, 2], [3,4], [5]]
        List> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        //maxConnectionsSizePerQuery該參數(shù)表示一次查詢時(shí)每個(gè)數(shù)據(jù)庫所允許使用的最大連接數(shù)
        //根據(jù)maxConnectionsSizePerQuery來判斷使用連接模式
        //CONNECTION_STRICTLY 連接限制模式
        //MEMORY_STRICTLY 內(nèi)存限制模式
        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        //獲取分區(qū)大小的連接
        List connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
        int count = 0;
        //遍歷分區(qū),將分區(qū)好的sql單元放到指定連接執(zhí)行
        for (List each : sqlUnitPartitions) {
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
        }
        return result;
    }

    private ShardingExecuteGroup getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, 
                                                                          final String dataSourceName, final List sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
        List result = new LinkedList<>();
        //遍歷sql單元
        for (SQLUnit each : sqlUnitGroup) {
            //回調(diào),創(chuàng)建statement執(zhí)行單元
            result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode));
        }
        //封裝成分片執(zhí)行組
        return new ShardingExecuteGroup<>(result);
    }
}
執(zhí)行階段

1.執(zhí)行查詢sql

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    ... ...

    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List executeQuery() throws SQLException {
        //獲取當(dāng)前是否異常值
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        //創(chuàng)建回調(diào)實(shí)例
        //執(zhí)行SQLExecuteCallback的execute方法
        SQLExecuteCallback executeCallback = new SQLExecuteCallback(getDatabaseType(), isExceptionThrown) {

            @Override
            protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);
    }

    ... ...

   protected final  List executeCallback(final SQLExecuteCallback executeCallback) throws SQLException {
        List result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
        //執(zhí)行完后刷新分片元數(shù)據(jù),比如創(chuàng)建表、修改表etc.
        refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement);
        return result;
    }

    ... ...
}

2.通過線程池分組執(zhí)行,并回調(diào)callback

@RequiredArgsConstructor
public abstract class SQLExecuteCallback implements ShardingGroupExecuteCallback {
    //數(shù)據(jù)庫類型
    private final DatabaseType databaseType;
    //是否異常
    private final boolean isExceptionThrown;

    @Override
    public final Collection execute(final Collection statementExecuteUnits, final boolean isTrunkThread,
                                       final Map shardingExecuteDataMap) throws SQLException {
        Collection result = new LinkedList<>();
        //遍歷statement執(zhí)行單元
        for (StatementExecuteUnit each : statementExecuteUnits) {
            //執(zhí)行添加返回結(jié)果T
            result.add(execute0(each, isTrunkThread, shardingExecuteDataMap));
        }
        return result;
    }

    private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException {
        //設(shè)置當(dāng)前線程是否異常
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        //根據(jù)url獲取數(shù)據(jù)源元數(shù)據(jù)
        DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
        //sql執(zhí)行鉤子
        SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
            //執(zhí)行sql
            T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            sqlExecutionHook.finishSuccess();
            return result;
        } catch (final SQLException ex) {
            sqlExecutionHook.finishFailure(ex);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
    }

    protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException;
}

3.執(zhí)行executeSQL,調(diào)用第三步的callback中的executeSQL,封裝ResultSet

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    ... ...

    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule();
        //緩存resultSet
        getResultSets().add(resultSet);
        //判斷ConnectionMode
        //如果是MEMORY_STRICTLY,使用流式StreamQueryResult;否則使用內(nèi)存MemoryQueryResult
        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule) 
                : new MemoryQueryResult(resultSet, shardingRule);
    }

    ... ...
}

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。


本文題目:sharding中怎么執(zhí)行jdbc
文章來源:http://weahome.cn/article/pcjcie.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部