Nymbo commited on
Commit
a83aca4
·
verified ·
1 Parent(s): fb48c87

Update Modules/Obsidian_Vault.py

Browse files
Files changed (1) hide show
  1. Modules/Obsidian_Vault.py +15 -310
Modules/Obsidian_Vault.py CHANGED
@@ -1,16 +1,13 @@
1
  from __future__ import annotations
2
 
3
- import json
4
  import os
5
- import re
6
- import stat
7
- from datetime import datetime
8
  from typing import Annotated, Optional
9
 
10
  import gradio as gr
11
 
12
  from app import _log_call_end, _log_call_start, _truncate_for_log
13
  from ._docstrings import autodoc
 
14
 
15
 
16
  TOOL_SUMMARY = (
@@ -41,314 +38,22 @@ HELP_TEXT = (
41
  )
42
 
43
 
44
- def _default_root() -> str:
45
- env_root = os.getenv("OBSIDIAN_VAULT_ROOT")
46
- if env_root and env_root.strip():
47
- return os.path.abspath(os.path.expanduser(env_root.strip()))
48
- try:
49
- here = os.path.abspath(__file__)
50
- tools_dir = os.path.dirname(os.path.dirname(here))
51
- return os.path.abspath(os.path.join(tools_dir, "Obsidian"))
52
- except Exception:
53
- return os.path.abspath(os.getcwd())
54
-
55
-
56
- ROOT_DIR = _default_root()
57
- try:
58
- os.makedirs(ROOT_DIR, exist_ok=True)
59
- except Exception:
60
- pass
61
- ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0")))
62
-
63
-
64
- def _safe_err(exc: Exception | str) -> str:
65
- """Return an error string with any absolute root replaced by '/' and slashes normalized."""
66
- s = str(exc)
67
- s_norm = s.replace("\\", "/")
68
- root_fwd = ROOT_DIR.replace("\\", "/")
69
- root_variants = {ROOT_DIR, root_fwd, re.sub(r"/+", "/", root_fwd)}
70
- for variant in root_variants:
71
- if variant:
72
- s_norm = s_norm.replace(variant, "/")
73
- s_norm = re.sub(r"/+", "/", s_norm)
74
- return s_norm
75
-
76
-
77
- def _err(code: str, message: str, *, path: str | None = None, hint: str | None = None, data: dict | None = None) -> str:
78
- payload = {
79
- "status": "error",
80
- "code": code,
81
- "message": message,
82
- "root": "/",
83
- }
84
- if path:
85
- payload["path"] = path
86
- if hint:
87
- payload["hint"] = hint
88
- if data:
89
- payload["data"] = data
90
- return json.dumps(payload, ensure_ascii=False)
91
-
92
-
93
- def _display_path(abs_path: str) -> str:
94
- try:
95
- norm_root = os.path.normpath(ROOT_DIR)
96
- norm_abs = os.path.normpath(abs_path)
97
- common = os.path.commonpath([norm_root, norm_abs])
98
- if os.path.normcase(common) == os.path.normcase(norm_root):
99
- rel = os.path.relpath(norm_abs, norm_root)
100
- if rel == ".":
101
- return "/"
102
- return "/" + rel.replace("\\", "/")
103
- except Exception:
104
- pass
105
- return abs_path.replace("\\", "/")
106
 
107
 
 
108
  def _resolve_path(path: str) -> tuple[str, str]:
109
- try:
110
- user_input = (path or "/").strip() or "/"
111
- if user_input.startswith("/"):
112
- rel_part = user_input.lstrip("/") or "."
113
- raw = os.path.expanduser(rel_part)
114
- treat_as_relative = True
115
- else:
116
- raw = os.path.expanduser(user_input)
117
- treat_as_relative = False
118
-
119
- if not treat_as_relative and os.path.isabs(raw):
120
- if not ALLOW_ABS:
121
- return "", _err(
122
- "absolute_path_disabled",
123
- "Absolute paths are disabled in safe mode.",
124
- path=raw.replace("\\", "/"),
125
- hint="Use a path relative to / (e.g., /Notes/index.md).",
126
- )
127
- abs_path = os.path.abspath(raw)
128
- else:
129
- abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw))
130
- if not ALLOW_ABS:
131
- try:
132
- common = os.path.commonpath([os.path.normpath(ROOT_DIR), os.path.normpath(abs_path)])
133
- except Exception:
134
- root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR))
135
- abs_cmp = os.path.normcase(os.path.normpath(abs_path))
136
- if not abs_cmp.startswith(root_cmp):
137
- return "", _err(
138
- "path_outside_root",
139
- "Path not allowed outside root.",
140
- path=user_input.replace("\\", "/"),
141
- hint="Use a path under / (the vault root).",
142
- )
143
- else:
144
- root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR))
145
- common_cmp = os.path.normcase(os.path.normpath(common))
146
- if common_cmp != root_cmp:
147
- return "", _err(
148
- "path_outside_root",
149
- "Path not allowed outside root.",
150
- path=user_input.replace("\\", "/"),
151
- hint="Use a path under / (the vault root).",
152
- )
153
- return abs_path, ""
154
- except Exception as exc:
155
- return "", _err(
156
- "resolve_path_failed",
157
- "Failed to resolve path.",
158
- path=(path or ""),
159
- data={"error": _safe_err(exc)},
160
- )
161
-
162
-
163
- def _fmt_size(num_bytes: int) -> str:
164
- units = ["B", "KB", "MB", "GB", "TB"]
165
- size = float(num_bytes)
166
- for unit in units:
167
- if size < 1024.0:
168
- return f"{size:.1f} {unit}"
169
- size /= 1024.0
170
- return f"{size:.1f} PB"
171
-
172
-
173
- def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str:
174
- from ._tree_utils import format_dir_listing
175
-
176
- return format_dir_listing(
177
- abs_path,
178
- _display_path(abs_path),
179
- show_hidden=show_hidden,
180
- recursive=recursive,
181
- max_entries=max_entries,
182
- fmt_size_fn=_fmt_size,
183
- )
184
-
185
-
186
- def _search_text(
187
- abs_path: str,
188
- query: str,
189
- *,
190
- recursive: bool,
191
- show_hidden: bool,
192
- max_results: int,
193
- case_sensitive: bool,
194
- start_index: int,
195
- ) -> str:
196
- if not os.path.exists(abs_path):
197
- return _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))
198
-
199
- query = query or ""
200
- normalized_query = query if case_sensitive else query.lower()
201
- if normalized_query == "":
202
- return _err(
203
- "missing_search_query",
204
- "Search query is required for the search action.",
205
- hint="Provide text in the Search field to look for.",
206
- )
207
-
208
- max_results = max(1, int(max_results) if max_results is not None else 20)
209
- start_index = max(0, int(start_index) if start_index is not None else 0)
210
- matches: list[tuple[str, int, str]] = []
211
- errors: list[str] = []
212
- files_scanned = 0
213
- truncated = False
214
- total_matches = 0
215
-
216
- def _should_skip(name: str) -> bool:
217
- return not show_hidden and name.startswith('.')
218
-
219
- def _handle_match(file_path: str, line_no: int, line_text: str) -> bool:
220
- nonlocal truncated, total_matches
221
- total_matches += 1
222
- if total_matches <= start_index:
223
- return False
224
- if len(matches) < max_results:
225
- snippet = line_text.strip()
226
- if len(snippet) > 200:
227
- snippet = snippet[:197] + "…"
228
- matches.append((_display_path(file_path), line_no, snippet))
229
- return False
230
- truncated = True
231
- return True
232
-
233
- def _search_file(file_path: str) -> bool:
234
- nonlocal files_scanned
235
- files_scanned += 1
236
- try:
237
- with open(file_path, 'r', encoding='utf-8', errors='replace') as handle:
238
- for line_no, line in enumerate(handle, start=1):
239
- haystack = line if case_sensitive else line.lower()
240
- if normalized_query in haystack:
241
- if _handle_match(file_path, line_no, line):
242
- return True
243
- except Exception as exc:
244
- errors.append(f"{_display_path(file_path)} ({_safe_err(exc)})")
245
- return truncated
246
-
247
- if os.path.isfile(abs_path):
248
- _search_file(abs_path)
249
- else:
250
- for root, dirs, files in os.walk(abs_path):
251
- dirs[:] = [d for d in dirs if not _should_skip(d)]
252
- visible_files = [f for f in files if show_hidden or not f.startswith('.')]
253
- for name in visible_files:
254
- file_path = os.path.join(root, name)
255
- if _search_file(file_path):
256
- break
257
- if truncated:
258
- break
259
- if not recursive:
260
- break
261
-
262
- header_lines = [
263
- f"Search results for {query!r}",
264
- f"Scope: {_display_path(abs_path)}",
265
- f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}",
266
- f"Start offset: {start_index}",
267
- f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""),
268
- f"Files scanned: {files_scanned}",
269
- ]
270
-
271
- next_cursor = start_index + len(matches) if truncated else None
272
 
