Skip to content
Open
54 changes: 42 additions & 12 deletions mssql_python/pybind/connection/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,16 @@ void Connection::setAutocommit(bool enable) {
}
SQLINTEGER value = enable ? SQL_AUTOCOMMIT_ON : SQL_AUTOCOMMIT_OFF;
LOG("Setting autocommit=%d", enable);
SQLRETURN ret =
SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_AUTOCOMMIT,
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(value)), 0);
SQLRETURN ret;
{
// Release the GIL during the blocking ODBC call. Holding the GIL
// here can deadlock when the network path goes through another
// Python thread (e.g. an in-process SSH tunnel via paramiko +
// sshtunnel), since that thread also needs the GIL to run.
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_AUTOCOMMIT,
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(value)), 0);
}
checkError(ret);
if (value == SQL_AUTOCOMMIT_ON) {
LOG("Autocommit enabled");
Expand Down Expand Up @@ -296,9 +303,15 @@ SQLRETURN Connection::setAttribute(SQLINTEGER attribute, py::object value) {
// Get the integer value
int64_t longValue = value.cast<int64_t>();

SQLRETURN ret = SQLSetConnectAttr_ptr(
_dbcHandle->get(), attribute,
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(longValue)), SQL_IS_INTEGER);
SQLRETURN ret;
{
// Release the GIL around the ODBC call for consistency with the
// other connection-attribute paths; some attributes can block.
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(
_dbcHandle->get(), attribute,
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(longValue)), SQL_IS_INTEGER);
}

