File size: 3,325 Bytes
4112422
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python3
"""
Test script for RunPod serverless endpoint
"""
import base64
import requests
import json
import sys

def test_runpod_api(pdf_path, runpod_endpoint, runpod_api_key, *,
                    poll_interval=2, request_timeout=60, max_wait=None):
    """Submit a PDF to a RunPod endpoint and save the returned markdown.

    The PDF is read from disk, base64-encoded and POSTed to
    ``<runpod_endpoint>/run``. The job is then polled at
    ``<runpod_endpoint>/status/<job_id>`` until it reaches a terminal state.
    On success the markdown is written next to the input file as
    ``<name>_runpod.md``. Progress and errors are reported with ``print``.

    Args:
        pdf_path: Path of the PDF file to convert.
        runpod_endpoint: Base URL of the endpoint (a trailing slash is
            tolerated).
        runpod_api_key: API key sent as a ``Bearer`` token.
        poll_interval: Seconds to sleep between status checks.
        request_timeout: Timeout in seconds passed to every HTTP request so a
            stalled connection cannot hang the script.
        max_wait: Optional upper bound in seconds on the total polling time;
            ``None`` (the default) polls until the job finishes.

    Returns:
        None. Results are printed and, on success, written to disk.

    Raises:
        OSError: If the PDF cannot be read or the markdown cannot be written.
        requests.RequestException: On network failures or request timeouts.
    """
    # Imported locally so the function does not rely on module-level imports
    # beyond those the file already has (base64, requests).
    import os
    import time

    # Read PDF and encode to base64
    with open(pdf_path, 'rb') as f:
        pdf_data = f.read()

    pdf_base64 = base64.b64encode(pdf_data).decode('utf-8')

    # Prepare request
    headers = {
        'Authorization': f'Bearer {runpod_api_key}',
        'Content-Type': 'application/json'
    }

    payload = {
        'input': {
            'pdf_base64': pdf_base64,
            # basename handles the platform's path separators, unlike
            # splitting on '/'.
            'filename': os.path.basename(pdf_path)
        }
    }

    # Avoid "//run" when the caller passes an endpoint with a trailing slash.
    base_url = runpod_endpoint.rstrip('/')

    print(f"Sending PDF to RunPod: {pdf_path}")
    print(f"File size: {len(pdf_data)} bytes")

    # Send request
    response = requests.post(
        f"{base_url}/run",
        headers=headers,
        json=payload,
        timeout=request_timeout
    )

    if response.status_code != 200:
        print(f"Request failed: {response.status_code}")
        print(response.text)
        return

    job_id = response.json().get('id')
    if not job_id:
        # Without an id there is nothing to poll; show the body for debugging.
        print("Request failed: no job id in response")
        print(response.text)
        return
    print(f"Job submitted: {job_id}")

    # Poll for result
    status_url = f"{base_url}/status/{job_id}"
    deadline = None if max_wait is None else time.monotonic() + max_wait

    while True:
        status_response = requests.get(
            status_url, headers=headers, timeout=request_timeout
        )
        if status_response.status_code != 200:
            print(f"Status check failed: {status_response.status_code}")
            return

        status_data = status_response.json()
        status = status_data.get('status')

        if status == 'COMPLETED':
            output = status_data.get('output')

            if isinstance(output, dict) and output.get('status') == 'success':
                markdown = output.get('markdown') or ''
                print("\nConversion successful!")
                print(f"Markdown length: {len(markdown)} characters")
                print(f"Pages: {output.get('pages', 'unknown')}")

                # Save result. Only swap a real ".pdf" extension (any case);
                # otherwise append, so the input file is never overwritten.
                root, ext = os.path.splitext(pdf_path)
                stem = root if ext.lower() == '.pdf' else pdf_path
                output_file = stem + '_runpod.md'
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write(markdown)
                print(f"Saved to: {output_file}")
            else:
                error = output.get('error') if isinstance(output, dict) else output
                print(f"Conversion failed: {error}")
            return

        if status == 'FAILED':
            print(f"Job failed: {status_data.get('error')}")
            return

        if status in ('CANCELLED', 'TIMED_OUT'):
            # These states are terminal too; polling further would never end.
            print(f"Job ended with status {status}: {status_data.get('error')}")
            return

        print(f"Status: {status}")
        if deadline is not None and time.monotonic() >= deadline:
            print(f"Gave up waiting for job {job_id} after {max_wait} seconds")
            return
        time.sleep(poll_interval)

if __name__ == "__main__":
    # Command-line entry point: <pdf_file> and <runpod_endpoint> are required;
    # the API key is optional and is prompted for when omitted.
    if len(sys.argv) < 3:
        print("Usage: python test_runpod.py <pdf_file> <runpod_endpoint> [api_key]")
        print("Example: python test_runpod.py test.pdf https://api.runpod.ai/v2/your-endpoint your-api-key")
        sys.exit(1)

    pdf_file = sys.argv[1]
    endpoint = sys.argv[2]

    if len(sys.argv) > 3:
        api_key = sys.argv[3]
    elif sys.stdin.isatty():
        # Interactive terminal: read the secret without echoing it.
        import getpass
        api_key = getpass.getpass("RunPod API Key: ")
    else:
        # Piped/redirected stdin: keep reading the key from stdin as before.
        api_key = input("RunPod API Key: ")

    test_runpod_api(pdf_file, endpoint, api_key)