
【Scrapy Spider】Batch-Collecting Baidu Web Results, Baidu Zhidao, Baidu News, 360 Images, and Youku Videos

A pile of keywords in, a batch of matching content out, all from one script: put plainly, each keyword gets a handful of web pages, Zhidao answers, news items, plus images and videos.

What is it good for? Feeding a web framework (Flask, Django) or a CMS (EmpireCMS, DedeCMS) to aggregate a pile of pages.

The job is built on the Scrapy crawling framework and touches a number of handy little tricks:

1. Stripping HTML tags
2. Detecting encodings with chardet
3. Building start_urls in bulk
4. __xxx private members on a class
5. Passing extra arguments to a callback (see the sketch just below)
6. Deduplicating with a dict
7. Regular expressions everywhere
8. Parsing JS with PyV8 (in name only: the spider ends up using json.loads instead)
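On trick 5: the spider below freezes extra values into each callback with lambda default arguments. An equivalent route is Request.meta, available in every Scrapy version; here is a minimal illustrative sketch (the URL and values are made up, not taken from the project):

# Hypothetical sketch: carrying extra values to a callback via Request.meta
# instead of lambda default arguments.
def parse(self, response):
    yield Request('http://www.baidu.com/s?word=seo',
                  callback=self.page_parse,
                  meta={'table': 'baidu_pc_search', 'query': 'seo'})

def page_parse(self, response):
    table = response.meta['table']   # same values the lambdas pass below
    query = response.meta['query']

The full spider: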
#coding:utf-8
 
import scrapy,re,urllib,chardet,json
from seo.items import SeoItem
from scrapy.http import Request
from readability.readability import Document
# import PyV8
 
import sys
reload(sys)
sys.setdefaultencoding('utf-8')  # Python 2 hack: default to utf-8 so Chinese text round-trips
 
 
def number(content):
    """Rough character count of the visible text, used to drop empty pages."""
    text = re.sub("[\s+\.\!\/_,$%^*(+\"\']+|[+——!,::。?、~@#¥%……&*()“”《》]+".decode("utf8"), "".decode("utf8"),content)  # strip Chinese and English punctuation
    text2 = re.sub('<[^>]*?>','',text)  # strip all HTML tags
    words_number = len(text2)
    return int(words_number)
 
 
def bianma(i):
    """bianma ("encoding"): normalize input to utf-8; anything chardet does
    not flag as utf-8 is assumed to be gbk."""
    i=str(i).strip()
    mychar = chardet.detect(i)
    bianma = mychar['encoding']
    if bianma == 'utf-8' or bianma == 'UTF-8':
        data=i
    else:
        data=i.decode('gbk','ignore').encode('utf-8')
    return data
 
 
def search(req,html):
    """Return the first capture group of req in html, or 'no' on no match."""
    text = re.search(req,html)
    if text:
        data = text.group(1)
    else:
        data = 'no'
    return data
 
 
def extract_data(div,xpath_data):
    """Pull the anchor text out of an xpath node; not called by the spider below."""
    loading = div.xpath('%s'%xpath_data)
    if loading:
        loading=bianma(re.sub('<[^>]*?>','',search('<a[^>]*?>([\s\S]*?)</a>',loading.extract()[0])))
    else:
        loading='Aladdin'
    return loading
 
 
def qu_b(re_data):
    """qu_b ("strip"): remove tags and &nbsp; entities, normalized to utf-8."""
    if re_data:
        loading=bianma(re.sub('<[^>]*?>','',re_data))
        loading=bianma(re.sub('&nbsp;','',loading))
    else:
        loading='Aladdin'
    return loading
 
 
class DmozSpider(scrapy.Spider):
    name = 'seo' 
    start_urls=[]
    # built at class-definition time: one batch of search URLs per line of
    # keywords.txt (trick 3)
    for word in open('keywords.txt'):
        query=word.strip()
        start_urls.append('http://www.baidu.com/s?word=%s' % urllib.quote(query))
        start_urls.append('http://www.baidu.com/s?pn=10&word=%s' % urllib.quote(query))
        start_urls.append('http://www.baidu.com/s?pn=20&word=%s' % urllib.quote(query))
        start_urls.append('http://www.baidu.com/s?pn=30&word=%s' % urllib.quote(query))
        start_urls.append('http://www.baidu.com/s?pn=40&word=%s' % urllib.quote(query))
        start_urls.append('http://www.baidu.com/s?pn=50&word=%s' % urllib.quote(query))
        start_urls.append('http://news.baidu.com/ns?cl=2&rn=20&tn=news&word=%s' % urllib.quote(query))
        start_urls.append('http://zhidao.baidu.com/search?rn=10&ie=gbk&word=%s' % urllib.quote(query))
        start_urls.append('http://image.so.com/j?q=%s' % urllib.quote(query))
        start_urls.append('http://www.soku.com/search_video/q_%s' % urllib.quote(query))
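    # pn offsets Baidu's organic results in steps of 10, so pn=10..50 plus the
    # bare query cover the first 60 results per keyword; news, Zhidao,
    # 360 images and Soku each get a single URL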
 
    def __init__(self):
    #     #init js_ctx
    #     ctx = PyV8.JSContext()
    #     ctx.enter()
    #     self.js_ctx = ctx
        self.op_txt=open('url.txt','a')
        # zidian ("dictionary") maps every URL already written to url.txt to a
        # counter; membership tests during the crawl skip repeats (trick 6)
        self.zidian={}
        c=0
        with open('url.txt') as f:
            for i in f.readlines():
                i=i.strip()
                self.zidian[i]=c
                c+=1
         
    def __get_url_query(self,url):
        m = re.search("word=(.*)",url).group(1)
        return m
 
    def __get_imgurl_query(self,url):
        m = re.search("q=(.*)",url).group(1)
        return m
 
    def __get_vediourl_query(self,url):
        m = re.search("q_(.*)",url).group(1)
        return m
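    # Trick 4: the double leading underscore triggers Python name mangling,
    # making these helpers private to DmozSpider (reachable outside only as
    # _DmozSpider__get_url_query and friends)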
 
    def parse(self,response):
        judge_url=response.url
        
        if 'www.baidu.com' in judge_url:
            re_url=re.compile(r'  style="text-decoration:none;">(.*?)</a>')
            url_list=re.findall(re_url,response.body)
            data_table='baidu_pc_search'
            query=urllib.unquote(self.__get_url_query(judge_url))
            for url in url_list:
                url='http://'+qu_b(url).strip()
                # default arguments freeze the current table/query into each
                # callback (trick 5); the same idiom repeats in the branches below
                yield Request(url,callback=lambda response, typid=data_table, typeid=query: self.page_parse(response, typid, typeid))
 
 
        if 'zhidao.baidu.com' in judge_url:
            re_url=re.compile(r'<a href="(http://zhidao\.baidu\.com/question/.*?html\?fr=iks&word=.*?&ie=gbk)"')
            url_list=re.findall(re_url,response.body)
            data_table='baidu_pc_zhidao'
            query=urllib.unquote(self.__get_url_query(judge_url))
            for url in url_list:
                yield Request(url,callback=lambda response, typid=data_table, typeid=query: self.page_parse(response, typid, typeid))
 
 
        if 'news.baidu.com' in judge_url:
            re_url=re.compile(r'<h3 class="c-title"><a href="(.*?)"')
            url_list=re.findall(re_url,response.body)
            data_table='baidu_pc_news'
            query=urllib.unquote(self.__get_url_query(judge_url))
            for url in url_list:
                yield Request(url,callback=lambda response, typid=data_table, typeid=query: self.page_parse(response, typid, typeid))
 
 
        if 'image.so.com' in judge_url:
            # only_url=response.url
            json_str=response.body
            data_table='so_pc_img'
            query=urllib.unquote(self.__get_imgurl_query(judge_url))
            if len(json_str) > 0:
                # fret = self.js_ctx.eval("""
                #                 function func() {
                #                   var data = """ + json_str + """;
                #                   var json_data = JSON.stringify(data);
                #                   return json_data;
                #                 }
                #                 """)
                # jsond = self.js_ctx.locals.func()
                json_data = json.loads(json_str)  # plain json.loads replaces the commented-out PyV8 route above (trick 8, in name only)
                # print json_data
                list_img = json_data['list']
                for i in list_img:
                    original_img=i['img']             
                    huancun_img=i['thumb_bak']
                    if original_img in self.zidian:
                        # skip images whose original URL is already in url.txt
                        print '<<< duplicate URL, skipping >>>'
                    else:
                        print original_img,huancun_img
                        item = SeoItem()
                        item['table'] = data_table
                        item['query'] = query
                        item['title'] = original_img#.encode('utf-8')
                        item['article'] = huancun_img#.encode('utf-8')
                        self.op_txt.writelines(original_img+'\n')
                        yield item
 
 
        if 'soku.com' in judge_url:
            re_url=re.compile(r'<a title=".*?" target="_blank" href="(http://v\.youku\.com/v_show/.*?)"')
            url_list=re.findall(re_url,response.body)
            data_table='youku_pc_swf'
            query=urllib.unquote(self.__get_vediourl_query(judge_url))
            for url in url_list:
                print url
                yield Request(url,callback=lambda response, typid=data_table, typeid=query: self.page_parse(response, typid, typeid))
 
 
 
    def page_parse(self,response,typid,typeid):
        only_url=response.url
 
        if only_url in self.zidian:
            print '<<< duplicate URL, skipping >>>'
 
        else:
            html = response.body
 
            if typid=='youku_pc_swf':
                title=search(r'</a><h1   title="(.*?)"><a href',html)
                article=search(r"<embed src='https://my.oschina.net/u/2426650/blog/(http://player\.youku\.com/player\.php/.*?swf)'.*?</embed>",html)
                item = SeoItem()
                item['table'] = typid
                item['query'] = typeid
                item['title'] = title#.encode('utf-8')
                item['article'] = article#.encode('utf-8')
                self.op_txt.writelines(only_url+'\n')
                yield item
             
            else:
                title = Document(html).short_title()
                article = Document(html).summary()
                # strip scripts, then every tag except p/img/br/iframe, then
                # normalize <p> attributes and collapse blank paragraphs
                a = re.sub(r'<script[\s\S]*?</script>|&#13;','',article).strip()
                b = re.sub(r'<(?!p|img|/p|br|iframe)[^<>]*?>','',a).strip()
                c = re.sub(r'<p[^>]*?>','<p>',b).strip().replace('\n','')
                article = re.sub(r'<p>\s+<p>','',c)
                num = number(b)
                if num > 1 and '出错' not in title:  # '出错' marks Chinese error pages
                    if '404' not in title:
                        # print title,article
                        item = SeoItem()
                        item['table'] = typid
                        item['query'] = typeid
                        item['title'] = title#.encode('utf-8')
                        item['article'] = article#.encode('utf-8')
                        self.op_txt.writelines(only_url+'\n')
                        yield item
                else:
                    print '<<< no content, skipping >>>'
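
The SeoItem imported from seo.items is not shown in the post. Judging from the four fields the spider fills, it presumably looks something like this sketch (the field comments are my reading of how each one is used above):

# seo/items.py -- hypothetical reconstruction from the fields used above
import scrapy

class SeoItem(scrapy.Item):
    table = scrapy.Field()    # source vertical, e.g. 'baidu_pc_search', 'so_pc_img'
    query = scrapy.Field()    # the keyword that produced this record
    title = scrapy.Field()    # page title, or the original image URL
    article = scrapy.Field()  # cleaned article HTML, cached image URL, or player swf URL

With a keywords.txt (one keyword per line) in the project root, the crawl runs the usual way:

scrapy crawl seo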
