From 2c8ae5f62806ae5408f957031627a81bea0b6c42 Mon Sep 17 00:00:00 2001 From: mariosemes Date: Thu, 26 Mar 2026 22:37:27 +0100 Subject: [PATCH] Switch streaming search to sequential queue instead of parallel Parallel scraping with Puppeteer blocks the Node.js event loop, preventing SSE events from flushing. Sequential processing means each store completes, sends its event, and the client sees it before the next store starts. Also sorts stores so cheerio-based (fast) stores run first, giving the user results sooner while Puppeteer stores load. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/server/scraper/engine.ts | 89 ++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/src/server/scraper/engine.ts b/src/server/scraper/engine.ts index 9775e17..08a3bfc 100644 --- a/src/server/scraper/engine.ts +++ b/src/server/scraper/engine.ts @@ -155,68 +155,67 @@ export async function searchStreaming( return; } + // Sort: cheerio stores first (fast), Puppeteer stores last (slow) + stores.sort((a, b) => (a.render_js || 0) - (b.render_js || 0)); + // Send start event with store list onProgress({ type: 'start', stores: stores.map((s) => ({ id: s.id, name: s.name, renderJs: !!s.render_js })), }); - // Yield to event loop so the SSE start event flushes to client + // Yield so the SSE start event flushes to client await new Promise((r) => setTimeout(r, 0)); - const limit = pLimit(MAX_CONCURRENCY); const errors: SearchResult['meta']['errors'] = []; let totalResults = 0; - const scrapePromises = stores.map((store) => - limit(async () => { - const searchUrl = store.search_url.replace('{query}', encodeURIComponent(query)); - const storeStart = Date.now(); - const rateLimiter = getLimiter(store.id, 1, Math.floor(store.rate_window / store.rate_limit)); + // Process stores sequentially so SSE events flush between each + for (const store of stores) { + const searchUrl = store.search_url.replace('{query}', encodeURIComponent(query)); + const storeStart = Date.now(); + const rateLimiter = getLimiter(store.id, 1, Math.floor(store.rate_window / store.rate_limit)); - try { - const scrapeFn = store.render_js - ? () => scrapeStoreWithBrowser(store, searchUrl) - : () => scrapeStore(store, searchUrl); - const result = await rateLimiter.schedule(scrapeFn); - const duration = Date.now() - storeStart; + try { + const scrapeFn = store.render_js + ? () => scrapeStoreWithBrowser(store, searchUrl) + : () => scrapeStore(store, searchUrl); + const result = await rateLimiter.schedule(scrapeFn); + const duration = Date.now() - storeStart; - const products = result.items.map((item) => - normalizeResult(item, store.id, store.name, store.base_url, store.currency) - ); + const products = result.items.map((item) => + normalizeResult(item, store.id, store.name, store.base_url, store.currency) + ); - logScrape(store.id, query, true, products.length, duration); - totalResults += products.length; + logScrape(store.id, query, true, products.length, duration); + totalResults += products.length; - onProgress({ - type: 'store_complete', - storeId: store.id, - storeName: store.name, - results: products, - resultCount: products.length, - duration, - }); + onProgress({ + type: 'store_complete', + storeId: store.id, + storeName: store.name, + results: products, + resultCount: products.length, + duration, + }); + } catch (err) { + const duration = Date.now() - storeStart; + const errorMessage = err instanceof Error ? err.message : String(err); + logScrape(store.id, query, false, 0, duration, errorMessage); + errors.push({ storeId: store.id, storeName: store.name, error: errorMessage }); - // Yield so SSE event flushes to client - await new Promise((r) => setTimeout(r, 0)); - } catch (err) { - const duration = Date.now() - storeStart; - const errorMessage = err instanceof Error ? err.message : String(err); - logScrape(store.id, query, false, 0, duration, errorMessage); - errors.push({ storeId: store.id, storeName: store.name, error: errorMessage }); + onProgress({ + type: 'store_error', + storeId: store.id, + storeName: store.name, + error: errorMessage, + duration, + }); + } - onProgress({ - type: 'store_error', - storeId: store.id, - storeName: store.name, - error: errorMessage, - duration, - }); - } - }) - ); - - await Promise.all(scrapePromises); + // Yield between stores so SSE events flush + await new Promise((r) => setTimeout(r, 0)); + } onProgress({ type: 'done',