# Listing metadata: 431 lines, 14 KiB, Python
import importlib
|
|
import os
|
|
import sys
|
|
|
|
import psycopg2
|
|
from dotenv import load_dotenv
|
|
from influxdb_client_3 import InfluxDBClient3
|
|
from neo4j import GraphDatabase
|
|
from psycopg2 import sql
|
|
from psycopg2.errors import DuplicateDatabase
|
|
from PyQt5.QtCore import QSize, Qt
|
|
from PyQt5.QtWidgets import QApplication, QGridLayout, QHeaderView, QMainWindow, QTableWidget, QTableWidgetItem, QWidget
|
|
|
|
# pymongo is resolved through importlib instead of a regular import statement.
MongoClient = importlib.import_module("pymongo").MongoClient

# Run the dotenv loader before any of the settings below are read.
load_dotenv()


def _env_flag(name):
    """Return True only when environment variable *name* is "true" in any letter case."""
    return os.getenv(name, "false").lower() == "true"


# --- PostgreSQL -------------------------------------------------------------
POSTGRES_ACTIVE = _env_flag("POSTGRES_ACTIVE")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASS = os.getenv("POSTGRES_PASS")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
POSTGRES_DB = os.getenv("POSTGRES_DB")

# --- Neo4j ------------------------------------------------------------------
NEO_ACTIVE = _env_flag("NEO_ACTIVE")
NEO_USER = os.getenv("NEO_USER")
NEO_PASS = os.getenv("NEO_PASS")
NEO_HOST = os.getenv("NEO_HOST")

# --- Apache AGE (every connection setting falls back to its PostgreSQL twin) --
AGE_ACTIVE = _env_flag("AGE_ACTIVE")
AGE_USER = os.getenv("AGE_USER", POSTGRES_USER)
AGE_PASS = os.getenv("AGE_PASS", POSTGRES_PASS)
AGE_HOST = os.getenv("AGE_HOST", POSTGRES_HOST)
AGE_PORT = os.getenv("AGE_PORT", POSTGRES_PORT)
AGE_DB = os.getenv("AGE_DB", POSTGRES_DB)
AGE_GRAPH_NAME = os.getenv("AGE_GRAPH_NAME", "movie_graph")

# --- InfluxDB 3 -------------------------------------------------------------
INFLUX_ACTIVE = _env_flag("INFLUX_ACTIVE")
INFLUXDB3_HOST = os.getenv("INFLUXDB3_HOST", "http://localhost:8181")
INFLUXDB3_AUTH_TOKEN = os.getenv("INFLUXDB3_AUTH_TOKEN")
INFLUXDB3_DATABASE = os.getenv("INFLUXDB3_DATABASE", "sensors")

# --- MongoDB ----------------------------------------------------------------
MONGO_ACTIVE = _env_flag("MONGO_ACTIVE")
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
MONGO_DB = os.getenv("MONGO_DB", "movie_data")
MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "movies")

# Seed rows for the users table, each one (id, name, age).
USER_SEED_DATA = [
    (1, "Ivan", 15),
    (2, "Igor", 22),
    (3, "Alex", 16),
    (4, "Anna", 40),
    (5, "Inna", 30),
]
|
|
|
|
def connect_postgres(dbname):
    """Open an autocommit psycopg2 connection to the database *dbname*.

    User, password, host and port are taken from the module-level POSTGRES_*
    settings. The connection is returned open; closing it is left to the caller.
    """
    settings = {
        "dbname": dbname,
        "user": POSTGRES_USER,
        "password": POSTGRES_PASS,
        "host": POSTGRES_HOST,
        "port": POSTGRES_PORT,
    }
    connection = psycopg2.connect(**settings)
    connection.autocommit = True
    return connection
|
|
|
|
|
|
def create_database(admin_conn, database_name):
    """Issue CREATE DATABASE for *database_name* and report the outcome.

    The name is embedded as a quoted SQL identifier. A DuplicateDatabase error
    is treated as "already exists" and only reported; any other error propagates.
    """
    statement = sql.SQL("CREATE DATABASE {}").format(sql.Identifier(database_name))
    try:
        with admin_conn.cursor() as cursor:
            cursor.execute(statement)
    except DuplicateDatabase:
        print(f"Database '{database_name}' already exists.")
    else:
        print(f"Database '{database_name}' created.")
