Skip to content

Commit 52d3b23

Browse files
committed
junit fix: avoid creating of thousands of threads by MatcherResponse when many messages are handled using MockMessagingSpy
Too many threads can lead to OOM, so new Thread is replaced with an executor spin wait is added to HintsServiceTest.testPageSeek to avoid flaky NPE (HintsStore is populated in an async way) patch by Dmitry Konstantinov; reviewed by Brandon Williams for CASSANDRA-21166
1 parent 7532be5 commit 52d3b23

6 files changed

Lines changed: 136 additions & 88 deletions

File tree

test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,23 @@ public void reinstanciateService() throws Throwable
104104
public void testListPendingHints() throws InterruptedException, ExecutionException, TimeoutException
105105
{
106106
HintsService.instance.resumeDispatch();
107-
MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, -1);
108-
Awaitility.await("For the hints file to flush")
109-
.atMost(Duration.ofMillis(DatabaseDescriptor.getHintsFlushPeriodInMS() * 2L))
110-
.until(() -> !HintsService.instance.getPendingHints().isEmpty());
111-
112-
List<PendingHintsInfo> pendingHints = HintsService.instance.getPendingHintsInfo();
113-
assertEquals(1, pendingHints.size());
114-
PendingHintsInfo info = pendingHints.get(0);
115-
assertEquals(StorageService.instance.getLocalHostUUID(), info.hostId);
116-
assertEquals(1, info.totalFiles);
117-
assertEquals(info.oldestTimestamp, info.newestTimestamp); // there is 1 descriptor with only 1 timestamp
118-
119-
// JDK21 genZGC uncovered some flakiness / hanging here waiting on Condition
120-
spy.interceptMessageOut(20000).get(60, TimeUnit.SECONDS);
121-
spy.printMessageCounts();
122-
assertEquals(Collections.emptyList(), HintsService.instance.getPendingHints());
107+
try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, -1))
108+
{
109+
Awaitility.await("For the hints file to flush")
110+
.atMost(Duration.ofMillis(DatabaseDescriptor.getHintsFlushPeriodInMS() * 2L))
111+
.until(() -> !HintsService.instance.getPendingHints().isEmpty());
112+
113+
List<PendingHintsInfo> pendingHints = HintsService.instance.getPendingHintsInfo();
114+
assertEquals(1, pendingHints.size());
115+
PendingHintsInfo info = pendingHints.get(0);
116+
assertEquals(StorageService.instance.getLocalHostUUID(), info.hostId);
117+
assertEquals(1, info.totalFiles);
118+
assertEquals(info.oldestTimestamp, info.newestTimestamp); // there is 1 descriptor with only 1 timestamp
119+
120+
// JDK21 genZGC uncovered some flakiness / hanging here waiting on Condition
121+
spy.interceptMessageOut(20000).get(60, TimeUnit.SECONDS);
122+
spy.printMessageCounts();
123+
assertEquals(Collections.emptyList(), HintsService.instance.getPendingHints());
124+
}
123125
}
124126
}

test/unit/org/apache/cassandra/hints/HintsServiceTest.java

Lines changed: 67 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.concurrent.TimeoutException;
2323
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.AtomicReference;
2425

2526
import javax.annotation.Nullable;
2627

@@ -29,6 +30,7 @@
2930
import com.google.common.util.concurrent.ListenableFuture;
3031
import com.google.common.util.concurrent.MoreExecutors;
3132

33+
import org.awaitility.Awaitility;
3234
import org.junit.After;
3335
import org.junit.Before;
3436
import org.junit.BeforeClass;
@@ -67,6 +69,7 @@
6769
import org.apache.cassandra.utils.MockFailureDetector;
6870

6971
import static java.util.concurrent.TimeUnit.MINUTES;
72+
import static java.util.concurrent.TimeUnit.SECONDS;
7073
import static org.apache.cassandra.Util.spinAssertEquals;
7174
import static org.apache.cassandra.config.CassandraRelevantProperties.HINT_DISPATCH_INTERVAL_MS;
7275
import static org.apache.cassandra.hints.HintsTestUtil.sendHintsAndResponses;
@@ -139,14 +142,15 @@ public void testDispatchHints() throws InterruptedException, ExecutionException
139142
long cnt = StorageMetrics.totalHints.getCount();
140143

