Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
Expand Down Expand Up @@ -84,8 +85,9 @@
* how it works, and how it is integrated with the Ozone client.
*/
public class XceiverClientGrpc extends XceiverClientSpi {
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientGrpc.class);
private static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
private static final int SHUTDOWN_WAIT_INTERVAL_MILLIS = 100;
private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5;
private final Pipeline pipeline;
private final ConfigurationSource config;
private final Map<DatanodeID, XceiverClientProtocolServiceStub> asyncStubs;
Expand Down Expand Up @@ -124,7 +126,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
this.channels = new HashMap<>();
this.channels = new ConcurrentHashMap<>();
this.asyncStubs = new HashMap<>();
this.topologyAwareRead = config.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
Expand Down Expand Up @@ -235,24 +237,39 @@ private boolean isConnected(ManagedChannel channel) {
* Closes all the communication channels of the client one-by-one.
* When a channel is closed, no further requests can be sent via the channel,
* and the method waits to finish all ongoing communication.
*
* Note: the method wait 1 hour per channel tops and if that is not enough
* to finish ongoing communication, then interrupts the connection anyway.
*/
@Override
public synchronized void close() {
public void close() {
closed = true;
for (ManagedChannel channel : channels.values()) {
channel.shutdownNow();
channel.shutdown();
}

final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
long deadline = System.nanoTime() + maxWaitNanos;
List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values());

while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) {
nonTerminatedChannels.removeIf(ManagedChannel::isTerminated);
if (nonTerminatedChannels.isEmpty()) {
break;
}
try {
channel.awaitTermination(60, TimeUnit.MINUTES);
Thread.sleep(SHUTDOWN_WAIT_INTERVAL_MILLIS);
} catch (InterruptedException e) {
LOG.error("InterruptedException while waiting for channel termination",
e);
// Re-interrupt the thread while catching InterruptedException
LOG.error("Interrupted while waiting for channels to terminate", e);
Thread.currentThread().interrupt();
break;
}
}

if (!nonTerminatedChannels.isEmpty()) {
List<DatanodeID> failedChannels = channels.entrySet().stream()
.filter(e -> !e.getValue().isTerminated())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
LOG.warn("Channels {} did not terminate within timeout.", failedChannels);
}
}

@Override
Expand Down
Loading