|
import streamSaver from 'streamsaver' |
|
import { Api } from 'telegram' |
|
import { telegramClient } from './Telegram' |
|
import { concat } from 'concat-stream' |
|
import FastPriorityQueue from 'fastpriorityqueue' |
|
/**
 * Bounded pool of connections obtained from `telegramClient.connect()`.
 *
 * A connection is leased to exactly one caller at a time: `getConnection()`
 * either reuses an idle connection, opens a new one while fewer than
 * `maxSize` are leased, or waits until another caller releases one.
 *
 * Callers must hand every leased connection back through
 * `releaseConnection()`. A caller that keeps connections leased while waiting
 * for additional ones can starve other callers doing the same, so never hold
 * more than `maxSize` connections at once.
 */
class ConnectionPool {
  /** Idle connections that are ready to be leased again. */
  private connections: Promise<any>[]

  /** Maximum number of connections that may be leased at the same time. */
  public maxSize: number

  /** Connections currently leased, including ones still being established. */
  private leased = 0

  /** Wake-up callbacks of callers blocked in `getConnection()`, oldest first. */
  private waiters: Array<() => void> = []

  /**
   * @param maxSize Maximum number of simultaneously leased connections.
   */
  constructor(maxSize: number) {
    this.connections = []
    this.maxSize = maxSize
  }

  /**
   * Leases a connection, waiting for a release when the pool is exhausted.
   *
   * @returns Whatever `telegramClient.connect()` resolves to, or a previously
   *   released connection.
   * @throws RangeError when `maxSize` is below 1 (the request could never be
   *   satisfied and would otherwise wait forever).
   * @throws Whatever `telegramClient.connect()` rejects with; the slot is
   *   returned to the pool before the error is rethrown.
   */
  async getConnection(): Promise<any> {
    if (!(this.maxSize >= 1)) {
      throw new RangeError(
        `ConnectionPool maxSize must be at least 1, got ${this.maxSize}`
      )
    }

    for (;;) {
      const idle = this.connections.shift()
      if (idle !== undefined) {
        this.leased++
        return idle
      }

      if (this.leased < this.maxSize) {
        // Reserve the slot before awaiting so concurrent callers cannot
        // overshoot maxSize while this connection is being established.
        this.leased++
        try {
          return await telegramClient.connect()
        } catch (error) {
          // Give the slot back and let a blocked caller retry with it.
          this.leased--
          this.wakeNextWaiter()
          throw error
        }
      }

      // Pool exhausted: sleep until a release (or a failed connect) frees a
      // slot, then re-check from the top because another caller may win it.
      await new Promise<void>((resolve) => {
        this.waiters.push(resolve)
      })
    }
  }

  /**
   * Returns a leased connection to the pool and wakes one waiting caller.
   * Releasing a connection that is already idle is ignored.
   *
   * @param connection The value previously obtained from `getConnection()`.
   */
  releaseConnection(connection: Promise<any>) {
    if (this.connections.includes(connection)) {
      return
    }
    this.leased = Math.max(0, this.leased - 1)
    this.connections.push(connection)
    this.wakeNextWaiter()
  }

  /** Resumes the longest-waiting `getConnection()` call, if there is one. */
  private wakeNextWaiter(): void {
    const wake = this.waiters.shift()
    if (wake !== undefined) {
      wake()
    }
  }
}
|
|
|
/**
 * Minimal async-iterable contract: an object whose `Symbol.asyncIterator`
 * member produces an async generator of `Uint8Array` chunks.
 */
type FileIterator = {
  [Symbol.asyncIterator](): AsyncGenerator<Uint8Array, void, unknown>
}
|
/** Module-wide connection pool, constructed with a maximum size of 5. */
const connectionPool: ConnectionPool = new ConnectionPool(5)

/** In-memory store of file bytes keyed by the id string passed to `download`. */
const cache: Map<string, Uint8Array> = new Map()
|
/** Copies `parts` back-to-back into one newly allocated array of `totalLength` bytes. */
function joinChunks(parts: Uint8Array[], totalLength: number): Uint8Array {
  const joined = new Uint8Array(totalLength)
  let position = 0
  for (const part of parts) {
    joined.set(part, position)
    position += part.byteLength
  }
  return joined
}

/**
 * Downloads the `i`-th of `numParallel` contiguous byte ranges of `media` and
 * yields its bytes in order.
 *
 * The file is split into `numParallel` ranges of `ceil(size / numParallel)`
 * bytes (the last range is shorter when the size does not divide evenly), so
 * running the generators for `i = 0 .. numParallel - 1` and concatenating
 * their output in index order reproduces the file exactly once.
 *
 * @param clients Clients exposing `downloadMedia(media, { offset, limit })`;
 *   range `i` is fetched with `clients[i % clients.length]`.
 * @param media Media descriptor; `media.size` is read as the total byte
 *   length (assumed to be convertible with `Number()` — TODO confirm against
 *   the client library's media type).
 * @param i Zero-based index of the range this generator is responsible for.
 * @param numParallel Total number of ranges the file is split into.
 * @param bufferSize Downloaded chunks are coalesced until at least this many
 *   bytes are pending, then yielded as a single array.
 * @throws Error when `clients` is empty or the download returns nothing.
 * @throws RangeError when `numParallel` is not a positive integer.
 * @throws TypeError when `media.size` is not a non-negative finite number.
 * @throws Whatever `downloadMedia` rejects with. A failed range aborts the
 *   generator instead of being skipped, because a missing range would
 *   silently corrupt the file.
 */
