-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcrawler.py
More file actions
302 lines (236 loc) · 10.2 KB
/
crawler.py
File metadata and controls
302 lines (236 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
import argparse
import asyncio
import json
from pathlib import Path
from urllib.parse import urlparse
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlerRunConfig
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, FilterChain, URLPatternFilter
from crawl4ai.models import CrawlResult
# Threshold used by _has_minimal_content: a page is kept only when its
# whitespace-stripped markdown is strictly longer than this many characters.
MIN_CONTENT_LENGTH: int = 100
async def crawl_urls(
urls: list[str],
use_cache: bool = False,
recursive: bool = False,
max_depth: int = 2,
recursive_scope: str | None = None,
output_dir: str = "output",
max_session_permit: int = 10,
memory_threshold_percent: float = 80.0,
) -> None:
"""Crawl URLs and save results to markdown files or a single JSON file."""
output_path = Path(output_dir)
is_json_output = output_path.suffix == ".json"
if is_json_output:
json_data: list[dict[str, str]] = []
else:
output_path.mkdir(exist_ok=True)
if not urls:
return
normalized_urls = [_normalize_url(url) for url in urls]
try:
if recursive:
async with AsyncWebCrawler() as crawler:
for url in normalized_urls:
config = _create_config(
recursive=recursive,
max_depth=max_depth,
use_cache=use_cache,
recursive_scope=recursive_scope,
start_url=url,
)
try:
result = await crawler.arun(url, config=config)
if isinstance(result, list):
for item in result:
_process_result(item, output_path, json_data if is_json_output else None)
else:
_process_result(result, output_path, json_data if is_json_output else None)
except KeyboardInterrupt:
print(f"\n⚠️ Crawling interrupted by user while processing {url}")
break
except Exception as exception:
print(f"⚠️ Error crawling {url}: {exception}")
continue
else:
config = _create_config(
recursive=recursive,
max_depth=max_depth,
use_cache=use_cache,
recursive_scope=recursive_scope,
start_url=normalized_urls[0],
)
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=memory_threshold_percent,
max_session_permit=max_session_permit,
)
async with AsyncWebCrawler() as crawler:
results = await crawler.arun_many(normalized_urls, config=config, dispatcher=dispatcher)
for result in results:
_process_result(result, output_path, json_data if is_json_output else None)
if is_json_output and json_data:
_save_json(output_path, json_data)
except KeyboardInterrupt:
if is_json_output and json_data:
_save_json(output_path, json_data)
print("\n⚠️ Crawling interrupted by user")
except Exception as exception:
if is_json_output and json_data:
_save_json(output_path, json_data)
print(f"⚠️ Error while crawling {urls}: {exception}")
def main() -> None:
    """Parse command line arguments and start the crawling process.

    Either a positional URL or ``-f/--file`` is required; when both are given
    the file takes precedence. Exits with status 1 when the URL file cannot be
    read. Errors raised by the crawl itself are printed and do not change the
    exit status.
    """
    parser = argparse.ArgumentParser(description="Web crawler using crawl4ai")
    parser.add_argument("url", nargs="?", help="URL to crawl")
    parser.add_argument("-f", "--file", help="File with URLs")
    parser.add_argument("-c", "--cache", action="store_true", help="Enable HTTP cache")
    parser.add_argument("-r", "--recursive", action="store_true", help="Follow links recursively")
    parser.add_argument("-d", "--depth", type=int, default=2, help="Max depth")
    parser.add_argument(
        "--scope",
        "--only-under",
        "--recursive-scope",
        dest="recursive_scope",
        help=(
            "Only follow links under this prefix. "
            "Examples: /products, products, site.com.br/products. "
            "If omitted, recursive crawl stays under the start URL path automatically."
        ),
    )
    parser.add_argument("-o", "--output", default="output", help="Output directory or JSON file")
    args = parser.parse_args()
    if not args.url and not args.file:
        parser.error("Either provide a URL or use -f")
    if args.file:
        try:
            urls = _load_urls(args.file)
        except Exception:
            # _load_urls has already printed the reason; leave with a failure
            # status instead of dumping a traceback on top of that message.
            raise SystemExit(1) from None
    else:
        urls = [args.url]
    try:
        asyncio.run(
            crawl_urls(
                urls,
                use_cache=args.cache,
                recursive=args.recursive,
                max_depth=args.depth,
                recursive_scope=args.recursive_scope,
                output_dir=args.output,
            ),
        )
    except KeyboardInterrupt:
        print("\n⚠️ Interrupted by user. Exiting gracefully.")
    except Exception as exception:
        print(f"\n❌ Fatal error: {exception}")
def _create_config(
    recursive: bool = False,
    max_depth: int = 2,
    use_cache: bool = False,
    recursive_scope: str | None = None,
    start_url: str | None = None,
) -> CrawlerRunConfig:
    """Build the run configuration for a plain fetch or a recursive crawl.

    A recursive configuration adds a breadth-first deep-crawl strategy
    (internal links only, limited to ``max_depth`` and to the scope filter)
    and the LXML scraping strategy on top of the shared options.
    """
    # Options common to both kinds of crawl.
    shared_options = {
        "cache_mode": CacheMode.ENABLED if use_cache else CacheMode.DISABLED,
        "stream": False,
        "verbose": True,
    }
    if not recursive:
        return CrawlerRunConfig(**shared_options)
    deep_strategy = BFSDeepCrawlStrategy(
        max_depth=max_depth,
        include_external=False,
        filter_chain=_create_recursive_filter_chain(recursive_scope, start_url),
    )
    return CrawlerRunConfig(
        deep_crawl_strategy=deep_strategy,
        scraping_strategy=LXMLWebScrapingStrategy(),
        **shared_options,
    )