141144
// create spy for hint messages
142-
MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1);
143-
144-
// metrics should have been updated with number of create hints
145-
assertEquals(cnt + 100, StorageMetrics.totalHints.getCount());
146-
147-
// wait until hints have been send
148-
spy.interceptMessageOut(100).get();
149-
spy.interceptNoMsg(500, TimeUnit.MILLISECONDS).get();
145+
try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1))
146+
{
147+
// metrics should have been updated with number of create hints
148+
assertEquals(cnt + 100, StorageMetrics.totalHints.getCount());
149+
150+
// wait until hints have been send
151+
spy.interceptMessageOut(100).get();
152+
spy.interceptNoMsg(500, TimeUnit.MILLISECONDS).get();
153+
}
150154
}
151155

152156
@Test
@@ -155,66 +159,78 @@ public void testPauseAndResume() throws InterruptedException, ExecutionException
155159
HintsService.instance.pauseDispatch();
156160

157161
// create spy for hint messages
158-
MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1);
159-
160-
// we should not send any hints while paused
161-
ListenableFuture<Boolean> noMessagesWhilePaused = spy.interceptNoMsg(15, TimeUnit.SECONDS);
162-
Futures.addCallback(noMessagesWhilePaused, new MoreFutures.SuccessCallback<Boolean>()
162+
try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1))
163163
{
164-
public void onSuccess(@Nullable Boolean aBoolean)
164+
165+
// we should not send any hints while paused
166+
ListenableFuture<Boolean> noMessagesWhilePaused = spy.interceptNoMsg(15, TimeUnit.SECONDS);
167+
Futures.addCallback(noMessagesWhilePaused, new MoreFutures.SuccessCallback<Boolean>()
165168
{
166-
HintsService.instance.resumeDispatch();
167-
}
168-
}, MoreExecutors.directExecutor());
169-
170-
Futures.allAsList(
171-
noMessagesWhilePaused,
172-
spy.interceptMessageOut(100),
173-
spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
174-
).get();
169+
public void onSuccess(@Nullable Boolean aBoolean)
170+
{
171+
HintsService.instance.resumeDispatch();
172+
}
173+
}, MoreExecutors.directExecutor());
174+
175+
Futures.allAsList(
176+
noMessagesWhilePaused,
177+
spy.interceptMessageOut(100),
178+
spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
179+
).get();
180+
}
175181
}
176182

177183
@Test
178184
public void testPageRetry() throws InterruptedException, ExecutionException, TimeoutException
179185
{
180186
// create spy for hint messages, but only create responses for 5 hints
181-
MockMessagingSpy spy = sendHintsAndResponses(metadata, 20, 5);
187+
try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 20, 5))
188+
{
182189

183-
Futures.allAsList(
184-
// the dispatcher will always send all hints within the current page
185-
// and only wait for the acks before going to the next page
186-
spy.interceptMessageOut(20),
187-
spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
190+
Futures.allAsList(
191+
// the dispatcher will always send all hints within the current page
192+
// and only wait for the acks before going to the next page
193+
spy.interceptMessageOut(20),
194+
spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
188195

189-
// next tick will trigger a retry of the same page as we only replied with 5/20 acks
190-
spy.interceptMessageOut(20)
191-
).get();
196+
// next tick will trigger a retry of the same page as we only replied with 5/20 acks
197+
spy.interceptMessageOut(20)
198+
).get();
192199

193-
// marking the destination node as dead should stop sending hints
194-
failureDetector.isAlive = false;
195-
spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
200+
// marking the destination node as dead should stop sending hints
201+
failureDetector.isAlive = false;
202+
spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
203+
}
196204
}
197205

