# File: db_example_app/main.py
# Timestamp: 2026-04-20 15:43:41 +03:00
# Size: 381 lines, 13 KiB
# Language: Python

import os
import sys
import psycopg2
from dotenv import load_dotenv
from influxdb_client_3 import InfluxDBClient3
from neo4j import GraphDatabase
from psycopg2 import sql
from psycopg2.errors import DuplicateDatabase
from PyQt5.QtCore import QSize, Qt
from PyQt5.QtWidgets import QApplication, QGridLayout, QHeaderView, QMainWindow, QTableWidget, QTableWidgetItem, QWidget
# Pull variables from a local .env file into the process environment
# before any of the settings below are read.
load_dotenv()


def _env_flag(name):
    """Return True only if env var *name* equals "true" (case-insensitive).

    An unset variable is treated as "false".
    """
    return os.getenv(name, "false").lower() == "true"


# --- PostgreSQL -----------------------------------------------------------
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_DB = os.getenv("POSTGRES_DB")
POSTGRES_PASS = os.getenv("POSTGRES_PASS")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
POSTGRES_ACTIVE = _env_flag("POSTGRES_ACTIVE")

# --- Neo4j ----------------------------------------------------------------
NEO_ACTIVE = _env_flag("NEO_ACTIVE")
NEO_USER = os.getenv("NEO_USER")
NEO_PASS = os.getenv("NEO_PASS")
NEO_HOST = os.getenv("NEO_HOST")

# --- Apache AGE (each setting falls back to its PostgreSQL counterpart) ---
AGE_ACTIVE = _env_flag("AGE_ACTIVE")
AGE_USER = os.getenv("AGE_USER", POSTGRES_USER)
AGE_PASS = os.getenv("AGE_PASS", POSTGRES_PASS)
AGE_HOST = os.getenv("AGE_HOST", POSTGRES_HOST)
AGE_PORT = os.getenv("AGE_PORT", POSTGRES_PORT)
AGE_DB = os.getenv("AGE_DB", POSTGRES_DB)
AGE_GRAPH_NAME = os.getenv("AGE_GRAPH_NAME", "movie_graph")

# --- InfluxDB 3 -----------------------------------------------------------
INFLUX_ACTIVE = _env_flag("INFLUX_ACTIVE")
INFLUXDB3_HOST = os.getenv("INFLUXDB3_HOST", "http://localhost:8181")
INFLUXDB3_AUTH_TOKEN = os.getenv("INFLUXDB3_AUTH_TOKEN")
INFLUXDB3_DATABASE = os.getenv("INFLUXDB3_DATABASE", "sensors")

# Seed rows for the ``users`` table, as (id, name, age).
USER_SEED_DATA = [
    (1, "Ivan", 15),
    (2, "Igor", 22),
    (3, "Alex", 16),
    (4, "Anna", 40),
    (5, "Inna", 30),
]
def connect_postgres(dbname):
    """Open a psycopg2 connection to *dbname* with autocommit enabled.

    User, password, host and port are taken from the module-level
    ``POSTGRES_*`` settings.
    """
    connection_params = {
        "dbname": dbname,
        "user": POSTGRES_USER,
        "password": POSTGRES_PASS,
        "host": POSTGRES_HOST,
        "port": POSTGRES_PORT,
    }
    connection = psycopg2.connect(**connection_params)
    # Callers never commit explicitly, so let every statement commit itself.
    connection.autocommit = True
    return connection
def create_database(admin_conn, database_name):
    """Create *database_name* through *admin_conn*.

    A ``DuplicateDatabase`` error is reported on stdout instead of being
    raised; any other error propagates to the caller.
    """
    try:
        with admin_conn.cursor() as cursor:
            # The name is composed as an identifier, not as a query parameter.
            identifier = sql.Identifier(database_name)
            cursor.execute(sql.SQL("CREATE DATABASE {}").format(identifier))
    except DuplicateDatabase:
        print(f"Database '{database_name}' already exists.")
    else:
        print(f"Database '{database_name}' created.")
def create_users_table(conn):
    """Create the ``users`` table (id, name, age) if it does not exist yet.

    Any exception is printed to stdout and swallowed.
    """
    ddl = """
CREATE TABLE IF NOT EXISTS users (
id INT PRIMARY KEY,
name VARCHAR(255),
age INT
)
"""
    try:
        with conn.cursor() as cursor:
            cursor.execute(ddl)
        print("Users table is ready.")
    except Exception as error:  # psycopg2.DatabaseError is a subclass of Exception
        print(error)
def select_users(conn):
    """Fetch all rows of ``users`` (id, name, age), ordered by id.

    Returns whatever ``fetchall()`` yields. On any exception the error is
    printed and an empty list is returned instead.
    """
    query = "SELECT id, name, age FROM users ORDER BY id"
    try:
        with conn.cursor() as cursor:
            cursor.execute(query)
            fetched = cursor.fetchall()
    except Exception as error:  # psycopg2.DatabaseError is a subclass of Exception
        print(error)
        return []
    return fetched
