// Minimal async-iterator implementation of the listChatModels pipeline.
// Yields plain JSON-serializable progress objects. Uses per-request AbortControllers
// and a finally block so iterator.return() causes cleanup. (See the usage sketch at
// the end of this file.)
export async function* listChatModelsIterator(params = {}) {
  const opts = Object.assign({
    maxCandidates: 250,
    concurrency: 12,
    hfToken: null,
    timeoutMs: 10000,
    maxListing: 5000
  }, params || {});
  // Note: timeoutMs is accepted as an option but no per-request timeout is currently
  // enforced (see fetchConfigForModel below).
  const { maxCandidates, concurrency, hfToken, timeoutMs, maxListing } = opts;

  const MAX_TOTAL_TO_FETCH = Math.min(maxListing, 5000);
  const PAGE_SIZE = 1000;
  const RETRIES = 3;
  const BACKOFF_BASE_MS = 200;
  const inFlight = new Set();

  // Adaptive concurrency and telemetry
  const counters = { configFetch429: 0, configFetch200: 0, configFetchError: 0 };
  const initialConcurrency = Math.max(1, concurrency || 1);
  let effectiveConcurrency = initialConcurrency;
  let availableTokens = effectiveConcurrency;
  const tokenWaiters = [];

  // Sliding window for 429 detection
  const recent429s = [];
  const RATE_WINDOW_MS = 30_000;    // 30s window
  const RATE_THRESHOLD = 10;        // triggers backoff
  const BACKOFF_WINDOW_MS = 30_000; // backoff period when triggered
  let rateLimitedUntil = 0;

  function pruneOld429s(now) {
    while (recent429s.length && recent429s[0] < now - RATE_WINDOW_MS) recent429s.shift();
  }

  function record429() {
    const now = Date.now();
    recent429s.push(now);
    pruneOld429s(now);
    if (recent429s.length >= RATE_THRESHOLD) {
      // Reduce concurrency conservatively
      const newEff = Math.max(1, Math.floor(effectiveConcurrency / 2));
      const reduction = effectiveConcurrency - newEff;
      if (reduction > 0) {
        effectiveConcurrency = newEff;
        // Reduce availableTokens accordingly (don't abort in-flight requests)
        availableTokens = Math.max(0, availableTokens - reduction);
      }
      rateLimitedUntil = now + BACKOFF_WINDOW_MS;
    }
  }

  function maybeRestoreConcurrency() {
    const now = Date.now();
    pruneOld429s(now);
    if (now < rateLimitedUntil) return; // still in backoff
    if (recent429s.length === 0 && effectiveConcurrency < initialConcurrency) {
      effectiveConcurrency = Math.min(initialConcurrency, effectiveConcurrency + 1);
      availableTokens = Math.min(availableTokens + 1, effectiveConcurrency);
      // Wake one waiter if present
      if (tokenWaiters.length) {
        const w = tokenWaiters.shift();
        if (w) w();
      }
    }
  }

  async function acquireToken() {
    if (availableTokens > 0) {
      availableTokens--;
      return;
    }
    // Wait until a token is available
    await new Promise(resolve => tokenWaiters.push(resolve));
    // When resolved, consume the token
    availableTokens = Math.max(0, availableTokens - 1);
  }

  function releaseToken() {
    availableTokens = Math.min(effectiveConcurrency, availableTokens + 1);
    if (tokenWaiters.length > 0 && availableTokens > 0) {
      const w = tokenWaiters.shift();
      if (w) w();
    }
  }
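  // Illustrative sketch (comment only, not executed): the worker pool further below is
  // expected to use these helpers in an acquire/try/finally pattern so a token is always
  // returned, even when a fetch throws:
  //
  //   await acquireToken();
  //   try { /* fetch + classify one model */ }
  //   finally { releaseToken(); }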
  // helper: fetchConfigForModel (tries multiple config paths, with retries and
  // per-request AbortControllers registered in `inFlight`)
  async function fetchConfigForModel(modelId) {
    const urls = [
      `https://huggingface.co/${encodeURIComponent(modelId)}/resolve/main/config.json`,
      `https://huggingface.co/${encodeURIComponent(modelId)}/resolve/main/config/config.json`,
      `https://huggingface.co/${encodeURIComponent(modelId)}/resolve/main/adapter_config.json`
    ];
    for (const url of urls) {
      for (let attempt = 0; attempt <= RETRIES; attempt++) {
        // Use an AbortController so the caller can cancel; no per-request timeout is enforced here.
        const controller = new AbortController();
        inFlight.add(controller);
        try {
          const resp = await fetch(url, {
            signal: controller.signal,
            headers: hfToken ? { Authorization: `Bearer ${hfToken}` } : {},
            cache: 'force-cache'
          });
          if (resp.status === 200) {
            const json = await resp.json();
            counters.configFetch200++;
            return { status: 'ok', model_type: json.model_type || null, architectures: json.architectures || null };
          }
          if (resp.status === 401 || resp.status === 403) return { status: 'auth', code: resp.status };
          if (resp.status === 404) break; // try next fallback path
          if (resp.status === 429) {
            counters.configFetch429++;
            // Record the rate limit and maybe reduce concurrency
            record429();
            const backoff = BACKOFF_BASE_MS * Math.pow(2, attempt);
            await new Promise(r => setTimeout(r, backoff));
            continue;
          }
          counters.configFetchError++;
          return { status: 'error', code: resp.status, message: `fetch failed ${resp.status}` };
        } catch (err) {
          if (attempt === RETRIES) {
            counters.configFetchError++;
            return { status: 'error', message: String(err) };
          }
          const backoff = BACKOFF_BASE_MS * Math.pow(2, attempt);
          await new Promise(r => setTimeout(r, backoff));
        } finally {
          try { inFlight.delete(controller); } catch (e) {}
        }
      }
    }
    return { status: 'no-config' };
  }
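  // For reference, fetchConfigForModel resolves to one of these plain-object shapes
  // (summarized from the branches above):
  //   { status: 'ok', model_type, architectures }
  //   { status: 'auth', code }            // 401/403: config exists but is gated
  //   { status: 'error', code?, message } // non-retryable HTTP error or exhausted retries
  //   { status: 'no-config' }             // no config found at any known path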
  function classifyModel(rawModel, fetchResult) {
    const id = rawModel.modelId || rawModel.id || rawModel.model;
    const hasTokenizer = rawModel.hasTokenizer || false;
    const hasOnnxModel = rawModel.hasOnnxModel || false;
    const isTransformersJsReady = rawModel.isTransformersJsReady || false;
    const entry = {
      id,
      model_type: null,
      architectures: null,
      classification: 'unknown',
      confidence: 'low',
      fetchStatus: 'error',
      hasTokenizer,
      hasOnnxModel,
      isTransformersJsReady
    };
    if (!fetchResult) return entry;

    if (fetchResult.status === 'auth') {
      entry.classification = 'auth-protected';
      entry.confidence = 'high';
      entry.fetchStatus = String(fetchResult.code || 401);
      return entry;
    }

    if (fetchResult.status === 'ok') {
      entry.model_type = fetchResult.model_type || null;
      entry.architectures = Array.isArray(fetchResult.architectures) ? fetchResult.architectures : null;
      entry.fetchStatus = 'ok';
      const deny = ['bert', 'roberta', 'distilbert', 'electra', 'albert', 'deberta', 'mobilebert', 'convbert', 'sentence-transformers'];
      const allow = ['gpt2', 'gptj', 'gpt_neox', 'llama', 'qwen', 'qwen2', 'mistral', 'phi', 'phi3', 't5', 'bart', 'pegasus', 'gemma', 'gemma2', 'gemma3', 'falcon', 'bloom', 'lfm2'];
      if (entry.model_type && deny.includes(entry.model_type)) {
        entry.classification = 'encoder';
        entry.confidence = 'high';
        return entry;
      }
      if (entry.model_type && allow.includes(entry.model_type)) {
        entry.classification = 'gen';
        entry.confidence = 'high';
        return entry;
      }
      // Also check model_type variations with underscores/dashes stripped
      const normalizedModelType = entry.model_type && entry.model_type.replace(/[-_]/g, '');
      if (normalizedModelType) {
        const normalizedAllow = allow.map(t => t.replace(/[-_]/g, ''));
        const normalizedDeny = deny.map(t => t.replace(/[-_]/g, ''));
        if (normalizedDeny.includes(normalizedModelType)) {
          entry.classification = 'encoder';
          entry.confidence = 'high';
          return entry;
        }
        if (normalizedAllow.includes(normalizedModelType)) {
          entry.classification = 'gen';
          entry.confidence = 'high';
          return entry;
        }
      }
      // Architecture strings (e.g. "GPT2LMHeadModel") embed the model-type token,
      // so match by substring rather than exact equality.
      const arch = entry.architectures;
      if (arch && Array.isArray(arch)) {
        for (let i = 0; i < arch.length; i++) {
          const a = String(arch[i]).toLowerCase();
          if (allow.some(t => a.includes(t))) {
            entry.classification = 'gen';
            entry.confidence = 'high';
            return entry;
          }
          if (deny.some(t => a.includes(t))) {
            entry.classification = 'encoder';
            entry.confidence = 'high';
            return entry;
          }
        }
      }
      entry.classification = 'unknown';
      entry.confidence = 'low';
      return entry;
    }

    if (fetchResult.status === 'no-config') {
      const pipeline = rawModel.pipeline_tag || '';
      if (pipeline && pipeline.startsWith('text-generation')) {
        entry.classification = 'gen';
        entry.confidence = 'medium';
      } else {
        entry.classification = 'unknown';
        entry.confidence = 'low';
      }
      entry.fetchStatus = '404';
      return entry;
    }

    if (fetchResult.status === 'error') {
      entry.classification = 'unknown';
      entry.confidence = 'low';
      entry.fetchStatus = 'error';
      entry.fetchError = { message: fetchResult.message, code: fetchResult.code };
      return entry;
    }

    return entry;
  }
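  // Hypothetical example of a classified entry (id and values illustrative only):
  //   { id: 'some-org/some-llama-model', model_type: 'llama', architectures: ['LlamaForCausalLM'],
  //     classification: 'gen', confidence: 'high', fetchStatus: 'ok',
  //     hasTokenizer: true, hasOnnxModel: true, isTransformersJsReady: true }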
  // Main pipeline
  let listing = [];
  let restoreInterval = null; // declared here so the finally block below can always clear it
  try {
    // 1) listing
    let offset = 0;
    let reachedEnd = false;
    while (!reachedEnd && listing.length < MAX_TOTAL_TO_FETCH) {
      const url = `https://huggingface.co/api/models?full=true&limit=${PAGE_SIZE}&offset=${offset}`;
      let ok = false;
      for (let attempt = 0; attempt <= RETRIES && !ok; attempt++) {
        try {
          const resp = await fetch(url, {
            headers: hfToken ? { Authorization: `Bearer ${hfToken}` } : {},
            cache: 'force-cache'
          });
          if (resp.status === 429) {
            const backoff = BACKOFF_BASE_MS * Math.pow(2, attempt);
            await new Promise(r => setTimeout(r, backoff));
            continue;
          }
          if (!resp.ok) throw Object.assign(new Error(`listing-fetch-failed:${resp.status}`), { code: 'listing_fetch_failed', status: resp.status });
          const page = await resp.json();
          if (!Array.isArray(page) || page.length === 0) {
            // Empty page: nothing more to list, so stop the outer loop as well
            reachedEnd = true;
            ok = true;
            break;
          }
          listing.push(...page);
          offset += PAGE_SIZE;
          ok = true;
        } catch (err) {
          if (attempt === RETRIES) throw err;
          await new Promise(r => setTimeout(r, BACKOFF_BASE_MS * Math.pow(2, attempt)));
        }
      }
      if (!ok) break;
    }

    // Emit listing_done
    yield { status: 'listing_done', totalFound: listing.length };

    // 2) prefilter
    const denyPipeline = new Set(['feature-extraction', 'fill-mask', 'sentence-similarity', 'masked-lm']);
    const survivors = [];
    for (const m of listing) {
      if (survivors.length >= maxCandidates) break;
      const pipeline = m.pipeline_tag;
      if (pipeline && denyPipeline.has(pipeline)) continue;
      // Normalize model id (HF listing commonly uses `id`; some code expects `modelId`)
      const modelId = (m.modelId || m.id || m.model || '').toString();
      if (modelId && modelId.includes('sentence-transformers')) continue;
      // Sibling entries from the HF API may be strings or objects with different keys
      const siblings = m.siblings || [];
      const hasTokenizer = siblings.some((s) => {
        if (!s) return false;
        let name = null;
        if (typeof s === 'string') name = s;
        else if (typeof s === 'object') name = s.rfilename || s.name || s.path || s.filename || s.repo_file || s.file || null;
        if (!name) return false;
        return /tokenizer|vocab|merges|sentencepiece/i.test(String(name));
      });
      // Check for ONNX model files that transformers.js needs
      const hasOnnxModel = siblings.some((s) => {
        if (!s) return false;
        let name = null;
        if (typeof s === 'string') name = s;
        else if (typeof s === 'object') name = s.rfilename || s.name || s.path || s.filename || s.repo_file || s.file || null;
        if (!name) return false;
        // Look for ONNX files - transformers.js needs various ONNX model files
        return /onnx\/.*\.onnx|onnx\\.*\.onnx|.*model.*\.onnx|.*decoder.*\.onnx/i.test(String(name));
      });
      // Filter out models that lack required files. Tokenizer files are always required;
      // a model missing ONNX files is kept only if its pipeline is text-generation
      // (it may be auth-protected or convertible later).
      if (!hasTokenizer || !hasOnnxModel) {
        if (!pipeline || !pipeline.toLowerCase().includes('text-generation')) continue;
        if (!hasTokenizer) continue;
      }
      // Check whether the model explicitly supports transformers.js
      const isTransformersJsReady = (m.library_name === 'transformers.js') ||
        (Array.isArray(m.tags) && m.tags.includes('transformers.js')) ||
        (Array.isArray(m.tags) && m.tags.includes('onnx'));
      // Preserve flags for later filtering
      m.hasTokenizer = hasTokenizer;
      m.hasOnnxModel = hasOnnxModel;
      m.isTransformersJsReady = isTransformersJsReady;
      survivors.push(m);
    }
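    // Illustrative sibling filenames (hypothetical paths) that the checks above accept:
    //   'tokenizer.json', 'vocab.txt', 'merges.txt'    -> hasTokenizer
    //   'onnx/model.onnx', 'onnx/decoder_model.onnx'   -> hasOnnxModel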
    yield { status: 'prefiltered', survivors: survivors.length };

    // 3) concurrent config fetch & classify, using an event queue so workers can emit
    // progress events while the generator yields them.
    const results = [];
    const errors = [];
    let idx = 0;
    let processed = 0;
    const events = [];
    let resolveNext = null;

    function emit(ev) {
      events.push(ev);
      if (resolveNext) {
        resolveNext();
        resolveNext = null;
      }
    }

    async function nextEvent() {
      while (events.length === 0) {
        await new Promise(r => { resolveNext = r; });
      }
      return events.shift();
    }

    // Start a small restore timer to gradually increase concurrency when safe
    restoreInterval = setInterval(maybeRestoreConcurrency, 5000);

    const workerCount = Math.min(initialConcurrency, survivors.length || 1);
    const pool = new Array(workerCount).fill(0).map(async () => {
      while (true) {
        const i = idx++;
        if (i >= survivors.length) break;
        const model = survivors[i];
        const modelId = model.modelId || model.id || model.model;
        try {
          // Acquire a concurrency token
          await acquireToken();
          emit({ modelId, status: 'config_fetching' });
          const fetchResult = await fetchConfigForModel(modelId);
          const entry = classifyModel(model, fetchResult);
          results.push(entry);
          emit({ modelId, status: 'classified', data: entry });
        } catch (err) {
          errors.push({ modelId, message: String(err) });
          emit({ modelId, status: 'error', data: { message: String(err) } });
        } finally {
          processed++;
          // Release the concurrency token so other workers can proceed
          try { releaseToken(); } catch (e) {}
        }
      }
    });

    // Consume events as workers produce them
    while (processed < survivors.length) {
      const ev = await nextEvent();
      yield ev;
    }
    // Make sure any remaining events are yielded
    while (events.length > 0) {
      yield events.shift();
    }
    await Promise.all(pool);

    // Final selection: auth-protected models regardless of classification, plus
    // generation-capable models that have both tokenizer and ONNX files.
    // Prioritize transformers.js-ready models.
    const authRequired = results.filter(r => r.classification === 'auth-protected').slice(0, 50);
    const genCapable = results.filter(r => r.classification === 'gen' && r.hasTokenizer && r.hasOnnxModel);
    // Sort generation-capable models: transformers.js-ready first, then others
    genCapable.sort((a, b) => {
      if (a.isTransformersJsReady && !b.isTransformersJsReady) return -1;
      if (!a.isTransformersJsReady && b.isTransformersJsReady) return 1;
      return 0;
    });
    const nonAuth = genCapable.slice(0, 50);
    const selected = nonAuth.concat(authRequired);
    const models = selected.map(r => ({
      id: r.id,
      model_type: r.model_type,
      architectures: r.architectures,
      classification: r.classification,
      confidence: r.confidence,
      fetchStatus: r.fetchStatus,
      hasTokenizer: r.hasTokenizer,
      hasOnnxModel: r.hasOnnxModel,
      isTransformersJsReady: r.isTransformersJsReady
    }));
    const meta = {
      fetched: listing.length,
      filtered: survivors.length,
      errors,
      selected: { nonAuth: nonAuth.length, authRequired: authRequired.length, total: models.length }
    };
    if (params && params.debug) meta.counters = Object.assign({}, counters);
    yield { status: 'done', models, meta };
  } finally {
    // Abort any in-flight fetches if iteration stopped early
    for (const c of Array.from(inFlight)) try { c.abort(); } catch (e) {}
    // Clean up the restore timer
    if (restoreInterval) { try { clearInterval(restoreInterval); } catch (e) {} }
  }
}
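
// Example usage (illustrative sketch; assumes an environment with a global `fetch`,
// e.g. a modern browser or Node 18+ — not part of the pipeline itself):
//
//   for await (const ev of listChatModelsIterator({ maxCandidates: 50, concurrency: 4 })) {
//     if (ev.status === 'listing_done') console.log('listed', ev.totalFound);
//     if (ev.status === 'classified') console.log(ev.modelId, ev.data.classification);
//     if (ev.status === 'done') { console.log('selected', ev.models.length); break; }
//     // Breaking early (or calling iterator.return()) runs the finally block above,
//     // which aborts in-flight fetches and clears the restore timer.
//   }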