目录
- playwright简介
- 使用playwright初窥亚马逊
- 安装playwright
- 打开亚马逊页面
 
- 搞数据
- 搜索
- 修改bug
- 数据获取
- 翻页
- 优化结构
 
- 简单保存
 
playwright简介
playwright是微软新出的一个测试工具,与selenium类似,不过与selenium比起来还是有其自身的优势的(除了教程少是弱项)。
-  任何浏览器 • 任何平台 • 一个 API - 跨浏览器。 Playwright 支持所有现代渲染引擎,包括 Chromium、WebKit 和 Firefox。
- 跨平台。 在 Windows、Linux 和 macOS 上进行本地或 CI 测试,无头或有头。
- 跨语言。 在 TypeScript、JavaScript、Python、.NET、Java 中使用 Playwright API。
- 测试移动网络。 适用于 Android 的 Google Chrome 和 Mobile Safari 的原生移动模拟。 相同的渲染引擎可以在桌面和云端运行。
 (上面是抄官网的, 下面才是个人的使用体验)
 
-  方便 - 配置方便。与selenium的配置相比较的话,不需要下载额外的驱动,也不需要下载对应版本的浏览器。(就是配置地址不能修改)。
- 可以异步。selenium是只能阻塞式的执行,而playwright可以执行异步。
- 内存占用少。好像确实是比较少一点。
 
- 配置方便。与
使用playwright初窥亚马逊
通常情况,每家电商平台都有自己的反爬措施,亚马逊也不例外,所以要尝试一下,当然如果是scrapy这个爬虫框架的话,似乎可以越过反爬措施,内部的逻辑代码是什么没有深究,现在先拿过来用用。
安装playwright
同样的,这个第三方库也要安装,直接使用pip安装就行
pip install playwright
安装完毕之后,还需要安装一些依赖,只要执行一句话即可。这样就能安装了。
playwright install
如果不能安装,或者安装卡进度条了,可以查看这个教程
打开亚马逊页面
小试牛刀一把,看看这个库怎么样
import asyncio
import time
import traceback
from playwright.async_api import async_playwright
class AmazonBrowser:
    def __init__(self, headless=True):
        self.headless = headless
        self.browser = None
        self.page = None
        self.playwright = None
    async def start(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=False)
        self.page = await self.browser.new_page()
        await self.page.goto("https://www.amazon.com/")
        time.sleep(60)
        await self.browser.close()
if __name__ == "__main__":
    asyncio.run(AmazonBrowser().start())

 what!!! 不愧是美国注明的电商平台,上来就是验证码=。=还有点不好搞定。
那试试添加headers,应该可以
class AmazonBrowser:
    def __init__(self, headless=True):
        self.headless = headless
        self.browser = None
        self.page = None
        self.playwright = None
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Referer": "https://www.google.com/", # 伪造,告诉亚马逊网站,说我是从Google搜索页面点击进来的。
        }

 诶嘿! 来了!
搞数据
在第一个页面里面,没有我们想要的数据,所以我们需要输入内容,然后从新的页面中搞到要的数据。
 获取元素 → 搜索 → 搞数据 应该是这个思路。
搜索
我们需要获取2个元素,一个元素是搜索框,另一个元素是搜索按钮(当然搞回车按键也行),从F12里面可以得知,搜索框是下面的内容:
 
 搜索按钮是下面的:
 
 先搞定输入框:
import random # 引入random库
class AmazonBrowser:
    async def start(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=False)
        self.page = await self.browser.new_page()
        await self.page.set_extra_http_headers(self.headers)
        await self.page.goto("https://www.amazon.com/")
		search_box = self.page.locator("input#twotabsearchtextbox")# 这里的input有个id,就是twotabsearchtextbox,这个id应该是独立的,所以直接用它。
        await search_box.wait_for(state="visible")
        await search_box.type('键盘', delay=random.uniform(50, 200)) 
        time.sleep(60)
【细节讲解】
- locator:在- Playwright中,- locator的作用是用于定位页面上的元素,语法有很多:- "text=登录"是通过文本匹配,
- "#username"是通过ID匹配,
- "//button[@type='submit']"是通过xpath匹配,
- "a.s-pagination-next.s-pagination-separator"通过css匹配,
- "a.s-pagination-next.s-pagination-separator:not([aria-disabled="true"])",这个则是排除具有- aria-disabled="true"属性的元素。
 
- type:在- playwright中,- type的作用时将文字打入输入框中,与之类似的方法是- fill,只不过前者是将文字挨个键入,而后者则是将文字一次性输入,在严格监听页面的网页中,- type的的功能用的会比较多,因为可以模拟人的操作,而- fill则不行。
再执行看看~
 
 芜湖~
 
 同样的,找到搜索按钮并点击就能跳转过去了。
class AmazonBrowser:
    async def start(self):
    # ...
        await search_box.type("键盘", delay=random.uniform(50, 200))
        await self.page.locator("input#nav-search-submit-button").click()

修改bug
有的人可能会遇到另外一种情况:
 
 这个时候搜索输入框和搜索按钮的id就已经不一样了,这种情况的搜索输入框是id="nav-bb-search",搜索按钮是没有id的只有class="nav-bb-button",所以要修改上面的代码
class AmazonBrowser:
    async def start(self):
		#...
		# 同时检测两种搜索框的存在
        twotab_exists = await self.page.locator("input#twotabsearchtextbox").count()
        navbb_exists = await self.page.locator("input#nav-bb-search").count()
        if twotab_exists:
            search_box = self.page.locator("input#twotabsearchtextbox")
            search_btn = self.page.locator("input#nav-search-submit-button")
        elif navbb_exists:
            search_box = self.page.locator("input#nav-bb-search")
            search_btn = self.page.locator("input.nav-bb-button")
        else:
            raise Exception("No search box found")
        await search_box.wait_for(state="visible")
        await search_box.type("键盘", delay=random.uniform(50, 200))
        await search_btn.click()
数据获取
我们先来看看要搞哪些数据:
 
 从这个截图上看,有图片,标题,评分,价格。不过有一点是:第二个图片里面有2个价格,到底那个是真实的价格呢?点进去发现,第一个价格是原价,第二个价格好像是二手价格。
 
 那就弄原价吧,不然太麻烦了。可是他的标题有了,要是标题重复了怎么办?从F12里面可以发现data-asin应该是唯一的,那我们就把这个ASIN弄到手,当然后面也有一个uuid,弄那个也行。
 
# 新增依赖库
from urllib.parse import urljoin
class AmazonBrowser:
    async def start(self):
    	#...
    	await self.page.wait_for_selector('div[role="listitem"][data-asin]', timeout=15000)
        items = await self.page.locator('div[role="listitem"][data-asin]').all()
        batch_data = []
        for index, item in enumerate(items):
        	asin = await item.get_attribute("data-asin")
        	uuid = await item.get_attribute("data-uuid")
        	img = item.locator('img.s-image').first
        	await img.scroll_into_view_if_needed()
        	await self.page.wait_for_timeout(500)
        	src = await img.get_attribute('src')
			href = await item.locator('a.a-link-normal.s-line-clamp-2').get_attribute('href')
			product = await item.locator('h2.a-size-medium > span').first.inner_text()
			price_section = item.locator('span.a-price[data-a-color="base"]')
			if await price_section.count() > 0:
				price = await price_section.locator('span.a-offscreen').first.inner_text()
			else:
				price_section_secondary = item.locator('div.a-row.a-size-base.a-color-secondary')
                if await price_section_secondary.count() > 0:
            		price = await price_section_secondary.locator('span.a-color-base').first.inner_text()
            	else:
            		price = '暂无价格'
            batch_data.append({
                        'Product': product,
                        'UUID': uuid,
                        'ASIN': asin,
                        'ImageURL': src,
                        'URL': urljoin(self.target_url, href) if href else None,
                        'Price': price,
                    })
【细节讲解】
- wait_for_selector:是等待页面上符合选择器的元素出现,最长等待15秒。- div[role="listitem"][data-asin]:是选择- <div>元素,要求这个元素里面有- [role="listitem"]以及- [data-asin]这两个东西。
 
- await,异步标志,有些情况下需要使用,有些情况下则不需要使用:- 需要使用的情况: 
    - 涉及网络请求:如 goto()、fetch()
- 获取异步返回的内容:如 text_content()、inner_text()
- 涉及页面交互的操作:如点击、输入、等待
- 需要时间完成的操作:如 wait_for_selector()
 