|
|
|
|
|
|
def create_users_table(conn):
    """Create the ``users`` table (id, name, age) if it does not exist yet.

    Failures are printed rather than raised.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS users (
            id INT PRIMARY KEY,
            name VARCHAR(255),
            age INT
        )
    """
    try:
        with conn.cursor() as cursor:
            cursor.execute(ddl)
            print("Users table is ready.")
    except Exception as error:  # psycopg2.DatabaseError is an Exception subclass
        print(error)
|
|
|
|
|
|
def select_users(conn):
    """Return all rows of ``users`` (id, name, age), ordered by id.

    Best-effort: if anything fails the error is printed and an empty list is
    returned instead.
    """
    query = "SELECT id, name, age FROM users ORDER BY id"
    try:
        with conn.cursor() as cursor:
            cursor.execute(query)
            rows = cursor.fetchall()
    except Exception as error:  # psycopg2.DatabaseError is an Exception subclass
        print(error)
        rows = []
    return rows
|
|
|
|
|
|
def insert_users_data(conn, values):
    """Seed the ``users`` table with *values* unless it already contains rows.

    Args:
        conn: Open psycopg2 connection to the database holding ``users``.
        values: Iterable of ``(id, name, age)`` tuples to insert.

    Nothing is returned. Errors are printed rather than raised, matching the
    other table helpers in this module.
    """
    try:
        if select_users(conn):
            print("Users are already seeded.")
            return

        rows = list(values)
        if not rows:
            # "INSERT ... VALUES" with no tuples is invalid SQL; skip the round trip.
            print("No users to insert.")
            return

        with conn.cursor() as cur:
            # mogrify() lets the driver quote each row. The fragments stay as
            # bytes so they reach the server exactly as the driver encoded them,
            # whatever the connection's client encoding is. All rows go out in
            # a single INSERT statement.
            rendered = b",".join(cur.mogrify("(%s,%s,%s)", row) for row in rows)
            cur.execute(b"INSERT INTO users (id, name, age) VALUES " + rendered)
            print("Users inserted.")
    except Exception as error:  # psycopg2.DatabaseError is an Exception subclass
        print(error)
|
|
|
|
|
|
def fetch_neo_movies():
    """Return the titles of Neo4j ``Movie`` nodes released from 1990 up to (not including) 2000.

    Connects with the module-level NEO_HOST / NEO_USER / NEO_PASS settings; the
    driver is closed when the ``with`` block ends. Titles come back ordered by
    title.
    """
    query = """
        MATCH (movie:Movie)
        WHERE movie.released >= 1990 AND movie.released < 2000
        RETURN movie.title AS title
        ORDER BY title
    """
    credentials = (NEO_USER, NEO_PASS)
    titles = []
    with GraphDatabase.driver(NEO_HOST, auth=credentials) as driver:
        # execute_query yields a 3-item result; only the records are needed.
        records, _summary, _keys = driver.execute_query(query)
        for record in records:
            titles.append(record["title"])
    return titles
|
|
|
|
|
|
def create_age_movies(cur):
    """Prepare Apache AGE on the cursor's session and seed the movie graph.

    Steps, in order: ensure the ``age`` extension exists, load it, put
    ``ag_catalog`` first on the search path, create the graph named by
    AGE_GRAPH_NAME if ``ag_catalog.ag_graph`` has no row for it, then MERGE
    three ``Movie`` nodes into that graph.
    """
    session_setup = (
        "CREATE EXTENSION IF NOT EXISTS age;",
        "LOAD 'age';",
        'SET search_path = ag_catalog, "$user", public;',
    )
    for statement in session_setup:
        cur.execute(statement)

    graph_param = (AGE_GRAPH_NAME,)
    cur.execute("SELECT 1 FROM ag_catalog.ag_graph WHERE name = %s", graph_param)
    graph_missing = cur.fetchone() is None
    if graph_missing:
        cur.execute("SELECT * FROM ag_catalog.create_graph(%s)", graph_param)

    seed_cypher = sql.SQL('''
        MERGE (matrix:Movie {title: 'The Matrix', released: 1999})
        MERGE (apollo:Movie {title: 'Apollo 13', released: 1995})
        MERGE (toy_story:Movie {title: 'Toy Story', released: 1995})
        RETURN matrix.title AS seeded
    ''')
    # The Cypher text is wrapped in AGE's cypher() call; the graph name is
    # embedded as a SQL literal.
    wrapper = sql.SQL("SELECT * FROM cypher({}, $$ {} $$) AS (seeded agtype)")
    cur.execute(wrapper.format(sql.Literal(AGE_GRAPH_NAME), seed_cypher))
