File size: 5,714 Bytes
709c473 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
import random
import sqlite3
from contextlib import closing
from typing import Generator, Literal

from models import Element, Pair, PendingPair
def connect() -> sqlite3.Connection:
    """Open a new connection to the on-disk cache database.

    Returns:
        A fresh ``sqlite3.Connection`` to ``db/cache.sqlite``. The path is
        relative, so it is resolved against the current working directory.
    """
    database_path = "db/cache.sqlite"
    return sqlite3.connect(database_path)
# Create the cache schema at import time. Both statements use IF NOT EXISTS,
# so re-importing against an existing database is a no-op.
#
# `closing(...)` releases the connection when the block ends; using the
# connection itself as a context manager only commits (or rolls back on
# error) and leaves the connection open.
with closing(connect()) as conn, conn:
    # One row per known element; `name` is the natural key used for upserts.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS element (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            first_created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            name TEXT UNIQUE,
            emoji TEXT
        )
        """,
    )
    # One row per tried combination; (first, second) identifies the pair.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS pair (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            first_element_id INTEGER,
            second_element_id INTEGER,
            result_element_id INTEGER,
            is_discovery INTEGER,
            FOREIGN KEY (first_element_id) REFERENCES element (id),
            FOREIGN KEY (second_element_id) REFERENCES element (id),
            FOREIGN KEY (result_element_id) REFERENCES element (id),
            UNIQUE(first_element_id, second_element_id)
        )
        """,
    )
def _upsert_element(conn: sqlite3.Connection, element: Element) -> None:
    """Insert ``element`` (or refresh its emoji) and record its row id.

    Args:
        conn: Open connection to run the statements on. Nothing is committed
            here.
        element: Element to store. If a row with the same name already
            exists, only its emoji is updated. ``element.database_id`` is
            overwritten with the ``id`` of the stored row.
    """
    upsert_sql = """
        INSERT INTO element (name, emoji)
        VALUES (?, ?)
        ON CONFLICT(name) DO UPDATE SET
            emoji = excluded.emoji
        """
    conn.execute(upsert_sql, (element.name, element.emoji))

    # Look the id up by name so the same query covers both the freshly
    # inserted row and the pre-existing (updated) one.
    id_row = conn.execute(
        "SELECT id FROM element WHERE name = ?",
        (element.name,),
    ).fetchone()
    (element.database_id,) = id_row
def _upsert_pair(conn: sqlite3.Connection, pair: Pair) -> None:
    """Store ``pair`` and any of its elements that are not stored yet.

    Args:
        conn: Open connection to run the statements on. Nothing is committed
            here.
        pair: Pair to record. ``pair.elements`` must yield exactly three
            elements; their ids are bound, in order, to the first, second and
            result columns. If a row for the same (first, second) ids already
            exists, its result is replaced and its discovery flag is kept at
            the larger of the stored and the new value.
    """
    # Elements that already carry a database id are not written again.
    for member in pair.elements:
        if member.database_id is None:
            _upsert_element(conn, member)

    element_ids = [member.database_id for member in pair.elements]
    discovery_flag = int(bool(pair.is_discovery))
    conn.execute(
        """
        INSERT INTO pair (first_element_id, second_element_id, result_element_id, is_discovery)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(first_element_id, second_element_id) DO UPDATE SET
            result_element_id = excluded.result_element_id,
            is_discovery = MAX(is_discovery, excluded.is_discovery)
        """,
        (*element_ids, discovery_flag),
    )
def record_pair(pair: Pair) -> None:
    """Persist ``pair`` (and any unsaved elements it holds) in one transaction.

    Args:
        pair: Pair to record; passed unchanged to ``_upsert_pair``.
    """
    # `closing` releases the connection afterwards; the inner `conn` context
    # commits on success and rolls back on error but does not close.
    with closing(connect()) as conn, conn:
        _upsert_pair(conn, pair)
# ORDER BY clauses accepted by `_select_pending_pairs`. The chosen value is
# interpolated verbatim into the SQL text, so only these literals may be used.
PendingPairOrder = Literal[
    "first.id ASC, second.id ASC",
    "first.id ASC, second.id DESC",
    "first.id DESC, second.id ASC",
    "first.id DESC, second.id DESC",
]

