diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java index f3d89f95c..0693dc6dc 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java @@ -37,32 +37,34 @@ import net.spy.memcached.collection.BTreeCreate; import net.spy.memcached.collection.BTreeGet; import net.spy.memcached.collection.BTreeGetBulk; -import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey; import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey; -import net.spy.memcached.collection.BTreeUpdate; -import net.spy.memcached.collection.BTreeUpsert; -import net.spy.memcached.collection.CollectionUpdate; -import net.spy.memcached.internal.result.GetsResultImpl; -import net.spy.memcached.ops.BTreeGetBulkOperation; -import net.spy.memcached.collection.BTreeSMGet; -import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey; -import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey; -import net.spy.memcached.ops.BTreeSortMergeGetOperation; +import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey; import net.spy.memcached.collection.BTreeInsert; import net.spy.memcached.collection.BTreeInsertAndGet; +import net.spy.memcached.collection.BTreeMutate; +import net.spy.memcached.collection.BTreeSMGet; +import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey; +import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey; +import net.spy.memcached.collection.BTreeUpdate; +import net.spy.memcached.collection.BTreeUpsert; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.CollectionCreate; import net.spy.memcached.collection.CollectionInsert; +import net.spy.memcached.collection.CollectionMutate; +import net.spy.memcached.collection.CollectionUpdate; import net.spy.memcached.collection.ElementValueType; +import net.spy.memcached.internal.result.GetsResultImpl; import net.spy.memcached.ops.APIType; +import net.spy.memcached.ops.BTreeGetBulkOperation; import net.spy.memcached.ops.BTreeInsertAndGetOperation; +import net.spy.memcached.ops.BTreeSortMergeGetOperation; import net.spy.memcached.ops.CollectionCreateOperation; import net.spy.memcached.ops.CollectionGetOperation; import net.spy.memcached.ops.CollectionInsertOperation; import net.spy.memcached.ops.ConcatenationType; import net.spy.memcached.ops.GetOperation; -import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.GetsOperation; +import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.OperationStatus; @@ -1360,4 +1362,65 @@ private BTreeSMGet createBTreeSMGet(BKey from, BKey to, BopGetArgs args, args.getCount(), unique); } } + + public ArcusFuture bopIncr(String key, BKey bKey, int delta) { + CollectionMutate mutate = new BTreeMutate(Mutator.incr, delta); + return collectionMutate(key, bKey.toString(), mutate); + } + + public ArcusFuture bopIncr(String key, BKey bKey, int delta, long initial, byte[] eFlag) { + CollectionMutate mutate = new BTreeMutate(Mutator.incr, delta, initial, eFlag); + return collectionMutate(key, bKey.toString(), mutate); + } + + public ArcusFuture bopDecr(String key, BKey bKey, int delta) { + CollectionMutate mutate = new BTreeMutate(Mutator.decr, delta); + return collectionMutate(key, bKey.toString(), mutate); + } + + public ArcusFuture bopDecr(String key, BKey bKey, int delta, long initial, byte[] eFlag) { + CollectionMutate mutate = new BTreeMutate(Mutator.decr, delta, initial, eFlag); + return collectionMutate(key, bKey.toString(), mutate); + } + + private ArcusFuture collectionMutate(String key, String internalKey, + CollectionMutate mutate) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + ArcusClient client = arcusClientSupplier.get(); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + switch (status.getStatusCode()) { + case SUCCESS: + result.set(Long.parseLong(status.getMessage())); + break; + case ERR_NOT_FOUND: + case ERR_NOT_FOUND_ELEMENT: + result.set(null); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / + * OVERFLOWED / NOT_SUPPORTED or unknown statement + */ + result.addError(key, status); + } + } + + @Override + public void complete() { + future.complete(); + } + }; + Operation op = client.getOpFact().collectionMutate(key, internalKey, mutate, cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } } diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java index 73a026814..dd46e7361 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java @@ -380,4 +380,56 @@ ArcusFuture>> bopMultiGet(List keys, */ ArcusFuture> bopSortMergeGet(List keys, BKey from, BKey to, boolean unique, BopGetArgs args); + + /** + * Increments a numeric value of an element with the given bKey in a btree item by {@code delta} + * + * @param key key of the btree item + * @param bKey BKey of the element to increment + * @param delta the amount to increment (> 0) + * @return the new value after increment, or {@code null} if the key or element is not found. + */ + ArcusFuture bopIncr(String key, BKey bKey, int delta); + + /** + * Increments a numeric value of an element with the given bKey in a btree item by {@code delta}. + * If the element does not exist, it is created with {@code initial} value and {@code eFlag}. + * + * @param key key of the btree item + * @param bKey BKey of the element to increment + * @param delta the amount to increment (> 0) + * @param initial the value to store if the element does not exist + * ({@code delta} is ignored) (≥ 0) + * @param eFlag eFlag of the element to create, or {@code null} if not needed + * @return the new value after increment, + * or {@code initial} if the element did not exist. + */ + ArcusFuture bopIncr(String key, BKey bKey, int delta, long initial, byte[] eFlag); + + /** + * Decrements a numeric value of an element with the given bKey in a btree item by {@code delta}. + *

If the value is decremented below 0, it will be set to 0.

+ * + * @param key key of the btree item + * @param bKey BKey of the element to decrement + * @param delta the amount to decrement (> 0) + * @return the new value after decrement, or {@code null} if the key or element is not found. + */ + ArcusFuture bopDecr(String key, BKey bKey, int delta); + + /** + * Decrements a numeric value of an element with the given bKey in a btree item by {@code delta}. + * If the element does not exist, it is created with {@code initial} value and {@code eFlag}. + *

If the value is decremented below 0, it will be set to 0.

+ * + * @param key key of the btree item + * @param bKey BKey of the element to decrement + * @param delta the amount to decrement (> 0) + * @param initial the value to store if the element does not exist + * ({@code delta} is ignored) (≥ 0) + * @param eFlag eFlag of the element to create, or {@code null} if not needed + * @return the new value after decrement, + * or {@code initial} if the element did not exist. + */ + ArcusFuture bopDecr(String key, BKey bKey, int delta, long initial, byte[] eFlag); } diff --git a/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java b/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java index bdbb15bd4..52a009c2b 100644 --- a/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java +++ b/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java @@ -11,6 +11,7 @@ import net.spy.memcached.collection.CollectionOverflowAction; import net.spy.memcached.collection.ElementFlagUpdate; import net.spy.memcached.collection.ElementValueType; +import net.spy.memcached.ops.OperationException; import net.spy.memcached.ops.StatusCode; import net.spy.memcached.v2.vo.BKey; import net.spy.memcached.v2.vo.BTreeElement; @@ -25,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -974,4 +976,310 @@ void bopUpdateNotFound() throws ExecutionException, InterruptedException, Timeou .toCompletableFuture() .get(300L, TimeUnit.MILLISECONDS); } + + @Test + void bopIncrSuccess() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + BTreeElement element = new BTreeElement<>(bKey, "100", null); + + async.bopInsert(key, element, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopIncr(key, bKey, 10) + // then + .thenAccept(result -> assertEquals(110L, result)) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopIncrNotFound() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + + // when + async.bopIncr(key, bKey, 10) + // then + .thenAccept(Assertions::assertNull) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopIncrNotFoundElement() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(999L); + + async.bopCreate(key, ElementValueType.STRING, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopIncr(key, bKey, 10) + // then + .thenAccept(Assertions::assertNull) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopIncrInitialNotExistsElement() throws ExecutionException, InterruptedException, + TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(999L); + + async.bopCreate(key, ElementValueType.STRING, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopIncr(key, bKey, 10, 100L, null) + // then + .thenCompose(result -> { + assertEquals(100L, result); + return async.bopGet(key, bKey, BopGetArgs.DEFAULT); + }) + .thenAccept(result -> { + assertEquals(bKey, result.getBkey()); + assertEquals("100", result.getValue()); + assertNull(result.getEFlag()); + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopIncrInitialExistingElement() throws ExecutionException, InterruptedException, + TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + BTreeElement element = new BTreeElement<>(bKey, "100", null); + + async.bopInsert(key, element, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopIncr(key, bKey, 10, 1000L, null) + // then + .thenAccept(result -> assertEquals(110L, result)) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopIncrTypeMismatch() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + + async.set(key, 60, "invalid-type-value") + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopIncr(key, bKey, 10) + // then + .handle((result, ex) -> { + assertInstanceOf(OperationException.class, ex); + assertTrue(ex.getMessage().contains("TYPE_MISMATCH")); + return result; + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopIncrBKeyMismatch() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopIncr(key, BKey.of(new byte[]{0x01}), 10) + // then + .handle((result, ex) -> { + assertInstanceOf(OperationException.class, ex); + assertTrue(ex.getMessage().contains("BKEY_MISMATCH")); + return result; + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopDecrSuccess() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + BTreeElement element = new BTreeElement<>(bKey, "100", null); + + async.bopInsert(key, element, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopDecr(key, bKey, 10) + // then + .thenAccept(result -> assertEquals(90L, result)) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopDecrNotFound() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + + // when + async.bopDecr(key, bKey, 10) + // then + .thenAccept(Assertions::assertNull) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopDecrNotFoundElement() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(999L); + + async.bopCreate(key, ElementValueType.STRING, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopDecr(key, bKey, 10) + // then + .thenAccept(Assertions::assertNull) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopDecrInitialNotExistsElement() throws ExecutionException, InterruptedException, + TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(999L); + + async.bopCreate(key, ElementValueType.STRING, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopDecr(key, bKey, 10, 100L, null) + // then + .thenAccept(result -> assertEquals(100L, result)) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopDecrInitialExistingElement() throws ExecutionException, InterruptedException, + TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + BTreeElement element = new BTreeElement<>(bKey, "100", null); + + async.bopInsert(key, element, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopDecr(key, bKey, 10, 1000L, null) + // then + .thenAccept(result -> assertEquals(90L, result)) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopDecrResultUnderZero() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + BTreeElement element = new BTreeElement<>(bKey, "5", null); + + async.bopInsert(key, element, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopDecr(key, bKey, 10) + // then + .thenAccept(result -> assertEquals(0L, result)) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopDecrTypeMismatch() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + BKey bKey = BKey.of(1L); + + async.set(key, 60, "invalid-type-value") + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopDecr(key, bKey, 10) + // then + .handle((result, ex) -> { + assertInstanceOf(OperationException.class, ex); + assertTrue(ex.getMessage().contains("TYPE_MISMATCH")); + return result; + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopDecrBKeyMismatch() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.bopDecr(key, BKey.of(new byte[]{0x01}), 10) + // then + .handle((result, ex) -> { + assertInstanceOf(OperationException.class, ex); + assertTrue(ex.getMessage().contains("BKEY_MISMATCH")); + return result; + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } }