Web Scraping in Practice: Batch Downloading EIA Acceptance Announcements

Project Background

As an environmental engineering practitioner, I often need to consult environmental impact assessment (EIA) reports as study material. Although these reports are published in full, finding and downloading them one by one by hand is very time-consuming. I recently decided to solve this once and for all and built an automated collection tool in Python that batch-downloads EIA public-notice reports.

Technical Implementation

System Architecture

The crawler is organised into the following functional modules (a bare skeleton of the class is sketched after the list):

  1. Pagination detection: automatically determines the total number of announcement pages
  2. Link extraction: collects every announcement link on each listing page
  3. Content parsing: extracts the title, publication date, body text and other key information of each announcement
  4. Attachment handling: identifies attachments of various types and downloads them
  5. Local storage: saves each announcement as a structured Markdown file
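
For orientation, here is a bare skeleton of how those modules map onto one crawler class. The method names mirror the full implementation attached at the end of this post; the bodies are deliberately left empty.

# Skeleton only: method names follow the full script attached below
class EIAAnnouncementCrawler:
    def get_total_pages(self): ...                  # pagination detection
    def extract_announcement_links(self, page_url): ...    # link extraction
    def extract_announcement_content(self, url): ...       # content parsing
    def download_attachment(self, attachment_info, title, publish_date): ...  # attachment handling
    def save_as_markdown(self, announcement_data): ...     # local storage (Markdown)
    def crawl_all_pages(self): ...                  # ties the steps above together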

Key Features

  1. Robust pagination handling: a three-way detection strategy makes sure the total page count is found reliably
    • parse the pagination DOM elements
    • match page-count text in the page body
    • probe page by page until the last one is found
  2. Resilient content extraction: a chain of fallback CSS selectors keeps the extraction success rate high
  3. Meaningful attachment names: attachment file names are generated automatically from the announcement content
  4. Thorough error handling: every key step is wrapped in exception handling, so a single failure does not abort the run (a retry helper that could be added on top is sketched after this list)
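
On point 4: the attached script catches exceptions around every network call, but it does not retry failed requests. If retries are wanted, one option is to route all requests through a requests.Session configured with urllib3's Retry. A minimal sketch follows; the retry count and back-off values are arbitrary choices, not values from the original script.

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_retrying_session(total_retries=3, backoff=1.0):
    """Build a requests.Session that retries transient failures automatically."""
    session = requests.Session()
    retry = Retry(
        total=total_retries,
        backoff_factor=backoff,                 # wait 1s, 2s, 4s ... between attempts
        status_forcelist=(500, 502, 503, 504),  # retry on common server-side errors
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

Calls such as requests.get(url, ...) in the script would then become session.get(url, ...).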

Results

In actual testing, the tool was able to:

  • automatically detect 22 pages containing more than 200 announcements
  • extract the complete content of every announcement
  • download PDF, Word and other attachment types
  • store the results locally, organised by project

Download results (screenshot)

Future Improvements

The current version meets my basic needs, but there is still plenty of room for improvement:

  1. Structured data: extract key fields such as the construction unit and project location and build a database from them (a sketch follows this list)
  2. Content analysis: run NLP over the report text to pull out key information
  3. Visualisation: generate maps of the projects' geographic distribution, time-trend charts and the like
  4. Automated updates: run the crawler as a scheduled task so the latest announcements are fetched automatically
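
As a rough illustration of the first item, the sketch below pulls two fields out of an announcement body with regular expressions and stores them in SQLite. The patterns for 建设单位 (construction unit) and 项目地点 (project location) are guesses at typical announcement wording and would need tuning against real pages; none of this is part of the attached script.

import re
import sqlite3

# Hypothetical patterns: real announcements vary, so these need per-site tuning
FIELD_PATTERNS = {
    "construction_unit": r"建设单位[::]\s*(\S+)",
    "project_location": r"(?:建设地点|项目地点)[::]\s*(\S+)",
}

def extract_fields(content):
    """Pull key fields out of an announcement body with simple regexes."""
    fields = {}
    for name, pattern in FIELD_PATTERNS.items():
        match = re.search(pattern, content)
        fields[name] = match.group(1) if match else None
    return fields

def save_fields(db_path, title, fields):
    """Append one row of extracted fields to a small SQLite table."""
    with sqlite3.connect(db_path) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS projects "
            "(title TEXT, construction_unit TEXT, project_location TEXT)"
        )
        conn.execute(
            "INSERT INTO projects VALUES (?, ?, ?)",
            (title, fields.get("construction_unit"), fields.get("project_location")),
        )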

Legal Notice

A few points need to be made explicit:

  1. This tool is intended for personal study and research only
  2. Keep the request rate low so the crawl does not put load on the server
  3. The collected data must not be used for commercial purposes
  4. Respect the site's robots.txt rules (a minimal check covering this point and point 2 is sketched after the list)
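
For points 2 and 4, the standard library covers the basics. A minimal politeness wrapper might look like the following; the base URL is a placeholder, since the real address is redacted in this post.

import time
import requests
from urllib import robotparser

BASE_URL = "https://example.gov.cn"   # placeholder: the real site URL is redacted

robots = robotparser.RobotFileParser()
robots.set_url(BASE_URL + "/robots.txt")
robots.read()

def polite_get(session, url, delay=2.0):
    """Fetch a URL only if robots.txt allows it, then pause to limit the crawl rate."""
    if not robots.can_fetch("*", url):
        raise PermissionError(f"robots.txt disallows fetching {url}")
    response = session.get(url, timeout=15)
    time.sleep(delay)   # fixed delay between requests
    return response

# Example: polite_get(requests.Session(), BASE_URL)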

Closing Thoughts

This project really brought home to me how capable Python is for collecting and processing information. I plan to keep improving the tool, and anyone who is interested is welcome to get in touch and compare notes.
One last caveat: every city's website is laid out differently, so the scraper needs to be tuned for each site individually. My code targets one particular city's portal and is not a drop-in solution for others. To stay on the safe side, I have redacted the site's URL.

The complete code is attached below:

import os
import time
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import re
from datetime import datetime
import json


class EIAAnnouncementCrawler:
    def __init__(self):
        self.base_url = " 网址~脱敏 "
        self.output_dir = "环评公告"
        self.attachments_dir = os.path.join(self.output_dir, "附件")
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

        # Create the output directories
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.attachments_dir, exist_ok=True)

        # Run statistics
        self.stats = {
            'total_pages': 0,
            'total_announcements': 0,
            'successful_downloads': 0,
            'failed_downloads': 0,
            'attachments_downloaded': 0
        }

    def get_total_pages(self):
        """Step 1: work out how many pages of announcements there are."""
        try:
            print("正在获取总页数...")
            response = requests.get(self.base_url, headers=self.headers, timeout=15)
            response.raise_for_status()
            response.encoding = 'utf-8'

            soup = BeautifulSoup(response.text, 'html.parser')

            # Method 1: look for pagination markup
            pagination = soup.find('div', class_='pagination') or soup.find('div', class_='page')
            if pagination:
                page_links = pagination.find_all('a')
                if page_links:
                    # Collect the numeric page links and take the largest
                    page_numbers = []
                    for link in page_links:
                        text = link.get_text(strip=True)
                        if text.isdigit():
                            page_numbers.append(int(text))

                    if page_numbers:
                        total_pages = max(page_numbers)
                        print(f"通过分页信息找到总页数: {total_pages}")
                        return total_pages

            # Method 2: look for "共N页" text in the page body
            page_text = soup.find(string=re.compile(r'共.*页'))
            if page_text:
                match = re.search(r'共(\d+)页', page_text)
                if match:
                    total_pages = int(match.group(1))
                    print(f"通过文本匹配找到总页数: {total_pages}")
                    return total_pages

            # Method 3: probe whether a second page exists, then keep probing
            test_url = urljoin(self.base_url, "index_1.html")
            response = requests.get(test_url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                # The second page exists; keep probing further pages
                for i in range(2, 50):  # probe at most 50 pages
                    test_url = urljoin(self.base_url, f"index_{i}.html")
                    response = requests.get(test_url, headers=self.headers, timeout=5)
                    if response.status_code != 200:
                        total_pages = i
                        print(f"通过测试找到总页数: {total_pages}")
                        return total_pages

            # Fall back to a single page
            print("无法确定总页数,默认返回1页")
            return 1

        except Exception as e:
            print(f"获取总页数失败: {e}")
            return 1

    def get_page_url(self, page_num):
        """Build the URL for a given page number."""
        if page_num == 1:
            return self.base_url
        else:
            return urljoin(self.base_url, f"index_{page_num-1}.html")

    def extract_announcement_links(self, page_url):
        """Extract every announcement link from a listing page."""
        try:
            response = requests.get(page_url, headers=self.headers, timeout=15)
            response.raise_for_status()
            response.encoding = 'utf-8'

            soup = BeautifulSoup(response.text, 'html.parser')
            links = []

            # Try several possible selectors for the announcement list
            selectors = [
                'ul.list-unstyled li a[href]',
                '.news_list a[href]',
                '.list a[href]',
                'a[href*="html"]',
                'a[href*="hpspgg"]'
            ]

            for selector in selectors:
                elements = soup.select(selector)
                for element in elements:
                    href = element.get('href', '').strip()
                    if href and ('html' in href or 'hpspgg' in href):
                        # Build the absolute URL
                        if href.startswith('http'):
                            full_url = href
                        else:
                            full_url = urljoin(page_url, href)

                        # Keep only announcement links, deduplicated
                        if 'hpspgg' in full_url and full_url not in links:
                            links.append(full_url)

                if links:
                    break

            return links

        except Exception as e:
            print(f"提取公告链接失败 {page_url}: {e}")
            return []

    def extract_announcement_content(self, url):
        """Extract the content of a single announcement page."""
        try:
            response = requests.get(url, headers=self.headers, timeout=15)
            response.raise_for_status()
            response.encoding = 'utf-8'

            soup = BeautifulSoup(response.text, 'html.parser')

            # Title
            title = ""
            title_selectors = ['h1', '.title', '.article-title', 'h2']
            for selector in title_selectors:
                title_elem = soup.select_one(selector)
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    break

            if not title:
                title = "无标题"

            # Publication date
            publish_date = ""
            date_selectors = ['.date', '.time', '.publish-date', '.article-date']
            for selector in date_selectors:
                date_elem = soup.select_one(selector)
                if date_elem:
                    date_text = date_elem.get_text(strip=True)
                    # Match common date formats
                    date_match = re.search(r'(\d{4}-\d{2}-\d{2}|\d{4}/\d{2}/\d{2}|\d{4}\.\d{2}\.\d{2})', date_text)
                    if date_match:
                        publish_date = date_match.group(1)
                        break

            if not publish_date:
                publish_date = datetime.now().strftime('%Y-%m-%d')

            # Body text
            content = ""
            content_selectors = ['.content', '.article-content', '.main-content', '.text']
            for selector in content_selectors:
                content_elem = soup.select_one(selector)
                if content_elem:
                    content = content_elem.get_text(strip=True, separator='\n')
                    break

            if not content:
                # Fall back to the whole <body> if no content container matched
                body = soup.find('body')
                if body:
                    content = body.get_text(strip=True, separator='\n')

            # Attachments
            attachments = []
            attachment_links = []

            # Collect every link that looks like an attachment
            for a_tag in soup.find_all('a', href=True):
                href = a_tag.get('href', '')
                text = a_tag.get_text(strip=True)

                # Check whether this link points to an attachment
                if (href.endswith(('.pdf', '.doc', '.docx', '.xls', '.xlsx', '.zip', '.rar')) or
                        '下载' in text or '附件' in text or '文件' in text):
                    attachment_url = urljoin(url, href)
                    attachment_links.append({
                        'url': attachment_url,
                        'text': text,
                        'href': href
                    })

            # Derive a meaningful name for each attachment
            for i, link_info in enumerate(attachment_links, 1):
                attachment_name = self.extract_attachment_name(link_info, title, i)
                attachments.append({
                    'name': attachment_name,
                    'url': link_info['url'],
                    'filename': os.path.basename(link_info['href']),
                    'index': i
                })

            return {
                'title': title,
                'publish_date': publish_date,
                'content': content,
                'attachments': attachments,
                'url': url
            }

        except Exception as e:
            print(f"提取公告内容失败 {url}: {e}")
            return None

    def extract_attachment_name(self, link_info, announcement_title, index):
        """Derive a meaningful attachment name."""
        text = link_info['text']
        href = link_info['href']
        original_filename = os.path.basename(href)

        # Method 1: use the link text if it is meaningful
        if text and text != original_filename and len(text) > 2:
            # Strip common filler words such as "下载" / "附件" / "文件"
            clean_text = re.sub(r'(下载|附件|文件|点击|查看)', '', text).strip()
            if clean_text and len(clean_text) > 2:
                return clean_text

        # Method 2: fall back to the original file name (without extension)
        if original_filename:
            name_without_ext = os.path.splitext(original_filename)[0]
            if name_without_ext and len(name_without_ext) > 2:
                return name_without_ext

        # Method 3: fall back to keywords taken from the announcement title
        keywords = self.extract_keywords_from_title(announcement_title)
        if keywords:
            return f"{keywords}_附件{index}"

        # Method 4: last-resort default name
        return f"附件{index}"

    def extract_keywords_from_title(self, title):
        """Extract keywords from the announcement title."""
        # Common words that carry no useful information
        stop_words = ['关于', '的', '项目', '环评', '审批', '决定', '公告', '公示', '信息', '通知']

        # Candidate keywords (company names, project names, ...)
        keywords = []

        # Company-name patterns
        company_patterns = [
            r'(.+?有限公司)',
            r'(.+?公司)',
            r'(.+?集团)',
            r'(.+?科技)',
            r'(.+?发展)'
        ]

        for pattern in company_patterns:
            matches = re.findall(pattern, title)
            if matches:
                keywords.extend(matches)

        # Project-name patterns
        project_patterns = [
            r'(.+?项目)',
            r'(.+?工程)',
            r'(.+?建设)',
            r'(.+?制造)',
            r'(.+?加工)'
        ]

        for pattern in project_patterns:
            matches = re.findall(pattern, title)
            if matches:
                keywords.extend(matches)

        # Remove stop words and drop keywords that become too short
        clean_keywords = []
        for keyword in keywords:
            clean_keyword = keyword
            for stop_word in stop_words:
                clean_keyword = clean_keyword.replace(stop_word, '')

            if clean_keyword and len(clean_keyword) > 1:
                clean_keywords.append(clean_keyword)

        # Return the longest keyword (usually the most informative)
        if clean_keywords:
            return max(clean_keywords, key=len)

        return None

    def generate_attachment_filename(self, attachment_info, announcement_title, publish_date):
        """Build a meaningful file name for an attachment."""
        attachment_name = attachment_info['name']
        original_filename = attachment_info['filename']
        index = attachment_info['index']

        # File extension
        file_ext = os.path.splitext(original_filename)[1].lower()

        # Short identifier derived from the announcement title
        title_identifier = self.get_title_identifier(announcement_title)

        # File name format: date_titleIdentifier_attachmentName_index.ext
        safe_attachment_name = re.sub(r'[<>:"/\\|?*]', '_', attachment_name)
        safe_title_identifier = re.sub(r'[<>:"/\\|?*]', '_', title_identifier)

        # Truncate overly long attachment names to 20 characters
        if len(safe_attachment_name) > 20:
            safe_attachment_name = safe_attachment_name[:20]

        filename = f"{publish_date}_{safe_title_identifier}_{safe_attachment_name}_{index:02d}{file_ext}"

        return filename

    def get_title_identifier(self, title):
        """Extract a short identifier from the title."""
        # Prefer the company name if one is present
        if '有限公司' in title:
            company_match = re.search(r'(.+?)有限公司', title)
            if company_match:
                company_name = company_match.group(1)
                if len(company_name) <= 10:
                    return company_name
                else:
                    return company_name[:10]

        # Otherwise fall back to the first characters of the title
        if len(title) <= 15:
            return title
        else:
            return title[:15]

    def download_attachment(self, attachment_info, announcement_title, publish_date):
        """Download an attachment and save it locally."""
        try:
            url = attachment_info['url']

            # Build a meaningful file name
            filename = self.generate_attachment_filename(attachment_info, announcement_title, publish_date)

            # Create a sub-folder named after the announcement title
            safe_title = re.sub(r'[<>:"/\\|?*]', '_', announcement_title)
            attachment_folder = os.path.join(self.attachments_dir, safe_title)
            os.makedirs(attachment_folder, exist_ok=True)

            filepath = os.path.join(attachment_folder, filename)

            # Skip files that were already downloaded
            if os.path.exists(filepath):
                print(f"附件已存在,跳过: {filename}")
                return True

            response = requests.get(url, headers=self.headers, timeout=10, stream=True)

            if response.status_code == 200:
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                print(f"已下载附件: {filename}")
                self.stats['attachments_downloaded'] += 1
                return True
            else:
                print(f"附件下载失败: {url} (状态码: {response.status_code})")
                return False
        except Exception as e:
            print(f"下载附件出错: {url} | 错误: {e}")
            return False

    def save_as_markdown(self, announcement_data):
        """Save an announcement as a Markdown file."""
        try:
            title = announcement_data['title']
            publish_date = announcement_data['publish_date']
            content = announcement_data['content']
            url = announcement_data['url']
            attachments = announcement_data['attachments']

            # Sanitize the file name
            safe_title = re.sub(r'[<>:"/\\|?*]', '_', title)
            filename = f"{publish_date}_{safe_title}.md"
            filepath = os.path.join(self.output_dir, filename)

            # Skip files that already exist
            if os.path.exists(filepath):
                print(f"文件已存在,跳过: {filename}")
                return True

            with open(filepath, 'w', encoding='utf-8') as f:
                # Write the Markdown document
                f.write(f"# {title}\n\n")
                f.write(f"**发布日期:** {publish_date}\n\n")
                f.write(f"**原文链接:** [{url}]({url})\n\n")
                f.write("---\n\n")
                f.write("## 公告内容\n\n")
                f.write(content)
                f.write("\n\n")

                if attachments:
                    f.write("## 附件\n\n")
                    for i, attachment in enumerate(attachments, 1):
                        f.write(f"{i}. [{attachment['name']}]({attachment['url']})\n")
                    f.write("\n")

            print(f"已保存为markdown: {filename}")
            return True

        except Exception as e:
            print(f"保存markdown失败: {e}")
            return False

    def crawl_all_pages(self):
        """Crawl the announcements on every page."""
        print("开始爬取环评公告...")
        print(f"基础URL: {self.base_url}")
        print(f"输出目录: {self.output_dir}")
        print(f"附件目录: {self.attachments_dir}")
        print("-" * 50)

        # Step 1: determine the total page count
        total_pages = self.get_total_pages()
        self.stats['total_pages'] = total_pages
        print(f"总页数: {total_pages}")
        print("-" * 50)

        all_announcements = []

        # Steps 2 and 3: process each page in order, starting from page 1
        for page_num in range(1, total_pages + 1):
            print(f"\n正在处理第 {page_num}/{total_pages} 页...")

            # URL of the current page
            page_url = self.get_page_url(page_num)
            print(f"页面URL: {page_url}")

            # All announcement links on the current page
            announcement_links = self.extract_announcement_links(page_url)
            print(f"本页找到 {len(announcement_links)} 个公告链接")
            self.stats['total_announcements'] += len(announcement_links)  # keep the counter in sync

            # Process each announcement
            for i, link in enumerate(announcement_links, 1):
                print(f"\n 处理公告 {i}/{len(announcement_links)}: {link}")

                # Extract the announcement content
                announcement_data = self.extract_announcement_content(link)
                if not announcement_data:
                    print(" 提取内容失败,跳过")
                    self.stats['failed_downloads'] += 1
                    continue

                # Save it as Markdown
                if self.save_as_markdown(announcement_data):
                    self.stats['successful_downloads'] += 1
                    all_announcements.append(announcement_data)
                else:
                    self.stats['failed_downloads'] += 1

                # Download the attachments
                if announcement_data['attachments']:
                    print(f" 发现 {len(announcement_data['attachments'])} 个附件")
                    for attachment in announcement_data['attachments']:
                        self.download_attachment(attachment, announcement_data['title'], announcement_data['publish_date'])

                # Be polite: pause between announcements
                time.sleep(2)

            # Pause between pages
            if page_num < total_pages:
                time.sleep(3)

        # Save the run statistics
        self.save_statistics(all_announcements)

        print("\n" + "=" * 50)
        print("爬取完成!")
        print(f"总页数: {self.stats['total_pages']}")
        print(f"成功下载: {self.stats['successful_downloads']}")
        print(f"失败下载: {self.stats['failed_downloads']}")
        print(f"附件下载: {self.stats['attachments_downloaded']}")
        print("=" * 50)

    def save_statistics(self, announcements):
        """Save the run statistics as JSON."""
        stats_file = os.path.join(self.output_dir, "爬取统计.json")

        stats_data = {
            'crawl_time': datetime.now().isoformat(),
            'statistics': self.stats,
            'announcements': [
                {
                    'title': ann['title'],
                    'publish_date': ann['publish_date'],
                    'url': ann['url'],
                    'attachments_count': len(ann['attachments'])
                }
                for ann in announcements
            ]
        }

        with open(stats_file, 'w', encoding='utf-8') as f:
            json.dump(stats_data, f, ensure_ascii=False, indent=2)

        print(f"统计信息已保存到: {stats_file}")


def main():
    crawler = EIAAnnouncementCrawler()
    crawler.crawl_all_pages()


if __name__ == "__main__":
    main()
