diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java index d03015069..5f9775992 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java @@ -39,7 +39,9 @@ import net.spy.memcached.collection.BTreeGetBulk; import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey; import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey; +import net.spy.memcached.collection.BTreeUpdate; import net.spy.memcached.collection.BTreeUpsert; +import net.spy.memcached.collection.CollectionUpdate; import net.spy.memcached.internal.result.GetsResultImpl; import net.spy.memcached.ops.BTreeGetBulkOperation; import net.spy.memcached.collection.BTreeSMGet; @@ -70,6 +72,7 @@ import net.spy.memcached.v2.vo.BKey; import net.spy.memcached.v2.vo.BTreeElement; import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BTreeUpdateElement; import net.spy.memcached.v2.vo.BopGetArgs; import net.spy.memcached.v2.vo.SMGetElements; @@ -623,6 +626,63 @@ public void complete() { return future; } + public ArcusFuture bopUpdate(String key, BTreeUpdateElement element) { + BTreeUpdate update = new BTreeUpdate<>(element.getValue(), element.getEFlagUpdate(), false); + return collectionUpdate(key, element.getBkey().toString(), update); + } + + private ArcusFuture collectionUpdate(String key, + String internalKey, + CollectionUpdate collectionUpdate) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + CachedData co = null; + if (collectionUpdate.getNewValue() != null) { + co = tcForCollection.encode(collectionUpdate.getNewValue()); + collectionUpdate.setFlags(co.getFlags()); + } + ArcusClient client = arcusClientSupplier.get(); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + switch (status.getStatusCode()) { + case SUCCESS: + result.set(true); + break; + case ERR_NOT_FOUND_ELEMENT: + result.set(false); + break; + case ERR_NOT_FOUND: + result.set(null); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / EFLAG_MISMATCH / NOTHING_TO_UPDATE / + * OVERFLOWED / OUT_OF_RANGE / NOT_SUPPORTED or unknown statement + */ + result.addError(key, status); + break; + } + } + + @Override + public void complete() { + future.complete(); + } + }; + Operation op = client.getOpFact() + .collectionUpdate(key, internalKey, collectionUpdate, + (co == null) ? null : co.getData(), cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + public ArcusFuture>> bopInsertAndGetTrimmed( String key, BTreeElement element, CollectionAttributes attributes) { return bopInsertOrUpsertAndGetTrimmed(key, element, false, attributes); diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java index e08959f38..7bfbfc457 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java @@ -26,6 +26,7 @@ import net.spy.memcached.v2.vo.BKey; import net.spy.memcached.v2.vo.BTreeElement; import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BTreeUpdateElement; import net.spy.memcached.v2.vo.BopGetArgs; import net.spy.memcached.v2.vo.SMGetElements; @@ -207,6 +208,16 @@ ArcusFuture bopUpsert(String key, BTreeElement element, */ ArcusFuture bopUpsert(String key, BTreeElement element); + /** + * Update an element in a btree item + * + * @param key key to update + * @param element btree element to update. + * @return {@code Boolean.True} if updated, {@code Boolean.False} if element does not exist, + * {@code null} if key is not found + */ + ArcusFuture bopUpdate(String key, BTreeUpdateElement element); + /** * Insert an element into a btree item and get trimmed element if overflow trim occurs. * diff --git a/src/main/java/net/spy/memcached/v2/vo/BTreeUpdateElement.java b/src/main/java/net/spy/memcached/v2/vo/BTreeUpdateElement.java new file mode 100644 index 000000000..f66a24397 --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BTreeUpdateElement.java @@ -0,0 +1,55 @@ +package net.spy.memcached.v2.vo; + +import net.spy.memcached.collection.ElementFlagUpdate; + +public final class BTreeUpdateElement { + private final BKey bkey; + private final V value; + private final ElementFlagUpdate eFlagUpdate; + + private BTreeUpdateElement(BKey bkey, V value, ElementFlagUpdate eFlagUpdate) { + if (bkey == null) { + throw new IllegalArgumentException("BKey cannot be null."); + } + + this.bkey = bkey; + this.value = value; + this.eFlagUpdate = eFlagUpdate; + } + + public static BTreeUpdateElement withValue(BKey bkey, V value) { + if (value == null) { + throw new IllegalArgumentException("Value cannot be null."); + } + return new BTreeUpdateElement<>(bkey, value, null); + } + + public static BTreeUpdateElement withEFlagUpdate(BKey bkey, + ElementFlagUpdate eFlagUpdate) { + if (eFlagUpdate == null) { + throw new IllegalArgumentException("EFlagUpdate cannot be null."); + } + return new BTreeUpdateElement<>(bkey, null, eFlagUpdate); + } + + public static BTreeUpdateElement withValueAndEFlag(BKey bkey, + V value, + ElementFlagUpdate eFlagUpdate) { + if (value == null || eFlagUpdate == null) { + throw new IllegalArgumentException("Both value and eFlagUpdate cannot be null."); + } + return new BTreeUpdateElement<>(bkey, value, eFlagUpdate); + } + + public BKey getBkey() { + return bkey; + } + + public V getValue() { + return value; + } + + public ElementFlagUpdate getEFlagUpdate() { + return eFlagUpdate; + } +} diff --git a/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java b/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java index 6728b4906..bdbb15bd4 100644 --- a/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java +++ b/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java @@ -2,22 +2,27 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.CollectionOverflowAction; +import net.spy.memcached.collection.ElementFlagUpdate; import net.spy.memcached.collection.ElementValueType; import net.spy.memcached.ops.StatusCode; import net.spy.memcached.v2.vo.BKey; import net.spy.memcached.v2.vo.BTreeElement; import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BTreeUpdateElement; import net.spy.memcached.v2.vo.BopGetArgs; import net.spy.memcached.v2.vo.SMGetElements; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertIterableEquals; @@ -827,4 +832,146 @@ void bopSortMergeGetNotHaveTrimmedKeysOutOfElementsRangeAscending() throws Excep .toCompletableFuture() .get(300, TimeUnit.MILLISECONDS); } + + @Test + void bopUpdateSuccess() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenCompose(result -> { + assertTrue(result); + return async.bopGet(key, BKey.of(1L), BopGetArgs.DEFAULT); + }) + .thenAccept(element -> { + assertNotNull(element); + assertEquals(ELEMENTS.get(0).getBkey(), element.getBkey()); + assertEquals(ELEMENTS.get(0).getValue(), element.getValue()); + assertNull(element.getEFlag()); + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + ElementFlagUpdate eFlagUpdate = new ElementFlagUpdate(new byte[]{1, 2, 3}); + BTreeUpdateElement updatedElement = BTreeUpdateElement + .withValueAndEFlag(BKey.of(1L), "updated_value", eFlagUpdate); + + // when + async.bopUpdate(key, updatedElement) + // then + .thenCompose(result -> { + assertTrue(result); + return async.bopGet(key, BKey.of(1L), BopGetArgs.DEFAULT); + }) + .thenAccept(result -> { + assertNotNull(result); + assertEquals(updatedElement.getBkey(), result.getBkey()); + assertEquals(updatedElement.getValue(), result.getValue()); + assertArrayEquals(eFlagUpdate.getElementFlag(), result.getEFlag()); + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopUpdateValueSuccess() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + BTreeUpdateElement updatedElement = BTreeUpdateElement + .withValue(BKey.of(1L), "updated_value"); + + // when + async.bopUpdate(key, updatedElement) + // then + .thenCompose(result -> { + assertTrue(result); + return async.bopGet(key, BKey.of(1L), BopGetArgs.DEFAULT); + }) + .thenAccept(result -> { + assertNotNull(result); + assertEquals(updatedElement.getBkey(), result.getBkey()); + assertEquals(updatedElement.getValue(), result.getValue()); + assertNull(result.getEFlag()); + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopUpdateEFlagSuccess() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + ElementFlagUpdate eFlagUpdate = new ElementFlagUpdate(new byte[]{1, 2, 3}); + BTreeUpdateElement updatedElement = BTreeUpdateElement + .withEFlagUpdate(BKey.of(1L), eFlagUpdate); + + // when + async.bopUpdate(key, updatedElement) + // then + .thenCompose(result -> { + assertTrue(result); + return async.bopGet(key, BKey.of(1L), BopGetArgs.DEFAULT); + }) + .thenAccept(result -> { + assertNotNull(result); + assertEquals(updatedElement.getBkey(), result.getBkey()); + assertEquals(ELEMENTS.get(0).getValue(), result.getValue()); + assertArrayEquals(eFlagUpdate.getElementFlag(), result.getEFlag()); + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopUpdateNotFoundElement() throws ExecutionException, InterruptedException, + TimeoutException { + + // given + String key = keys.get(0); + + async.bopCreate(key, ElementValueType.STRING, new CollectionAttributes()) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + ElementFlagUpdate eFlagUpdate = new ElementFlagUpdate(new byte[]{1, 2, 3}); + BTreeUpdateElement updatedElement = BTreeUpdateElement + .withValueAndEFlag(BKey.of(1L), "updated_value", eFlagUpdate); + + // when + async.bopUpdate(key, updatedElement) + // then + .thenAccept(Assertions::assertFalse) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void bopUpdateNotFound() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + + ElementFlagUpdate eFlagUpdate = new ElementFlagUpdate(new byte[]{1, 2, 3}); + BTreeUpdateElement updatedElement = BTreeUpdateElement + .withValueAndEFlag(BKey.of(1L), "updated_value", eFlagUpdate); + + // when + async.bopUpdate(key, updatedElement) + // then + .thenAccept(Assertions::assertNull) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } }