欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Python入门教程:实战示例 - 爬虫解析文件中的队列与线程协同操作

最编程 2024-07-25 21:50:04
...
import json
import threading
from queue import Empty, Queue
from threading import Thread

import requests
from lxml import etree

# Shared run flags polled by the worker threads.  main() flips them to False
# (via ``global``) once the corresponding queue has been fully processed,
# which makes every worker fall out of its run() loop and terminate.
CRAW_EXIT = True    # crawler threads keep running while True
PARSE_EXIT = True   # parser threads keep running while True

# Browser-like User-Agent so the target site does not reject the request.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/78.0.3904.87 Safari/537.36",
}


class ThreadCrawl(Thread):
    """Downloader thread: pulls page numbers from a queue, fetches each page's
    HTML and pushes the raw HTML into the data queue for the parser threads.
    """

    # NOTE: the misspelled parameter/attribute names (pageQueque/dataQueque)
    # are kept verbatim for backward compatibility with existing callers.
    def __init__(self, threadName, pageQueque, dataQueque):
        super(ThreadCrawl, self).__init__()
        self.threadName = threadName    # human-readable thread name
        self.pageQueque = pageQueque    # queue of page numbers to fetch
        self.dataQueque = dataQueque    # queue receiving raw page HTML

    def run(self):
        print('启动' + self.threadName)
        while CRAW_EXIT:
            try:
                # Short timeout instead of busy-spinning on an empty queue:
                # the thread wakes periodically to re-check the exit flag.
                page = self.pageQueque.get(timeout=0.1)
            except Empty:
                continue
            try:
                url = 'http://www.waduanzi.com/page/' + str(page)
                # timeout so a stalled server cannot hang the thread forever
                content = requests.get(url, headers=HEADERS, timeout=10).text
                self.dataQueque.put(content)
            except requests.RequestException as exc:
                # One failed page must not kill the whole thread; report it.
                print('抓取失败 page %s: %s' % (page, exc))
            finally:
                # Required so pageQueque.join() in main() can return.
                self.pageQueque.task_done()


class ThreadParse(Thread):
    """Parser thread: pulls raw HTML from the data queue, extracts the joke
    entries and appends them as JSON lines to ``waduanzi.json``.
    """

    def __init__(self, parseName, dataQueque, lock):
        super(ThreadParse, self).__init__()
        self.parseName = parseName      # human-readable thread name
        self.dataQueque = dataQueque    # queue of raw page HTML to parse
        self.lock = lock                # guards the shared output file

    def run(self):
        print('启动' + self.parseName)
        while PARSE_EXIT:
            try:
                # Timeout-based get: no busy-wait, exit flag checked regularly.
                html = self.dataQueque.get(timeout=0.1)
            except Empty:
                continue
            try:
                self.parse(html)
            except Exception as exc:
                # Unexpected page layout must not kill the thread; report it.
                print('解析失败: %s' % exc)
            finally:
                # Required so dataQueque.join() in main() can return.
                self.dataQueque.task_done()

    def parse(self, html):
        """Extract every joke entry from one page and append it to the file.

        Each entry becomes one JSON object (image URL, title, content, up/down
        vote counts) written as a line ``{...},`` to ``waduanzi.json``.
        """
        dom = etree.HTML(html)
        node_list = dom.xpath("//div[contains(@class,'panel panel20')]")

        for node in node_list:
            # xpath() returns a list; each panel has exactly one match, so [0].
            img = node.xpath("./div[1]/img/@src")[0]
            title = node.xpath("./div[2]/h2/a/@title")[0]
            # normalize-space() keeps the full text even across &nbsp; / <br>,
            # which plain ``.text`` would truncate.
            content = node.xpath("normalize-space(./div[2]/div)")
            zan = node.xpath("./div[3]/ul/li[1]/a")[0].text
            buzan = node.xpath("./div[3]/ul/li[2]/a")[0].text

            items = {
                "img": img,
                "title": title,
                "content": content.replace("\xa0", ""),
                "zan": zan,
                "buzan": buzan,
            }

            # Serialise writes: several parser threads share one output file.
            # ``with`` releases the lock even if the write raises; explicit
            # encoding avoids platform-dependent failures with non-ASCII JSON.
            with self.lock:
                with open("waduanzi.json", "a", encoding="utf-8") as f:
                    f.write(json.dumps(items, ensure_ascii=False) + ",\n")


def main(page_count=10):
    """Crawl ``page_count`` pages with 3 downloader and 3 parser threads.

    Page numbers flow through ``pageQueque`` to the crawlers, raw HTML flows
    through ``dataQueque`` to the parsers, and results are appended to
    ``waduanzi.json``.  ``Queue.join()`` (not a busy-wait) blocks until each
    stage has fully processed its queue before the exit flags are flipped.
    """
    global CRAW_EXIT, PARSE_EXIT

    # Queue of page numbers 1..page_count to be fetched.
    pageQueque = Queue(page_count)
    for page in range(1, page_count + 1):
        pageQueque.put(page)

    # Unbounded queue of raw HTML produced by the crawlers.
    dataQueque = Queue()

    lock = threading.Lock()  # protects the shared output file

    # Start three downloader threads.
    threadcrawl = []
    for threadName in ['采集线程1', '采集线程2', '采集线程3']:
        thread = ThreadCrawl(threadName, pageQueque, dataQueque)
        thread.start()
        threadcrawl.append(thread)

    # Start three parser threads.
    threadparse = []
    for parseName in ['解析线程1', '解析线程2', '解析线程3']:
        thread = ThreadParse(parseName, dataQueque, lock)
        thread.start()
        threadparse.append(thread)

    # Block until every page number has been fetched (task_done() called),
    # then tell the crawler threads to stop and wait for them.
    pageQueque.join()
    CRAW_EXIT = False
    print("pageQueue为空")
    for thread in threadcrawl:
        thread.join()
    print('end')

    # Likewise wait until every downloaded page has been parsed.
    dataQueque.join()
    PARSE_EXIT = False
    print("dataQueue为空")
    for thread in threadparse:
        thread.join()
    print('end1')

    print("谢谢使用!")


if __name__ == '__main__':
    main()