This article looks at how MyCAT implements its support for MongoDB. Many readers may not be familiar with it, so the key points are summarized below.
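Main flow
MyCAT Server receives requests from MySQL Client over the MySQL protocol, translates the SQL into MongoDB operations, and sends them to MongoDB Server.
MyCAT Server receives the MongoDB data returned by MongoDB Server, translates it into a MySQL result set, and returns it to MySQL Client.
Seen this way, MyCAT connecting to MongoDB looks a little less magical, doesn't it?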
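Java Database Connectivity (JDBC) is the Java API that standardizes how client programs access a database. It provides methods for querying and updating data in the database. JDBC is also a trademark of Sun Microsystems. JDBC is oriented toward relational databases.
MyCAT uses the JDBC specification to abstract access to MongoDB. In the same way, MyCAT also abstracts access to SequoiaDB. That may sound a bit abstract, so here is a class diagram to make it concrete.
Looks familiar, doesn't it? The JDBC specification really is elegantly designed.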
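To make the idea of "MongoDB behind JDBC" a little more concrete, here is a minimal sketch of the entry point of such an abstraction: a java.sql.Driver that claims a MongoDB URL. The class name SketchMongoDriver and the "mongodb://" prefix are assumptions made for illustration, not taken from the MyCAT source, and the sketch deliberately opens no real connection.

```java
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

// Hypothetical driver, for illustration only.
class SketchMongoDriver implements Driver {

    // Assumed URL scheme handled by this driver.
    static final String PREFIX = "mongodb://";

    @Override
    public boolean acceptsURL(String url) {
        return url != null && url.startsWith(PREFIX);
    }

    @Override
    public Connection connect(String url, Properties info) throws SQLException {
        if (!acceptsURL(url)) {
            // JDBC contract: return null for URLs this driver does not handle.
            return null;
        }
        // A real driver would return a Connection backed by a MongoDB client here.
        throw new SQLFeatureNotSupportedException("sketch only: no connection is opened");
    }

    @Override
    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
        return new DriverPropertyInfo[0];
    }

    @Override
    public int getMajorVersion() {
        return 1;
    }

    @Override
    public int getMinorVersion() {
        return 0;
    }

    @Override
    public boolean jdbcCompliant() {
        // MongoDB is not a relational database, so full compliance is not claimed.
        return false;
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new SQLFeatureNotSupportedException();
    }

    public static void main(String[] args) {
        SketchMongoDriver driver = new SketchMongoDriver();
        System.out.println(driver.acceptsURL("mongodb://localhost:27017/test"));
        System.out.println(driver.acceptsURL("jdbc:mysql://localhost:3306/test"));
    }
}
```

Once a Driver, Connection, Statement, and ResultSet exist for a data source, the calling code can stay the same whether the backend is MySQL or MongoDB, which is the point of the abstraction.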
3. Query operation
SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;
The sequence diagram already makes the overall logic easy to follow, so I won't say much more. Let's look at a few core pieces of code.
1) Querying MongoDB
// MongoSQLParser.java
public MongoData query() throws MongoSQLException {
    if (!(statement instanceof SQLSelectStatement)) {
        //return null;
        throw new IllegalArgumentException("not a query sql statement");
    }
    MongoData mongo = new MongoData();
    DBCursor c = null;
    SQLSelectStatement selectStmt = (SQLSelectStatement) statement;
    SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
    int icount = 0;
    if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {
        MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();

        BasicDBObject fields = new BasicDBObject();

        // Fields to display (return)
        for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {
            //System.out.println(item.toString());
            if (!(item.getExpr() instanceof SQLAllColumnExpr)) {
                if (item.getExpr() instanceof SQLAggregateExpr) {
                    SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();
                    if (expr.getMethodName().equals("COUNT")) { // TODO to be read: count(*)
                        icount = 1;
                        mongo.setField(getExprFieldName(expr), Types.BIGINT);
                    }
                    fields.put(getExprFieldName(expr), 1);
                } else {
                    fields.put(getFieldName(item), 1);
                }
            }
        }

        // Table name
        SQLTableSource table = mysqlSelectQuery.getFrom();
        DBCollection coll = this._db.getCollection(table.toString());
        mongo.setTable(table.toString());

        // WHERE
        SQLExpr expr = mysqlSelectQuery.getWhere();
        DBObject query = parserWhere(expr);

        // GROUP BY
        SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();
        BasicDBObject gbkey = new BasicDBObject();
        if (groupby != null) {
            for (SQLExpr gbexpr : groupby.getItems()) {
                if (gbexpr instanceof SQLIdentifierExpr) {
                    String name = ((SQLIdentifierExpr) gbexpr).getName();
                    gbkey.put(name, Integer.valueOf(1));
                }
            }
            icount = 2;
        }

        // SKIP / LIMIT
        int limitoff = 0;
        int limitnum = 0;
        if (mysqlSelectQuery.getLimit() != null) {
            limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
            limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
        }

        if (icount == 1) { // COUNT(*)
            mongo.setCount(coll.count(query));
        } else if (icount == 2) { // MapReduce
            BasicDBObject initial = new BasicDBObject();
            initial.put("num", 0);
            String reduce = "function (obj, prev) { " + " prev.num++}";
            mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));
        } else {
            if ((limitoff > 0) || (limitnum > 0)) {
                c = coll.find(query, fields).skip(limitoff).limit(limitnum);
            } else {
                c = coll.find(query, fields);
            }

            // order by
            SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();
            if (orderby != null) {
                BasicDBObject order = new BasicDBObject();
                for (int i = 0; i < orderby.getItems().size(); i++) {
                    SQLSelectOrderByItem orderitem = orderby.getItems().get(i);
                    order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));
                }
                c.sort(order);
                // System.out.println(order);
            }
        }
        mongo.setCursor(c);
    }
    return mongo;
}
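To see what the method above builds for the example SQL, here is a small sketch that produces the projection and sort documents using plain maps instead of BasicDBObject. The class QueryPartsSketch and its methods are hypothetical. The sketch assumes getSQLExprToAsc follows the MongoDB convention of 1 for ascending and -1 for descending, which the excerpt does not show.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of MyCAT: mirrors the shape of the "fields" and "order" objects.
class QueryPartsSketch {

    // Each selected column maps to 1, just as fields.put(name, 1) does in query().
    static Map<String, Object> projection(String... columns) {
        Map<String, Object> fields = new LinkedHashMap<>();
        for (String column : columns) {
            fields.put(column, 1);
        }
        return fields;
    }

    // MongoDB sort convention: 1 means ascending, -1 means descending.
    static Map<String, Object> sort(String column, boolean descending) {
        Map<String, Object> order = new LinkedHashMap<>();
        order.put(column, descending ? -1 : 1);
        return order;
    }

    public static void main(String[] args) {
        // SELECT id, name FROM user ... ORDER BY _id DESC
        System.out.println("fields = " + projection("id", "name"));
        System.out.println("order  = " + sort("_id", true));
    }
}
```

With these two documents plus the WHERE filter, the query boils down to a find on the user collection followed by a sort.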
2) Query conditions
// MongoSQLParser.java
private void parserWhere(SQLExpr aexpr, BasicDBObject o) {
    if (aexpr instanceof SQLBinaryOpExpr) {
        SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;
        SQLExpr exprL = expr.getLeft();
        if (!(exprL instanceof SQLBinaryOpExpr)) {
            if (expr.getOperator().getName().equals("=")) {
                o.put(exprL.toString(), getExpValue(expr.getRight()));
            } else {
                String op = "";
                if (expr.getOperator().getName().equals("<")) {
                    op = "$lt";
                } else if (expr.getOperator().getName().equals("<=")) {
                    op = "$lte";
                } else if (expr.getOperator().getName().equals(">")) {
                    op = "$gt";
                } else if (expr.getOperator().getName().equals(">=")) {
                    op = "$gte";
                } else if (expr.getOperator().getName().equals("!=")) {
                    op = "$ne";
                } else if (expr.getOperator().getName().equals("<>")) {
                    op = "$ne";
                }
                parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));
            }
        } else {
            if (expr.getOperator().getName().equals("AND")) {
                parserWhere(exprL, o);
                parserWhere(expr.getRight(), o);
            } else if (expr.getOperator().getName().equals("OR")) {
                orWhere(exprL, expr.getRight(), o);
            } else {
                throw new RuntimeException("Can't identify the operation of of where");
            }
        }
    }
}

private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {
    BasicDBObject xo = new BasicDBObject();
    BasicDBObject yo = new BasicDBObject();
    parserWhere(exprL, xo);
    parserWhere(exprR, yo);
    ob.put("$or", new Object[]{xo, yo});
}
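The operator mapping above can be tried in isolation. The following is a minimal sketch that uses plain maps in place of BasicDBObject. WhereSketch and its methods are hypothetical names, and the sketch simply overwrites an existing condition on the same field, since the excerpt does not show what parserDBObject does in that case.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of MyCAT: maps SQL comparisons to a MongoDB-style filter.
class WhereSketch {

    // Returns the MongoDB operator for a SQL comparison operator, or null for "=".
    static String toMongoOperator(String sqlOperator) {
        switch (sqlOperator) {
            case "<":
                return "$lt";
            case "<=":
                return "$lte";
            case ">":
                return "$gt";
            case ">=":
                return "$gte";
            case "!=":
            case "<>":
                return "$ne";
            case "=":
                return null;
            default:
                throw new IllegalArgumentException("unsupported operator: " + sqlOperator);
        }
    }

    // Adds one condition to the filter; "=" is stored as a plain field/value pair.
    static void addCondition(Map<String, Object> filter, String field, String sqlOperator, Object value) {
        String op = toMongoOperator(sqlOperator);
        if (op == null) {
            filter.put(field, value);
        } else {
            Map<String, Object> comparison = new LinkedHashMap<>();
            comparison.put(op, value);
            filter.put(field, comparison);
        }
    }

    public static void main(String[] args) {
        // WHERE name > '' AND age = 18
        Map<String, Object> filter = new LinkedHashMap<>();
        addCondition(filter, "name", ">", "");
        addCondition(filter, "age", "=", 18);
        System.out.println(filter);
    }
}
```

Conditions joined by AND land in the same filter object, while OR wraps two separate filter objects under a "$or" key, as orWhere does.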
3) Parsing MongoDB data
// MongoResultSet.java
public MongoResultSet(MongoData mongo, String schema) throws SQLException {
    this._cursor = mongo.getCursor();
    this._schema = schema;
    this._table = mongo.getTable();
    this.isSum = mongo.getCount() > 0;
    this._sum = mongo.getCount();
    this.isGroupBy = mongo.getType();

    if (this.isGroupBy) {
        dblist = mongo.getGrouyBys();
        this.isSum = true;
    }
    if (this._cursor != null) {
        select = _cursor.getKeysWanted().keySet().toArray(new String[0]);
        // Parse fields
        if (this._cursor.hasNext()) {
            _cur = _cursor.next();
            if (_cur != null) {
                if (select.length == 0) {
                    SetFields(_cur.keySet());
                }
                _row = 1;
            }
        }
        // Set field types
        if (select.length == 0) {
            select = new String[]{"_id"};
            SetFieldType(true);
        } else {
            SetFieldType(false);
        }
    } else {
        SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};
        SetFieldType(mongo.getFields());
    }
}
When the query uses SELECT *, the result's fields are taken from the fields of the first record returned. Even if later records contain other fields, those fields are not returned.
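The effect of taking the columns from the first record can be shown with a small sketch that uses plain maps as stand-ins for MongoDB documents. FirstRowColumnsSketch and its methods are hypothetical, and returning an empty column list for an empty result is a simplification of my own, not the behavior of MongoResultSet.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of MyCAT: column list fixed by the first document.
class FirstRowColumnsSketch {

    // The column list is taken from the first document only.
    static List<String> columnsOf(List<Map<String, Object>> docs) {
        if (docs.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(docs.get(0).keySet());
    }

    // Projects a document onto the fixed columns: missing fields become null, extra fields are dropped.
    static List<Object> toRow(Map<String, Object> doc, List<String> columns) {
        List<Object> row = new ArrayList<>();
        for (String column : columns) {
            row.add(doc.get(column));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("_id", 1);
        first.put("name", "a");

        Map<String, Object> second = new LinkedHashMap<>();
        second.put("_id", 2);
        second.put("name", "b");
        second.put("profile", "only in the second document");

        List<Map<String, Object>> docs = new ArrayList<>();
        docs.add(first);
        docs.add(second);

        List<String> columns = columnsOf(docs);
        System.out.println(columns);
        for (Map<String, Object> doc : docs) {
            System.out.println(toRow(doc, columns));
        }
    }
}
```

In practice this means that if documents in a collection have uneven shapes, it is safer to list the columns explicitly than to rely on SELECT *.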
4) Returning data to MySQL Client
// JDBCConnection.java
private void ouputResultSet(ServerConnection sc, String sql) throws SQLException {
    ResultSet rs = null;
    Statement stmt = null;
    try {
        stmt = con.createStatement();
        rs = stmt.executeQuery(sql);

        // header
        List<FieldPacket> fieldPks = new LinkedList<>();
        ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);
        int colunmCount = fieldPks.size();
        ByteBuffer byteBuf = sc.allocate();
        ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
        headerPkg.fieldCount = fieldPks.size();
        headerPkg.packetId = ++packetId;
        byteBuf = headerPkg.write(byteBuf, sc, true);
        byteBuf.flip();
        byte[] header = new byte[byteBuf.limit()];
        byteBuf.get(header);
        byteBuf.clear();
        List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
        for (FieldPacket curField : fieldPks) {
            curField.packetId = ++packetId;
            byteBuf = curField.write(byteBuf, sc, false);
            byteBuf.flip();
            byte[] field = new byte[byteBuf.limit()];
            byteBuf.get(field);
            byteBuf.clear();
            fields.add(field);
        }

        // header eof
        EOFPacket eofPckg = new EOFPacket();
        eofPckg.packetId = ++packetId;
        byteBuf = eofPckg.write(byteBuf, sc, false);
        byteBuf.flip();
        byte[] eof = new byte[byteBuf.limit()];
        byteBuf.get(eof);
        byteBuf.clear();
        this.respHandler.fieldEofResponse(header, fields, eof, this);

        // row
        while (rs.next()) {
            RowDataPacket curRow = new RowDataPacket(colunmCount);
            for (int i = 0; i < colunmCount; i++) {
                int j = i + 1;
                if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                    curRow.add(rs.getBytes(j));
                } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL
                        || fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
                    // ensure that do not use scientific notation format
                    BigDecimal val = rs.getBigDecimal(j);
                    curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));
                } else {
                    curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
                }
            }
            curRow.packetId = ++packetId;
            byteBuf = curRow.write(byteBuf, sc, false);
            byteBuf.flip();
            byte[] row = new byte[byteBuf.limit()];
            byteBuf.get(row);
            byteBuf.clear();
            this.respHandler.rowResponse(row, this);
        }
        fieldPks.clear();

        // row eof
        eofPckg = new EOFPacket();
        eofPckg.packetId = ++packetId;
        byteBuf = eofPckg.write(byteBuf, sc, false);
        byteBuf.flip();
        eof = new byte[byteBuf.limit()];
        byteBuf.get(eof);
        sc.recycle(byteBuf);
        this.respHandler.rowEofResponse(eof, this);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
            }
        }
    }
}

// MongoResultSet.java
@Override
public String getString(String columnLabel) throws SQLException {
    Object x = getObject(columnLabel);
    if (x == null) {
        return null;
    }
    return x.toString();
}
When a returned field value is an Object, the result of calling toString() on that object is returned. For example:
mysql> select * from user order by _id asc;
+--------------------------+------+-------------------------------+
| _id                      | name | profile                       |
+--------------------------+------+-------------------------------+
| 1                        | 123  | { "age" : 1 , "height" : 100} |
4. Insert operation
// MongoSQLParser.java
public int executeUpdate() throws MongoSQLException {
    if (statement instanceof SQLInsertStatement) {
        return InsertData((SQLInsertStatement) statement);
    }
    if (statement instanceof SQLUpdateStatement) {
        return UpData((SQLUpdateStatement) statement);
    }
    if (statement instanceof SQLDropTableStatement) {
        return dropTable((SQLDropTableStatement) statement);
    }
    if (statement instanceof SQLDeleteStatement) {
        return DeleteDate((SQLDeleteStatement) statement);
    }
    if (statement instanceof SQLCreateTableStatement) {
        return 1;
    }
    return 1;
}

private int InsertData(SQLInsertStatement state) {
    if (state.getValues().getValues().size() == 0) {
        throw new RuntimeException("number of columns error");
    }
    if (state.getValues().getValues().size() != state.getColumns().size()) {
        throw new RuntimeException("number of values and columns have to match");
    }
    SQLTableSource table = state.getTableSource();
    BasicDBObject o = new BasicDBObject();
    int i = 0;
    for (SQLExpr col : state.getColumns()) {
        o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));
        i++;
    }
    DBCollection coll = this._db.getCollection(table.toString());
    coll.insert(o);
    return 1;
}
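The core of InsertData is pairing each column with the value at the same position. Here is a minimal sketch of that pairing, with a plain map standing in for BasicDBObject. InsertSketch and toDocument are hypothetical names, and the sketch stops before the actual insert into a collection.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of MyCAT: turns INSERT columns and values into one document.
class InsertSketch {

    // Pairs the i-th column with the i-th value, with the same two checks as InsertData.
    static Map<String, Object> toDocument(List<String> columns, List<Object> values) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("number of columns error");
        }
        if (values.size() != columns.size()) {
            throw new IllegalArgumentException("number of values and columns have to match");
        }
        Map<String, Object> document = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            document.put(columns.get(i), values.get(i));
        }
        return document;
    }

    public static void main(String[] args) {
        // INSERT INTO user (_id, name) VALUES (1, '123')
        List<String> columns = Arrays.asList("_id", "name");
        List<Object> values = Arrays.<Object>asList(1, "123");
        System.out.println(toDocument(columns, values));
    }
}
```

Note that the checks in InsertData mean an INSERT without an explicit column list is rejected, since the column count would not match the value count.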
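5. Easter eggs
1) Multiple MongoDB instances are supported, with MyCAT handling the sharding.
MyCAT configuration: multi_mongodb
2) MongoDB and MySQL can both serve as data nodes of the same MyCAT table. When querying, their results can be merged.
When querying, the fields returned for MySQL records must be more complete than the fields returned for MongoDB records. Otherwise, an error is raised when the results are merged.
MyCAT configuration: single_mongodb_mysql
3) When MongoDB is used as a data node, the database primary key field feature provided by MyCAT can be used.
MyCAT configuration: single_mongodb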