
How Thrift Is Used in Storm

This article looks at how Thrift is used in Storm, covering the IDL, the client, and the server in turn.


1 IDL

The starting point is storm.thrift, the IDL file that defines the data structures and services Storm uses.
The package backtype.storm.generated holds the Java code that Thrift generates automatically from that IDL.

Take the Nimbus service as an example.
Its definition in the IDL is:

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);

  string beginFileDownload(1: string file);
  // can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();

  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);

  // returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}
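The comment on downloadChunk says a client can stop once it receives a zero-length byte array. A minimal sketch of that loop follows. It does not use the generated Nimbus client: ChunkSource, ChunkedDownloadSketch, downloadAll, and inMemory are hypothetical names introduced here so the example runs on the JDK alone.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical stand-in for the two download calls of the Nimbus service.
interface ChunkSource {
    String beginFileDownload(String file);
    ByteBuffer downloadChunk(String id);
}

class ChunkedDownloadSketch {
    // Pulls chunks until the source returns a zero-length buffer.
    static byte[] downloadAll(ChunkSource source, String file) {
        String id = source.beginFileDownload(file);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (true) {
            ByteBuffer chunk = source.downloadChunk(id);
            if (chunk.remaining() == 0) {
                break; // a zero-length chunk signals the end of the file
            }
            byte[] bytes = new byte[chunk.remaining()];
            chunk.get(bytes);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    // In-memory source used only to exercise the loop above.
    static ChunkSource inMemory(byte[] data, int chunkSize) {
        Deque<ByteBuffer> chunks = new ArrayDeque<>();
        for (int i = 0; i < data.length; i += chunkSize) {
            int end = Math.min(data.length, i + chunkSize);
            chunks.add(ByteBuffer.wrap(Arrays.copyOfRange(data, i, end)));
        }
        return new ChunkSource() {
            public String beginFileDownload(String file) {
                return "session-1"; // made-up download id
            }

            public ByteBuffer downloadChunk(String id) {
                ByteBuffer next = chunks.poll();
                return next != null ? next : ByteBuffer.allocate(0);
            }
        };
    }
}
```

With the real service, the same loop would presumably call beginFileDownload and downloadChunk on the generated client instead of on ChunkSource.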

The corresponding Java code in Nimbus.java is:

public class Nimbus {

  public interface Iface {
    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
    public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public String beginFileUpload() throws org.apache.thrift7.TException;
    public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
    public void finishFileUpload(String location) throws org.apache.thrift7.TException;
    public String beginFileDownload(String file) throws org.apache.thrift7.TException;
    public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
    public String getNimbusConf() throws org.apache.thrift7.TException;
    public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
    public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
    public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
    public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
  }

  // remaining generated code omitted
}

2 Client

1. First, get a client:

NimbusClient client = NimbusClient.getConfiguredClient(conf);

Look at the logic of NimbusClient.getConfiguredClient in backtype.storm.utils.
It simply reads the Nimbus host and port from the configuration and constructs a new NimbusClient.

    public static NimbusClient getConfiguredClient(Map conf) {
        try {
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
        } catch (TTransportException ex) {
            throw new RuntimeException(ex);
        }
    }
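The lookup above can be sketched without Storm on the classpath. The class NimbusAddressSketch and its methods toInt and address are hypothetical, and the key strings "nimbus.host" and "nimbus.thrift.port" are assumed to be the values behind Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT. The coercion step reflects the fact that a port read from a parsed config file may arrive as an Integer, a Long, or a String.

```java
import java.util.Map;

class NimbusAddressSketch {
    // Assumed key names; check them against Storm's Config class.
    static final String NIMBUS_HOST = "nimbus.host";
    static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";

    // Hypothetical helper: coerce a config value to int.
    static int toInt(Object value) {
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            return Integer.parseInt((String) value);
        }
        throw new IllegalArgumentException("not a number: " + value);
    }

    // Reads host and port from the config map and validates both.
    static String address(Map<String, Object> conf) {
        String host = (String) conf.get(NIMBUS_HOST);
        if (host == null) {
            throw new IllegalArgumentException("host is not set");
        }
        int port = toInt(conf.get(NIMBUS_THRIFT_PORT));
        if (port <= 0) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        return host + ":" + port;
    }
}
```

Failing fast on a missing host or a non-positive port keeps configuration mistakes from surfacing later as connection errors.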

NimbusClient extends ThriftClient (public class NimbusClient extends ThriftClient).
What does ThriftClient do? The key questions are how data is serialized and how it is sent to the remote end.
This is where Thrift's Transport and Protocol abstractions come in.
The transport is essentially a wrapper around a socket, created with TSocket(host, port).
For the protocol, TBinaryProtocol is the default when none is specified.

    public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
        try {
            // locate login configuration
            Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

            // construct a transport plugin
            ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

            // create a socket with server
            if (host == null) {
                throw new IllegalArgumentException("host is not set");
            }
            if (port <= 0) {
                throw new IllegalArgumentException("invalid port: " + port);
            }
            TSocket socket = new TSocket(host, port);
            if (timeout != null) {
                socket.setTimeout(timeout);
            }
            final TTransport underlyingTransport = socket;

            // establish client-server transport via plugin
            _transport = transportPlugin.connect(underlyingTransport, host);
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        _protocol = null;
        if (_transport != null)
            _protocol = new TBinaryProtocol(_transport);
    }
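The split between protocol and transport can be illustrated with JDK classes alone. In this sketch a ByteArrayOutputStream plays the role of the transport (it only moves bytes) and a DataOutputStream plays the role of the protocol (it decides how values become bytes). The layout follows the big-endian, length-prefixed encoding that TBinaryProtocol is understood to use for 32-bit integers and strings, but this is an illustration and not Thrift's code; BinaryEncodingSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class BinaryEncodingSketch {
    // Serializes a host string and a port number into a byte array.
    static byte[] encode(String host, int port) {
        try {
            ByteArrayOutputStream transport = new ByteArrayOutputStream();
            DataOutputStream protocol = new DataOutputStream(transport);
            byte[] utf8 = host.getBytes(StandardCharsets.UTF_8);
            protocol.writeInt(utf8.length); // 4-byte big-endian length prefix
            protocol.write(utf8);           // UTF-8 string bytes
            protocol.writeInt(port);        // 4-byte big-endian integer
            protocol.flush();
            return transport.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the values back in the same order and joins them as host:port.
    static String decode(byte[] wire) {
        try {
            DataInputStream protocol = new DataInputStream(new ByteArrayInputStream(wire));
            byte[] utf8 = new byte[protocol.readInt()];
            protocol.readFully(utf8);
            int port = protocol.readInt();
            return new String(utf8, StandardCharsets.UTF_8) + ":" + port;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping the in-memory stream for a socket stream would change where the bytes go without touching the encoding logic, which is the point of keeping the two layers separate.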

2. Call any RPC
Take submitTopologyWithOpts as an example:

client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);

This method is declared in the Nimbus interface shown above. Thrift generates not only the Java interface but also the complete client-side RPC implementation:

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
    {
      send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
      recv_submitTopologyWithOpts();
    }

The call has two steps.
First, send_submitTopologyWithOpts calls sendBase.
Then, recv_submitTopologyWithOpts calls receiveBase.

  protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }

  protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.type == TMessageType.EXCEPTION) {
      TApplicationException x = TApplicationException.read(iprot_);
      iprot_.readMessageEnd();
      throw x;
    }
    if (msg.seqid != seqid_) {
      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
  }
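The sequence-id check in receiveBase can be shown in isolation. The sketch below is a toy, not Thrift's wire format: SeqIdSketch, frame, buildCall, and checkReply are hypothetical names, and the header layout (method name, type byte, sequence id) is simplified for illustration. The client increments its counter for every call and rejects any reply whose sequence id does not match the last call it sent.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class SeqIdSketch {
    // Illustrative message types.
    static final byte CALL = 1;
    static final byte REPLY = 2;

    private int seqid = 0;

    // Writes a simplified message header: name, type, sequence id.
    static byte[] frame(String method, byte type, int seqid) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(method);
            out.writeByte(type);
            out.writeInt(seqid);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a call header with a fresh sequence id.
    byte[] buildCall(String method) {
        return frame(method, CALL, ++seqid);
    }

    // Rejects replies that do not answer the most recent call.
    void checkReply(byte[] reply, String method) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(reply));
            in.readUTF();  // method name, unused in this sketch
            in.readByte(); // message type, unused in this sketch
            int replySeqid = in.readInt();
            if (replySeqid != seqid) {
                throw new IllegalStateException(method + " failed: out of sequence response");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A matching reply passes silently, while a reply carrying any other sequence id raises an exception, mirroring the BAD_SEQUENCE_ID branch above.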

This shows how Thrift wraps the protocol: you do not handle serialization yourself, you just call the protocol's interface.

3 Server

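Thrift's strength is that it implements the entire protocol stack rather than just translating the IDL, and it also ships several server implementations.
The Nimbus server side, shown below, is written in Clojure.
It uses Thrift's TNonblockingServerSocket, THsHaServer, TBinaryProtocol, and Processor, and the result is very simple.
The processor hands the data it receives to service-handler, so as a user you only need to implement Nimbus$Iface in service-handler; everything else server-related is already wrapped by Thrift. The generated code used here also lives in backtype.storm.generated: because Clojure runs on the JVM, the IDL only needs to be compiled to Java.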

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler)))
        ;; binding missing from the original listing; restored because server is used below
        server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))
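The relationship between the processor and the handler can be sketched in plain Java. This is not the generated Nimbus$Processor: TopologyService, DispatchSketch, and process are hypothetical names, and arguments are reduced to a single string. The idea it illustrates is that the processor maps an incoming method name to a call on the handler, so the handler only has to implement the service interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, cut-down service interface standing in for Nimbus$Iface.
interface TopologyService {
    String getNimbusConf();
    String getTopologyConf(String id);
}

class DispatchSketch {
    // Method name -> function that invokes the handler.
    private final Map<String, Function<String, String>> methods = new HashMap<>();

    DispatchSketch(TopologyService handler) {
        methods.put("getNimbusConf", arg -> handler.getNimbusConf());
        methods.put("getTopologyConf", handler::getTopologyConf);
    }

    // Looks up the requested method and delegates to the handler.
    String process(String methodName, String arg) {
        Function<String, String> method = methods.get(methodName);
        if (method == null) {
            throw new IllegalArgumentException("Invalid method name: '" + methodName + "'");
        }
        return method.apply(arg);
    }
}
```

A handler written against TopologyService never sees method names or bytes; the dispatch table isolates it from the wire, which is roughly the role the generated processor plays for service-handler.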

That is how Thrift is used in Storm. If you have had similar questions, the analysis above should help.

