2323
2424import com .spotify .mobius .functions .Consumer ;
2525import java .util .concurrent .ConcurrentLinkedQueue ;
26+ import java .util .concurrent .atomic .AtomicBoolean ;
27+ import java .util .concurrent .atomic .AtomicReference ;
2628
2729class FireAtLeastOnceObserver <V > implements Consumer <V > {
28- private enum State {
29- UNFIRED ,
30- FIRING ,
31- READY ,
32- }
3330
3431 private final Consumer <V > delegate ;
35- private volatile State state = State . UNFIRED ;
32+ private volatile boolean hasStartedEmitting = false ;
3633 private final ConcurrentLinkedQueue <V > queue = new ConcurrentLinkedQueue <>();
3734
3835 public FireAtLeastOnceObserver (Consumer <V > delegate ) {
@@ -41,58 +38,55 @@ public FireAtLeastOnceObserver(Consumer<V> delegate) {
4138
4239 @ Override
4340 public void accept (V value ) {
44- // this is a bit racy, with three threads handling values with order 1, 2 and 3, respectively:
45- // 1. thread 1 has called accceptIfUnfired and is in safeConsume, having published its value to
46- // observers, and having just set the state to READY
47- // 2. thread 2 has called accept, and is in safeConsume, before the first synchronized section
48- // 3. thread 3 has called accept and is about to check the current state.
49- //
50- // now, if thread 3 reads READY and calls the delegate's accept method directly, before
51- // thread 2 sets the state to FIRING and publishes its data, the observer will see 1, 3, 2.
52- // this means that this class isn't safe for racing calls to accept(), but given that it's
53- // only intended to be used within the event processing, which is sequential, that is not a
54- // risk.
55- // do note that this class isn't generally useful outside the specific context of event
56- // processing.
57- if (state != State .READY ) {
58- safeConsume (value , true );
59- } else {
60- delegate .accept (value );
61- }
41+ queue .add (value );
42+ drainQueue ();
6243 }
6344
45+ private final AtomicReference <AtomicReference <V >> firstValue = new AtomicReference <>(null );
46+
6447 public void acceptIfFirst (V value ) {
65- if (state == State .UNFIRED ) {
66- safeConsume (value , false );
48+ // Wrap the value, so that we are able to represent having a `null` value vs. not having a value
49+ // at all.
50+ AtomicReference <V > wrappedValue = new AtomicReference <>(value );
51+ if (firstValue .compareAndSet (null , wrappedValue )) {
52+ drainQueue ();
6753 }
6854 }
6955
70- private void safeConsume (V value , boolean acceptAlways ) {
71- // this synchronized section mustn't call unsafe external code like the delegate's accept
72- // method to avoid the risk of deadlocks. It's synchronized because it's changing two stateful
73- // fields: the 'state' and the 'queue', and those need to go together to guarantee ordering
74- // of the emitted values.
75- synchronized (this ) {
76- // add this item to the queue if we haven't fired, or if it should be added anyway
77- if (state == State .UNFIRED || acceptAlways ) {
78- queue .add (value );
79- }
56+ private final AtomicBoolean processing = new AtomicBoolean (false );
8057
81- // set state to FIRING to prevent acceptIfUnfired from adding items to the queue and messing
82- // ordering up - regular accept mustn't invoke the delegate consumer directly until we've
83- // processed the queue and entered READY state.
84- state = State . FIRING ;
58+ private void drainQueue () {
59+ if (! processing . compareAndSet ( false , true )) {
60+ // already draining the queue
61+ return ;
8562 }
8663
87- for (V toSend = queue .poll (); toSend != null ; toSend = queue .poll ()) {
88- delegate .accept (value );
64+ // We are now in a safe section that can only execute on one thread at the time.
65+
66+ // If this is the first time, try to emit a value that only can be emitted if it is first.
67+ if (!hasStartedEmitting ) {
68+ hasStartedEmitting = true ;
69+ AtomicReference <V > wrappedValue = firstValue .get ();
70+ if (wrappedValue != null ) {
71+ delegate .accept (wrappedValue .get ());
72+ }
8973 }
9074
91- synchronized (this ) {
92- // it's possible for a racing 'accept' call to add an item to the queue after the last poll
93- // above, so check in an exclusive way that the queue is in fact empty TODO: not correct.
94- if (queue .isEmpty ()) {
95- state = State .READY ;
75+ boolean done = false ;
76+
77+ while (!done ) {
78+ try {
79+ for (V toSend = queue .poll (); toSend != null ; toSend = queue .poll ()) {
80+ delegate .accept (toSend );
81+ }
82+
83+ } finally {
84+ processing .set (false ); // leave the safe section
85+
86+ // If the queue is empty or if we can't reacquire the processing lock, we're done,
87+ // because either there is nothing to do, or someone else will process the queue.
88+ // Note: it's important that we check the queue first, otherwise we might leak the lock.
89+ done = queue .isEmpty () || !processing .compareAndSet (false , true );
9690 }
9791 }
9892 }
0 commit comments