# fetch.py
  1. #!/usr/bin/env python3
  2. """
  3. Pulse Fetch - Parallel URL fetching for Claude Code news digest.
  4. Uses asyncio + ThreadPoolExecutor to fetch multiple URLs via Firecrawl simultaneously.
  5. Outputs JSON with fetched content for LLM summarization.
  6. Usage:
  7. python fetch.py # Fetch all sources
  8. python fetch.py --sources blogs # Fetch only blogs
  9. python fetch.py --max-workers 20 # Increase parallelism
  10. python fetch.py --output pulse.json
  11. python fetch.py --discover-articles # Extract recent articles from blog homepages
  12. """
  13. import os
  14. import sys
  15. import json
  16. import re
  17. from datetime import datetime, timezone
  18. from concurrent.futures import ThreadPoolExecutor, as_completed
  19. from pathlib import Path
  20. from urllib.parse import urlparse, urljoin
  21. import argparse
  22. # Try to import firecrawl
  23. try:
  24. from firecrawl import FirecrawlApp
  25. FIRECRAWL_AVAILABLE = True
  26. except ImportError:
  27. FIRECRAWL_AVAILABLE = False
  28. print("Warning: firecrawl not installed. Install with: pip install firecrawl-py")
# Sources configuration: category name -> list of source descriptors.
# Each descriptor carries "name" (display label), "url" (page to scrape),
# and "type" — only "blog"-typed entries are used by --discover-articles.
SOURCES = {
    "official": [
        {"name": "Anthropic Engineering", "url": "https://www.anthropic.com/engineering", "type": "blog"},
        {"name": "Claude Blog", "url": "https://claude.ai/blog", "type": "blog"},
        {"name": "Claude Code Docs", "url": "https://code.claude.com", "type": "docs"},
    ],
    "blogs": [
        {"name": "Simon Willison", "url": "https://simonwillison.net", "type": "blog"},
        {"name": "Every", "url": "https://every.to", "type": "blog"},
        {"name": "SSHH Blog", "url": "https://blog.sshh.io", "type": "blog"},
        {"name": "Lee Han Chung", "url": "https://leehanchung.github.io", "type": "blog"},
        {"name": "Nick Nisi", "url": "https://nicknisi.com", "type": "blog"},
        {"name": "HumanLayer", "url": "https://www.humanlayer.dev/blog", "type": "blog"},
        {"name": "Chris Dzombak", "url": "https://www.dzombak.com/blog", "type": "blog"},
        {"name": "GitButler", "url": "https://blog.gitbutler.com", "type": "blog"},
        {"name": "Docker Blog", "url": "https://www.docker.com/blog", "type": "blog"},
        {"name": "Nx Blog", "url": "https://nx.dev/blog", "type": "blog"},
        {"name": "Yee Fei Ooi", "url": "https://medium.com/@ooi_yee_fei", "type": "blog"},
    ],
    "community": [
        {"name": "SkillsMP", "url": "https://skillsmp.com", "type": "marketplace"},
        {"name": "Awesome Claude AI", "url": "https://awesomeclaude.ai", "type": "directory"},
    ],
}
# Relevance keywords for filtering: matched case-insensitively against the
# concatenated content + title + description by filter_relevant_content.
RELEVANCE_KEYWORDS = [
    "claude", "claude code", "anthropic", "mcp", "model context protocol",
    "agent", "skill", "subagent", "cli", "terminal", "prompt engineering",
    "cursor", "windsurf", "copilot", "aider", "coding assistant", "hooks"
]
# Patterns to identify article links in markdown content.
# Each pattern captures a (link text, absolute http(s) URL) pair; they are
# tried in order by extract_article_links.
ARTICLE_LINK_PATTERNS = [
    # Standard markdown links with date-like paths (a /YYYY/ path segment)
    r'\[([^\]]+)\]\((https?://[^\)]+/\d{4}/[^\)]+)\)',
    # Links with /blog/, /posts/, /p/ paths
    r'\[([^\]]+)\]\((https?://[^\)]+/(?:blog|posts?|p|articles?)/[^\)]+)\)',
    # Links with slugified titles (word-word-word pattern)
    r'\[([^\]]+)\]\((https?://[^\)]+/[\w]+-[\w]+-[\w]+[^\)]*)\)',
]
# Exclude patterns (navigation, categories, tags, etc.): a URL matching any
# of these via case-insensitive re.search is skipped during discovery.
# Note: the final '#' and '\?' entries drop any URL containing a fragment
# or a query string.
EXCLUDE_PATTERNS = [
    r'/tag/', r'/category/', r'/author/', r'/page/', r'/archive/',
    r'/about', r'/contact', r'/subscribe', r'/newsletter', r'/feed',
    r'/search', r'/login', r'/signup', r'/privacy', r'/terms',
    r'\.xml$', r'\.rss$', r'\.atom$', r'#', r'\?',
]
  76. def fetch_url_firecrawl(app: 'FirecrawlApp', source: dict) -> dict:
  77. """Fetch a single URL using Firecrawl API."""
  78. url = source["url"]
  79. name = source["name"]
  80. try:
  81. result = app.scrape(url, formats=['markdown'])
  82. # Handle both dict and object responses
  83. if hasattr(result, 'markdown'):
  84. markdown = result.markdown or ''
  85. metadata = result.metadata.__dict__ if hasattr(result.metadata, '__dict__') else {}
  86. else:
  87. markdown = result.get('markdown', '')
  88. metadata = result.get('metadata', {})
  89. return {
  90. "name": name,
  91. "url": url,
  92. "type": source.get("type", "unknown"),
  93. "status": "success",
  94. "content": markdown[:50000], # Limit content size
  95. "title": metadata.get('title', name),
  96. "description": metadata.get('description', ''),
  97. "fetched_at": datetime.utcnow().isoformat() + "Z",
  98. }
  99. except Exception as e:
  100. return {
  101. "name": name,
  102. "url": url,
  103. "type": source.get("type", "unknown"),
  104. "status": "error",
  105. "error": str(e),
  106. "fetched_at": datetime.utcnow().isoformat() + "Z",
  107. }
  108. def fetch_all_parallel(sources: list, max_workers: int = 10) -> list:
  109. """Fetch all URLs in parallel using ThreadPoolExecutor."""
  110. if not FIRECRAWL_AVAILABLE:
  111. print("Error: firecrawl not available")
  112. return []
  113. api_key = os.getenv('FIRECRAWL_API_KEY')
  114. if not api_key:
  115. print("Error: FIRECRAWL_API_KEY environment variable not set")
  116. return []
  117. app = FirecrawlApp(api_key=api_key)
  118. results = []
  119. total = len(sources)
  120. completed = 0
  121. print(f"Fetching {total} URLs with {max_workers} workers...")
  122. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  123. # Submit all tasks
  124. future_to_source = {
  125. executor.submit(fetch_url_firecrawl, app, source): source
  126. for source in sources
  127. }
  128. # Process results as they complete
  129. for future in as_completed(future_to_source):
  130. source = future_to_source[future]
  131. completed += 1
  132. try:
  133. result = future.result()
  134. results.append(result)
  135. status = "OK" if result["status"] == "success" else "FAIL"
  136. print(f"[{completed}/{total}] {status}: {source['name']}")
  137. except Exception as e:
  138. print(f"[{completed}/{total}] ERROR: {source['name']} - {e}")
  139. results.append({
  140. "name": source["name"],
  141. "url": source["url"],
  142. "status": "error",
  143. "error": str(e),
  144. })
  145. return results
  146. def extract_article_links(content: str, base_url: str, max_articles: int = 5) -> list:
  147. """Extract article links from markdown content."""
  148. articles = []
  149. seen_urls = set()
  150. base_domain = urlparse(base_url).netloc
  151. for pattern in ARTICLE_LINK_PATTERNS:
  152. matches = re.findall(pattern, content)
  153. for title, url in matches:
  154. # Skip if already seen
  155. if url in seen_urls:
  156. continue
  157. # Skip excluded patterns
  158. if any(re.search(exc, url, re.IGNORECASE) for exc in EXCLUDE_PATTERNS):
  159. continue
  160. # Ensure same domain or relative URL
  161. parsed = urlparse(url)
  162. if parsed.netloc and parsed.netloc != base_domain:
  163. continue
  164. # Clean up title
  165. title = title.strip()
  166. if len(title) < 5 or len(title) > 200:
  167. continue
  168. # Skip generic link text
  169. if title.lower() in ['read more', 'continue reading', 'link', 'here', 'click here']:
  170. continue
  171. seen_urls.add(url)
  172. articles.append({
  173. "title": title,
  174. "url": url,
  175. })
  176. return articles[:max_articles]
  177. def discover_articles(sources: list, max_workers: int = 10, max_articles_per_source: int = 5) -> list:
  178. """Fetch blog homepages and extract recent article links."""
  179. if not FIRECRAWL_AVAILABLE:
  180. print("Error: firecrawl not available")
  181. return []
  182. api_key = os.getenv('FIRECRAWL_API_KEY')
  183. if not api_key:
  184. print("Error: FIRECRAWL_API_KEY environment variable not set")
  185. return []
  186. # First, fetch all blog homepages
  187. print(f"Phase 1: Fetching {len(sources)} blog homepages...")
  188. homepage_results = fetch_all_parallel(sources, max_workers=max_workers)
  189. # Extract article links from each
  190. all_articles = []
  191. print(f"\nPhase 2: Extracting article links...")
  192. for result in homepage_results:
  193. if result["status"] != "success":
  194. continue
  195. content = result.get("content", "")
  196. base_url = result["url"]
  197. source_name = result["name"]
  198. articles = extract_article_links(content, base_url, max_articles=max_articles_per_source)
  199. print(f" {source_name}: found {len(articles)} articles")
  200. for article in articles:
  201. all_articles.append({
  202. "name": article["title"],
  203. "url": article["url"],
  204. "type": "article",
  205. "source_name": source_name,
  206. "source_url": base_url,
  207. })
  208. if not all_articles:
  209. print("No articles found to fetch")
  210. return homepage_results
  211. # Phase 3: Fetch individual articles
  212. print(f"\nPhase 3: Fetching {len(all_articles)} individual articles...")
  213. article_results = fetch_all_parallel(all_articles, max_workers=max_workers)
  214. # Add source info to results
  215. for i, result in enumerate(article_results):
  216. if i < len(all_articles):
  217. result["source_name"] = all_articles[i].get("source_name", "")
  218. result["source_url"] = all_articles[i].get("source_url", "")
  219. return article_results
  220. def filter_relevant_content(results: list) -> list:
  221. """Filter results to only those with Claude Code relevant content."""
  222. relevant = []
  223. for result in results:
  224. if result["status"] != "success":
  225. continue
  226. content = ((result.get("content") or "") + " " +
  227. (result.get("title") or "") + " " +
  228. (result.get("description") or "")).lower()
  229. # Check for relevance keywords
  230. for keyword in RELEVANCE_KEYWORDS:
  231. if keyword.lower() in content:
  232. result["relevant_keyword"] = keyword
  233. relevant.append(result)
  234. break
  235. return relevant
  236. def main():
  237. parser = argparse.ArgumentParser(description="Pulse Fetch - Parallel URL fetching")
  238. parser.add_argument("--sources", choices=["all", "official", "blogs", "community"],
  239. default="all", help="Source category to fetch")
  240. parser.add_argument("--max-workers", type=int, default=10,
  241. help="Maximum parallel workers (default: 10)")
  242. parser.add_argument("--output", "-o", type=str, default=None,
  243. help="Output JSON file (default: stdout)")
  244. parser.add_argument("--filter-relevant", action="store_true",
  245. help="Only include results with relevant keywords")
  246. parser.add_argument("--discover-articles", action="store_true",
  247. help="Extract and fetch individual articles from blog homepages")
  248. parser.add_argument("--max-articles-per-source", type=int, default=5,
  249. help="Max articles to fetch per source (default: 5)")
  250. args = parser.parse_args()
  251. # Collect sources based on selection
  252. if args.sources == "all":
  253. sources = []
  254. for category in SOURCES.values():
  255. sources.extend(category)
  256. else:
  257. sources = SOURCES.get(args.sources, [])
  258. if not sources:
  259. print(f"No sources found for category: {args.sources}")
  260. return 1
  261. # Fetch URLs - either discover articles or just fetch homepages
  262. if args.discover_articles:
  263. # Filter to only blog-type sources for article discovery
  264. blog_sources = [s for s in sources if s.get("type") == "blog"]
  265. if not blog_sources:
  266. print("No blog sources found for article discovery")
  267. return 1
  268. results = discover_articles(
  269. blog_sources,
  270. max_workers=args.max_workers,
  271. max_articles_per_source=args.max_articles_per_source
  272. )
  273. else:
  274. results = fetch_all_parallel(sources, max_workers=args.max_workers)
  275. # Filter if requested
  276. if args.filter_relevant:
  277. results = filter_relevant_content(results)
  278. print(f"\nFiltered to {len(results)} relevant results")
  279. # Prepare output
  280. output = {
  281. "fetched_at": datetime.utcnow().isoformat() + "Z",
  282. "total_sources": len(sources),
  283. "successful": len([r for r in results if r.get("status") == "success"]),
  284. "failed": len([r for r in results if r.get("status") != "success"]),
  285. "results": results,
  286. }
  287. # Output
  288. json_output = json.dumps(output, indent=2)
  289. if args.output:
  290. Path(args.output).write_text(json_output, encoding="utf-8")
  291. print(f"\nResults saved to: {args.output}")
  292. else:
  293. print("\n" + "=" * 60)
  294. print("RESULTS")
  295. print("=" * 60)
  296. print(json_output)
  297. # Summary
  298. print(f"\n{'=' * 60}")
  299. print(f"SUMMARY: {output['successful']}/{output['total_sources']} successful")
  300. print(f"{'=' * 60}")
  301. return 0
  302. if __name__ == "__main__":
  303. sys.exit(main())