def insert_users_data(conn, values):
    """Seed the ``users`` table with *values* unless it already has rows.

    Args:
        conn: Open psycopg2 connection to the application database.
        values: Iterable of ``(id, name, age)`` rows to insert.

    Returns:
        None. Progress and errors are reported on stdout; exceptions are
        printed rather than raised, so seeding stays best-effort.
    """
    try:
        rows = list(values)
        if not rows:
            # "INSERT ... VALUES" followed by nothing is a SQL syntax error,
            # so return before touching the database.
            print("No users to insert.")
            return
        if select_users(conn):
            print("Users are already seeded.")
            return
        with conn.cursor() as cur:
            # mogrify() returns bytes in the connection's client encoding.
            # Joining and sending bytes avoids assuming that encoding is
            # UTF-8, and one multi-row INSERT keeps the seed all-or-nothing.
            rendered_rows = b",".join(cur.mogrify("(%s,%s,%s)", row) for row in rows)
            cur.execute(b"INSERT INTO users (id, name, age) VALUES " + rendered_rows)
        print("Users inserted.")
    except Exception as error:  # psycopg2.DatabaseError is a subclass of Exception
        print(error)
def fetch_neo_movies():
    """Return titles of Neo4j ``Movie`` nodes with 1990 <= released < 2000.

    Titles come back ordered by title. The driver is opened with the
    module-level ``NEO_*`` settings and closed when the function exits.
    """
    movie_query = """
MATCH (movie:Movie)
WHERE movie.released >= 1990 AND movie.released < 2000
RETURN movie.title AS title
ORDER BY title
"""
    credentials = (NEO_USER, NEO_PASS)
    with GraphDatabase.driver(NEO_HOST, auth=credentials) as driver:
        # execute_query() yields a 3-item result; only the records are used.
        records, _summary, _keys = driver.execute_query(movie_query)
        titles = []
        for record in records:
            titles.append(record["title"])
        return titles
def create_age_movies(cur):
    """Prepare Apache AGE on the session behind *cur* and seed three movies.

    Ensures the ``age`` extension exists, loads it, puts ``ag_catalog`` first
    on the search path, creates the graph named ``AGE_GRAPH_NAME`` if it is
    not registered yet, then MERGEs the seed movies into it.
    """
    session_setup = (
        "CREATE EXTENSION IF NOT EXISTS age;",
        "LOAD 'age';",
        'SET search_path = ag_catalog, "$user", public;',
    )
    for statement in session_setup:
        cur.execute(statement)

    cur.execute("SELECT 1 FROM ag_catalog.ag_graph WHERE name = %s", (AGE_GRAPH_NAME,))
    graph_missing = cur.fetchone() is None
    if graph_missing:
        cur.execute("SELECT * FROM ag_catalog.create_graph(%s)", (AGE_GRAPH_NAME,))

    seed_cypher = sql.SQL('''
MERGE (matrix:Movie {title: 'The Matrix', released: 1999})
MERGE (apollo:Movie {title: 'Apollo 13', released: 1995})
MERGE (toy_story:Movie {title: 'Toy Story', released: 1995})
RETURN matrix.title AS seeded
''')
    seed_query = sql.SQL("SELECT * FROM cypher({}, $$ {} $$) AS (seeded agtype)").format(
        sql.Literal(AGE_GRAPH_NAME),
        seed_cypher,
    )
    cur.execute(seed_query)
def select_age_movies(cur):
    """Return titles of AGE ``Movie`` nodes with 1990 <= released < 2000.

    The Cypher query runs against the graph named ``AGE_GRAPH_NAME``. Each
    returned value is converted to ``str`` and stripped of surrounding
    double quotes.
    """
    match_cypher = sql.SQL('''
MATCH (movie:Movie)
WHERE movie.released >= 1990 AND movie.released < 2000
RETURN movie.title AS title
ORDER BY title
''')
    title_query = sql.SQL("SELECT * FROM cypher({}, $$ {} $$) AS (title agtype)").format(
        sql.Literal(AGE_GRAPH_NAME),
        match_cypher,
    )
    cur.execute(title_query)

    titles = []
    for row in cur.fetchall():
        titles.append(str(row[0]).strip('"'))
    return titles
def connect_influx():
    """Build an ``InfluxDBClient3`` from the module-level ``INFLUXDB3_*`` settings."""
    client_options = {
        "host": INFLUXDB3_HOST,
        "token": INFLUXDB3_AUTH_TOKEN,
        "database": INFLUXDB3_DATABASE,
    }
    return InfluxDBClient3(**client_options)
