Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
69f24ae
feat: support partial writes
alespour May 5, 2026
f5d0ee5
test: fix E501
alespour May 5, 2026
9b0efb4
test: fix stale test
alespour May 5, 2026
00d49a2
fix: docstring
alespour May 5, 2026
b514d94
refactor: remove redundant try-catch-raise
alespour May 5, 2026
8de9875
fix: preserve ApiException.message for v3-on-v2 compatibility error
alespour May 6, 2026
9bae3f7
refactor: avoid double parsing when building partial write error
alespour May 6, 2026
f387a7f
test: add more coverage for exceptions
alespour May 6, 2026
7754681
refactor: error parsing simplified
alespour May 6, 2026
728ca71
test: refactor for higher-level testing
alespour May 6, 2026
3c9251c
refactor: simplify partial write error parsing guards
alespour May 6, 2026
3818093
fix: resolve write option kwargs before post_write call
alespour May 6, 2026
04b9d31
Merge branch 'main' into feat/partial-writes
alespour May 6, 2026
5c91d25
docs: update CHANGELOG
alespour May 7, 2026
2b59836
fix: use V2 API by default
alespour May 18, 2026
8051335
test: fix formatting
alespour May 18, 2026
0060be7
feat: add kwargs handler for write_no_sync
alespour May 18, 2026
d300ef8
docs: update CHANGELOG
alespour May 18, 2026
57a26a4
test: improve coverage
alespour May 18, 2026
1d0134e
test: fix E241 linter complaint
alespour May 18, 2026
9131053
test: add coverage
alespour May 18, 2026
1191884
fix: option default
alespour May 18, 2026
39941eb
fix: options in batching
alespour May 18, 2026
19659d6
docs: align use_v2_api config examples for V3 feature usage
alespour May 18, 2026
0f8c7e6
fix: default use_v2_api=True in 405 error handler path
alespour May 18, 2026
c3f5407
fix: translate async write errors consistently
alespour May 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### Features

