真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark3.0.1中AQE配置的示例分析

這篇文章主要介紹spark 3.0.1中AQE配置的示例分析,文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!

創(chuàng)新互聯(lián)建站是一家專注于網(wǎng)站建設(shè)、成都網(wǎng)站制作與策劃設(shè)計,鄭州網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)建站做網(wǎng)站,專注于網(wǎng)站建設(shè)10余年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:鄭州等地區(qū)。鄭州做網(wǎng)站價格咨詢:13518219792

AQE簡介

從spark configuration,到在最早在spark 1.6版本就已經(jīng)有了AQE;到了spark 2.x版本,intel大數(shù)據(jù)團(tuán)隊進(jìn)行了相應(yīng)的原型開發(fā)和實踐;到了spark 3.0時代,Databricks和intel一起為社區(qū)貢獻(xiàn)了新的AQE

spark 3.0.1中的AQE的配置

配置項默認(rèn)值官方說明分析
spark.sql.adaptive.enabledfalse是否開啟自適應(yīng)查詢此處設(shè)置為true開啟
spark.sql.adaptive.coalescePartitions.enabledtrue是否合并臨近的shuffle分區(qū)(根據(jù)'spark.sql.adaptive.advisoryPartitionSizeInBytes'的閾值來合并)此處默認(rèn)為true開啟,分析見: 分析1
spark.sql.adaptive.coalescePartitions.initialPartitionNum(none)shuffle合并分區(qū)之前的初始分區(qū)數(shù),默認(rèn)為spark.sql.shuffle.partitions的值分析見:分析2
spark.sql.adaptive.coalescePartitions.minPartitionNum(none)shuffle 分區(qū)合并后的最小分區(qū)數(shù),默認(rèn)為spark集群的默認(rèn)并行度分析見: 分析3
spark.sql.adaptive.advisoryPartitionSizeInBytes64MB建議的shuffle分區(qū)的大小,在合并分區(qū)和處理join數(shù)據(jù)傾斜的時候用到分析見:分析3
spark.sql.adaptive.skewJoin.enabledtrue是否開啟join中數(shù)據(jù)傾斜的自適應(yīng)處理
spark.sql.adaptive.skewJoin.skewedPartitionFactor5數(shù)據(jù)傾斜判斷因子,必須同時滿足skewedPartitionFactor和skewedPartitionThresholdInBytes分析見:分析4
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256MB數(shù)據(jù)傾斜判斷閾值,必須同時滿足skewedPartitionFactor和skewedPartitionThresholdInBytes分析見:分析4
spark.sql.adaptive.logLeveldebug配置自適應(yīng)執(zhí)行的計劃改變?nèi)罩?/td>調(diào)整為info級別,便于觀察自適應(yīng)計劃的改變
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin0.2轉(zhuǎn)為broadcastJoin的非空分區(qū)比例閾值,>=該值,將不會轉(zhuǎn)換為broadcastjoin分析見:分析5

分析1

在OptimizeSkewedJoin.scala中,我們看到ADVISORY_PARTITION_SIZE_IN_BYTES,也就是spark.sql.adaptive.advisoryPartitionSizeInBytes被引用的地方, (OptimizeSkewedJoin是物理計劃中的規(guī)則)

 /**
   * The goal of skew join optimization is to make the data distribution more even. The target size
   * to split skewed partitions is the average size of non-skewed partition, or the
   * advisory partition size if avg size is smaller than it.
   */
  private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
    val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
    // It's impossible that all the partitions are skewed, as we use median size to define skew.
    assert(nonSkewSizes.nonEmpty)
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }

其中:

  1. nonSkewSizes為task非傾斜的分區(qū)

  2. targetSize返回的是max(非傾斜的分區(qū)的平均值,advisorySize),其中advisorySize為spark.sql.adaptive.advisoryPartitionSizeInBytes值,所以說 targetSize不一定是spark.sql.adaptive.advisoryPartitionSizeInBytes值

  3. medianSize值為task的分區(qū)大小的中位值

分析2

在SQLConf.scala

def numShufflePartitions: Int = {
    if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
      getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
    } else {
      defaultNumShufflePartitions
    }
  }

從spark 3.0.1開始如果開啟了AQE和shuffle分區(qū)合并,則用的是spark.sql.adaptive.coalescePartitions.initialPartitionNum,這在如果有多個shuffle stage的情況下,增加分區(qū)數(shù),可以有效的增強(qiáng)shuffle分區(qū)合并的效果

分析3

在CoalesceShufflePartitions.scala,CoalesceShufflePartitions是一個物理計劃的規(guī)則,會執(zhí)行如下操作

 if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
      plan
    } else {
      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
      // we should skip it when calculating the `partitionStartIndices`.
      val validMetrics = shuffleStages.flatMap(_.mapStats)

      // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
      // in that case. For example when we union fully aggregated data (data is arranged to a single
      // partition) and a result of a SortMergeJoin (multiple partitions).
      val distinctNumPreShufflePartitions =
        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
      if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
        // is not set, so to avoid perf regressions compared to no coalescing.
        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
          .getOrElse(session.sparkContext.defaultParallelism)
        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
          validMetrics.toArray,
          advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
          minNumPartitions = minPartitionNum)
        // This transformation adds new nodes, so we must use `transformUp` here.
        val stageIds = shuffleStages.map(_.id).toSet
        plan.transformUp {
          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
          // number of output partitions.
          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
            CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
        }
      } else {
        plan
      }
    }
  }

也就是說:

  1. 如果是用戶自己指定的分區(qū)操作,如repartition操作,spark.sql.adaptive.coalescePartitions.minPartitionNum無效,且跳過分區(qū)合并優(yōu)化

  2. 如果多個task進(jìn)行shuffle,且task有不同的分區(qū)數(shù)的話,spark.sql.adaptive.coalescePartitions.minPartitionNum無效,且跳過分區(qū)合并優(yōu)化

  3. 見ShufflePartitionsUtil.coalescePartition分析

分析4

在OptimizeSkewedJoin.scala中,我們看到

/**
   * A partition is considered as a skewed partition if its size is larger than the median
   * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
   * ADVISORY_PARTITION_SIZE_IN_BYTES.
   */
  private def isSkewed(size: Long, medianSize: Long): Boolean = {
    size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
      size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
  }
  1. OptimizeSkewedJoin是個物理計劃的規(guī)則,會根據(jù)isSkewed來判斷是否數(shù)據(jù)數(shù)據(jù)有傾斜,而且必須是滿足SKEW_JOIN_SKEWED_PARTITION_FACTOR和SKEW_JOIN_SKEWED_PARTITION_THRESHOLD才會判斷為數(shù)據(jù)傾斜了

  2. medianSize為task的分區(qū)大小的中位值

分析5

在AdaptiveSparkPlanExec方法getFinalPhysicalPlan中調(diào)用了reOptimize方法,而reOptimize方法則會執(zhí)行邏輯計劃的優(yōu)化操作:

private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
    logicalPlan.invalidateStatsCache()
    val optimized = optimizer.execute(logicalPlan)
    val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
    val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
    (newPlan, optimized)
  }

而optimizer 中有個DemoteBroadcastHashJoin規(guī)則:

@transient private val optimizer = new RuleExecutor[LogicalPlan] {
    // TODO add more optimization rules
    override protected def batches: Seq[Batch] = Seq(
      Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
    )
  }

而對于DemoteBroadcastHashJoin則有對是否broadcastjoin的判斷:

case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {

  private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
      && stage.mapStats.isDefined =>
      val mapStats = stage.mapStats.get
      val partitionCnt = mapStats.bytesByPartitionId.length
      val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
      partitionCnt > 0 && nonZeroCnt > 0 &&
        (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
    case _ => false
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case j @ Join(left, right, _, _, hint) =>
      var newHint = hint
      if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
        newHint = newHint.copy(leftHint =
          Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
        newHint = newHint.copy(rightHint =
          Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (newHint.ne(hint)) {
        j.copy(hint = newHint)
      } else {
        j
      }
  }
}

shouldDemote就是對是否進(jìn)行broadcastjoin的判斷:

  1. 首先得是ShuffleQueryStageExec操作

  2. 如果非空分區(qū)比列大于nonEmptyPartitionRatioForBroadcastJoin,也就是spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin,則不會把mergehashjoin轉(zhuǎn)換為broadcastJoin

  3. 這在sql中先join在groupby的場景中比較容易出現(xiàn)

ShufflePartitionsUtil.coalescePartition分析(合并分區(qū)的核心代碼)

見coalescePartition如示:

def coalescePartitions(
      mapOutputStatistics: Array[MapOutputStatistics],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
    // If `minNumPartitions` is very large, it is possible that we need to use a value less than
    // `advisoryTargetSize` as the target size of a coalesced task.
    val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
    // The max at here is to make sure that when we have an empty table, we only have a single
    // coalesced partition.
    // There is no particular reason that we pick 16. We just need a number to prevent
    // `maxTargetSize` from being set to 0.
    val maxTargetSize = math.max(
      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)

    val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
    logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
      s"actual target size $targetSize.")

    // Make sure these shuffles have the same number of partitions.
    val distinctNumShufflePartitions =
      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
    // The reason that we are expecting a single value of the number of shuffle partitions
    // is that when we add Exchanges, we set the number of shuffle partitions
    // (i.e. map output partitions) using a static setting, which is the value of
    // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
    // number of partitions, they will have the same number of shuffle partitions
    // (i.e. map output partitions).
    assert(
      distinctNumShufflePartitions.length == 1,
      "There should be only one distinct value of the number of shuffle partitions " +
        "among registered Exchange operators.")

    val numPartitions = distinctNumShufflePartitions.head
    val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
    var latestSplitPoint = 0
    var coalescedSize = 0L
    var i = 0
    while (i < numPartitions) {
      // We calculate the total size of i-th shuffle partitions from all shuffles.
      var totalSizeOfCurrentPartition = 0L
      var j = 0
      while (j < mapOutputStatistics.length) {
        totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
        j += 1
      }

      // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
      // new coalesced partition.
      if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
        latestSplitPoint = i
        // reset postShuffleInputSize.
        coalescedSize = totalSizeOfCurrentPartition
      } else {
        coalescedSize += totalSizeOfCurrentPartition
      }
      i += 1
    }
    partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

    partitionSpecs
  }
  1. totalPostShuffleInputSize 先計算出總的shuffle的數(shù)據(jù)大小

  2. maxTargetSize取max(totalPostShuffleInputSize/minNumPartitions,16)的最大值,minNumPartitions也就是spark.sql.adaptive.coalescePartitions.minPartitionNum的值

  3. targetSize取min(maxTargetSize,advisoryTargetSize),advisoryTargetSize也就是spark.sql.adaptive.advisoryPartitionSizeInBytes的值,所以說該值只是建議值,不一定是targetSize

  4. while循環(huán)就是取相鄰的分區(qū)合并,對于每個task中的每個相鄰分區(qū)合并,直到不大于targetSize

OptimizeSkewedJoin.optimizeSkewJoin分析(數(shù)據(jù)傾斜優(yōu)化的核心代碼)

見optimizeSkewJoin如示:

def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
    case smj @ SortMergeJoinExec(_, _, joinType, _,
        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
        if supportedJoinTypes.contains(joinType) =>
      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
      val numPartitions = left.partitionsWithSizes.length
      // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
      val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
      val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
      logDebug(
        s"""
          |Optimizing skewed join.
          |Left side partitions size info:
          |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
          |Right side partitions size info:
          |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
        """.stripMargin)
      val canSplitLeft = canSplitLeftSide(joinType)
      val canSplitRight = canSplitRightSide(joinType)
      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
      // the final data distribution is even (coalesced partitions + split partitions).
      val leftActualSizes = left.partitionsWithSizes.map(_._2)
      val rightActualSizes = right.partitionsWithSizes.map(_._2)
      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val leftSkewDesc = new SkewDesc
      val rightSkewDesc = new SkewDesc
      for (partitionIndex <- 0 until numPartitions) {
        val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

        val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
          val reducerId = leftPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(
            left.mapStats.shuffleId, reducerId, leftTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
              s"${skewSpecs.get.length} parts.")
            leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(leftPartSpec))
        } else {
          Seq(leftPartSpec)
        }

        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val rightParts = if (isRightSkew && !isRightCoalesced) {
          val reducerId = rightPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(
            right.mapStats.shuffleId, reducerId, rightTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
              s"${skewSpecs.get.length} parts.")
            rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(rightPartSpec))
        } else {
          Seq(rightPartSpec)
        }

        for {
          leftSidePartition <- leftParts
          rightSidePartition <- rightParts
        } {
          leftSidePartitions += leftSidePartition
          rightSidePartitions += rightSidePartition
        }
      }

      logDebug("number of skewed partitions: " +
        s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
      if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
        val newLeft = CustomShuffleReaderExec(
          left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
        val newRight = CustomShuffleReaderExec(
          right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
        smj.copy(
          left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
      } else {
        smj
      }
  }
  1. SortMergeJoinExec說明適用于sort merge join

  2. assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)保證進(jìn)行join的兩個task的分區(qū)數(shù)相等

  3. 分別計算進(jìn)行join的task的分區(qū)中位數(shù)的大小leftMedSize和rightMedSize

  4. 分別計算進(jìn)行join的task的分區(qū)的targetzise大小leftTargetSize和rightTargetSize

  5. 循環(huán)判斷兩個task的每個分區(qū)的是否存在傾斜,如果傾斜且滿足沒有進(jìn)行過shuffle分區(qū)合并,則進(jìn)行傾斜分區(qū)處理,否則不處理

  6. createSkewPartitionSpecs方法為: 1.獲取每個join的task的對應(yīng)分區(qū)的數(shù)據(jù)大小 2.根據(jù)targetSize分成多個slice

  7. 如果存在數(shù)據(jù)傾斜,則構(gòu)造包裝成CustomShuffleReaderExec,進(jìn)行后續(xù)任務(wù)的運行,最最終調(diào)用ShuffledRowRDD的compute方法 匹配case PartialMapperPartitionSpec進(jìn)行數(shù)據(jù)的讀取,其中還會自動開啟“spark.sql.adaptive.fetchShuffleBlocksInBatch”批量fetch減少io

OptimizeSkewedJoin/CoalesceShufflePartitions 在哪里被調(diào)用

如:AdaptiveSparkPlanExec

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf)
  )

可見在AdaptiveSparkPlanExec中被調(diào)用 ,且CoalesceShufflePartitions先于OptimizeSkewedJoin, 而AdaptiveSparkPlanExec在InsertAdaptiveSparkPlan中被調(diào)用 ,而InsertAdaptiveSparkPlan在QueryExecution中被調(diào)用

而在InsertAdaptiveSparkPlan.shouldApplyAQE方法和supportAdaptive中我們看到

private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
      plan.find {
        case _: Exchange => true
        case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
        case p => p.expressions.exists(_.find {
          case _: SubqueryExpression => true
          case _ => false
        }.isDefined)
      }.isDefined
    }
  }

private def supportAdaptive(plan: SparkPlan): Boolean = {
    // TODO migrate dynamic-partition-pruning onto adaptive execution.
    sanityCheck(plan) &&
      !plan.logicalLink.exists(_.isStreaming) &&
      !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
    plan.children.forall(supportAdaptive)
  }

如果不滿足以上條件也是不會開啟AQE的,如果要強(qiáng)制開啟,也可以配置spark.sql.adaptive.forceApply 為true(文檔中提示是內(nèi)部配置)

注意:

在spark 3.0.1中已經(jīng)廢棄了如下的配置:

spark.sql.adaptive.skewedPartitionMaxSplits    
spark.sql.adaptive.skewedPartitionRowCountThreshold    
spark.sql.adaptive.skewedPartitionSizeThreshold

以上是“spark 3.0.1中AQE配置的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


當(dāng)前文章:spark3.0.1中AQE配置的示例分析
當(dāng)前地址:http://weahome.cn/article/ipgihh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部