198206
@Test
199207
public void testPageSeek() throws InterruptedException, ExecutionException
200208
{
201209
// create spy for hint messages, stop replying after 12k (should be on 3rd page)
202-
MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, 12000);
203-
204-
// At this point the dispatcher will constantly retry the page we stopped acking,
205-
// thus we receive the same hints from the page multiple times and in total more than
206-
// all written hints. Lets just consume them for a while and then pause the dispatcher.
207-
spy.interceptMessageOut(22000).get();
208-
HintsService.instance.pauseDispatch();
209-
Thread.sleep(1000);
210-
211-
// verify that we have a dispatch offset set for the page we're currently stuck at
212-
HintsStore store = HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
213-
HintsDescriptor descriptor = store.poll();
214-
store.offerFirst(descriptor); // add again for cleanup during re-instanciation
215-
InputPosition dispatchOffset = store.getDispatchOffset(descriptor);
216-
assertTrue(dispatchOffset != null);
217-
assertTrue(((ChecksummedDataInput.Position) dispatchOffset).sourcePosition > 0);
210+
try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, 12000))
211+
{
212+
// At this point the dispatcher will constantly retry the page we stopped acking,
213+
// thus we receive the same hints from the page multiple times and in total more than
214+
// all written hints. Lets just consume them for a while and then pause the dispatcher.
215+
spy.interceptMessageOut(22000).get();
216+
HintsService.instance.pauseDispatch();
217+
Thread.sleep(1000);
218+
219+
// verify that we have a dispatch offset set for the page we're currently stuck at
220+
HintsStore store = HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
221+
AtomicReference<HintsDescriptor> hintDescriptorRef = new AtomicReference<>();
222+
Awaitility.waitAtMost(20, SECONDS).until(() -> {
223+
HintsDescriptor descriptor = store.poll();
224+
if (descriptor != null)
225+
hintDescriptorRef.set(descriptor);
226+
return descriptor != null;
227+
});
228+
HintsDescriptor descriptor = hintDescriptorRef.get();
229+
store.offerFirst(descriptor); // add again for cleanup during re-instanciation
230+
InputPosition dispatchOffset = store.getDispatchOffset(descriptor);
231+
assertTrue(dispatchOffset != null);
232+
assertTrue(((ChecksummedDataInput.Position) dispatchOffset).sourcePosition > 0);
233+
}
218234
}
219235

220236
/*

test/unit/org/apache/cassandra/net/MatcherResponse.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.cassandra.net;
1919

20+
import java.io.Closeable;
2021
import java.io.IOException;
2122
import java.util.ArrayList;
2223
import java.util.HashMap;
@@ -37,7 +38,7 @@
3738
* The actual behavior by any instance of this class can be inspected by
3839
* interacting with the returned {@link MockMessagingSpy}.
3940
*/
40-
public class MatcherResponse
41+
public class MatcherResponse implements Closeable
4142
{
4243
private final Matcher<?> matcher;
4344
private final Multimap<Long, InetAddressAndPort> sendResponses =
@@ -182,7 +183,7 @@ public boolean test(Message message, InetAddressAndPort to)
182183
}
183184

184185
// create response asynchronously to match request/response communication execution behavior
185-
new Thread(() ->
186+
spy.responseExecutor.execute(() ->
186187
{
187188
Message<?> response = fnResponse.apply(message, to);
188189
if (response != null)
@@ -202,7 +203,7 @@ public boolean test(Message message, InetAddressAndPort to)
202203

203204
spy.matchingResponse(response);
204205
}
205-
}).start();
206+
});
206207

207208
return false;
208209
}
@@ -238,4 +239,11 @@ public void destroy()
238239
{
239240
MessagingService.instance().outboundSink.remove(sink);
240241
}
242+
243+
@Override
244+
public void close()
245+
{
246+
destroy();
247+
spy.close();
248+
}
241249
}

test/unit/org/apache/cassandra/net/MockMessagingService.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.cassandra.net;
1919

2020
import java.net.UnknownHostException;
21+
import java.util.List;
22+
import java.util.concurrent.CopyOnWriteArrayList;
2123
import java.util.function.Predicate;
2224

2325
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -37,12 +39,16 @@ private MockMessagingService()
3739
{
3840
}
3941

42+
private static final List<MatcherResponse> matcherResponses = new CopyOnWriteArrayList<>();
43+
4044
/**
4145
* Creates a MatcherResponse based on specified matcher.
4246
*/
4347
public static MatcherResponse when(Matcher matcher)
4448
{
45-
return new MatcherResponse(matcher);
49+
MatcherResponse matcherResponse = new MatcherResponse(matcher);
50+
matcherResponses.add(matcherResponse);
51+
return matcherResponse;
4652
}
4753

