Skip to content

Commit

Permalink
[CALCITE-5251] Support SQL hint for Snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
flink-sql authored and julianhyde committed Sep 8, 2022
1 parent ba80b91 commit baeecc8
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 12 deletions.
33 changes: 30 additions & 3 deletions core/src/main/java/org/apache/calcite/rel/core/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Litmus;

import com.google.common.collect.ImmutableList;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;
Expand All @@ -43,30 +47,49 @@
* returns the contents whose versions that overlap with the given specific
* period (i.e. those that started before given period and ended after it).
*/
public abstract class Snapshot extends SingleRel {
public abstract class Snapshot extends SingleRel implements Hintable {
//~ Instance fields --------------------------------------------------------

private final RexNode period;

protected final ImmutableList<RelHint> hints;

//~ Constructors -----------------------------------------------------------

/**
* Creates a Snapshot.
*
* @param cluster Cluster that this relational expression belongs to
* @param traitSet The traits of this relational expression
* @param hints Hints for this node
* @param input Input relational expression
* @param period Timestamp expression which as the table was at the given
* time in the past
*/
@SuppressWarnings("method.invocation.invalid")
protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
RexNode period) {
protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, List<RelHint> hints,
RelNode input, RexNode period) {
super(cluster, traitSet, input);
this.period = Objects.requireNonNull(period, "period");
this.hints = ImmutableList.copyOf(hints);
assert isValid(Litmus.THROW, null);
}

/**
* Creates a Snapshot.
*
* @param cluster Cluster that this relational expression belongs to
* @param traitSet The traits of this relational expression
* @param input Input relational expression
* @param period Timestamp expression which as the table was at the given
* time in the past
*/
@SuppressWarnings("method.invocation.invalid")
protected Snapshot(
RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
this(cluster, traitSet, ImmutableList.of(), input, period);
}

//~ Methods ----------------------------------------------------------------

@Override public final RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
Expand Down Expand Up @@ -100,4 +123,8 @@ public RexNode getPeriod() {
}
return litmus.succeed();
}

@Override public ImmutableList<RelHint> getHints() {
return hints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public abstract class HintPredicates {
public static final HintPredicate WINDOW =
new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.WINDOW);

/** A hint predicate that indicates a hint can only be used to
* {@link org.apache.calcite.rel.core.Snapshot} nodes. */
public static final HintPredicate SNAPSHOT =
new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.SNAPSHOT);

/**
* Returns a composed hint predicate that represents a short-circuiting logical
* AND of an array of hint predicates {@code hintPredicates}. When evaluating the composed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Values;
Expand Down Expand Up @@ -99,7 +100,12 @@ enum NodeType {
/**
* The hint would be propagated to the Window nodes.
*/
WINDOW(Window.class);
WINDOW(Window.class),

/**
* The hint would be propagated to the Snapshot nodes.
*/
SNAPSHOT(Snapshot.class);

/** Relational expression clazz that the hint can apply to. */
@SuppressWarnings("ImmutableEnumChecker")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,39 @@
import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.metadata.RelMdCollation;
import org.apache.calcite.rel.metadata.RelMdDistribution;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;

import com.google.common.collect.ImmutableList;

import java.util.List;

/**
* Sub-class of {@link org.apache.calcite.rel.core.Snapshot}
* not targeted at any particular engine or calling convention.
*/
public class LogicalSnapshot extends Snapshot {

//~ Constructors -----------------------------------------------------------
/**
* Creates a LogicalSnapshot.
*
* <p>Use {@link #create} unless you know what you're doing.
*
* @param cluster Cluster that this relational expression belongs to
* @param traitSet The traits of this relational expression
* @param hints Hints for this node
* @param input Input relational expression
* @param period Timestamp expression which as the table was at the given
* time in the past
*/
public LogicalSnapshot(RelOptCluster cluster, RelTraitSet traitSet, List<RelHint> hints,
RelNode input, RexNode period) {
super(cluster, traitSet, hints, input, period);
}

/**
* Creates a LogicalSnapshot.
Expand All @@ -49,12 +70,12 @@ public class LogicalSnapshot extends Snapshot {
*/
public LogicalSnapshot(RelOptCluster cluster, RelTraitSet traitSet,
RelNode input, RexNode period) {
super(cluster, traitSet, input, period);
super(cluster, traitSet, ImmutableList.of(), input, period);
}

@Override public Snapshot copy(RelTraitSet traitSet, RelNode input,
RexNode period) {
return new LogicalSnapshot(getCluster(), traitSet, input, period);
return new LogicalSnapshot(getCluster(), traitSet, hints, input, period);
}

/** Creates a LogicalSnapshot. */
Expand All @@ -69,4 +90,8 @@ public static LogicalSnapshot create(RelNode input, RexNode period) {
() -> RelMdDistribution.snapshot(mq, input));
return new LogicalSnapshot(cluster, traitSet, input, period);
}

