如何使用Scrapeless和Snowpipe Streaming将网页数据流入Snowflake
Senior Web Scraping Engineer
关键要点:
- 在 Snowflake 中无固定模式地进行网页数据抓取。 Scrapeless Scraping Browser 在云浏览器中呈现页面并输出以换行符分隔的 JSON(NDJSON);Snowflake 将其导入到
VARIANT列中,因此新字段永远不会破坏加载。 - 四种摄取方法,一种数据形状。 一次性加载使用批量
COPY INTO,Snowpipe 用于连续的文件批次,Snowpipe Streaming 用于低延迟行,Kafka 连接器用于事件驱动管道——所有方法都读取 Scrapeless 生成的相同 NDJSON。 - Snowpipe Streaming 的高性能架构已在 2025 年 9 月全面推出(GA)。 它通过 SDK(Java、Python、Node.js)或 REST 直接写入行,具有通道、偏移令牌和精确一次的恢复——无需暂存文件。
- 按需模式让抓取的数据保持灵活。 使用
col:field::type语法查询VARIANT列,并利用LATERAL FLATTEN展开数组——当源页面添加字段时无需迁移。 - Snowflake CLI(
snow)是当前的工具。pip install snowflake-cli,然后使用snow sql -f ingest.sql从一个文件运行整个设置。 - 免费开始。 新的 Scrapeless 账户包括免费的抓取浏览器运行时——登录 Scrapeless 网站 注册。
简介:从渲染的页面到 Snowflake 表
分析团队越来越希望将网页数据——产品目录、列表、评论、市场信号——与它们的第一方数据存储在同一个仓库中,以便能够进行连接、建模和喂入BI。Snowflake 是一个常见的目标,因为其 VARIANT 类型可以原生存储半结构化的 JSON,并使其可通过 SQL 查询。
摩擦在于这两个系统之间的差距。抓取的页面是 JavaScript 渲染的,并且受到反机器人防护;数据通常以嵌套 JSON 形式到达,形状随着源网站的变化而漂移。手动构建的加载程序将每个字段映射到列的做法在页面第一次添加一个字段时就会失败。
这篇文章将逐步讲解一种以终端为中心的工作流程,缩小这一差距。Scrapeless Scraping Browser 处理渲染和反检测方面,并输出 NDJSON;Snowflake 根据数据的新鲜程度以四种不同的方式摄取。示例生产者是公开抓取的沙箱 books.toscrape.com,因此下面的每个命令都是可重复的——同样的模式也适用于更难抓取的目标(请参见同系列的 2026 年最佳 Zillow 抓取工具 和 2026 年最佳亚马逊抓取工具 指南)。
你可以用它做什么
- 构建网页数据湖仓。 在 Snowflake 中存储抓取的目录和列表,并将其与内部的销售或库存数据连接。
- 运行定期市场快照。 每次运行时将新的 NDJSON 文件放入一个阶段,并让 Snowpipe 在几分钟内自动加载。
- 为近实时仪表板提供数据。 使用 Snowpipe Streaming 一行一行地流式传输抓取事件,确保子分钟的新鲜度。
- 桥接现有的 Kafka 支架。 将抓取记录推送到一个主题中,让 Snowflake Kafka 连接器进行加载。
- 保持模式灵活。 将原始 JSON 存储在
VARIANT中,并在查询时整形,以便源网站的变化从不阻塞加载。
在 Scrapeless,我们只访问公开可用的数据,同时严格遵守适用的法律、法规和网站隐私政策。本文内容仅供演示目的。
为什么选择 Scrapeless Scraping Browser
Scrapeless Scraping Browser 是一个可定制的、反检测的云浏览器,专为网络爬虫和 AI代理设计。作为 Snowflake 管道的生产方,它提供了:
- 云端的 JavaScript 渲染,因此数据在提取前已存在于 DOM 中
- 在 195 个国家的住宅代理,按会话分配
- 反检测浏览器指纹识别
- 一个单一的
scrapeless-scraping-browserCLI 界面,其eval返回可以在一步中重塑为 NDJSON 的 JSON - 多页面抓取的会话持久性
在免费计划上获取你的 API 密钥,访问 Scrapeless 网站。
前提条件
- Node.js 18 或更新版本
- 一个 Scrapeless 账户和 API 密钥 — 登录 Scrapeless 网站 注册
- 一个 Snowflake 账户,并具有可以创建数据库、仓库、阶段和管道的角色
- Snowflake CLI:
pip install snowflake-cli(Python 3.10+) - 对于 Snowpipe 自动摄取和外部阶段:一个云存储桶(AWS S3、GCS 或 Azure)并有权限创建存储集成
jq是可选的(对于 NDJSON 转换,提供了 Node 的一行备选方案)
在 ~/.snowflake/config.toml 中配置一次 Snowflake 连接:
toml
[connections.demo]
account = "myorg-myaccount"
user = "jondoe"
密码 = "your_password_here"
仓库 = "ingest_wh"
数据库 = "web_data"
模式 = "raw"
角色 = "sysadmin"
然后 `snow sql -c demo -q "SELECT CURRENT_VERSION();"` 确认它可以正常工作。
---
## 一览管道
无痕抓取浏览器 → NDJSON 文件 → Snowflake 阶段 → 表 (VARIANT)
(渲染 + 提取) (每行一个对象) (内部或外部桶) COPY INTO | 一次性
Snowpipe | 持续性
流式 | 低延迟
Kafka | 事件驱动
形状始终不变:无痕抓取器每行发出一个 JSON 对象,文件落在一个阶段,四种方法中的一种将其加载到你用 SQL 查询的 `VARIANT` 列中。
---
## 第一步 — 使用无痕抓取生成 NDJSON
安装 CLI 并设置你的密钥:
```bash
npm install -g scrapeless-scraping-browser
scrapeless-scraping-browser config set apiKey your_api_token_here
打开一个云会话,导航到目录页面,等待稳定标记,然后使用 eval 提取书籍记录。new-session JSON 将 id 嵌套在 data.taskId 下 - 使用 jq,或使用可移植的 grep 后备示例:
bash
# 打开会话并捕捉任务 id (jq 路径是 .data.taskId)
SID=$(scrapeless-scraping-browser new-session --name books --ttl 300 --proxy-country US --json | jq -r '.data.taskId')
# 没有 jq?可移植后备:
# SID=$(scrapeless-scraping-browser new-session --name books --ttl 300 --proxy-country US --json | grep -oE '"taskId":"[^"]*"' | head -1 | cut -d'"' -f4)
# 渲染目录页面,然后等待产品网格
scrapeless-scraping-browser --session-id "$SID" open "https://books.toscrape.com/catalogue/page-1.html"
scrapeless-scraping-browser --session-id "$SID" wait "article.product_pod"
# 每本书提取一条记录;eval 返回一个 JSON 数组
scrapeless-scraping-browser --session-id "$SID" eval '
JSON.stringify(Array.from(document.querySelectorAll("article.product_pod")).map(el => ({
title: el.querySelector("h3 a")?.getAttribute("title") ?? null,
price: el.querySelector(".price_color")?.textContent.trim() ?? null,
rating: el.querySelector("p.star-rating")?.className.replace("star-rating", "").trim() ?? null,
in_stock: /In stock/i.test(el.querySelector(".availability")?.textContent ?? ""),
url: el.querySelector("h3 a")?.href ?? null
})))
' > books.raw.json
scrapeless-scraping-browser --session-id "$SID" close
将数组转换为 NDJSON - 每行一个对象,这是 Snowflake 加载器最干净的读取格式:
bash
# 使用 jq
jq -c '.[]' books.raw.json > books.ndjson
# 或者,不使用 jq,使用 Node 一行命令
node -e 'JSON.parse(require("fs").readFileSync("books.raw.json","utf8")).forEach(o=>console.log(JSON.stringify(o)))' > books.ndjson
books.ndjson 现在每行包含一个自包含的 JSON 对象。如果一个冷会话返回一个空壳或短暂的 os error 10054,关闭会话,创建一个新的会话,然后在提取之前尝试有限的次数。
第二步 — 准备 Snowflake
创建仓库、数据库、模式、一个 JSON 文件格式和一个具有单一 VARIANT 列的落地表。将此保存为 setup.sql 并使用 snow sql -c demo -f setup.sql 运行:
sql
CREATE WAREHOUSE IF NOT EXISTS ingest_wh WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60;
CREATE DATABASE IF NOT EXISTS web_data;
CREATE SCHEMA IF NOT EXISTS web_data.raw;
USE WAREHOUSE ingest_wh;
USE SCHEMA web_data.raw;
-- NDJSON: 每行一个 JSON 对象,因此不要去掉外部数组
CREATE OR REPLACE FILE FORMAT ndjson_format
TYPE = JSON
STRIP_OUTER_ARRAY = FALSE
COMPRESSION = AUTO;
-- 按原样加载原始记录;在查询时塑造它
CREATE OR REPLACE TABLE raw_books (
src VARIANT,
loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
STRIP_OUTER_ARRAY = FALSE 对于 NDJSON 是正确的,因为每行已经是自己的对象 - STRIP_OUTER_ARRAY = TRUE 仅适用于一个大型 [ ... ] 数组的文件。
第三步 — 方法 1:使用 COPY INTO 一次性批量加载
对于单个文件或手动批次,阶段文件并运行 COPY INTO。最简单的路径是一个命名的内部阶段加上 PUT:
sql
-- 一个绑定到 JSON 格式的命名内部阶段
CREATE OR REPLACE STAGE books_stage FILE_FORMAT = ndjson_format;
bash
# 将本地 NDJSON 上传到内部阶段 (snow CLI 运行 PUT)
snow sql -c demo -q "PUT file://$(pwd)/books.ndjson @books_stage AUTO_COMPRESS=TRUE OVERWRITE=TRUE"
sql
-- 将每个对象作为一行加载到 VARIANT 列中
COPY INTO raw_books (src)
FROM @books_stage
FILE_FORMAT = (FORMAT_NAME = 'ndjson_format')
ON_ERROR = 'CONTINUE';
要将 JSON 键直接映射到类型化列而不是 VARIANT,创建一个列名与键匹配的表并使用 MATCH_BY_COLUMN_NAME:
sql
CREATE OR REPLACE TABLE books (
title VARCHAR, price VARCHAR, rating VARCHAR, in_stock BOOLEAN, url VARCHAR
);
COPY INTO books
FROM @books_stage
FILE_FORMAT = (TYPE = 'JSON')
MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE';
如果您更愿意让 Snowflake 从暂存文件中推导模式,使用 INFER_SCHEMA 和 CREATE TABLE … USING TEMPLATE 方法可以为您构建列列表:
sql
CREATE OR REPLACE TABLE books_auto
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(INFER_SCHEMA(
LOCATION => '@books_stage',
FILE_FORMAT => 'ndjson_format'
))
);
对于已经存放在云存储桶中的数据,指向 外部阶段 而不是上传。使用存储集成(没有内联密钥):
sql
CREATE OR REPLACE STAGE books_s3_stage
URL = 's3://my-bucket/scraped/books/'
STORAGE_INTEGRATION = my_s3_integration
FILE_FORMAT = ndjson_format;
COPY INTO raw_books (src) FROM @books_s3_stage;
在免费计划中获取您的 API 密钥:app.scrapeless.com
第 4 步 — 方法 2:使用 Snowpipe 的持续批处理
当爬虫按计划将新文件放入存储桶时,Snowpipe 会自动加载每个文件——无需手动 COPY 和专用仓库。管道包装一个 COPY INTO 语句;使用 AUTO_INGEST = TRUE,云事件通知触发加载:
sql
CREATE OR REPLACE PIPE books_pipe
AUTO_INGEST = TRUE
AWS_SNS_TOPIC = 'arn:aws:sns:us-east-1:123456789012:scraped-bucket'
AS
COPY INTO raw_books (src)
FROM @books_s3_stage
FILE_FORMAT = (TYPE = 'JSON');
在 S3 上,事件通过 SNS/SQS 流入 Snowflake 管理的队列;GCS 使用 Pub/Sub,Azure 使用 Event Grid,每种方式都有通知集成。如果您更愿意显式调用 Snowpipe,则可以不设置 AUTO_INGEST,并将暂存文件路径 POST 到 insertFiles REST 端点,然后轮询 insertReport。
Snowflake 指导中的两个操作注意事项:
- 计费是无服务器的,现在根据 Snowpipe 吸入的数据按每 GB 收费——没有需要调整大小的仓库,之前按文件收费的部分已被取消。
- 文件大小很重要。 目标文件大小为 100 到 250 MB 压缩,并且每分钟最多只进行一次暂存;更频繁的暂存会增加队列管理开销而不降低延迟。在暂存之前将小的抓取批处理缓冲为更大的文件。
Snowpipe 使数据在几分钟内可用,适合定期的市场快照。
第 5 步 — 方法 3:使用 Snowpipe Streaming 实现低延迟行
当新鲜度需要是秒而不是分钟时,Snowpipe Streaming 直接将行写入表——无需暂存文件。高性能架构自 2025 年 9 月起普遍可用,同时有 Java、Python 和 Node.js 的 SDK 以及通过共享客户端核心的 REST API;经典的基于文件的 Snowpipe Streaming 正在退役路径上。
该模型有三个核心概念:
- 通道——一个与表的命名长连接。行在通道内按顺序提交。
- 偏移令牌——应用程序附加到每个批次的字符串。重启后,
getLatestCommittedOffsetToken()告诉您最后一个持久提交的位置,因此您只需重放后续内容——这正是一次传递的基础。 - 吞吐量计费——按每 GB 未压缩数据收费,而不是按文件。
Java 客户端示例很简洁:
java
SnowflakeStreamingIngestClient client =
SnowflakeStreamingIngestClientFactory.builder("BOOKS_CLIENT")
.setProperties(props).build();
OpenChannelRequest request = OpenChannelRequest.builder("BOOKS_CHANNEL")
.setDBName("WEB_DATA").setSchemaName("RAW").setTableName("RAW_BOOKS")
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.build();
SnowflakeStreamingIngestChannel channel = client.openChannel(request);
channel.insertRow(rowAsMap, offsetToken); // 一条抓取记录
channel.getLatestCommittedOffsetToken(); // 用于恢复
当抓取记录以连续流的形式到达(代理在爬行中生成事件)并且仪表板需要在几秒钟内获取时,就可以使用 Streaming。
第 6 步 — 方法 4:使用 Kafka 连接器进行事件驱动加载
如果抓取记录已经通过 Apache Kafka 流动,Snowflake Connector for Kafka 将主题加载到表中(一个主题映射到一个表)。它在 Kafka Connect 工作器内部运行。一个属性选择底层的摄取引擎:
properties
name=scraped-books-sink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
topics=scraped_books
snowflake.database.name=WEB_DATA
snowflake.schema.name=RAW
# SNOWPIPE(基于文件,默认)或 SNOWPIPE_STREAMING(低延迟)
snowflake.ingestion.method=SNOWPIPE_STREAMING
每个连接器创建的表都有两个 VARIANT 列:RECORD_CONTENT(消息负载)和 RECORD_METADATA(主题、分区、偏移量、时间戳和头部)。查询负载的方式与查询任何 VARIANT 的方式完全相同。
对于一种完全托管的替代方案——无需操作 Kafka Connect 集群——Snowflake Openflow(一般可用,基于 Apache NiFi)通过托管管道从 Kafka、Kinesis、数据库和 SaaS 源中摄取数据到 Snowflake。
选择一种方法
| 方法 | 延迟 | 数据形状 | 操作开销 | 使用场景 |
|---|---|---|---|---|
COPY INTO |
手动 | 阶段中的文件 | 最低 | 一次性加载,补充数据 |
| Snowpipe | 几分钟 | 放入存储桶中的文件 | 低(无服务器) | 定期抓取批次 |
| Snowpipe Streaming | 几秒 | 通过 SDK/REST 的行 | 中等(编写客户端) | 持续事件流 |
| Kafka 连接器 | 秒至分钟 | Kafka 主题记录 | 中等(连接工作者) | 现有的 Kafka 框架 |
大多数团队首先使用 COPY INTO 来验证模式,在抓取程序按计划运行后转向 Snowpipe,只有在子分钟新鲜度证明额外移动部分的合理性时,才采用 Streaming 或 Kafka。
查询加载的数据
由于原始记录存在于 VARIANT 中,因此您在读取时进行塑形。使用 : 运算符导航,并用 :: 类型转换:
sql
SELECT
src:title::string AS title,
src:price::string AS price,
src:rating::string AS rating,
src:in_stock::boolean AS in_stock,
src:url::string AS url
FROM raw_books;
当抓取的记录包含一个数组——一个 photos 列表,一个 priceHistory 系列——LATERAL FLATTEN 将其展开为行:
sql
-- raw_listings: 一个假设的抓取列表表,加载方式与 raw_books 相同。
-- (书籍示例没有嵌套数组;这里展示了具有嵌套数组的源的模式。)
SELECT
src:title::string AS title,
ph.value:date::string AS price_date,
ph.value:price::number AS price
FROM raw_listings,
LATERAL FLATTEN(INPUT => src:priceHistory) ph;
当源页面添加字段时无需迁移——它只是出现在下一个加载的 src 下。
您得到的结果
在 COPY INTO 之后, 每个抓取对象在 raw_books 中是一行。VARIANT 列按原样保存记录;下面的模式是规范性的,字段值是示例:
json
// 一行的 src VARIANT,来自步骤 1 提取器的输出。
{
"title": "A Light in the Attic",
"price": "£51.77",
"rating": "Three",
"in_stock": true,
"url": "https://books.toscrape.com/catalogue/a-light-in-the-attic_1000/index.html"
}
几个诚实的观察:
- 价格以显示字符串的形式到达。 在 SQL 中进行类型转换和清理(
REPLACE(src:price::string, '£', '')::number),而不是期待页面返回数字。 - 条件字段是可为空的。 在某个页面上缺失的字段会简单地在
src中缺失;访问VARIANT返回NULL而不是错误。 MATCH_BY_COLUMN_NAME跳过不匹配的键。 新键会自动进入VARIANT表,但在类型表中将被丢弃,直到您添加该列。- Snowpipe 加载历史在管道元数据中保留 14 天; 大批量
COPY历史在表元数据中保留 64 天——在审计补充数据时请牢记这一点。
结论
将抓取的网数据导入 Snowflake 简化为四个步骤:使用 Scrapeless 渲染和提取,发出 NDJSON,暂存文件,并使用合适延迟的方法加载它——COPY INTO、Snowpipe、Snowpipe Streaming 或 Kafka 连接器。将原始记录放入 VARIANT 列中并在查询时进行塑形,从而使管道能够适应源网站添加字段。
在目标需要时保持美国出口,将 Scrapeless 会话链保持在一个 shell 调用内,遵循发现 → 提取模式,并将缺失字段视为可为空的。在比这里使用的沙盒更复杂的目标处,相同的生产者模式仍适用——见 2026年最佳 Zillow 抓取工具 指南、Scraping Browser 产品页面 和 Scrapeless 文档。
准备构建您的 AI 驱动数据管道?
加入我们的社区以申请免费计划,并与构建网页数据到仓库管道的开发者连接:Discord · Telegram。
在 Scrapeless 网站 注册以获得免费的 Scraping Browser 运行时间,并查看 scrapeless.com/en/pricing 以随着管道的增长扩展会话分钟数和并发。
FAQ
问1:网络抓取用于仓库摄取是否合法?
收集公开可见数据在广泛的范围内是可以辩护的,但合法性取决于目标网站的条款、管辖权和数据类型。请查看网站的服务条款,避免个人或受限数据,并在商业使用前咨询法律顾问。这里使用的沙箱网站books.toscrape.com专门用于爬虫实践。
Q2: NDJSON还是JSON数组——爬虫应该输出哪种?
NDJSON(每行一个对象)加载最为干净并且可以流式传输而无需缓冲整个文件。将STRIP_OUTER_ARRAY设置为FALSE。如果您的生成器输出一个单一的[ ... ]数组,请将STRIP_OUTER_ARRAY设置为TRUE,使每个元素变成一行。
Q3: 我应该加载到VARIANT列还是类型化列中?
将原始爬取数据放入VARIANT中,并用SQL进行处理——源页面会变化,VARIANT可以在不迁移的情况下吸收新字段。仅在架构稳定后,使用MATCH_BY_COLUMN_NAME加载到类型化列中。
Q4: 我应该选择哪种摄取方法?
对于一次性加载使用COPY INTO,对于定时文件批次(延迟几分钟,无服务器)使用Snowpipe,对于子分钟行级新鲜度使用Snowpipe Streaming,当记录已经通过Kafka流动时使用Kafka连接器。首先从COPY INTO开始,然后随着新鲜度需求的增长再升级。
Q5: 我该如何处理像os error 10054或503这样的瞬态爬虫错误?
将其视为瞬态错误:关闭Scrapeless会话,创建一个新的会话,再次导航,并在提取之前等待一个稳定的选择器。保持重试的次数有限。这些属于生产者端,不会影响Snowflake,后者加载任何进入阶段的数据。
Q6: Snowpipe是否需要运行一个仓库?
不需要。Snowpipe是无服务器的,并按摄取的GB收费——Snowflake提供计算资源。用户管理的仓库仅在进行大量COPY INTO和查询时需要。
Q7: 我可以在没有AI代理的情况下运行这个吗?
可以。scrapeless-scraping-browser CLI从普通shell中端到端生成NDJSON,Snowflake端是普通SQL。连接MCP的代理是便利的路径,但不是必需的。
在Scrapeless,我们仅访问公开可用的数据,并严格遵循适用的法律、法规和网站隐私政策。本博客中的内容仅供演示之用,不涉及任何非法或侵权活动。我们对使用本博客或第三方链接中的信息不做任何保证,并免除所有责任。在进行任何抓取活动之前,请咨询您的法律顾问,并审查目标网站的服务条款或获取必要的许可。



