Switch streaming search to sequential queue instead of parallel

Parallel scraping with Puppeteer blocks the Node.js event loop,
preventing SSE events from flushing. Sequential processing means
each store completes, sends its event, and the client sees it
before the next store starts.

Also sorts stores so cheerio-based (fast) stores run first,
giving the user results sooner while Puppeteer stores load.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
mariosemes
2026-03-26 22:37:27 +01:00
parent 0e6ec21e81
commit 2c8ae5f628

View File

@@ -155,21 +155,23 @@ export async function searchStreaming(
return; return;
} }
// Sort: cheerio stores first (fast), Puppeteer stores last (slow)
stores.sort((a, b) => (a.render_js || 0) - (b.render_js || 0));
// Send start event with store list // Send start event with store list
onProgress({ onProgress({
type: 'start', type: 'start',
stores: stores.map((s) => ({ id: s.id, name: s.name, renderJs: !!s.render_js })), stores: stores.map((s) => ({ id: s.id, name: s.name, renderJs: !!s.render_js })),
}); });
// Yield to event loop so the SSE start event flushes to client // Yield so the SSE start event flushes to client
await new Promise((r) => setTimeout(r, 0)); await new Promise((r) => setTimeout(r, 0));
const limit = pLimit(MAX_CONCURRENCY);
const errors: SearchResult['meta']['errors'] = []; const errors: SearchResult['meta']['errors'] = [];
let totalResults = 0; let totalResults = 0;
const scrapePromises = stores.map((store) => // Process stores sequentially so SSE events flush between each
limit(async () => { for (const store of stores) {
const searchUrl = store.search_url.replace('{query}', encodeURIComponent(query)); const searchUrl = store.search_url.replace('{query}', encodeURIComponent(query));
const storeStart = Date.now(); const storeStart = Date.now();
const rateLimiter = getLimiter(store.id, 1, Math.floor(store.rate_window / store.rate_limit)); const rateLimiter = getLimiter(store.id, 1, Math.floor(store.rate_window / store.rate_limit));
@@ -196,9 +198,6 @@ export async function searchStreaming(
resultCount: products.length, resultCount: products.length,
duration, duration,
}); });
// Yield so SSE event flushes to client
await new Promise((r) => setTimeout(r, 0));
} catch (err) { } catch (err) {
const duration = Date.now() - storeStart; const duration = Date.now() - storeStart;
const errorMessage = err instanceof Error ? err.message : String(err); const errorMessage = err instanceof Error ? err.message : String(err);
@@ -213,10 +212,10 @@ export async function searchStreaming(
duration, duration,
}); });
} }
})
);
await Promise.all(scrapePromises); // Yield between stores so SSE events flush
await new Promise((r) => setTimeout(r, 0));
}
onProgress({ onProgress({
type: 'done', type: 'done',