Commit c43f3557, authored by Josh Rosen

[SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler

### What changes were proposed in this pull request?

This PR fixes a longstanding issue where the `DAGScheduler`'s single-threaded event processing loop could become blocked by slow `RDD.getPartitions()` calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner.

With this patch's change, Spark will now call `.partitions` on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive `getPartitions()` calls occur outside of the scheduler event loop.

#### Background

The `RDD.partitions` method lazily computes an RDD's partitions by calling `RDD.getPartitions()`. The `getPartitions()` method is invoked only once per RDD and its result is cached in the `RDD.partitions_` private field. Sometimes the `getPartitions()` call can be expensive: for example, `HadoopRDD.getPartitions()` performs file listing operations.
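
To make the caching behavior concrete, here is a minimal sketch of a lazily computed, cached `partitions` accessor. `SketchRdd`, `ListingRdd`, and `getPartitionsCalls` are hypothetical names used only for illustration; this is not Spark's actual `RDD` implementation.

```scala
// Hypothetical stand-in for an RDD; not Spark's actual class.
abstract class SketchRdd {
  // Counts how many times the expensive computation actually runs.
  var getPartitionsCalls = 0

  // Subclasses supply the (possibly expensive) partition computation.
  protected def getPartitions: Array[Int]

  // Cached result, similar in spirit to the private `partitions_` field.
  private var cached: Option[Array[Int]] = None

  // The first call computes and caches; later calls return the cached array.
  final def partitions: Array[Int] = synchronized {
    cached.getOrElse {
      getPartitionsCalls += 1
      val computed = getPartitions
      cached = Some(computed)
      computed
    }
  }
}

// Pretend this performs an expensive file listing.
class ListingRdd(n: Int) extends SketchRdd {
  protected def getPartitions: Array[Int] = Array.range(0, n)
}
```

Whichever thread calls `partitions` first pays the full cost of `getPartitions`, which is why it matters whether that first call happens inside or outside the scheduler event loop.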

The `.partitions` method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it's _often_ the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, [`submitJob` calls `rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L837), so the DAG root's partitions will be computed outside of the scheduler event loop.

However, there are still some cases where `partitions` gets evaluated for the first time inside of the `DAGScheduler` internals. For example, [`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94) doesn't call `.partitions` on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in `.partitions` having been called on all RDDs prior to scheduler job submission.
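
The shuffle case can be illustrated with a toy model. `SourceNode` and `ShuffledNode` are hypothetical classes, not Spark's API. The sketch assumes only that the shuffled node's partition count comes from its own partitioner setting, so computing its partitions never touches the parent.

```scala
// Hypothetical toy classes; not Spark's API.
class SourceNode(n: Int) {
  // Records whether the (potentially slow) partition computation has run.
  var computed = false
  lazy val partitions: Array[Int] = { computed = true; Array.range(0, n) }
}

// The output partition count is fixed by the partitioner setting,
// so the parent's partitions are never consulted here.
class ShuffledNode(val parent: SourceNode, numOutputPartitions: Int) {
  lazy val partitions: Array[Int] = Array.range(0, numOutputPartitions)
}
```

In this model, calling `partitions` on a `ShuffledNode` root leaves `parent.computed` as `false`, so the parent's first `partitions` call is deferred to whoever touches it next.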

#### Correctness: proving that we make no excess `.partitions` calls

This PR adds code to traverse the DAG prior to job submission and call `.partitions` on every RDD encountered.

I'd like to argue that this results in no _excess_ `.partitions` calls: in every case where the new code calls `.partitions` there is existing code which would have called `.partitions` at some point during a successful job execution:

- Assume that this is the first time we are computing every RDD in the DAG.
- Every RDD appears in some stage.
- [`submitStage` will call `submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438) on every stage root RDD.
- [`submitMissingTasks` calls `getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696) on every stage root RDD.
- [`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043) visits the RDD and all of its parent RDDs that are computed in the same stage (via narrow dependencies) and calls `.partitions` on each RDD visited.
- Therefore `.partitions` is invoked on every RDD in the DAG by the time the job has successfully completed.
- Therefore this patch's change does not introduce any new calls to `.partitions` which would not have otherwise occurred (assuming the job succeeded).
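
As a rough illustration of the traversal, the sketch below walks a toy DAG with an explicit stack and a visited set, so each node's `partitions()` runs exactly once even when a node is reachable along several paths. `Node` and `EagerTraversal` are hypothetical names; the real change operates on `RDD` and its `dependencies`.

```scala
import scala.collection.mutable

// Hypothetical DAG node; `partitionsCalls` records how often it was computed.
final class Node(val id: Int, val parents: Seq[Node]) {
  var partitionsCalls = 0
  def partitions(): Unit = partitionsCalls += 1
}

object EagerTraversal {
  /** Calls `partitions()` once on `root` and every ancestor; returns the visit order. */
  def computeAll(root: Node): Seq[Int] = {
    val visited = mutable.HashSet.empty[Node]
    // Explicit stack instead of recursion, to avoid deep call stacks on long lineages.
    val stack = mutable.ListBuffer[Node](root)
    val order = mutable.ArrayBuffer.empty[Int]
    while (stack.nonEmpty) {
      val node = stack.remove(0)
      // `add` returns false if the node was already visited.
      if (visited.add(node)) {
        node.partitions()
        order += node.id
        node.parents.foreach(p => stack.prepend(p))
      }
    }
    order.toSeq
  }
}
```

For a diamond-shaped DAG (`d` depends on `b` and `c`, both of which depend on `a`), `a` is pushed twice but computed only once, because the second pop finds it in the visited set.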

#### Ordering of `.partitions` calls

I don't think the order in which `.partitions` calls occur matters for correctness: the DAGScheduler happens to invoke `.partitions` in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there are many out-of-order `.partitions` calls occurring elsewhere in the codebase.

#### Handling of exceptions in `.partitions`

I've chosen **not** to add special error-handling for the new `.partitions` calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job.

It's sometimes important to preserve exception wrapping behavior, but I don't think that concern is warranted in this particular case: whether `getPartitions` occurred inside or outside of the scheduler (impacting whether exceptions manifest in wrapped or unwrapped form, and impacting whether failed jobs appear in the Spark UI) was not crisply defined (and in some rare cases could even be [influenced by Spark settings in non-obvious ways](https://github.com/apache/spark/blob/10d5303174bf4a47508f6227bbdb1eaf4c92fcdb/core/src/main/scala/org/apache/spark/Partitioner.scala#L75-L79)), so I think it's both unlikely that users were relying on the old behavior and very difficult to preserve it.
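
The unwrapped propagation can be sketched as follows. `SubmitSketch`, its `events` queue, and `submit` are hypothetical stand-ins for the submission path and the scheduler's event inbox. The point is only that work done on the caller's thread before the event is posted fails on that thread, with the original exception type, and no event is ever enqueued.

```scala
import java.util.concurrent.LinkedBlockingQueue

object SubmitSketch {
  // Hypothetical inbox standing in for the scheduler's event loop queue.
  val events = new LinkedBlockingQueue[String]()

  def submit(jobName: String, computePartitions: () => Unit): Unit = {
    // Runs on the caller's thread: any exception propagates to the caller as-is.
    computePartitions()
    // Only reached if partition computation succeeded.
    events.put(jobName)
  }
}
```

If `computePartitions` throws an `IllegalStateException`, the caller sees exactly that exception and `events` stays empty, which mirrors the unwrapped failure mode described above.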

#### Should this have a configuration flag?

Per discussion from a previous PR trying to solve this problem (https://github.com/apache/spark/pull/24438#pullrequestreview-232692586), I've decided to skip adding a configuration flag for this.

### Why are the changes needed?

This fixes a longstanding scheduler performance problem which has been reported by multiple users.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a regression test in `BasicSchedulerIntegrationSuite` to cover the regular job submission codepath (`DAGScheduler.submitJob`). This test uses `CountDownLatch`es to simulate the submission of a job containing an RDD with a slow `getPartitions()` call and checks that a concurrently-submitted job is not blocked.
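
The latch pattern can be sketched independently of Spark. In this hypothetical `LatchSketch`, a single-threaded executor plays the role of the event loop, and the "slow" submitter blocks on its own thread before posting. This is an illustration of the testing technique under those assumptions, not the code of the actual regression test.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object LatchSketch {
  /** Returns (slow job still blocked while the fast job ran, slow job eventually finished). */
  def run(): (Boolean, Boolean) = {
    val eventLoop = Executors.newSingleThreadExecutor()
    val begin = new CountDownLatch(1)    // signals that the slow call has started
    val end = new CountDownLatch(1)      // released to let the slow call finish
    val slowDone = new CountDownLatch(1) // counted down when the slow job's event runs

    // Slow submitter: blocks on its own thread, then posts to the event loop.
    val submitter = new Thread(new Runnable {
      def run(): Unit = {
        begin.countDown()
        end.await()
        eventLoop.submit(new Runnable { def run(): Unit = slowDone.countDown() })
        ()
      }
    })
    submitter.start()
    begin.await(10, TimeUnit.SECONDS)

    // A concurrent fast job goes straight through the event loop.
    val fast = eventLoop.submit(new Runnable { def run(): Unit = () })
    fast.get(10, TimeUnit.SECONDS)
    val slowStillBlocked = slowDone.getCount == 1L

    // Release the slow submitter and wait for its event to be processed.
    end.countDown()
    val slowFinished = slowDone.await(10, TimeUnit.SECONDS)
    submitter.join()
    eventLoop.shutdown()
    (slowStillBlocked, slowFinished)
  }
}
```

Because the blocking happens on the submitter's thread rather than on the executor thread, the fast job completes while the slow one is still waiting.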

I have **not** added separate integration tests for the `runApproximateJob` and `submitMapStage` codepaths (both of which also received the same fix).

Closes #34265 from JoshRosen/SPARK-23626.
Authored-by: Josh Rosen <joshrosen@databricks.com>
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
(cherry picked from commit c4e975e1)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Parent fe2f646a
@@ -700,6 +700,35 @@ private[spark] class DAGScheduler(
     missing.toList
   }

+  /** Invoke `.partitions` on the given RDD and all of its ancestors */
+  private def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
+    val startTime = System.nanoTime
+    val visitedRdds = new HashSet[RDD[_]]
+    // We are manually maintaining a stack here to prevent StackOverflowError
+    // caused by recursively visiting
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
+
+    def visit(rdd: RDD[_]): Unit = {
+      if (!visitedRdds(rdd)) {
+        visitedRdds += rdd
+
+        // Eagerly compute:
+        rdd.partitions
+
+        for (dep <- rdd.dependencies) {
+          waitingForVisit.prepend(dep.rdd)
+        }
+      }
+    }
+
+    while (waitingForVisit.nonEmpty) {
+      visit(waitingForVisit.remove(0))
+    }
+    logDebug("eagerlyComputePartitionsForRddAndAncestors for RDD %d took %f seconds"
+      .format(rdd.id, (System.nanoTime - startTime) / 1e9))
+  }
+
   /**
    * Registers the given jobId among the jobs that need the given stage and
    * all of that stage's ancestors.
@@ -809,6 +838,11 @@ private[spark] class DAGScheduler(
           "Total number of partitions: " + maxPartitions)
     }

+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val jobId = nextJobId.getAndIncrement()
     if (partitions.isEmpty) {
       val clonedProperties = Utils.cloneProperties(properties)
@@ -898,6 +932,12 @@ private[spark] class DAGScheduler(
       listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
       return new PartialResult(evaluator.currentResult(), true)
     }
+
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
@@ -930,6 +970,11 @@ private[spark] class DAGScheduler(
       throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
     }

+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     // We create a JobWaiter with only one "task", which will be marked as complete when the whole
     // map stage has completed, and will be passed the MapOutputStatistics for that stage.
     // This makes it easier to avoid race conditions between the user code and the map output
......
@@ -112,7 +112,7 @@ class HealthTrackerIntegrationSuite extends SchedulerIntegrationSuite[MultiExecu
       backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
       val pattern = (
         s"""|Aborting TaskSet 0.0 because task .*
@@ -150,7 +150,7 @@ class MockRDDWithLocalityPrefs(
     sc: SparkContext,
     numPartitions: Int,
     shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
-    val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps) {
+    val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps, Nil) {
   override def getPreferredLocations(split: Partition): Seq[String] = {
     Seq(preferredLoc)
   }
......
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler

 import java.util.Properties
-import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -205,7 +205,13 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
   def shuffle(nParts: Int, input: MockRDD): MockRDD = {
     val partitioner = new HashPartitioner(nParts)
     val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, partitioner)
-    new MockRDD(sc, nParts, List(shuffleDep))
+    new MockRDD(sc, nParts, List(shuffleDep), Nil)
+  }
+
+  /** models a one-to-one dependency within a stage, like a map or filter */
+  def oneToOne(input: MockRDD): MockRDD = {
+    val dep = new OneToOneDependency[(Int, Int)](input)
+    new MockRDD(sc, input.numPartitions, Nil, Seq(dep))
   }

   /** models a stage boundary with multiple dependencies, like a join */
@@ -214,7 +220,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
     val shuffleDeps = inputs.map { inputRDD =>
       new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner)
     }
-    new MockRDD(sc, nParts, shuffleDeps)
+    new MockRDD(sc, nParts, shuffleDeps, Nil)
   }

   val backendException = new AtomicReference[Exception](null)
@@ -449,10 +455,11 @@ case class ExecutorTaskStatus(host: String, executorId: String, var freeCores: I
 class MockRDD(
   sc: SparkContext,
   val numPartitions: Int,
-  val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]]
-) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable {
+  val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
+  val oneToOneDeps: Seq[OneToOneDependency[(Int, Int)]]
+) extends RDD[(Int, Int)](sc, deps = shuffleDeps ++ oneToOneDeps) with Serializable {

-  MockRDD.validate(numPartitions, shuffleDeps)
+  MockRDD.validate(numPartitions, shuffleDeps, oneToOneDeps)

   override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
     throw new RuntimeException("should not be reached")
@@ -468,14 +475,25 @@ class MockRDD(
 object MockRDD extends AssertionsHelper with TripleEquals with Assertions {
   /**
    * make sure all the shuffle dependencies have a consistent number of output partitions
+   * and that one-to-one dependencies have the same partition counts as their parents
    * (mostly to make sure the test setup makes sense, not that Spark itself would get this wrong)
    */
-  def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, _]]): Unit = {
-    dependencies.foreach { dependency =>
+  def validate(
+      numPartitions: Int,
+      shuffleDependencies: Seq[ShuffleDependency[_, _, _]],
+      oneToOneDependencies: Seq[OneToOneDependency[_]]): Unit = {
+    shuffleDependencies.foreach { dependency =>
       val partitioner = dependency.partitioner
       assert(partitioner != null)
       assert(partitioner.numPartitions === numPartitions)
     }
+    oneToOneDependencies.foreach { dependency =>
+      // In order to support the SPARK-23626 testcase, we cast to MockRDD
+      // and access `numPartitions` instead of just calling `getNumPartitions`:
+      // `getNumPartitions` would call `getPartitions`, undermining the intention
+      // of the SPARK-23626 testcase.
+      assert(dependency.rdd.asInstanceOf[MockRDD].numPartitions === numPartitions)
+    }
   }
 }
@@ -539,7 +557,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       backend.taskSuccess(taskDescription, 42)
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
     }
     assert(results === (0 until 10).map { _ -> 42 }.toMap)
@@ -564,7 +582,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       }
     }

-    val a = new MockRDD(sc, 2, Nil)
+    val a = new MockRDD(sc, 2, Nil, Nil)
     val b = shuffle(10, a)
     val c = shuffle(20, a)
     val d = join(30, b, c)
@@ -604,7 +622,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
    * (b) we get a second attempt for stage 0 & stage 1
    */
   testScheduler("job with fetch failure") {
-    val input = new MockRDD(sc, 2, Nil)
+    val input = new MockRDD(sc, 2, Nil, Nil)
     val shuffledRdd = shuffle(10, input)
     val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId
@@ -646,10 +664,88 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
       assert(failure.getMessage.contains("test task failure"))
     }
     assertDataStructuresEmpty(noFailure = false)
   }
+
+  testScheduler("SPARK-23626: RDD with expensive getPartitions() doesn't block scheduler loop") {
+    // Before SPARK-23626, expensive `RDD.getPartitions()` calls might occur inside of the
+    // DAGScheduler event loop, causing concurrently-submitted jobs to block. This test case
+    // reproduces a scenario where that blocking could occur.
+
+    // We'll use latches to simulate an RDD with a slow getPartitions() call.
+    import MockRDDWithSlowGetPartitions._
+
+    // DAGScheduler.submitJob calls `.partitions` on the RDD passed to it.
+    // Therefore to write a proper regression test for SPARK-23626 we must
+    // ensure that the slow getPartitions() call occurs deeper in the RDD DAG:
+    val rddWithSlowGetPartitions = oneToOne(new MockRDDWithSlowGetPartitions(sc, 1))
+
+    // A RDD whose execution should not be blocked by the other RDD's slow getPartitions():
+    val simpleRdd = new MockRDD(sc, 1, Nil, Nil)
+
+    getPartitionsShouldNotHaveBeenCalledYet.set(false)
+
+    def runBackend(): Unit = {
+      val (taskDescription, _) = backend.beginTask()
+      backend.taskSuccess(taskDescription, 42)
+    }
+
+    withBackend(runBackend _) {
+      // Submit a job containing an RDD which will hang in getPartitions() until we release
+      // the countdown latch:
+      import scala.concurrent.ExecutionContext.Implicits.global
+      val slowJobFuture = Future { submit(rddWithSlowGetPartitions, Array(0)) }.flatten
+
+      // Block the current thread until the other thread has started the getPartitions() call:
+      beginGetPartitionsLatch.await(duration.toSeconds, SECONDS)
+
+      // Submit a concurrent job. This job's execution should not be blocked by the other job:
+      val fastJobFuture = submit(simpleRdd, Array(0))
+      awaitJobTermination(fastJobFuture, duration)
+
+      // The slow job should still be blocked in the getPartitions() call:
+      assert(!slowJobFuture.isCompleted)
+
+      // Allow it to complete:
+      endGetPartitionsLatch.countDown()
+      awaitJobTermination(slowJobFuture, duration)
+    }
+
+    assertDataStructuresEmpty()
+  }
+}
+
+/** Helper class used in SPARK-23626 test case */
+private object MockRDDWithSlowGetPartitions {
+  // Latch for blocking the test execution thread until getPartitions() has been called:
+  val beginGetPartitionsLatch = new CountDownLatch(1)
+
+  // Latch for blocking the getPartitions() call from completing:
+  val endGetPartitionsLatch = new CountDownLatch(1)
+
+  // Atomic boolean which is used to fail the test in case getPartitions() is called earlier
+  // than expected. This guards against false-negatives (e.g. the test passing because
+  // `.getPartitions()` was called in the test setup before we even submitted a job):
+  val getPartitionsShouldNotHaveBeenCalledYet = new AtomicBoolean(true)
+}
+
+/** Helper class used in SPARK-23626 test case */
+private class MockRDDWithSlowGetPartitions(
+    sc: SparkContext,
+    numPartitions: Int) extends MockRDD(sc, numPartitions, Nil, Nil) {
+  import MockRDDWithSlowGetPartitions._
+
+  override def getPartitions: Array[Partition] = {
+    if (getPartitionsShouldNotHaveBeenCalledYet.get()) {
+      throw new Exception("getPartitions() should not have been called at this point")
+    }
+    beginGetPartitionsLatch.countDown()
+    val partitions = super.getPartitions
+    endGetPartitionsLatch.await()
+    partitions
+  }
 }