@Override public RelNode withHints(final List<RelHint> hintList) {
return new LogicalSnapshot(getCluster(), traitSet, hintList, input, getPeriod());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ public final Fixture sql(String sql) {
sql(sql).ok();
}

@Test void testSnapshotHints() {
final String sql = "select /*+ fast_snapshot(products_temporal) */ stream * from\n"
+ " orders join products_temporal for system_time as of timestamp '2022-08-11 15:00:00'\n"
+ " on orders.productid = products_temporal.productid";
sql(sql).ok();
}

@Test void testHintsInSubQueryWithDecorrelation() {
final String sql = "select /*+ resource(parallelism='3'), AGG_STRATEGY(TWO_PHASE) */\n"
+ "sum(e1.empno) from emp e1, dept d1\n"
Expand Down Expand Up @@ -889,6 +896,11 @@ private static class HintCollector extends RelShuttleImpl {
if (window.getHints().size() > 0) {
this.hintsCollect.add("Window:" + window.getHints());
}
} else if (other instanceof Snapshot) {
Snapshot snapshot = (Snapshot) other;
if (snapshot.getHints().size() > 0) {
this.hintsCollect.add("Snapshot:" + snapshot.getHints());
}
}
return super.visit(other);
}
Expand Down Expand Up @@ -978,6 +990,7 @@ static HintStrategyTable createHintStrategies(HintStrategyTable.Builder builder)
.hintStrategy("async_merge", HintPredicates.SORT)
.hintStrategy("mini_batch",
HintPredicates.and(HintPredicates.WINDOW, HintPredicates.PROJECT))
.hintStrategy("fast_snapshot", HintPredicates.SNAPSHOT)
.hintStrategy("use_merge_join",
HintStrategy.builder(
HintPredicates.and(HintPredicates.JOIN, joinWithFixedTableName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,8 @@ EnumerableProject(ENAME=[$3], JOB=[$4], SAL=[$7], NAME=[$1])
</TestCase>
<TestCase name="testHintsPropagationInVolcanoPlannerRules2">
<Resource name="sql">
<![CDATA[select /*+ my_hint */ ename, job
from emp where not exists (select 1 from dept where emp.deptno = dept.deptno)
]]>
<![CDATA[select /*+ no_hash_join */ ename, job
from emp where not exists (select 1 from dept where emp.deptno = dept.deptno)]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
Expand Down Expand Up @@ -232,9 +231,8 @@ EnumerableProject(ENAME=[$0], JOB=[$1])
</TestCase>
<TestCase name="testHintsPropagationInVolcanoPlannerRules3">
<Resource name="sql">
<![CDATA[select /*+ my_hint */ ename, job
from emp where not exists (select 1 from dept where emp.deptno = dept.deptno) order by ename
]]>
<![CDATA[select /*+ no_hash_join */ ename, job
from emp where not exists (select 1 from dept where emp.deptno = dept.deptno) order by ename]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
Expand Down Expand Up @@ -358,6 +356,19 @@ on emp.deptno = dept.deptno]]>
Project:[[PROPERTIES inheritPath:[] options:{K1=v1, K2=v2}]]
TableScan:[[INDEX inheritPath:[] options:[IDX1, IDX2]], [PROPERTIES inheritPath:[0, 0] options:{K1=v1, K2=v2}]]
TableScan:[[PROPERTIES inheritPath:[] options:{K1=v1, K2=v2}], [PROPERTIES inheritPath:[0, 1] options:{K1=v1, K2=v2}]]
]]>
</Resource>
</TestCase>
<TestCase name="testSnapshotHints">
<Resource name="sql">
<![CDATA[select /*+ fast_snapshot(products_temporal) */ stream * from
orders join products_temporal for system_time as of timestamp '2022-08-11 15:00:00'
on orders.productid = products_temporal.productid]]>
</Resource>
<Resource name="hints">
<![CDATA[
Project:[[FAST_SNAPSHOT inheritPath:[] options:[PRODUCTS_TEMPORAL]]]
Snapshot:[[FAST_SNAPSHOT inheritPath:[0, 1] options:[PRODUCTS_TEMPORAL]]]
]]>
</Resource>
</TestCase>
Expand Down

0 comments on commit baeecc8

Please sign in to comment.