Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose UpdateId in interceptors and workflow scope #477

Merged
merged 3 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Interceptor/WorkflowInbound/UpdateInput.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
class UpdateInput
{
/**
* @param non-empty-string $updateName
* @param non-empty-string $updateId
*
* @internal Don't use the constructor. Use {@see self::with()} instead.
*/
public function __construct(
public readonly string $updateName,
public readonly string $updateId,
public readonly WorkflowInfo $info,
public readonly ValuesInterface $arguments,
public readonly HeaderInterface $header,
Expand All @@ -36,6 +40,7 @@ public function with(
): self {
return new self(
$this->updateName,
$this->updateId,
$info ?? $this->info,
$arguments ?? $this->arguments,
$header ?? $this->header
Expand Down
1 change: 1 addition & 0 deletions src/Internal/Transport/Router/InvokeUpdate.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred

$input = new UpdateInput(
updateName: $name,
updateId: $updateId,
info: $context->getInfo(),
arguments: $request->getPayloads(),
header: $request->getHeader(),
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/Workflow/Process/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ function (UpdateInput $input) use ($handler): void {
context: $this->context->withInput(
new Input($input->info, $input->arguments, $input->header),
),
updateContext: Workflow\UpdateContext::fromInput($input),
);

$scope->startUpdate(
function () use ($handler, $inboundPipeline, $input): mixed {
Workflow::setCurrentContext($this->scopeContext);
return $inboundPipeline->with(
static fn(UpdateInput $input): mixed => $handler($input->arguments),
/** @see WorkflowInboundCallsInterceptor::handleUpdate() */
Expand Down
24 changes: 14 additions & 10 deletions src/Internal/Workflow/Process/Scope.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,17 @@ class Scope implements CancellationScopeInterface, Destroyable
*/
private bool $cancelled = false;

/**
* @param WorkflowContext $ctx
* @param ServiceContainer $services
*/
public function __construct(ServiceContainer $services, WorkflowContext $ctx)
{
public function __construct(
ServiceContainer $services,
WorkflowContext $ctx,
?Workflow\UpdateContext $updateContext = null,
) {
$this->context = $ctx;
$this->scopeContext = ScopeContext::fromWorkflowContext(
$this->context,
$this,
\Closure::fromCallable([$this, 'onRequest'])
\Closure::fromCallable([$this, 'onRequest']),
$updateContext,
);

$this->services = $services;
Expand Down Expand Up @@ -347,9 +347,13 @@ public function onAwait(Deferred $deferred): void
$deferred->promise()->then($cleanup, $cleanup);
}

protected function createScope(bool $detached, ?string $layer = null, WorkflowContext $context = null): self
{
$scope = new Scope($this->services, $context ?? $this->context);
protected function createScope(
bool $detached,
?string $layer = null,
WorkflowContext $context = null,
?Workflow\UpdateContext $updateContext = null,
): self {
$scope = new Scope($this->services, $context ?? $this->context, $updateContext);
$scope->detached = $detached;

if ($layer !== null) {
Expand Down
17 changes: 13 additions & 4 deletions src/Internal/Workflow/ScopeContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\Exception\Failure\CanceledFailure;
use Temporal\Internal\Declaration\Destroyable;
use Temporal\Internal\Transport\CompletableResult;
use Temporal\Internal\Workflow\Process\Scope;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Workflow\CancellationScopeInterface;
use Temporal\Workflow\ScopedContextInterface;
use Temporal\Internal\Transport\Request\UpsertSearchAttributes;
use Temporal\Workflow\UpdateContext;

class ScopeContext extends WorkflowContext implements ScopedContextInterface, Destroyable
class ScopeContext extends WorkflowContext implements ScopedContextInterface
{
private WorkflowContext $parent;
private Scope $scope;
/** @var callable */
private $onRequest;
private ?UpdateContext $updateContext = null;

/**
* Creates scope specific context.
Expand All @@ -41,19 +42,21 @@ class ScopeContext extends WorkflowContext implements ScopedContextInterface, De
public static function fromWorkflowContext(
WorkflowContext $context,
Scope $scope,
callable $onRequest
callable $onRequest,
?UpdateContext $updateContext,
): self {
$ctx = new self(
$context->services,
$context->client,
$context->workflowInstance,
$context->input,
$context->getLastCompletionResultValues()
$context->getLastCompletionResultValues(),
);

$ctx->parent = $context;
$ctx->scope = $scope;
$ctx->onRequest = $onRequest;
$ctx->updateContext = $updateContext;

return $ctx;
}
Expand Down Expand Up @@ -100,6 +103,11 @@ public function request(RequestInterface $request, bool $cancellable = true): Pr
);
}

public function getUpdateContext(): ?UpdateContext
{
return $this->updateContext;
}

/**
* @param string $conditionGroupId
* @param callable $condition
Expand Down Expand Up @@ -147,6 +155,7 @@ public function upsertSearchAttributes(array $searchAttributes): void
);
}

#[\Override]
public function destroy(): void
{
parent::destroy();
Expand Down
1 change: 1 addition & 0 deletions src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ protected function recordTrace(): void
$this->trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
}

#[\Override]
public function destroy(): void
{
$this->awaits = [];
Expand Down
9 changes: 9 additions & 0 deletions src/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use Temporal\Workflow\ContinueAsNewOptions;
use Temporal\Workflow\ExternalWorkflowStubInterface;
use Temporal\Workflow\ScopedContextInterface;
use Temporal\Workflow\UpdateContext;
use Temporal\Workflow\WorkflowExecution;
use Temporal\Workflow\WorkflowInfo;
use Temporal\Internal\Support\DateInterval;
Expand Down Expand Up @@ -104,6 +105,14 @@ public static function getInfo(): WorkflowInfo
return self::getCurrentContext()->getInfo();
}

/**
* @throws OutOfContextException in the absence of the workflow execution context.
*/
public static function getUpdateContext(): ?UpdateContext
{
return self::getCurrentContext()->getUpdateContext();
}

/**
* Returns workflow execution input arguments.
*
Expand Down
41 changes: 41 additions & 0 deletions src/Workflow/UpdateContext.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Workflow;

use JetBrains\PhpStorm\Immutable;
use Temporal\Interceptor\WorkflowInbound\UpdateInput;
use Temporal\Internal\Marshaller\Meta\Marshal;

#[Immutable]
final class UpdateContext
{
/**
* @param non-empty-string $updateId
*/
public function __construct(
#[Marshal(name: 'UpdateId')]
private string $updateId,
) {}

public static function fromInput(UpdateInput $input): self
{
return new self($input->updateId);
}

/**
* @return non-empty-string
*/
public function getUpdateId(): string
{
return $this->updateId;
}
}
73 changes: 73 additions & 0 deletions tests/Acceptance/Harness/Update/ContextTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

declare(strict_types=1);

namespace Temporal\Tests\Acceptance\Harness\WorkflowUpdate\Context;

use PHPUnit\Framework\Attributes\Test;
use Temporal\Client\Update\UpdateOptions;
use Temporal\Client\WorkflowStubInterface;
use Temporal\Tests\Acceptance\App\Attribute\Stub;
use Temporal\Tests\Acceptance\App\TestCase;
use Temporal\Workflow;
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;

class ContextTest extends TestCase
{
#[Test]
public static function check(
#[Stub('Harness_WorkflowUpdate_Context')]WorkflowStubInterface $stub,
): void {
$handle = $stub->startUpdate(UpdateOptions::new('my_update')->withUpdateId('test-update-id'));

$updated2 = $stub->startUpdate(UpdateOptions::new('my_update2')->withUpdateId('test-update-id-2'))->getResult();
self::assertSame('test-update-id-2', $updated2);

// Check ID from the first Update
$updated = $handle->getResult();
self::assertSame('test-update-id', $updated);

self::assertNull($stub->getResult());
}
}

#[WorkflowInterface]
class FeatureWorkflow
{
private bool $done = false;
private bool $upd2 = false;

#[WorkflowMethod('Harness_WorkflowUpdate_Context')]
public function run()
{
yield Workflow::await(fn(): bool => $this->done);
return Workflow::getUpdateContext()?->getUpdateId();
}

#[Workflow\UpdateMethod('my_update')]
public function myUpdate()
{
Workflow::getUpdateContext() === null and throw new \RuntimeException('Update context should not be null.');

$updateId = Workflow::getUpdateContext()->getUpdateID();

yield Workflow::await(fn() => $this->upd2);
Workflow::getUpdateContext() === null and throw new \RuntimeException('Update context should not be null.');
$updateId !== Workflow::getUpdateContext()->getUpdateID() and throw new \RuntimeException(
'Update ID should not change.'
);

$this->done = true;
return $updateId;
}

#[Workflow\UpdateMethod('my_update2')]
public function myUpdate2()
{
Workflow::getUpdateContext() === null and throw new \RuntimeException('Update context should not be null.');

$this->upd2 = true;
return Workflow::getUpdateContext()->getUpdateID();
}
}
97 changes: 97 additions & 0 deletions tests/Acceptance/Harness/Update/TaskFailureTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?php

declare(strict_types=1);

namespace Temporal\Tests\Acceptance\Harness\Update\TaskFailure;

use PHPUnit\Framework\Attributes\DoesNotPerformAssertions;
use PHPUnit\Framework\Attributes\Test;
use Temporal\Client\WorkflowStubInterface;
use Temporal\Exception\Client\WorkflowUpdateException;
use Temporal\Exception\Failure\ApplicationFailure;
use Temporal\Tests\Acceptance\App\Attribute\Stub;
use Temporal\Tests\Acceptance\App\TestCase;
use Temporal\Workflow;
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;

class TaskFailureTest extends TestCase
{
#[Test]
public static function retryableException(
#[Stub('Harness_Update_TaskFailure')]WorkflowStubInterface $stub,
): void {
try {
$stub->update('do_update');
throw new \RuntimeException('Expected validation exception');
} catch (WorkflowUpdateException $e) {
self::assertStringContainsString("I'll fail update", $e->getPrevious()?->getMessage());
} finally {
# Finish Workflow
$stub->update('throw_or_done', doThrow: false);
}

self::assertSame(2, $stub->getResult());
}

#[Test]
#[DoesNotPerformAssertions]
public static function validationException(
#[Stub('Harness_Update_TaskFailure')]WorkflowStubInterface $stub,
): void {
try {
$stub->update('throw_or_done', true);
throw new \RuntimeException('Expected validation exception');
} catch (WorkflowUpdateException) {
# Expected
} finally {
# Finish Workflow
$stub->update('throw_or_done', doThrow: false);
}
}
}

#[WorkflowInterface]
class FeatureWorkflow
{
private bool $done = false;
private static int $fails = 0;

#[WorkflowMethod('Harness_Update_TaskFailure')]
public function run()
{
yield Workflow::await(fn(): bool => $this->done);

return static::$fails;
}

#[Workflow\UpdateMethod('do_update')]
public function doUpdate(): string
{
# Don't use static variables like this. We do here because we need to fail the task a
# controlled number of times.
if (static::$fails < 2) {
++static::$fails;
throw new class extends \Error {
public function __construct()
{
parent::__construct("I'll fail task");
}
};
}

throw new ApplicationFailure("I'll fail update", 'task-failure', true);
}

#[Workflow\UpdateMethod('throw_or_done')]
public function throwOrDone(bool $doThrow): void
{
$this->done = true;
}

#[Workflow\UpdateValidatorMethod('throw_or_done')]
public function validateThrowOrDone(bool $doThrow): void
{
$doThrow and throw new \RuntimeException('This will fail validation, not task');
}
}
Loading