diff --git a/clotho-core/src/main/java/com/binarygolem/clotho/mapping/SubgraphPersistor.java b/clotho-core/src/main/java/com/binarygolem/clotho/mapping/SubgraphPersistor.java index 5339d84..499d0e8 100644 --- a/clotho-core/src/main/java/com/binarygolem/clotho/mapping/SubgraphPersistor.java +++ b/clotho-core/src/main/java/com/binarygolem/clotho/mapping/SubgraphPersistor.java @@ -12,6 +12,7 @@ import com.binarygolem.clotho.model.ClothoEntity; import com.binarygolem.clotho.model.ClothoVertex; import com.binarygolem.clotho.model.EntityState; import com.binarygolem.clotho.model.TraversalDirection; +import com.binarygolem.clotho.model.VersionGuard; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; @@ -88,6 +89,116 @@ public final class SubgraphPersistor { } } + /** + * Executes an arbitrary developer-supplied Gremlin update traversal under optimistic + * concurrency protection for the declared {@code guarded} entities. + * + *
Execution proceeds in three Gremlin round-trips: + *
Entities without a {@code @Version} field are silently skipped in steps 1 and 3; + * the user traversal still executes regardless. Entities where the {@code @Version} + * value is {@code null} (not yet persisted) are also skipped. + * + *
After execution, {@link ClothoEntity#clearDirtyFields()} is called on every + * guarded entity to prevent the session's auto-save from re-applying the same + * mutations on close. + * + *
TOCTOU caveat: a narrow gap exists between the pre-flight (step 1) and
+ * the version increment (step 3). This is the same inherent limitation as
+ * {@link #persistAllDirty} and cannot be eliminated without native ArcadeDB
+ * transaction support over the Gremlin WebSocket interface.
+ *
+ * @param userTraversal the arbitrary update traversal to execute; consumed by
+ * {@code .iterate()} — do not reuse after this call
+ * @param guarded the entities whose {@code @Version} fields gate the write
+ * @throws OptimisticLockException if any versioned guarded entity is stale
+ */
+ public void executeVersionedWrite(GraphTraversal, ?> userTraversal,
+ List extends ClothoEntity> guarded) {
+ List Use this when the graph elements being updated are not mapped to a registered
+ * Clotho entity class — for example {@link com.binarygolem.clotho.model.GenericVertex},
+ * {@link com.binarygolem.clotho.model.GenericEdge}, or vertices and edges from labels
+ * with no {@code @VertexType}/{@code @EdgeType} declaration.
+ *
+ * The same three-round-trip execution model and TOCTOU caveat apply.
+ * Because {@link VersionGuard} is a value type, there is no in-memory state to
+ * update after the call — the caller must create a new guard (or reload the entity)
+ * before the next version-checked write.
+ *
+ * @param userTraversal the arbitrary update traversal to execute; consumed — do not reuse
+ * @param guards version expectations for each graph element the traversal touches
+ * @throws OptimisticLockException if any guard's expected version does not match the graph
+ */
+ public void executeVersionedWrite(GraphTraversal, ?> userTraversal,
+ VersionGuard... guards) {
+ List Use this when complex multi-hop queries produce results you intend to mutate:
+ * All guarded entities that carry {@code @Version} are verified before the traversal
+ * fires. A stale version throws {@link com.binarygolem.clotho.exception.OptimisticLockException}
+ * and the traversal is never executed. On success, version fields are incremented and
+ * {@code clearDirtyFields()} is called on every guarded entity so the session's
+ * auto-save on close does not re-apply the same writes.
+ *
+ * Guarded entities must be loaded via the session before being passed here.
+ *
+ * @param session the active Clotho session
+ * @param traversal the update traversal; consumed — do not reuse after this call
+ * @param guarded entities whose {@code @Version} fields gate the write
+ * @throws com.binarygolem.clotho.exception.OptimisticLockException if any versioned
+ * entity is stale
+ */
+ public void executeUpdate(ClothoSession session, GraphTraversal, ?> traversal,
+ ClothoEntity... guarded) {
+ session.executeUpdate(traversal, guarded);
+ }
+
+ /**
+ * Variant of {@link #executeUpdate(ClothoSession, GraphTraversal, ClothoEntity...)}
+ * for graph elements not mapped to a registered Clotho entity class.
+ *
+ * @param session the active Clotho session
+ * @param traversal the update traversal; consumed — do not reuse after this call
+ * @param guards version expectations for each graph element the traversal touches
+ * @throws com.binarygolem.clotho.exception.OptimisticLockException if any guard is stale
+ */
+ public void executeUpdate(ClothoSession session, GraphTraversal, ?> traversal,
+ VersionGuard... guards) {
+ session.executeUpdate(traversal, guards);
+ }
+
/**
* Removes the entity from the graph.
*
diff --git a/clotho-core/src/main/java/com/binarygolem/clotho/session/ClothoSession.java b/clotho-core/src/main/java/com/binarygolem/clotho/session/ClothoSession.java
index 1dc2337..515c7f5 100644
--- a/clotho-core/src/main/java/com/binarygolem/clotho/session/ClothoSession.java
+++ b/clotho-core/src/main/java/com/binarygolem/clotho/session/ClothoSession.java
@@ -12,6 +12,8 @@ import com.binarygolem.clotho.model.ClothoEdge;
import com.binarygolem.clotho.model.ClothoEntity;
import com.binarygolem.clotho.model.ClothoVertex;
import com.binarygolem.clotho.model.EntityState;
+import com.binarygolem.clotho.model.VersionGuard;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.jspecify.annotations.Nullable;
@@ -225,6 +227,67 @@ public class ClothoSession implements AutoCloseable {
return transactional;
}
+ /**
+ * Executes an arbitrary Gremlin update traversal with {@code @Version} pre-flight
+ * checking on the declared {@code guarded} entities.
+ *
+ * All guarded entities that carry {@code @Version} are verified before the traversal
+ * fires. A stale version throws {@link com.binarygolem.clotho.exception.OptimisticLockException}
+ * and the traversal is never executed. On success, version fields are incremented in
+ * both the graph and in memory, and {@code clearDirtyFields()} is called on every
+ * guarded entity so the session's auto-save on close does not re-apply the same writes.
+ *
+ * Guarded entities must be loaded via this session (or explicitly
+ * {@link #register}-ed) before being passed here so that Clotho holds the correct
+ * baseline version.
+ *
+ * Example:
+ * Example — reading the version and writing with OCC on an unregistered vertex:
+ * Entities already cached by this session are skipped, so calling register()
+ * on a graph with shared references is safe and cycle-free.
+ */
+ public void register(ClothoEdge edge) {
if (edge.getId() == null || edgeCache.containsKey(edge.getId())) return;
markManaged(edge);
edgeCache.put(edge.getId(), edge);
diff --git a/clotho-core/src/test/java/com/binarygolem/clotho/mapping/SubgraphPersistorTest.java b/clotho-core/src/test/java/com/binarygolem/clotho/mapping/SubgraphPersistorTest.java
index 243a231..03c99c6 100644
--- a/clotho-core/src/test/java/com/binarygolem/clotho/mapping/SubgraphPersistorTest.java
+++ b/clotho-core/src/test/java/com/binarygolem/clotho/mapping/SubgraphPersistorTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import com.binarygolem.clotho.annotation.Version;
import com.binarygolem.clotho.exception.OptimisticLockException;
import com.binarygolem.clotho.model.EntityState;
+import com.binarygolem.clotho.model.VersionGuard;
import java.util.List;
@@ -488,4 +489,211 @@ class SubgraphPersistorTest {
assertEquals(4L, g.V("vi-6").values("_version").next());
assertEquals(4L, y.version);
}
+
+ // --- executeVersionedWrite with ClothoEntity ---
+
+ @Test
+ void executeVersionedWrite_entity_executesTraversalAndIncrementsVersion() {
+ graph.addVertex(T.id, "ev-1", T.label, "VersionedItem", "label", "original", "_version", 0L);
+
+ VersionedItem item = new VersionedItem();
+ item.setId(ARID.of("ev-1"));
+ item.version = 0L;
+ item.setState(EntityState.MANAGED);
+
+ persistor.executeVersionedWrite(
+ g.V("ev-1").property("label", "from-traversal"),
+ List.of(item));
+
+ assertEquals("from-traversal", g.V("ev-1").values("label").next());
+ assertEquals(1L, g.V("ev-1").values("_version").next(), "version must be incremented in graph");
+ assertEquals(1L, item.version, "in-memory version must be bumped");
+ }
+
+ @Test
+ void executeVersionedWrite_entity_throwsBeforeTraversalOnStaleVersion() {
+ graph.addVertex(T.id, "ev-2", T.label, "VersionedItem", "label", "untouched", "_version", 3L);
+
+ VersionedItem item = new VersionedItem();
+ item.setId(ARID.of("ev-2"));
+ item.version = 1L; // stale — graph has 3
+ item.setState(EntityState.MANAGED);
+
+ assertThrows(OptimisticLockException.class, () ->
+ persistor.executeVersionedWrite(
+ g.V("ev-2").property("label", "should-not-apply"),
+ List.of(item)));
+
+ assertEquals("untouched", g.V("ev-2").values("label").next(), "traversal must not have run");
+ assertEquals(3L, g.V("ev-2").values("_version").next(), "version must be unchanged");
+ }
+
+ @Test
+ void executeVersionedWrite_entity_clearsDirtyFieldsOnSuccess() {
+ graph.addVertex(T.id, "ev-3", T.label, "VersionedItem", "label", "old", "_version", 0L);
+
+ VersionedItem item = new VersionedItem();
+ item.setId(ARID.of("ev-3"));
+ item.version = 0L;
+ item.setState(EntityState.MANAGED);
+ item.setLabel("dirty"); // marks field dirty
+
+ persistor.executeVersionedWrite(
+ g.V("ev-3").property("label", "via-traversal"),
+ List.of(item));
+
+ assertTrue(item.getDirtyFields().isEmpty(), "dirty fields must be cleared after executeVersionedWrite");
+ }
+
+ @Test
+ void executeVersionedWrite_entity_skipsVersionCheckForEntityWithoutVersionField() {
+ // Employee has no @Version — executeVersionedWrite should still execute the traversal
+ graph.addVertex(T.id, "ev-4", T.label, "Employee", "name", "before");
+
+ Employee emp = new Employee();
+ emp.setId(ARID.of("ev-4"));
+ emp.setState(EntityState.MANAGED);
+
+ assertDoesNotThrow(() ->
+ persistor.executeVersionedWrite(
+ g.V("ev-4").property("name", "after"),
+ List.of(emp)));
+
+ assertEquals("after", g.V("ev-4").values("name").next());
+ }
+
+ @Test
+ void executeVersionedWrite_entity_simulatesRaceCondition() {
+ // Both "sessions" load version 0
+ graph.addVertex(T.id, "ev-5", T.label, "VersionedItem", "label", "initial", "_version", 0L);
+
+ VersionedItem sessionA = new VersionedItem();
+ sessionA.setId(ARID.of("ev-5"));
+ sessionA.version = 0L;
+ sessionA.setState(EntityState.MANAGED);
+
+ VersionedItem sessionB = new VersionedItem();
+ sessionB.setId(ARID.of("ev-5"));
+ sessionB.version = 0L; // same stale snapshot
+ sessionB.setState(EntityState.MANAGED);
+
+ // Session A writes first — succeeds, version is now 1 in the graph
+ persistor.executeVersionedWrite(
+ g.V("ev-5").property("label", "written-by-A"),
+ List.of(sessionA));
+ assertEquals(1L, g.V("ev-5").values("_version").next());
+
+ // Session B attempts to write — pre-flight detects stale version 0
+ assertThrows(OptimisticLockException.class, () ->
+ persistor.executeVersionedWrite(
+ g.V("ev-5").property("label", "written-by-B"),
+ List.of(sessionB)));
+
+ // Session A's write survives; session B's traversal never ran
+ assertEquals("written-by-A", g.V("ev-5").values("label").next());
+ assertEquals(1L, g.V("ev-5").values("_version").next());
+ }
+
+ // --- executeVersionedWrite with VersionGuard ---
+
+ @Test
+ void executeVersionedWrite_guard_executesTraversalAndIncrementsVersion() {
+ graph.addVertex(T.id, "gv-1", T.label, "AnyLabel", "status", "pending", "_version", 0L);
+
+ persistor.executeVersionedWrite(
+ g.V("gv-1").property("status", "done"),
+ VersionGuard.onVertex(ARID.of("gv-1"), 0L));
+
+ assertEquals("done", g.V("gv-1").values("status").next());
+ assertEquals(1L, g.V("gv-1").values("_version").next());
+ }
+
+ @Test
+ void executeVersionedWrite_guard_throwsBeforeTraversalOnStaleVersion() {
+ graph.addVertex(T.id, "gv-2", T.label, "AnyLabel", "status", "original", "_version", 5L);
+
+ assertThrows(OptimisticLockException.class, () ->
+ persistor.executeVersionedWrite(
+ g.V("gv-2").property("status", "overwrite-attempt"),
+ VersionGuard.onVertex(ARID.of("gv-2"), 2L))); // stale — graph has 5
+
+ assertEquals("original", g.V("gv-2").values("status").next(), "traversal must not have run");
+ assertEquals(5L, g.V("gv-2").values("_version").next());
+ }
+
+ @Test
+ void executeVersionedWrite_guard_worksOnUnregisteredVertexType() {
+ // Label "UnknownThing" has no @VertexType class — VersionGuard works regardless
+ graph.addVertex(T.id, "gv-3", T.label, "UnknownThing", "value", "old", "_version", 0L);
+
+ assertDoesNotThrow(() ->
+ persistor.executeVersionedWrite(
+ g.V("gv-3").property("value", "new"),
+ VersionGuard.onVertex(ARID.of("gv-3"), 0L)));
+
+ assertEquals("new", g.V("gv-3").values("value").next());
+ assertEquals(1L, g.V("gv-3").values("_version").next());
+ }
+
+ @Test
+ void executeVersionedWrite_guard_worksOnEdge() {
+ var empV = graph.addVertex(T.id, "gv-emp", T.label, "Employee");
+ var coV = graph.addVertex(T.id, "gv-co", T.label, "Company");
+ empV.addEdge("WORKS_AT", coV, T.id, "gv-e1", "role", "junior", "_version", 0L);
+
+ persistor.executeVersionedWrite(
+ g.E("gv-e1").property("role", "senior"),
+ VersionGuard.onEdge(ARID.of("gv-e1"), 0L));
+
+ assertEquals("senior", g.E("gv-e1").values("role").next());
+ assertEquals(1L, g.E("gv-e1").values("_version").next());
+ }
+
+ @Test
+ void executeVersionedWrite_guard_customVersionProperty() {
+ graph.addVertex(T.id, "gv-4", T.label, "AnyLabel", "data", "before", "rev", 7L);
+
+ persistor.executeVersionedWrite(
+ g.V("gv-4").property("data", "after"),
+ VersionGuard.onVertex(ARID.of("gv-4"), "rev", 7L));
+
+ assertEquals("after", g.V("gv-4").values("data").next());
+ assertEquals(8L, g.V("gv-4").values("rev").next());
+ }
+
+ @Test
+ void executeVersionedWrite_guard_multipleGuards_allPassOrNoneFire() {
+ graph.addVertex(T.id, "gv-5", T.label, "AnyLabel", "x", "A", "_version", 1L);
+ graph.addVertex(T.id, "gv-6", T.label, "AnyLabel", "x", "B", "_version", 2L);
+
+ // Both versions correct — traversal fires, both versions increment
+ persistor.executeVersionedWrite(
+ g.V("gv-5").property("x", "A2").V("gv-6").property("x", "B2"),
+ VersionGuard.onVertex(ARID.of("gv-5"), 1L),
+ VersionGuard.onVertex(ARID.of("gv-6"), 2L));
+
+ assertEquals("A2", g.V("gv-5").values("x").next());
+ assertEquals(2L, g.V("gv-5").values("_version").next());
+ assertEquals("B2", g.V("gv-6").values("x").next());
+ assertEquals(3L, g.V("gv-6").values("_version").next());
+ }
+
+ @Test
+ void executeVersionedWrite_guard_simulatesRaceCondition() {
+ graph.addVertex(T.id, "gv-7", T.label, "AnyLabel", "val", "start", "_version", 0L);
+
+ // Writer A reads version 0 and succeeds
+ persistor.executeVersionedWrite(
+ g.V("gv-7").property("val", "A-wins"),
+ VersionGuard.onVertex(ARID.of("gv-7"), 0L));
+ assertEquals(1L, g.V("gv-7").values("_version").next());
+
+ // Writer B had also read version 0 — now stale
+ assertThrows(OptimisticLockException.class, () ->
+ persistor.executeVersionedWrite(
+ g.V("gv-7").property("val", "B-loses"),
+ VersionGuard.onVertex(ARID.of("gv-7"), 0L)));
+
+ assertEquals("A-wins", g.V("gv-7").values("val").next(), "B's traversal must not have run");
+ }
}
diff --git a/clotho-dev-app/pom.xml b/clotho-dev-app/pom.xml
index 8f38ce1..f3ebcfe 100644
--- a/clotho-dev-app/pom.xml
+++ b/clotho-dev-app/pom.xml
@@ -18,6 +18,10 @@
exploratory testing during library development. Not a production artifact.
+ {@code
+ * try (ClothoSession session = sessionFactory.openSession()) {
+ * List
+ */
+ @SuppressWarnings("unchecked")
+ public List executeSubgraph(ClothoSession session, Class subgraphType,
+ GraphTraversal, ?> traversal) {
+ return traversal.toList().stream()
+ .map(item -> {
+ if (!(item instanceof Map, ?> m)) {
+ throw new ClothoException(
+ "executeSubgraph() expected Map results but got: "
+ + item.getClass().getName());
+ }
+ S assembled = assembler.assemble(subgraphType, (Map{@code
+ * Order order = session.load(Order.class, orderId);
+ * Account acct = session.load(Account.class, accountId);
+ *
+ * session.executeUpdate(
+ * g().V(orderId.asString()).property("status", "SHIPPED")
+ * .sideEffect(__.inV("PLACED_BY").property("orderCount", newCount)),
+ * order, acct
+ * );
+ * }
+ *
+ * @param traversal the update traversal; consumed — do not reuse after this call
+ * @param guarded entities whose {@code @Version} fields gate the write
+ * @throws com.binarygolem.clotho.exception.OptimisticLockException if any versioned
+ * entity is stale
+ */
+ public void executeUpdate(GraphTraversal, ?> traversal, ClothoEntity... guarded) {
+ persistor.executeVersionedWrite(traversal, List.of(guarded));
+ }
+
+ /**
+ * Variant of {@link #executeUpdate(GraphTraversal, ClothoEntity...)} for graph elements
+ * that are not mapped to a registered Clotho entity class — such as
+ * {@link com.binarygolem.clotho.model.GenericVertex},
+ * {@link com.binarygolem.clotho.model.GenericEdge}, or any vertex/edge from a label
+ * with no {@code @VertexType}/{@code @EdgeType} declaration.
+ *
+ * {@code
+ * ARID id = ARID.of("#10:0");
+ * long ver = (long) executeRaw(g().V(id.asString()).values("_version")).get(0);
+ *
+ * session.executeUpdate(
+ * g().V(id.asString()).property("status", "archived"),
+ * VersionGuard.onVertex(id, ver)
+ * );
+ * }
+ *
+ * @param traversal the update traversal; consumed — do not reuse after this call
+ * @param guards version expectations for each graph element the traversal touches
+ * @throws com.binarygolem.clotho.exception.OptimisticLockException if any guard is stale
+ */
+ public void executeUpdate(GraphTraversal, ?> traversal, VersionGuard... guards) {
+ persistor.executeVersionedWrite(traversal, guards);
+ }
+
/**
* Evicts all loaded objects from the session identity map without touching the database.
* Forces a fresh graph fetch on the next {@link #load} call for any previously-loaded ID.
@@ -305,15 +368,23 @@ public class ClothoSession implements AutoCloseable {
Object value = readField(field, vertex);
if (value instanceof List> list) {
for (Object item : list) {
- if (item instanceof ClothoEdge e) registerEdge(e);
+ if (item instanceof ClothoEdge e) register(e);
else if (item instanceof ClothoVertex v) register(v);
}
- } else if (value instanceof ClothoEdge e) registerEdge(e);
+ } else if (value instanceof ClothoEdge e) register(e);
else if (value instanceof ClothoVertex v) register(v);
}
}
- private void registerEdge(ClothoEdge edge) {
+ /**
+ * Registers an already-hydrated edge (and its {@code @OutVertex}/{@code @InVertex}
+ * vertices) into this session's identity map as MANAGED.
+ * After this call, dirty-field tracking and auto-save-on-close apply to the edge.
+ *
+ *