Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
*/
package org.knime.python3;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -105,8 +107,18 @@
}

private static class DummyPythonGateway implements PythonGateway<DummyEntryPoint> {
private final IOException m_closeException;

private boolean m_isClosed = false;

DummyPythonGateway() {
this(null);
}

DummyPythonGateway(final IOException closeException) {
m_closeException = closeException;
}

@Override
public DummyEntryPoint getEntryPoint() {
return null;
Expand All @@ -125,6 +137,9 @@
@Override
public void close() throws IOException {
m_isClosed = true;
if (m_closeException != null) {
throw m_closeException;
}
}

public boolean isClosed() {
Expand Down Expand Up @@ -198,4 +213,26 @@
TRACKER.onPythonGatewayCreationGateClose();
assertTrue(gateway.isClosed());
}

@SuppressWarnings("resource")
@Test
public void testTrackerClosesForCheckpoint() throws IOException {
final var gateway = new DummyPythonGateway();
TRACKER.createTrackedGateway(gateway);
TRACKER.clearForCheckpoint();
assertTrue(gateway.isClosed());

Check warning on line 223 in org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a message to this assertion.

See more on https://sonarcloud.io/project/issues?id=knime_knime-python&issues=AZ1KRXyeZbWaBRLAjddc&open=AZ1KRXyeZbWaBRLAjddc&pullRequest=88
}

@SuppressWarnings("resource")
@Test
public void testCheckpointCleanupPropagatesIOException() {
final var gateway = new DummyPythonGateway(new IOException("close failed"));
TRACKER.createTrackedGateway(gateway);

final var exception = assertThrows(IOException.class, TRACKER::clearForCheckpoint);

Check warning on line 232 in org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a message to this assertion.

See more on https://sonarcloud.io/project/issues?id=knime_knime-python&issues=AZ1KRXyeZbWaBRLAjddd&open=AZ1KRXyeZbWaBRLAjddd&pullRequest=88

assertTrue(gateway.isClosed());

Check warning on line 234 in org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a message to this assertion.

See more on https://sonarcloud.io/project/issues?id=knime_knime-python&issues=AZ1KRXyeZbWaBRLAjdde&open=AZ1KRXyeZbWaBRLAjdde&pullRequest=88
assertEquals("Aborting Python process failed.", exception.getMessage());

Check warning on line 235 in org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a message to this assertion.

See more on https://sonarcloud.io/project/issues?id=knime_knime-python&issues=AZ1KRXyeZbWaBRLAjddf&open=AZ1KRXyeZbWaBRLAjddf&pullRequest=88
assertEquals("close failed", exception.getCause().getMessage());

Check warning on line 236 in org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a message to this assertion.

See more on https://sonarcloud.io/project/issues?id=knime_knime-python&issues=AZ1KRXyeZbWaBRLAjddg&open=AZ1KRXyeZbWaBRLAjddg&pullRequest=88
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

import org.knime.core.checkpoint.PhasedInit;
import org.knime.core.checkpoint.PhasedInitSupport;
import org.knime.core.node.NodeLogger;
import org.knime.python3.PythonGatewayCreationGate.PythonGatewayCreationGateListener;

Expand All @@ -79,6 +83,18 @@ public final class PythonGatewayTracker implements PythonGatewayCreationGateList

private PythonGatewayTracker() {
m_openGateways = gatewaySet();
// Support CRaC (Coordinated Restore at Checkpoint) and close all connections prior to checkpointing
PhasedInitSupport.registerOrActivate(new PhasedInit<RuntimeException>() {
@Override
public void beforeCheckpoint() throws RuntimeException {
try {
clearForCheckpoint();
} catch (IOException ex) {
throw new UncheckedIOException(
"Error when forcefully terminating Python processes before checkpointing", ex);
}
}
});
Comment on lines +87 to +97
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are race conditions on checkpoint/restore, which should not happen by design. Restore willl be a controlled phase, and checkpoint will only happen when the init of all bundles has stabilized (but no workflows have been executed). So, leaving unchanged!

}

/**
Expand Down Expand Up @@ -107,15 +123,30 @@ public void onPythonGatewayCreationGateClose() {
}

void clear() throws IOException {
clear(LOGGER::error,
"Found running Python processes (%d). Aborting them to allow installation process. "
+ "If this leads to failures in node execution, "
+ "please restart those nodes once the installation has finished. Triggered from thread '%s'.");
}

void clearForCheckpoint() throws IOException {
clear(LOGGER::info,
"Found running Python processes (%d). Aborting them prior to checkpointing. Triggered from thread '%s'.");
}

/**
* Closes all open gateways and logs a message using the provided consumer.
*
* @param logMessageConsumer consumer for logging messages
* @param logMessage message format string with placeholders for gateway count and thread name
* @throws IOException if an error occurs while closing the gateways
*/
private void clear(final Consumer<String> logMessageConsumer, final String logMessage) throws IOException {
if (m_openGateways.isEmpty()) {
return;
}

LOGGER.errorWithFormat(
"Found running Python processes (%d). Aborting them to allow installation process. "
+ "If this leads to failures in node execution, "
+ "please restart those nodes once the installation has finished. Triggered from thread '%s'.",
m_openGateways.size(), Thread.currentThread().getName());
logMessageConsumer.accept(String.format(logMessage, m_openGateways.size(), Thread.currentThread().getName()));

var exceptions = new ArrayList<Exception>();
for (var gateway : m_openGateways) {
Expand Down