def insert_influx_sensor_data(client):
    """Write five sample temperature/humidity points through *client*.

    The points are sent in a single ``write`` call as newline-separated
    line-protocol records; none of them carries an explicit timestamp.
    """
    sample_points = (
        "temperature,location=room1,sensor_id=s01 value=22.5",
        "temperature,location=room2,sensor_id=s02 value=24.1",
        "temperature,location=room1,sensor_id=s01 value=23.0",
        "humidity,location=room1,sensor_id=s01 value=55.2",
        "humidity,location=room2,sensor_id=s02 value=60.8",
    )
    client.write("\n".join(sample_points))
def select_influx_sensor_data(client):
    """Query the temperature and humidity measurements as one combined result.

    Returns a ``(headers, rows)`` pair. ``headers`` are the keys of the first
    result row and ``rows`` holds one tuple per record in that column order.
    When the query returns nothing, a fixed header list and an empty row list
    are returned.
    """
    sensor_query = """
SELECT measurement, location, sensor_id, value, CAST(time AS STRING) AS time
FROM (
SELECT 'temperature' AS measurement, location, sensor_id, value, time FROM temperature
UNION ALL
SELECT 'humidity' AS measurement, location, sensor_id, value, time FROM humidity
)
ORDER BY time, measurement
"""
    result_table = client.query(query=sensor_query, language="sql", mode="all")
    records = result_table.to_pylist()

    if records:
        columns = list(records[0].keys())
        data = [tuple(record.get(column) for column in columns) for record in records]
        return columns, data
    return ["measurement", "location", "sensor_id", "value", "time"], []
def load_users():
    """Create the application database and users table, seed it, return the rows.

    Two connections are used one after the other: one to the ``postgres``
    database to create ``POSTGRES_DB``, then one to ``POSTGRES_DB`` itself.
    Each is closed even if a step raises.
    """
    bootstrap_conn = connect_postgres("postgres")
    try:
        create_database(bootstrap_conn, POSTGRES_DB)
    finally:
        bootstrap_conn.close()

    users_conn = connect_postgres(POSTGRES_DB)
    try:
        create_users_table(users_conn)
        insert_users_data(users_conn, USER_SEED_DATA)
        loaded_rows = select_users(users_conn)
    finally:
        users_conn.close()
    return loaded_rows
class MainWindow(QMainWindow):
    """Main window: a three-column (ID, Name, Age) table of users."""

    def __init__(self, postgres_active):
        """Build the window; the title reflects whether PostgreSQL is enabled."""
        super().__init__()
        self.setMinimumSize(QSize(640, 480))
        window_title = "postgres" if postgres_active else "postgres (disabled)"
        self.setWindowTitle(window_title)

        container = QWidget(self)
        self.setCentralWidget(container)
        layout = QGridLayout()
        container.setLayout(layout)

        column_titles = ["ID", "Name", "Age"]
        self.table = QTableWidget(self)
        self.table.setColumnCount(len(column_titles))
        self.table.setRowCount(0)
        # Column 1 (Name) is put into Stretch resize mode.
        self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
        self.table.setHorizontalHeaderLabels(column_titles)
        for column, label in enumerate(column_titles):
            self.table.horizontalHeaderItem(column).setToolTip(label)
        for column in range(len(column_titles)):
            self.table.horizontalHeaderItem(column).setTextAlignment(Qt.AlignHCenter)
        layout.addWidget(self.table, 0, 0)

    def load_data(self, users_data):
        """Fill the table with *users_data*, one row per indexable 3-item record."""
        self.table.setRowCount(len(users_data))
        for row_index, user in enumerate(users_data):
            for column_index in range(3):
                cell = QTableWidgetItem(str(user[column_index]))
                self.table.setItem(row_index, column_index, cell)
        self.table.resizeColumnsToContents()
