From eb4eaddfda77b6f590c8ff00c8b236a47c44b4f1 Mon Sep 17 00:00:00 2001 From: Damien MARCHAT <dmarchat@takima.fr> Date: Mon, 10 Mar 2025 11:39:10 +0100 Subject: [PATCH] wip --- pom.xml | 7 +- .../temporalpractice/bakery/BakeService.java | 4 -- .../bakery/BakeryApplication.java | 11 +++ .../bakery/BatterService.java | 4 -- .../bakery/BestCookieWorkflow.java | 7 -- .../bakery/CookieOrderRunner.java | 44 ++++++++++++ .../temporalpractice/bakery/KitchenApp.java | 5 -- .../bakery/bake/BakeService.java | 26 +++++++ .../bakery/bake/BakeServiceImpl.java | 26 +++++++ .../bakery/bake/BakeWorker.java | 11 +++ .../bakery/batter/BatterService.java | 24 +++++++ .../bakery/batter/BatterServiceImpl.java | 18 +++++ .../bakery/batter/BatterWorker.java | 11 +++ .../bakery/config/BakeWorkerConfig.java | 19 ++++++ .../bakery/config/BatterWorkerConfig.java | 19 ++++++ .../bakery/config/CookieWorkerConfig.java | 20 ++++++ .../bakery/config/TemporalConfig.java | 38 +++++++++++ .../bakery/cookie/BestCookieWorkflow.java | 61 +++++++++++++++++ .../bakery/cookie/CookieWorker.java | 11 +++ .../bakery/cookie/CookieWorkflow.java | 63 +++++++++++++++++ .../bakery/cookie/OrderCookie.java | 32 +++++++++ .../bakery/cookie/OrderManyCookies.java | 67 +++++++++++++++++++ .../bakery/temporal/TemporalUtils.java | 21 ++++++ .../signals/ConsumeSignal.java | 4 ++ .../signals/InventoryActivity.java | 1 - .../signals/InventoryActivityImpl.java | 3 +- .../signals/InventoryWorkflow.java | 8 ++- .../signals/InventoryWorkflowImpl.java | 45 +++++++++++++ .../temporalpractice/signals/MSMain.java | 17 ++--- .../temporal/TemporalUtils.java | 38 +++++++++++ 30 files changed, 627 insertions(+), 38 deletions(-) delete mode 100644 src/main/java/io/takima/temporalpractice/bakery/BakeService.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/BakeryApplication.java delete mode 100644 src/main/java/io/takima/temporalpractice/bakery/BatterService.java delete mode 100644 src/main/java/io/takima/temporalpractice/bakery/BestCookieWorkflow.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/CookieOrderRunner.java delete mode 100644 src/main/java/io/takima/temporalpractice/bakery/KitchenApp.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/bake/BakeService.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/bake/BakeServiceImpl.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/bake/BakeWorker.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/batter/BatterService.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/batter/BatterServiceImpl.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/batter/BatterWorker.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/config/BakeWorkerConfig.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/config/BatterWorkerConfig.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/config/CookieWorkerConfig.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/config/TemporalConfig.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/cookie/BestCookieWorkflow.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/cookie/CookieWorker.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/cookie/CookieWorkflow.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/cookie/OrderCookie.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/cookie/OrderManyCookies.java create mode 100644 src/main/java/io/takima/temporalpractice/bakery/temporal/TemporalUtils.java create mode 100644 src/main/java/io/takima/temporalpractice/signals/ConsumeSignal.java create mode 100644 src/main/java/io/takima/temporalpractice/signals/InventoryWorkflowImpl.java create mode 100644 src/main/java/io/takima/temporalpractice/temporal/TemporalUtils.java diff --git a/pom.xml b/pom.xml index a1fb653..a6c3baa 100644 --- a/pom.xml +++ b/pom.xml @@ -22,11 +22,10 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-test</artifactId> - <scope>test</scope> + <groupId>io.temporal</groupId> + <artifactId>temporal-spring-boot-starter</artifactId> + <version>1.27.1</version> </dependency> </dependencies> diff --git a/src/main/java/io/takima/temporalpractice/bakery/BakeService.java b/src/main/java/io/takima/temporalpractice/bakery/BakeService.java deleted file mode 100644 index 398892c..0000000 --- a/src/main/java/io/takima/temporalpractice/bakery/BakeService.java +++ /dev/null @@ -1,4 +0,0 @@ -package io.takima.temporalpractice.bakery; - -public class BakeService { -} diff --git a/src/main/java/io/takima/temporalpractice/bakery/BakeryApplication.java b/src/main/java/io/takima/temporalpractice/bakery/BakeryApplication.java new file mode 100644 index 0000000..c505370 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/BakeryApplication.java @@ -0,0 +1,11 @@ +package io.takima.temporalpractice.bakery; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class BakeryApplication { + public static void main(String[] args) { + SpringApplication.run(BakeryApplication.class, args); + } +} \ No newline at end of file diff --git a/src/main/java/io/takima/temporalpractice/bakery/BatterService.java b/src/main/java/io/takima/temporalpractice/bakery/BatterService.java deleted file mode 100644 index bc5a77f..0000000 --- a/src/main/java/io/takima/temporalpractice/bakery/BatterService.java +++ /dev/null @@ -1,4 +0,0 @@ -package io.takima.temporalpractice.bakery; - -public class BatterService { -} diff --git a/src/main/java/io/takima/temporalpractice/bakery/BestCookieWorkflow.java b/src/main/java/io/takima/temporalpractice/bakery/BestCookieWorkflow.java deleted file mode 100644 index 6b92a52..0000000 --- a/src/main/java/io/takima/temporalpractice/bakery/BestCookieWorkflow.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.takima.temporalpractice.bakery; - -public class BestCookieWorkflow { - public void orderCookie() { - System.out.println("Cookie ordered... Yuum"); - } -} \ No newline at end of file diff --git a/src/main/java/io/takima/temporalpractice/bakery/CookieOrderRunner.java b/src/main/java/io/takima/temporalpractice/bakery/CookieOrderRunner.java new file mode 100644 index 0000000..8002119 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/CookieOrderRunner.java @@ -0,0 +1,44 @@ +package io.takima.temporalpractice.bakery; + + +import io.takima.temporalpractice.bakery.cookie.CookieWorkflow; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutionException; + +@Component +public class CookieOrderRunner implements CommandLineRunner { + + private final WorkflowClient workflowClient; + + public CookieOrderRunner(WorkflowClient workflowClient) { + this.workflowClient = workflowClient; + } + + @Override + public void run(String... args) { + + // Create a stub for CookieWorkflow on the "cookie" task queue + CookieWorkflow cookieWorkflow = workflowClient.newWorkflowStub( + CookieWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId("best-cookie") + .setTaskQueue("cookie") // Must match your Worker config + .build() + ); + + // Define your preferences + CookieWorkflow.CookiePreference preferences = new CookieWorkflow.CookiePreference(3, CookieWorkflow.Baking.SOFT, CookieWorkflow.Topping.CHOCOLATE); + System.out.println("Ordering cookies:\n" + preferences); + + // Start the workflow asynchronously + var orderFuture = WorkflowClient.start(cookieWorkflow::orderCookie, preferences); + + // Signal that the oven is ready + cookieWorkflow.ovenReady(); + + } +} \ No newline at end of file diff --git a/src/main/java/io/takima/temporalpractice/bakery/KitchenApp.java b/src/main/java/io/takima/temporalpractice/bakery/KitchenApp.java deleted file mode 100644 index b824818..0000000 --- a/src/main/java/io/takima/temporalpractice/bakery/KitchenApp.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.takima.temporalpractice.bakery; - -public class KitchenApp { - public static void main(String[] args) {} -} diff --git a/src/main/java/io/takima/temporalpractice/bakery/bake/BakeService.java b/src/main/java/io/takima/temporalpractice/bakery/bake/BakeService.java new file mode 100644 index 0000000..9f6d4a0 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/bake/BakeService.java @@ -0,0 +1,26 @@ +package io.takima.temporalpractice.bakery.bake; + +import io.takima.temporalpractice.bakery.batter.BatterService; +import io.takima.temporalpractice.bakery.cookie.CookieWorkflow; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +import java.util.List; + +@ActivityInterface +public interface BakeService { + + record Input( + BatterService.Batter batter, + CookieWorkflow.Baking baking + ) { + } + + record Output( + List<CookieWorkflow.Cookie> cookies + ) { + } + + @ActivityMethod + Output bake(Input input); +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/bake/BakeServiceImpl.java b/src/main/java/io/takima/temporalpractice/bakery/bake/BakeServiceImpl.java new file mode 100644 index 0000000..a2af6fa --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/bake/BakeServiceImpl.java @@ -0,0 +1,26 @@ +package io.takima.temporalpractice.bakery.bake; + +import io.takima.temporalpractice.bakery.cookie.CookieWorkflow; +import io.temporal.spring.boot.ActivityImpl; +import org.springframework.stereotype.Service; + +import java.util.stream.IntStream; + +import static io.takima.temporalpractice.bakery.temporal.TemporalUtils.BAKE_QUEUE; +import static io.takima.temporalpractice.bakery.temporal.TemporalUtils.BATTER_QUEUE; + + +@Service +public class BakeServiceImpl implements BakeService { + public Output bake(Input input) { + + var targetQuantity = input.batter().quantityInGrams() / 30; + var cookies = IntStream.range(0, targetQuantity) + .mapToObj((i) -> new CookieWorkflow.Cookie(input.baking(), input.batter().topping())) + .toList(); + System.out.println("Baked cookies\n" + cookies); + + return new Output(cookies); + } + +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/bake/BakeWorker.java b/src/main/java/io/takima/temporalpractice/bakery/bake/BakeWorker.java new file mode 100644 index 0000000..7595a26 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/bake/BakeWorker.java @@ -0,0 +1,11 @@ +package io.takima.temporalpractice.bakery.bake; + +import io.takima.temporalpractice.bakery.temporal.TemporalUtils; + +public class BakeWorker { + public static void main(String[] args) { + TemporalUtils.FACTORY.newWorker(TemporalUtils.BAKE_QUEUE) + .registerActivitiesImplementations(new BakeServiceImpl()); + TemporalUtils.FACTORY.start(); + } +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/batter/BatterService.java b/src/main/java/io/takima/temporalpractice/bakery/batter/BatterService.java new file mode 100644 index 0000000..018ffde --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/batter/BatterService.java @@ -0,0 +1,24 @@ +package io.takima.temporalpractice.bakery.batter; + +import io.takima.temporalpractice.bakery.cookie.CookieWorkflow; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +@ActivityInterface +public interface BatterService { + + record Input( + CookieWorkflow.Topping topping, + int amount + ) { + } + + record Batter( + CookieWorkflow.Topping topping, + int quantityInGrams + ) { + } + + @ActivityMethod + Batter prepareCookieBatter(Input input); +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/batter/BatterServiceImpl.java b/src/main/java/io/takima/temporalpractice/bakery/batter/BatterServiceImpl.java new file mode 100644 index 0000000..f5d8a6e --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/batter/BatterServiceImpl.java @@ -0,0 +1,18 @@ +package io.takima.temporalpractice.bakery.batter; + +import io.temporal.spring.boot.ActivityImpl; +import org.springframework.stereotype.Service; + +import static io.takima.temporalpractice.bakery.temporal.TemporalUtils.BATTER_QUEUE; + + +@Service +public class BatterServiceImpl implements BatterService { + public Batter prepareCookieBatter(Input input) { + + var batter = new Batter(input.topping(), input.amount() * 30); + System.out.println("Prepared cookie batter\n" + batter); + + return new Batter(input.topping(), input.amount() * 30); + } +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/batter/BatterWorker.java b/src/main/java/io/takima/temporalpractice/bakery/batter/BatterWorker.java new file mode 100644 index 0000000..2dd280d --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/batter/BatterWorker.java @@ -0,0 +1,11 @@ +package io.takima.temporalpractice.bakery.batter; + +import io.takima.temporalpractice.bakery.temporal.TemporalUtils; + +public class BatterWorker { + public static void main(String[] args) { + TemporalUtils.FACTORY.newWorker(TemporalUtils.BATTER_QUEUE) + .registerActivitiesImplementations(new BatterServiceImpl()); + TemporalUtils.FACTORY.start(); + } +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/config/BakeWorkerConfig.java b/src/main/java/io/takima/temporalpractice/bakery/config/BakeWorkerConfig.java new file mode 100644 index 0000000..b75be68 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/config/BakeWorkerConfig.java @@ -0,0 +1,19 @@ +package io.takima.temporalpractice.bakery.config; + +import io.takima.temporalpractice.bakery.bake.BakeServiceImpl; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static io.takima.temporalpractice.bakery.config.TemporalConfig.BAKE_QUEUE; + +@Configuration +public class BakeWorkerConfig { + @Bean + public Worker bakeWorker(WorkerFactory workerFactory, BakeServiceImpl bakeServiceImpl) { + Worker worker = workerFactory.newWorker(BAKE_QUEUE); + worker.registerActivitiesImplementations(bakeServiceImpl); + return worker; + } +} \ No newline at end of file diff --git a/src/main/java/io/takima/temporalpractice/bakery/config/BatterWorkerConfig.java b/src/main/java/io/takima/temporalpractice/bakery/config/BatterWorkerConfig.java new file mode 100644 index 0000000..2ffa7a1 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/config/BatterWorkerConfig.java @@ -0,0 +1,19 @@ +package io.takima.temporalpractice.bakery.config; + +import io.takima.temporalpractice.bakery.batter.BatterServiceImpl; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static io.takima.temporalpractice.bakery.config.TemporalConfig.BATTER_QUEUE; + +@Configuration +public class BatterWorkerConfig { + @Bean + public Worker batterWorker(WorkerFactory workerFactory, BatterServiceImpl batterServiceImpl) { + Worker worker = workerFactory.newWorker(BATTER_QUEUE); + worker.registerActivitiesImplementations(batterServiceImpl); + return worker; + } +} \ No newline at end of file diff --git a/src/main/java/io/takima/temporalpractice/bakery/config/CookieWorkerConfig.java b/src/main/java/io/takima/temporalpractice/bakery/config/CookieWorkerConfig.java new file mode 100644 index 0000000..1c42388 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/config/CookieWorkerConfig.java @@ -0,0 +1,20 @@ +package io.takima.temporalpractice.bakery.config; + +import io.takima.temporalpractice.bakery.cookie.BestCookieWorkflow; + +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static io.takima.temporalpractice.bakery.config.TemporalConfig.COOKIE_QUEUE; + +@Configuration +public class CookieWorkerConfig { + @Bean + public Worker cookieWorker(WorkerFactory workerFactory) { + Worker worker = workerFactory.newWorker(COOKIE_QUEUE); + worker.registerWorkflowImplementationTypes(BestCookieWorkflow.class); + return worker; + } +} \ No newline at end of file diff --git a/src/main/java/io/takima/temporalpractice/bakery/config/TemporalConfig.java b/src/main/java/io/takima/temporalpractice/bakery/config/TemporalConfig.java new file mode 100644 index 0000000..af85ba9 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/config/TemporalConfig.java @@ -0,0 +1,38 @@ +package io.takima.temporalpractice.bakery.config; + +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; + +@Configuration +public class TemporalConfig { + + public static final String COOKIE_QUEUE = "cookie"; + public static final String BATTER_QUEUE = "batter"; + public static final String BAKE_QUEUE = "bake"; + + @Bean + public WorkflowServiceStubs workflowServiceStubs() { + return WorkflowServiceStubs.newLocalServiceStubs(); + } + + @Bean + public WorkflowClient workflowClient(WorkflowServiceStubs serviceStubs) { + return WorkflowClient.newInstance(serviceStubs); + } + + @Bean + public WorkerFactory workerFactory(WorkflowClient workflowClient) { + return WorkerFactory.newInstance(workflowClient); + } + + @EventListener + public void startFactory(ContextRefreshedEvent event){ + WorkerFactory factory = (WorkerFactory) event.getApplicationContext().getBean("workerFactory"); + factory.start(); + } +} \ No newline at end of file diff --git a/src/main/java/io/takima/temporalpractice/bakery/cookie/BestCookieWorkflow.java b/src/main/java/io/takima/temporalpractice/bakery/cookie/BestCookieWorkflow.java new file mode 100644 index 0000000..fcb3acf --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/cookie/BestCookieWorkflow.java @@ -0,0 +1,61 @@ +package io.takima.temporalpractice.bakery.cookie; + +import io.takima.temporalpractice.bakery.bake.BakeService; +import io.takima.temporalpractice.bakery.batter.BatterService; +import io.temporal.activity.ActivityOptions; +import io.temporal.workflow.Workflow; + +import java.time.Duration; + +import static io.takima.temporalpractice.bakery.temporal.TemporalUtils.*; + +public class BestCookieWorkflow implements CookieWorkflow { + + private boolean ovenReady = false; + private CookiePreference cookiePreference; + + public CookieOrder orderCookie(CookiePreference cookiePreference) { + this.cookiePreference = cookiePreference; + var freshTimer = Workflow.newTimer(Duration.ofSeconds(30)); + + var batter = batterService.prepareCookieBatter( + new BatterService.Input(cookiePreference.topping(), cookiePreference.amount()) + ); + + Workflow.await(() -> ovenReady); + + var cookies = bakeService + .bake(new BakeService.Input(batter, cookiePreference.baking())) + .cookies(); + + freshTimer.get(); + + return new CookieOrder(cookies); + } + + public void ovenReady() { + ovenReady = true; + } + + public OrderStatus getOrderStatus() { + return new OrderStatus( + ovenReady, + cookiePreference + ); + } + + private final BatterService batterService = Workflow.newActivityStub( + BatterService.class, + ActivityOptions.newBuilder() + .setTaskQueue(BATTER_QUEUE) + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .build() + ); + + private final BakeService bakeService = Workflow.newActivityStub(BakeService.class, + ActivityOptions.newBuilder() + .setTaskQueue(BAKE_QUEUE) + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .build() + ); +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/cookie/CookieWorker.java b/src/main/java/io/takima/temporalpractice/bakery/cookie/CookieWorker.java new file mode 100644 index 0000000..1c3f35f --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/cookie/CookieWorker.java @@ -0,0 +1,11 @@ +package io.takima.temporalpractice.bakery.cookie; + +import io.takima.temporalpractice.bakery.temporal.TemporalUtils; + +public class CookieWorker { + public static void main(String[] args) { + TemporalUtils.FACTORY.newWorker(TemporalUtils.COOKIE_QUEUE) + .registerWorkflowImplementationTypes(BestCookieWorkflow.class); + TemporalUtils.FACTORY.start(); + } +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/cookie/CookieWorkflow.java b/src/main/java/io/takima/temporalpractice/bakery/cookie/CookieWorkflow.java new file mode 100644 index 0000000..2da1e74 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/cookie/CookieWorkflow.java @@ -0,0 +1,63 @@ +package io.takima.temporalpractice.bakery.cookie; + +import io.temporal.workflow.QueryMethod; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +import java.util.List; + +@WorkflowInterface +public interface CookieWorkflow { + + @WorkflowMethod + CookieOrder orderCookie(CookiePreference cookiePreference); + + @SignalMethod + void ovenReady(); + + @QueryMethod + OrderStatus getOrderStatus(); + + + enum Baking { + RAW, + SOFT, + COOKED, + BURNT + + } + + enum Topping { + CHOCOLATE, + NUTS, + NONE + } + + record CookiePreference( + int amount, + Baking baking, + Topping topping + ) { + + } + + record CookieOrder( + List<Cookie> cookies + ) { + + } + + record Cookie( + Baking baking, + Topping topping + ) { + + } + + record OrderStatus( + boolean ovenReady, + CookiePreference preference + ) { + } +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/cookie/OrderCookie.java b/src/main/java/io/takima/temporalpractice/bakery/cookie/OrderCookie.java new file mode 100644 index 0000000..26fdcf0 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/cookie/OrderCookie.java @@ -0,0 +1,32 @@ +package io.takima.temporalpractice.bakery.cookie; + +import io.takima.temporalpractice.bakery.cookie.CookieWorkflow.CookiePreference; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.takima.temporalpractice.bakery.temporal.TemporalUtils; + +import java.util.concurrent.ExecutionException; + +public class OrderCookie { + public static void main(String[] args) { + CookieWorkflow cookieWorkflow = TemporalUtils.CLIENT.newWorkflowStub( + CookieWorkflow.class, + WorkflowOptions.newBuilder().setWorkflowId("best-cookie").setTaskQueue(TemporalUtils.COOKIE_QUEUE).build() + ); + + var preferences = new CookiePreference(3, CookieWorkflow.Baking.SOFT, CookieWorkflow.Topping.CHOCOLATE); + + System.out.println("Ordering cookies\n" + preferences); + var order = WorkflowClient.execute(cookieWorkflow::orderCookie, preferences); + cookieWorkflow.ovenReady(); + try { + var result = order.get(); + System.out.println("Received order: " + result); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + + } +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/cookie/OrderManyCookies.java b/src/main/java/io/takima/temporalpractice/bakery/cookie/OrderManyCookies.java new file mode 100644 index 0000000..63dcbf4 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/cookie/OrderManyCookies.java @@ -0,0 +1,67 @@ +package io.takima.temporalpractice.bakery.cookie; + +import io.temporal.api.enums.v1.ParentClosePolicy; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.workflow.*; +import org.slf4j.Logger; +import io.takima.temporalpractice.bakery.temporal.TemporalUtils; + +import java.time.Duration; + +public class OrderManyCookies { + public static void main(String[] args) { + var queue = "manycookies"; + + TemporalUtils.FACTORY.newWorker(queue) + .registerWorkflowImplementationTypes(ManyCookiesImpl.class); + + TemporalUtils.FACTORY.start(); + + try { + ManyCookies manyCookies = TemporalUtils.CLIENT.newWorkflowStub( + ManyCookies.class, + WorkflowOptions.newBuilder().setWorkflowId("cookie-orderer").setTaskQueue(queue).build() + ); + WorkflowClient.start(manyCookies::continuousOrders); + } catch (Exception e) { + System.out.println("Many cookies already running, skipping..."); + } + } + + public static class ManyCookiesImpl implements ManyCookies { + + private static final Logger logger = Workflow.getLogger(ManyCookiesImpl.class); + + @Override + public void continuousOrders() { + + var options = ChildWorkflowOptions.newBuilder() + .setWorkflowId("order-" + Workflow.randomUUID()) + .setTaskQueue(TemporalUtils.COOKIE_QUEUE) + .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON) + .build(); + var orderWorkflow = Workflow.newChildWorkflowStub(CookieWorkflow.class, options); + + var quantity = Workflow.newRandom().nextInt(1, 5); + var baking = CookieWorkflow.Baking.values()[Workflow.newRandom().nextInt(CookieWorkflow.Baking.values().length)]; + var topping = CookieWorkflow.Topping.values()[Workflow.newRandom().nextInt(CookieWorkflow.Topping.values().length)]; + + var cookiesPreferences = new CookieWorkflow.CookiePreference(quantity, baking, topping); + + logger.info("Ordering cookies\n{}", cookiesPreferences); + Async.function(orderWorkflow::orderCookie, cookiesPreferences); + + Workflow.sleep(Duration.ofSeconds(10)); + orderWorkflow.ovenReady(); + + Workflow.continueAsNew(); + } + } + + @WorkflowInterface + public interface ManyCookies { + @WorkflowMethod + void continuousOrders(); + } +} diff --git a/src/main/java/io/takima/temporalpractice/bakery/temporal/TemporalUtils.java b/src/main/java/io/takima/temporalpractice/bakery/temporal/TemporalUtils.java new file mode 100644 index 0000000..8dffad6 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/bakery/temporal/TemporalUtils.java @@ -0,0 +1,21 @@ +package io.takima.temporalpractice.bakery.temporal; + +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkerFactory; + +public class TemporalUtils { + + public static final String COOKIE_QUEUE = "cookie"; + public static final String BATTER_QUEUE = "batter"; + public static final String BAKE_QUEUE = "bake"; + + + // Represents the connection to your local cluster. For now, lets keep it simple + public static final WorkflowServiceStubs SERVICE_STUBS = WorkflowServiceStubs.newLocalServiceStubs(); + + // Your key for interacting with the Temporal world. + public static final WorkflowClient CLIENT = WorkflowClient.newInstance(SERVICE_STUBS); + + public static final WorkerFactory FACTORY = WorkerFactory.newInstance(CLIENT); +} diff --git a/src/main/java/io/takima/temporalpractice/signals/ConsumeSignal.java b/src/main/java/io/takima/temporalpractice/signals/ConsumeSignal.java new file mode 100644 index 0000000..6109a9e --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/signals/ConsumeSignal.java @@ -0,0 +1,4 @@ +package io.takima.temporalpractice.signals; + +public record ConsumeSignal(int amount) { +} \ No newline at end of file diff --git a/src/main/java/io/takima/temporalpractice/signals/InventoryActivity.java b/src/main/java/io/takima/temporalpractice/signals/InventoryActivity.java index b30a78a..867ca2b 100644 --- a/src/main/java/io/takima/temporalpractice/signals/InventoryActivity.java +++ b/src/main/java/io/takima/temporalpractice/signals/InventoryActivity.java @@ -1,5 +1,4 @@ package io.takima.temporalpractice.signals; -import io.takima.temporalpractice.signals.InventoryWorkflow.ConsumeSignal; import io.temporal.activity.ActivityInterface; @ActivityInterface diff --git a/src/main/java/io/takima/temporalpractice/signals/InventoryActivityImpl.java b/src/main/java/io/takima/temporalpractice/signals/InventoryActivityImpl.java index 427e815..989f9eb 100644 --- a/src/main/java/io/takima/temporalpractice/signals/InventoryActivityImpl.java +++ b/src/main/java/io/takima/temporalpractice/signals/InventoryActivityImpl.java @@ -1,5 +1,4 @@ package io.takima.temporalpractice.signals; -import io.takima.temporalpractice.signals.InventoryWorkflow.ConsumeSignal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +16,7 @@ public class InventoryActivityImpl implements InventoryActivity { try { //Simulate async work - Thread.sleep(100); + Thread.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflow.java b/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflow.java index 0a9894a..ee878fa 100644 --- a/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflow.java +++ b/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflow.java @@ -4,12 +4,14 @@ import io.temporal.workflow.SignalMethod; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; +import java.util.LinkedList; +import java.util.List; + @WorkflowInterface public interface InventoryWorkflow { @WorkflowMethod - void run(); + void run(LinkedList<ConsumeSignal> updates); @SignalMethod void reserve(ConsumeSignal input); - record ConsumeSignal(int amount) { - } + } diff --git a/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflowImpl.java b/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflowImpl.java new file mode 100644 index 0000000..9279ab7 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflowImpl.java @@ -0,0 +1,45 @@ +package io.takima.temporalpractice.signals; + +import io.takima.temporalpractice.temporal.TemporalUtils; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowQueue; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +public class InventoryWorkflowImpl implements InventoryWorkflow{ + private final WorkflowQueue<ConsumeSignal> requests = Workflow.newWorkflowQueue(Integer.MAX_VALUE); + private final InventoryActivity inventoryActivity = TemporalUtils.activityStub(InventoryActivity.class, TemporalUtils.MASTERING_SIGNALS_QUEUE); + + @Override + public void run(LinkedList<ConsumeSignal> updates) { + updates.forEach(requests::offer); + + var info = Workflow.getInfo(); + + while (!info.isContinueAsNewSuggested()) { + var request = requests.poll(Duration.ofMinutes(1)); + if (request == null) return; + inventoryActivity.reserve(request); + } + + List<ConsumeSignal> leftoverRequests = drainQueue(requests); + Workflow.continueAsNew(leftoverRequests); + } + + + + @Override + public void reserve(ConsumeSignal input) { + requests.offer(input); + } + private List<ConsumeSignal> drainQueue(WorkflowQueue<ConsumeSignal> queue) { + List<ConsumeSignal> result = new LinkedList<>(); + ConsumeSignal next; + while ((next = queue.poll()) != null) { + result.add(next); + } + return result; + } +} diff --git a/src/main/java/io/takima/temporalpractice/signals/MSMain.java b/src/main/java/io/takima/temporalpractice/signals/MSMain.java index 8a1ea5d..34b9c83 100644 --- a/src/main/java/io/takima/temporalpractice/signals/MSMain.java +++ b/src/main/java/io/takima/temporalpractice/signals/MSMain.java @@ -2,7 +2,8 @@ package io.takima.temporalpractice.signals; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; - +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -10,7 +11,7 @@ import static io.takima.temporalpractice.temporal.TemporalUtils.*; public class MSMain { - // Our inventory will contain 1000 elements and we will send 1000 signals, lets see how it goes + // Our inventory will contain 100 elements and we will send 100 signals, lets see how it goes static int inventoryToDecrement = 1000; public static void main(String[] args) { @@ -24,17 +25,17 @@ public class MSMain { InventoryWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(MASTERING_SIGNALS_QUEUE).setWorkflowId("inventory").build() ); - WorkflowClient.start(workflow::run); try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { - // Submit 1000 tasks + // Submit 100 tasks for (int i = 1; i < inventoryToDecrement; i++) { - executor.submit(() -> {; - workflow.reserve(new InventoryWorkflow.ConsumeSignal(1)); - + executor.submit(() -> { + var request = CLIENT.newSignalWithStartRequest(); + request.add(workflow::run, new LinkedList<>(List.of(new ConsumeSignal(3)))); + request.add(workflow::reserve, new ConsumeSignal(1)); + CLIENT.signalWithStart(request); }); } } - } } diff --git a/src/main/java/io/takima/temporalpractice/temporal/TemporalUtils.java b/src/main/java/io/takima/temporalpractice/temporal/TemporalUtils.java new file mode 100644 index 0000000..5479688 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/temporal/TemporalUtils.java @@ -0,0 +1,38 @@ +package io.takima.temporalpractice.temporal; + +import io.temporal.activity.ActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.Workflow; + +import java.time.Duration; + +public class TemporalUtils { + + public static final String COOKIE_QUEUE = "cookie"; + public static final String BATTER_QUEUE = "batter"; + public static final String BAKE_QUEUE = "bake"; + public static final String MASTERING_SIGNALS_QUEUE = "mastering-signals"; + + // Represents the connection to your local cluster. For now, lets keep it simple + public static final WorkflowServiceStubs SERVICE_STUBS = WorkflowServiceStubs.newLocalServiceStubs(); + + // Your key for interacting with the Temporal world. + public static final WorkflowClient CLIENT = WorkflowClient.newInstance(SERVICE_STUBS); + public static final WorkerFactoryOptions factoryOptions = WorkerFactoryOptions.newBuilder() + .setUsingVirtualWorkflowThreads(true) + .build(); + public static final WorkerFactory FACTORY = WorkerFactory.newInstance(CLIENT); + + public static <T> T activityStub(Class<T> activityInterface, String queue) { + return Workflow.newActivityStub( + activityInterface, + ActivityOptions.newBuilder() + .setTaskQueue(queue) + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .build() + ); + } +} -- GitLab