[SPARK-40703][SQL] Introduce shuffle on SinglePartition to improve parallelism

### What changes were proposed in this pull request?

This PR fixes a performance regression that occurs when one side of a join uses `HashPartitioning` with a `ShuffleExchange` while the other side uses `SinglePartition`. In this case, Spark re-shuffles the `HashPartitioning` side, and both sides end up with only a single partition. This can hurt query performance significantly if the `HashPartitioning` side contains a large amount of input data.
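
Concretely, in the plan notation used in the next section (the partition count 200 is just an illustrative number), the problematic shape is:

```
ShuffleExchange(HashPartition(200)) <-> SinglePartition
```

which, before this fix, was rewritten into

```
ShuffleExchange(SinglePartition) <-> SinglePartition
```

so the entire join runs with a single partition.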

### Why are the changes needed?

After SPARK-35703, when Spark sees that one side of the join has a `ShuffleExchange` (meaning it needs to be shuffled anyway) and the other side does not, it tries to avoid shuffling the side without the `ShuffleExchange`. For instance:

```
ShuffleExchange(HashPartition(200)) <-> HashPartition(150)
```

will be converted into
```
ShuffleExchange(HashPartition(150)) <-> HashPartition(150)
```

However, when the side without `ShuffleExchange` is `SinglePartition`, like the following:
```
ShuffleExchange(HashPartition(150)) <-> SinglePartition
```

Spark does the same thing, which causes the left-hand side to use only one partition. This can hurt job parallelism dramatically, especially when using DataSource V2, since `SinglePartition` is what a V2 scan reports in this situation. DataSource V1, on the other hand, is not affected much, as it always reports `UnknownPartitioning` here.
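
With this change, the `SinglePartition` side is re-shuffled instead. Sketching the two cases exercised by the new unit tests below (partition counts taken from those tests, which set `spark.sql.shuffle.partitions` to 20):

```
ShuffleExchange(HashPartition(10)) <-> SinglePartition
  =>  ShuffleExchange(HashPartition(20)) <-> ShuffleExchange(HashPartition(20))

HashPartition(10) <-> SinglePartition
  =>  HashPartition(10) <-> ShuffleExchange(HashPartition(10))
```

In the first case both sides pick up the configured shuffle parallelism; in the second case the side that already has `HashPartitioning` (but no `ShuffleExchange`) is left untouched, and the single-partition side is shuffled to match its 10 partitions, so no extra shuffle is introduced.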

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new unit tests in `EnsureRequirementsSuite`.

Closes #38196 from sunchao/SPARK-40703.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit bde6423)
Signed-off-by: Yuming Wang <[email protected]>
sunchao authored and wangyum committed Oct 15, 2022
1 parent 27ca30a commit ca60665
Showing 5 changed files with 52 additions and 8 deletions.
@@ -529,7 +529,6 @@ trait ShuffleSpec {
* clustering expressions.
*
* This will only be called when:
- * - [[canCreatePartitioning]] returns true.
* - [[isCompatibleWith]] returns false on the side where the `clustering` is from.
*/
def createPartitioning(clustering: Seq[Expression]): Partitioning =
@@ -542,7 +541,7 @@ case object SinglePartitionShuffleSpec extends ShuffleSpec {
other.numPartitions == 1
}

-override def canCreatePartitioning: Boolean = true
+override def canCreatePartitioning: Boolean = false

override def createPartitioning(clustering: Seq[Expression]): Partitioning =
SinglePartition
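
For context, here is a minimal, self-contained sketch (hypothetical names, not Spark's actual source) of the kind of candidate selection `canCreatePartitioning` feeds into: only specs that return true are eligible to define the partitioning that the other join children are re-shuffled to, so returning false for `SinglePartitionShuffleSpec` takes the one-partition option out of that candidate set. The real rule also weighs which side already has a `ShuffleExchange`; this sketch shows only the eligibility filter.

```scala
// Hypothetical, simplified model of shuffle-spec selection (illustration only).
trait Spec {
  def canCreatePartitioning: Boolean // may this spec define the target partitioning?
  def numPartitions: Int
}
case class HashSpec(numPartitions: Int) extends Spec {
  val canCreatePartitioning = true
}
case object SingleSpec extends Spec {
  val canCreatePartitioning = false // this commit's change, in miniature
  val numPartitions = 1
}

// Pick the spec that the other children will be re-shuffled to match:
// only eligible specs are considered, preferring higher parallelism.
def chooseTarget(specs: Seq[Spec]): Option[Spec] =
  specs.filter(_.canCreatePartitioning).sortBy(-_.numPartitions).headOption

// chooseTarget(Seq(HashSpec(150), SingleSpec)) == Some(HashSpec(150))
```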
@@ -367,7 +367,7 @@ class ShuffleSpecSuite extends SparkFunSuite with SQLHelper {
assert(HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), distribution)
.canCreatePartitioning)
}
-assert(SinglePartitionShuffleSpec.canCreatePartitioning)
+assert(!SinglePartitionShuffleSpec.canCreatePartitioning)
withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") {
assert(ShuffleSpecCollection(Seq(
HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution),
@@ -73,9 +73,13 @@ case class EnsureRequirements(
case _ => false
}.map(_._2)

+// Special case: if all sides of the join are single partition
+val allSinglePartition =
+  childrenIndexes.forall(children(_).outputPartitioning == SinglePartition)
+
// If there are more than one children, we'll need to check partitioning & distribution of them
// and see if extra shuffles are necessary.
-if (childrenIndexes.length > 1) {
+if (childrenIndexes.length > 1 && !allSinglePartition) {
val specs = childrenIndexes.map(i => {
val requiredDist = requiredChildDistributions(i)
assert(requiredDist.isInstanceOf[ClusteredDistribution],
@@ -262,7 +262,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
val numExchanges = collect(plan) {
case exchange: ShuffleExchangeExec => exchange
}.length
-assert(numExchanges === 3)
+assert(numExchanges === 5)
}

{
@@ -278,7 +278,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
val numExchanges = collect(plan) {
case exchange: ShuffleExchangeExec => exchange
}.length
-assert(numExchanges === 3)
+assert(numExchanges === 5)
}

}
@@ -433,8 +433,10 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
-SortExec(_, _, DummySparkPlan(_, _, SinglePartition, _, _), _),
-SortExec(_, _, ShuffleExchangeExec(SinglePartition, _, _), _), _) =>
+SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+assert(left.numPartitions == 5)
+assert(right.numPartitions == 5)
case other => fail(other.toString)
}

@@ -690,6 +692,45 @@ class EnsureRequirementsSuite extends SharedSparkSession {
}
}

test("SPARK-40703: shuffle for SinglePartitionShuffleSpec") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> 20.toString) {
// We should re-shuffle the side with single partition when the other side is
// `HashPartitioning` with shuffle node, and respect the minimum parallelism.
var plan1: SparkPlan = ShuffleExchangeExec(
outputPartitioning = HashPartitioning(exprA :: Nil, 10),
DummySparkPlan())
var plan2 = DummySparkPlan(outputPartitioning = SinglePartition)
var smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None, plan1, plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
assert(leftKeys === Seq(exprA))
assert(rightKeys === Seq(exprC))
assert(left.numPartitions == 20)
assert(right.numPartitions == 20)
case other => fail(other.toString)
}

// We should also re-shuffle the side that has only a single partition, even if the other side
// does not have `ShuffleExchange` but just `HashPartitioning`. However, in this case the minimum
// shuffle parallelism will be ignored since we don't want to introduce an extra shuffle.
plan1 = DummySparkPlan(
outputPartitioning = HashPartitioning(exprA :: Nil, 10))
plan2 = DummySparkPlan(outputPartitioning = SinglePartition)
smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None, plan1, plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
assert(leftKeys === Seq(exprA))
assert(rightKeys === Seq(exprC))
assert(right.numPartitions == 10)
case other => fail(other.toString)
}
}
}

test("Check with KeyGroupedPartitioning") {
// simplest case: identity transforms
var plan1 = DummySparkPlan(