class DataWindow(QWidget):
    """Stand-alone table window placed to the right of a parent window."""

    def __init__(self, title, headers, parent_window, window_index=0):
        """Create an empty table with *headers* and position the window.

        *window_index* shifts the window further right so that several data
        windows can sit side by side.
        """
        super().__init__()
        self.setWindowTitle(title)
        self.parent_window = parent_window
        self.window_index = window_index

        layout = QGridLayout(self)
        self.setLayout(layout)

        self.table = QTableWidget(self)
        self.table.setColumnCount(len(headers))
        self.table.setRowCount(0)
        self.table.setHorizontalHeaderLabels(headers)
        for column, label in enumerate(headers):
            header_item = self.table.horizontalHeaderItem(column)
            header_item.setToolTip(label)
            header_item.setTextAlignment(Qt.AlignHCenter)
        layout.addWidget(self.table, 0, 0)

        self.position_relative_to_parent()

    def position_relative_to_parent(self):
        """Move the window beside the parent, clamped to the primary screen."""
        parent_frame = self.parent_window.frameGeometry()
        available = QApplication.primaryScreen().availableGeometry()

        # Desired spot: right of the parent, offset by earlier sibling windows.
        wanted_x = parent_frame.right() + 6 + self.window_index * (self.width() + 5)
        limit_x = available.right() - self.width() - 10
        limit_y = available.bottom() - self.height() - 10

        target_x = max(available.x(), min(wanted_x, limit_x))
        target_y = max(available.y(), min(parent_frame.y(), limit_y))
        self.move(target_x, target_y)

    def load_data(self, rows):
        """Fill the table with *rows*, then resize and reposition the window.

        A row that is not a list or tuple is treated as a single-cell row.
        """
        self.table.setRowCount(len(rows))
        for row_index, row in enumerate(rows):
            cells = row if isinstance(row, (list, tuple)) else [row]
            for column_index, cell in enumerate(cells):
                self.table.setItem(row_index, column_index, QTableWidgetItem(str(cell)))
        self.table.resizeColumnsToContents()
        self.table.resizeRowsToContents()

        # Size the window to the table content plus frame, margins and padding.
        margins = self.layout().contentsMargins()
        columns_width = sum(
            self.table.columnWidth(column) for column in range(self.table.columnCount())
        )
        rows_height = sum(
            self.table.rowHeight(row) for row in range(self.table.rowCount())
        )
        content_width = (
            self.table.verticalHeader().width()
            + columns_width
            + self.table.frameWidth() * 2
            + margins.left()
            + margins.right()
            + 24
        )
        content_height = (
            self.table.horizontalHeader().height()
            + rows_height
            + self.table.frameWidth() * 2
            + margins.top()
            + margins.bottom()
            + 24
        )

        # Never smaller than 220x120, never larger than the screen minus 40px.
        screen = QApplication.primaryScreen().availableGeometry()
        width_cap = max(screen.width() - 40, 220)
        height_cap = max(screen.height() - 40, 120)
        final_width = min(max(content_width, 220), width_cap)
        final_height = min(max(content_height, 120), height_cap)
        self.resize(final_width, final_height)
        self.position_relative_to_parent()
def _load_age_movies():
    """Seed the Apache AGE graph and return what select_age_movies() reports.

    The connection is closed explicitly: psycopg2's ``with connection`` block
    only commits or rolls back the transaction and leaves the connection open.
    """
    age_conn = psycopg2.connect(
        dbname=AGE_DB,
        user=AGE_USER,
        password=AGE_PASS,
        host=AGE_HOST,
        port=AGE_PORT,
    )
    try:
        age_conn.autocommit = True
        with age_conn.cursor() as cur:
            create_age_movies(cur)
            return select_age_movies(cur)
    finally:
        age_conn.close()


def _show_data_window(title, headers, rows, parent_window, open_windows):
    """Create, fill and show a DataWindow, then remember it in *open_windows*.

    The window's index is the number of windows already in *open_windows*.
    Keeping the reference in the list stops the window from being
    garbage-collected.
    """
    window = DataWindow(title, headers, parent_window, len(open_windows))
    window.load_data(rows)
    window.show()
    open_windows.append(window)
    return window


def main():
    """Load data from every enabled backend, show the windows, run the event loop.

    Each backend is loaded inside its own try/except, so a failing backend is
    reported on stdout and the remaining ones still load.

    Returns:
        The exit code returned by ``QApplication.exec()``.
    """
    users = []
    if POSTGRES_ACTIVE:
        try:
            users = load_users()
        except Exception as error:
            print(f"PostgreSQL load failed: {error}")

    app = QApplication(sys.argv)
    main_window = MainWindow(POSTGRES_ACTIVE)
    main_window.load_data(users)
    main_window.show()

    graph_windows = []

    if NEO_ACTIVE:
        try:
            # Fetch first so a failed query does not leave an unused window.
            neo_rows = [(title,) for title in fetch_neo_movies()]
            _show_data_window("neo4j", ["Info"], neo_rows, main_window, graph_windows)
        except Exception as error:
            print(f"Neo4j load failed: {error}")

    if AGE_ACTIVE:
        try:
            age_rows = [(title,) for title in _load_age_movies()]
            _show_data_window("apache age", ["Info"], age_rows, main_window, graph_windows)
        except Exception as error:
            print(f"Apache AGE load failed: {error}")

    if INFLUX_ACTIVE:
        try:
            with connect_influx() as influx_client:
                insert_influx_sensor_data(influx_client)
                influx_headers, influx_rows = select_influx_sensor_data(influx_client)
            _show_data_window(
                "influxdb 3 core", influx_headers, influx_rows, main_window, graph_windows
            )
        except Exception as error:
            print(f"InfluxDB 3 Core load failed: {error}")

    return app.exec()


if __name__ == "__main__":
    sys.exit(main())