import os import requests from googleapiclient.discovery import build from google.oauth2 import service_account from googleapiclient.http import MediaFileUpload import io ################################ ######### Variables ############ ################################ # -- Get environment variables CLIENT_ID = os.getenv('CLIENT_ID') CLIENT_EMAIL = os.getenv('CLIENT_EMAIL') PRIVATE_KEY_ID = os.getenv('PRIVATE_KEY_ID') PRIVATE_KEY = os.getenv('PRIVATE_KEY').replace('\\n', '\n') PROJECT_ID = os.getenv("PROJECT_ID") CLIENT_X509_CERT_URL = os.getenv("CLIENT_X509_CERT_URL") # -- Define your OAuth2 credentials directly JSON_DATA = { "type": "service_account", "project_id": PROJECT_ID, "private_key_id": PRIVATE_KEY_ID, "private_key": PRIVATE_KEY, "client_email": CLIENT_EMAIL, "client_id": CLIENT_ID, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": CLIENT_X509_CERT_URL, "universe_domain": "googleapis.com" } ################################ ####### GenericFunctions ####### ################################ # -- Authentication def get_drive_service(): """ Authenticate and return the Google Drive API service. """ # Build and return the Drive service credentials = service_account.Credentials.from_service_account_info( JSON_DATA, scopes=['https://www.googleapis.com/auth/drive'] ) service = build('drive', 'v3', credentials=credentials) return service # -- List all files def list_all_files(): """ List all file IDs and names in Google Drive. """ # Build the Drive service drive_service = get_drive_service() try: results = drive_service.files().list(fields="nextPageToken, files(id, name)").execute() files = results.get('files', []) if not files: print("No files found in Google Drive.") else: for file in files: print(f"File ID: {file['id']}, Name: {file['name']}") except Exception as e: print(f"Error listing files: {e}") raise e # -- Get File ID from File Name def get_files_id_by_name(file_names): """ List file IDs for specific file names in Google Drive. """ # Build the Drive service drive_service = get_drive_service() # Set the query parameters fields = 'files(id, name)' pageSize = 1000 # Set an appropriate page size to retrieve all files # List the files matching the query results = drive_service.files().list( fields=fields, pageSize=pageSize ).execute() files = results.get('files', []) return files[0] # -- Create a new file def create_file(file_path): """ Create a new file on Google Drive. """ # Build the Drive service print(file_path) drive_service = get_drive_service() try: file_name = os.path.basename(file_path) media = MediaFileUpload(file_path, mimetype='application/octet-stream') file_metadata = {'name': file_name} file = drive_service.files().create( body=file_metadata, media_body=media, fields='id' ).execute() print(f"Uploaded '{file_name}' with ID: {file['id']}") return file except Exception as e: print(f"Upload error: {e}") raise e # -- Update existing file def update_file(file_path): """ Update an existing file on Google Drive. """ # Build the Drive service drive_service = get_drive_service() try: # get file id file_name = os.path.basename(file_path) file_id = get_files_id_by_name(file_name) file_metadata = { 'name': file_name } # Update the file media_body = MediaFileUpload(file_path, mimetype='application/octet-stream') file = drive_service.files().update( fileId=file_id['id'], body=file_metadata, media_body=media_body ).execute() print(f"Uploaded '{file_name}' with ID: {file['id']}") return file except Exception as e: print(f"Upload error: {e}") raise e # -- Donwload file to local def download_file(file_name, save_path): """ Download Google Drive to local """ # Build the Drive service drive_service = get_drive_service() try: file_id = get_files_id_by_name(file_name) request = drive_service.files().get_media(fileId=file_id['id']) fh = io.FileIO(save_path+file_name, 'wb') # Download the file in chunks and write to the local file downloader = request.execute().decode("utf-8") if 'size' in downloader: file_size = int(downloader['size']) chunk_size = 1024 * 1024 # 1MB chunks (adjust as needed) while downloader: if 'data' in downloader: fh.write(downloader['data'].encode('utf-8')) status, downloader = service.files().get_media(fileId=file_id, downloadStatus=status).execute() print(f"Downloaded {fh.tell()}/{file_size} bytes.") else: fh.write(downloader.encode('utf-8')) print(f"Downloaded file '{file_id}' to '{save_path}'") except Exception as e: print(f"Download error: {e}") raise e # -- Delete a file by its ID def delete_file(file_id): """ Delete a file in Google Drive by its ID. """ # Build the Drive service drive_service = get_drive_service() # Deleting specific file try: drive_service.files().delete(fileId=file_id).execute() print(f"Deleted file with ID: {file_id}") except Exception as e: print(f"Error deleting file with ID {file_id}: {e}") raise e # -- List and delete all files def delete_all_files(): """ List and delete all files in Google Drive. """ # Build the Drive service drive_service = get_drive_service() # Set the query parameters to list all files fields = 'files(id, name)' pageSize = 1000 # Set an appropriate page size to retrieve all files try: # List all files results = drive_service.files().list( fields=fields, pageSize=pageSize ).execute() files = results.get('files', []) # Delete each file in the list for file in files: delete_file(file['id']) except Exception as e: print(f"Error deleting files: {e}") raise e