async function* generateChunks(
  clients: any[],
  media: any,
  i: number,
  numParallel: number,
  bufferSize: number = 1024 * 1024
): AsyncGenerator<Uint8Array, void, unknown> {
  if (clients.length === 0) {
    throw new Error('generateChunks requires at least one client')
  }
  if (!Number.isInteger(numParallel) || numParallel < 1) {
    throw new RangeError(`numParallel must be a positive integer, got ${numParallel}`)
  }

  const totalSize = Number(media.size)
  if (!Number.isFinite(totalSize) || totalSize < 0) {
    throw new TypeError('media.size must be a non-negative finite number')
  }

  // Whole-byte ranges: rounding up keeps every offset an integer and lets the
  // final range absorb the remainder.
  const segmentSize = Math.ceil(totalSize / numParallel)
  const offset = i * segmentSize
  const limit = Math.min(segmentSize, totalSize - offset)
  if (limit <= 0) {
    // Nothing left for this index (empty file, or more ranges than bytes).
    return
  }

  const client = clients[i % clients.length]
  const downloaded: Uint8Array | Uint8Array[] | null | undefined =
    await client.downloadMedia(media, { offset, limit })
  if (downloaded == null) {
    throw new Error(`downloadMedia returned no data for range ${offset}+${limit}`)
  }

  // Accept a single byte array as well as a list of chunks, so a client that
  // resolves to one buffer is not iterated byte by byte.
  const chunks = downloaded instanceof Uint8Array ? [downloaded] : downloaded

  let pending: Uint8Array[] = []
  let pendingBytes = 0

  for (const chunk of chunks) {
    pending.push(chunk)
    pendingBytes += chunk.byteLength

    if (pendingBytes >= bufferSize) {
      yield joinChunks(pending, pendingBytes)
      pending = []
      pendingBytes = 0
    }
  }

  if (pendingBytes > 0) {
    yield joinChunks(pending, pendingBytes)
  }
}
|
/**
 * Opens a byte stream for the media attached to message `id`.
 *
 * A cache hit is served straight from memory. Otherwise the function leases
 * connections from `connectionPool`, looks the message up with
 * `Api.messages.GetMessages`, starts one `generateChunks` segment per
 * connection and returns the segments combined by `mergeStreams`.
 *
 * Leased connections are handed back only after every segment has finished
 * (successfully or not), or immediately when setup fails before any stream is
 * returned.
 *
 * @param id Message id; converted with `Number(id)` for the API request and
 *   used as the cache key.
 * @param numParallel Desired number of segments/connections. Values below 1
 *   (or non-finite) are treated as 1, and the value is capped at
 *   `connectionPool.maxSize` so a single call never asks the pool for more
 *   connections than it can hold.
 * @returns A readable stream of the file's bytes.
 * @throws Error when the message cannot be found or carries no media, and
 *   whatever the pool or the API request rejects with.
 */
