Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 74 additions & 11 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,34 @@
import net.spy.memcached.collection.BTreeCreate;
import net.spy.memcached.collection.BTreeGet;
import net.spy.memcached.collection.BTreeGetBulk;
import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey;
import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey;
import net.spy.memcached.collection.BTreeUpdate;
import net.spy.memcached.collection.BTreeUpsert;
import net.spy.memcached.collection.CollectionUpdate;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.ops.BTreeGetBulkOperation;
import net.spy.memcached.collection.BTreeSMGet;
import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey;
import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey;
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey;
import net.spy.memcached.collection.BTreeInsert;
import net.spy.memcached.collection.BTreeInsertAndGet;
import net.spy.memcached.collection.BTreeMutate;
import net.spy.memcached.collection.BTreeSMGet;
import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey;
import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey;
import net.spy.memcached.collection.BTreeUpdate;
import net.spy.memcached.collection.BTreeUpsert;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionCreate;
import net.spy.memcached.collection.CollectionInsert;
import net.spy.memcached.collection.CollectionMutate;
import net.spy.memcached.collection.CollectionUpdate;
import net.spy.memcached.collection.ElementValueType;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.ops.APIType;
import net.spy.memcached.ops.BTreeGetBulkOperation;
import net.spy.memcached.ops.BTreeInsertAndGetOperation;
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
import net.spy.memcached.ops.CollectionCreateOperation;
import net.spy.memcached.ops.CollectionGetOperation;
import net.spy.memcached.ops.CollectionInsertOperation;
import net.spy.memcached.ops.ConcatenationType;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.GetsOperation;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
Expand Down Expand Up @@ -1360,4 +1362,65 @@ private BTreeSMGet<T> createBTreeSMGet(BKey from, BKey to, BopGetArgs args,
args.getCount(), unique);
}
}

public ArcusFuture<Long> bopIncr(String key, BKey bKey, int delta) {
CollectionMutate mutate = new BTreeMutate(Mutator.incr, delta);
return collectionMutate(key, bKey.toString(), mutate);
}

public ArcusFuture<Long> bopIncr(String key, BKey bKey, int delta, long initial, byte[] eFlag) {
CollectionMutate mutate = new BTreeMutate(Mutator.incr, delta, initial, eFlag);
return collectionMutate(key, bKey.toString(), mutate);
}

public ArcusFuture<Long> bopDecr(String key, BKey bKey, int delta) {
CollectionMutate mutate = new BTreeMutate(Mutator.decr, delta);
return collectionMutate(key, bKey.toString(), mutate);
}

public ArcusFuture<Long> bopDecr(String key, BKey bKey, int delta, long initial, byte[] eFlag) {
CollectionMutate mutate = new BTreeMutate(Mutator.decr, delta, initial, eFlag);
return collectionMutate(key, bKey.toString(), mutate);
}

private ArcusFuture<Long> collectionMutate(String key, String internalKey,
CollectionMutate mutate) {
AbstractArcusResult<Long> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<Long> future = new ArcusFutureImpl<>(result);
ArcusClient client = arcusClientSupplier.get();

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
result.set(Long.parseLong(status.getMessage()));
break;
case ERR_NOT_FOUND:
case ERR_NOT_FOUND_ELEMENT:
result.set(null);
break;
case CANCELLED:
future.internalCancel();
break;
default:
/*
* TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE /
* OVERFLOWED / NOT_SUPPORTED or unknown statement
*/
result.addError(key, status);
}
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().collectionMutate(key, internalKey, mutate, cb);
future.setOp(op);
client.addOp(key, op);

return future;
}
}
52 changes: 52 additions & 0 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,56 @@ ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,
*/
ArcusFuture<SMGetElements<T>> bopSortMergeGet(List<String> keys, BKey from, BKey to,
boolean unique, BopGetArgs args);

/**
* Increments a numeric value of an element with the given bKey in a btree item by {@code delta}
*
* @param key key of the btree item
* @param bKey BKey of the element to increment
* @param delta the amount to increment (&gt; 0)
* @return the new value after increment, or {@code null} if the key or element is not found.
*/
ArcusFuture<Long> bopIncr(String key, BKey bKey, int delta);

/**
* Increments a numeric value of an element with the given bKey in a btree item by {@code delta}.
* If the element does not exist, it is created with {@code initial} value and {@code eFlag}.
*
* @param key key of the btree item
* @param bKey BKey of the element to increment
* @param delta the amount to increment (&gt; 0)
* @param initial the value to store if the element does not exist
* ({@code delta} is ignored) (&ge; 0)
* @param eFlag eFlag of the element to create, or {@code null} if not needed
* @return the new value after increment,
* or {@code initial} if the element did not exist.
*/
ArcusFuture<Long> bopIncr(String key, BKey bKey, int delta, long initial, byte[] eFlag);

/**
* Decrements a numeric value of an element with the given bKey in a btree item by {@code delta}.
* <p>If the value is decremented below 0, it will be set to 0.</p>
*
* @param key key of the btree item
* @param bKey BKey of the element to decrement
* @param delta the amount to decrement (&gt; 0)
* @return the new value after decrement, or {@code null} if the key or element is not found.
*/
ArcusFuture<Long> bopDecr(String key, BKey bKey, int delta);

/**
* Decrements a numeric value of an element with the given bKey in a btree item by {@code delta}.
* If the element does not exist, it is created with {@code initial} value and {@code eFlag}.
* <p>If the value is decremented below 0, it will be set to 0.</p>
*
* @param key key of the btree item
* @param bKey BKey of the element to decrement
* @param delta the amount to decrement (&gt; 0)
* @param initial the value to store if the element does not exist
* ({@code delta} is ignored) (&ge; 0)
* @param eFlag eFlag of the element to create, or {@code null} if not needed
* @return the new value after decrement,
* or {@code initial} if the element did not exist.
*/
ArcusFuture<Long> bopDecr(String key, BKey bKey, int delta, long initial, byte[] eFlag);
}
Loading
Loading