if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to set integer attribute=%d, ret=%d", attribute, ret);
Expand Down Expand Up @@ -342,7 +355,11 @@ SQLRETURN Connection::setAttribute(SQLINTEGER attribute, py::object value) {
length = static_cast<SQLINTEGER>(this->wstrStringBuffer.length() * sizeof(SQLWCHAR));
#endif

SQLRETURN ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), attribute, ptr, length);
SQLRETURN ret;
{
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), attribute, ptr, length);
}
if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to set string attribute=%d, ret=%d", attribute, ret);
} else {
Expand All @@ -361,7 +378,11 @@ SQLRETURN Connection::setAttribute(SQLINTEGER attribute, py::object value) {
SQLPOINTER ptr = const_cast<char*>(this->strBytesBuffer.c_str());
SQLINTEGER length = static_cast<SQLINTEGER>(this->strBytesBuffer.size());

SQLRETURN ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), attribute, ptr, length);
SQLRETURN ret;
{
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), attribute, ptr, length);
}
if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to set binary attribute=%d, ret=%d", attribute, ret);
} else {
Expand Down Expand Up @@ -412,8 +433,14 @@ bool Connection::reset() {
ThrowStdException("Connection handle not allocated");
}
LOG("Resetting connection via SQL_ATTR_RESET_CONNECTION");
SQLRETURN ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_RESET_CONNECTION,
(SQLPOINTER)SQL_RESET_CONNECTION_YES, SQL_IS_INTEGER);
SQLRETURN ret;
{
// Release the GIL around the ODBC call for consistency with the
// other connection-attribute paths; some attributes can block.
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_RESET_CONNECTION,
(SQLPOINTER)SQL_RESET_CONNECTION_YES, SQL_IS_INTEGER);
}
if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to reset connection (ret=%d). Marking as dead.", ret);
return false;
Expand All @@ -423,8 +450,11 @@ bool Connection::reset() {
// Explicitly reset it to the default (SQL_TXN_READ_COMMITTED) to prevent
// isolation level settings from leaking between pooled connection usages.
LOG("Resetting transaction isolation level to READ COMMITTED");
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_TXN_ISOLATION,
(SQLPOINTER)SQL_TXN_READ_COMMITTED, SQL_IS_INTEGER);
{
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_TXN_ISOLATION,
(SQLPOINTER)SQL_TXN_READ_COMMITTED, SQL_IS_INTEGER);
}
if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to reset transaction isolation level (ret=%d). Marking as dead.", ret);
return false;
Expand Down
75 changes: 49 additions & 26 deletions mssql_python/pybind/connection/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ std::shared_ptr<Connection> ConnectionPool::acquire(const std::wstring& connStr,
std::vector<std::shared_ptr<Connection>> to_disconnect;
std::shared_ptr<Connection> valid_conn = nullptr;
bool needs_connect = false;

// Phase 1: Prune stale connections (under mutex — no ODBC calls).
{
std::lock_guard<std::mutex> lock(_mutex);
auto now = std::chrono::steady_clock::now();
size_t before = _pool.size();

// Phase 1: Remove stale connections, collect for later disconnect
_pool.erase(std::remove_if(_pool.begin(), _pool.end(),
[&](const std::shared_ptr<Connection>& conn) {
auto idle_time =
Expand All @@ -38,40 +39,62 @@ std::shared_ptr<Connection> ConnectionPool::acquire(const std::wstring& connStr,
_pool.end());

size_t pruned = before - _pool.size();
// Decrement _current_size eagerly so new slots can be reserved while
// stale connections are being disconnected (Phase 4). This means
// _current_size tracks *reserved capacity* (pooled + checked-out +
// in-flight new), not necessarily live ODBC handles.
_current_size = (_current_size >= pruned) ? (_current_size - pruned) : 0;
}
Comment thread
saurabh500 marked this conversation as resolved.

// Phase 2: Attempt to reuse healthy connections
while (!_pool.empty()) {
auto conn = _pool.front();
_pool.pop_front();
if (conn->isAlive()) {
if (!conn->reset()) {
to_disconnect.push_back(conn);
--_current_size;
continue;
// Phase 2: Pop one candidate at a time and validate it outside the
// mutex. isAlive() and reset() perform ODBC calls that release the
// GIL; calling them while holding the mutex would create a mutex/GIL
// lock-ordering deadlock when multiple threads acquire concurrently.
while (true) {
std::shared_ptr<Connection> candidate;
{
std::lock_guard<std::mutex> lock(_mutex);
if (_pool.empty()) {
// No more candidates — try to reserve a slot for a new connection.
if (_current_size < _max_size) {
valid_conn = std::make_shared<Connection>(connStr, true);
++_current_size;
needs_connect = true;
} else {
// NOTE: Another thread may be validating a popped candidate
// outside the mutex right now. If that candidate fails, a
// slot will open up — but we can't wait for it here without
// adding a condition-variable retry loop. This is an
// acceptable trade-off: transient "pool full" errors under
// heavy contention are rare and callers can retry.
throw std::runtime_error("ConnectionPool::acquire: pool size limit reached");
}
valid_conn = conn;
break;
} else {
to_disconnect.push_back(conn);
--_current_size;
}
candidate = _pool.front();
_pool.pop_front();
}

// Validate the candidate outside the mutex.
try {
if (candidate->isAlive() && candidate->reset()) {
valid_conn = candidate;
break;
}
} catch (const std::exception& ex) {
LOG("Candidate connection validation failed: %s", ex.what());
}

// Reserve a slot for a new connection if none reusable.
// The actual connect() call happens outside the mutex to avoid
// holding the mutex during the blocking ODBC call (which releases
// the GIL and could otherwise cause a mutex/GIL deadlock).
if (!valid_conn && _current_size < _max_size) {
valid_conn = std::make_shared<Connection>(connStr, true);
++_current_size;
needs_connect = true;
} else if (!valid_conn) {
throw std::runtime_error("ConnectionPool::acquire: pool size limit reached");
// Candidate is dead or reset failed — mark for disconnect and
// decrement the pool size.
to_disconnect.push_back(candidate);
{
std::lock_guard<std::mutex> lock(_mutex);
if (_current_size > 0) --_current_size;
}
}

// Phase 2.5: Connect the new connection outside the mutex.
// Phase 3: Connect the new connection outside the mutex.
if (needs_connect) {
try {
valid_conn->connect(attrs_before);
Expand All @@ -85,7 +108,7 @@ std::shared_ptr<Connection> ConnectionPool::acquire(const std::wstring& connStr,
}
}

// Phase 3: Disconnect expired/bad connections outside lock
// Phase 4: Disconnect expired/bad connections outside lock.
for (auto& conn : to_disconnect) {
try {
conn->disconnect();
Expand Down
Loading
Loading