import streamSaver from 'streamsaver'
import { Api } from 'telegram'
import { telegramClient } from './Telegram'
// NOTE: `concat` and `FastPriorityQueue` are not referenced by the code below.
// The imports are kept so the module's dependency list stays unchanged.
import { concat } from 'concat-stream'
import FastPriorityQueue from 'fastpriorityqueue'

/**
 * Bounded pool of Telegram connections.
 *
 * A connection is lent to exactly one caller at a time. Callers that ask for
 * a connection while every slot is lent out are queued and served in FIFO
 * order as connections are released.
 *
 * NOTE(review): the connection objects come from `telegramClient.connect()`,
 * whose return type is declared in './Telegram' and is not visible here, so
 * they are carried as `unknown`.
 */
class ConnectionPool {
  /** Connections that are open and currently not lent to anyone. */
  private connections: unknown[]
  /** Number of connections currently lent out (or being opened for a caller). */
  private leased: number
  /** Resolvers of callers waiting for a connection, oldest first. */
  private waiters: Array<(connection: unknown) => void>
  public maxSize: number

  /**
   * @param maxSize Upper bound on connections lent out at the same time.
   */
  constructor(maxSize: number) {
    this.connections = []
    this.leased = 0
    this.waiters = []
    this.maxSize = maxSize
  }

  /**
   * Obtains a connection, waiting until one is released when the pool is
   * exhausted.
   *
   * @returns The connection; it must be handed back with `releaseConnection`.
   */
  async getConnection(): Promise<unknown> {
    const immediate = this.tryGetConnection()
    if (immediate !== undefined) {
      return immediate
    }
    return new Promise<unknown>((resolve) => {
      this.waiters.push(resolve)
    })
  }

  /**
   * Non-blocking variant of `getConnection`.
   *
   * @returns A promise for a connection when an idle one exists or a new one
   *   may be opened, or `undefined` when every slot is currently lent out.
   */
  tryGetConnection(): Promise<unknown> | undefined {
    if (this.connections.length > 0) {
      this.leased += 1
      return Promise.resolve(this.connections.shift())
    }
    if (this.leased < this.maxSize) {
      this.leased += 1
      return this.openConnection()
    }
    return undefined
  }

  /**
   * Returns a connection to the pool. If a caller is waiting, the connection
   * is handed straight to it; otherwise it becomes idle.
   *
   * @param connection A connection previously obtained from this pool.
   */
  releaseConnection(connection: unknown): void {
    const waiter = this.waiters.shift()
    if (waiter !== undefined) {
      // The lease moves from the releasing caller to the waiter, so the
      // lease counter stays as it is.
      waiter(connection)
      return
    }
    this.leased = Math.max(0, this.leased - 1)
    this.connections.push(connection)
  }

  /** Opens a new connection for a slot that has already been counted as leased. */
  private async openConnection(): Promise<unknown> {
    try {
      return await telegramClient.connect()
    } catch (error) {
      // Give the slot back so a failed connect does not shrink the pool.
      this.leased = Math.max(0, this.leased - 1)
      this.admitWaiter()
      throw error
    }
  }

  /** Lets the oldest waiter open a connection after a slot became free. */
  private admitWaiter(): void {
    if (this.leased >= this.maxSize) {
      return
    }
    const waiter = this.waiters.shift()
    if (waiter === undefined) {
      return
    }
    this.leased += 1
    // Resolving with a promise makes the waiter adopt its outcome, including
    // a rejection if this connect attempt fails as well.
    waiter(this.openConnection())
  }
}

// Declare the proper type for file iterators
type FileIterator = {
  [Symbol.asyncIterator]: () => AsyncGenerator<Uint8Array>
}

const connectionPool = new ConnectionPool(5) // set maximum pool size to 5
// Cache for downloaded data, keyed by message id.
// NOTE(review): nothing in this module stores into this cache; `download`
// only reads from it. Confirm whether another module is expected to fill it.
const cache = new Map<string, Uint8Array>()

/** Coerces a requested parallelism into a whole number of at least 1. */
function normalizeParallelism(value: number): number {
  return Number.isFinite(value) && value >= 1 ? Math.floor(value) : 1
}

/**
 * Normalises a download result into a list of binary chunks.
 *
 * @throws Error when the result carries no binary data at all.
 */
