Python入门教程:实战示例 - 爬虫解析文件中的队列与线程协同操作
最编程
2024-07-25 21:50:04
...
import json
import time
from queue import Empty, Queue
from threading import Lock, Thread

import requests
from lxml import etree
6
# Run flags polled by the worker threads' while-loops.  main() flips them to
# False once the corresponding queue has drained, so the crawler / parser
# threads fall out of their loops and terminate.
CRAW_EXIT = True
PARSE_EXIT = True
9
class ThreadCrawl(Thread):
    """Crawler worker: pops page numbers off pageQueque, downloads the
    corresponding listing page and pushes the raw HTML onto dataQueque.

    Runs until the module-level flag CRAW_EXIT is set to False by main().
    """

    def __init__(self, threadName, pageQueque, dataQueque):
        # Initialise the underlying Thread machinery first.
        super(ThreadCrawl, self).__init__()
        self.threadName = threadName  # human-readable name, used in log output
        self.pageQueque = pageQueque  # queue of page numbers still to fetch
        self.dataQueque = dataQueque  # queue receiving downloaded HTML text

    def run(self):
        print('启动' + self.threadName)
        while CRAW_EXIT:
            try:
                # block=False: raise queue.Empty immediately when no page is
                # available instead of blocking forever (main() may be about
                # to tell us to exit).
                page = self.pageQueque.get(block=False)
            except Empty:
                # Nothing to crawl right now; loop and re-check the flag.
                continue

            url = 'http://www.waduanzi.com/page/' + str(page)
            header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"}
            try:
                content = requests.get(url, headers=header).text
            except requests.RequestException:
                # Best-effort crawl: skip pages that fail to download rather
                # than killing the thread (original code silently swallowed
                # *all* exceptions here; we narrow it to network errors).
                continue
            self.dataQueque.put(content)
37
38
class ThreadParse(Thread):
    """Parser worker: pops raw HTML off dataQueque, extracts the joke entries
    with XPath and appends them as JSON objects (one per line) to
    waduanzi.json.

    Runs until the module-level flag PARSE_EXIT is set to False by main().
    """

    def __init__(self, parseName, dataQueque, lock):
        super(ThreadParse, self).__init__()
        self.parseName = parseName    # human-readable name, used in log output
        self.dataQueque = dataQueque  # queue of downloaded HTML documents
        self.lock = lock              # serialises writes to the shared output file

    def run(self):
        print('启动' + self.parseName)
        while PARSE_EXIT:
            try:
                # Non-blocking get: raises queue.Empty when no HTML is ready,
                # so we can keep re-checking the exit flag.
                html = self.dataQueque.get(False)
            except Empty:
                continue
            self.parse(html)

    def parse(self, html):
        """Parse one HTML document and append every extracted entry to the
        JSON output file."""
        text = etree.HTML(html)
        # Each joke lives in a <div class="panel panel20 ..."> container.
        node_list = text.xpath("//div[contains(@class,'panel panel20')]")

        for node in node_list:
            # xpath() returns a list; take the single expected match.
            img = node.xpath("./div[1]/img/@src")[0]       # poster avatar URL
            title = node.xpath("./div[2]/h2/a/@title")[0]  # entry title
            # normalize-space() keeps the whole text even across &nbsp; and
            # <br> tags, which plain .text would truncate.
            content = node.xpath("normalize-space(./div[2]/div)")
            zan = node.xpath("./div[3]/ul/li[1]/a")[0].text    # up-vote count
            buzan = node.xpath("./div[3]/ul/li[2]/a")[0].text  # down-vote count

            items = {
                "img": img,
                "title": title,
                "content": content.replace("\xa0", ""),  # strip residual &nbsp;
                "zan": zan,
                "buzan": buzan,
            }

            # Several parser threads share one file: hold the lock for the
            # whole write.  `with` guarantees release even if the write
            # raises (the original acquire/release pair could leak the lock).
            with self.lock:
                # Explicit UTF-8: content is Chinese and dumped with
                # ensure_ascii=False, so the default locale encoding may fail.
                with open("waduanzi.json", "a", encoding="utf-8") as f:
                    f.write(json.dumps(items, ensure_ascii=False) + ",\n")
83
84
def main():
    """Start three crawler and three parser threads, wait for both queues to
    drain, then signal the workers to exit and join them all."""
    # Page-number queue: pages 1..10 of the site.
    pageQueque = Queue(10)
    for page in range(1, 11):
        pageQueque.put(page)

    # Unbounded queue carrying raw HTML from crawlers to parsers.
    dataQueque = Queue(0)

    # One lock shared by all parser threads to serialise file writes.
    lock = Lock()

    # Start three crawler threads.
    threadcrawl = []
    for threadName in ['采集线程1', '采集线程2', '采集线程3']:
        thread = ThreadCrawl(threadName, pageQueque, dataQueque)
        thread.start()
        threadcrawl.append(thread)

    # Start three parser threads.  ('解析县城1' in the original listing was a
    # typo for '解析线程1' — "parse thread", not "parse county".)
    threadparse = []
    for parseName in ['解析线程1', '解析线程2', '解析线程3']:
        thread = ThreadParse(parseName, dataQueque, lock)
        thread.start()
        threadparse.append(thread)

    # Wait for every page number to be handed out.  Sleep briefly between
    # checks instead of busy-spinning, which pinned a CPU core.
    while not pageQueque.empty():
        time.sleep(0.1)

    # Page queue drained: tell the crawler threads to stop looping.
    global CRAW_EXIT
    CRAW_EXIT = False
    print("pageQueue为空")

    for thread in threadcrawl:
        thread.join()
        print('end')

    # Wait for the parsers to consume every downloaded page.
    while not dataQueque.empty():
        time.sleep(0.1)

    global PARSE_EXIT
    PARSE_EXIT = False
    print("dataQueue为空")

    for thread1 in threadparse:
        thread1.join()
        print('end1')

    print("谢谢使用!")
141
# Script entry point: only start the crawl when run directly, not on import.
if __name__ == '__main__':
    main()