import pLimit from 'p-limit';
import type { Store } from '../models/store.js';
import {
  getEnabledStores,
  getStoresByCategory,
  getStoresByGroup,
  getStoresByIds,
  updateScrapeStatus,
} from '../models/store.js';
import { logScrape } from '../models/scrape-log.js';
import { scrapeStore } from './http-scraper.js';
import { scrapeStoreWithBrowser } from './browser-scraper.js';
import { normalizeResult, type Product } from './result-parser.js';
import { getLimiter } from './rate-limiter.js';

/** Maximum number of stores scraped at the same time by `search`. */
const MAX_CONCURRENCY = 5;

/** Overall deadline for `search`, in milliseconds (passed to `setTimeout`). */
const SEARCH_TIMEOUT = 60_000;

/**
 * Input for `search` / `searchStreaming`.
 *
 * Store selection precedence: a non-empty `storeIds` wins, then `groupId`,
 * then `categoryId`; with none of them set, all enabled stores are used.
 */
export interface SearchOptions {
  query: string;
  storeIds?: number[];
  categoryId?: number;
  groupId?: number;
}

/**
 * Progress event emitted by `searchStreaming`.
 *
 * - `start`: carries `stores` (the list about to be scraped).
 * - `store_complete`: carries `storeId`, `storeName`, `results`, `resultCount`, `duration`.
 * - `store_error`: carries `storeId`, `storeName`, `error`, `duration`.
 * - `done`: carries `meta`.
 */
export interface StoreProgress {
  type: 'start' | 'store_complete' | 'store_error' | 'done';
  storeId?: number;
  storeName?: string;
  results?: Product[];
  resultCount?: number;
  duration?: number;
  error?: string;
  stores?: Array<{ id: number; name: string; renderJs: boolean }>;
  meta?: SearchResult['meta'];
}

/** Aggregated outcome of a `search` call. */
export interface SearchResult {
  results: Product[];
  meta: {
    query: string;
    duration: number;
    storeCount: number;
    totalResults: number;
    errors: Array<{ storeId: number; storeName: string; error: string }>;
  };
}

/** Result of scraping a single store; failures are values, not exceptions. */
type StoreOutcome =
  | { ok: true; products: Product[]; duration: number }
  | { ok: false; error: string; duration: number };

/** Picks the stores to scrape according to the precedence documented on `SearchOptions`. */
function resolveStores(options: SearchOptions): Store[] {
  if (options.storeIds?.length) return getStoresByIds(options.storeIds);
  if (options.groupId) return getStoresByGroup(options.groupId);
  if (options.categoryId) return getStoresByCategory(options.categoryId);
  return getEnabledStores();
}

/** Resolves on a later timer tick so pending I/O (e.g. buffered progress events) can flush. */
function yieldToEventLoop(): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, 0));
}

/**
 * Scrapes one store for `query`, normalizes the items, and records the attempt
 * via `logScrape` and `updateScrapeStatus`.
 *
 * Scrape failures are returned as `{ ok: false }` rather than thrown, so callers
 * can keep going with the remaining stores.
 */
async function scrapeOneStore(store: Store, query: string): Promise<StoreOutcome> {
  const storeStart = Date.now();
  try {
    const searchUrl = store.search_url.replace('{query}', encodeURIComponent(query));
    // NOTE(review): the meaning of the last two arguments is defined by
    // rate-limiter.js (presumably concurrency and minimum spacing) — confirm
    // there, and confirm `rate_limit` can never be 0.
    const rateLimiter = getLimiter(
      store.id,
      1,
      Math.floor(store.rate_window / store.rate_limit),
    );
    // Stores flagged `render_js` go through the browser scraper, the rest through HTTP.
    const scrapeFn = store.render_js
      ? () => scrapeStoreWithBrowser(store, searchUrl)
      : () => scrapeStore(store, searchUrl);

    const result = await rateLimiter.schedule(scrapeFn);
    const duration = Date.now() - storeStart;
    const products = result.items.map((item) =>
      normalizeResult(item, store.id, store.name, store.base_url, store.currency),
    );

    logScrape(store.id, query, true, products.length, duration);
    updateScrapeStatus(store.id, true);
    return { ok: true, products, duration };
  } catch (err) {
    const duration = Date.now() - storeStart;
    const errorMessage = err instanceof Error ? err.message : String(err);
    logScrape(store.id, query, false, 0, duration, errorMessage);
    updateScrapeStatus(store.id, false);
    return { ok: false, error: errorMessage, duration };
  }
}

/**
 * Scrapes the selected stores concurrently (at most `MAX_CONCURRENCY` at once)
 * and returns all products sorted by ascending price, with `null` prices last.
 *
 * Per-store failures are reported in `meta.errors` instead of being thrown.
 * If the search exceeds `SEARCH_TIMEOUT`, the products of every store that
 * finished in time are still returned and a `System` / `Search timed out`
 * entry is appended to `meta.errors`.
 *
 * @param options - Query and optional store selection.
 * @returns Products plus metadata (duration, store count, errors).
 */
