# chunker.py — 206 lines (167 loc) · 7.29 KB
from __future__ import annotations
from typing import Protocol, runtime_checkable
from ...core.catalog import CatalogEntry, IndexedChunk, TextDocument
@runtime_checkable
class DocumentChunkerPort(Protocol):
    """
    Structural contract for document chunkers: TextDocument → list[IndexedChunk].

    Known implementations:
      * RecursiveCharacterChunker — the default, separator-based (this module).
      * SemanticChunker — advanced (integrations/chunking/semantic_.py).

    Because this is a Protocol, any object exposing a matching ``chunk`` method
    satisfies it and can be passed to ``VectorRetriever`` as ``splitter``.
    Example wrapper around a LangChain splitter::

        class LangChainChunkerAdapter:
            def __init__(self, lc_splitter):
                self._splitter = lc_splitter

            def chunk(self, doc: TextDocument) -> list[IndexedChunk]:
                texts = self._splitter.split_text(doc["content"])
                return [
                    IndexedChunk(
                        chunk_id=f"{doc['id']}__{i}", text=t,
                        source_type="document", source_id=doc["id"],
                        chunk_index=i, metadata={"title": doc.get("title", "")},
                    )
                    for i, t in enumerate(texts)
                ]

        retriever = VectorRetriever.from_sources(..., splitter=LangChainChunkerAdapter(...))
    """

    def chunk(self, doc: TextDocument) -> list[IndexedChunk]: ...
class CatalogChunker:
    """
    Splits a CatalogEntry into a header chunk plus column-group chunks.

    A table with 100+ columns loses column-level semantics when embedded as a
    single vector, so each group of columns becomes its own chunk. Chunk 0 is
    a summary of the table itself; every subsequent chunk covers at most
    ``max_columns_per_chunk`` columns. Each chunk carries the complete
    CatalogEntry in its metadata so VectorRetriever can reconstruct the entry
    from any chunk it retrieves.

    Args:
        max_columns_per_chunk: Upper bound on columns per group chunk. Default 20.
    """

    def __init__(self, max_columns_per_chunk: int = 20) -> None:
        self._max_cols = max_columns_per_chunk

    def split(self, catalog: list[CatalogEntry]) -> list[IndexedChunk]:
        """LangChain-style batch split: chunk every entry and concatenate."""
        result: list[IndexedChunk] = []
        for entry in catalog:
            result.extend(self.chunk(entry))
        return result

    def chunk(self, entry: CatalogEntry) -> list[IndexedChunk]:
        """Return the table-header chunk followed by the column-group chunks."""
        table = entry.get("name", "")
        about = entry.get("description", "")
        col_pairs = list(entry.get("columns", {}).items())

        # Chunk 0: table header summary.
        out: list[IndexedChunk] = [
            IndexedChunk(
                chunk_id=f"{table}__0",
                text=f"{table}: {about}".strip(),
                source_type="catalog",
                source_id=table,
                chunk_index=0,
                metadata=dict(entry),  # full CatalogEntry for reconstruction
            )
        ]

        # Chunks 1+: one chunk per group of up to self._max_cols columns.
        group_no = 0
        while group_no * self._max_cols < len(col_pairs):
            lo = group_no * self._max_cols
            group = col_pairs[lo : lo + self._max_cols]
            group_no += 1
            out.append(
                IndexedChunk(
                    chunk_id=f"{table}__col_{group_no}",
                    text=f"{table} columns: " + " ".join(f"{c} {d}" for c, d in group),
                    source_type="catalog",
                    source_id=table,
                    chunk_index=group_no,
                    metadata=dict(entry),  # full CatalogEntry in every column chunk
                )
            )
        return out
