Spaces:
Runtime error
Runtime error
| import os | |
| from io import StringIO | |
| from tempfile import mkdtemp, mkstemp | |
| import pandas as pd | |
| import tree_sitter | |
| from IPython.display import HTML | |
| from metakernel import MetaKernel | |
| import subprocess | |
| from .codeql import QueryClient | |
| __version__ = "0.0.1" | |
| class CodeQLKernel(MetaKernel): | |
| implementation = "CodeQL Kernel" | |
| implementation_version = "1.0" | |
| language = "ql" | |
| language_version = "0.1" | |
| banner = "CodeQL Kernel - Experimental" | |
| language_info = { | |
| "mimetype": "text/x-codeql", | |
| "name": "codeql", | |
| "file_extension": ".ql", | |
| "help_links": MetaKernel.help_links, | |
| } | |
| def __init__(self, **kwargs): | |
| # get absolute path of running script | |
| here = os.path.dirname(os.path.abspath(__file__)) | |
| self.QL_LANGUAGE = tree_sitter.Language( | |
| os.path.join(here, "tree-sitter-ql.so"), "ql" | |
| ) | |
| self._select_query = self.QL_LANGUAGE.query( | |
| "(moduleMember (select)) @select_statement" | |
| ) | |
| self._predicate_query = self.QL_LANGUAGE.query( | |
| """(moduleMember | |
| (annotation name: (annotName) @aname (#eq? @aname "query")). | |
| (classlessPredicate name: (predicateName) @pname) | |
| ) @annotated_query """ | |
| ) | |
| self._parser = tree_sitter.Parser() | |
| self._parser.set_language(self.QL_LANGUAGE) | |
| self._context = "" | |
| def on_progress(obj): | |
| self.Display(obj["message"], clear_output=True) | |
| def on_result(obj): | |
| self.Display( | |
| f"Query completed in {obj['evaluationTime']}!", clear_output=True | |
| ) | |
| self._query_client: QueryClient = QueryClient( | |
| on_progress=on_progress, on_result=on_result | |
| ) | |
| MetaKernel.__init__(self, **kwargs) | |
| print(kwargs) | |
| def get_usage(self): | |
| return "This is the CodeQL kernel." | |
| def parse_cell(self, cell): | |
| """ | |
| parse the cell code using tree-sitter | |
| """ | |
| tree = self._parser.parse(bytes(cell, "utf8")) | |
| select_statements = [] | |
| query_predicates = [] | |
| captures = self._select_query.captures(tree.root_node) | |
| for capture in captures: | |
| # capture[0] is the node, capture[1] is the capture name | |
| if capture[1] == "select_statement": | |
| start_point = capture[0].start_point | |
| end_point = capture[0].end_point | |
| select_statements.append((start_point, end_point)) | |
| captures = self._predicate_query.captures(tree.root_node) | |
| for capture in captures: | |
| # capture[0] is the node | |
| # capture[1] is the capture name | |
| if capture[1] == "annotated_query": | |
| start_point = capture[0].start_point | |
| end_point = capture[0].end_point | |
| # extract the annotation name | |
| # check if its a query predicate | |
| for i, line in enumerate(cell.split("\n")): | |
| if i == start_point[0]: | |
| if (line[start_point[1]: start_point[1] + len("query")] == "query"): | |
| query_predicates.append((start_point, end_point)) | |
| return (select_statements, query_predicates) | |
| def evaluate(self, code, quick_eval=None): | |
| """ | |
| Evaluate the given code and return the result. | |
| """ | |
| try: | |
| if not self._query_client._db_metadata: | |
| self.Error_display("No database registered! Use %set_database to register a database.") | |
| return | |
| # create a temporary directory to hold the query pack and the query | |
| qlpack = "\n".join( | |
| [ | |
| "---", | |
| "library: false", | |
| "name: jupyter-kernel/temporary-qlpack", | |
| "version: 0.0.1", | |
| "dependencies:", | |
| " codeql/{}-all: '*'", | |
| "", | |
| ] | |
| ).format(self._query_client._db_metadata["languages"][0]) | |
| tmp_dir = mkdtemp(dir="/tmp", prefix="codeql_kernel") | |
| with open(os.path.join(tmp_dir, "qlpack.yml"), "w") as f: | |
| f.write(qlpack) | |
| subprocess.run("codeql pack install", cwd=tmp_dir, shell=True) | |
| fd, query_path = mkstemp(suffix=".ql", dir=tmp_dir, text=True) | |
| os.write(fd, bytearray(code, "utf-8")) | |
| os.close(fd) | |
| self.Display("Running query ...", clear_output=True) | |
| (err, resp) = self._query_client.run_query( | |
| query_path, quick_eval=quick_eval | |
| ) | |
| if err: | |
| self.clear_output(wait=True) | |
| self.Error_display( | |
| "Error running query: {}".format(err) | |
| ) | |
| else: | |
| csv = StringIO(resp) | |
| chunks = (chunk for chunk in pd.read_csv(csv, chunksize=5000)) | |
| df = pd.concat(chunks) | |
| self.Display(HTML(df.to_html()), clear_output=True) | |
| except Exception as e: | |
| self.Error_display("Error running query: {}".format(e)) | |
| def do_execute_direct(self, code): | |
| """ | |
| Execute the given code directly. | |
| """ | |
| (select_statements, query_predicates) = self.parse_cell(code) | |
| if len(query_predicates) == 1 and len(select_statements) == 0: | |
| # we have exactly one query predicate: | |
| # add cell to the context and evaluate the query predicate | |
| offset = len(self._context.split("\n")) | |
| self._context += code + "\n" | |
| predicate = query_predicates[0][0] | |
| pred_line = predicate[0] | |
| pred_col = predicate[1] | |
| cell_lines = code.split("\n") | |
| words = cell_lines[pred_line].strip().split(" ") | |
| position = { | |
| "startLine": offset + pred_line, | |
| "endLine": offset + pred_line, | |
| "startColumn": pred_col + len(words[0]) + len(words[1]) + 3, | |
| "endColumn": pred_col + len(words[0]) + len(words[1]) + 3, | |
| } | |
| self.Display("Evaluating predicate '" + words[2].split("(")[0] + "'", clear_output=True) | |
| self.evaluate(self._context, quick_eval=position) | |
| elif len(select_statements) == 1: | |
| # we have exactly one select statement: | |
| # add cell to the context and evaluate the whole context | |
| self._context += code + "\n" | |
| self.Display("Evaluating select statement ...", clear_output=True) | |
| self.evaluate(self._context) | |
| else: | |
| self._context += code + "\n" | |
| def repr(self, data): | |
| return repr(data) | |
| def do_shutdown(self, restart): | |
| if self._query_client: | |
| self._query_client.stop() | |
| if restart: | |
| self.Print("Restarting kernel...") | |
| self.reload_magics() | |
| self.restart_kernel() | |
| self.Print("Done!") | |
| super(CodeQLKernel, self).do_shutdown(restart) | |
| if __name__ == "__main__": | |
| CodeQLKernel.run_as_main() | |