273
- if truncated:
274
- header_lines.append(f"Matches encountered before truncation: {total_matches}")
275
- header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.")
276
- header_lines.append(f"Next cursor: {next_cursor}")
277
- else:
278
- header_lines.append(f"Total matches found: {total_matches}")
279
- header_lines.append("Truncated: no — end of results.")
280
- header_lines.append("Next cursor: None")
281
 
282
- if not matches:
283
- if total_matches > 0 and start_index >= total_matches:
284
- hint_limit = max(total_matches - 1, 0)
285
- body_lines = [
286
- f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.",
287
- (f"Try a smaller offset (≤ {hint_limit})." if hint_limit >= 0 else ""),
288
- ]
289
- body_lines = [line for line in body_lines if line]
290
- else:
291
- body_lines = [
292
- "No matches found.",
293
- (f"Total matches encountered: {total_matches}." if total_matches else ""),
294
- ]
295
- body_lines = [line for line in body_lines if line]
296
- else:
297
- body_lines = [f"{idx}. {path}:{line_no}: {text}" for idx, (path, line_no, text) in enumerate(matches, start=1)]
298
-
299
- if errors:
300
- shown = errors[:5]
301
- body_lines.extend(["", "Warnings:"])
302
- body_lines.extend(shown)
303
- if len(errors) > len(shown):
304
- body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.")
305
-
306
- return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines)
307
-
308
-
309
- def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str:
310
- if not os.path.exists(abs_path):
311
- return _err("file_not_found", f"File not found: {_display_path(abs_path)}", path=_display_path(abs_path))
312
- if os.path.isdir(abs_path):
313
- return _err(
314
- "is_directory",
315
- f"Path is a directory, not a file: {_display_path(abs_path)}",
316
- path=_display_path(abs_path),
317
- hint="Provide a file path.",
318
- )
319
- try:
320
- with open(abs_path, 'r', encoding='utf-8', errors='replace') as f:
321
- data = f.read()
322
- except Exception as exc:
323
- return _err("read_failed", "Failed to read file.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
324
- total = len(data)
325
- start = max(0, min(offset, total))
326
- end = total if max_chars <= 0 else min(total, start + max_chars)
327
- chunk = data[start:end]
328
- next_cursor = end if end < total else None
329
- header = (
330
- f"Reading {_display_path(abs_path)}\n"
331
- f"Offset {start}, returned {len(chunk)} of {total}."
332
- + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "")
333
- )
334
- return header + "\n\n---\n\n" + chunk
335
 