class RecursiveCharacterChunker(DocumentChunkerPort):
    """
    Hierarchical separator-based document chunker. No external dependencies.

    Separator priority: ["\\n\\n", "\\n", ". ", " ", ""]
    — tries paragraph → line → sentence → word boundaries in order.

    Character-count-based so it works for both Korean and English
    (unlike str.split() which assumes whitespace-delimited words).

    For higher chunking quality, use SemanticChunker (integrations/chunking/semantic_.py).

    Args:
        chunk_size: Maximum characters per chunk. Default 1000.
        chunk_overlap: Overlap characters between consecutive chunks. Default 100.
        separators: Separator priority list. None uses the default list above.

    Raises:
        ValueError: if chunk_overlap is negative or >= chunk_size.
    """

    _DEFAULT_SEPARATORS = ["\n\n", "\n", ". ", " ", ""]

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 100,
        separators: list[str] | None = None,
    ) -> None:
        # Negative overlap would make the [-overlap:] tail slice nonsensical.
        if chunk_overlap < 0:
            raise ValueError(f"chunk_overlap ({chunk_overlap}) must be non-negative")
        if chunk_overlap >= chunk_size:
            raise ValueError(
                f"chunk_overlap ({chunk_overlap}) must be less than chunk_size ({chunk_size})"
            )
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._separators = separators or self._DEFAULT_SEPARATORS

    def split(self, docs: list[TextDocument]) -> list[IndexedChunk]:
        """LangChain-style batch split: list input → list output."""
        return [c for doc in docs for c in self.chunk(doc)]

    def chunk(self, doc: TextDocument) -> list[IndexedChunk]:
        """Split one TextDocument into IndexedChunks (empty content → [])."""
        content = doc.get("content", "")
        if not content:
            return []
        raw_chunks = self._split(content, self._separators)
        title = doc.get("title", "")
        doc_id = doc.get("id", "")
        return [
            IndexedChunk(
                chunk_id=f"{doc_id}__{i}",
                # Prefix each chunk with the title so it stays self-describing.
                text=f"{title}: {text}" if title else text,
                source_type="document",
                source_id=doc_id,
                chunk_index=i,
                metadata={
                    "id": doc_id,
                    "title": title,
                    "source": doc.get("source", ""),
                },
            )
            for i, text in enumerate(raw_chunks)
        ]

    def _split(self, text: str, separators: list[str]) -> list[str]:
        """Split text into chunks, applying overlap exactly once.

        BUGFIX: overlap used to be applied inside the recursive splitter, so
        chunks coming out of an inner recursion were overlapped by the inner
        call and then again by the outer one. It is now applied only here,
        over the final flat chunk list.
        """
        chunks = self._split_recursive(text, separators)
        if self._chunk_overlap > 0 and len(chunks) > 1:
            chunks = self._apply_overlap(chunks)
        return chunks

    def _split_recursive(self, text: str, separators: list[str]) -> list[str]:
        """Recursively try separators until all chunks fit within chunk_size."""
        if not separators:
            return [text] if text else []
        chunks: list[str] = []
        separator = separators[-1]  # fallback: character-level split
        for sep in separators:
            if sep and sep in text:
                separator = sep
                break
        parts = text.split(separator) if separator else list(text)
        current = ""
        for part in parts:
            # Re-join with the separator so chunk text matches the source.
            # BUGFIX: the old code called .lstrip(separator) here; lstrip
            # treats its argument as a *character set*, so a `current` that
            # legitimately began with a separator character (e.g. "\n..." when
            # splitting on "\n\n") silently lost leading characters.
            candidate = current + separator + part if current else part
            if len(candidate) <= self._chunk_size:
                current = candidate
                continue
            if current:
                chunks.append(current)
            # part itself exceeds chunk_size → recurse with finer separators
            if len(part) > self._chunk_size and len(separators) > 1:
                chunks.extend(self._split_recursive(part, separators[1:]))
                current = ""
            else:
                current = part
        if current:
            chunks.append(current)
        return chunks

    def _apply_overlap(self, chunks: list[str]) -> list[str]:
        """Prefix each chunk (after the first) with its predecessor's tail."""
        overlapped = [chunks[0]]
        for i in range(1, len(chunks)):
            prev_tail = chunks[i - 1][-self._chunk_overlap :]
            overlapped.append(prev_tail + chunks[i])
        return overlapped