|
|
|
|
|
|
def select_age_movies(cur):
    """Return titles of ``Movie`` nodes in the AGE graph released from 1990 up to (not including) 2000.

    Runs a Cypher query through AGE's cypher() function against the graph named
    by AGE_GRAPH_NAME and returns the titles ordered by title.
    """
    match_cypher = sql.SQL('''
        MATCH (movie:Movie)
        WHERE movie.released >= 1990 AND movie.released < 2000
        RETURN movie.title AS title
        ORDER BY title
    ''')
    wrapper = sql.SQL("SELECT * FROM cypher({}, $$ {} $$) AS (title agtype)")
    cur.execute(wrapper.format(sql.Literal(AGE_GRAPH_NAME), match_cypher))

    titles = []
    for row in cur.fetchall():
        # Drop any double quotes wrapping the returned value
        # (presumably how agtype strings are rendered — confirm).
        titles.append(str(row[0]).strip('"'))
    return titles
|
|
|
|
|
|
def connect_influx():
    """Build an InfluxDBClient3 from the module-level INFLUXDB3_* settings."""
    options = {
        "host": INFLUXDB3_HOST,
        "token": INFLUXDB3_AUTH_TOKEN,
        "database": INFLUXDB3_DATABASE,
    }
    return InfluxDBClient3(**options)
|
|
|
|
|
|
def insert_influx_sensor_data(client):
    """Write five sample temperature/humidity points through *client*.

    The points are sent as one newline-separated line-protocol string with no
    explicit timestamps.
    """
    # NOTE(review): the first and third lines share measurement and tags and
    # carry no timestamp — confirm the server keeps both points.
    sample_points = (
        "temperature,location=room1,sensor_id=s01 value=22.5",
        "temperature,location=room2,sensor_id=s02 value=24.1",
        "temperature,location=room1,sensor_id=s01 value=23.0",
        "humidity,location=room1,sensor_id=s01 value=55.2",
        "humidity,location=room2,sensor_id=s02 value=60.8",
    )
    payload = "\n".join(sample_points)
    client.write(payload)
|
|
|
|
|
|
def select_influx_sensor_data(client):
    """Query temperature and humidity points and return ``(headers, rows)``.

    The SQL query unions both measurements, tags each row with its measurement
    name and orders by time, then measurement. ``headers`` is taken from the
    keys of the first result row; ``rows`` is a list of tuples in that column
    order. With no results a fixed header list and an empty row list are
    returned.
    """
    sensor_query = """
        SELECT measurement, location, sensor_id, value, CAST(time AS STRING) AS time
        FROM (
            SELECT 'temperature' AS measurement, location, sensor_id, value, time FROM temperature
            UNION ALL
            SELECT 'humidity' AS measurement, location, sensor_id, value, time FROM humidity
        )
        ORDER BY time, measurement
    """
    result = client.query(query=sensor_query, language="sql", mode="all")
    records = result.to_pylist()

    if records:
        headers = list(records[0].keys())
        values = [tuple(record.get(column) for column in headers) for record in records]
        return headers, values

    return ["measurement", "location", "sensor_id", "value", "time"], []
|
|
|
|
|
|
def connect_mongo():
    """Create a MongoClient for the URI configured in MONGO_URI."""
    client = MongoClient(MONGO_URI)
    return client
|
|
|
|
|
|
def insert_mongo_movies(collection):
    """Insert four sample movie documents unless *collection* already has documents.

    Prints which of the two outcomes happened; returns nothing.
    """
    already_seeded = collection.count_documents({}) > 0
    if already_seeded:
        print("MongoDB collection is already seeded.")
        return

    sample_movies = [
        {"title": "Apollo 13", "released": 1995},
        {"title": "The Matrix", "released": 1999},
        {"title": "Toy Story", "released": 1995},
        {"title": "Alien", "released": 1979},
    ]
    collection.insert_many(sample_movies)
    print("MongoDB movies inserted.")
