Parallelizing sitemap fetching for increased performance on nested sitemaps #631

Open · wants to merge 2 commits into base: main
5 changes: 3 additions & 2 deletions apps/api/src/controllers/v1/crawl-status-ws.ts
@@ -6,7 +6,7 @@ import { WebSocket } from "ws";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../lib/logger";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { getCrawlPreQueue, getScrapeQueue } from "../../services/queue-service";
import { getJob, getJobs } from "./crawl-status";
import * as Sentry from "@sentry/node";

@@ -94,9 +94,10 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusPara

doneJobIDs = await getDoneJobsOrdered(req.params.jobId);

const preJobState = await getCrawlPreQueue().getJobState(req.params.jobId);
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : preJobState === "failed" ? "failed" : preJobState !== "completed" ? "scraping" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
const doneJobs = await getJobs(doneJobIDs);
const data = doneJobs.map(x => x.returnvalue);

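A note on the status derivation above: the crawl status now consults the state of the new pre-processing job before looking at the individual scrape jobs, so a crawl reports "scraping" while that step is still queued or running and "failed" if it fails. A minimal sketch of the same decision order as a standalone helper (the function name and shape are illustrative, not part of this PR):

```typescript
type CrawlStatus = "cancelled" | "failed" | "scraping" | "completed";

// Illustrative helper mirroring the ternary chain above: the pre-processing job
// gates the per-URL scrape job statuses.
function deriveCrawlStatus(
  cancelled: boolean,
  preJobState: string,
  jobStatuses: string[],
): CrawlStatus {
  if (cancelled) return "cancelled";
  if (preJobState === "failed") return "failed";
  if (preJobState !== "completed") return "scraping"; // pre-processing still running
  if (jobStatuses.every((s) => s === "completed")) return "completed";
  if (jobStatuses.some((s) => s === "failed")) return "failed";
  return "scraping";
}
```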
5 changes: 3 additions & 2 deletions apps/api/src/controllers/v1/crawl-status.ts
@@ -1,7 +1,7 @@
import { Response } from "express";
import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { getCrawlPreQueue, getScrapeQueue } from "../../services/queue-service";
import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs";
import { configDotenv } from "dotenv";
configDotenv();
@@ -57,9 +57,10 @@ export async function crawlStatusController(req: RequestWithAuth<CrawlStatusPara
const start = typeof req.query.skip === "string" ? parseInt(req.query.skip, 10) : 0;
const end = typeof req.query.limit === "string" ? (start + parseInt(req.query.limit, 10) - 1) : undefined;

const preJobState = await getCrawlPreQueue().getJobState(req.params.jobId);
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : preJobState === "failed" ? "failed" : preJobState !== "completed" ? "scraping" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? -1);

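The HTTP variant applies the same status logic and adds pagination: the skip/limit query parameters map to an inclusive index range over the ordered done jobs. A small sketch of that range math (the helper name is made up for illustration):

```typescript
// Illustrative: ?skip=10&limit=5 selects the inclusive slice [10, 14]; with no
// limit, end stays undefined and getDoneJobsOrdered receives -1 ("to the end").
function pageRange(skip?: string, limit?: string): { start: number; end?: number } {
  const start = typeof skip === "string" ? parseInt(skip, 10) : 0;
  const end = typeof limit === "string" ? start + parseInt(limit, 10) - 1 : undefined;
  return { start, end };
}

console.log(pageRange("10", "5")); // { start: 10, end: 14 }
console.log(pageRange());          // { start: 0, end: undefined }
```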
86 changes: 8 additions & 78 deletions apps/api/src/controllers/v1/crawl.ts
@@ -19,7 +19,7 @@ import {
} from "../../lib/crawl-redis";
import { logCrawl } from "../../services/logging/crawl_log";
import { getScrapeQueue } from "../../services/queue-service";
import { addScrapeJob } from "../../services/queue-jobs";
import { addCrawlPreJob, addScrapeJob } from "../../services/queue-jobs";
import { Logger } from "../../lib/logger";
import { getJobPriority } from "../../lib/job-priority";
import { callWebhook } from "../../services/webhook";
@@ -71,85 +71,15 @@ export async function crawlController(
plan: req.auth.plan,
};

const crawler = crawlToCrawler(id, sc);

try {
sc.robots = await crawler.getRobotsTxt();
} catch (e) {
Logger.debug(
`[Crawl] Failed to get robots.txt (this is probably fine!): ${JSON.stringify(
e
)}`
);
}

await saveCrawl(id, sc);

const sitemap = sc.crawlerOptions.ignoreSitemap
? null
: await crawler.tryGetSitemap();

if (sitemap !== null && sitemap.length > 0) {
let jobPriority = 20;
// If it is over 1000, we need to get the job priority,
// otherwise we can use the default priority of 20
if(sitemap.length > 1000){
// set base to 21
jobPriority = await getJobPriority({plan: req.auth.plan, team_id: req.auth.team_id, basePriority: 21})
}
const jobs = sitemap.map((x) => {
const url = x.url;
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls",
team_id: req.auth.team_id,
crawlerOptions,
pageOptions,
origin: "api",
crawl_id: id,
sitemapped: true,
webhook: req.body.webhook,
v1: true,
},
opts: {
jobId: uuid,
priority: 20,
},
};
});

await lockURLs(
id,
jobs.map((x) => x.data.url)
);
await addCrawlJobs(
id,
jobs.map((x) => x.opts.jobId)
);
await getScrapeQueue().addBulk(jobs);
} else {
await lockURL(id, sc, req.body.url);
const job = await addScrapeJob(
{
url: req.body.url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: req.auth.team_id,
pageOptions: pageOptions,
origin: "api",
crawl_id: id,
webhook: req.body.webhook,
v1: true,
},
{
priority: 15,
}
);
await addCrawlJob(id, job.id);
}
await addCrawlPreJob({
auth: req.auth,
crawlerOptions,
pageOptions,
webhook: req.body.webhook,
url: req.body.url,
}, id);

if(req.body.webhook) {
await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "crawl.started");
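The controller no longer fetches robots.txt or walks the sitemap inline; it saves the crawl and enqueues a single pre-processing job, so the HTTP response returns quickly. The worker that consumes that queue is not shown in this diff; a hedged sketch of what it would presumably look like, reusing the exports added in queue-service.ts below (handler body, file location, and names are assumptions):

```typescript
import { Worker, Job } from "bullmq";
import { crawlPreQueueName, redisConnection } from "./queue-service";

// Sketch only: a BullMQ worker for the new pre-queue. The actual worker is not
// part of this PR; presumably it performs the robots.txt fetch, sitemap walk,
// URL locking, and scrape-job enqueueing that were removed from crawlController.
const crawlPreWorker = new Worker(
  crawlPreQueueName,
  async (job: Job) => {
    const { url, crawlerOptions, pageOptions, auth, webhook } = job.data;
    const crawlId = job.id; // the pre-job is enqueued with the crawl id as its job id

    // 1. Fetch robots.txt (failures are tolerable) and try the sitemap.
    // 2. If the sitemap yields URLs: lockURLs, addCrawlJobs, getScrapeQueue().addBulk.
    // 3. Otherwise: lockURL the start URL and addScrapeJob / addCrawlJob.
    // Until this job completes, the status endpoints above report "scraping".
  },
  { connection: redisConnection },
);
```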
2 changes: 1 addition & 1 deletion apps/api/src/controllers/v1/types.ts
@@ -266,7 +266,7 @@ export type CrawlStatusResponse =
data: Document[];
};

type AuthObject = {
export type AuthObject = {
team_id: string;
plan: PlanType;
};
4 changes: 2 additions & 2 deletions apps/api/src/index.ts
@@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
import express, { NextFunction, Request, Response } from "express";
import bodyParser from "body-parser";
import cors from "cors";
import { getScrapeQueue } from "./services/queue-service";
import { getCrawlPreQueue, getScrapeQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0";
import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
@@ -67,7 +67,7 @@ if (cluster.isMaster) {
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);

const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getScrapeQueue())],
queues: [new BullAdapter(getScrapeQueue()), new BullAdapter(getCrawlPreQueue())],
serverAdapter: serverAdapter,
});

18 changes: 8 additions & 10 deletions apps/api/src/scraper/WebScraper/sitemap.ts
@@ -36,17 +36,15 @@ export async function getLinksFromSitemap(
const root = parsed.urlset || parsed.sitemapindex;

if (root && root.sitemap) {
for (const sitemap of root.sitemap) {
if (sitemap.loc && sitemap.loc.length > 0) {
await getLinksFromSitemap({ sitemapUrl: sitemap.loc[0], allUrls, mode });
}
}
const sitemapPromises = root.sitemap
.filter(sitemap => sitemap.loc && sitemap.loc.length > 0)
.map(sitemap => getLinksFromSitemap({ sitemapUrl: sitemap.loc[0], allUrls, mode }));
await Promise.all(sitemapPromises);
} else if (root && root.url) {
for (const url of root.url) {
if (url.loc && url.loc.length > 0 && !WebCrawler.prototype.isFile(url.loc[0])) {
allUrls.push(url.loc[0]);
}
}
const validUrls = root.url
.filter(url => url.loc && url.loc.length > 0 && !WebCrawler.prototype.isFile(url.loc[0]))
.map(url => url.loc[0]);
allUrls.push(...validUrls);
}
} catch (error) {
Logger.debug(`Error processing sitemapUrl: ${sitemapUrl} | Error: ${error.message}`);
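This is the core change of the PR: nested sitemaps are now fetched concurrently instead of one at a time. The recursive calls still push into the shared allUrls array, which is safe on Node's single thread; note, however, that Promise.all rejects on the first failure, at which point the surrounding try/catch logs the error and results from still-in-flight siblings are dropped. A reduced sketch of the before/after pattern with made-up helper names:

```typescript
// Before: nested sitemaps were fetched strictly one after another.
// for (const loc of childLocs) {
//   await collectSitemap(loc, allUrls);
// }

// After: all nested sitemaps are fetched concurrently.
async function collectChildren(childLocs: string[], allUrls: string[]): Promise<void> {
  await Promise.all(childLocs.map((loc) => collectSitemap(loc, allUrls)));
}

// Hypothetical stand-in for the recursive getLinksFromSitemap call: fetch one
// sitemap, extract its <loc> entries, and accumulate them into the shared array.
async function collectSitemap(loc: string, allUrls: string[]): Promise<void> {
  const xml = await fetch(loc).then((r) => r.text());
  const links = Array.from(xml.matchAll(/<loc>(.*?)<\/loc>/g), (m) => m[1]);
  allUrls.push(...links);
}
```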
49 changes: 48 additions & 1 deletion apps/api/src/services/queue-jobs.ts
@@ -1,8 +1,9 @@
import { Job, Queue } from "bullmq";
import { getScrapeQueue } from "./queue-service";
import { getCrawlPreQueue, getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import { AuthObject } from "../controllers/v1/types";

async function addScrapeJobRaw(
webScraperOptions: any,
@@ -49,6 +50,52 @@ export async function addScrapeJob(
}
}

async function addCrawlPreJobRaw(
data: any,
jobId: string,
): Promise<Job> {
return await getCrawlPreQueue().add(jobId, data, {
jobId,
});
}

export async function addCrawlPreJob(
data: {
auth: AuthObject,
crawlerOptions: any,
pageOptions: any,
webhook?: string, // req.body.webhook
url: string, // req.body.url
sentry?: any,
},
jobId: string,
): Promise<Job> {

if (Sentry.isInitialized()) {
const size = JSON.stringify(data).length;
return await Sentry.startSpan({
name: "Add crawl pre job",
op: "queue.publish",
attributes: {
"messaging.message.id": jobId,
"messaging.destination.name": getCrawlPreQueue().name,
"messaging.message.body.size": size,
},
}, async (span) => {
return await addCrawlPreJobRaw({
...data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
}, jobId);
});
} else {
return await addCrawlPreJobRaw(data, jobId);
}
}

export function waitForJob(jobId: string, timeout: number) {
return new Promise((resolve, reject) => {
const start = Date.now();
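addCrawlPreJob mirrors the existing addScrapeJob wrapper: when Sentry is initialized, the enqueue is wrapped in a queue.publish span and the trace headers are embedded in the job data. A hedged sketch of how a consumer could pick that trace back up on the processing side; this is not part of the diff, handleCrawlPreJob is a made-up name, and the exact continueTrace call shape depends on the Sentry SDK version in use:

```typescript
import * as Sentry from "@sentry/node";

// Hypothetical handler; stands in for whatever processes the pre-job.
async function handleCrawlPreJob(data: any): Promise<void> { /* ... */ }

// Sketch: continue the producer's trace on the consumer side using the
// `sentry` fields that addCrawlPreJob attached to the job data.
async function processCrawlPreJob(jobData: any): Promise<void> {
  const { sentry, ...data } = jobData;
  if (!sentry) {
    return handleCrawlPreJob(data);
  }
  return Sentry.continueTrace(
    { sentryTrace: sentry.trace, baggage: sentry.baggage },
    () =>
      Sentry.startSpan(
        { name: "Process crawl pre job", op: "queue.process" },
        () => handleCrawlPreJob(data),
      ),
  );
}
```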
28 changes: 28 additions & 0 deletions apps/api/src/services/queue-service.ts
@@ -3,6 +3,7 @@ import { Logger } from "../lib/logger";
import IORedis from "ioredis";

let scrapeQueue: Queue;
let crawlPreQueue: Queue;

export const redisConnection = new IORedis(process.env.REDIS_URL, {
maxRetriesPerRequest: null,
@@ -34,6 +35,33 @@ export function getScrapeQueue() {
return scrapeQueue;
}

export const crawlPreQueueName = "{crawlPreQueue}";

export function getCrawlPreQueue() {
if (!crawlPreQueue) {
crawlPreQueue = new Queue(
crawlPreQueueName,
{
connection: redisConnection,
}
// {
// settings: {
// lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
// lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
// stalledInterval: 30 * 1000,
// maxStalledCount: 10,
// },
// defaultJobOptions:{
// attempts: 5
// }
// }
);
Logger.info("Crawl pre queue created");
}
return crawlPreQueue;
}



// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
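getCrawlPreQueue follows the same lazy-singleton pattern as getScrapeQueue, and the braces in "{crawlPreQueue}" act as a Redis hash tag so all of the queue's keys hash to the same Redis Cluster slot. The commented-out options block looks like Bull v3-style settings; in BullMQ, lock and stall tuning lives on the Worker, while defaultJobOptions belongs on the Queue. A sketch of where those options would presumably go if revived (illustrative only, not part of this PR):

```typescript
import { Queue, Worker } from "bullmq";
import { crawlPreQueueName, redisConnection } from "./queue-service";

// Queue-level option (BullMQ): retry each pre-job up to 5 times.
const crawlPreQueue = new Queue(crawlPreQueueName, {
  connection: redisConnection,
  defaultJobOptions: { attempts: 5 },
});

// Worker-level options (BullMQ): lock and stall tuning from the commented block.
const crawlPreWorker = new Worker(crawlPreQueueName, async (job) => { /* ... */ }, {
  connection: redisConnection,
  lockDuration: 60 * 1000,   // 1 minute
  lockRenewTime: 15 * 1000,  // 15 seconds
  stalledInterval: 30 * 1000,
  maxStalledCount: 10,
});
```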