336
 
337
- def _info(abs_path: str) -> str:
338
- try:
339
- st = os.stat(abs_path)
340
- except Exception as exc:
341
- return _err("stat_failed", "Failed to stat path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
342
- info = {
343
- "path": _display_path(abs_path),
344
- "type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
345
- "size": st.st_size,
346
- "modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=' ', timespec='seconds'),
347
- "created": datetime.fromtimestamp(st.st_ctime).isoformat(sep=' ', timespec='seconds'),
348
- "mode": oct(st.st_mode),
349
- "root": "/",
350
- }
351
- return json.dumps(info, indent=2)
352
 
353
 
354
  @autodoc(summary=TOOL_SUMMARY)
@@ -400,9 +105,9 @@ def Obsidian_Vault(
400
  if not os.path.exists(abs_path):
401
  result = _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))
402
  else:
403
- result = _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries)
404
  elif action == "read":
405
- result = _read_file(abs_path, offset=offset, max_chars=max_chars)
406
  elif action == "search":
407
  query_text = query or ""
408
  if query_text.strip() == "":
@@ -412,7 +117,7 @@ def Obsidian_Vault(
412
  hint="Provide text in the Search field to look for.",
413
  )
414
  else:
415
- result = _search_text(
416
  abs_path,
417
  query_text,
418
  recursive=recursive,
@@ -422,9 +127,9 @@ def Obsidian_Vault(
422
  start_index=offset,
423
  )
424
  else: # info
425
- result = _info(abs_path)
426
  except Exception as exc:
427
- result = _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)})
428
 
429
  _log_call_end("Obsidian_Vault", _truncate_for_log(result))
430
  return result
 
1
  from __future__ import annotations
2
 
 
3
  import os
 
 
 
4
  from typing import Annotated, Optional
5
 
6
  import gradio as gr
7
 
8
  from app import _log_call_end, _log_call_start, _truncate_for_log
9
  from ._docstrings import autodoc
10
+ from ._core import obsidian_sandbox, OBSIDIAN_ROOT
11
 
12
 
13
  TOOL_SUMMARY = (
 
38
  )
39
 
40
 
41
+ # Use the pre-configured Obsidian sandbox
42
+ _sandbox = obsidian_sandbox
43
+ ROOT_DIR = OBSIDIAN_ROOT
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
 
45
 
46
+ # Convenience wrappers
47
  def _resolve_path(path: str) -> tuple[str, str]:
48
+ return _sandbox.resolve_path(path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
 
 
 
 
 
 
 
 
 
50
 
51
+ def _display_path(abs_path: str) -> str:
52
+ return _sandbox.display_path(abs_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
 
54
 
55
+ def _err(code: str, message: str, *, path: str | None = None, hint: str | None = None, data: dict | None = None) -> str:
56
+ return _sandbox.err(code, message, path=path, hint=hint, data=data)
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
 
59
  @autodoc(summary=TOOL_SUMMARY)
 
105
  if not os.path.exists(abs_path):
106
  result = _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))
107
  else:
108
+ result = _sandbox.list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries)
109
  elif action == "read":
110
+ result = _sandbox.read_file(abs_path, offset=offset, max_chars=max_chars)
111
  elif action == "search":
112
  query_text = query or ""
113
  if query_text.strip() == "":
 
117
  hint="Provide text in the Search field to look for.",
118
  )
119
  else:
120
+ result = _sandbox.search_text(
121
  abs_path,
122
  query_text,
123
  recursive=recursive,
 
127
  start_index=offset,
128
  )
129
  else: # info
130
+ result = _sandbox.info(abs_path)
131
  except Exception as exc:
132
+ result = _err("exception", "Unhandled error during operation.", data={"error": _sandbox.safe_err(exc)})
133
 
134
  _log_call_end("Obsidian_Vault", _truncate_for_log(result))
135
  return result