diff --git a/src/main/java/io/takima/temporalpractice/signals/ConsumeSignal.java b/src/main/java/io/takima/temporalpractice/signals/ConsumeSignal.java new file mode 100644 index 0000000000000000000000000000000000000000..ea74daf4991015ae277c287aa2402193e24eaee9 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/signals/ConsumeSignal.java @@ -0,0 +1,4 @@ +package io.takima.temporalpractice.signals; + +public record ConsumeSignal(int amount) { +} diff --git a/src/main/java/io/takima/temporalpractice/signals/InventoryActivity.java b/src/main/java/io/takima/temporalpractice/signals/InventoryActivity.java new file mode 100644 index 0000000000000000000000000000000000000000..0f9daf849b048050b542ab4f49173bfc9c709d44 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/signals/InventoryActivity.java @@ -0,0 +1,8 @@ +package io.takima.temporalpractice.signals; + + +@ActivityInterface +public interface InventoryActivity { + void reserve(ConsumeSignal input); + +} diff --git a/src/main/java/io/takima/temporalpractice/signals/InventoryActivityImpl.java b/src/main/java/io/takima/temporalpractice/signals/InventoryActivityImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..db1105d9751e2a4be98e0240afbf2f94065bc38d --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/signals/InventoryActivityImpl.java @@ -0,0 +1,26 @@ +package io.takima.temporalpractice.signals; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InventoryActivityImpl implements InventoryActivity { + private static final Logger logger = LoggerFactory.getLogger(InventoryActivityImpl.class); + int counter; + + public InventoryActivityImpl(int inventorySize){ + this.counter = inventorySize; + } + + @Override + public void reserve(ConsumeSignal input) { + int currentValue = counter; + + try { + //Simulate async work + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + counter = currentValue - 1; + logger.info("counter: " + counter); + } +} diff --git a/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflow.java b/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflow.java new file mode 100644 index 0000000000000000000000000000000000000000..7e7cd5ba745c9b13d290adcb54ff942a507ff2b2 --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/signals/InventoryWorkflow.java @@ -0,0 +1,10 @@ +package io.takima.temporalpractice.signals; + +@WorkflowInterface +public interface InventoryWorkflow { + @WorkflowMethod + void run(); + @SignalMethod + void reserve(ConsumeSignal input); + +} diff --git a/src/main/java/io/takima/temporalpractice/signals/MSMain.java b/src/main/java/io/takima/temporalpractice/signals/MSMain.java new file mode 100644 index 0000000000000000000000000000000000000000..32daba5c1c0bbf94a161b99d72984499c600c96b --- /dev/null +++ b/src/main/java/io/takima/temporalpractice/signals/MSMain.java @@ -0,0 +1,31 @@ +package io.takima.temporalpractice.signals; + +import static io.takima.temporalpractice.temporal.TemporalUtils.*; +import static io.takima.temporalpractice.temporal.TemporalQueues.*; +public class MSMain { + + // Our inventory will contain 100 elements and we will send 100 signals, lets see how it goes + static int inventoryToDecrement = 100; + + public static void main(String[] args) { + var worker = FACTORY.newWorker(MASTERING_SIGNALS); + worker.registerActivitiesImplementations(new InventoryActivityImpl(inventoryToDecrement)); + worker.registerWorkflowImplementationTypes(InventoryWorkflowImpl.class); + + FACTORY.start(); + + var workflow = CLIENT.newWorkflowStub( + InventoryWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(MASTERING_SIGNALS).setWorkflowId("inventory").build() + ); + WorkflowClient.start(workflow::run); + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + // Submit 100 tasks + for (int i = 1; i < inventoryToDecrement; i++) { + executor.submit(() -> { + workflow.reserve(new ConsumeSignal(1)); + }); + } + } + } +}