Scalable Data Architecture from Streaming to Visualization

This tutorial outlines a comprehensive approach to constructing a scalable data architecture on UCloud for real-time data analysis and visualization. It integrates NGINX for efficient data routing, Apache Spark for real-time data processing, PostgreSQL for reliable data storage, and Streamlit for interactive data visualization, covering the full path from data ingestion to actionable insights.

Architecture Overview: Scalable Data Pipeline

[Architecture diagram: data source → NGINX gateway → Apache Spark → PostgreSQL → Streamlit dashboard]

This system architecture combines a set of robust technologies, each serving a distinct role in the data processing and visualization pipeline. The primary components are:

  • NGINX Gateway: Acts as the frontline for data ingestion, efficiently routing incoming data streams to the processing layer. NGINX's robust capabilities ensure that data is securely and swiftly directed from external sources to the Apache Spark application for real-time processing.

  • Apache Spark: Positioned at the heart of this architecture, Apache Spark processes incoming data streams in real-time. Leveraging its in-memory computing capabilities, Spark performs complex analytics, including but not limited to anomaly detection, aggregation, and filtering, transforming raw data into structured insights.

  • PostgreSQL Database: Serving as the reliable data storage solution, PostgreSQL stores processed data, ensuring data persistence and integrity. This robust relational database system enables structured data storage, query optimization, and secure data access, forming the backbone of data warehousing within the architecture.

  • Streamlit Dashboard: The final layer of the architecture, it offers an interactive platform for data visualization, enabling end-users to explore processed data through intuitive interfaces. By pulling data from the PostgreSQL database, Streamlit dashboards can render dynamic visualizations, facilitating real-time data-driven decision-making.

  • Data Source: At the inception of the data pipeline lies a simulated data source, specifically designed to generate energy consumption data. This generator mimics real-world energy usage patterns, producing time-series data that reflects varying energy consumption levels over time.

This data architecture leverages UCloud's scalable infrastructure and services, providing a cohesive environment for deploying and managing the integrated technologies. Together, NGINX, Apache Spark, PostgreSQL, and Streamlit form a streamlined pathway for data from its point of entry, through processing and storage, to actionable visual insights.

PostgreSQL Setup: Configuring the Data Warehouse

Start by setting up your PostgreSQL database, preparing it to store data efficiently.

Database initialization

Launch a new PostgreSQL instance on UCloud. In this example we use the default settings for database name, user and password.

From the job terminal window access the PostgreSQL console:

$ psql

Tip

psql (16.2 (Debian 16.2-1.pgdg120+2))
Type "help" for help.

ucloud=#

Create a new table specifically designed to store detected anomalies:

CREATE TABLE anomalies (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMP NOT NULL,
    energy_usage DOUBLE PRECISION NOT NULL,
    anomaly BOOLEAN NOT NULL
);
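
To confirm the schema from Python rather than from the psql console, you can run a short check from any job connected to this database (as done in the Spark and Streamlit sections below). This is a minimal sketch; it assumes psycopg2 is installed, the default ucloud database, user, and password, and postgres-server as the hostname chosen when connecting the jobs.

# check_table.py
import psycopg2

# Assumed defaults; adjust to match your PostgreSQL job settings
conn = psycopg2.connect(dbname="ucloud", user="ucloud", password="ucloud", host="postgres-server")

with conn, conn.cursor() as cur:
    # List the columns of the anomalies table to verify it was created as expected
    cur.execute("""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = 'anomalies'
        ORDER BY ordinal_position;
    """)
    for name, dtype in cur.fetchall():
        print(f"{name}: {dtype}")

conn.close()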

Spark Streaming Analysis: Real-time data processing

Set up a Spark standalone cluster on UCloud to handle the real-time processing of incoming data streams. Set the following optional parameters:

  • Number of nodes: 1

  • Connect to other jobs: Select the Job ID of the running PostgreSQL instance and specify a hostname.

Below we will assume postgres-server as the PostgreSQL hostname.

Establishing the data ingestion pathway

Open a terminal window on the Spark cluster master node, change to the imported Spark application directory, and initialize a TCP echo server to facilitate data ingestion from NGINX into Spark's processing environment.

# tcp_echo_server.py
import socket
import threading

def client_handler(client_socket, all_clients):
    try:
        while True:
            # Receive data from one client
            data = client_socket.recv(1024)
            if not data:
                break  # Client disconnected

            # Echo the received data to all connected clients (including Spark)
            for c in all_clients:
                try:
                    c.send(data)
                except OSError:
                    pass  # Skip clients that are no longer reachable
    finally:
        client_socket.close()
        if client_socket in all_clients:
            all_clients.remove(client_socket)
        print("Client disconnected")

def start_echo_server(host='0.0.0.0', port=9999):
    all_clients = []
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Echo server listening on {host}:{port}")

    try:
        while True:
            client_sock, addr = server_socket.accept()
            print(f"New connection from {addr}")
            all_clients.append(client_sock)
            # Handle each client connection in a new thread
            thread = threading.Thread(target=client_handler, args=(client_sock, all_clients))
            thread.start()
    finally:
        server_socket.close()

if __name__ == "__main__":
    start_echo_server()

By default, the server listens on port 9999.

Ensure the server is active in the background to maintain a continuous data flow:

$ nohup python tcp_echo_server.py &
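
Optionally, you can verify that the echo server relays data before wiring up NGINX. Since the server broadcasts every received line to all connected clients, including the sender, a short self-test can send one line and read it back. This is a minimal sketch assuming the server is listening on localhost:9999; note that if the Spark application is already attached, the test record will also flow into the pipeline.

# echo_check.py
import socket

# Connect to the echo server started above (assumed to listen on localhost:9999)
with socket.create_connection(("localhost", 9999), timeout=5) as sock:
    sock.sendall(b'{"timestamp": "2024-03-01 00:00:00", "energy_usage": 42.0}\n')
    # The server echoes each line back to every connected client, including this one
    reply = sock.recv(1024)
    print("Echo server replied:", reply.decode("utf-8").strip())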

Spark application configuration

Develop and deploy a Spark application designed to analyze streaming data, identify anomalies, and persist these findings into PostgreSQL.

# app.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr, to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# PostgreSQL connection details
url = "jdbc:postgresql://postgres-server/ucloud"
properties = {
    "user": "ucloud",
    "password": "ucloud",
    "driver": "org.postgresql.Driver"
}

table_name = "anomalies"

# Write the streaming DataFrame with anomalies to PostgreSQL using foreachBatch
def write_to_postgres(batch_df, batch_id):
    batch_df.write \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", properties["user"]) \
        .option("password", properties["password"]) \
        .option("driver", properties["driver"]) \
        .mode("append") \
        .save()

if __name__ == "__main__":
    spark = SparkSession.builder.master("spark://node1:7077").appName("StreamingDetectionAnomaly").getOrCreate()

    # Define the schema of the incoming JSON data
    schema = StructType([
        StructField("timestamp", StringType(), True),
        StructField("energy_usage", DoubleType(), True),
    ])

    # Create DataFrame representing the stream of input lines from connection via the NGINX gateway server
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    # Parse the lines of data into a DataFrame, and cast timestamp to TimestampType
    jsonData = lines.select(from_json(col("value"), schema).alias("data")).select("data.*") \
        .withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))

    # Perform anomaly detection, here considering energy usage > 100 as an anomaly
    anomalies = jsonData.withColumn("anomaly", expr("energy_usage > 100"))

    # Write the anomalies to PostgreSQL
    query = anomalies.writeStream \
        .foreachBatch(write_to_postgres) \
        .outputMode("append") \
        .start()

    query.awaitTermination()

Execute the application, integrating PostgreSQL for data storage:

$ spark-submit --packages org.postgresql:postgresql:42.2.7 app.py

Hint

To keep the app running after closing the terminal window, run the command above in a screen or tmux session.

The StreamingDetectionAnomaly application is now accessible directly from the Spark UI:


[Screenshot: Spark UI listing the running StreamingDetectionAnomaly application]

NGINX Integration: Gateway for Data Streaming

Configure NGINX as the data routing gateway, directing incoming streams to the Spark analysis layer.

Start a new NGINX web server on UCloud with the optional parameters:

  • Connect to other jobs: Select the Job ID of the running Spark Cluster instance and specify a hostname.

  • Attach public IP addresses to your application: Add a public IP which will be used to route TCP requests from the data generator to the Spark application.

In the following we will use the configuration parameters:

  • Spark server hostname: spark-app-server

  • Public IP: 130.225.164.136

Stream directive setup

Adjust NGINX's settings (/etc/nginx/nginx.conf) to ensure a seamless stream direction towards the Spark application.

worker_processes  auto;

error_log  /var/log/nginx/error.log notice;
pid        /var/run/nginx.pid;

events {
    worker_connections 1024;
}

stream {
    upstream spark_backend {
        server spark-app-server:9999;  # Replace with Spark app's host and port
    }

    server {
        listen 8080;           # Listen on IPv4
        listen [::]:8080;      # Listen on IPv6
        proxy_pass spark_backend;
    }
}

Apply and verify these changes to initiate the data routing process:

$ nginx -t
$ nginx -s reload
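
Before running the full data generator, you can check from your local machine that the gateway accepts TCP connections on the public IP. This is a minimal sketch assuming the public IP 130.225.164.136 and port 8080 used in this example.

# gateway_check.py
import socket

NGINX_HOST = "130.225.164.136"  # replace with the public IP attached to your NGINX job
NGINX_PORT = 8080

try:
    with socket.create_connection((NGINX_HOST, NGINX_PORT), timeout=5):
        print(f"TCP connection to {NGINX_HOST}:{NGINX_PORT} succeeded")
except OSError as exc:
    print(f"Could not reach the gateway: {exc}")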

Data Generation: Simulating Energy Network Inputs

Emulate an energy network data flow using a Python script that generates data and sends it to the NGINX gateway.

Ingest simulated data

Create a simple data generator to simulate ingestion of energy network data and anomalies from a remote server.

# data-simulator.py
import socket
import time
import json
from datetime import datetime
import random

def generate_data():
    """Generate data with a specific structure."""
    current_time = datetime.now()
    energy_usage = random.uniform(50.0, 150.0)  # Random energy usage
    data = {
        "timestamp": current_time.strftime("%Y-%m-%d %H:%M:%S"),
        "energy_usage": energy_usage
    }
    return json.dumps(data)

def send_data(host, port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.connect((host, port))
            print(f"Connected to {host}:{port}")
            while True:
                data = generate_data()
                print(f"Sending: {data}")
                sock.sendall(data.encode('utf-8') + b'\n')
                time.sleep(1)
        except ConnectionRefusedError:
            print(f"Connection to {host}:{port} refused. Make sure the Spark application is running and listening on the port.")
        except BrokenPipeError:
            print("Connection broken. It seems the server closed the connection.")

if __name__ == "__main__":
    nginx_host = '130.225.164.136'  # NGINX server public IP
    nginx_port = 8080  # NGINX server and public IP port
    send_data(nginx_host, nginx_port)

Replace the NGINX public IP with the address allocated to your running job instance.

Open a terminal window on your local host and execute the script, simulating a consistent data stream into the architecture:

$ python3 data-simulator.py

Tip

Connected to 130.225.164.136:8080
Sending: {"timestamp": "2024-03-01 10:49:26", "energy_usage": 96.63672702481784}
Sending: {"timestamp": "2024-03-01 10:49:27", "energy_usage": 52.67580060984939}
...

Verify data processing

Check from the Spark UI that the Spark application is processing the incoming data:


[Screenshot: Spark UI showing the streaming query processing incoming data]

Check the database

Verify that anomaly detection data is ingested into the database from the PostgreSQL console:

SELECT * FROM anomalies LIMIT 10;

Tip

 id |      timestamp      |    energy_usage    | anomaly
----+---------------------+--------------------+---------
 67 | 2024-03-01 10:49:26 |  96.63672702481784 | f
 68 | 2024-03-01 10:49:27 |  52.67580060984939 | f
 69 | 2024-03-01 10:49:28 |  93.58373839493248 | f
 70 | 2024-03-01 10:49:29 |  84.93459675609593 | f
 71 | 2024-03-01 10:49:30 |  93.67996038597265 | f
 72 | 2024-03-01 10:49:31 |  99.52943818138002 | f
 73 | 2024-03-01 10:49:32 |  97.27706150016957 | f
 74 | 2024-03-01 10:49:33 | 135.45167262911912 | t
 75 | 2024-03-01 10:49:34 | 126.18114890899919 | t
 76 | 2024-03-01 10:49:35 |  98.26561458051634 | f
(10 rows)
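
The results can also be summarised programmatically, for example to track the share of records flagged as anomalous. This is a minimal sketch, run from any job connected to the database, assuming psycopg2 is installed and the default ucloud credentials.

# anomaly_rate.py
import psycopg2

conn = psycopg2.connect(dbname="ucloud", user="ucloud", password="ucloud", host="postgres-server")

with conn, conn.cursor() as cur:
    # Count flagged records versus the total number of processed records
    cur.execute("SELECT COUNT(*) FILTER (WHERE anomaly), COUNT(*) FROM anomalies;")
    flagged, total = cur.fetchone()
    rate = flagged / total if total else 0.0
    print(f"{flagged} of {total} records flagged as anomalies ({rate:.1%})")

conn.close()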

Streamlit Dashboard: Visualizing Insights

Implement a Streamlit application running on UCloud to dynamically visualize the processed and stored data, enhancing data interpretability.

Connect to the database

Connect to the running PostgreSQL database instance using the Connect to other jobs optional parameter.

Below we will assume postgres-server as the PostgreSQL hostname.

Install dependencies

Use the Initialization optional parameter to install the Streamlit app dependencies:

# requirements.txt
psycopg2-binary
pandas
plotly

Import app main file

The Streamlit dashboard app fetches up to 100 data points from the last 15 minutes each time it loads or the data is refreshed:

# dashboard.py
import streamlit as st
import pandas as pd
import psycopg2
import plotly.graph_objects as go
from datetime import datetime, timedelta

DB_HOST = 'postgres-server'
DB_NAME = 'ucloud'
DB_USER = 'ucloud'
DB_PASS = 'ucloud'

def load_data():
    now = datetime.now()
    time_15_min_ago = now - timedelta(minutes=15)
    formatted_now = now.strftime("%Y-%m-%d %H:%M:%S")
    formatted_time_15_min_ago = time_15_min_ago.strftime("%Y-%m-%d %H:%M:%S")

    conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST)
    sql = f"""
    SELECT * FROM anomalies
    WHERE timestamp BETWEEN '{formatted_time_15_min_ago}' AND '{formatted_now}'
    ORDER BY timestamp DESC
    LIMIT 100;
    """
    df = pd.read_sql_query(sql, conn)
    conn.close()
    return df

st.title('Energy Network Anomaly Detection')

# Button to refresh data
if st.button('Refresh Data'):
    df = load_data()
else:
    df = load_data()  # Load data on initial app load as well

if not df.empty:
    fig = go.Figure()

    # Add line for all points
    fig.add_trace(go.Scatter(x=df['timestamp'], y=df['energy_usage'], mode='lines+markers',
                             name='Energy Consumption',
                             line=dict(color='blue'),
                             showlegend=False,
                             marker=dict(size=10,
                                         color=['red' if anomaly else 'blue' for anomaly in df['anomaly']],
                                         symbol=['diamond' if anomaly else 'circle' for anomaly in df['anomaly']])))

    # Update layout
    fig.update_layout(title='Energy Usage Over Time',
                      xaxis_title='Time', yaxis_title='Energy Usage',
                      legend_title='Legend',
                      legend=dict(itemsizing='constant'))

    # Custom legend
    fig.add_trace(go.Scatter(x=[None], y=[None], mode='markers',
                             name='Anomalies',
                             showlegend=True,
                             marker=dict(color='red', size=12, symbol='diamond')))
    fig.add_trace(go.Scatter(x=[None], y=[None], mode='markers',
                             name='Normal Data',
                             showlegend=True,
                             marker=dict(color='blue', size=10, symbol='circle')))

    st.plotly_chart(fig)
else:
    st.write("No anomalies detected in the latest data.")

The file dashboard.py must be located in the folder imported in the Streamlit app.
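
The dashboard refreshes only when the page is reloaded or the Refresh Data button is pressed. If you want it to poll the database periodically, one simple approach is to append a timed rerun at the end of dashboard.py. This is a minimal sketch; it assumes a recent Streamlit release that provides st.rerun, and the interval is a hypothetical value you can adjust.

# Optional: append to the end of dashboard.py for periodic refresh
import time
import streamlit as st  # already imported in dashboard.py; repeated here for completeness

REFRESH_SECONDS = 60  # hypothetical interval; adjust as needed

time.sleep(REFRESH_SECONDS)
st.rerun()  # reruns the whole script, so load_data() is called again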

Visualize the results

After launching Streamlit, open the app web interface to view the power consumption data and anomalies.

[Screenshot: Streamlit dashboard plotting energy usage over time with anomalies highlighted]

Conclusion: Insights and Integration

This comprehensive guide underscores the creation and integration of a data architecture capable of handling, processing, and visualizing data in real-time on UCloud. By leveraging NGINX, Apache Spark, PostgreSQL, and Streamlit, you can develop a resilient pipeline that not only processes streaming data efficiently but also provides actionable insights through interactive visualizations.