|
|
|
|
|
|
def select_mongo_movies(collection):
    """Return titles of documents whose ``released`` is >= 1990 and < 2000, sorted by title ascending.

    Only the ``title`` field is projected from each matching document.
    """
    decade_filter = {"released": {"$gte": 1990, "$lt": 2000}}
    title_only = {"_id": 0, "title": 1}
    documents = collection.find(decade_filter, title_only).sort("title", 1)

    titles = []
    for document in documents:
        titles.append(document["title"])
    return titles
|
|
|
|
|
|
def load_users():
    """Ensure the application database and ``users`` table exist, seed them, and return the rows.

    Two connections are used in turn: one to the ``postgres`` database to
    create POSTGRES_DB, then one to POSTGRES_DB itself for the table work.
    Each connection is closed even if a step fails.
    """
    bootstrap_conn = connect_postgres("postgres")
    try:
        create_database(bootstrap_conn, POSTGRES_DB)
    finally:
        bootstrap_conn.close()

    users_conn = connect_postgres(POSTGRES_DB)
    try:
        create_users_table(users_conn)
        insert_users_data(users_conn, USER_SEED_DATA)
        rows = select_users(users_conn)
    finally:
        users_conn.close()
    return rows
|
|
|
|
|
|
class MainWindow(QMainWindow):
    """Main window showing user rows in a three-column table (ID, Name, Age)."""

    def __init__(self, postgres_active):
        """Build the window; the title reflects whether PostgreSQL is enabled."""
        super().__init__()

        self.setMinimumSize(QSize(640, 480))
        if postgres_active:
            window_title = "postgres"
        else:
            window_title = "postgres (disabled)"
        self.setWindowTitle(window_title)

        container = QWidget(self)
        self.setCentralWidget(container)

        layout = QGridLayout()
        container.setLayout(layout)

        column_titles = ["ID", "Name", "Age"]
        self.table = QTableWidget(self)
        self.table.setColumnCount(3)
        self.table.setRowCount(0)
        # The "Name" column (index 1) takes up the spare horizontal space.
        self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
        self.table.setHorizontalHeaderLabels(column_titles)
        for column, text in enumerate(column_titles):
            header_item = self.table.horizontalHeaderItem(column)
            header_item.setToolTip(text)
            header_item.setTextAlignment(Qt.AlignHCenter)

        layout.addWidget(self.table, 0, 0)

    def load_data(self, users_data):
        """Fill the table from *users_data*, a sequence of rows indexable at 0, 1 and 2."""
        table = self.table
        table.setRowCount(len(users_data))
        for row_number, user in enumerate(users_data):
            for column in range(3):
                table.setItem(row_number, column, QTableWidgetItem(str(user[column])))
        table.resizeColumnsToContents()
|
|
|
|
|
|
class DataWindow(QWidget):
    """Stand-alone table window placed to the right of a parent window."""

    def __init__(self, title, headers, parent_window, window_index=0):
        """Build an empty table with one column per entry in *headers*.

        *parent_window* is the window this one is positioned against.
        *window_index* shifts the window further right for each earlier
        sibling.
        """
        super().__init__()

        self.setWindowTitle(title)
        self.parent_window = parent_window
        self.window_index = window_index

        layout = QGridLayout(self)
        self.setLayout(layout)

        self.table = QTableWidget(self)
        self.table.setColumnCount(len(headers))
        self.table.setRowCount(0)
        self.table.setHorizontalHeaderLabels(headers)
        for column, text in enumerate(headers):
            header_item = self.table.horizontalHeaderItem(column)
            header_item.setToolTip(text)
            header_item.setTextAlignment(Qt.AlignHCenter)

        layout.addWidget(self.table, 0, 0)
        self.position_relative_to_parent()

    def position_relative_to_parent(self):
        """Move the window beside the parent, clamped to the primary screen's available area."""
        parent_frame = self.parent_window.frameGeometry()
        available = QApplication.primaryScreen().availableGeometry()

        # Horizontal: right of the parent, shifted by one own-width (+5) per index.
        wanted_x = parent_frame.right() + 6 + self.window_index * (self.width() + 5)
        limit_x = available.right() - self.width() - 10
        target_x = max(available.x(), min(wanted_x, limit_x))

        # Vertical: aligned with the parent's top edge.
        limit_y = available.bottom() - self.height() - 10
        target_y = max(available.y(), min(parent_frame.y(), limit_y))

        self.move(target_x, target_y)

    def load_data(self, rows):
        """Fill the table from *rows*, then resize and reposition the window to fit.

        Each row may be a list/tuple of cell values; any other value is shown
        as a single cell.
        """
        table = self.table
        table.setRowCount(len(rows))
        for row_number, row in enumerate(rows):
            if isinstance(row, (list, tuple)):
                cells = row
            else:
                cells = [row]
            for column_number, cell in enumerate(cells):
                table.setItem(row_number, column_number, QTableWidgetItem(str(cell)))
        table.resizeColumnsToContents()
        table.resizeRowsToContents()

        margins = self.layout().contentsMargins()
        frame = table.frameWidth() * 2

        columns_width = sum(table.columnWidth(column) for column in range(table.columnCount()))
        width = table.verticalHeader().width() + columns_width + frame + margins.left() + margins.right() + 24

        rows_height = sum(table.rowHeight(row) for row in range(table.rowCount()))
        height = table.horizontalHeader().height() + rows_height + frame + margins.top() + margins.bottom() + 24

        # Keep the size between a 220x120 floor and the screen minus 40 px.
        available = QApplication.primaryScreen().availableGeometry()
        max_width = max(available.width() - 40, 220)
        max_height = max(available.height() - 40, 120)
        final_width = min(max(width, 220), max_width)
        final_height = min(max(height, 120), max_height)
        self.resize(final_width, final_height)

        self.position_relative_to_parent()
