真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

flink使用問題有哪些-創(chuàng)新互聯(lián)

這篇文章主要介紹了flink使用問題有哪些,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

創(chuàng)新互聯(lián)公司-云計算及IDC服務(wù)提供商,涵蓋公有云、IDC機房租用、重慶服務(wù)器托管、等保安全、私有云建設(shè)等企業(yè)級互聯(lián)網(wǎng)基礎(chǔ)服務(wù),歡迎聯(lián)系:13518219792
  1. 注冊表時,請勿使用result

 tableEnv.registerTable("result_agg", talbe);

如上,如果你寫為

 tableEnv.registerTable("result", talbe);

那么會報以下錯誤

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL parse failed. Encountered "from result" at line 1, column 13.
Was expecting one of:
     
    "ORDER" ...
    "LIMIT" ...
    "OFFSET" ...
    "FETCH" ...
    "FROM"  ...
    "FROM"  ...
    "FROM"  ...
    "FROM"  ...
    "FROM"  ...
    "FROM" "LATERAL" ...
    "FROM" "(" ...
    "FROM" "UNNEST" ...
    "FROM" "TABLE" ...
    "," ...
    "AS" ...
     ...
     ...
     ...
     ...
     ...
  1. 如果mysql 類型是tinyint 要轉(zhuǎn)以下,否則flink會報錯

     Caused by: java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.String
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)

轉(zhuǎn)的方式很簡單比如

select project_fid, cast(project_info_type as CHAR) as type from table
  1. join的時候如果有一側(cè)為map類型的數(shù)據(jù)(比如你使用了collect方法,類似于mysql的group_concat),回報空指針,類似于

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error while applying rule FlinkLogicalJoinConverter, args [rel#56:LogicalJoin.NONE(left=rel#52:Subset#0.NONE,right=rel#55:Subset#2.NONE,condition==($0, $2),joinType=inner)]

這個bug目前還沒有修復,連接如下 https://issues.apache.org/jira/browse/FLINK-11433

只能想辦法繞過去,把map類型的數(shù)據(jù),變成字符串,方法是自定義函數(shù)

public class MapToString extends ScalarFunction {

    public String eval(Map map) {
        if(map==null || map.size()==0) {
            return "";
        }
        StringBuffer sb=new StringBuffer();
        for(Map.Entry entity : map.entrySet()) {
            sb.append(entity.getKey()+",");
        }
        String result=sb.toString();
        return result.substring(0, result.length()-1);
      }
}

調(diào)用的時候使用

select id, mapToString(collect(type)) as type from table  group by id

當然你還需要注冊一下

tableEnv.registerFunction("mapToString", new MapToString());
  1. 類型轉(zhuǎn)化錯誤

最近總遇到類型轉(zhuǎn)化錯誤的提示,目前發(fā)現(xiàn)了兩個, 做個記錄

a 如果是tiny(1) 會自動轉(zhuǎn)為 boolean, 除了上面的解決方案,更優(yōu)雅的是修改mysql 的連接,加上參數(shù) tinyInt1isBit=false, 注意大小寫

b 有時候mysql數(shù)據(jù)庫id字段明明是int,但flink卻認定為long。 貌似以前mybatis也有此問題(https://blog.csdn.net/ahwsk/article/details/81975117)。
后來我又認真的看了一下表設(shè)計(別人的表)發(fā)現(xiàn) 勾選了“無符號” 這個選項,當我去掉這個選項,再次運行,居然不報錯了,看來是無符號,讓flink轉(zhuǎn)化錯誤的,無符號比有符號在范圍上多了一位, 多出的這一位,有可能已經(jīng)超過了java中int 的范圍(java中int 都是有符號的),所以自動轉(zhuǎn)為long型了。

  1. Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory'

雖然fatjar已經(jīng)有對應(yīng)的類了,但是依然報錯,最后的解決辦法是在flink的lib目錄中再次加入相關(guān)的類,問題解決。

  1. cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

這個錯誤需要在flink-conf.yaml  加入  classloader.resolve-order: parent-first

  1. Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink88:15265/user/taskmanager_0#66653408]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".

出現(xiàn)此錯誤,在flink的配置中增加

akka.ask.timeout: 120s
web.timeout: 120000

8.Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null. Retrying after sleeping for 30000ms

發(fā)生此錯誤是提交任務(wù)時出錯,使用 yarn logs -applicationId application_1565600987111  查看錯誤,找到原因,我遇到的原因是: akka.watch.heartbeat.pause 值小于 akka.watch.heartbeat.interval。修改后錯誤消失
或者kill 掉 CliFrontend 的進程

  1. Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null. Retrying after sleeping for 30000ms.
    java.io.IOException: The client is stopped
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1519)
    at org.apache.hadoop.ipc.Client.call(Client.java:1381)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)

出現(xiàn)此異常還是內(nèi)存的問題,檢查一下內(nèi)存是否足夠,必須是free的不能是available, 如果發(fā)現(xiàn)后者很高, 請執(zhí)行 以下兩條命令釋放內(nèi)存

sync
echo 3 > /proc/sys/vm/drop_caches

10
Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 8081
at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:219)
at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:161)
... 9 more

此錯誤是說端口被占用。查看源代碼:

Iterator portsIterator;
            try {
                portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
            } catch (IllegalConfigurationException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
            }

對應(yīng)的配置是 flink-conf.yaml中的rest.bind-port。
rest.bind-port不設(shè)置,則Rest Server默認綁定到rest.port端口(8081)。
rest.bind-port可以設(shè)置成列表格式如50100,50101,也可設(shè)置成范圍格式如50100-50200。推薦范圍格式,避免端口沖突。

感謝你能夠認真閱讀完這篇文章,希望小編分享的“flink使用問題有哪些”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學習!

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


分享標題:flink使用問題有哪些-創(chuàng)新互聯(lián)
標題網(wǎng)址:http://weahome.cn/article/dpipgj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部