from typing import Optional, Union

from ibm_cloud_sdk_core import ApiException
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibmcloudant.cloudant_v1 import CloudantV1, Document


def cloudant_retrieve_documents(
    client: CloudantV1,
    db_name: str,
    selectors: dict,
    fields: Union[list, dict],
    sort: Optional[Union[list, dict]] = None,
    limit: int = 100,
):
    """Retrieve documents from the cloudant database.

    Runs a ``post_find`` query against ``db_name`` and returns the decoded
    result of the response.

    Args:
        client (CloudantV1): Initialized Cloudant client.
        db_name (str): Database to query.
        selectors (dict): Selector passed to ``post_find`` unchanged.
        fields (list | dict): Either a list of field names, or a dict whose
            ``"fields"`` key holds that list.
        sort (list | dict | None): Either a list of sort clauses (e.g.
            ``[{"_id": "desc"}]``), a dict whose ``"sort"`` key holds that
            list, or a single clause dict (e.g. ``{"_id": "desc"}``).
            ``None`` or an empty value sends no sort with the query.
        limit (int): Maximum number of documents requested.

    Returns:
        The value of ``.get_result()`` on the ``post_find`` response.

    Raises:
        ValueError: If ``fields`` is neither a list nor a dict.
        KeyError: If ``fields`` is a dict without a ``"fields"`` key.
    """
    # Validate and unwrap ``fields`` in one place, before any request is made.
    if isinstance(fields, list):
        field_names = fields
    elif isinstance(fields, dict):
        field_names = fields["fields"]
    else:
        raise ValueError("fields must be a list of strings or a dict")

    # Normalise ``sort`` to the list-of-clauses shape, mirroring how the
    # ``fields`` wrapper dict is unwrapped above.
    # NOTE(review): no default ordering is imposed when ``sort`` is omitted;
    # forcing a sort on ``_id`` presumably needs a usable index for arbitrary
    # selectors -- confirm before introducing one.
    if not sort:
        sort_clauses = None
    elif isinstance(sort, dict):
        sort_clauses = sort["sort"] if "sort" in sort else [sort]
    else:
        sort_clauses = sort

    response = client.post_find(
        db=db_name,
        selector=selectors,
        fields=field_names,
        sort=sort_clauses,
        limit=limit,
    )
    return response.get_result()


def ensure_database_exists(client: CloudantV1, db_name: str) -> bool:
    """
    Check if database exists and create it if it doesn't.

    Only an HTTP 404 from the existence probe is treated as "database
    missing"; any other failure is re-raised without attempting to create
    the database.

    Args:
        client (CloudantV1): Initialized Cloudant client
        db_name (str): Database name to check/create

    Returns:
        bool: True if database exists or was created successfully

    Raises:
        ApiException: If the existence probe fails with a status other than
            404.
        Exception: Whatever ``put_database`` raises when creation fails (the
            failure is printed first, then re-raised).
    """
    try:
        # Probe for the database; the SDK raises ApiException on HTTP errors.
        client.get_database_information(db=db_name)
    except ApiException as err:
        # Older SDK releases expose the HTTP status as ``code`` only.
        status = getattr(err, "status_code", None) or getattr(err, "code", None)
        if status != 404:
            # Not a "missing database" answer: surface the real problem
            # instead of masking it behind a create attempt.
            raise
    else:
        return True

    try:
        # Database doesn't exist, try to create it
        client.put_database(db=db_name)
    except Exception as e:
        print(f"Failed to create database {db_name}: {str(e)}")
        raise
    return True