|
|
|
|
|
|
def _open_data_window(title, headers, rows, parent_window, open_windows):
    """Create, fill and show a DataWindow, then append it to *open_windows*.

    The window's index is the current length of *open_windows*. The list keeps
    a reference to every shown window.
    """
    window = DataWindow(title, headers, parent_window, len(open_windows))
    window.load_data(rows)
    window.show()
    open_windows.append(window)
    return window


# Script entry point: load each enabled backend, show its data in its own
# window, then run the Qt event loop. A failure in one backend is printed and
# does not stop the others.
if __name__ == "__main__":
    users = []
    if POSTGRES_ACTIVE:
        try:
            users = load_users()
        except Exception as error:
            print(f"PostgreSQL load failed: {error}")

    app = QApplication(sys.argv)
    main_window = MainWindow(POSTGRES_ACTIVE)
    main_window.load_data(users)
    main_window.show()

    graph_windows = []

    if NEO_ACTIVE:
        try:
            # Fetch before building the window so a failed query leaves no
            # half-built widget behind (same order as the other backends).
            neo_movies = fetch_neo_movies()
            _open_data_window("neo4j", ["Info"], [(title,) for title in neo_movies], main_window, graph_windows)
        except Exception as error:
            print(f"Neo4j load failed: {error}")

    if AGE_ACTIVE:
        try:
            # psycopg2's `with connection:` only ends the transaction and does
            # not close the connection, so it is closed explicitly here.
            age_conn = psycopg2.connect(
                dbname=AGE_DB,
                user=AGE_USER,
                password=AGE_PASS,
                host=AGE_HOST,
                port=AGE_PORT,
            )
            try:
                age_conn.autocommit = True
                with age_conn.cursor() as cur:
                    create_age_movies(cur)
                    age_movies = select_age_movies(cur)
            finally:
                age_conn.close()

            _open_data_window("apache age", ["Info"], [(title,) for title in age_movies], main_window, graph_windows)
        except Exception as error:
            print(f"Apache AGE load failed: {error}")

    if INFLUX_ACTIVE:
        try:
            with connect_influx() as influx_client:
                insert_influx_sensor_data(influx_client)
                influx_headers, influx_rows = select_influx_sensor_data(influx_client)

            _open_data_window("influxdb 3 core", influx_headers, influx_rows, main_window, graph_windows)
        except Exception as error:
            print(f"InfluxDB 3 Core load failed: {error}")

    if MONGO_ACTIVE:
        try:
            with connect_mongo() as mongo_client:
                movies_collection = mongo_client[MONGO_DB][MONGO_COLLECTION]
                insert_mongo_movies(movies_collection)
                mongo_movies = select_mongo_movies(movies_collection)

            _open_data_window("mongodb", ["Info"], [(title,) for title in mongo_movies], main_window, graph_windows)
        except Exception as error:
            print(f"MongoDB load failed: {error}")

    sys.exit(app.exec())