def _process_result(result: CrawlResult, output_path: Path, json_data: list | None = None) -> None:
    """Store one crawl result, ignoring failed or near-empty pages.

    The page is appended to ``json_data`` when a list is supplied; otherwise it
    is written as a markdown file under ``output_path``.
    """
    if not result.success:
        return
    markdown = _get_markdown_content(result)
    if not markdown or not _has_minimal_content(markdown):
        return
    if json_data is None:
        _save_page(output_path, result.url, markdown)
    else:
        _add_to_json(json_data, result.url, markdown)
def _add_to_json(json_data: list, url: str, content: str) -> None:
"""Add a result to the JSON data."""
json_data.append(
{
"url": url,
"content": content,
},
)
def _save_json(output_path: Path, json_data: list) -> None:
"""Save the JSON data to a file."""
output_path.write_text(json.dumps(json_data, indent=2, ensure_ascii=False), encoding="utf-8")
def _load_urls(filepath: str) -> list[str]:
"""Load the URLs from a file."""
try:
with open(filepath, encoding="utf-8") as file:
return [line.strip() for line in file if line.strip() and not line.strip().startswith("#")]
except FileNotFoundError:
print(f"Error: File '{filepath}' not found.")
raise
except Exception as exception:
print(f"Error reading file '{filepath}': {exception}")
raise
def _normalize_url(url: str) -> str:
"""Add https:// if URL doesn't have a protocol."""
url = url.strip()
return url if url.startswith(("http://", "https://", "file://", "raw:")) else f"https://{url}"
def _create_recursive_filter_chain(recursive_scope: str | None, start_url: str | None) -> FilterChain:
    """Build the deep-crawl filter chain for an optional URL/path scope.

    With an explicit ``recursive_scope`` the chain holds one pattern filter for
    that scope. Without it, the scope defaults to the start URL's own path; an
    empty chain is returned when there is no start URL or its path is empty
    or ``/``.
    """
    scope = recursive_scope
    if not scope:
        if not start_url:
            return FilterChain()
        start_parts = urlparse(start_url)
        if start_parts.path in ("", "/"):
            return FilterChain()
        # Fall back to the start URL without its query string or fragment.
        scope = f"{start_parts.scheme}://{start_parts.netloc}{start_parts.path}"
    pattern = _build_scope_pattern(scope, start_url)
    return FilterChain([URLPatternFilter(patterns=[pattern])])
def _build_scope_pattern(scope: str, start_url: str | None) -> str:
"""Build a URLPatternFilter-compatible prefix pattern ending with /*."""
raw_scope = scope.strip()
if raw_scope.startswith("/"):
prefix = raw_scope
elif raw_scope.startswith(("http://", "https://")):
parsed_scope = urlparse(raw_scope)
prefix = f"{parsed_scope.scheme}://{parsed_scope.netloc}{parsed_scope.path or '/'}"
elif "." in raw_scope:
normalized_scope = _normalize_url(raw_scope)
parsed_scope = urlparse(normalized_scope)
prefix = f"{parsed_scope.scheme}://{parsed_scope.netloc}{parsed_scope.path or '/'}"
else:
if not start_url:
prefix = f"/{raw_scope.lstrip('/')}"
else:
parsed_start = urlparse(start_url)
prefix = f"{parsed_start.scheme}://{parsed_start.netloc}/{raw_scope.lstrip('/')}"
if prefix != "/" and prefix.endswith("/"):
prefix = prefix[:-1]
return f"{prefix}/*"
def _get_filename(url: str) -> str:
"""Get the filename for the markdown file."""
path = urlparse(url).path.replace("/", "_").strip("_") or "index"
return f"{path}.md" if not path.endswith(".md") else path
def _get_markdown_content(result: CrawlResult) -> str | None:
    """Return the page markdown as a string, or None when the result has none.

    ``result.markdown`` may be a plain string or an object carrying the text
    in its ``raw_markdown`` attribute; both forms are handled.
    """
    markdown = result.markdown
    if not markdown:
        return None
    return markdown if isinstance(markdown, str) else markdown.raw_markdown
def _save_page(output_path: Path, url: str, content: str) -> None:
    """Write one page to ``output_path`` as UTF-8 markdown, headed by its URL."""
    target_file = output_path / _get_filename(url)
    document = f"# {url}\n\n{content}"
    target_file.write_text(document, encoding="utf-8")
def _has_minimal_content(markdown: str) -> bool:
    """Tell whether the stripped markdown is longer than MIN_CONTENT_LENGTH."""
    stripped_length = len(markdown.strip())
    return stripped_length > MIN_CONTENT_LENGTH
# Run the command line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()