From 37425812e0e6cfe4383e2f5cd8d7b2db18e3eb40 Mon Sep 17 00:00:00 2001 From: mariosemes Date: Thu, 26 Mar 2026 22:15:50 +0100 Subject: [PATCH] Add real-time per-store search progress via SSE streaming Backend: new /api/search/stream SSE endpoint that emits events as each store completes: start (store list), store_complete (results + duration), store_error, and done (final meta). Frontend: results page now shows live progress per store with spinning indicators while searching, checkmarks when done, and X marks on errors. Each store chip shows product count and response time. Results stream into the table as stores complete instead of waiting for all stores to finish. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/client/src/lib/api.ts | 28 +++ src/client/src/routes/results/+page.svelte | 224 +++++++++++++-------- src/server/routes/search.ts | 52 ++++- src/server/scraper/engine.ts | 101 ++++++++++ 4 files changed, 322 insertions(+), 83 deletions(-) diff --git a/src/client/src/lib/api.ts b/src/client/src/lib/api.ts index 9be5a14..08589a3 100644 --- a/src/client/src/lib/api.ts +++ b/src/client/src/lib/api.ts @@ -94,6 +94,34 @@ export function searchProducts(query: string, params?: { stores?: string; catego return api(`/api/search?${searchParams}`); } +export function searchProductsStream( + query: string, + params: { stores?: string; category?: string; group?: string } | undefined, + onEvent: (event: any) => void, +): () => void { + const searchParams = new URLSearchParams({ q: query }); + if (params?.stores) searchParams.set('stores', params.stores); + if (params?.category) searchParams.set('category', params.category); + if (params?.group) searchParams.set('group', params.group); + + const evtSource = new EventSource(`/api/search/stream?${searchParams}`); + + evtSource.onmessage = (e) => { + try { + const event = JSON.parse(e.data); + onEvent(event); + if (event.type === 'done') evtSource.close(); + } catch { /* ignore parse errors */ } + }; + + evtSource.onerror = () => { + evtSource.close(); + }; + + // Return cleanup function + return () => evtSource.close(); +} + export function testStore(id: number, query: string) { return api(`/api/stores/${id}/test`, { method: 'POST', body: JSON.stringify({ query }) }); } diff --git a/src/client/src/routes/results/+page.svelte b/src/client/src/routes/results/+page.svelte index 7c98c77..b31680e 100644 --- a/src/client/src/routes/results/+page.svelte +++ b/src/client/src/routes/results/+page.svelte @@ -1,7 +1,7 @@
@@ -131,19 +154,69 @@ New Search
- - {#if meta?.errors?.length > 0} -
- {#each meta.errors as err} -
- {err.storeName}: {err.error} -
- {/each} + + {#if storeProgress.length > 0} +
+
+ + {#if searchDone} + All stores completed + {:else} + Searching stores ({completedCount()}/{totalStores()}) + {/if} + + {#if !searchDone} +
+
+
+ {/if} +
+
+ {#each storeProgress as store} +
+ + + {#if store.status === 'searching'} + + + + + {:else if store.status === 'done'} + + + + {:else} + + + + {/if} + + {store.name} + + {#if store.status === 'searching'} + + {store.renderJs ? 'rendering...' : 'fetching...'} + + {:else if store.status === 'done'} + {store.resultCount} products + {store.duration}ms + {:else} + {store.error} + {/if} +
+ {/each} +
{/if} - {#if !loading && results.length > 0} + {#if results.length > 0}
@@ -189,12 +262,14 @@
{#if loading} -
- {#each Array(10) as _} -
- {/each} +
+ + + + +

Connecting to stores...

- {:else if results.length === 0} + {:else if searchDone && results.length === 0}
@@ -202,7 +277,7 @@

No results found for "{query}"

Try a different search term or check your store configurations.

- {:else} + {:else if results.length > 0} @@ -231,9 +306,8 @@ - {#each filteredAndSorted() as product, i} + {#each filteredAndSorted() as product} - - - - - - - - -
{#if product.image}
@@ -247,29 +321,15 @@
{/if}
- - {product.name} - + {product.name} - - {formatPrice(product.price, product.currency)} - + {formatPrice(product.price, product.currency)} - - {product.storeName} - + {product.storeName} diff --git a/src/server/routes/search.ts b/src/server/routes/search.ts index 1a96ff2..ff954d6 100644 --- a/src/server/routes/search.ts +++ b/src/server/routes/search.ts @@ -1,7 +1,8 @@ import type { FastifyPluginAsync } from 'fastify'; -import { search } from '../scraper/engine.js'; +import { search, searchStreaming } from '../scraper/engine.js'; export const searchRoutes: FastifyPluginAsync = async (app) => { + // Original endpoint (kept for compatibility) app.get<{ Querystring: { q: string; @@ -36,4 +37,53 @@ export const searchRoutes: FastifyPluginAsync = async (app) => { groupId: group ? Number(group) : undefined, }); }); + + // SSE streaming endpoint + app.get<{ + Querystring: { + q: string; + stores?: string; + category?: string; + group?: string; + }; + }>('/search/stream', { + schema: { + querystring: { + type: 'object', + required: ['q'], + properties: { + q: { type: 'string', minLength: 1 }, + stores: { type: 'string' }, + category: { type: 'string' }, + group: { type: 'string' }, + }, + }, + }, + }, async (request, reply) => { + const { q, stores, category, group } = request.query; + + const storeIds = stores + ? stores.split(',').map(Number).filter((n) => !isNaN(n)) + : undefined; + + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }); + + await searchStreaming( + { + query: q, + storeIds, + categoryId: category ? Number(category) : undefined, + groupId: group ? Number(group) : undefined, + }, + (event) => { + reply.raw.write(`data: ${JSON.stringify(event)}\n\n`); + }, + ); + + reply.raw.end(); + }); }; diff --git a/src/server/scraper/engine.ts b/src/server/scraper/engine.ts index dced660..0eb3d31 100644 --- a/src/server/scraper/engine.ts +++ b/src/server/scraper/engine.ts @@ -17,6 +17,18 @@ export interface SearchOptions { groupId?: number; } +export interface StoreProgress { + type: 'start' | 'store_complete' | 'store_error' | 'done'; + storeId?: number; + storeName?: string; + results?: Product[]; + resultCount?: number; + duration?: number; + error?: string; + stores?: Array<{ id: number; name: string; renderJs: boolean }>; + meta?: SearchResult['meta']; +} + export interface SearchResult { results: Product[]; meta: { @@ -122,3 +134,92 @@ export async function search(options: SearchOptions): Promise { }, }; } + +function resolveStores(options: SearchOptions): Store[] { + if (options.storeIds?.length) return getStoresByIds(options.storeIds); + if (options.groupId) return getStoresByGroup(options.groupId); + if (options.categoryId) return getStoresByCategory(options.categoryId); + return getEnabledStores(); +} + +export async function searchStreaming( + options: SearchOptions, + onProgress: (event: StoreProgress) => void, +): Promise { + const startTime = Date.now(); + const { query } = options; + const stores = resolveStores(options); + + if (stores.length === 0) { + onProgress({ type: 'done', meta: { query, duration: 0, storeCount: 0, totalResults: 0, errors: [] } }); + return; + } + + // Send start event with store list + onProgress({ + type: 'start', + stores: stores.map((s) => ({ id: s.id, name: s.name, renderJs: !!s.render_js })), + }); + + const limit = pLimit(MAX_CONCURRENCY); + const errors: SearchResult['meta']['errors'] = []; + let totalResults = 0; + + const scrapePromises = stores.map((store) => + limit(async () => { + const searchUrl = store.search_url.replace('{query}', encodeURIComponent(query)); + const storeStart = Date.now(); + const rateLimiter = getLimiter(store.id, 1, Math.floor(store.rate_window / store.rate_limit)); + + try { + const scrapeFn = store.render_js + ? () => scrapeStoreWithBrowser(store, searchUrl) + : () => scrapeStore(store, searchUrl); + const result = await rateLimiter.schedule(scrapeFn); + const duration = Date.now() - storeStart; + + const products = result.items.map((item) => + normalizeResult(item, store.id, store.name, store.base_url, store.currency) + ); + + logScrape(store.id, query, true, products.length, duration); + totalResults += products.length; + + onProgress({ + type: 'store_complete', + storeId: store.id, + storeName: store.name, + results: products, + resultCount: products.length, + duration, + }); + } catch (err) { + const duration = Date.now() - storeStart; + const errorMessage = err instanceof Error ? err.message : String(err); + logScrape(store.id, query, false, 0, duration, errorMessage); + errors.push({ storeId: store.id, storeName: store.name, error: errorMessage }); + + onProgress({ + type: 'store_error', + storeId: store.id, + storeName: store.name, + error: errorMessage, + duration, + }); + } + }) + ); + + await Promise.all(scrapePromises); + + onProgress({ + type: 'done', + meta: { + query, + duration: Date.now() - startTime, + storeCount: stores.length, + totalResults, + errors, + }, + }); +}