Files
db_example_app/main.py
T
2026-04-08 16:04:23 +03:00

301 lines
9.5 KiB
Python

import os
import sys
import psycopg2
from dotenv import load_dotenv
from neo4j import GraphDatabase
from psycopg2 import sql
from psycopg2.errors import DuplicateDatabase
from PyQt5.QtCore import QSize, Qt
from PyQt5.QtWidgets import QApplication, QGridLayout, QHeaderView, QMainWindow, QTableWidget, QTableWidgetItem, QWidget
load_dotenv()
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_DB = os.getenv("POSTGRES_DB")
POSTGRES_PASS = os.getenv("POSTGRES_PASS")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
NEO_ACTIVE = os.getenv("NEO_ACTIVE", "false").lower() == "true"
NEO_USER = os.getenv("NEO_USER")
NEO_PASS = os.getenv("NEO_PASS")
NEO_HOST = os.getenv("NEO_HOST")
AGE_ACTIVE = os.getenv("AGE_ACTIVE", "false").lower() == "true"
AGE_USER = os.getenv("AGE_USER", POSTGRES_USER)
AGE_PASS = os.getenv("AGE_PASS", POSTGRES_PASS)
AGE_HOST = os.getenv("AGE_HOST", POSTGRES_HOST)
AGE_PORT = os.getenv("AGE_PORT", POSTGRES_PORT)
AGE_DB = os.getenv("AGE_DB", POSTGRES_DB)
AGE_GRAPH_NAME = os.getenv("AGE_GRAPH_NAME", "movie_graph")
USER_SEED_DATA = [
(1, "Ivan", 15),
(2, "Igor", 22),
(3, "Alex", 16),
(4, "Anna", 40),
(5, "Inna", 30),
]
def connect_postgres(dbname):
conn = psycopg2.connect(
dbname=dbname,
user=POSTGRES_USER,
password=POSTGRES_PASS,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
conn.autocommit = True
return conn
def create_database(admin_conn, database_name):
try:
with admin_conn.cursor() as cur:
cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(database_name)))
print(f"Database '{database_name}' created.")
except DuplicateDatabase:
print(f"Database '{database_name}' already exists.")
def create_users_table(conn):
try:
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INT PRIMARY KEY,
name VARCHAR(255),
age INT
)
"""
)
print("Users table is ready.")
except (psycopg2.DatabaseError, Exception) as error:
print(error)
def select_users(conn):
try:
with conn.cursor() as cur:
cur.execute("SELECT id, name, age FROM users ORDER BY id")
return cur.fetchall()
except (psycopg2.DatabaseError, Exception) as error:
print(error)
return []
def insert_users_data(conn, values):
try:
existing_users = select_users(conn)
if existing_users:
print("Users are already seeded.")
return
with conn.cursor() as cur:
args = ",".join(cur.mogrify("(%s,%s,%s)", value).decode("utf-8") for value in values)
cur.execute("INSERT INTO users (id, name, age) VALUES " + args)
print("Users inserted.")
except (psycopg2.DatabaseError, Exception) as error:
print(error)
def fetch_neo_movies():
uri = NEO_HOST
auth = (NEO_USER, NEO_PASS)
with GraphDatabase.driver(uri, auth=auth) as driver:
records, _, _ = driver.execute_query(
"""
MATCH (movie:Movie)
WHERE movie.released >= 1990 AND movie.released < 2000
RETURN movie.title AS title
ORDER BY title
"""
)
return [record["title"] for record in records]
def create_age_movies(cur):
cur.execute("CREATE EXTENSION IF NOT EXISTS age;")
cur.execute("LOAD 'age';")
cur.execute('SET search_path = ag_catalog, "$user", public;')
cur.execute("SELECT 1 FROM ag_catalog.ag_graph WHERE name = %s", (AGE_GRAPH_NAME,))
if cur.fetchone() is None:
cur.execute("SELECT * FROM ag_catalog.create_graph(%s)", (AGE_GRAPH_NAME,))
cur.execute(
sql.SQL("SELECT * FROM cypher({}, $$ {} $$) AS (seeded agtype)").format(
sql.Literal(AGE_GRAPH_NAME),
sql.SQL('''
MERGE (matrix:Movie {title: 'The Matrix', released: 1999})
MERGE (apollo:Movie {title: 'Apollo 13', released: 1995})
MERGE (toy_story:Movie {title: 'Toy Story', released: 1995})
RETURN matrix.title AS seeded
'''),
)
)
def select_age_movies(cur):
cur.execute(
sql.SQL("SELECT * FROM cypher({}, $$ {} $$) AS (title agtype)").format(
sql.Literal(AGE_GRAPH_NAME),
sql.SQL('''
MATCH (movie:Movie)
WHERE movie.released >= 1990 AND movie.released < 2000
RETURN movie.title AS title
ORDER BY title
'''),
)
)
return [str(row[0]).strip('"') for row in cur.fetchall()]
def load_users():
admin_conn = connect_postgres("postgres")
try:
create_database(admin_conn, POSTGRES_DB)
finally:
admin_conn.close()
app_conn = connect_postgres(POSTGRES_DB)
try:
create_users_table(app_conn)
insert_users_data(app_conn, USER_SEED_DATA)
return select_users(app_conn)
finally:
app_conn.close()
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setMinimumSize(QSize(640, 480))
self.setWindowTitle("postgres")
central_widget = QWidget(self)
self.setCentralWidget(central_widget)
grid_layout = QGridLayout(self)
central_widget.setLayout(grid_layout)
self.table = QTableWidget(self)
self.table.setColumnCount(3)
self.table.setRowCount(0)
self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
self.table.setHorizontalHeaderLabels(["ID", "Name", "Age"])
self.table.horizontalHeaderItem(0).setToolTip("ID")
self.table.horizontalHeaderItem(1).setToolTip("Name")
self.table.horizontalHeaderItem(2).setToolTip("Age")
self.table.horizontalHeaderItem(0).setTextAlignment(Qt.AlignHCenter)
self.table.horizontalHeaderItem(1).setTextAlignment(Qt.AlignHCenter)
self.table.horizontalHeaderItem(2).setTextAlignment(Qt.AlignHCenter)
grid_layout.addWidget(self.table, 0, 0)
def load_data(self, users_data):
self.table.setRowCount(len(users_data))
for row_number, user in enumerate(users_data):
self.table.setItem(row_number, 0, QTableWidgetItem(str(user[0])))
self.table.setItem(row_number, 1, QTableWidgetItem(str(user[1])))
self.table.setItem(row_number, 2, QTableWidgetItem(str(user[2])))
self.table.resizeColumnsToContents()
class GraphWindow(QWidget):
def __init__(self, title, parent_window):
super().__init__()
self.setWindowTitle(title)
grid_layout = QGridLayout(self)
self.setLayout(grid_layout)
self.table = QTableWidget(self)
self.table.setColumnCount(1)
self.table.setRowCount(0)
self.table.setHorizontalHeaderLabels(["Info"])
self.table.horizontalHeaderItem(0).setToolTip("Info")
self.table.horizontalHeaderItem(0).setTextAlignment(Qt.AlignHCenter)
grid_layout.addWidget(self.table, 0, 0)
self.position_relative_to_parent(parent_window)
def position_relative_to_parent(self, parent_window):
parent_frame = parent_window.frameGeometry()
self.move(parent_frame.x() + parent_frame.width() + 5, parent_frame.y())
def load_data(self, rows):
self.table.setRowCount(len(rows))
for row_number, item in enumerate(rows):
self.table.setItem(row_number, 0, QTableWidgetItem(str(item)))
self.table.resizeColumnsToContents()
self.table.resizeRowsToContents()
margins = self.layout().contentsMargins()
width = (
self.table.verticalHeader().width()
+ self.table.columnWidth(0)
+ self.table.frameWidth() * 2
+ margins.left()
+ margins.right()
+ 24
)
height = (
self.table.horizontalHeader().height()
+ sum(self.table.rowHeight(row) for row in range(self.table.rowCount()))
+ self.table.frameWidth() * 2
+ margins.top()
+ margins.bottom()
+ 24
)
self.resize(min(max(width, 220), 700), min(max(height, 120), 500))
if __name__ == "__main__":
users = load_users()
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.load_data(users)
main_window.show()
graph_windows = []
if NEO_ACTIVE:
try:
neo_window = GraphWindow("neo4j", main_window)
neo_window.load_data(fetch_neo_movies())
neo_window.show()
graph_windows.append(neo_window)
except Exception as error:
print(f"Neo4j load failed: {error}")
if AGE_ACTIVE:
try:
with psycopg2.connect(
dbname=AGE_DB,
user=AGE_USER,
password=AGE_PASS,
host=AGE_HOST,
port=AGE_PORT,
) as age_conn:
age_conn.autocommit = True
with age_conn.cursor() as cur:
create_age_movies(cur)
age_movies = select_age_movies(cur)
age_window = GraphWindow("apache age", main_window)
age_window.load_data(age_movies)
age_window.show()
graph_windows.append(age_window)
except Exception as error:
print(f"Apache AGE load failed: {error}")
sys.exit(app.exec())