Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/main/java/net/spy/memcached/collection/BTreeFindPosition.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,23 @@ public class BTreeFindPosition {
private final BTreeOrder order;
private String str;

public BTreeFindPosition(long longBKey, BTreeOrder order) {
this.bkey = String.valueOf(longBKey);
public BTreeFindPosition(String bkey, BTreeOrder order) {
if (order == null) {
throw new IllegalArgumentException("BTreeOrder must not be null.");
}
if (bkey == null || bkey.isEmpty()) {
throw new IllegalArgumentException("BKey must not be null or empty.");
}
this.bkey = bkey;
this.order = order;
}

public BTreeFindPosition(long longBKey, BTreeOrder order) {
this(String.valueOf(longBKey), order);
}

public BTreeFindPosition(byte[] byteArrayBKey, BTreeOrder order) {
this.bkey = BTreeUtil.toHex(byteArrayBKey);
this.order = order;
this(BTreeUtil.toHex(byteArrayBKey), order);
}

public String stringify() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,26 @@ public class BTreeFindPositionWithGet extends CollectionGet {
private final int count;
private BKeyObject bkey;

public BTreeFindPositionWithGet(long longBKey, BTreeOrder order, int count) {
public BTreeFindPositionWithGet(BKeyObject bkeyObject, BTreeOrder order, int count) {
if (order == null) {
throw new IllegalArgumentException("BTreeOrder must not be null.");
}
if (count < 0 || count > 100) {
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
}
this.bkeyObject = new BKeyObject(longBKey);
this.bkeyObject = bkeyObject;
this.order = order;
this.count = count;
this.eHeadCount = 2;
this.eFlagIndex = 1;
}

public BTreeFindPositionWithGet(long longBKey, BTreeOrder order, int count) {
this(new BKeyObject(longBKey), order, count);
}

public BTreeFindPositionWithGet(byte[] byteArrayBKey, BTreeOrder order, int count) {
if (order == null) {
throw new IllegalArgumentException("BTreeOrder must not be null.");
}
if (count < 0 || count > 100) {
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
}
this.bkeyObject = new BKeyObject(byteArrayBKey);
this.order = order;
this.count = count;
this.eHeadCount = 2;
this.eFlagIndex = 1;
this(new BKeyObject(byteArrayBKey), order, count);
}

public String stringify() {
Expand All @@ -89,7 +83,7 @@ public String stringify() {

if (count > 0) {
b.append(" ");
b.append(String.valueOf(count));
b.append(count);
}

str = b.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public void handleLine(String line) {
pos = position - index;
posDiff = 1;

// BTreeFindPositionWithGetOperation.Callback cb =
// (BTreeFindPositionWithGetOperation.Callback) getCallback();
// cb.gotPosition(position);

// start to read actual data
setReadType(OperationReadType.DATA);
} else {
Expand Down
202 changes: 202 additions & 0 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,18 @@
import net.spy.memcached.collection.BKeyObject;
import net.spy.memcached.collection.BTreeCount;
import net.spy.memcached.collection.BTreeCreate;
import net.spy.memcached.collection.BTreeFindPosition;
import net.spy.memcached.collection.BTreeFindPositionWithGet;
import net.spy.memcached.collection.BTreeDelete;
import net.spy.memcached.collection.BTreeGet;
import net.spy.memcached.collection.BTreeGetBulk;
import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey;
import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey;
import net.spy.memcached.collection.BTreeGetByPosition;
import net.spy.memcached.collection.BTreeInsert;
import net.spy.memcached.collection.BTreeInsertAndGet;
import net.spy.memcached.collection.BTreeMutate;
import net.spy.memcached.collection.BTreeOrder;
import net.spy.memcached.collection.BTreeSMGet;
import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey;
import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey;
Expand All @@ -60,7 +64,10 @@
import net.spy.memcached.collection.ElementValueType;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.ops.APIType;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
import net.spy.memcached.ops.BTreeGetByPositionOperation;
import net.spy.memcached.ops.BTreeInsertAndGetOperation;
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
import net.spy.memcached.ops.CollectionCreateOperation;
Expand All @@ -79,6 +86,7 @@
import net.spy.memcached.transcoders.TranscoderUtils;
import net.spy.memcached.v2.vo.BKey;
import net.spy.memcached.v2.vo.BTreeElement;
import net.spy.memcached.v2.vo.BTreePositionElement;
import net.spy.memcached.v2.vo.BTreeElements;
import net.spy.memcached.v2.vo.BTreeUpdateElement;
import net.spy.memcached.v2.vo.BopDeleteArgs;
Expand Down Expand Up @@ -1252,6 +1260,200 @@ private BTreeGetBulk<T> createBTreeGetBulk(MemcachedNode node, List<String> keys
}
}

public ArcusFuture<Integer> bopGetPosition(String key, BKey bKey, BTreeOrder order) {
AbstractArcusResult<Integer> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<Integer> future = new ArcusFutureImpl<>(result);
BTreeFindPosition findPosition = new BTreeFindPosition(bKey.toString(), order);
ArcusClient client = arcusClientSupplier.get();

BTreeFindPositionOperation.Callback cb = new BTreeFindPositionOperation.Callback() {
@Override
public void gotData(int position) {
result.set(position);
}

@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
break;
case ERR_NOT_FOUND:
case ERR_NOT_FOUND_ELEMENT:
result.set(null);
break;
case CANCELLED:
future.internalCancel();
break;
default:
/* TYPE_MISMATCH / BKEY_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
result.addError(key, status);
}
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().bopFindPosition(key, findPosition, cb);
future.setOp(op);
client.addOp(key, op);

return future;
}

public ArcusFuture<BTreeElement<T>> bopGetByPosition(String key, int pos, BTreeOrder order) {
AbstractArcusResult<BTreeElement<T>> result
= new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<BTreeElement<T>> future = new ArcusFutureImpl<>(result);
BTreeGetByPosition getByPosition = new BTreeGetByPosition(order, pos);
ArcusClient client = arcusClientSupplier.get();

BTreeGetByPositionOperation.Callback cb = new BTreeGetByPositionOperation.Callback() {
@Override
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
result.set(buildBTreeElement(flags, bKey, eFlag, data));
}

@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
break;
case ERR_NOT_FOUND:
case ERR_NOT_FOUND_ELEMENT:
result.set(null);
break;
case CANCELLED:
future.internalCancel();
break;
default:
/* TYPE_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
result.addError(key, status);
}
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().bopGetByPosition(key, getByPosition, cb);
future.setOp(op);
client.addOp(key, op);

return future;
}

public ArcusFuture<List<BTreeElement<T>>> bopGetByPosition(String key,
int from, int to,
BTreeOrder order) {

if (from > to) {
throw new IllegalArgumentException("from should be less than or equal to to.");
}

AbstractArcusResult<List<BTreeElement<T>>> result
= new AbstractArcusResult<>(new AtomicReference<>(new ArrayList<>()));
ArcusFutureImpl<List<BTreeElement<T>>> future = new ArcusFutureImpl<>(result);
BTreeGetByPosition getByPosition = new BTreeGetByPosition(order, from, to);
ArcusClient client = arcusClientSupplier.get();

BTreeGetByPositionOperation.Callback cb = new BTreeGetByPositionOperation.Callback() {
@Override
public void gotData(int pos, int flags, BKeyObject bkey, byte[] eflag, byte[] data) {
result.get().add(buildBTreeElement(flags, bkey, eflag, data));
}

@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
case ERR_NOT_FOUND_ELEMENT:
break;
case ERR_NOT_FOUND:
result.set(null);
break;
case CANCELLED:
future.internalCancel();
break;
default:
/* TYPE_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
result.addError(key, status);
}
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().bopGetByPosition(key, getByPosition, cb);
future.setOp(op);
client.addOp(key, op);

return future;
}

public ArcusFuture<List<BTreePositionElement<T>>> bopPositionWithGet(String key,
BKey bKey,
int count,
BTreeOrder order) {

AbstractArcusResult<List<BTreePositionElement<T>>> result =
new AbstractArcusResult<>(new AtomicReference<>(new ArrayList<>()));
ArcusFutureImpl<List<BTreePositionElement<T>>> future = new ArcusFutureImpl<>(result);
BTreeFindPositionWithGet findPositionWithGet =
new BTreeFindPositionWithGet(bKey.toBKeyObject(), order, count);
ArcusClient client = arcusClientSupplier.get();

BTreeFindPositionWithGetOperation.Callback cb = new BTreeFindPositionWithGetOperation
.Callback() {

@Override
public void gotData(int pos, int flags, BKeyObject bKeyObj, byte[] eFlag, byte[] data) {
T decodedData = tcForCollection.decode(
new CachedData(flags, data, tcForCollection.getMaxSize()));
result.get().add(new BTreePositionElement<>(BKey.of(bKeyObj), decodedData, eFlag, pos));
}

@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
case ERR_NOT_FOUND_ELEMENT:
break;
case ERR_NOT_FOUND:
result.set(null);
break;
case CANCELLED:
future.internalCancel();
break;
default:
/* TYPE_MISMATCH / BKEY_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
result.addError(key, status);
}
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().bopFindPositionWithGet(key, findPositionWithGet, cb);
future.setOp(op);
client.addOp(key, op);

return future;
}

private BTreeElement<T> buildBTreeElement(int flags, BKeyObject bKey,
byte[] eFlag, byte[] data) {
T decodedData = tcForCollection.decode(
new CachedData(flags, data, tcForCollection.getMaxSize()));
return new BTreeElement<>(BKey.of(bKey), decodedData, eFlag);
}

public ArcusFuture<SMGetElements<T>> bopSortMergeGet(List<String> keys, BKey from, BKey to,
boolean unique, BopGetArgs args) {
verifyBKeyRange(from, to);
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import java.util.Map;

import net.spy.memcached.CASValue;
import net.spy.memcached.collection.BTreeOrder;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.ElementFlagFilter;
import net.spy.memcached.collection.ElementValueType;
import net.spy.memcached.v2.vo.BKey;
import net.spy.memcached.v2.vo.BTreePositionElement;
import net.spy.memcached.v2.vo.BTreeElement;
import net.spy.memcached.v2.vo.BTreeElements;
import net.spy.memcached.v2.vo.BTreeUpdateElement;
Expand Down Expand Up @@ -369,6 +371,57 @@ ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,
BKey from, BKey to,
BopGetArgs args);

/**
* Get the position of an element with the given bKey in a btree item.
*
* @param key key of the btree item
* @param bKey BKey of the element to find
* @param order the order of the btree to determine position
* @return the 0-based position of the element,
* or {@code null} if the key or element is not found.
*/
ArcusFuture<Integer> bopGetPosition(String key, BKey bKey, BTreeOrder order);

/**
* Get an element at the given position in a btree item.
*
* @param key key of the btree item
* @param pos 0-based position of the element to get
* @param order the order of the btree to determine position
* @return the {@code BTreeElement} at the given position,
* or {@code null} if the key is not found or position is out of range.
*/
ArcusFuture<BTreeElement<T>> bopGetByPosition(String key, int pos, BTreeOrder order);

/**
* Get elements in a position range from a btree item.
*
* @param key key of the btree item
* @param posFrom start position (inclusive)
* @param posTo end position (inclusive)
* @param order the order of the btree to determine position
* @return list of {@code BTreeElement} in the given position range, in traversal order.
* An empty list if no elements exist in the range but the key exists.
* {@code null} if the key is not found.
*/
ArcusFuture<List<BTreeElement<T>>> bopGetByPosition(String key,
int posFrom, int posTo, BTreeOrder order);

/**
* Get the position of an element with the given bKey and retrieve neighboring elements.
*
* @param key key of the btree item
* @param bKey BKey of the element to find
* @param count the number of elements
* to retrieve on each side of the found position (0 &le; count &le; 100)
* @param order the order of the btree to determine position
* @return {@code BopPositionWithGetResult} containing the position of the target element
* and a map of neighboring elements (keyed by position).
* {@code null} if the key or element is not found.
*/
ArcusFuture<List<BTreePositionElement<T>>> bopPositionWithGet(String key, BKey bKey,
int count, BTreeOrder order);

/**
* Get sort-merged elements from multiple btree items.
*
Expand Down
Loading
Loading