4854
/**
@@ -53,6 +59,11 @@ public static void cleanup()
5359
{
5460
MessagingService.instance().outboundSink.clear();
5561
MessagingService.instance().inboundSink.clear();
62+
for (MatcherResponse matcher : matcherResponses)
63+
{
64+
matcher.close();
65+
}
66+
matcherResponses.clear();
5667
}
5768

5869
/**

test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,28 +56,30 @@ public void testRequestResponse() throws InterruptedException, ExecutionExceptio
5656
{
5757
// echo message that we like to mock as incoming response for outgoing echo message
5858
Message<NoPayload> echoMessage = Message.out(ECHO_REQ, NoPayload.noPayload);
59-
MockMessagingSpy spy = MockMessagingService
59+
try(MockMessagingSpy spy = MockMessagingService
6060
.when(
6161
all(
6262
to(FBUtilities.getBroadcastAddressAndPort()),
6363
verb(ECHO_REQ)
6464
)
6565
)
66-
.respond(echoMessage);
67-
68-
Message<NoPayload> echoMessageOut = Message.out(ECHO_REQ, NoPayload.noPayload);
69-
MessagingService.instance().sendWithCallback(echoMessageOut, FBUtilities.getBroadcastAddressAndPort(), msg ->
66+
.respond(echoMessage))
7067
{
71-
assertEquals(ECHO_REQ, msg.verb());
72-
assertEquals(echoMessage.payload, msg.payload);
73-
});
7468

75-
// we must have intercepted the outgoing message at this point
76-
Message<?> msg = spy.captureMessageOut().get();
77-
assertEquals(1, spy.messagesIntercepted());
78-
assertSame(echoMessage.payload, msg.payload);
69+
Message<NoPayload> echoMessageOut = Message.out(ECHO_REQ, NoPayload.noPayload);
70+
MessagingService.instance().sendWithCallback(echoMessageOut, FBUtilities.getBroadcastAddressAndPort(), msg ->
71+
{
72+
assertEquals(ECHO_REQ, msg.verb());
73+
assertEquals(echoMessage.payload, msg.payload);
74+
});
75+
76+
// we must have intercepted the outgoing message at this point
77+
Message<?> msg = spy.captureMessageOut().get();
78+
assertEquals(1, spy.messagesIntercepted());
79+
assertSame(echoMessage.payload, msg.payload);
7980

80-
// and return a mocked response
81-
Util.spinAssertEquals(1, spy::mockedMessageResponses, 60);
81+
// and return a mocked response
82+
Util.spinAssertEquals(1, spy::mockedMessageResponses, 60);
83+
}
8284
}
8385
}

test/unit/org/apache/cassandra/net/MockMessagingSpy.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.List;
2222
import java.util.concurrent.BlockingQueue;
23-
import java.util.concurrent.Executor;
23+
import java.util.concurrent.ExecutorService;
2424
import java.util.concurrent.Executors;
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +50,7 @@
5050
* @see MatcherResponse
5151
* @see MockMessagingService
5252
*/
53-
public class MockMessagingSpy
53+
public class MockMessagingSpy implements AutoCloseable
5454
{
5555
private static final Logger logger = LoggerFactory.getLogger(MockMessagingSpy.class);
5656

@@ -73,7 +73,9 @@ private void debugLog(String format, Object... args)
7373
private final BlockingQueue<Message<?>> interceptedMessages = newBlockingQueue();
7474
private final BlockingQueue<Message<?>> deliveredResponses = newBlockingQueue();
7575

76-
private static final Executor executor = Executors.newSingleThreadExecutor();
76+
private final ExecutorService executor = Executors.newSingleThreadExecutor();
77+
final ExecutorService responseExecutor = Executors.newFixedThreadPool(5);
78+
7779

7880
/**
7981
* Returns a future with the first mocked incoming message that has been created and delivered.
@@ -186,6 +188,13 @@ void matchingResponse(Message<?> response)
186188
deliveredResponses.add(response);
187189
}
188190

191+
@Override
192+
public void close()
193+
{
194+
executor.shutdown();
195+
responseExecutor.shutdown();
196+
}
197+
189198
private static class CapturedResultsFuture<T> extends AbstractFuture<List<T>> implements Runnable
190199
{
191200
private final int waitForResults;

0 commit comments

Comments
 (0)