export async function download(
  id: string,
  numParallel: number = 1
): Promise<ReadableStream<Uint8Array>> {
  const cachedData = cache.get(id)
  if (cachedData) {
    return new ReadableStream<Uint8Array>({
      start(controller) {
        controller.enqueue(cachedData)
        controller.close()
      }
    })
  }

  const poolLimit = connectionPool.maxSize >= 1 ? connectionPool.maxSize : 1
  const requested = Number.isFinite(numParallel) ? Math.floor(numParallel) : 1
  const parallelism = Math.min(Math.max(requested, 1), poolLimit)

  const clients: any[] = []
  let released = false
  const releaseClients = (): void => {
    if (released) {
      return
    }
    released = true
    for (const client of clients) {
      connectionPool.releaseConnection(client)
    }
  }

  // Once the stream has been handed to the caller, the connections are
  // released by the segment-completion hook instead of the finally block.
  let handedOff = false

  try {
    // Acquire inside the try so a failure part-way through still releases
    // the connections obtained so far.
    for (let i = 0; i < parallelism; i++) {
      clients.push(await connectionPool.getConnection())
    }

    // NOTE(review): the result is unwrapped from a `data` property — confirm
    // that the client's invoke() really resolves to `{ data }`.
    const { data: response } = await clients[0].invoke(
      new Api.messages.GetMessages({
        id: [new Api.InputMessageID({ id: Number(id) })]
      })
    )

    const media = response?.messages?.[0]?.media
    if (!media) {
      throw new Error(`Message ${id} was not found or has no media`)
    }

    const streams: ReadableStream<Uint8Array>[] = []
    const finished: Promise<void>[] = []

    for (let i = 0; i < parallelism; i++) {
      const fileIterator: FileIterator = {
        [Symbol.asyncIterator]: generateChunks.bind(
          null,
          clients,
          media,
          i,
          parallelism
        )
      }

      let markFinished: () => void = () => {}
      finished.push(
        new Promise<void>((resolve) => {
          markFinished = resolve
        })
      )

      streams.push(
        new ReadableStream<Uint8Array>({
          async start(controller) {
            try {
              for await (const chunk of fileIterator) {
                controller.enqueue(chunk)
              }
              controller.close()
            } finally {
              // Runs on success and on failure (a rejection still errors the
              // stream) so the connections are always returned.
              markFinished()
            }
          }
        })
      )
    }

    const merged = mergeStreams(...streams)
    void Promise.all(finished).then(releaseClients)
    handedOff = true
    return merged
  } finally {
    if (!handedOff) {
      releaseClients()
    }
  }
}
|
/**
 * Downloads the media of message `id` and saves it under `name` through
 * `streamSaver`.
 *
 * Chunks are written one at a time and each write is awaited, so the returned
 * promise settles only after the file has been fully written and closed (or
 * the attempt has failed). Files no larger than an internal size limit are
 * also stored in `cache` under `id` once the download completes.
 *
 * Failures are logged with `console.error` and the output file is aborted;
 * the returned promise still resolves, as it did before.
 *
 * @param id Message id, forwarded to `download` and used as the cache key.
 * @param name File name passed to `streamSaver.createWriteStream`.
 * @param numParallel Forwarded to `download`.
 */
export const directDownload = async (
  id: string,
  name: string,
  numParallel: number = 1
): Promise<void> => {
  // Upper bound on what is kept in memory for the cache; larger files are
  // streamed to disk only.
  const maxCacheableBytes = 32 * 1024 * 1024

  const fileStream = streamSaver.createWriteStream(name)
  const writer = fileStream.getWriter()

  try {
    // download() already returns a single merged stream.
    const stream = await download(id, numParallel)
    const reader = stream.getReader()

    let retained: Uint8Array[] | null = []
    let totalBytes = 0

    for (;;) {
      const { done, value } = await reader.read()
      if (done) {
        break
      }
      if (value === undefined) {
        continue
      }

      totalBytes += value.byteLength
      if (retained !== null) {
        if (totalBytes <= maxCacheableBytes) {
          // Copy before writing in case the sink takes ownership of the buffer.
          retained.push(value.slice())
        } else {
          retained = null
        }
      }

      // Awaiting the write applies backpressure and surfaces write errors here.
      await writer.write(value)
    }

    await writer.close()

    if (retained !== null && totalBytes > 0) {
      const whole = new Uint8Array(totalBytes)
      let position = 0
      for (const part of retained) {
        whole.set(part, position)
        position += part.byteLength
      }
      cache.set(id, whole)
    }
  } catch (error) {
    console.error(error)
    // Do not leave a half-written file open; ignore a secondary abort failure.
    await writer.abort(error).catch(() => undefined)
  }
}
|
|
|
/**
 * Combines several byte streams into one by concatenating them in argument
 * order: every chunk of the first stream, then every chunk of the second, and
 * so on. No chunk is dropped or reordered.
 *
 * All inputs are locked immediately. Inputs that produce data eagerly keep
 * doing so while earlier inputs are still being drained; their chunks wait in
 * their own queues until it is their turn.
 *
 * @param streams Streams to combine. A single stream is returned unchanged;
 *   with no streams the result is an empty stream that closes on first read.
 * @returns A stream yielding the inputs' chunks back-to-back. It errors when
 *   an input errors, and cancelling it cancels the inputs not yet drained.
 */
function mergeStreams(...streams: ReadableStream<Uint8Array>[]): ReadableStream<Uint8Array> {
  if (streams.length === 1) {
    return streams[0]
  }

  const readers = streams.map((stream) => stream.getReader())
  // Index of the input currently being drained.
  let current = 0

  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      try {
        while (current < readers.length) {
          const { done, value } = await readers[current].read()
          if (done) {
            current++
            continue
          }
          if (value !== undefined) {
            // Emit one chunk per pull so downstream backpressure is honored.
            controller.enqueue(value)
            return
          }
        }
        controller.close()
      } catch (error) {
        // One input failed: stop the rest, then surface the error downstream.
        await Promise.allSettled(
          readers.slice(current).map((reader) => reader.cancel(error))
        )
        throw error
      }
    },

    async cancel(reason) {
      await Promise.allSettled(
        readers.slice(current).map((reader) => reader.cancel(reason))
      )
    }
  })
}