Spaces:
Sleeping
Sleeping
import os | |
import requests | |
from googleapiclient.discovery import build | |
from google.oauth2 import service_account | |
from googleapiclient.http import MediaFileUpload | |
import io | |
################################ | |
######### Variables ############ | |
################################ | |
# -- Get environment variables | |
CLIENT_ID = os.getenv('CLIENT_ID') | |
CLIENT_EMAIL = os.getenv('CLIENT_EMAIL') | |
PRIVATE_KEY_ID = os.getenv('PRIVATE_KEY_ID') | |
PRIVATE_KEY = os.getenv('PRIVATE_KEY').replace('\\n', '\n') | |
PROJECT_ID = os.getenv("PROJECT_ID") | |
CLIENT_X509_CERT_URL = os.getenv("CLIENT_X509_CERT_URL") | |
# -- Define your OAuth2 credentials directly | |
JSON_DATA = { | |
"type": "service_account", | |
"project_id": PROJECT_ID, | |
"private_key_id": PRIVATE_KEY_ID, | |
"private_key": PRIVATE_KEY, | |
"client_email": CLIENT_EMAIL, | |
"client_id": CLIENT_ID, | |
"auth_uri": "https://accounts.google.com/o/oauth2/auth", | |
"token_uri": "https://oauth2.googleapis.com/token", | |
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", | |
"client_x509_cert_url": CLIENT_X509_CERT_URL, | |
"universe_domain": "googleapis.com" | |
} | |
################################ | |
####### GenericFunctions ####### | |
################################ | |
# -- Authentication | |
def get_drive_service(): | |
""" | |
Authenticate and return the Google Drive API service. | |
""" | |
# Build and return the Drive service | |
credentials = service_account.Credentials.from_service_account_info( | |
JSON_DATA, | |
scopes=['https://www.googleapis.com/auth/drive'] | |
) | |
service = build('drive', 'v3', credentials=credentials) | |
return service | |
# -- List all files | |
def list_all_files(): | |
""" | |
List all file IDs and names in Google Drive. | |
""" | |
# Build the Drive service | |
drive_service = get_drive_service() | |
try: | |
results = drive_service.files().list(fields="nextPageToken, files(id, name)").execute() | |
files = results.get('files', []) | |
if not files: | |
print("No files found in Google Drive.") | |
else: | |
for file in files: | |
print(f"File ID: {file['id']}, Name: {file['name']}") | |
except Exception as e: | |
print(f"Error listing files: {e}") | |
raise e | |
# -- Get File ID from File Name | |
def get_files_id_by_name(file_names): | |
""" | |
List file IDs for specific file names in Google Drive. | |
""" | |
# Build the Drive service | |
drive_service = get_drive_service() | |
# Set the query parameters | |
fields = 'files(id, name)' | |
pageSize = 1000 # Set an appropriate page size to retrieve all files | |
# List the files matching the query | |
results = drive_service.files().list( | |
fields=fields, | |
pageSize=pageSize | |
).execute() | |
files = results.get('files', []) | |
return files[0] | |
# -- Create a new file | |
def create_file(file_path): | |
""" | |
Create a new file on Google Drive. | |
""" | |
# Build the Drive service | |
print(file_path) | |
drive_service = get_drive_service() | |
try: | |
file_name = os.path.basename(file_path) | |
media = MediaFileUpload(file_path, mimetype='application/octet-stream') | |
file_metadata = {'name': file_name} | |
file = drive_service.files().create( | |
body=file_metadata, | |
media_body=media, | |
fields='id' | |
).execute() | |
print(f"Uploaded '{file_name}' with ID: {file['id']}") | |
return file | |
except Exception as e: | |
print(f"Upload error: {e}") | |
raise e | |
# -- Update existing file | |
def update_file(file_path): | |
""" | |
Update an existing file on Google Drive. | |
""" | |
# Build the Drive service | |
drive_service = get_drive_service() | |
try: | |
# get file id | |
file_name = os.path.basename(file_path) | |
file_id = get_files_id_by_name(file_name) | |
file_metadata = { | |
'name': file_name | |
} | |
# Update the file | |
media_body = MediaFileUpload(file_path, mimetype='application/octet-stream') | |
file = drive_service.files().update( | |
fileId=file_id['id'], | |
body=file_metadata, | |
media_body=media_body | |
).execute() | |
print(f"Uploaded '{file_name}' with ID: {file['id']}") | |
return file | |
except Exception as e: | |
print(f"Upload error: {e}") | |
raise e | |
# -- Donwload file to local | |
def download_file(file_name, save_path): | |
""" | |
Download Google Drive to local | |
""" | |
# Build the Drive service | |
drive_service = get_drive_service() | |
try: | |
file_id = get_files_id_by_name(file_name) | |
request = drive_service.files().get_media(fileId=file_id['id']) | |
fh = io.FileIO(save_path+file_name, 'wb') | |
# Download the file in chunks and write to the local file | |
downloader = request.execute().decode("utf-8") | |
if 'size' in downloader: | |
file_size = int(downloader['size']) | |
chunk_size = 1024 * 1024 # 1MB chunks (adjust as needed) | |
while downloader: | |
if 'data' in downloader: | |
fh.write(downloader['data'].encode('utf-8')) | |
status, downloader = service.files().get_media(fileId=file_id, downloadStatus=status).execute() | |
print(f"Downloaded {fh.tell()}/{file_size} bytes.") | |
else: | |
fh.write(downloader.encode('utf-8')) | |
print(f"Downloaded file '{file_id}' to '{save_path}'") | |
except Exception as e: | |
print(f"Download error: {e}") | |
raise e | |
# -- Delete a file by its ID | |
def delete_file(file_id): | |
""" | |
Delete a file in Google Drive by its ID. | |
""" | |
# Build the Drive service | |
drive_service = get_drive_service() | |
# Deleting specific file | |
try: | |
drive_service.files().delete(fileId=file_id).execute() | |
print(f"Deleted file with ID: {file_id}") | |
except Exception as e: | |
print(f"Error deleting file with ID {file_id}: {e}") | |
raise e | |
# -- List and delete all files | |
def delete_all_files(): | |
""" | |
List and delete all files in Google Drive. | |
""" | |
# Build the Drive service | |
drive_service = get_drive_service() | |
# Set the query parameters to list all files | |
fields = 'files(id, name)' | |
pageSize = 1000 # Set an appropriate page size to retrieve all files | |
try: | |
# List all files | |
results = drive_service.files().list( | |
fields=fields, | |
pageSize=pageSize | |
).execute() | |
files = results.get('files', []) | |
# Delete each file in the list | |
for file in files: | |
delete_file(file['id']) | |
except Exception as e: | |
print(f"Error deleting files: {e}") | |
raise e |