# Orderings offered to callers; the first entry is the default order of
# `_select_pending_pairs`. The DESC/DESC ordering is not listed here.
PENDING_PAIR_ORDERS: list[PendingPairOrder] = [
    "first.id DESC, second.id ASC",
    "first.id ASC, second.id ASC",
    "first.id ASC, second.id DESC",
]
def _select_pending_pairs(
    conn: sqlite3.Connection,
    order: PendingPairOrder = PENDING_PAIR_ORDERS[0],
) -> Generator[PendingPair, None, None]:
    """Yield element combinations that have no row in ``pair`` yet.

    A combination is a (first, second) couple of stored elements with
    ``first.name <= second.name``, so an element is also combined with
    itself and each unordered couple appears once.

    Args:
        conn: Open connection to query.
        order: ORDER BY clause for the result. It is formatted directly into
            the SQL text, so it must be one of the ``PendingPairOrder``
            literals and never untrusted input.

    Yields:
        A ``PendingPair`` built from the two elements of each missing
        combination.
    """
    query = f"""
        SELECT
            first.id,
            first.name,
            first.emoji,
            second.id,
            second.name,
            second.emoji
        FROM element AS first
        LEFT JOIN element AS second ON first.name <= second.name
        LEFT JOIN pair ON pair.first_element_id = first.id AND pair.second_element_id = second.id
        WHERE pair.id IS NULL
        ORDER BY {order}
        """
    for row in conn.execute(query):
        # Columns arrive as (id, name, emoji) per element, whereas Element
        # takes (name, emoji, id).
        first_id, first_name, first_emoji = row[:3]
        second_id, second_name, second_emoji = row[3:]
        yield PendingPair(
            Element(first_name, first_emoji, first_id),
            Element(second_name, second_emoji, second_id),
        )
def select_pending_pairs(order: PendingPairOrder) -> Generator[PendingPair, None, None]:
    """Yield every not-yet-recorded element combination from the cache.

    Args:
        order: ORDER BY clause forwarded to ``_select_pending_pairs``.

    Yields:
        The ``PendingPair`` objects produced by ``_select_pending_pairs``.
    """
    # The connection stays open while the caller iterates and is closed once
    # the generator finishes or is discarded; the inner `conn` context only
    # commits or rolls back.
    with closing(connect()) as conn, conn:
        yield from _select_pending_pairs(conn, order)
def _element_count(conn: sqlite3.Connection) -> int:
    """Return the number of rows in the ``element`` table."""
    row = conn.execute("SELECT COUNT(*) FROM element").fetchone()
    return row[0]
def _pair_count(conn: sqlite3.Connection) -> int:
    """Return the number of rows in the ``pair`` table."""
    row = conn.execute("SELECT COUNT(*) FROM pair").fetchone()
    return row[0]
def counts() -> tuple[int, int]:
    """Return the cache size as ``(element_count, pair_count)``."""
    # `closing` releases the connection afterwards; the inner `conn` context
    # only commits or rolls back.
    with closing(connect()) as conn, conn:
        return _element_count(conn), _pair_count(conn)
def _select_elements_and_discovered(
    conn: sqlite3.Connection,
) -> Generator[tuple[Element, bool], None, None]:
    """Yield every stored element with a flag telling whether it was discovered.

    Args:
        conn: Open connection to query.

    Yields:
        ``(element, is_discovery)`` tuples ordered by element id.
        ``is_discovery`` is ``True`` when at least one ``pair`` row flagged as
        a discovery has this element as its result.
    """
    query = """
        SELECT
            e.name,
            e.emoji,
            e.id,
            MAX(p.result_element_id IS NOT NULL) AS is_discovery
        FROM element e
        LEFT JOIN pair p
            ON p.result_element_id = e.id
            AND p.is_discovery = TRUE
        GROUP BY e.name, e.emoji, e.id
        ORDER BY e.id ASC
        """
    for name, emoji, element_id, is_discovery in conn.execute(query):
        # SQLite returns the aggregate as the integer 0 or 1; convert it so
        # the yielded value matches the annotated ``bool``.
        yield Element(name, emoji, element_id), bool(is_discovery)
def select_elements_and_discovered() -> Generator[tuple[Element, bool], None, None]:
    """Yield every cached element together with its discovery flag.

    Yields:
        The ``(element, is_discovery)`` tuples produced by
        ``_select_elements_and_discovered``.
    """
    # Delegate with `yield from` so the query runs while the connection scope
    # is still active; returning the inner generator would leave the `with`
    # block before the first row is fetched. `closing` then releases the
    # connection once iteration finishes or the generator is discarded.
    with closing(connect()) as conn, conn:
        yield from _select_elements_and_discovered(conn)
# Seed the four primary elements at import time. `_upsert_element` is an
# upsert, so running this against an existing database only refreshes emojis.
#
# `closing(...)` releases the connection when the block ends; using the
# connection itself as a context manager only commits (or rolls back on
# error) and leaves the connection open.
with closing(connect()) as conn, conn:
    primary_elements = [
        Element("Fire", "\N{FIRE}"),
        Element("Earth", "\N{EARTH GLOBE EUROPE-AFRICA}"),
        Element("Water", "\N{DROPLET}"),
        Element("Wind", "\N{WIND BLOWING FACE}\N{VARIATION SELECTOR-16}"),
    ]
    # The search order is "mostly deterministic" on the macroscopic scale,
    # so randomize the insertion order of the primary elements: everyone who
    # runs this code then gets one of 4! (factorial) possible
    # "macroscopic routes".
    random.shuffle(primary_elements)
    for e in primary_elements:
        _upsert_element(conn, e)
|