From Reddit to Insights: Building an AI-Powered Data Pipeline with Gemini (On-Prem)¶
Introduction¶
Purpose
In this blog post, I document the process of building an AI-driven, on-premises data pipeline that automates the monitoring of AI-focused subreddits. Using Google’s Gemini AI, the pipeline collects, processes, and synthesizes discussions from AI-related subreddits into structured daily reports. The system is designed to filter out irrelevant or harmful content, ensuring the extracted insights are both meaningful and actionable.
Check out the project’s GitHub repository for the full code and detailed documentation, as well as the accompanying web application.
Problem Statement
The field of artificial intelligence and machine learning evolves at an unprecedented pace, with new breakthroughs, trends, and discussions emerging daily. Platforms like Reddit host vibrant AI-focused communities that are rich with valuable insights. However, manually monitoring multiple subreddits to extract meaningful information is both time-consuming and inefficient.
What You’ll Learn¶
- Designing a robust data pipeline orchestrated with Airflow: How to automate and orchestrate complex data workflows.
- Optimizing data flow with PostgreSQL and dbt: Best practices for structuring and managing collected data efficiently.
- Applying AI for sentiment analysis and summarization: Techniques to extract concise insights from unstructured Reddit discussions.
- Utilizing Google’s Gemini AI for content analysis: Insights into leveraging advanced AI models to categorize and interpret data.
System Architecture: Modular and Scalable Design¶
Our pipeline is designed with modularity and scalability in mind, comprising six main layers. Below is a high-level overview of how the components interact:
Data flows through the system from collection to presentation. Each layer has specific responsibilities and communicates with adjacent layers through well-defined interfaces.
1. Data Collection Layer¶
- Reddit API Integration:
- Fetch posts and comments from AI-focused subreddits.
- Text Preprocessing and Cleaning:
- Separate processing pipelines for posts and comments.
- Remove special characters and formatting.
- Content Validation and Filtering:
- Ensure only relevant and high-quality data is processed.
- Rate Limiting and Error Handling:
- Manage API limits.
- DBT Transformations:
- Stage posts and comments for further processing.
- Clean and standardize data structures.
- Manage processing state for each pipeline run.
- Automate cleanup upon pipeline completion.
2. Storage Layer¶
- PostgreSQL Database:
- Structured data storage with the following tables:
- Raw data tables for posts and comments.
- DBT staging tables for processing queues.
- Processed results tables for analysis outputs.
- Organized File System:
- Store analysis outputs in a structured and accessible manner.
- Data Versioning and Backup:
- Implement robust strategies for data integrity and recovery.
3. Processing Layer¶
- Text Summarization: Generate concise summaries of discussions.
- Sentiment Analysis: Extract emotional context from posts and comments.
- Gemini AI Integration: Perform comprehensive content analysis using LLMs.
4. Orchestration Layer¶
- Airflow DAGs: Manage pipeline workflows efficiently.
- Scheduled Task Execution: Automate task triggers based on predefined schedules.
- Monitoring and Error Handling: Ensure reliability through automated checks.
5. Observability Layer¶
- Grafana Dashboards: Provide real-time monitoring of pipeline health and performance.
- Prometheus Metrics Collection: Gather and analyze system metrics.
- MLflow: Track machine learning experiments and maintain a model registry.
6. Presentation Layer¶
- React-Based Web Application: Present insights through an interactive user interface.
- Automated Deployment: Streamline updates using GitHub Actions.
- Scheduled Data Synchronization: Provides updates in daily batches to ensure users receive timely insights.
- Interactive Visualizations: Enable dynamic exploration of AI-related discussions.
This modular design ensures adaptability, maintainability, and scalability, enabling seamless interaction between components and the efficient transformation of Reddit data into actionable insights.
Deep Dive: Key Components¶
Reddit Data Collection and Preprocessing¶
The foundation of our pipeline is reliable data collection and preprocessing. We utilize Python's Reddit API wrapper (PRAW) to fetch posts and comments from specified subreddits, with immediate text preprocessing for clean data storage.
1. Data Collection with PRAW¶
The Reddit client implementation focuses on efficient and reliable data collection. Here's how we handle both posts and their comments:
def fetch_and_save_posts(reddit, subreddit_name, db_utils, conn):
    """Fetches new posts from a subreddit since the last processed timestamp."""
    last_processed_utc = get_last_processed_timestamp(conn, subreddit_name)
    current_max_utc = 0
    subreddit = reddit.subreddit(subreddit_name)
    for submission in subreddit.hot(limit=20):
        # Skip already processed posts
        if submission.created_utc <= last_processed_utc:
            continue
        # Process post data
        post_data = {
            "subreddit": subreddit_name,
            "post_id": submission.id,
            "title": str(submission.title),
            "author": str(submission.author),
            "url": str(submission.url),
            "score": int(submission.score),
            "created_utc": submission.created_utc,
            "comments": []
        }
        # Handle comments if they exist
        if submission.num_comments > 0:
            try:
                submission.comments.replace_more(limit=0)  # Expand comment tree
                post_data["comments"] = [
                    {
                        "comment_id": comment.id,
                        "author": str(comment.author),
                        "body": str(comment.body),
                        "created_utc": comment.created_utc,
                    }
                    for comment in submission.comments.list()[:10]  # Top 10 comments
                    if comment.created_utc > last_processed_utc
                ]
            except Exception as e:
                logging.error(f"Error fetching comments for post {submission.id}: {e}")
        # Save to database
        db_utils.insert_raw_post_data(conn, post_data)
        current_max_utc = max(current_max_utc, submission.created_utc)
The raw data is stored in PostgreSQL with the following schema:
-- Raw posts table schema
CREATE TABLE raw_data.raw_posts (
    id SERIAL PRIMARY KEY,
    subreddit VARCHAR(50) NOT NULL,
    post_id VARCHAR(20) UNIQUE NOT NULL,
    title TEXT NOT NULL,
    author VARCHAR(100) NOT NULL,
    url TEXT,
    score INTEGER,
    created_utc TIMESTAMP WITH TIME ZONE,
    comments JSONB, -- Stores comment data as JSON
    processed BOOLEAN DEFAULT FALSE,
    inserted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
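The fetch function above relies on a get_last_processed_timestamp helper that isn't shown here. A minimal sketch, assuming it derives the watermark from the raw_posts table and a psycopg2-style connection (the real implementation may track state separately):

def get_last_processed_timestamp(conn, subreddit_name):
    """Return the latest stored created_utc (as an epoch float) for a subreddit, or 0 if none."""
    # conn is assumed to be a psycopg2-style connection object
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT COALESCE(MAX(EXTRACT(EPOCH FROM created_utc)), 0)
            FROM raw_data.raw_posts
            WHERE subreddit = %s
            """,
            (subreddit_name,),
        )
        (last_utc,) = cur.fetchone()
    return float(last_utc)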
Our system is designed for efficient and reliable data processing. We use incremental data collection, fetching only new data based on timestamps. To manage the volume of Reddit data, we focus on the top 10 comments per post, allowing us to process approximately 1,000 comments daily in about 50 minutes. This can be easily scaled by adding more compute resources. We store comments in JSONB format, which provides flexibility for handling semi-structured data.
We've implemented robust error handling with retry mechanisms and transaction management to ensure data consistency. Batch processing is used to improve scalability and efficiency when dealing with large datasets. While stream processing could further enhance scalability, we opted for batch processing for this use case.
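As an illustration of the retry and transaction handling described here, a minimal sketch of a guarded insert might look like the following (the helper name and retry parameters are illustrative, not the project's exact code):

import logging
import time

def insert_with_retry(conn, db_utils, post_data, max_retries=3, backoff_seconds=2):
    """Insert a post inside a transaction, retrying transient failures with backoff."""
    for attempt in range(1, max_retries + 1):
        try:
            db_utils.insert_raw_post_data(conn, post_data)
            conn.commit()  # commit only after a successful insert
            return True
        except Exception as exc:
            conn.rollback()  # keep the transaction clean before retrying
            logging.warning("Insert failed (attempt %s/%s): %s", attempt, max_retries, exc)
            time.sleep(backoff_seconds * attempt)
    logging.error("Giving up on post %s after %s attempts", post_data.get("post_id"), max_retries)
    return False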
JSONB storage was chosen for its ability to handle semi-structured data like Reddit comments, which often vary in format and content. By storing comments as JSONB, the system accommodates diverse data structures without rigid schemas, while still allowing efficient querying and indexing for analytics.
2. Data Preprocessing with PySpark¶
To ensure the quality of our analysis, we implement a series of preprocessing steps for Reddit posts and comments using PySpark.
Posts Processing¶
The following code snippet demonstrates how we parse, filter, and clean posts:
def preprocess_posts(df):
    """Preprocesses Reddit posts with content filtering and cleaning."""
    # Content filtering patterns
    profanity_pattern = r'(?i)\b(wordword2|word3|word4|word5)\b|' + \
                        r'(?i)\b(w\*\*d1|w\*rd2|w\*rd3|w\*\*d4)\b|' + \
                        r'(?i)w[^\w]?r[^\w]?d1|' + \
                        r'(?i)w[^\w]?r[^\w]?d2'
    # Filter and clean posts
    return df.filter(
        # Basic validation
        (F.col("title").isNotNull()) &
        (F.col("title") != "") &
        (F.col("title") != "[deleted]") &
        (F.col("author").isNotNull()) &
        (F.col("author") != "[deleted]") &
        # Content filtering
        ~F.col("title").rlike(profanity_pattern)
    ).withColumn(
        # Clean and normalize text
        "title", F.regexp_replace("title", r'[\n\r\t]', ' ')
    ).withColumn(
        # Remove multiple spaces
        "title", F.regexp_replace("title", r'\s+', ' ')
    ).dropDuplicates(["post_id"])
This function validates the title and author fields, cleans and normalizes the text, and removes duplicate posts. This ensures our analysis is based on clean and relevant data.
Comments Processing¶
Similar to posts, we also preprocess Reddit comments to ensure data quality. The following code snippet demonstrates how we parse, filter, and clean comments stored in JSONB format:
def preprocess_comments(df):
    """Preprocesses Reddit comments with content filtering and cleaning."""
    # Parse and explode comments from JSONB
    comments_schema = ArrayType(StructType([
        StructField("body", StringType(), True),
        StructField("author", StringType(), True),
        StructField("comment_id", StringType(), True),
        StructField("created_utc", DoubleType(), True)
    ]))
    return df.withColumn(
        "comments_parsed",
        F.from_json(F.col("comments"), comments_schema)
    ).withColumn(
        "comment", F.explode("comments_parsed")
    ).select(
        "post_id",
        F.col("comment.body").alias("body"),
        F.col("comment.author").alias("author"),
        F.col("comment.comment_id").alias("comment_id"),
        F.to_timestamp(F.col("comment.created_utc")).alias("created_utc")
    ).filter(
        # Remove deleted/empty comments
        (F.col("body").isNotNull()) &
        (F.col("body") != "") &
        (F.col("body") != "[deleted]") &
        (F.col("author") != "[deleted]")
    ).withColumn(
        # Clean and normalize text
        "body", F.regexp_replace("body", r'[\n\r\t]', ' ')
    ).withColumn(
        # Remove multiple spaces
        "body", F.regexp_replace("body", r'\s+', ' ')
    ).dropDuplicates(["comment_id"])
Processed Data Schema¶
The cleaned data is stored in separate tables for posts and comments:
-- Processed posts schema
CREATE TABLE processed_data.posts (
    id SERIAL PRIMARY KEY,
    subreddit VARCHAR(50) NOT NULL,
    post_id VARCHAR(20) UNIQUE NOT NULL,
    title TEXT NOT NULL,
    author VARCHAR(100) NOT NULL,
    url TEXT,
    score INTEGER,
    created_utc TIMESTAMP WITH TIME ZONE,
    processed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Processed comments schema
CREATE TABLE processed_data.comments (
    id SERIAL PRIMARY KEY,
    post_id VARCHAR(20) REFERENCES processed_data.posts(post_id),
    comment_id VARCHAR(20) UNIQUE NOT NULL,
    body TEXT NOT NULL,
    author VARCHAR(100) NOT NULL,
    created_utc TIMESTAMP WITH TIME ZONE,
    processed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
Daily Summary Processing¶
After preprocessing, we generate daily summaries of the Reddit data using PySpark. The following code snippet demonstrates how we load, filter, and join posts and comments to create these summaries:
def generate_daily_summaries(spark):
    """Generates daily summaries of the processed data using PySpark."""
    current_timestamp = datetime.now()
    last_processed = get_last_processed_timestamp(spark)
    total_summaries_added = 0
    for subreddit in SUBREDDITS:
        # Load posts and comments tables
        posts_df = spark.read.format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", f"processed_data.posts_{subreddit.lower()}") \
            .load()
        comments_df = spark.read.format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", f"processed_data.comments_{subreddit.lower()}") \
            .load()
        # Filter for new data since the last processing run
        if last_processed:
            daily_posts_df = posts_df.filter(
                (F.col("created_utc") > last_processed) &
                (F.col("created_utc") <= current_timestamp)
            )
            daily_comments_df = comments_df.filter(
                (F.col("created_utc") > last_processed) &
                (F.col("created_utc") <= current_timestamp)
            )
        else:
            daily_posts_df = posts_df.filter(F.col("created_utc") <= current_timestamp)
            daily_comments_df = comments_df.filter(F.col("created_utc") <= current_timestamp)
        # Join posts and comments
        daily_summary_df = daily_posts_df.alias("posts").join(
            daily_comments_df.alias("comments"),
            "post_id",
            "right"
        ).select(
            F.col("subreddit"),
            F.col("posts.post_id").alias("post_id"),
            F.col("posts.score").alias("post_score"),
            F.col("posts.url").alias("post_url"),
            F.col("comments.comment_id").alias("comment_id"),
            F.to_date(F.col("comments.created_utc")).alias("summary_date"),
            F.current_timestamp().alias("processed_date"),
            F.col("posts.title").alias("post_content"),
            F.col("comments.body").alias("comment_body")
        )
        # Deduplication and quality filtering
        daily_summary_df = daily_summary_df.filter(
            (F.col("comment_body").isNotNull()) &
            (F.col("comment_id").isNotNull()) &
            (F.col("post_id").isNotNull())
        )
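The snippet above ends before the results are persisted. A minimal sketch of the write step, mirroring the JDBC options used for the reads (connection credentials and driver options are omitted here, as they are above):

# Illustrative continuation, inside the per-subreddit loop above:
daily_summary_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "processed_data.daily_summary_data") \
    .mode("append") \
    .save()
total_summaries_added += daily_summary_df.count()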
The daily summary data is stored with the following schema:
CREATE TABLE processed_data.daily_summary_data (
    id SERIAL,
    subreddit TEXT,
    post_id TEXT,
    post_score INTEGER,
    post_url TEXT,
    comment_id TEXT,
    summary_date DATE,
    processed_date TIMESTAMPTZ,
    needs_processing BOOLEAN DEFAULT TRUE,
    post_content TEXT,
    comment_body TEXT,
    PRIMARY KEY (post_id, comment_id)
);
DBT Data Transformations¶
Our data transformations are managed using dbt, which allows us to structure our project with specific models for each stage of processing. This approach provides a clear, modular, and maintainable data pipeline.
# dbt_project.yml configuration
name: 'dbt_reddit_summary_local'
version: '1.0.0'

models:
  dbt_reddit_summary_local:
    +materialized: table  # Default materialization
    current_summary_staging:
      +materialized: view  # Staging view
    joined_summary_analysis:
      +materialized: table  # Final analysis
    update_processing_status:
      +materialized: incremental

vars:
  subreddits: ['dataengineering', 'datascience', 'machinelearning', 'claudeai',
               'singularity', 'localllama', 'openai', 'stablediffusion']
The DBT workflow includes three main transformation models:
- Staging Model (current_summary_staging): The current_summary_staging model selects key fields from the daily summary data, filtering out rows with null comment bodies and those that do not need processing. It is materialized as a view for real-time filtering.
- Analysis Model (joined_summary_analysis): The joined_summary_analysis model joins the staging data with text summary and sentiment analysis results. It then validates the quality of comments and summaries, selecting only valid rows for further analysis. This model also adds processing metadata.

WITH validation_cte AS (
    SELECT
        cs.*,
        ts.comment_summary,
        sa.sentiment_score,
        sa.sentiment_label,
        CASE
            WHEN LENGTH(cs.comment_body) < 2 THEN 'Too short'
            WHEN LENGTH(cs.comment_body) > 10000 THEN 'Too long'
            ELSE 'Valid'
        END as comment_quality,
        CASE
            WHEN ts.comment_summary IS NULL THEN 'Missing'
            WHEN LENGTH(ts.comment_summary) > LENGTH(cs.comment_body) THEN 'Invalid'
            ELSE 'Valid'
        END as summary_quality
    FROM {{ source('summary_analytics', 'current_summary_staging') }} cs
    LEFT JOIN {{ source('summary_analytics', 'text_summary_results') }} ts
        ON cs.comment_id = ts.comment_id
    LEFT JOIN {{ source('summary_analytics', 'sentiment_analysis_results') }} sa
        ON cs.comment_id = sa.comment_id
)
SELECT
    *,
    CURRENT_TIMESTAMP as processed_at,
    'dbt' as processed_by
FROM validation_cte
WHERE comment_quality = 'Valid'
  AND summary_quality = 'Valid'

- Processing Status Model (update_processing_status): The update_processing_status model is an incremental model that updates the processing status of comments. It selects all records that were attempted to be processed, including those that failed validation, and uses a post-hook to mark them as processed in the source table. This ensures that comments are not reprocessed.

{{ config(
    materialized='incremental',
    unique_key=['comment_id', 'post_id'],
    post_hook=[
        """
        UPDATE {{ source('summary_analytics', 'daily_summary_data') }}
        SET needs_processing = FALSE
        WHERE comment_id IN (
            SELECT comment_id FROM {{ this }}
        );
        """
    ]
) }}
-- Select all records that were attempted to be processed, including failed validations
WITH validation_cte AS (
    SELECT comment_id, comment_body, post_id
    FROM {{ source('summary_analytics', 'current_summary_staging') }}
),
all_processed_records AS (
    SELECT
        cs.comment_id,
        cs.post_id,
        v.comment_quality
    FROM {{ source('summary_analytics', 'current_summary_staging') }} cs
    LEFT JOIN (
        SELECT
            comment_id,
            CASE
                WHEN LENGTH(comment_body) < 2 THEN 'Too short'
                WHEN LENGTH(comment_body) > 10000 THEN 'Too long'
                ELSE 'Valid'
            END as comment_quality
        FROM validation_cte
    ) v ON cs.comment_id = v.comment_id
)
SELECT comment_id, post_id
FROM all_processed_records
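In our Airflow DAG these models run as shell tasks. A minimal sketch of how such tasks might be wrapped in BashOperators (the project directory path and exact commands are assumptions, not the project's verbatim definitions):

from airflow.operators.bash import BashOperator

DBT_DIR = "/opt/airflow/dbt_reddit_summary_local"  # assumed project location

dbt_staging_task = BashOperator(
    task_id="run_dbt_staging",
    bash_command=f"dbt run --select current_summary_staging --project-dir {DBT_DIR} --profiles-dir {DBT_DIR}",
)

dbt_join_summary_analysis_task = BashOperator(
    task_id="run_joined_summary_analysis",
    bash_command=(
        f"dbt run --select joined_summary_analysis update_processing_status "
        f"--project-dir {DBT_DIR} --profiles-dir {DBT_DIR}"
    ),
)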
DBT Data Quality Tests¶
Our project implements comprehensive data quality tests using DBT's testing framework. These tests are defined in our schema.yml:
version: 2

sources:
  - name: raw_data
    schema: raw_data
    tables:
      - name: raw_dataengineering
        columns: &raw_columns
          - name: post_id
            tests:
              - not_null
              - unique
              - is_text
          - name: subreddit
            tests:
              - not_null
              - is_text
              - accepted_values:
                  values: ["dataengineering", "datascience", "machinelearning",
                           "claudeai", "singularity", "localllama", "openai",
                           "stablediffusion"]
          - name: score
            tests:
              - not_null
              - is_int
              - dbt_utils.expression_is_true:
                  expression: "{{ column_name }} >= 0"
          - name: comments
            tests:
              - not_null
              - is_json
Our testing strategy includes generic tests for data presence and type validation, custom tests for specific data structures and rules, relationship tests for data integrity, and data quality metrics to monitor data health. These tests are integrated into our CI/CD pipeline for early detection of issues.
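Within the Airflow pipeline, the same tests also run as dedicated tasks (such as the dbt_test_raw_sources task referenced in the dependency chain later). A minimal sketch, with the project directory as an assumption:

from airflow.operators.bash import BashOperator

dbt_test_raw_sources = BashOperator(
    task_id="dbt_test_raw_sources",
    bash_command=(
        "dbt test --select source:raw_data "
        "--project-dir /opt/airflow/dbt_reddit_summary_local "
        "--profiles-dir /opt/airflow/dbt_reddit_summary_local"
    ),
)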
Text Processing Pipeline¶
Our cleaned data is analyzed using transformer models for text summarization and sentiment analysis, with MLflow for experiment tracking.
1. Text Summarization¶
We use a fine-tuned BART model for generating concise summaries of Reddit comments:
def create_summarizer():
    """Initialize the summarization model."""
    model_name = "philschmid/bart-large-cnn-samsum"
    local_model_path = "/models/models--philschmid--bart-large-cnn-samsum/snapshots/e49b3d60d923f12db22bdd363356f1a4c68532ad"
    tokenizer = AutoTokenizer.from_pretrained(local_model_path)
    model = AutoModelForSeq2SeqLM.from_pretrained(
        local_model_path,
        torch_dtype=torch.float32,
        device_map="auto",
        low_cpu_mem_usage=True
    )
    return pipeline(
        "summarization",
        model=model,
        tokenizer=tokenizer,
        framework="pt"
    )

def generate_summary(comment_body, summarizer):
    """Generate summary for a given comment."""
    if len(comment_body.split()) <= 59:
        return comment_body
    try:
        summary = summarizer(
            comment_body,
            max_length=60,
            min_length=15,
            do_sample=False,
            truncation=True
        )
        if summary and isinstance(summary, list) and len(summary) > 0:
            return summary[0]['summary_text']
    except Exception as e:
        logging.error(f"Summarization failed: {e}")
    return ""
We use a BART-large model fine-tuned for dialogue summarization, which handles conversational Reddit comments better than the base news-summarization variant. It's optimized for local machine use with efficient memory management.
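For reference, here is an illustrative way the two functions above could be wired together over a small batch of comments:

# Illustrative usage of the summarization helpers
summarizer = create_summarizer()  # load the local BART model once per worker

comments = [
    "Short comment, returned unchanged because it is under the word threshold.",
    "A much longer comment body that goes into detail about a new model release and benchmarks... " * 5,
]

summaries = [generate_summary(body, summarizer) for body in comments]
for body, summary in zip(comments, summaries):
    print(f"{len(body.split())} words -> {summary[:80]}")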
2. Sentiment Analysis¶
We employ a RoBERTa-based model fine-tuned for emotion detection:
def initialize_emotion_analyzer():
    """Initialize the emotion analysis model."""
    model_name = "SamLowe/roberta-base-go_emotions"
    local_model_path = "/models/models--SamLowe--roberta-base-go_emotions/snapshots/58b6c5b44a7a12093f782442969019c7e2982299"
    model = AutoModelForSequenceClassification.from_pretrained(local_model_path)
    tokenizer = AutoTokenizer.from_pretrained(local_model_path)
    return pipeline(
        "text-classification",
        model=model,
        tokenizer=tokenizer
    )

def perform_sentiment_analysis(text, emotion_analyzer):
    """Perform sentiment analysis on the given text."""
    try:
        if not text or text.strip() == "":
            return "Neutral", 0.0, "No content"
        truncated_text = truncate_text(text, max_length=500)
        emotion_output = emotion_analyzer(truncated_text)
        if not emotion_output:
            return "Neutral", 0.0, "Analysis failed"
        emotion_label = emotion_output[0]['label']
        emotion_score = emotion_output[0]['score']
        # Map emotions to sentiment categories
        sentiment_mapping = {
            'joy': 'Positive',
            'love': 'Positive',
            'admiration': 'Positive',
            'approval': 'Positive',
            'excitement': 'Positive',
            'gratitude': 'Positive',
            'optimism': 'Positive',
            'anger': 'Negative',
            'disappointment': 'Negative',
            'disgust': 'Negative',
            'fear': 'Negative',
            'sadness': 'Negative',
            'confusion': 'Neutral',
            'curiosity': 'Neutral',
            'surprise': 'Neutral'
        }
        sentiment_label = sentiment_mapping.get(emotion_label.lower(), 'Neutral')
        return sentiment_label, float(emotion_score), emotion_label
    except Exception as e:
        logging.error(f"Error during sentiment analysis: {str(e)}")
        return "Neutral", 0.0, "Error in analysis"
Our sentiment analysis component uses a RoBERTa model fine-tuned for emotion detection, which we found to be more effective than standard sentiment analysis for Reddit comments. Key features include smart text truncation, mapping to sentiment categories (Positive, Negative, or Neutral), and comprehensive error handling.
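perform_sentiment_analysis calls a truncate_text helper that isn't shown above. A minimal sketch of such a helper, cutting at a word boundary (the project's version may instead respect the tokenizer's 512-token limit):

def truncate_text(text, max_length=500):
    """Truncate text to roughly max_length characters without splitting a word."""
    if len(text) <= max_length:
        return text
    truncated = text[:max_length]
    # Drop the trailing partial word, if any
    if " " in truncated:
        truncated = truncated.rsplit(" ", 1)[0]
    return truncated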
3. Pipeline Integration¶
Both components are integrated into the data pipeline with MLflow, which we use for experiment tracking, allowing us to monitor model performance and manage different model versions. We also use MLflow for model serving, enabling us to deploy our models.
# MLflow configuration
mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("reddit_sentiment_analysis_experiments")

with mlflow.start_run() as run:
    # Log parameters
    mlflow.log_param("model_name", "SamLowe/roberta-base-go_emotions")
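Beyond parameters, run-level metrics can be logged in the same way. An illustrative sketch (the batch and metric names are examples, not the project's exact instrumentation):

import time
import mlflow

# Tracking URI and experiment configured as above.
emotion_analyzer = initialize_emotion_analyzer()
batch = ["Great release, the new model works really well!", "This is disappointing."]

with mlflow.start_run():
    mlflow.log_param("model_name", "SamLowe/roberta-base-go_emotions")
    start = time.time()
    results = [perform_sentiment_analysis(text, emotion_analyzer) for text in batch]
    # Illustrative run-level metrics
    mlflow.log_metric("comments_processed", len(results))
    mlflow.log_metric("batch_duration_seconds", time.time() - start)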
Gemini AI Analysis¶
We use Google's Gemini AI for advanced analysis, generating structured insights from Reddit discussions with a focus on content safety and consistent formatting.
1. Analysis Configuration¶
The Gemini analysis is guided by a prompt template that instructs the model to filter content for safety, rank threads by score, summarize content, analyze emotions, extract top viewpoints, and embed links. The output is structured with a title, date, description, tags, overall ranking, and detailed thread analysis.
def create_prompt_template():
    """Return the standard prompt template for Gemini analysis."""
    current_date = datetime.now().strftime('%Y-%m-%d')
    return f"""
Analyze the provided text files, which contain Reddit posts and comments.
**Instructions:**
1. **Content Filtering:** Check for harassment, hate speech, or explicit material
2. **Ranking:** Rank threads by total "Score"
3. **Summarization:** Utilize "Summary" fields for thread overviews
4. **Emotional Analysis:** Analyze "Emotion Label" and "Emotion Score"
5. **Point of View Extraction:** Extract top 3 viewpoints per thread
6. **Links:** Embed URLs in thread titles using Markdown
**Output Format:**
---
title: "{{subreddit_name}} subreddit"
date: "{current_date}"
description: "Analysis of top discussions and trends"
tags: ["tag1", "tag2", "tag3"]
---
# Overall Ranking and Top Discussions
[Ranked list of threads with scores and summaries]
# Detailed Analysis by Thread
[Thread-by-thread analysis with summaries, emotions, and viewpoints]
"""
2. Data Processing¶
The system processes data subreddit by subreddit, formatting each post and its comments:
def process_subreddit(model, cur, subreddit, output_dir):
    """Process a single subreddit's data."""
    # Fetch processed data
    cur.execute("""
        SELECT post_id, subreddit, post_score, post_url, comment_id,
               summary_date, post_content, comment_body, comment_summary,
               sentiment_score, sentiment_label
        FROM processed_data.joined_summary_analysis
        WHERE subreddit = %s
        ORDER BY post_score DESC
    """, (subreddit,))
    # Format data for analysis
    text_files = "".join(
        format_text_file(row) for row in cur.fetchall()
    )
    # Generate insights using Gemini
    response = model.generate_content(final_prompt + text_files)
    # Save formatted output
    output_file_path = os.path.join(output_dir, f"llm_{subreddit}.md")
    with open(output_file_path, "w", encoding="utf-8") as f:
        f.write(response.text)
3. Output Formatting¶
The system ensures consistent formatting across all analyses:
def get_formatted_subreddit_name(subreddit: str) -> str:
    """Returns a properly formatted subreddit name for display."""
    subreddit_formats = {
        "claudeai": "ClaudeAI",
        "dataengineering": "Data Engineering",
        "datascience": "Data Science",
        "localllama": "LocalLLaMA",
        "machinelearning": "Machine Learning",
        "openai": "OpenAI",
        "singularity": "Singularity",
        "stablediffusion": "Stable Diffusion"
    }
    return f"{subreddit_formats.get(subreddit.lower(), subreddit)} Subreddit"
4. Pipeline Integration¶
The analysis is integrated into the main pipeline with organized output storage:
def analyze_data():
    """Main function to analyze Reddit data using Gemini."""
    # Set up dated output directory
    current_date = datetime.now()
    output_dir = os.path.join(
        '/opt/airflow/results',
        current_date.strftime('%Y'),
        current_date.strftime('%m'),
        current_date.strftime('%d')
    )
    os.makedirs(output_dir, exist_ok=True)
    # Initialize Gemini model
    model = genai.GenerativeModel('gemini-2.0-flash-exp')
    # Process each subreddit
    for subreddit in SUBREDDITS:
        process_subreddit(model, cur, subreddit, output_dir)
Web Application¶
Our web application, built with Next.js 15, React 18, TypeScript, and MDX, provides a modern and user-friendly way to explore our insights. The application automatically syncs with our on-premise analysis results and deploys to Vercel using GitHub Actions, with build optimizations for performance.
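The sync step itself happens on the pipeline side: once Gemini's Markdown reports are generated, they are committed to the web application's repository, which in turn triggers the deployment workflow below. A minimal sketch, with the repository path and content directory as illustrative assumptions:

import shutil
import subprocess
from pathlib import Path

def push_results_to_web_repo(results_dir: str, repo_dir: str = "/opt/airflow/web-repo"):
    """Copy the day's Markdown reports into the web repo and push them to GitHub."""
    content_dir = Path(repo_dir) / "content" / "daily"  # assumed content location
    content_dir.mkdir(parents=True, exist_ok=True)
    for md_file in Path(results_dir).glob("llm_*.md"):
        shutil.copy2(md_file, content_dir / md_file.name)
    # A production task would also guard against empty commits
    subprocess.run(["git", "-C", repo_dir, "add", "."], check=True)
    subprocess.run(["git", "-C", repo_dir, "commit", "-m", "Add daily Reddit analysis"], check=True)
    subprocess.run(["git", "-C", repo_dir, "push"], check=True)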
Automated Deployment¶
The deployment process leverages GitHub Actions:
# .github/workflows/deploy.yml
name: Deploy to Vercel

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

env:
  VERCEL_ORG_ID: ${{ secrets.VERCEL_ORG_ID }}
  VERCEL_PROJECT_ID: ${{ secrets.VERCEL_PROJECT_ID }}

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
          cache: 'npm'
      - name: Install dependencies
        run: npm ci
      - name: Install Vercel CLI
        run: npm install --global vercel@latest
      - name: Pull Vercel Environment Information
        run: vercel pull --yes --environment=production --token=${{ secrets.VERCEL_TOKEN }}
      - name: Build Project Artifacts
        run: vercel build --prod --token=${{ secrets.VERCEL_TOKEN }}
      - name: Deploy Project Artifacts to Vercel
        run: vercel deploy --prebuilt --prod --token=${{ secrets.VERCEL_TOKEN }}
Orchestrator Stack¶
Our pipeline is orchestrated using Apache Airflow, configured with a robust setup that ensures reliability, scalability, and observability. The orchestration layer manages the entire data pipeline from ingestion to analysis.
1. Airflow Configuration¶
The Airflow setup is containerized using Docker Compose with several key components:
x-common-env: &common-env
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
  AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
  AIRFLOW__METRICS__STATSD_ON: "true"
  AIRFLOW__METRICS__STATSD_HOST: "statsd-exporter"
  AIRFLOW__METRICS__STATSD_PORT: "9125"
The Airflow services are deployed as separate containers, each with specific responsibilities:
- Airflow Webserver: Our Airflow webserver provides the web UI for DAG management and monitoring, handles user authentication and authorization, exposes REST API endpoints, and includes health checks to ensure UI availability.
- Airflow Scheduler: The Airflow scheduler monitors and triggers task execution, manages DAG parsing and scheduling, handles task dependencies and queuing, and ensures proper task distribution to workers. This component is crucial for orchestrating our data pipeline.
- Airflow Worker: Our Airflow worker executes the actual tasks, handles ML model inference, manages resource allocation, supports parallel task execution, and is configured for ML workloads with PyTorch and Transformers.

airflow-worker:
  build:
    context: .
    dockerfile: Dockerfile.worker
  volumes: *common-volumes
  environment:
    <<: *common-env
    TRANSFORMERS_OFFLINE: "0"
    TOKENIZERS_PARALLELISM: "false"
    PYTORCH_ENABLE_MPS_FALLBACK: "1"
  command: airflow celery worker
  healthcheck:
    test: ["CMD-SHELL", 'celery inspect ping -d "celery@$${HOSTNAME}"']
    interval: 30s
    retries: 5

- Airflow Init: The Airflow init service initializes the Airflow database, creates an admin user, performs database migrations, and runs only during the initial setup. This component is essential for setting up the Airflow environment.
2. Pipeline Structure¶
Our DAG (reddit_pipeline) is organized into 26 stages that can be categorized into 5 main sections, each with specific responsibilities and metrics collection:
- Data Collection and Preprocessing: Data is ingested and preprocessed.
- DBT Transformations: DBT transformations are run to prepare the data for analysis.
- Model Processing: Our analysis pipeline includes text summarization using BART, sentiment analysis with RoBERTa, and advanced analysis with Gemini AI.

summarize_metrics_task = PythonOperator(
    task_id='parse_summarize_metrics',
    python_callable=parse_summarize_metrics,
    provide_context=True
)
sentiment_task = PythonOperator(
    task_id='run_sentiment_analysis',
    python_callable=sentiment_analysis_process,
    provide_context=True
)
gemini_task = PythonOperator(
    task_id='run_gemini',
    python_callable=gemini_analysis_process,
    dag=dag
)

- Quality Checks: DBT tests are run to ensure the data is valid.
- Metrics Collection: We monitor our pipeline with dedicated metrics tasks, using a StatsD exporter to send real-time data to Prometheus, and MLflow tracking for model performance.
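As an example of what a dedicated metrics task can do, here is a minimal sketch that pushes counters and timings to the StatsD exporter configured earlier, using the statsd Python client (the metric names, and the use of this particular client, are assumptions):

from statsd import StatsClient

statsd = StatsClient(host="statsd-exporter", port=9125, prefix="reddit_pipeline")

def report_ingestion_metrics(posts_ingested: int, duration_seconds: float) -> None:
    """Push per-run pipeline metrics to StatsD; Prometheus scrapes them via the exporter."""
    statsd.gauge("posts_ingested", posts_ingested)
    statsd.timing("ingestion_duration_ms", duration_seconds * 1000)
    statsd.incr("runs_completed")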
3. Task Dependencies¶
The pipeline follows a clear dependency chain:
ingest_task >> ingest_metrics_task >> \
dbt_test_raw_sources >> dbt_test_raw_metrics_task >> \
dbt_staging_task >> dbt_test_staging_models >> \
summarize_task >> sentiment_task >> \
dbt_join_summary_analysis_task >> \
gemini_task >> push_gemini_results_task
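For context, these tasks live inside a single DAG definition. A minimal sketch of how the reddit_pipeline DAG itself might be declared (the schedule, default_args, and wrapper callable are illustrative assumptions, not the project's exact settings):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "airflow",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="reddit_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",  # one batch run per day
    catchup=False,
    default_args=default_args,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_reddit_data",
        python_callable=fetch_and_save_posts_for_all_subreddits,  # hypothetical wrapper
    )
    # ...remaining tasks are declared the same way and wired up as shown above.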
4. Integration Points¶
The orchestrator connects with PostgreSQL for pipeline data storage, local model deployment for model serving, StatsD and Prometheus for monitoring, and GitHub for version control of results. These connections are essential to the functioning of our pipeline.
Observability Stack¶
We use Grafana, Prometheus, and MLflow for comprehensive pipeline observability, with each component containerized and integrated through a centralized metrics system.
1. Metrics Collection Architecture¶
The metrics collection is structured in layers:
- Source Metrics (via StatsD)
- Prometheus Targets
2. Task-Specific Metrics¶
Each pipeline component has dedicated metrics tracking:
- Gemini Analysis Metrics
- Sentiment Analysis Performance
- Pipeline Processing Volumes
3. Performance Dashboards¶
Our Grafana dashboard is designed to visualize processing efficiency (task durations, volumes, success rates, resource utilization), data quality (DBT test results, validation metrics, processing status, error rates), and provide real-time monitoring.
4. MLflow Integration¶
MLflow's model versioning, experiment tracking, model serving, and artifact management features aid in model selection by allowing us to identify the best-performing models.
5. Alert Configuration¶
The system monitors critical performance thresholds, including task duration, error rate spikes, resource exhaustion, and pipeline stalls. It also monitors quality thresholds, including data validation failures, model performance degradation, processing anomalies, and system health issues.
Results and Impact¶
The pipeline successfully processes roughly a thousand Reddit posts and comments daily, generates concise and meaningful summaries, identifies key trends and discussions, maintains high accuracy in content filtering, and delivers daily insights through a web interface.
Future Improvements¶
- Enhanced Analysis
  - Integration with additional AI models
  - Advanced visualization features
  - Real-time analysis capabilities
- System Scalability
  - Distributed processing implementation
  - Enhanced caching mechanisms
  - API endpoint for analysis access
- User Experience
  - Interactive dashboard development
  - Customizable analysis parameters
  - Trend prediction features
Conclusion¶
This project showcases the power of automation in extracting valuable insights from social media using AI and data engineering, while handling real-world challenges. The open-source code is available on GitHub, and community contributions are welcome.