function toChunkList(value: unknown): Uint8Array[] {
  if (value instanceof Uint8Array) {
    return [value]
  }
  if (Array.isArray(value)) {
    return value.filter((item): item is Uint8Array => item instanceof Uint8Array)
  }
  throw new Error('downloadMedia did not return binary data')
}

/** Joins chunks into one contiguous array of `totalBytes` bytes. */
function joinChunks(chunks: readonly Uint8Array[], totalBytes: number): Uint8Array {
  const [first] = chunks
  if (chunks.length === 1 && first !== undefined) {
    return first
  }
  const joined = new Uint8Array(totalBytes)
  let position = 0
  for (const chunk of chunks) {
    joined.set(chunk, position)
    position += chunk.byteLength
  }
  return joined
}

/**
 * Downloads the byte range of `media` that belongs to worker `i` and yields
 * it in pieces of at least `bufferSize` bytes (the final piece may be smaller).
 *
 * The file is split into `numParallel` contiguous ranges; worker `i` fetches
 * only range `i`, so concatenating the output of workers 0..numParallel-1 in
 * order reproduces the file once. When `media.size` is not a positive number,
 * worker 0 requests the media without a range and all other workers yield
 * nothing.
 *
 * NOTE(review): this assumes `client.downloadMedia(media, { offset, limit })`
 * honours a byte `offset`/`limit` — confirm against the client wrapper in use.
 *
 * @param clients Connected clients; worker `i` uses `clients[i % clients.length]`.
 * @param media Media descriptor taken from the Telegram message.
 * @param i Zero-based worker index.
 * @param numParallel Number of ranges the file is split into.
 * @param bufferSize Minimum size in bytes of each yielded piece (default 1 MB).
 * @throws Error when `clients` is empty or the download yields no binary data;
 *   a rejection of the download request is propagated unchanged.
 */
async function* generateChunks(
  clients: any[],
  media: any,
  i: number,
  numParallel: number,
  bufferSize: number = 1024 * 1024 // set buffer size to 1 MB
): AsyncGenerator<Uint8Array> {
  if (clients.length === 0) {
    throw new Error('generateChunks requires at least one client')
  }
  const client = clients[i % clients.length]
  const segments = normalizeParallelism(numParallel)
  const totalSize = Number(media?.size)

  let range: { offset?: number; limit?: number } = {}
  if (Number.isFinite(totalSize) && totalSize > 0) {
    const segmentSize = Math.ceil(totalSize / segments)
    const offset = i * segmentSize
    if (offset >= totalSize) {
      // More workers than bytes to share out: nothing left for this one.
      return
    }
    range = { offset, limit: Math.min(segmentSize, totalSize - offset) }
  } else if (i !== 0) {
    // Size unknown: the file cannot be partitioned, worker 0 fetches it all.
    return
  }

  const result: unknown = await client.downloadMedia(media, range)

  const pending: Uint8Array[] = []
  let pendingBytes = 0
  for (const chunk of toChunkList(result)) {
    pending.push(chunk)
    pendingBytes += chunk.byteLength
    if (pendingBytes >= bufferSize) {
      yield joinChunks(pending, pendingBytes)
      pending.length = 0
      pendingBytes = 0
    }
  }
  if (pendingBytes > 0) {
    yield joinChunks(pending, pendingBytes)
  }
}

/**
 * Opens a stream with the media bytes attached to a Telegram message.
 *
 * Cached data, if present, is served directly. Otherwise one connection is
 * obtained (waiting for the pool if necessary) plus up to `numParallel - 1`
 * further connections that are available without waiting; the file is split
 * across them and the ranges are emitted in file order. Each connection goes
 * back to the pool as soon as its range has been fetched or has failed.
 *
 * @param id Message id; converted with `Number(id)` for the API request.
 * @param numParallel Desired number of parallel range downloads (at least 1).
 * @returns A stream of the file's bytes; it errors if any range fails.
 * @throws Error when the message lookup fails or the message has no media.
 */