export async function search(options: SearchOptions): Promise<SearchResult> {
  const startTime = Date.now();
  const { query } = options;
  const stores = resolveStores(options);

  if (stores.length === 0) {
    return {
      results: [],
      meta: { query, duration: Date.now() - startTime, storeCount: 0, totalResults: 0, errors: [] },
    };
  }

  const limit = pLimit(MAX_CONCURRENCY);
  const errors: SearchResult['meta']['errors'] = [];
  // One slot per store, so the pre-sort order is the store order no matter
  // which store finishes first (keeps equal-price ordering deterministic).
  const productsByStore: Product[][] = stores.map(() => []);
  // Set once the result is being assembled; late tasks must not touch it afterwards.
  let settled = false;

  const tasks = stores.map((store, index) =>
    limit(async () => {
      // Queued stores that only get their turn after the deadline are skipped.
      if (settled) return;
      const outcome = await scrapeOneStore(store, query);
      if (settled) return;
      if (outcome.ok) {
        productsByStore[index] = outcome.products;
      } else {
        errors.push({ storeId: store.id, storeName: store.name, error: outcome.error });
      }
    }),
  );

  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), SEARCH_TIMEOUT);
  });

  try {
    const winner = await Promise.race([Promise.all(tasks), deadline]);
    if (winner === 'timeout') {
      errors.push({ storeId: 0, storeName: 'System', error: 'Search timed out' });
    }
  } catch (err) {
    // A task failed outside the per-store handling (for example if the logging
    // calls in the failure path throw). Keep the partial results and report it.
    errors.push({
      storeId: 0,
      storeName: 'System',
      error: err instanceof Error ? err.message : String(err),
    });
  } finally {
    settled = true;
    // Without this the timer would stay armed for the full timeout after a fast search.
    clearTimeout(timer);
  }

  const allProducts = productsByStore.flat();

  // Sort by price ascending, nulls last
  allProducts.sort((a, b) => {
    if (a.price === null && b.price === null) return 0;
    if (a.price === null) return 1;
    if (b.price === null) return -1;
    return a.price - b.price;
  });

  return {
    results: allProducts,
    meta: {
      query,
      duration: Date.now() - startTime,
      storeCount: stores.length,
      totalResults: allProducts.length,
      errors,
    },
  };
}

/**
 * Scrapes the selected stores one at a time and reports progress through
 * `onProgress`: one `start` event, then a `store_complete` or `store_error`
 * event per store, then a final `done` event with the aggregated metadata.
 *
 * Stores are processed sequentially, yielding to the event loop between
 * stores so the consumer can flush each event before the next scrape starts.
 * An exception thrown by `onProgress` propagates to the caller and is not
 * recorded as a scrape failure for the store.
 *
 * @param options - Query and optional store selection.
 * @param onProgress - Callback invoked synchronously for every progress event.
 */
export async function searchStreaming(
  options: SearchOptions,
  onProgress: (event: StoreProgress) => void,
): Promise<void> {
  const startTime = Date.now();
  const { query } = options;
  const selected = resolveStores(options);

  if (selected.length === 0) {
    onProgress({
      type: 'done',
      meta: { query, duration: 0, storeCount: 0, totalResults: 0, errors: [] },
    });
    return;
  }

  // Stores that do not need JS rendering go first, browser-rendered stores last.
  // Sort a copy so the array handed out by the store model is not reordered.
  const stores = [...selected].sort(
    (a, b) => Number(a.render_js || 0) - Number(b.render_js || 0),
  );

  // Send start event with store list
  onProgress({
    type: 'start',
    stores: stores.map((s) => ({ id: s.id, name: s.name, renderJs: !!s.render_js })),
  });

  // Yield so the start event can flush to the client
  await yieldToEventLoop();

  const errors: SearchResult['meta']['errors'] = [];
  let totalResults = 0;

  // Process stores sequentially so events flush between each
  for (const store of stores) {
    const outcome = await scrapeOneStore(store, query);

    if (outcome.ok) {
      totalResults += outcome.products.length;
      onProgress({
        type: 'store_complete',
        storeId: store.id,
        storeName: store.name,
        results: outcome.products,
        resultCount: outcome.products.length,
        duration: outcome.duration,
      });
    } else {
      errors.push({ storeId: store.id, storeName: store.name, error: outcome.error });
      onProgress({
        type: 'store_error',
        storeId: store.id,
        storeName: store.name,
        error: outcome.error,
        duration: outcome.duration,
      });
    }

    // Yield between stores so events flush
    await yieldToEventLoop();
  }

  onProgress({
    type: 'done',
    meta: {
      query,
      duration: Date.now() - startTime,
      storeCount: stores.length,
      totalResults,
      errors,
    },
  });
}