Notes on debugging a problem where pyspark2sql could not read data protected by HDFS transparent encryption.
The code is shown below. It uses pyspark 2.1.3 on CDH 5.14.0 with hive 1.1.0 + parquet, and the select statement reads from an HDFS encryption zone.
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext, Row
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as F

trial_pps_order = spark.read.parquet('/tmp/exia/trial_pps_select')
pps_order = spark.read.parquet('/tmp/exia/orders_pps_wc_member')
member_info = spark.read.parquet('/tmp/exia/member_info')

# newHiveContext=HiveContext(sc)

query_T = """
select * from crm.masterdata_hummingbird_product_mst_banner_v1
where brand_name = 'pampers'
"""
product_mst = spark.sql(query_T)

product_mst.show()
Running it in Zeppelin returns the following error:
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7483288776781667654.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7483288776781667654.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File " ", line 14, in <module>
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 318, in show
    print(self._jdf.showString(n, 20))
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o76.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, pg-dmp-slave28.hadoop, executor 1): java.io.IOException: No KeyProvider is configured, cannot access an encrypted file
    at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338)
    at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1414)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1455)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1625)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1614)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2390)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2792)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2389)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2396)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2131)
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2822)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2131)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2346)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No KeyProvider is configured, cannot access an encrypted file
    at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338)
    at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1414)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
In short, the log says that no key provider is configured for the encryption zone, so the encrypted data cannot be read.
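With a trace this long, the useful line is easy to miss. As a small sketch (the helper name `root_causes` is made up for illustration, and the sample is an abbreviated excerpt of the log above), the "Caused by:" lines can be pulled out of the text like this:

```python
import re


def root_causes(traceback_text):
    """Return the messages of all 'Caused by:' lines, de-duplicated, in order."""
    causes = []
    for line in traceback_text.splitlines():
        match = re.search(r"Caused by:\s*(.+)", line)
        if match:
            message = match.group(1).strip()
            if message not in causes:
                causes.append(message)
    return causes


# Abbreviated excerpt of the error shown above (most frames omitted).
sample = """Py4JJavaError: An error occurred while calling o76.showString.
\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
Caused by: java.io.IOException: No KeyProvider is configured, cannot access an encrypted file
\tat org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338)
"""

print(root_causes(sample))
```

Here the only cause is the `java.io.IOException` raised by `DFSClient`, which points at the HDFS client configuration rather than at the query itself.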
The main cause is that Spark gives priority to the hive-site.xml in its own conf directory when accessing Hive, and that file did not contain the settings needed to reach the encryption zone. Adding them fixed the problem.
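One way to check what a session actually sees is to ask its configuration for the key provider settings. The sketch below is only an illustration: `missing_settings` is a hypothetical helper, and a plain dict stands in for the Hadoop configuration so that the example runs without a cluster.

```python
def missing_settings(get_value, required_keys):
    """Return the keys for which get_value(key) is None or empty."""
    return [key for key in required_keys if not get_value(key)]


# The two key provider settings that appear in this note.
REQUIRED = [
    "hadoop.security.key.provider.path",
    "dfs.encryption.key.provider.uri",
]

# Stand-in for a configuration that only has one of the two settings.
fake_conf = {
    "dfs.encryption.key.provider.uri": "kms://http@dmp-master2.hadoop:16000/kms",
}

print(missing_settings(fake_conf.get, REQUIRED))
```

In a notebook the same function could be given `spark.sparkContext._jsc.hadoopConfiguration().get` instead of `fake_conf.get`. Note that `_jsc` is a private attribute and that this only inspects the driver side, whereas the failure above was raised on an executor, so treat the result as a hint rather than proof.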
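<property>
  <name>hadoop.security.key.provider.path</name>
  <value>kms://http@dmp-master2.hadoop:16000/kms</value>
</property>
<property>
  <name>dfs.encrypt.data.transfer.algorithm</name>
  <value>3des</value>
</property>
<property>
  <name>dfs.encrypt.data.transfer.cipher.suites</name>
  <value>AES/CTR/NoPadding</value>
</property>
<property>
  <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
  <value>256</value>
</property>
<property>
  <name>dfs.encryption.key.provider.uri</name>
  <value>kms://http@dmp-master2.hadoop:16000/kms</value>
</property>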
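Before restarting the interpreter, it can help to confirm that the edited file really contains these entries. The following is a minimal sketch using only the Python standard library. It assumes the usual Hadoop layout of `<property>` elements with `<name>` and `<value>` children inside a `<configuration>` root, and the function names are invented for this example.

```python
import xml.etree.ElementTree as ET


def read_properties(xml_text):
    """Parse Hadoop-style configuration XML into a {name: value} dict."""
    root = ET.fromstring(xml_text)
    props = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        if name:
            props[name.strip()] = (prop.findtext("value") or "").strip()
    return props


def missing_properties(xml_text, required):
    """Return the required property names that are absent or empty."""
    props = read_properties(xml_text)
    return [name for name in required if not props.get(name)]


# A deliberately incomplete sample: only one of the two key provider entries.
sample_xml = """<configuration>
  <property>
    <name>hadoop.security.key.provider.path</name>
    <value>kms://http@dmp-master2.hadoop:16000/kms</value>
  </property>
</configuration>"""

required = [
    "hadoop.security.key.provider.path",
    "dfs.encryption.key.provider.uri",
]

print(missing_properties(sample_xml, required))
```

For a real check, the text of Spark's own conf/hive-site.xml would be read from disk and passed in place of `sample_xml`; the exact path depends on the installation.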