1. [#213](https://github.com/InfluxCommunity/influxdb3-python/pull/213): Add partial write support and default writes to the V2 API endpoint.
See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more.
`no_sync` requires `use_v2_api=False`; `accept_partial` applies only to V3 API endpoint writes.
1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`.

### Bug Fixes
Expand Down
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,56 @@ client.write_dataframe(
)
```

### Accept partial writes and inspect failed lines
`accept_partial` defaults to `True` and allows partial success when a batch contains invalid lines.
On partial failure, the client raises `InfluxDBPartialWriteError` with structured `line_errors`.

```python
from influxdb_client_3 import InfluxDBClient3
from influxdb_client_3.exceptions import InfluxDBPartialWriteError

client = InfluxDBClient3(
host="http://localhost:8181",
token="token",
database="db",
write_use_v2_api=False,
)
lp = "home,room=Sunroom temp=96 1735545600\nhome,room=Sunroom temp=\"hi\" 1735549200"

try:
client.write(lp) # accept_partial=True by default on V3 API endpoint
except InfluxDBPartialWriteError as e:
for line_err in e.line_errors:
print(f"line {line_err.line_number} failed: {line_err.error_message} ({line_err.original_line})")
```

Disable partial writes:
```python
client = InfluxDBClient3(
host="http://localhost:8181",
token="token",
database="db",
write_use_v2_api=False,
write_accept_partial=False,
)
```

### Compatibility with InfluxDB Clustered and InfluxDB Cloud Dedicated/Serverless
Writes use the V2 API endpoint by default, so no additional configuration is required for these products.

`use_v2_api` can be configured by:
- `WriteOptions(use_v2_api=False)` (for V3 API endpoint features)
- constructor kwarg: `write_use_v2_api=False`
- env var: `INFLUX_WRITE_USE_V2_API=false`

When `use_v2_api=True`:
- `accept_partial` is not used
- `no_sync=True` is invalid and rejected before dispatch with:
`invalid write options: no_sync cannot be used with use_v2_api`

To use `no_sync` or `accept_partial` controls, set `use_v2_api=False`
(for example with InfluxDB 3 Core/Enterprise).

## Querying

### Querying with SQL
Expand Down
55 changes: 42 additions & 13 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
INFLUX_WRITE_ACCEPT_PARTIAL = "INFLUX_WRITE_ACCEPT_PARTIAL"
INFLUX_WRITE_USE_V2_API = "INFLUX_WRITE_USE_V2_API"
INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT"
INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT"
INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION"
Expand Down Expand Up @@ -155,19 +157,23 @@ def _parse_gzip_threshold(threshold: str) -> int:
return threshold


def _parse_write_no_sync(write_no_sync: str):
def _parse_write_bool(value):
"""
Parses and validates the provided write no sync value.
Parses a truthy/falsy value for write options.

This function ensures that the given value is a valid boolean,
and it raises an appropriate error if the value is not valid.
The input is normalized to string and matched against common truthy values.
Any non-truthy value is treated as False.

:param write_no_sync: The input value to be parsed and validated.
:type write_no_sync: Any
:return: The validated write no sync value as an boolean.
:param value: The input value to be parsed and validated.
:type value: Any
:return: Parsed boolean value.
:rtype: bool
"""
return write_no_sync.strip().lower() in ['true', '1', 't', 'y', 'yes']
return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes']


def _parse_write_no_sync(write_no_sync: str):
return _parse_write_bool(write_no_sync)


def _parse_timeout(to: str) -> int:
Expand Down Expand Up @@ -233,6 +239,9 @@ def __init__(
:key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
:key str query_timeout: int value used to set the client query API timeout in milliseconds.
:key str write_timeout: int value used to set the client write API timeout in milliseconds.
:key bool write_accept_partial: allow partial writes when some lines fail.
:key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint.
:key bool write_no_sync: disable sync confirmation on V3 API endpoint writes.
:key list[str] profilers: list of enabled Flux profilers
"""
self._org = org if org is not None else "default"
Expand All @@ -243,22 +252,37 @@ def __init__(
write_type = DefaultWriteOptions.write_type.value
write_precision = DefaultWriteOptions.write_precision.value
write_no_sync = DefaultWriteOptions.no_sync.value
write_accept_partial = DefaultWriteOptions.accept_partial.value
write_use_v2_api = DefaultWriteOptions.use_v2_api.value
write_timeout = DefaultWriteOptions.timeout.value

if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
write_opts = write_client_options['write_options']
write_type = getattr(write_opts, 'write_type', write_type)
write_precision = getattr(write_opts, 'write_precision', write_precision)
write_no_sync = getattr(write_opts, 'no_sync', write_no_sync)
write_accept_partial = getattr(write_opts, 'accept_partial', write_accept_partial)
write_use_v2_api = getattr(write_opts, 'use_v2_api', write_use_v2_api)
write_timeout = getattr(write_opts, 'timeout', write_timeout)

if kw_keys.__contains__('write_timeout'):
write_timeout = kwargs.get('write_timeout')

if kw_keys.__contains__('write_accept_partial'):
write_accept_partial = _parse_write_bool(kwargs.get('write_accept_partial'))

if kw_keys.__contains__('write_use_v2_api'):
write_use_v2_api = _parse_write_bool(kwargs.get('write_use_v2_api'))

if kw_keys.__contains__('write_no_sync'):
write_no_sync = _parse_write_bool(kwargs.get('write_no_sync'))

write_options = WriteOptions(
write_type=write_type,
write_precision=write_precision,
no_sync=write_no_sync,
accept_partial=write_accept_partial,
use_v2_api=write_use_v2_api,
)

self._write_client_options = {
Expand Down Expand Up @@ -347,7 +371,15 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':

write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC)
if write_no_sync is not None:
write_options.no_sync = _parse_write_no_sync(write_no_sync)
write_options.no_sync = _parse_write_bool(write_no_sync)

write_accept_partial = os.getenv(INFLUX_WRITE_ACCEPT_PARTIAL)
if write_accept_partial is not None:
write_options.accept_partial = _parse_write_bool(write_accept_partial)

write_use_v2_api = os.getenv(INFLUX_WRITE_USE_V2_API)
if write_use_v2_api is not None:
write_options.use_v2_api = _parse_write_bool(write_use_v2_api)

precision = os.getenv(INFLUX_PRECISION)
if precision is not None:
Expand Down Expand Up @@ -402,10 +434,7 @@ def write(self, record=None, database=None, **kwargs):
if database is None:
database = self._database

try:
return self._write_api.write(bucket=database, record=record, **kwargs)
except InfluxDBError as e:
raise e
return self._write_api.write(bucket=database, record=record, **kwargs)

def write_dataframe(
self,
Expand Down
3 changes: 2 additions & 1 deletion influxdb_client_3/exceptions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# flake8: noqa

from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError
from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError, InfluxDBPartialWriteError, \
InfluxDBPartialWriteLineError
172 changes: 153 additions & 19 deletions influxdb_client_3/exceptions/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Exceptions utils for InfluxDB."""

import json
import logging
from dataclasses import dataclass
from typing import List, Optional, Tuple

from urllib3 import HTTPResponse

Expand Down Expand Up @@ -39,6 +42,107 @@ def __init__(self, error_message, *args, **kwargs):
self.message = error_message


def _is_partial_write_error(error_message) -> bool:
if not isinstance(error_message, str) or not error_message:
return False
normalized = error_message.lower()
return (
"partial write of line protocol occurred" in normalized or
"parsing failed for write_lp endpoint" in normalized
)


def _parse_partial_write_data_item(item) -> Optional[Tuple[str, int, str]]:
if item is None:
return None
if not isinstance(item, dict):
raise ValueError("array item is not an object")

error_message = item.get("error_message")
if not isinstance(error_message, str):
raise ValueError("error_message must be string")
if not error_message:
return None

line_number_raw = item.get("line_number")
if line_number_raw is None:
line_number = 0
elif isinstance(line_number_raw, int):
line_number = line_number_raw
else:
raise ValueError("line_number must be int")

original_line_raw = item.get("original_line")
if original_line_raw is None:
original_line = ""
elif isinstance(original_line_raw, str):
original_line = original_line_raw
else:
raise ValueError("original_line must be string")

return error_message, line_number, original_line


def _parse_typed_partial_write_array(data) -> Optional[List[Tuple[str, int, str]]]:
if not isinstance(data, list):
return None
line_errors: List[Tuple[str, int, str]] = []
try:
for item in data:
parsed = _parse_partial_write_data_item(item)
if parsed is None:
continue
line_errors.append(parsed)
except ValueError:
return None
return line_errors if len(line_errors) > 0 else None


def _parse_typed_partial_write_object_or_none(data) -> Optional[Tuple[str, int, str]]:
try:
return _parse_partial_write_data_item(data)
except ValueError:
return None


def _format_partial_write_details(line_errors: List[Tuple[str, int, str]]) -> List[str]:
details: List[str] = []
for error_message, line_number, original_line in line_errors:
if line_number != 0:
if original_line != "":
details.append(f"\tline {line_number}: {error_message} ({original_line})")
else:
details.append(f"\tline {line_number}: {error_message}")
elif error_message:
details.append(f"\t{error_message}")
return details


def _parse_partial_write_line_error_info(data) -> Tuple[List[Tuple[str, int, str]], List[str]]:
if data is None:
return [], []

typed_array = _parse_typed_partial_write_array(data)
if typed_array is not None:
return typed_array, _format_partial_write_details(typed_array)

if isinstance(data, list):
details: List[str] = []
for item in data:
if item is None:
continue
raw = json.dumps(item, separators=(',', ':'))
if raw and raw.lower() != "null":
details.append(raw)
return [], details

typed_single = _parse_typed_partial_write_object_or_none(data)
if typed_single is not None:
return [typed_single], _format_partial_write_details([typed_single])

return [], []


# This error is for all write operations
class InfluxDBError(InfluxDB3ClientError):
"""Raised when a server error occurs."""
Expand All @@ -56,10 +160,7 @@ def __init__(self, response: HTTPResponse = None, message: str = None):
super().__init__(self.message)

def _get_message(self, response):
# Body
if response.data:
import json

def get(d, key):
if not key or d is None:
return d
Expand All @@ -80,23 +181,15 @@ def get(d, key):
# "data": [ { "error_message": "...", "line_number": 2, "original_line": "..." }, ... ]
# }
error_text = node.get("error")
data = node.get("data")
if error_text and isinstance(data, list):
details = []
for item in data:
if not isinstance(item, dict):
continue
line_number = item.get("line_number")
error_message = item.get("error_message")
original_line = item.get("original_line")
if line_number is not None and error_message and original_line:
details.append(
f"\tline {line_number}: {error_message} ({original_line})"
)
elif error_message:
details.append(f"\t{error_message}")
if error_text and _is_partial_write_error(error_text):
_, details = _parse_partial_write_line_error_info(node.get("data"))
if details:
return error_text + ":\n" + "\n".join(details)
return error_text + ":\n" + "\n".join(
detail if detail.startswith("\t") else f"\t{detail}"
for detail in details
)
return error_text
if error_text:
return error_text
for key in [['message'], ['data', 'error_message'], ['error']]:
value = get(node, key)
Expand All @@ -119,3 +212,44 @@ def get(d, key):
def getheaders(self):
"""Helper method to make response headers more accessible."""
return self.response.getheaders()


@dataclass(frozen=True)
class InfluxDBPartialWriteLineError:
line_number: int
error_message: str
original_line: str


class InfluxDBPartialWriteError(InfluxDBError):
"""Structured partial-write error with per-line failures."""

def __init__(self, response: HTTPResponse, line_errors: List[InfluxDBPartialWriteLineError]):
super().__init__(response=response)
self.line_errors = line_errors

@classmethod
def from_response(cls, response: HTTPResponse):
if response is None or not response.data:
return None
try:
node = json.loads(response.data)
except Exception:
return None
if not isinstance(node, dict):
return None
error_text = node.get("error")
if not _is_partial_write_error(error_text):
return None
parsed_line_errors, _ = _parse_partial_write_line_error_info(node.get("data"))
if not parsed_line_errors:
return None
line_errors = [
InfluxDBPartialWriteLineError(
line_number=line_number,
error_message=error_message,
original_line=original_line,
)
for error_message, line_number, original_line in parsed_line_errors
]
return cls(response=response, line_errors=line_errors)
Loading
Loading