- 涉及网络请求:如 
- 不需要使用的情况: 
    - locator()对象
- 只定义选择器,不交互
- 直接获取属性,如browser.name
 
 
- 需要使用的情况: 
    
翻页
一页当然是不行的,一页才16条数据,所以肯定要翻页的。同理从F12里面找到翻页标签,那就是
 
 然后仅有这个也不够,我们还要知道终止,也就是不能点击的情况下,是什么情况的,所以翻到20页得到
 
 所以需要设计选择器,来判断下一页按钮能不能按下去,或者是能不能检测到终止的<span>
class AmazonBrowser:
	async def start(self):
		#...
		disabled_selector = 'span.s-pagination-next.s-pagination-disabled'
		enabled_selector = 'a.s-pagination-next:not([aria-disabled="true"])'
		# 当检测到disable_selector时就继续
		while True:
			if await self.page.locator(disabled_selector).count() > 0:
				break
			next_btn = self.page.locator(enabled_selector)
			await next_btn.scroll_into_view_if_needed()
			        # 点击前确保元素可交互
        	await next_btn.wait_for(state="visible")
        	await next_btn.click()
        	await self.page.wait_for_timeout(2000)  # 基础等待

优化结构
但是仅有循环,没有获取数据,这也不是我们想要的,所以接下来是优化结构。将数据获取也翻页分开。
class AmazonBrowser:
	# ...
	async def data_collection(self):
        await self.page.wait_for_selector('div[role="listitem"][data-asin]', timeout=15000)
        items = await self.page.locator('div[role="listitem"][data-asin]').all()
        for index, item in enumerate(items):
            asin = await item.get_attribute("data-asin")
            uuid = await item.get_attribute("data-uuid")
            img = item.locator('img.s-image').first
            await img.scroll_into_view_if_needed()
            await self.page.wait_for_timeout(500)
            src = await img.get_attribute('src')
            href = await item.locator('a.a-link-normal.s-line-clamp-2').get_attribute('href')
            product = await item.locator('h2.a-size-medium > span').first.inner_text()
            price_section = item.locator('span.a-price[data-a-color="base"]')
            if await price_section.count() > 0:
                price = await price_section.locator('span.a-offscreen').first.inner_text()
            else:
                price_section_secondary = item.locator('div.a-row.a-size-base.a-color-secondary')
                if await price_section_secondary.count() > 0:
                    price = await price_section_secondary.locator('span.a-color-base').first.inner_text()
                else:
                    price = '暂无价格'
			# self.batch_data要提前声明哦
            self.batch_data.append({
                'Product': product,
                'UUID': uuid,
                'ASIN': asin,
                'ImageURL': src,
                'URL': urljoin(self.target_url, href) if href else None,
                'Price': price,
            })
	async def start(self):
		#...
		disabled_selector = 'span.s-pagination-next.s-pagination-disabled'
		enabled_selector = 'a.s-pagination-next:not([aria-disabled="true"])'
		# 当检测到disable_selector时就继续
		while True:
			if await self.page.locator(disabled_selector).count() > 0:
				break
			# 在这里添加
			await self.data_collection()
			next_btn = self.page.locator(enabled_selector)
			await next_btn.scroll_into_view_if_needed()
			        # 点击前确保元素可交互
        	await next_btn.wait_for(state="visible")
        	await next_btn.click()
        	await self.page.wait_for_timeout(2000)  # 基础等待
这样就搞定啦,但是数据要怎么保存呢?可以有两种方式,一种方式是保存在数据库里面,比如mongodb,mysql等等,还可以保存在csv文件里面。
简单保存
在这里就简单的保存成csv文件的样子吧,也就是直接使用pandas保存就行了,下一篇章就搞一下mongodb
# 导入依赖库
import pandas as pd
class AmazonBrowser:
	def __init__(self):
		self.df = pd.DataFrame()
	#... 
	async def start(self):
		while True:
			#...while循环内容
		new_df = pd.DataFrame(self.batch_data)
        self.df = pd.concat([self.df, new_df], ignore_index=True)
        self.df.to_csv(path)
就酱~
 




![[数据结构]排序之 归并排序(有详细的递归图解)](https://i-blog.csdnimg.cn/direct/6b5fec2a539d45adbd53451e5705fed9.png)














