This article explains how to use read/write splitting in sharding-jdbc. It covers the core concepts, the Java configuration, and the source code path that routes a statement to the master or to a slave.
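Core concepts
Master: the database used for insert, update, and delete operations.
Slave: the database used for query operations; multiple slaves are supported.
Read/write splitting covers one master with multiple slaves; multiple masters with multiple slaves additionally require sharding.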
Source code analysis
1. Entry point:
    public class JavaConfigurationExample {

        // private static ShardingType shardingType = ShardingType.SHARDING_DATABASES;
        // private static ShardingType shardingType = ShardingType.SHARDING_TABLES;
        // private static ShardingType shardingType = ShardingType.SHARDING_DATABASES_AND_TABLES;
        private static ShardingType shardingType = ShardingType.MASTER_SLAVE;
        // private static ShardingType shardingType = ShardingType.SHARDING_MASTER_SLAVE;
        // private static ShardingType shardingType = ShardingType.SHARDING_VERTICAL;

        public static void main(final String[] args) throws SQLException {
            DataSource dataSource = DataSourceFactory.newInstance(shardingType);
            CommonService commonService = getCommonService(dataSource);
            commonService.initEnvironment();
            commonService.processSuccess();
            commonService.cleanEnvironment();
        }

        private static CommonService getCommonService(final DataSource dataSource) {
            return new CommonServiceImpl(new OrderRepositoryImpl(dataSource), new OrderItemRepositoryImpl(dataSource));
        }
    }
2. Taking sharding-jdbc as the example, the master-slave read/write splitting configuration looks like this:

    @Override
    public DataSource getDataSource() throws SQLException {
        // Master-slave rule: rule name, master name, slave names
        MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(
                /* rule name */ "demo_ds_master_slave",
                /* master */ "demo_ds_master",
                /* slaves */ Arrays.asList("demo_ds_slave_0", "demo_ds_slave_1"));
        // Print the routed SQL
        Properties props = new Properties();
        props.put("sql.show", true);
        // Create the MasterSlaveDataSource
        return MasterSlaveDataSourceFactory.createDataSource(createDataSourceMap(), masterSlaveRuleConfig, props);
    }

    private Map<String, DataSource> createDataSourceMap() {
        Map<String, DataSource> result = new HashMap<>();
        // Master
        result.put("demo_ds_master", DataSourceUtil.createDataSource("demo_ds_master"));
        // Two slaves
        result.put("demo_ds_slave_0", DataSourceUtil.createDataSource("demo_ds_slave_0"));
        result.put("demo_ds_slave_1", DataSourceUtil.createDataSource("demo_ds_slave_1"));
        return result;
    }
Creating the sharding master-slave data source, MasterSlaveDataSource:

    public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap,
                                 final MasterSlaveRuleConfiguration masterSlaveRuleConfig,
                                 final Properties props) throws SQLException {
        super(dataSourceMap);
        // Cache the MySQL database metadata
        cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);
        // Master-slave rule configuration
        this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
        // SQL parser entry for master-slave routing
        parseEngine = new MasterSlaveSQLParseEntry(getDatabaseType());
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
    }
3. Executing the insert method

    @Override
    public Long insert(final Order order) throws SQLException {
        String sql = "INSERT INTO t_order (user_id, status) VALUES (?, ?)";
        // Get a connection from the MasterSlaveDataSource and create a MasterSlavePreparedStatement.
        // The two statement types route at different times:
        // 1. MasterSlaveStatement: routes only when the SQL is executed
        // 2. MasterSlavePreparedStatement: routes as soon as the statement is created
        // Statement.RETURN_GENERATED_KEYS makes the auto-generated primary key available afterwards
        try (Connection connection = dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
            preparedStatement.setInt(1, order.getUserId());
            preparedStatement.setString(2, order.getStatus());
            // Executed by MasterSlavePreparedStatement
            preparedStatement.executeUpdate();
            try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
                if (resultSet.next()) {
                    order.setOrderId(resultSet.getLong(1));
                }
            }
        }
        return order.getOrderId();
    }
Getting the database connection: MasterSlaveConnection -> AbstractConnectionAdapter#getConnection

    /**
     * Get database connection.
     *
     * @param dataSourceName data source name
     * @return database connection
     * @throws SQLException SQL exception
     */
    // MEMORY_STRICTLY: the proxy keeps a connection for every routed table in a database;
    // the benefit is that streaming ResultSets can be used to save memory.
    // CONNECTION_STRICTLY: the proxy releases the connection once all data has been read
    // from the ResultSet, at the cost of higher memory consumption.
    public final Connection getConnection(final String dataSourceName) throws SQLException {
        return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0);
    }

    /**
     * Get database connections.
     *
     * @param connectionMode connection mode
     * @param dataSourceName data source name
     * @param connectionSize size of connection list to be get
     * @return database connections
     * @throws SQLException SQL exception
     */
    public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName,
                                                 final int connectionSize) throws SQLException {
        // Look up the data source
        DataSource dataSource = getDataSourceMap().get(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
        Collection<Connection> connections;
        // Read the cached connections under a lock
        synchronized (cachedConnections) {
            connections = cachedConnections.get(dataSourceName);
        }
        List<Connection> result;
        // The cache already holds enough connections: return the requested number
        if (connections.size() >= connectionSize) {
            result = new ArrayList<>(connections).subList(0, connectionSize);
        } else if (!connections.isEmpty()) {
            result = new ArrayList<>(connectionSize);
            result.addAll(connections);
            // Create only the missing connections
            List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
            result.addAll(newConnections);
            synchronized (cachedConnections) {
                cachedConnections.putAll(dataSourceName, newConnections);
            }
        } else {
            result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
            synchronized (cachedConnections) {
                cachedConnections.putAll(dataSourceName, result);
            }
        }
        return result;
    }

    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode,
                                               final DataSource dataSource, final int connectionSize) throws SQLException {
        // With a size of 1 there is no concurrent acquisition, so return a single connection directly
        if (1 == connectionSize) {
            return Collections.singletonList(createConnection(dataSourceName, dataSource));
        }
        // CONNECTION_STRICTLY: no locking
        if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
            return createConnections(dataSourceName, dataSource, connectionSize);
        }
        // Otherwise acquire the connections while holding a lock on the data source
        synchronized (dataSource) {
            return createConnections(dataSourceName, dataSource, connectionSize);
        }
    }

    private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource,
                                               final int connectionSize) throws SQLException {
        List<Connection> result = new ArrayList<>(connectionSize);
        for (int i = 0; i < connectionSize; i++) {
            try {
                result.add(createConnection(dataSourceName, dataSource));
            } catch (final SQLException ex) {
                // Release the connections obtained so far before failing
                for (Connection each : result) {
                    each.close();
                }
                throw new SQLException(String.format("Could't get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
            }
        }
        return result;
    }

    private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
        // Inside a sharding transaction the connection comes from the transaction manager
        Connection result = isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
        replayMethodsInvocation(result);
        return result;
    }
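The "reuse what is cached, create only the shortfall" logic in getConnections can be isolated in a small sketch. This is not the library's code: `ConnectionCacheSketch`, its `get` method, and the `Supplier` used as a connection factory are hypothetical names for illustration, and a single synchronized method replaces the finer-grained locking of the original.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative only: reuse cached items first, create only what is missing. */
final class ConnectionCacheSketch<C> {

    // Cached items per data source name (hypothetical stand-in for cachedConnections).
    private final Map<String, List<C>> cached = new HashMap<>();

    /** Returns exactly {@code size} items for the name, creating new ones only when the cache is short. */
    synchronized List<C> get(final String dataSourceName, final int size, final Supplier<C> factory) {
        List<C> existing = cached.computeIfAbsent(dataSourceName, key -> new ArrayList<>());
        // Create only as many new items as the cache is short of.
        for (int i = existing.size(); i < size; i++) {
            existing.add(factory.get());
        }
        // Hand out a copy so callers cannot modify the cache.
        return new ArrayList<>(existing.subList(0, size));
    }

    public static void main(final String[] args) {
        ConnectionCacheSketch<String> cache = new ConnectionCacheSketch<>();
        int[] created = {0};
        Supplier<String> factory = () -> "conn-" + created[0]++;
        System.out.println(cache.get("demo_ds_master", 1, factory));
        System.out.println(cache.get("demo_ds_master", 3, factory));
        System.out.println("created: " + created[0]);
    }
}
```

Strings stand in for connections so the sketch runs without a database; the second call creates two items because one is already cached.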
Routing at preparation time and caching the statements:

    public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql,
                                        final int autoGeneratedKeys) throws SQLException {
        this.connection = connection;
        // Create the router
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getParseEngine(),
                connection.getMasterSlaveDataSource().getShardingProperties().getValue(ShardingPropertiesConstant.SQL_SHOW));
        // Cache the routed statements; useCache = true also caches the parsed SQL statement
        for (String each : masterSlaveRouter.route(sql, true)) {
            // Get a connection to the routed data source
            PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, autoGeneratedKeys);
            routedStatements.add(preparedStatement);
        }
    }
MasterSlaveRouter#route determines the target data source:

    public Collection<String> route(final String sql, final boolean useCache) {
        // Parse the SQL; how antlr4 parses it is not covered here
        Collection<String> result = route(parseEngine.parse(sql, useCache));
        // Print the SQL if enabled
        if (showSQL) {
            SQLLogger.logSQL(sql, result);
        }
        return result;
    }

    private Collection<String> route(final SQLStatement sqlStatement) {
        // Decide whether the statement must go to the master
        if (isMasterRoute(sqlStatement)) {
            // Mark the current thread as having visited the master
            MasterVisitedManager.setMasterVisited();
            // Return the master
            return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
        }
        // Otherwise pick a slave with the configured algorithm; two are provided:
        // 1. random
        // 2. round-robin
        return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));
    }
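The routing decision can be condensed into a self-contained sketch. It is not ShardingSphere's code: `RoutingSketch`, its `route` and `clear` methods, and the prefix check that stands in for SQL parsing are all assumptions for illustration. The body of `isMasterRoute` is not shown in this article; the sketch assumes that non-SELECT statements go to the master and that, once a thread has visited the master, its later reads go there too.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Illustrative only: a toy read/write router, not ShardingSphere's MasterSlaveRouter. */
final class RoutingSketch {

    // Hypothetical stand-in for MasterVisitedManager: remembers whether this thread touched the master.
    private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);

    private final String master;
    private final List<String> slaves;
    private int next;

    RoutingSketch(final String master, final List<String> slaves) {
        this.master = master;
        this.slaves = slaves;
    }

    /** Returns the name of the data source the SQL would be sent to. */
    String route(final String sql) {
        // Assumption: a prefix check replaces real SQL parsing.
        boolean isSelect = sql.trim().toUpperCase(Locale.ROOT).startsWith("SELECT");
        if (!isSelect || MASTER_VISITED.get()) {
            MASTER_VISITED.set(true);
            return master;
        }
        // Simple rotation over the slaves; not thread-safe, which is acceptable for a sketch.
        String result = slaves.get(next);
        next = (next + 1) % slaves.size();
        return result;
    }

    /** Clears the per-thread flag, for example when the connection is closed. */
    static void clear() {
        MASTER_VISITED.remove();
    }

    public static void main(final String[] args) {
        RoutingSketch router = new RoutingSketch("demo_ds_master", Arrays.asList("demo_ds_slave_0", "demo_ds_slave_1"));
        System.out.println(router.route("SELECT * FROM t_order"));
        System.out.println(router.route("INSERT INTO t_order (user_id, status) VALUES (1, 'NEW')"));
        System.out.println(router.route("SELECT * FROM t_order"));
        clear();
    }
}
```

Sending reads to the master after a write on the same thread avoids reading stale rows from a slave that has not yet replicated the write.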
Executing MasterSlavePreparedStatement#executeUpdate:

    @Override
    public int executeUpdate() throws SQLException {
        int result = 0;
        // Execute each routed statement cached when the statement was created
        for (PreparedStatement each : routedStatements) {
            result += each.executeUpdate();
        }
        return result;
    }
4. Slave selection algorithms
Random algorithm

    @Getter
    @Setter
    public final class RandomMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {

        private Properties properties = new Properties();

        @Override
        public String getType() {
            return "RANDOM";
        }

        @Override
        public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
            // Pick a random index in [0, slaveDataSourceNames.size())
            return slaveDataSourceNames.get(new Random().nextInt(slaveDataSourceNames.size()));
        }
    }
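The random strategy can be tried in isolation with the sketch below. `RandomPickSketch` and `pick` are hypothetical names, and `ThreadLocalRandom` is swapped in for the `new Random()` created on every call in the code above; this is one possible variation, not the library's implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative only: uniform random selection from a list of slave names. */
final class RandomPickSketch {

    static String pick(final List<String> slaves) {
        // nextInt(bound) returns a value in [0, bound), so the index is always valid.
        return slaves.get(ThreadLocalRandom.current().nextInt(slaves.size()));
    }

    public static void main(final String[] args) {
        List<String> slaves = Arrays.asList("demo_ds_slave_0", "demo_ds_slave_1");
        for (int i = 0; i < 4; i++) {
            System.out.println(pick(slaves));
        }
    }
}
```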
Round-robin algorithm

    @Getter
    @Setter
    public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {

        // Concurrent map of counters, one per rule name
        private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();

        private Properties properties = new Properties();

        @Override
        public String getType() {
            return "ROUND_ROBIN";
        }

        @Override
        public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
            // Look up the counter for this name, creating one if it does not exist yet
            AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
            COUNTS.putIfAbsent(name, count);
            // CAS reset: when the counter reaches slave.size(), set it back to zero so it does not grow without bound
            count.compareAndSet(slaveDataSourceNames.size(), 0);
            // Absolute value of the counter, modulo slave.size()
            return slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size());
        }
    }
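A runnable sketch of the same counter technique follows. `RoundRobinSketch` and `pick` are hypothetical names, and two details differ from the code above on purpose: `computeIfAbsent` replaces the containsKey/putIfAbsent pair, and `Math.floorMod` replaces `Math.abs(...) %`, because `Math.abs(Integer.MIN_VALUE)` is still negative and would produce an invalid index if the counter ever overflowed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: round-robin selection with one counter per rule name. */
final class RoundRobinSketch {

    private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();

    static String pick(final String name, final List<String> slaves) {
        // computeIfAbsent creates the counter atomically on first use.
        AtomicInteger count = COUNTS.computeIfAbsent(name, key -> new AtomicInteger(0));
        // floorMod keeps the index non-negative even if the counter wraps around.
        return slaves.get(Math.floorMod(count.getAndIncrement(), slaves.size()));
    }

    public static void main(final String[] args) {
        List<String> slaves = Arrays.asList("demo_ds_slave_0", "demo_ds_slave_1");
        for (int i = 0; i < 4; i++) {
            System.out.println(pick("demo_ds_master_slave", slaves));
        }
    }
}
```

Keying the counter by rule name lets several master-slave rules rotate independently of each other.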
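Default algorithm
The default is resolved through the SPI extension mechanism: the first algorithm the loader finds is used as the default. The original notes that ShardingSphere's default is random; since the result depends on the order in which the implementations are registered, confirm it against the version you use.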
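The "first provider found becomes the default" rule can be sketched with Java's standard `java.util.ServiceLoader`, assuming the SPI mechanism mentioned here is built on it. `SpiDefaultSketch`, the `LoadBalancer` interface, and `loadDefault` are hypothetical names for illustration; no provider is registered in this sketch, so it falls back to the instance passed in.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

/** Illustrative only: choose the first SPI provider, or a fallback when none is registered. */
final class SpiDefaultSketch {

    /** Hypothetical SPI interface, standing in for MasterSlaveLoadBalanceAlgorithm. */
    public interface LoadBalancer {
        String getType();
    }

    static LoadBalancer loadDefault(final LoadBalancer fallback) {
        // Providers are listed in META-INF/services/<interface binary name>;
        // the iterator yields them in the order they are discovered.
        Iterator<LoadBalancer> found = ServiceLoader.load(LoadBalancer.class).iterator();
        return found.hasNext() ? found.next() : fallback;
    }

    public static void main(final String[] args) {
        LoadBalancer chosen = loadDefault(() -> "RANDOM");
        System.out.println(chosen.getType());
    }
}
```

With providers registered, whichever entry the loader encounters first wins, which is why the order of entries in the service file decides the default.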