This article covers a number of problems you may run into when using Flink, with a fix for each. It should be a useful reference; hopefully you will get something out of it by the end.
1. When registering a table, do not name it result

tableEnv.registerTable("result_agg", table);

As above. If instead you write

tableEnv.registerTable("result", table);

you will get the following error, because result collides with a reserved keyword in the SQL parser:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL parse failed. Encountered "from result" at line 1, column 13. Was expecting one of: "ORDER" ... "LIMIT" ... "OFFSET" ... "FETCH" ... "FROM" ... "FROM" "LATERAL" ... "FROM" "(" ... "FROM" "UNNEST" ... "FROM" "TABLE" ... "," ... "AS" ...
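A minimal sketch of the working pattern, assuming a Table object named table obtained earlier (the name result_agg and the query itself are illustrative):

// Register under a non-reserved name, then query it by that name.
tableEnv.registerTable("result_agg", table);
Table out = tableEnv.sqlQuery("select * from result_agg");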
2. If a MySQL column type is tinyint, cast it first, otherwise Flink throws an error:

Caused by: java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.String
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)

The cast is straightforward, for example (my_table is a placeholder name):

select project_fid, cast(project_info_type as CHAR) as type from my_table
3. When joining, if one side contains map-typed data (for example because you used collect, which behaves like MySQL's group_concat), the job throws a null pointer exception along the lines of

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error while applying rule FlinkLogicalJoinConverter, args [rel#56:LogicalJoin.NONE(left=rel#52:Subset#0.NONE,right=rel#55:Subset#2.NONE,condition==($0, $2),joinType=inner)]

This bug has not been fixed yet; it is tracked at https://issues.apache.org/jira/browse/FLINK-11433

The only way around it is to convert the map-typed data to a string, using a user-defined function:
import java.util.Map;

import org.apache.flink.table.functions.ScalarFunction;

// collect() produces a multiset, which arrives here as a map from
// element to count; the concrete value type below is an assumption.
public class MapToString extends ScalarFunction {
    public String eval(Map<String, Integer> map) {
        if (map == null || map.size() == 0) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            sb.append(entry.getKey()).append(",");
        }
        String result = sb.toString();
        // Drop the trailing comma.
        return result.substring(0, result.length() - 1);
    }
}
Call it like this (my_table is a placeholder name):

select id, mapToString(collect(type)) as type from my_table group by id

Of course, you also need to register the function first:

tableEnv.registerFunction("mapToString", new MapToString());
4. Type conversion errors

I have been running into type conversion errors a lot lately. I have found two so far and record them here.
a. A tinyint(1) column is automatically converted to boolean. Besides the cast shown above, a more elegant fix is to add the parameter tinyInt1isBit=false to the MySQL connection URL (mind the capitalization); see the example URL after this list.
b. Sometimes a MySQL id column is clearly int, yet Flink decides it is long. MyBatis apparently had a similar problem at some point (https://blog.csdn.net/ahwsk/article/details/81975117).

Later I took a closer look at the table design (someone else's table) and noticed that the "unsigned" option was checked. When I removed that option and reran the job, the error was gone. So the unsigned flag was what broke Flink's conversion: an unsigned column has one more bit of range than a signed one, and that extra bit can exceed the range of Java's int (Java ints are always signed), so the column is promoted to long.
5. Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory'

Even though the fat jar already contained the corresponding class, the error kept appearing. The fix in the end was to also put the relevant connector jar into Flink's lib directory.
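For example, if the missing factory belongs to the Kafka connector, copy its jar into lib and restart the cluster. The jar name below is purely illustrative; use the one matching your connector, Flink, and Scala versions:

cp flink-connector-kafka_2.11-1.9.0.jar $FLINK_HOME/lib/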
6. cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

For this error, add the following to flink-conf.yaml:

classloader.resolve-order: parent-first
7. Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink88:15265/user/taskmanager_0#66653408]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".

When this error appears, add the following to the Flink configuration:

akka.ask.timeout: 120s
web.timeout: 120000
8. Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null. Retrying after sleeping for 30000ms

This error occurs when submitting the job. Run yarn logs -applicationId application_1565600987111 to inspect the logs and find the cause. In my case, akka.watch.heartbeat.pause was set to a value smaller than akka.watch.heartbeat.interval; after correcting that, the error disappeared.

Alternatively, kill the CliFrontend process.
9. Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null. Retrying after sleeping for 30000ms.

java.io.IOException: The client is stopped
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1519)
    at org.apache.hadoop.ipc.Client.call(Client.java:1381)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)

This exception is again a memory problem. Check that enough memory is actually free; it has to show up as free memory, not merely available. If the latter is high, run the following two commands to release the page cache:

sync
echo 3 > /proc/sys/vm/drop_caches
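To check, something like free -h works; note that the drop_caches write above requires root:

free -h    # look at the "free" column, not "available"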
10. Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 8081
at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:219)
at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:161)
... 9 more
This error means the port is already in use. Looking at the source code:
Iterator<Integer> portsIterator;
try {
    portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
    throw e;
} catch (Exception e) {
    throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
}
The corresponding setting is rest.bind-port in flink-conf.yaml.

If rest.bind-port is not set, the REST server binds to the port given by rest.port (8081 by default).

rest.bind-port can be set to a list such as 50100,50101, or to a range such as 50100-50200. The range format is recommended, to avoid port conflicts.
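For example, in flink-conf.yaml (the range itself is illustrative):

rest.bind-port: 50100-50200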
Thank you for reading this article all the way through. I hope this rundown of Flink problems helps you; there is plenty more on the topic worth learning!