export async function download(
  id: string,
  numParallel: number = 1
): Promise<ReadableStream<Uint8Array>> {
  const cachedData = cache.get(id)
  if (cachedData) {
    return new ReadableStream<Uint8Array>({
      start(controller) {
        controller.enqueue(cachedData)
        controller.close()
      }
    })
  }

  const requested = normalizeParallelism(numParallel)
  const clients: any[] = []
  let media: any

  try {
    clients.push(await connectionPool.getConnection())
    // Extra connections are only taken when that needs no waiting: a download
    // that already holds a connection must never block on the pool, otherwise
    // concurrent downloads could wait on each other forever.
    while (clients.length < requested) {
      const extra = connectionPool.tryGetConnection()
      if (extra === undefined) {
        break
      }
      clients.push(await extra)
    }

    const result = await clients[0].invoke(
      new Api.messages.GetMessages({
        id: [new Api.InputMessageID({ id: Number(id) })]
      })
    )
    // NOTE(review): accepts both a `{ data }` envelope and a bare result,
    // because the shape returned by `invoke` is not visible from this file.
    const response = result?.data ?? result
    media = response?.messages?.[0]?.media
    if (media == null) {
      throw new Error(`No media found for message ${id}`)
    }
  } catch (error) {
    for (const client of clients) {
      connectionPool.releaseConnection(client)
    }
    throw error
  }

  const streams = clients.map((client, index) => {
    const fileIterator: FileIterator = {
      [Symbol.asyncIterator]: () =>
        generateChunks(clients, media, index, clients.length)
    }
    return new ReadableStream<Uint8Array>({
      async start(controller) {
        try {
          for await (const chunk of fileIterator) {
            controller.enqueue(chunk)
          }
          controller.close()
        } finally {
          // The worker is finished with its connection whether it completed,
          // failed, or the consumer cancelled the stream.
          connectionPool.releaseConnection(client)
        }
      }
    })
  })

  return mergeStreams(...streams)
}

/**
 * Downloads the media of a message and saves it under `name` via StreamSaver.
 *
 * The returned promise settles once the file has been written completely or
 * the transfer has failed. Failures are logged with `console.error` and are
 * not rethrown; the output file is aborted so that no partial download is
 * left open.
 *
 * @param id Message id of the media to download.
 * @param name File name passed to `streamSaver.createWriteStream`.
 * @param numParallel Desired number of parallel range downloads.
 */
export const directDownload = async (
  id: string,
  name: string,
  numParallel: number = 1
): Promise<void> => {
  const fileStream = streamSaver.createWriteStream(name)
  const writer = fileStream.getWriter()
  let reader: ReadableStreamDefaultReader<Uint8Array> | undefined

  try {
    const stream = await download(id, numParallel)
    reader = stream.getReader()
    for (;;) {
      const { done, value } = await reader.read()
      if (done) {
        break
      }
      if (value !== undefined) {
        await writer.write(value)
      }
    }
    await writer.close()
  } catch (error) {
    console.error(error)
    // Best-effort cleanup; the primary failure has already been reported.
    await reader?.cancel(error).catch(() => undefined)
    await writer.abort(error).catch(() => undefined)
  }
}

/**
 * Concatenates streams: every chunk of the first stream is emitted, then
 * every chunk of the second, and so on. A single input stream is returned
 * as-is; with no input the result is an empty, closed stream.
 *
 * Cancelling the result cancels every input that has not been fully read.
 * If reading an input fails, the remaining inputs are cancelled and the
 * result errors with the same reason.
 */
function mergeStreams(
  ...streams: ReadableStream<Uint8Array>[]
): ReadableStream<Uint8Array> {
  const [only] = streams
  if (streams.length === 1 && only !== undefined) {
    return only
  }

  const readers = streams.map((stream) => stream.getReader())
  let current = 0
  const cancelFrom = (from: number, reason: unknown): Promise<unknown> =>
    Promise.allSettled(readers.slice(from).map((reader) => reader.cancel(reason)))

  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      try {
        // Keep reading until something is enqueued or every input is drained;
        // returning without either would leave a pending read unanswered.
        while (current < readers.length) {
          const reader = readers[current]
          if (reader === undefined) {
            break
          }
          const { done, value } = await reader.read()
          if (done) {
            current += 1
            continue
          }
          if (value !== undefined) {
            controller.enqueue(value)
            return
          }
        }
        controller.close()
      } catch (error) {
        await cancelFrom(current + 1, error)
        throw error
      }
    },
    async cancel(reason) {
      await cancelFrom(current, reason)
    }
  })
}