這篇文章主要介紹“怎么連接redis服務(wù)器”,在日常操作中,相信很多人在怎么連接Redis服務(wù)器問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么連接Redis服務(wù)器”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、微信平臺小程序開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了張家港免費(fèi)建站歡迎大家使用!
首先要滿足以下環(huán)境要求:
Docker: Docker已經(jīng)成為新應(yīng)用開發(fā)的必備工具,它使得應(yīng)用的構(gòu)建、分享與部署都極其簡單。
Docker Compose:我們使用Docker Compose來管理所有的服務(wù),以便輕松地進(jìn)行擴(kuò)展。
其他的需求都由Docker鏡像來滿足,我們不需要安裝其他任何東西了,只需要寫一個簡單的Docker Compos配置文檔 —— docker-compose.yml:
version: '3' services: ganache: image: trufflesuite/ganache-cli command: -m redis: image: redis:alpine ports: - "6379:6379" command: redis-server --appendonly yes volumes: - redis:/data zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_CREATE_TOPICS: "command:1:1,address.created:1:1,transaction:1:1,errors:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: redis:
只要運(yùn)行docker-compose up -d
就可以輕松地啟動服務(wù),這個命令會自動從Docker中心下載必要的鏡像,然后啟動。下面讓我們看看都有哪些服務(wù)。
如果沒有接入以太坊區(qū)塊鏈的節(jié)點,我們的錢包服務(wù)就不會有什么用。在開發(fā)期我們不需要下載整個以太坊區(qū)塊鏈,因此只要使用Ganache仿真器即可。使用Ganache的好處是開發(fā)效率高,因為出塊極快。不過在生產(chǎn)環(huán)境中就需要使用像Geth這樣的節(jié)點軟件來接入以太坊主網(wǎng)了。
我們需要數(shù)據(jù)庫來保存我們創(chuàng)建的地址,并且監(jiān)聽這些地址相關(guān)的交易。Redis是一個很出色的內(nèi)存鍵/值數(shù)據(jù)庫,非常適合我們的應(yīng)用場景。
在這個教程中,我們將使用Redis數(shù)據(jù)庫來保存我們?yōu)榈刂飞傻乃借€,但是在生產(chǎn)服務(wù)器上應(yīng)當(dāng)使用更安全的硬件設(shè)施來保護(hù)這些私鑰。
Apache Kafka在交易所架構(gòu)中扮演著核心的角色,它負(fù)責(zé)接收所有服務(wù)的消息并分發(fā)給訂閱這些消息的節(jié)點。
對于以太坊錢包服務(wù)而言,我們將使用以下這些主題進(jìn)行通信:
command
address.created
transaction
errors
Apache Kafka服務(wù)器可以獨立地進(jìn)行擴(kuò)展,為我們的服務(wù)提供了一個分布式的消息處理集群。
就我個人而言,是非常喜歡Elixir的,因為可以用它寫出極其可靠的分布式應(yīng)用,而且代碼也很容易理解和維護(hù)。但是考慮到以太坊的生態(tài),Elixir就沒有什么優(yōu)勢了。
對于以太坊開發(fā)而言最好的選擇還是使用Node.js/Javascript。因為有很多你可以直接就用的組件。因此我們的以太坊錢包服務(wù)最終決定使用Node.js開發(fā)。
首先運(yùn)行npm init
命令來創(chuàng)建默認(rèn)的node包:
~/exchange-hubwiz/eth-wallet$ npm init
然后我們可以添加一些錢包服務(wù)要用到的node依賴包,執(zhí)行如下命令:
~/exhcange-hubwiz/eth-wallet$ npm install --save web3 redis kafka-node ethereumjs-tx bluebird
前三個依賴包的作用容易理解:
web3:通過websocket連接到Ganache或其他以太坊節(jié)點
redis:連接到Redis服務(wù)器以便保存或提取數(shù)據(jù)
kafka-node:接入Zookeeper,獲取Kafka訪問端結(jié)點,生產(chǎn)或消費(fèi)Kafka消息
最后的兩個依賴包有助于讓我們的代碼更容易理解,并且可以利用async/await的異步編程模式的優(yōu)勢。
接下來,我們將利用這些node包連接Redis、以太坊和Kafka服務(wù)器。
連接Redis非常簡單,創(chuàng)建一個redis.js文件,然后編寫如下代碼:
// load configuration const config = require('../../config') const redis = require('redis') const bluebird = require('bluebird') // promisify the redis client using bluebird bluebird.promisifyAll(redis.RedisClient.prototype); bluebird.promisifyAll(redis.Multi.prototype); // create a new redis client and connect to the redis instance const client = redis.createClient(config.redis_port, config.redis_host); // if an error occurs, print it to the console client.on('error', function (err) { console.error("[REDIS] Error encountered", err) }) module.exports = client;
如果你認(rèn)為連接Redis很簡單了,那么使用web3連接以太坊節(jié)點簡單的會讓你吃驚。 創(chuàng)建一個ethereum.js,然后編寫如下代碼:
const config = require('../../config') const Web3 = require('web3') module.exports = new Web3(config.uri)
Kafka,需要從隊列中提取消息進(jìn)行消費(fèi),或者生產(chǎn)消息存入隊列。因此我們也需要繼續(xù)相關(guān)的配置。
創(chuàng)建一個新的文件query.js,然后編寫如下的代碼:
const kafka = require('kafka-node') const config = require('../../config') // configure how the consumers should connect to the broker/servers // each consumer creates his own connecto to a broker const default_options = { host: config.kafka_zookeeper_uri, autoCommit: true, fromOffset: 'earliest', } module.exports.consumer = (group_id = "ethereum_wallet_manager_consumer", topics = [], opts = {}) => { const options = Object.assign({ groupId: group_id }, default_options, opts) const consumer = new kafka.ConsumerGroup(options, topics) return consumer } // configure how the producer connects to the Apache Kafka broker // initiate the connection to the kafka client const client = new kafka.Client(config.kafka_zookeeper_uri, config.kafka_client_id) module.exports.client = client const producer = new kafka.Producer(client) // add a listener to the ready event async function on_ready(cb) { producer.on('ready', cb) } // define a method to send multiple messages to the given topic // this will return a promise that will resolve with the response from Kafka // messages are converted to JSON strings before they are added in the queue async function send(topic, messages) { return new Promise((resolve, reject) => { // convert objects to JSON strings messages = messages.map(JSON.stringify) // add the messages to the given topic producer.send([{ topic, messages}], function (err, data) { if (err) return reject(err) resolve(data) }) }) } // expose only these methods to the rest of the application and abstract away // the implementation of the producer to easily change it later module.exports.on_ready = on_ready module.exports.send = send
現(xiàn)在我們開始進(jìn)入以太坊錢包服務(wù)的核心特性開發(fā)階段。
交易所和支付網(wǎng)關(guān)需要為客戶生成新地址,以便用戶可以向服務(wù)充值,或者為產(chǎn)品付費(fèi)。生成一個沒有用過的以太坊地址是任何虛擬貨幣服務(wù)的基本需求,因此讓我們看看如何實現(xiàn)。
首先,創(chuàng)建一個commands.js,在其中我們訂閱隊列中的消息。主要包括以下幾個步驟:
連接到command主題,監(jiān)聽新的create_account命令
當(dāng)收到新的create_account命令時,創(chuàng)建新的密鑰對并存入密碼庫
生成account_created消息并發(fā)送到隊列的account_created主題
代碼如下:
const web3 = require("./ethereum") const redis = require('./redis') const queue = require('./queue') /** * Listen to new commands from the queue */ async function listen_to_commands() { const queue_consumer = queue.consumer('eth.wallet.manager.commands', ['command']) // process messages queue_consumer.on('message', async function (topic_message) { try { const message = JSON.parse(topic_message.value) // create the new address with some reply metadata to match the response to the request const resp = await create_address(message.meta) // if successful then post the response to the queue if (resp) { await queue_producer.send('address.created', [resp]) } } catch (err) { // in case something goes wrong catch the error and send it back in the 'errors' topic console.error(topic_message, err) queue_producer.send('errors', [{type: 'command', request: topic_message, error_code: err.code, error_message: err.message, error_stack: err.stack}]) } }) return queue_consumer } /** * Create a new ethereum address and return the address */ async function create_account(meta = {}) { // generate the address const account = await web3.eth.accounts.create() // disable checksum when storing the address const address = account.address.toLowerCase() // save the public address in Redis without any transactions received yet await redis.setAsync(`eth:address:public:${address}`, JSON.stringify({})) // Store the private key in a vault. // For demo purposes we use the same Redis instance, but this should be changed in production await redis.setAsync(`eth:address:private:${address}`, account.privateKey) return Object.assign({}, meta, {address: account.address}) } module.exports.listen_to_commands = listen_to_commands
我們的錢包還沒寫完,當(dāng)我們創(chuàng)建的地址收到用戶充值時應(yīng)當(dāng)?shù)玫酵ㄖ艑?。為此,以太坊的web3客戶端提供了newBlockHeaders訂閱機(jī)制。此外,如果我們的服務(wù)偶然宕機(jī),那么服務(wù)就會錯過在宕機(jī)期間生產(chǎn)的區(qū)塊,因此我們還需要檢查錢包是否已經(jīng)同步到了網(wǎng)絡(luò)的最新區(qū)塊。
創(chuàng)建 sync_blocks.js文件,編寫如下代碼:
const web3 = require('./ethereum') /** * Sync blocks and start listening for new blocks * @param {Number} current_block_number - The last block processed * @param {Object} opts - A list of options with callbacks for events */ async function sync_blocks(current_block_number, opts) { // first sync the wallet to the latest block let latest_block_number = await web3.eth.getBlockNumber() let synced_block_number = await sync_to_block(current_block_number, latest_block_number, opts) // subscribe to new blocks web3.eth.subscribe('newBlockHeaders', (error, result) => error && console.log(error)) .on("data", async function(blockHeader) { return await process_block(blockHeader.number, opts) }) return synced_block_number } // Load all data about the given block and call the callbacks if defined async function process_block(block_hash_or_id, opts) { // load block information by id or hash const block = await web3.eth.getBlock(block_hash_or_id, true) // call the onTransactions callback if defined opts.onTransactions ? opts.onTransactions(block.transactions) : null; // call the onBlock callback if defined opts.onBlock ? opts.onBlock(block_hash_or_id) : null; return block } // Traverse all unprocessed blocks between the current index and the lastest block number async function sync_to_block(index, latest, opts) { if (index >= latest) { return index; } await process_block(index + 1, opts) return await sync_to_block(index + 1, latest, opts) } module.exports = sync_blocks
在上面的代碼中,我們從錢包服務(wù)之前處理的最新區(qū)塊開始,一直同步到區(qū)塊鏈的當(dāng)前最新區(qū)塊。一旦我們同步到最新區(qū)塊,就開始訂閱新區(qū)塊事件。對于每一個區(qū)塊,我們都執(zhí)行如下的回調(diào)函數(shù)以處理區(qū)塊頭以及區(qū)塊中的交易列表:
onTransactions
onBlock
通常包含如下的處理步驟:
監(jiān)聽新區(qū)塊,獲取區(qū)塊中的全部交易
過濾掉與錢包地址無關(guān)的交易
將每個相關(guān)的交易都發(fā)往隊列
將地址上的資金歸集到安全的存儲
更新已處理的區(qū)塊編號
最終的代碼如下:
const web3 = require("web3") const redis = require('./redis') const queue = require('./queue') const sync_blocks = require('./sync_blocks') /** * Start syncing blocks and listen for new transactions on the blockchain */ async function start_syncing_blocks() { // start from the last block number processed or 0 (you can use the current block before deploying for the first time) let last_block_number = await redis.getAsync('eth:last-block') last_block_number = last_block_number || 0 // start syncing blocks sync_blocks(last_block_number, { // for every new block update the latest block value in redis onBlock: update_block_head, // for new transactions check each transaction and see if it's new onTransactions: async (transactions) => { for (let i in transactions) { await process_transaction(transactions[i]) } } }) } // save the lastest block on redis async function update_block_head(head) { return await redis.setAsync('eth:last-block', head) } // process a new transaction async function process_transaction(transaction) { const address = transaction.to.toLowerCase() const amount_in_ether = web3.utils.fromWei(transaction.value) // check if the receiving address has been generated by our wallet const watched_address = await redis.existsAsync(`eth:address:public:${address}`) if (watched_address !== 1) { return false } // then check if it's a new transaction that should be taken into account const transaction_exists = await redis.existsAsync(`eth:address:public:${address}`) if (transaction_exists === 1) { return false } // update the list of transactions for that address const data = await redis.getAsync(`eth:address:public:${address}`) let addr_data = JSON.parse(data) addr_data[transaction.hash] = { value: amount_in_ether } await redis.setAsync(`eth:address:public:${address}`, JSON.stringify(addr_data)) await redis.setAsync(`eth:transaction:${transaction.hash}`, transaction) // move funds to the cold wallet address // const cold_txid = await move_to_cold_storage(address, amount_in_ether) // send notification to the kafka server await queue_producer.send('transaction', [{ txid: transaction.hash, value: amount_in_ether, to: transaction.to, from: transaction.from, //cold_txid: cold_txid, }]) return true } module.exports = start_syncing_blocks
到此,關(guān)于“怎么連接Redis服務(wù)器”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
新聞名稱:怎么連接Redis服務(wù)器
當(dāng)前地址:http://weahome.cn/article/igcigh.html