真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

怎么使用sharding-jdbc讀寫(xiě)分離

這篇文章主要介紹“怎么使用sharding-jdbc讀寫(xiě)分離”,在日常操作中,相信很多人在怎么使用sharding-jdbc讀寫(xiě)分離問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”怎么使用sharding-jdbc讀寫(xiě)分離”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

創(chuàng)新互聯(lián)公司專(zhuān)業(yè)為企業(yè)提供麟游網(wǎng)站建設(shè)、麟游做網(wǎng)站、麟游網(wǎng)站設(shè)計(jì)、麟游網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)與制作、麟游企業(yè)網(wǎng)站模板建站服務(wù),十余年麟游做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。

核心概念

  • 主庫(kù):添加、更新以及刪除數(shù)據(jù)操作

  • 從庫(kù):查詢(xún)數(shù)據(jù)操作所使用的數(shù)據(jù)庫(kù),可支持多從庫(kù)

  • 一主多從讀寫(xiě)分離,多主多從需用使用sharding

源碼分析

1.啟動(dòng)入口:

public class JavaConfigurationExample {
    
//    private static ShardingType shardingType = ShardingType.SHARDING_DATABASES;
//    private static ShardingType shardingType = ShardingType.SHARDING_TABLES;
//    private static ShardingType shardingType = ShardingType.SHARDING_DATABASES_AND_TABLES;
    private static ShardingType shardingType = ShardingType.MASTER_SLAVE;
//    private static ShardingType shardingType = ShardingType.SHARDING_MASTER_SLAVE;
//        private static ShardingType shardingType = ShardingType.SHARDING_VERTICAL;
    
    public static void main(final String[] args) throws SQLException {
        DataSource dataSource = DataSourceFactory.newInstance(shardingType);
        CommonService commonService = getCommonService(dataSource);
        commonService.initEnvironment();
        commonService.processSuccess();
        commonService.cleanEnvironment();
    }
    
    private static CommonService getCommonService(final DataSource dataSource) {
        return new CommonServiceImpl(new OrderRepositoryImpl(dataSource), new OrderItemRepositoryImpl(dataSource));
    }
}

2.以sharding-jdbc為例,配置主從讀寫(xiě)分離代碼如下:

@Override
public DataSource getDataSource() throws SQLException {
    //主從配置
    MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(/*主從命名*/"demo_ds_master_slave", /*主庫(kù)*/"demo_ds_master", /*從庫(kù)*/Arrays.asList("demo_ds_slave_0", "demo_ds_slave_1"));
    //打印sql
    Properties props = new Properties();
    props.put("sql.show", true);
    //創(chuàng)建MasterSlaveDataSource數(shù)據(jù)源
    return MasterSlaveDataSourceFactory.createDataSource(createDataSourceMap(), masterSlaveRuleConfig, props);
}

private Map createDataSourceMap() {
    Map result = new HashMap<>();
    //主庫(kù)
    result.put("demo_ds_master", DataSourceUtil.createDataSource("demo_ds_master"));
    //兩從庫(kù)
    result.put("demo_ds_slave_0", DataSourceUtil.createDataSource("demo_ds_slave_0"));
    result.put("demo_ds_slave_1", DataSourceUtil.createDataSource("demo_ds_slave_1"));
    return result;
}

創(chuàng)建sharding主從數(shù)據(jù)源MasterSlaveDataSource

public MasterSlaveDataSource(final Map dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {
        super(dataSourceMap);
        //緩存MySQL元數(shù)據(jù)
        cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);
        //主從規(guī)則配置
        this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
        //主從sql解析
        parseEngine = new MasterSlaveSQLParseEntry(getDatabaseType());
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
    }

3.執(zhí)行insert插入方法

@Override
    public Long insert(final Order order) throws SQLException {
        String sql = "INSERT INTO t_order (user_id, status) VALUES (?, ?)";
        //獲取MasterSlaveDataSource數(shù)據(jù)源連接,同時(shí)創(chuàng)建MasterSlavePreparedStatement
        //這里有兩個(gè)Statement分別含義
        //1.MasterSlaveStatement:執(zhí)行sql時(shí)候才路由
        //2.MasterSlavePreparedStatement:創(chuàng)建Statement時(shí)就路由

        //Statement.RETURN_GENERATED_KEYS 自動(dòng)生成主鍵并返回生成的主鍵
        try (Connection connection = dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
            preparedStatement.setInt(1, order.getUserId());
            preparedStatement.setString(2, order.getStatus());
            //MasterSlavePreparedStatement執(zhí)行sql
            preparedStatement.executeUpdate();
            try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
                if (resultSet.next()) {
                    order.setOrderId(resultSet.getLong(1));
                }
            }
        }
        return order.getOrderId();
    }

 獲取數(shù)據(jù)庫(kù)連接MasterSlaveConnection->AbstractConnectionAdapter#getConnection

    /**
     * Get database connection.
     *
     * @param dataSourceName data source name
     * @return database connection
     * @throws SQLException SQL exception
     */
    //MEMORY_STRICTLY:Proxy會(huì)保持一個(gè)數(shù)據(jù)庫(kù)中所有被路由到的表的連接,這種方式的好處是利用流式ResultSet來(lái)節(jié)省內(nèi)存
    //
    //CONNECTION_STRICTLY:代理在取出ResultSet中的所有數(shù)據(jù)后會(huì)釋放連接,同時(shí),內(nèi)存的消耗將會(huì)增加
    //
    public final Connection getConnection(final String dataSourceName) throws SQLException {
        return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0);
    }
    
    /**
     * Get database connections.
     *
     * @param connectionMode connection mode
     * @param dataSourceName data source name
     * @param connectionSize size of connection list to be get
     * @return database connections
     * @throws SQLException SQL exception
     */
    public final List getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
        //獲取數(shù)據(jù)源
        DataSource dataSource = getDataSourceMap().get(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
        Collection connections;
        //并發(fā)從cache中獲取連接
        synchronized (cachedConnections) {
            connections = cachedConnections.get(dataSourceName);
        }
        List result;
        //如果cache中連接數(shù)大于指定連接數(shù)時(shí),返回指定連接數(shù)量
        if (connections.size() >= connectionSize) {
            result = new ArrayList<>(connections).subList(0, connectionSize);
        } else if (!connections.isEmpty()) {
            result = new ArrayList<>(connectionSize);
            result.addAll(connections);
            //創(chuàng)建缺少的指定連接數(shù)
            List newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
            result.addAll(newConnections);
            synchronized (cachedConnections) {
                cachedConnections.putAll(dataSourceName, newConnections);
            }
        } else {
            result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
            synchronized (cachedConnections) {
                cachedConnections.putAll(dataSourceName, result);
            }
        }
        return result;
    }
    
    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    private List createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
        //為1時(shí)不存在并發(fā)獲取連接情況,直接返回單個(gè)連接
        if (1 == connectionSize) {
            return Collections.singletonList(createConnection(dataSourceName, dataSource));
        }
        //TODO 不處理并發(fā)
        if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
            return createConnections(dataSourceName, dataSource, connectionSize);
        }
        //并發(fā)
        synchronized (dataSource) {
            return createConnections(dataSourceName, dataSource, connectionSize);
        }
    }
    
    private List createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
        List result = new ArrayList<>(connectionSize);
        for (int i = 0; i < connectionSize; i++) {
            try {
                result.add(createConnection(dataSourceName, dataSource));
            } catch (final SQLException ex) {
                for (Connection each : result) {
                    each.close();
                }
                throw new SQLException(String.format("Could't get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
            }
        }
        return result;
    }
    
    private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
        //判斷是否是sharding事物
        Connection result = isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
        replayMethodsInvocation(result);
        return result;
    }

預(yù)準(zhǔn)備路由并緩存Statement

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
        this.connection = connection;
        //創(chuàng)建router對(duì)象
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getParseEngine(), 
                connection.getMasterSlaveDataSource().getShardingProperties().getValue(ShardingPropertiesConstant.SQL_SHOW));
        //緩存路由后的Statement,useCache緩存解析后的sql Statement
        for (String each : masterSlaveRouter.route(sql, true)) {
            //獲取數(shù)據(jù)庫(kù)連接
            PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, autoGeneratedKeys);
            routedStatements.add(preparedStatement);
        }
    }

執(zhí)行MasterSlaveRouter#route方法獲取路由庫(kù)

public Collection route(final String sql, final boolean useCache) {
        //解析sql,這里不分析sql如何使用antlr4解析
        Collection result = route(parseEngine.parse(sql, useCache));
        //是否打印sql
        if (showSQL) {
            SQLLogger.logSQL(sql, result);
        }
        return result;
    }
    
    private Collection route(final SQLStatement sqlStatement) {
        //判斷是否master
        if (isMasterRoute(sqlStatement)) {
            //設(shè)置當(dāng)前線(xiàn)程是否允許訪(fǎng)問(wèn)主庫(kù)
            MasterVisitedManager.setMasterVisited();
            //返回主庫(kù)
            return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
        }
        //根據(jù)配置的算法獲取從庫(kù),兩種算法:
        //1、隨機(jī)
        //2、輪詢(xún)
        return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));
    }

執(zhí)行MasterSlavePreparedStatement#executeUpdate

@Override
    public int executeUpdate() throws SQLException {
        int result = 0;
        //從本地緩存遍歷執(zhí)行
        for (PreparedStatement each : routedStatements) {
            result += each.executeUpdate();
        }
        return result;
    }

4.獲取從庫(kù)算法策略

  • 隨機(jī)算法

@Getter
@Setter
public final class RandomMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
    
    private Properties properties = new Properties();
    
    @Override
    public String getType() {
        return "RANDOM";
    }
    
    @Override
    public String getDataSource(final String name, final String masterDataSourceName, final List slaveDataSourceNames) {
        //從slave.size()中獲取一個(gè)隨機(jī)數(shù)
        return slaveDataSourceNames.get(new Random().nextInt(slaveDataSourceNames.size()));
    }
}
  • 輪詢(xún)算法

@Getter
@Setter
public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {

    //并發(fā)map
    private static final ConcurrentHashMap COUNTS = new ConcurrentHashMap<>();
    
    private Properties properties = new Properties();
    
    @Override
    public String getType() {
        return "ROUND_ROBIN";
    }
    
    @Override
    public String getDataSource(final String name, final String masterDataSourceName, final List slaveDataSourceNames) {
        //查看對(duì)應(yīng)名稱(chēng)的計(jì)數(shù)器,沒(méi)有則初始化一個(gè)
        AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
        COUNTS.putIfAbsent(name, count);
        // 采用cas輪詢(xún),如果計(jì)數(shù)器長(zhǎng)到slave.size(),那么歸零(防止計(jì)數(shù)器不斷增長(zhǎng)下去)
        count.compareAndSet(slaveDataSourceNames.size(), 0);
        //絕對(duì)值,計(jì)數(shù)器%slave.size()取模
        return slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size());
    }
}
  • 默認(rèn)算法

SPI擴(kuò)展機(jī)制,load加載第一個(gè)算法作為默認(rèn)算法;ss默認(rèn)是隨機(jī)

到此,關(guān)于“怎么使用sharding-jdbc讀寫(xiě)分離”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!


網(wǎng)站名稱(chēng):怎么使用sharding-jdbc讀寫(xiě)分離
標(biāo)題網(wǎng)址:http://weahome.cn/article/poohdj.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部