From Reddit to Insights: Building an AI-Powered Data Pipeline with Gemini (Cloud)¶
Introduction¶
Purpose
In this blog post, I document the process of building an AI-driven cloud data pipeline that automates the monitoring of AI-focused subreddits. Using Google's Gemini AI, the pipeline collects, processes, and synthesizes discussions from AI-related subreddits into structured daily reports. The system is designed to filter out irrelevant or harmful content, ensuring the extracted insights are both meaningful and actionable.
Check out the project GitHub repository for the full code and detailed documentation, as well as the Web Application.
Problem Statement
The field of artificial intelligence and machine learning evolves at an unprecedented pace, with new breakthroughs, trends, and discussions emerging daily. Platforms like Reddit host vibrant AI-focused communities that are rich with valuable insights. However, manually monitoring multiple subreddits to extract meaningful information is both time-consuming and inefficient.
What You’ll Learn¶
- Leveraging Google Cloud Storage (GCS) as a scalable data lake: How to use GCS for storing raw data, ML artifacts, and daily results.
- Optimizing data warehousing with BigQuery and dbt: Best practices for structuring, managing, and transforming data efficiently in BigQuery.
- Applying AI for sentiment analysis and summarization: Techniques to extract concise insights from unstructured Reddit discussions using BART and RoBERTa models.
- Utilizing Google’s Gemini AI for content analysis: Insights into leveraging advanced AI models to categorize and interpret data.
- Orchestrating data workflows with Airflow on a single VM: How to manage complex data pipelines using Airflow in a cloud environment.
- Implementing cost-effective cloud solutions: Strategies for managing costs through scheduled VM start/stop, preemptible instances, and resource optimization.
System Architecture: Modular and Scalable Design¶
Our pipeline is designed with modularity and scalability in mind, comprising six main layers. Below is a high-level overview of how the components interact:
The diagram above illustrates the flow of data through our system, from collection to presentation. Each layer has specific responsibilities and communicates with adjacent layers through well-defined interfaces.
1. Data Collection Layer¶
- Reddit API Integration:
- Fetch posts and comments from AI-focused subreddits
- Store raw data in GCS buckets
- Text Preprocessing:
- Clean and standardize text data
- Prepare for ML processing
- BigQuery Integration:
- Load data into staging tables
- Manage processing state
- Incremental processing
2. Storage Layer¶
- Google Cloud Storage (GCS):
- Raw data storage
- ML model artifacts
- Daily results
- BigQuery:
- Structured data storage
- Efficient query processing
- Cost-optimized table partitioning
- DBT Transformations:
- Stage posts and comments for further processing.
- Clean and standardize data structures.
- Manage processing state for each pipeline run.
- Automate cleanup upon pipeline completion.
3. Processing Layer¶
- Text Summarization: BART for concise summaries
- Sentiment Analysis: RoBERTa model
- Insight Generation: Google's Gemini AI
- MLflow Integration: Track experiments and models
4. Orchestration Layer¶
- Single VM Deployment:
- e2-standard-2 instance
- Docker container orchestration
- Automated start/stop
- Airflow DAGs:
- Main pipeline workflow
- GitHub sync automation
- Metric collection
5. Observability Layer¶
- Cloud Monitoring:
- VM health checks
- Cost monitoring
- Alert policies
- Local Monitoring:
- Prometheus metrics
- Grafana dashboards
- StatsD integration
6. Cost Management Layer¶
- VM Lifecycle:
- Scheduled start/stop
- Preemptible instances
- Resource optimization
This modular design ensures adaptability, maintainability, and scalability, enabling seamless interaction between components and the efficient transformation of Reddit data into actionable insights.
Deep Dive: Key Components¶
Resource Management¶
The Reddit AI Pulse infrastructure on Google Cloud is a comprehensive data processing platform, designed with Terraform for scalability and security. At its heart, a custom network provides a secure foundation for a Debian VM running Airflow, which orchestrates the entire data pipeline. Data is efficiently managed using BigQuery for both raw feeds and analytics, along with a Cloud Storage bucket for logs and other artifacts. This automated, cost-effective setup ensures reliable analysis and processing of large-scale Reddit data.
Resource Creation¶
build_res.sh is a shell script that provisions the project's GCP resources with Terraform.
# =============================================================================
# build_res.sh
# -----------------------------------------------------------------------------
# Main deployment script for the Reddit AI Pulse Cloud infrastructure.
# This script sets up and configures all necessary GCP resources for the project.
#
# The script performs the following operations:
# 1. Loads environment configuration
# 2. Generates terraform.tfvars from environment variables
# 3. Sets up Docker and Artifact Registry
# 4. Builds and pushes Cloud Run images
# 5. Initializes and applies Terraform configuration
# 6. Updates Cloud Run URLs in .env file
# 7. Manages service account credentials
# 8. Uploads secrets to GitHub
# 9. Configures Git repository settings
#
# The script includes error handling, logging, and automatic backup
# of sensitive files. It can optionally commit changes to Git.
#
# Usage:
# ./build_res.sh
#
# Requirements:
# - .env file with required configuration
# - Authenticated gcloud CLI
# - Docker installed and configured
# - GitHub CLI installed and authenticated
# - Terraform installed
# =============================================================================
The upload-secrets.yml workflow uploads the required secrets and credentials to GCS:
name: Upload Secrets to GCS
on:
workflow_dispatch: # Manual trigger
repository_dispatch:
types: [secrets_updated] # Custom event type
push:
paths:
- '.github/workflows/upload-secrets.yml'
jobs:
upload-secrets:
runs-on: ubuntu-latest
permissions:
contents: read
id-token: write
steps:
- uses: actions/checkout@v4
- id: 'auth'
name: 'Authenticate to Google Cloud'
uses: 'google-github-actions/auth@v2'
with:
credentials_json: '${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}'
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v2'
- name: 'Create secrets directory'
run: mkdir -p secrets
- name: 'Create .env file'
run: |
cat << EOF > secrets/.env
# Docker Registry
DOCKER_REGISTRY=${{ secrets.DOCKER_REGISTRY }}
# GCP Project Configuration
GCP_PROJECT_ID=${{ secrets.GCP_PROJECT_ID }}
GCS_BUCKET_NAME=${{ secrets.GCS_BUCKET_NAME }}
GCP_REGION=${{ secrets.GCP_REGION }}
GCP_ZONE=${{ secrets.GCP_ZONE }}
# Service Account & Authentication
GOOGLE_APPLICATION_CREDENTIALS=/opt/airflow/credentials/service-account.json
SA_EMAIL=${{ secrets.SA_EMAIL }}
# VM Configuration
VM_INSTANCE_NAME=${{ secrets.VM_INSTANCE_NAME }}
VM_MACHINE_TYPE=${{ secrets.VM_MACHINE_TYPE }}
# Network Configuration
NETWORK_NAME=${{ secrets.NETWORK_NAME }}
SUBNET_NAME=${{ secrets.SUBNET_NAME }}
SUBNET_CIDR=${{ secrets.SUBNET_CIDR }}
# Email Configuration
ALERT_EMAIL=${{ secrets.ALERT_EMAIL }}
ALERT_EMAIL_PASSWORD=${{ secrets.ALERT_EMAIL_PASSWORD }}
# BigQuery Configuration
BIGQUERY_DATASET_RAW=${{ secrets.BIGQUERY_DATASET_RAW }}
BIGQUERY_DATASET_PROCESSED=${{ secrets.BIGQUERY_DATASET_PROCESSED }}
# Reddit API Configuration
REDDIT_CLIENT_ID=${{ secrets.REDDIT_CLIENT_ID }}
REDDIT_CLIENT_SECRET=${{ secrets.REDDIT_CLIENT_SECRET }}
REDDIT_USERNAME=${{ secrets.REDDIT_USERNAME }}
REDDIT_PASSWORD=${{ secrets.REDDIT_PASSWORD }}
REDDIT_USER_AGENT=${{ secrets.REDDIT_USER_AGENT }}
# Gemini API Key
GOOGLE_GEMINI_API_KEY=${{ secrets.GOOGLE_GEMINI_API_KEY }}
# GitHub Configuration
GH_PAT=${{ secrets.GH_PAT }}
GH_OWNER=${{ secrets.GH_OWNER }}
GH_REPO=${{ secrets.GH_REPO }}
GH_WEBSITE_REPO=${{ secrets.GH_WEBSITE_REPO }}
AUTO_COMMIT=${{ secrets.AUTO_COMMIT }}
# Grafana credentials
GF_SECURITY_ADMIN_USER=${{ secrets.GF_SECURITY_ADMIN_USER }}
GF_SECURITY_ADMIN_PASSWORD=${{ secrets.GF_SECURITY_ADMIN_PASSWORD }}
# Airflow PostgreSQL credentials
AIRFLOW_DB_USER=${{ secrets.AIRFLOW_DB_USER }}
AIRFLOW_DB_PASSWORD=${{ secrets.AIRFLOW_DB_PASSWORD }}
AIRFLOW_DB_NAME=${{ secrets.AIRFLOW_DB_NAME }}
# VM Function URLs
STOP_VM_FUNCTION_URL=${{ secrets.STOP_VM_FUNCTION_URL }}
START_VM_FUNCTION_URL=${{ secrets.START_VM_FUNCTION_URL }}
# Airflow configuration
AIRFLOW_UID=50000
AIRFLOW_GID=0
EOF
- name: 'Create service account key file'
run: |
echo '${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}' > secrets/service-account.json
- name: 'Upload secrets to GCS'
run: |
gsutil cp secrets/.env gs://${{ secrets.GCS_BUCKET_NAME }}/secrets/.env
gsutil cp secrets/service-account.json gs://${{ secrets.GCS_BUCKET_NAME }}/secrets/service-account.json
Resource Deletion¶
cleanup.sh is a shell script that tears down the resources created by Terraform.
# =============================================================================
# cleanup.sh
# -----------------------------------------------------------------------------
# A comprehensive cleanup script for the Reddit AI Pulse Cloud infrastructure.
# This script systematically removes all GCP resources created by the project.
#
# The script performs cleanup in the following order:
# 1. IAM (roles and bindings)
# 2. Cloud Scheduler jobs
# 3. Cloud Functions
# 4. GCS buckets and contents
# 5. BigQuery datasets and tables
# 6. Artifact Registry repositories
# 7. Cloud Run services
# 8. Service accounts
# 9. Compute Engine resources
# 10. VPC, subnets, and firewall rules
# 11. Monitoring resources (alerts, dashboards)
# 12. Terraform state
# 13. Log buckets
#
# The script includes error handling, retries for dependent resources,
# and creates detailed logs of the cleanup process in the resource_info directory.
#
# Usage:
# ./cleanup.sh
#
# Requirements:
# - GCP project configuration in .env file
# - Authenticated gcloud CLI
# - Required permissions to delete resources
# =============================================================================
Reddit Data Collection and Preprocessing¶
The foundation of our pipeline is reliable data collection and preprocessing. We utilize Python's Reddit API wrapper (PRAW) to fetch posts and comments from specified subreddits, with immediate text preprocessing for clean data storage.
1. Data Collection with PRAW¶
The Reddit client implementation focuses on efficient and reliable data collection. Here's how we handle both posts and their comments:
def fetch_and_save_posts(reddit, subreddit_name, db_utils, conn):
"""Fetches new posts from a subreddit since the last processed timestamp."""
last_processed_utc = get_last_processed_timestamp(conn, subreddit_name)
current_max_utc = 0
subreddit = reddit.subreddit(subreddit_name)
for submission in subreddit.hot(limit=20):
# Skip already processed posts
if submission.created_utc <= last_processed_utc:
continue
# Process post data
post_data = {
"subreddit": subreddit_name,
"post_id": submission.id,
"title": str(submission.title),
"author": str(submission.author),
"url": str(submission.url),
"score": int(submission.score),
"created_utc": submission.created_utc,
"comments": []
}
# Handle comments if they exist
if submission.num_comments > 0:
try:
submission.comments.replace_more(limit=0) # Expand comment tree
post_data["comments"] = [
{
"comment_id": comment.id,
"author": str(comment.author),
"body": str(comment.body),
"created_utc": comment.created_utc,
}
for comment in submission.comments.list()[:10] # Top 10 comments
if comment.created_utc > last_processed_utc
]
except Exception as e:
logging.error(f"Error fetching comments for post {submission.id}: {e}")
# Save to database
db_utils.insert_raw_post_data(conn, post_data)
current_max_utc = max(current_max_utc, submission.created_utc)
The raw data is stored in GCS as newline-delimited JSON (NDJSON) files:
def save_to_gcs(storage_client: storage.Client, bucket_name: str,
subreddit_name: str, posts: List[Dict]) -> None:
"""Saves posts data to Google Cloud Storage."""
try:
# Get bucket
bucket = storage_client.bucket(bucket_name)
# Generate blob name with current timestamp
batch_timestamp = datetime.utcnow()
blob_name = get_gcs_blob_name(subreddit_name, batch_timestamp)
blob = bucket.blob(blob_name)
# Convert to newline-delimited JSON
ndjson_data = '\n'.join(json.dumps(post) for post in posts)
blob.upload_from_string(ndjson_data)
logger.info(f"Saved {len(posts)} posts to GCS: {blob_name}")
except Exception as e:
logger.error(f"Error saving to GCS: {e}")
raise
Our system is designed for efficient and reliable data processing. We use incremental data collection, fetching only new data based on timestamps. To manage the volume of Reddit data, we focus on the top 10 comments per post, which lets us process roughly 1,000 comments daily in about 50 minutes; this can easily be scaled by adding more compute resources. We store comments as JSON, which provides flexibility for handling semi-structured data.
We've implemented robust error handling with retry mechanisms and transaction management to ensure data consistency. Batch processing is used to improve scalability and efficiency when dealing with large datasets. While stream processing could further enhance scalability, we opted for batch processing for this use case.
JSON was chosen for its ability to handle semi-structured data like Reddit comments, which often vary in format and content. By storing comments as JSON, the system accommodates diverse data structures without rigid schemas, while still allowing efficient querying and indexing for analytics.
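The get_last_processed_timestamp helper used in the collection code above isn't shown. Below is a minimal sketch of how it might look against BigQuery, assuming the raw tables follow the raw_data.raw_<subreddit> naming seen in the dbt sources and store created_utc as an epoch value; the real helper may consult a dedicated processing-state table instead.

```python
from google.cloud import bigquery

def get_last_processed_timestamp(client: bigquery.Client, subreddit_name: str) -> float:
    """Return the newest created_utc already ingested for a subreddit (sketch)."""
    # Assumed table layout: raw_data.raw_<subreddit> with an epoch created_utc column.
    query = f"""
        SELECT MAX(created_utc) AS last_utc
        FROM `raw_data.raw_{subreddit_name.lower()}`
    """
    rows = client.query(query).result()
    row = next(iter(rows), None)
    # Fall back to 0 so the first run processes everything.
    return float(row.last_utc) if row and row.last_utc is not None else 0.0
```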
2. Data Preprocessing with PySpark¶
To ensure the quality of our analysis, we implement a series of preprocessing steps for Reddit posts and comments using PySpark.
Posts Processing¶
The following code snippet demonstrates how we parse, filter, and clean posts:
def preprocess_posts(df):
"""Preprocesses Reddit posts with content filtering and cleaning."""
# Content filtering patterns
profanity_pattern = r'(?i)\b(wordword2|word3|word4|word5)\b|' + \
r'(?i)\b(w\*\*d1|w\*rd2|w\*rd3|w\*\*d4)\b|' + \
r'(?i)w[^\w]?r[^\w]?d1|' + \
r'(?i)w[^\w]?r[^\w]?d2'
# Filter and clean posts
return df.filter(
# Basic validation
(F.col("title").isNotNull()) &
(F.col("title") != "") &
(F.col("title") != "[deleted]") &
(F.col("author").isNotNull()) &
(F.col("author") != "[deleted]") &
# Content filtering
~F.col("title").rlike(profanity_pattern)
).withColumn(
# Clean and normalize text
"title", F.regexp_replace("title", r'[\n\r\t]', ' ')
).withColumn(
# Remove multiple spaces
"title", F.regexp_replace("title", r'\s+', ' ')
).dropDuplicates(["post_id"])
We validate the title and author fields, clean and normalize the text, and remove duplicate posts. This ensures our analysis is based on clean and relevant data.
Comments Processing¶
Similar to posts, we also preprocess Reddit comments to ensure data quality. The following code snippet demonstrates how we parse, filter, and clean comments stored in JSON format:
def preprocess_comments(df):
"""Preprocesses Reddit comments with content filtering and cleaning."""
# Parse and explode comments from JSONB
comments_schema = ArrayType(StructType([
StructField("body", StringType(), True),
StructField("author", StringType(), True),
StructField("comment_id", StringType(), True),
StructField("created_utc", DoubleType(), True)
]))
return df.withColumn(
"comments_parsed",
F.from_json(F.col("comments"), comments_schema)
).withColumn(
"comment", F.explode("comments_parsed")
).select(
"post_id",
F.col("comment.body").alias("body"),
F.col("comment.author").alias("author"),
F.col("comment.comment_id").alias("comment_id"),
F.to_timestamp(F.col("comment.created_utc")).alias("created_utc")
).filter(
# Remove deleted/empty comments
(F.col("body").isNotNull()) &
(F.col("body") != "") &
(F.col("body") != "[deleted]") &
(F.col("author") != "[deleted]")
).withColumn(
# Clean and normalize text
"body", F.regexp_replace("body", r'[\n\r\t]', ' ')
).withColumn(
# Remove multiple spaces
"body", F.regexp_replace("body", r'\s+', ' ')
).dropDuplicates(["comment_id"])
Processed Data Schema¶
The cleaned data is stored in separate tables for posts and comments:
# Dynamic processed posts tables for each subreddit
resource "google_bigquery_table" "processed_posts_tables" {
for_each = toset(var.subreddits)
dataset_id = google_bigquery_dataset.processed_data.dataset_id
table_id = "posts_${lower(each.value)}"
project = var.project
deletion_protection = false
schema = jsonencode([
{
name = "post_id",
type = "STRING",
mode = "REQUIRED"
},
{
name = "subreddit",
type = "STRING",
mode = "REQUIRED"
},
{
name = "title",
type = "STRING",
mode = "NULLABLE"
},
{
name = "author",
type = "STRING",
mode = "NULLABLE"
},
{
name = "url",
type = "STRING",
mode = "NULLABLE"
},
{
name = "score",
type = "INTEGER",
mode = "NULLABLE"
},
{
name = "created_utc",
type = "TIMESTAMP",
mode = "NULLABLE"
},
{
name = "ingestion_timestamp",
type = "TIMESTAMP",
mode = "NULLABLE"
}
])
}
# Dynamic processed comments tables for each subreddit
resource "google_bigquery_table" "processed_comments_tables" {
for_each = toset(var.subreddits)
dataset_id = google_bigquery_dataset.processed_data.dataset_id
table_id = "comments_${lower(each.value)}"
project = var.project
deletion_protection = false
schema = jsonencode([
{
name = "post_id",
type = "STRING",
mode = "REQUIRED"
},
{
name = "comment_id",
type = "STRING",
mode = "REQUIRED"
},
{
name = "author",
type = "STRING",
mode = "NULLABLE"
},
{
name = "body",
type = "STRING",
mode = "NULLABLE"
},
{
name = "created_utc",
type = "TIMESTAMP",
mode = "NULLABLE"
}
])
}
Daily Summary Processing¶
After preprocessing, we generate daily summaries of the Reddit data using PySpark. The following code snippet demonstrates how we load, filter, and join posts and comments to create these summaries:
def generate_daily_summaries(spark: SparkSession, project_id: str, dataset_id: str, subreddits: list, bucket_name: str):
"""
Generates daily summaries of the processed data using PySpark.
Implements incremental loading by only processing data since the last processed timestamp.
"""
current_timestamp = datetime.now()
last_processed = get_last_processed_timestamp(spark, project_id, dataset_id)
total_summaries_added = 0
if last_processed:
logger.info(f"Processing summaries from {last_processed} to {current_timestamp}")
else:
logger.info("No previous summaries found. Processing all available data.")
for subreddit in subreddits:
logger.info(f"Processing daily summary for subreddit: {subreddit}")
# Load posts and comments tables from BigQuery
posts_df = spark.read.format('bigquery') \
.option('table', f"{project_id}.{dataset_id}.posts_{subreddit.lower()}") \
.load()
posts_count = posts_df.count()
logger.info(f"Found {posts_count} posts for {subreddit}")
comments_df = spark.read.format('bigquery') \
.option('table', f"{project_id}.{dataset_id}.comments_{subreddit.lower()}") \
.load()
comments_count = comments_df.count()
logger.info(f"Found {comments_count} comments for {subreddit}")
# Filter posts and comments based on created_utc timestamp
if last_processed:
daily_posts_df = posts_df.filter(
(F.col("created_utc") > last_processed) &
(F.col("created_utc") <= current_timestamp)
)
daily_comments_df = comments_df.filter(
(F.col("created_utc") > last_processed) &
(F.col("created_utc") <= current_timestamp)
)
else:
daily_posts_df = posts_df.filter(F.col("created_utc") <= current_timestamp)
daily_comments_df = comments_df.filter(F.col("created_utc") <= current_timestamp)
filtered_posts_count = daily_posts_df.count()
logger.info(f"Filtered to {filtered_posts_count} posts for processing")
if filtered_posts_count == 0:
logger.info(f"No new posts to summarize for {subreddit}")
continue
# Join posts and comments on post_id
joined_df = daily_posts_df.alias("posts").join(
daily_comments_df.alias("comments"),
"post_id",
"right"
)
# Prepare the daily summary
daily_summary_df = joined_df.select(
F.monotonically_increasing_id().alias("id"),
F.col("subreddit"),
F.col("posts.post_id").alias("post_id"),
F.col("posts.score").alias("post_score"),
F.col("posts.url").alias("post_url"),
F.col("comments.comment_id").alias("comment_id"),
F.to_date(F.col("comments.created_utc")).alias("summary_date"),
F.current_timestamp().alias("processed_date"),
F.lit(True).alias("needs_processing"),
F.col("posts.title").alias("post_content"),
F.col("comments.body").alias("comment_body")
)
# Filter out rows where required fields are null
daily_summary_df = daily_summary_df.filter(
(F.col("comment_body").isNotNull()) &
(F.col("comment_id").isNotNull()) &
(F.col("post_id").isNotNull())
)
filtered_summaries_count = daily_summary_df.count()
logger.info(f"Generated {filtered_summaries_count} summaries before deduplication")
# Deduplication Logic
existing_summary_df = spark.read.format('bigquery') \
.option('table', f"{project_id}.{dataset_id}.daily_summary_data") \
.load()
# Perform a left anti-join to get only new records
new_daily_summary_df = daily_summary_df.join(
existing_summary_df,
["post_id", "comment_id"], # Using composite key for deduplication
"left_anti"
)
new_summaries_count = new_daily_summary_df.count()
if new_summaries_count > 0:
# Write the new daily summaries to BigQuery
new_daily_summary_df.write.format('bigquery') \
.option('table', f"{project_id}.{dataset_id}.daily_summary_data") \
.option('temporaryGcsBucket', bucket_name) \
.mode('append') \
.save()
total_summaries_added += new_summaries_count
logger.info(f"Successfully added {new_summaries_count} new summaries for {subreddit}")
else:
logger.info(f"No new unique summaries to add for {subreddit}")
The daily summary data is stored with the following schema:
resource "google_bigquery_table" "daily_summary" {
dataset_id = google_bigquery_dataset.processed_data.dataset_id
table_id = "daily_summary_data"
project = var.project
deletion_protection = false
schema = jsonencode([
{
name = "id",
type = "INTEGER",
mode = "REQUIRED"
},
{
name = "subreddit",
type = "STRING",
mode = "REQUIRED"
},
{
name = "post_id",
type = "STRING",
mode = "REQUIRED"
},
{
name = "post_score",
type = "INTEGER",
mode = "NULLABLE"
},
{
name = "post_url",
type = "STRING",
mode = "NULLABLE"
},
{
name = "comment_id",
type = "STRING",
mode = "NULLABLE"
},
{
name = "summary_date",
type = "DATE",
mode = "REQUIRED"
},
{
name = "processed_date",
type = "TIMESTAMP",
mode = "REQUIRED"
},
{
name = "needs_processing",
type = "BOOLEAN",
mode = "REQUIRED"
},
{
name = "post_content",
type = "STRING",
mode = "NULLABLE"
},
{
name = "comment_body",
type = "STRING",
mode = "NULLABLE"
}
])
}
dbt¶
DBT Data Transformations¶
Our data transformations are managed using dbt, which allows us to structure our project with specific models for each stage of processing. This approach provides a clear, modular, and maintainable data pipeline.
# dbt_project.yml configuration
name: 'dbt_reddit_summary_cloud'
version: '1.0.0'
models:
  dbt_reddit_summary_cloud:
+materialized: table # Default materialization
current_summary_staging:
+materialized: view # Staging view
joined_summary_analysis:
+materialized: table # Final analysis
update_processing_status:
+materialized: incremental
vars:
subreddits: ['dataengineering', 'datascience', 'machinelearning', 'claudeai',
'singularity', 'localllama', 'openai', 'stablediffusion']
The DBT workflow includes three main transformation models:
- Staging Model (current_summary_staging): a view that selects key fields from the daily summary data, filtering out rows with null comment bodies and rows that no longer need processing. It is materialized as a view for real-time filtering.
- Analysis Model (joined_summary_analysis): joins the staging data with the text summary and sentiment analysis results, validates the quality of comments and summaries, keeps only valid rows for further analysis, and adds processing metadata.

    {{ config(
        materialized='table',
        unique_key='comment_id',
    ) }}

    WITH validation_cte AS (
        SELECT
            cs.*,
            ts.comment_summary,
            sa.sentiment_score,
            sa.sentiment_label,
            CASE
                WHEN LENGTH(cs.comment_body) < 2 THEN 'Too short'
                WHEN LENGTH(cs.comment_body) > 10000 THEN 'Too long'
                ELSE 'Valid'
            END as comment_quality,
            CASE
                WHEN ts.comment_summary IS NULL OR TRIM(ts.comment_summary) = '' THEN 'Missing'
                WHEN LENGTH(ts.comment_summary) > LENGTH(cs.comment_body) THEN 'Invalid'
                ELSE 'Valid'
            END as summary_quality
        FROM {{ source('summary_analytics', 'current_summary_staging') }} cs
        LEFT JOIN {{ source('summary_analytics', 'text_summary_results') }} ts
            ON cs.comment_id = ts.comment_id
        LEFT JOIN {{ source('summary_analytics', 'sentiment_analysis_results') }} sa
            ON cs.comment_id = sa.comment_id
    )

    SELECT
        *,
        CURRENT_TIMESTAMP as processed_at,
        'dbt' as processed_by
    FROM validation_cte
    WHERE comment_quality = 'Valid'
      AND summary_quality = 'Valid'

- Processing Status Model (update_processing_status): an incremental model that updates the processing status of comments. It selects all records that were attempted, including those that failed validation, and uses a post-hook to mark them as processed in the source table so that comments are not reprocessed.

    {{ config(
        materialized='incremental',
        unique_key=['comment_id', 'post_id'],
        post_hook=[
            """
            UPDATE {{ source('summary_analytics', 'daily_summary_data') }}
            SET needs_processing = FALSE
            WHERE comment_id IN (
                SELECT comment_id FROM {{ this }}
            );
            """
        ]
    ) }}

    -- Select all records that were attempted to be processed, including failed validations
    WITH validation_cte AS (
        SELECT comment_id, comment_body, post_id
        FROM {{ source('summary_analytics', 'current_summary_staging') }}
    ),

    all_processed_records AS (
        SELECT
            cs.comment_id,
            cs.post_id,
            v.comment_quality
        FROM {{ source('summary_analytics', 'current_summary_staging') }} cs
        LEFT JOIN (
            SELECT
                comment_id,
                CASE
                    WHEN LENGTH(comment_body) < 2 THEN 'Too short'
                    WHEN LENGTH(comment_body) > 10000 THEN 'Too long'
                    ELSE 'Valid'
                END as comment_quality
            FROM validation_cte
        ) v ON cs.comment_id = v.comment_id
    )

    SELECT comment_id, post_id
    FROM all_processed_records
DBT Data Quality Tests¶
Our project implements comprehensive data quality tests using dbt's testing framework. These tests are defined in our schema.yml:
version: 2
sources:
- name: raw_data
schema: raw_data
tables:
- name: raw_dataengineering
columns: &raw_columns
- name: post_id
tests:
- not_null
- unique
- is_text
- name: subreddit
tests:
- not_null
- is_text
- accepted_values:
values: ["dataengineering", "datascience", "machinelearning",
"claudeai", "singularity", "localllama", "openai",
"stablediffusion"]
- name: score
tests:
- not_null
- is_int
- dbt_utils.expression_is_true:
expression: "{{ column_name }} >= 0"
- name: comments
tests:
- not_null
- is_json
Our testing strategy includes generic tests for data presence and type validation, custom tests for specific data structures and rules, relationship tests for data integrity, and data quality metrics to monitor data health. These tests are integrated into our CI/CD pipeline for early detection of issues.
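These test suites also run inside the Airflow DAG (the dbt_test_raw_sources and dbt_test_staging_models tasks that appear later in the dependency chain). A minimal sketch of how such tasks could be wired up with a BashOperator, declared inside the DAG context; the project path matches the dbt folder mounted in docker-compose, while the --select expressions are illustrative assumptions:

```python
from airflow.operators.bash import BashOperator

DBT_DIR = "/opt/airflow/dbt_reddit_summary_cloud"  # volume mounted in docker-compose

# Run the source tests defined in schema.yml against the raw tables.
dbt_test_raw_sources = BashOperator(
    task_id="dbt_test_raw_sources",
    bash_command=f"cd {DBT_DIR} && dbt test --select source:raw_data",
)

# Test the staging view after the dbt transformations have run.
dbt_test_staging_models = BashOperator(
    task_id="dbt_test_staging_models",
    bash_command=f"cd {DBT_DIR} && dbt test --select current_summary_staging",
)
```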
Text Processing Pipeline¶
Our cleaned data is analyzed using transformer models for text summarization and sentiment analysis, with MLflow for experiment tracking.
1. Text Summarization¶
We use a fine-tuned BART model for generating concise summaries of Reddit comments:
def create_summarizer():
"""Initialize the summarization model."""
model_name = "philschmid/bart-large-cnn-samsum"
local_model_path = "/models/models--philschmid--bart-large-cnn-samsum/snapshots/e49b3d60d923f12db22bdd363356f1a4c68532ad"
tokenizer = AutoTokenizer.from_pretrained(local_model_path)
model = AutoModelForSeq2SeqLM.from_pretrained(
local_model_path,
torch_dtype=torch.float32,
device_map="auto",
low_cpu_mem_usage=True
)
return pipeline(
"summarization",
model=model,
tokenizer=tokenizer,
framework="pt"
)
def generate_summary(comment_body, summarizer):
"""Generate summary for a given comment."""
if len(comment_body.split()) <= 59:
return comment_body
try:
summary = summarizer(
comment_body,
max_length=60,
min_length=15,
do_sample=False,
truncation=True
)
if summary and isinstance(summary, list) and len(summary) > 0:
return summary[0]['summary_text']
except Exception as e:
logging.error(f"Summarization failed: {e}")
return ""
We use a BART-large model fine-tuned for dialogue summarization, which outperforms the base model on Reddit comments. The model is loaded from a local snapshot and configured for efficient memory use.
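A quick usage sketch of the two helpers above (the comment text is made up for illustration):

```python
summarizer = create_summarizer()

comment = (
    "I have been testing the new release all week and the summarization quality "
    "is noticeably better, although memory usage on my machine is much higher "
    "than before, so I had to switch to smaller batches to keep things stable."
)

# Comments of 59 words or fewer are returned unchanged; longer ones are summarized.
print(generate_summary(comment, summarizer))
```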
2. Sentiment Analysis¶
We employ a RoBERTa-based model fine-tuned for emotion detection:
def initialize_emotion_analyzer():
"""Initialize the emotion analysis model."""
model_name = "SamLowe/roberta-base-go_emotions"
local_model_path = "/models/models--SamLowe--roberta-base-go_emotions/snapshots/58b6c5b44a7a12093f782442969019c7e2982299"
model = AutoModelForSequenceClassification.from_pretrained(local_model_path)
tokenizer = AutoTokenizer.from_pretrained(local_model_path)
return pipeline(
"text-classification",
model=model,
tokenizer=tokenizer
)
def perform_sentiment_analysis(text, emotion_analyzer):
"""Perform sentiment analysis on the given text."""
try:
if not text or text.strip() == "":
return "Neutral", 0.0, "No content"
truncated_text = truncate_text(text, max_length=500)
emotion_output = emotion_analyzer(truncated_text)
if not emotion_output:
return "Neutral", 0.0, "Analysis failed"
emotion_label = emotion_output[0]['label']
emotion_score = emotion_output[0]['score']
# Map emotions to sentiment categories
sentiment_mapping = {
'joy': 'Positive',
'love': 'Positive',
'admiration': 'Positive',
'approval': 'Positive',
'excitement': 'Positive',
'gratitude': 'Positive',
'optimism': 'Positive',
'anger': 'Negative',
'disappointment': 'Negative',
'disgust': 'Negative',
'fear': 'Negative',
'sadness': 'Negative',
'confusion': 'Neutral',
'curiosity': 'Neutral',
'surprise': 'Neutral'
}
sentiment_label = sentiment_mapping.get(emotion_label.lower(), 'Neutral')
return sentiment_label, float(emotion_score), emotion_label
except Exception as e:
logging.error(f"Error during sentiment analysis: {str(e)}")
return "Neutral", 0.0, "Error in analysis"
Our sentiment analysis component uses a RoBERTa model fine-tuned for emotion detection, which we found to be more effective than standard sentiment analysis for Reddit comments. Key features include smart text truncation, mapping to sentiment categories (Positive, Negative, or Neutral), and comprehensive error handling.
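The truncate_text helper used above isn't shown; here is a minimal word-based sketch consistent with how it is called (the real helper may truncate by characters or tokenizer tokens instead):

```python
def truncate_text(text: str, max_length: int = 500) -> str:
    """Keep at most `max_length` whitespace-delimited tokens (illustrative sketch)."""
    words = text.split()
    if len(words) <= max_length:
        return text
    return " ".join(words[:max_length])
```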
3. Pipeline Integration¶
Both components are integrated into the data pipeline with MLflow, which we use for experiment tracking to monitor model performance and manage model versions, as well as for model serving and deployment.
# MLflow configuration
mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("reddit_sentiment_analysis_experiments")
with mlflow.start_run() as run:
# Log parameters
mlflow.log_param("model_name", "SamLowe/roberta-base-go_emotions")
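Inside the same run we can also log metrics and artifacts; the names, values, and file path below are illustrative, not taken from the project code:

```python
    # Still inside `with mlflow.start_run() as run:` from the snippet above
    mlflow.log_param("max_input_length", 500)
    mlflow.log_metric("comments_processed", 1000)        # illustrative value
    mlflow.log_metric("avg_sentiment_score", 0.42)       # illustrative value
    mlflow.log_artifact("results/sentiment_summary.json")  # hypothetical file
```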
Gemini AI Analysis¶
We use Google's Gemini AI for advanced analysis, generating structured insights from Reddit discussions with a focus on content safety and consistent formatting.
1. Analysis Configuration¶
The Gemini analysis is guided by a prompt template that instructs the model to filter content for safety, rank threads by score, summarize content, analyze emotions, extract top viewpoints, and embed links. The output is structured with a title, date, description, tags, overall ranking, and detailed thread analysis.
def create_prompt_template():
"""Return the standard prompt template for Gemini analysis."""
current_date = datetime.now().strftime('%Y-%m-%d')
return f"""
Analyze the provided text files, which contain Reddit posts and comments.
**Instructions:**
1. **Content Filtering:** Check for harassment, hate speech, or explicit material
2. **Ranking:** Rank threads by total "Score"
3. **Summarization:** Utilize "Summary" fields for thread overviews
4. **Emotional Analysis:** Analyze "Emotion Label" and "Emotion Score"
5. **Point of View Extraction:** Extract top 3 viewpoints per thread
6. **Links:** Embed URLs in thread titles using Markdown
**Output Format:**
---
title: "{{subreddit_name}} subreddit"
date: "{current_date}"
description: "Analysis of top discussions and trends"
tags: ["tag1", "tag2", "tag3"]
---
# Overall Ranking and Top Discussions
[Ranked list of threads with scores and summaries]
# Detailed Analysis by Thread
[Thread-by-thread analysis with summaries, emotions, and viewpoints]
"""
2. Data Processing¶
The system processes data subreddit by subreddit, formatting each post and its comments:
def process_subreddit(model, cur, subreddit, output_dir):
"""Process a single subreddit's data."""
# Fetch processed data
cur.execute("""
SELECT post_id, subreddit, post_score, post_url, comment_id,
summary_date, post_content, comment_body, comment_summary,
sentiment_score, sentiment_label
FROM processed_data.joined_summary_analysis
WHERE subreddit = %s
ORDER BY post_score DESC
""", (subreddit,))
# Format data for analysis
text_files = "".join(
format_text_file(row) for row in cur.fetchall()
)
# Generate insights using Gemini
response = model.generate_content(final_prompt + text_files)
# Save formatted output
output_file_path = os.path.join(output_dir, f"llm_{subreddit}.md")
with open(output_file_path, "w", encoding="utf-8") as f:
f.write(response.text)
3. Output Formatting¶
The system ensures consistent formatting across all analyses:
def get_formatted_subreddit_name(subreddit: str) -> str:
"""Returns a properly formatted subreddit name for display."""
subreddit_formats = {
"claudeai": "ClaudeAI",
"dataengineering": "Data Engineering",
"datascience": "Data Science",
"localllama": "LocalLLaMA",
"machinelearning": "Machine Learning",
"openai": "OpenAI",
"singularity": "Singularity",
"stablediffusion": "Stable Diffusion"
}
return f"{subreddit_formats.get(subreddit.lower(), subreddit)} Subreddit"
4. Pipeline Integration¶
The analysis is integrated into the main pipeline with organized output storage:
def analyze_data():
"""
Main function to analyze Reddit data using Google's Gemini model.
Pipeline steps:
1. Set up logging and output directory with current date
2. Initialize Gemini model
3. Connect to BigQuery and GCS
4. Process each subreddit
5. Clean generated markdown files
6. Upload results to GCS
"""
try:
# 1. Set up output directory with year/month/day structure
current_date = datetime.now()
year = current_date.strftime('%Y')
month = current_date.strftime('%m')
day = current_date.strftime('%d')
output_dir = os.path.join(CLOUD_DIR, 'results', year, month, day)
os.makedirs(output_dir, exist_ok=True)
logger.info(f"Output directory set to {output_dir}")
# 2. Initialize model
genai.configure(api_key=GEMINI_CONFIG['GOOGLE_GEMINI_API_KEY'])
model = genai.GenerativeModel('gemini-2.0-flash-exp')
logger.info("Model loaded")
# 3. Connect to BigQuery and GCS
bq_client = bigquery.Client()
storage_client = storage.Client()
logger.info("BigQuery and GCS clients initialized")
# 4. Process subreddits
for subreddit in SUBREDDITS:
process_subreddit(model, bq_client, subreddit, output_dir)
# 5. Clean markdown files and upload to GCS
for filename in os.listdir(output_dir):
if filename.endswith('.md'):
local_file_path = os.path.join(output_dir, filename)
# Clean the markdown file
clean_markdown_file(local_file_path)
# Upload to GCS
gcs_blob_path = get_gcs_path(year, month, day, filename)
upload_to_gcs(local_file_path, gcs_blob_path, storage_client)
logger.info("Analysis complete - all files processed and uploaded to GCS")
except Exception as e:
logger.error(f"Pipeline failed: {e}")
raise
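The get_gcs_path and upload_to_gcs helpers referenced above aren't shown. A minimal sketch, assuming the bucket mirrors the local results/YYYY/MM/DD layout and the bucket name comes from the GCS_BUCKET_NAME environment variable:

```python
import logging
import os
from google.cloud import storage

logger = logging.getLogger(__name__)

def get_gcs_path(year: str, month: str, day: str, filename: str) -> str:
    """Mirror the local results/YYYY/MM/DD layout in the bucket (assumed convention)."""
    return f"results/{year}/{month}/{day}/{filename}"

def upload_to_gcs(local_file_path: str, gcs_blob_path: str, storage_client: storage.Client) -> None:
    """Upload one generated markdown report to the project bucket."""
    bucket = storage_client.bucket(os.environ["GCS_BUCKET_NAME"])
    blob = bucket.blob(gcs_blob_path)
    blob.upload_from_filename(local_file_path)
    logger.info(f"Uploaded {local_file_path} to gs://{bucket.name}/{gcs_blob_path}")
```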
Web Application¶
Our web application, built with Next.js 15, React 18, TypeScript, and MDX, provides a modern and user-friendly way to explore our insights. The application automatically syncs with the pipeline's analysis results and deploys to Vercel using GitHub Actions, with build optimizations for performance.
Automated Deployment¶
The deployment process leverages GitHub Actions:
# .github/workflows/deploy.yml
name: Deploy to Vercel
on:
push:
branches:
- main
pull_request:
branches:
- main
env:
VERCEL_ORG_ID: ${{ secrets.VERCEL_ORG_ID }}
VERCEL_PROJECT_ID: ${{ secrets.VERCEL_PROJECT_ID }}
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
cache: 'npm'
- name: Install dependencies
run: npm ci
- name: Install Vercel CLI
run: npm install --global vercel@latest
- name: Pull Vercel Environment Information
run: vercel pull --yes --environment=production --token=${{ secrets.VERCEL_TOKEN }}
- name: Build Project Artifacts
run: vercel build --prod --token=${{ secrets.VERCEL_TOKEN }}
- name: Deploy Project Artifacts to Vercel
run: vercel deploy --prebuilt --prod --token=${{ secrets.VERCEL_TOKEN }}
Orchestrator Stack¶
Our pipeline is orchestrated using Apache Airflow, configured with a robust setup that ensures reliability, scalability, and observability. The orchestration layer manages the entire data pipeline from ingestion to analysis.
1. Airflow Configuration¶
The Airflow setup is containerized using Docker Compose with several key components:
x-common-env: &common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${AIRFLOW_DB_USER}:${AIRFLOW_DB_PASSWORD}@postgres/${AIRFLOW_DB_NAME}
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://${AIRFLOW_DB_USER}:${AIRFLOW_DB_PASSWORD}@postgres/${AIRFLOW_DB_NAME}
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__LOAD_EXAMPLES: false
AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/airflow_project/dags
AIRFLOW__LOGGING__BASE_LOG_FOLDER: /opt/airflow/airflow_project/logs
AIRFLOW__CORE__PLUGINS_FOLDER: /opt/airflow/airflow_project/plugins
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
AIRFLOW__LOGGING__FAB_LOGGING_LEVEL: WARNING
AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS: airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG
AIRFLOW__LOGGING__DELETE_LOCAL_LOGS: "true"
AIRFLOW__LOGGING__MAX_LOG_AGE_IN_DAYS: 7
PYTHONPATH: /opt/airflow
AIRFLOW__METRICS__STATSD_ON: "true"
AIRFLOW__METRICS__STATSD_HOST: "statsd-exporter"
AIRFLOW__METRICS__STATSD_PORT: "9125"
AIRFLOW__METRICS__STATSD_PREFIX: "airflow"
AIRFLOW__METRICS__STATSD_ALLOW_LIST: "*"
AIRFLOW__METRICS__METRICS_ALLOW_LIST: "*"
AIRFLOW__WEBSERVER__EXPOSE_METRICS: 'true'
AIRFLOW__METRICS__STATSD_INTERVAL: "30"
GOOGLE_APPLICATION_CREDENTIALS: ${GOOGLE_APPLICATION_CREDENTIALS}
GH_PAT: ${GH_PAT}
GH_OWNER: ${GH_OWNER}
GH_REPO: ${GH_REPO}
GH_WEBSITE_REPO: ${GH_WEBSITE_REPO}
GCP_PROJECT_ID: ${GCP_PROJECT_ID}
GCS_BUCKET_NAME: ${GCS_BUCKET_NAME}
STOP_VM_FUNCTION_URL: ${STOP_VM_FUNCTION_URL}
ALERT_EMAIL: ${ALERT_EMAIL}
AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
AIRFLOW__SMTP__SMTP_USER: ${ALERT_EMAIL}
AIRFLOW__SMTP__SMTP_PASSWORD: ${ALERT_EMAIL_PASSWORD}
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_MAIL_FROM: ${ALERT_EMAIL}
AIRFLOW__SMTP__SMTP_STARTTLS: "true"
AIRFLOW__SMTP__SMTP_SSL: "false"
x-common-volumes: &common-volumes
- ..:/opt/airflow
- ../credentials:/opt/airflow/credentials
- ../mlflow:/mlflow
- ../results:/opt/airflow/results
- ../dbt_reddit_summary_cloud:/opt/airflow/dbt_reddit_summary_cloud
x-airflow-common: &airflow-common
volumes: *common-volumes
environment:
<<: *common-env
user: "${AIRFLOW_UID:-50000}:0"
The Airflow services are deployed as separate containers, each with specific responsibilities:
- Airflow Webserver

  ```yaml
  airflow-webserver:
    image: ${DOCKER_REGISTRY}/reddit-airflow-webserver:latest
    <<: *airflow-common
    depends_on:
      airflow-init:
        condition: service_completed_successfully
    ports:
      - "8080:8080"
    command: airflow webserver
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 30s
      retries: 10
      start_period: 60s
    restart: always
  ```

  Our Airflow webserver provides the web UI for DAG management and monitoring, handles user authentication and authorization, exposes REST API endpoints, and includes health checks to ensure UI availability.

- Airflow Scheduler

  ```yaml
  airflow-scheduler:
    image: ${DOCKER_REGISTRY}/reddit-airflow-scheduler:latest
    <<: *airflow-common
    depends_on:
      airflow-webserver:
        condition: service_healthy
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    command: airflow scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 30s
      retries: 10
      start_period: 60s
    restart: always
  ```

  The Airflow scheduler monitors and triggers task execution, manages DAG parsing and scheduling, handles task dependencies and queuing, and ensures proper task distribution to workers. This component is crucial for orchestrating our data pipeline.

- Airflow Worker

  ```yaml
  airflow-worker:
    image: ${DOCKER_REGISTRY}/reddit-airflow-worker:latest
    <<: *airflow-common
    depends_on:
      airflow-webserver:
        condition: service_healthy
    command: airflow celery worker
    healthcheck:
      test: ["CMD-SHELL", 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"']
      interval: 30s
      timeout: 30s
      retries: 5
    restart: always
  ```

  Our Airflow worker executes the actual tasks, handles ML model inference, manages resource allocation, supports parallel task execution, and is configured for ML workloads with PyTorch and Transformers.

- Airflow Init

  ```yaml
  airflow-init:
    build:
      context: .
      dockerfile: Dockerfile.webserver
    env_file:
      - ../.env
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    environment:
      <<: *common-env
      GIT_PYTHON_REFRESH: quiet
    command: >
      bash -c "
      airflow db init &&
      airflow db upgrade &&
      airflow users create -r Admin -u admin -p admin -e admin@example.com -f Anonymous -l Admin
      "
  ```

  The Airflow init service initializes the Airflow database, creates an admin user, performs database migrations, and runs only during the initial setup. This component is essential for setting up the Airflow environment.
2. Pipeline Structure¶
Our DAG (reddit_pipeline) is organized into 27 stages, which can be grouped into 6 main sections, each with specific responsibilities and metrics collection:
- Data Collection and Preprocessing: Reddit data is ingested and preprocessed.
- DBT Transformations: dbt models are run to prepare the data for analysis.
- Model Processing: text summarization using BART, sentiment analysis with RoBERTa, and advanced analysis with Gemini AI.

    summarize_metrics_task = PythonOperator(
        task_id='parse_summarize_metrics',
        python_callable=parse_summarize_metrics,
        provide_context=True
    )
    sentiment_task = PythonOperator(
        task_id='run_sentiment_analysis',
        python_callable=sentiment_analysis_process,
        provide_context=True
    )
    gemini_task = PythonOperator(
        task_id='run_gemini',
        python_callable=gemini_analysis_process,
        dag=dag
    )

- Quality Checks: dbt tests are run to ensure the data is valid.
- Metrics Collection: dedicated metrics tasks push real-time data to Prometheus through a StatsD exporter, and MLflow tracks model performance.
- Shutdown VM using Cloud Function: a Cloud Function shuts down the VM after the pipeline completes.
3. Task Dependencies¶
The pipeline follows a clear dependency chain:
ingest_task >> ingest_metrics_task >> \
dbt_test_raw_sources >> dbt_test_raw_metrics_task >> \
dbt_staging_task >> dbt_test_staging_models >> \
summarize_task >> sentiment_task >> \
dbt_join_summary_analysis_task >> \
gemini_task >> push_gemini_results_task >> shutdown_vm
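For context, here is a minimal sketch of how the reddit_pipeline DAG object itself might be declared. The retry, date, and scheduling values are illustrative assumptions; in this setup the DAG is kicked off by start_dag.sh after the start-VM Cloud Function brings the VM up, so no Airflow cron schedule is assumed here:

```python
import os
from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    "owner": "airflow",
    "retries": 1,                          # illustrative
    "retry_delay": timedelta(minutes=5),   # illustrative
    "email": [os.environ.get("ALERT_EMAIL", "")],
    "email_on_failure": True,
}

dag = DAG(
    dag_id="reddit_pipeline",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),       # illustrative
    schedule_interval=None,                # triggered externally by start_dag.sh
    catchup=False,
    tags=["reddit", "gemini"],
)
```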
4. Integration Points¶
The orchestrator connects with BigQuery for pipeline data storage, local model deployment for model serving, StatsD and Prometheus for monitoring, and GitHub for version control of results. These connections are essential for the functionality of our pipeline.
5. Cloud Function¶
5.1. Start VM¶
The Cloud Function to start the VM is deployed using Cloud Run. It ensures seamless startup of virtual machines on-demand, optimizing resource utilization.
import os
import time
import logging
import platform
import subprocess
import traceback
from flask import Flask, request, jsonify
# Create Flask app
app = Flask(__name__)
def get_gcloud_path():
"""Get the full path to gcloud executable based on OS"""
if platform.system() == "Windows":
# Common installation paths for gcloud on Windows
possible_paths = [
r"C:\Program Files (x86)\Google\Cloud SDK\google-cloud-sdk\bin\gcloud.cmd",
r"C:\Program Files\Google\Cloud SDK\google-cloud-sdk\bin\gcloud.cmd",
os.path.expanduser("~") + r"\AppData\Local\Google\Cloud SDK\google-cloud-sdk\bin\gcloud.cmd"
]
for path in possible_paths:
if os.path.exists(path):
return path
raise Exception("gcloud not found. Please ensure Google Cloud SDK is installed and in PATH")
return "gcloud" # For non-Windows systems, assume it's in PATH
@app.route("/", methods=["POST"])
def start_vm_http():
"""
HTTP endpoint for Cloud Run to start a Compute Engine instance.
Accepts POST requests with optional JSON body:
{'trigger_source': 'scheduler'|'manual'|'dag'}
"""
return start_vm(request)
def start_vm(request):
"""
Function to start a Compute Engine instance and execute startup script.
"""
project = os.environ.get('PROJECT_ID')
zone = os.environ.get('ZONE')
instance = os.environ.get('INSTANCE')
gh_repo = os.environ.get('GH_REPO')
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.info(f"Environment: PROJECT_ID={project}, ZONE={zone}, INSTANCE={instance}, GH_REPO={gh_repo}")
# Parse request data
trigger_source = "scheduler"
if request and hasattr(request, 'is_json') and request.is_json:
data = request.get_json()
if data and 'trigger_source' in data:
trigger_source = data.get('trigger_source')
logger.info(f"Start triggered by: {trigger_source}")
try:
# Get gcloud path
gcloud_path = get_gcloud_path()
logger.info(f"Using gcloud path: {gcloud_path}")
# Check if instance is running
status_cmd = [gcloud_path, "compute", "instances", "describe", instance,
f"--zone={zone}", f"--project={project}",
"--format=get(status)"]
status = subprocess.run(status_cmd, text=True, capture_output=True)
is_running = status.stdout.strip().upper() == "RUNNING"
if not is_running:
# Start the instance
start_cmd = [gcloud_path, "compute", "instances", "start", instance,
f"--zone={zone}", f"--project={project}", "--quiet"]
logger.info(f"Starting instance with command: {' '.join(start_cmd)}")
process = subprocess.run(start_cmd, text=True, capture_output=True)
if process.returncode != 0:
error_msg = f"Failed to start instance: {process.stderr}"
logger.error(error_msg)
return jsonify({'status': 'error', 'message': error_msg}), 500
# Wait for VM to be fully started
logger.info("Waiting 90 seconds for VM to be fully ready...")
time.sleep(90)
# Execute startup script
logger.info("Executing startup script...")
startup_cmd = [gcloud_path, "compute", "ssh", "--quiet", f"airflow@{instance}",
f"--zone={zone}", f"--project={project}",
"--command", f"bash /opt/airflow/{gh_repo}/Cloud/infrastructure/terraform/vm_scripts/start_dag.sh"]
process = subprocess.run(startup_cmd, text=True, capture_output=True)
if process.returncode == 0:
logger.info("Startup script executed successfully")
logger.info(f"Startup stdout: {process.stdout}")
msg = f'Successfully started instance and executed startup script'
return jsonify({'status': 'success', 'message': msg})
else:
error_msg = f"Startup script failed: {process.stderr}"
logger.error(error_msg)
return jsonify({'status': 'error', 'message': error_msg}), 500
except Exception as e:
error_msg = f'Error during startup process: {str(e)}'
logger.error(error_msg)
logger.error(f"Full stack trace: {traceback.format_exc()}")
return jsonify({'status': 'error', 'message': error_msg}), 500
5.2. Stop VM¶
The Cloud Function to stop the VM is deployed using Cloud Run. This helps automate shutdown processes, reducing costs by powering down unused resources.
import os
import json
import logging
import subprocess
from flask import Flask, request, jsonify
import platform
import traceback
# Create Flask app
app = Flask(__name__)
def get_gcloud_path():
"""Get the full path to gcloud executable based on OS"""
if platform.system() == "Windows":
# Common installation paths for gcloud on Windows
possible_paths = [
r"C:\Program Files (x86)\Google\Cloud SDK\google-cloud-sdk\bin\gcloud.cmd",
r"C:\Program Files\Google\Cloud SDK\google-cloud-sdk\bin\gcloud.cmd",
os.path.expanduser("~") + r"\AppData\Local\Google\Cloud SDK\google-cloud-sdk\bin\gcloud.cmd"
]
for path in possible_paths:
if os.path.exists(path):
return path
raise Exception("gcloud not found. Please ensure Google Cloud SDK is installed and in PATH")
return "gcloud" # For non-Windows systems, assume it's in PATH
@app.route("/", methods=["POST"])
def stop_vm_http():
"""
HTTP endpoint for Cloud Run to stop a Compute Engine instance.
Accepts POST requests with optional JSON body:
{'trigger_source': 'scheduler'|'manual'|'dag'}
"""
return stop_vm(request)
def stop_vm(request):
"""
Function to stop a Compute Engine instance.
Only runs shutdown script if VM is running.
Args:
request (flask.Request): The request object
- If called from manual trigger: expects JSON with {'trigger_source': 'manual'}
- If called from Scheduler: no specific payload needed
Returns:
JSON response with status message
"""
project = os.environ.get('PROJECT_ID')
zone = os.environ.get('ZONE')
instance = os.environ.get('INSTANCE')
gh_repo = os.environ.get('GH_REPO')
# Setup detailed logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Log environment variables (excluding any sensitive data)
logger.info(f"Environment: PROJECT_ID={project}, ZONE={zone}, INSTANCE={instance}, GH_REPO={gh_repo}")
# Parse request data if present
trigger_source = "scheduler"
if request and hasattr(request, 'is_json') and request.is_json:
data = request.get_json()
if data and 'trigger_source' in data:
trigger_source = data.get('trigger_source')
logger.info(f"Shutdown triggered by: {trigger_source}")
try:
# Get gcloud path
gcloud_path = get_gcloud_path()
logger.info(f"Using gcloud path: {gcloud_path}")
# Check if instance is running
status_cmd = [gcloud_path, "compute", "instances", "describe", instance,
f"--zone={zone}", f"--project={project}",
"--format=get(status)"]
status = subprocess.run(status_cmd, text=True, capture_output=True)
is_running = status.stdout.strip().upper() == "RUNNING"
if not is_running:
msg = f'Instance {instance} is already stopped (Triggered by: {trigger_source})'
logger.info(msg)
return jsonify({'status': 'success', 'message': msg})
# Execute shutdown script via SSH since VM is running
logger.info("Executing shutdown script via SSH...")
shutdown_cmd = f"bash /opt/airflow/{gh_repo}/Cloud/infrastructure/terraform/vm_scripts/shutdown.sh"
ssh_cmd = [gcloud_path, "compute", "ssh", "--quiet", f"airflow@{instance}",
f"--zone={zone}", f"--project={project}", "--command", shutdown_cmd]
logger.info(f"Executing command: {' '.join(ssh_cmd)}")
process = subprocess.run(ssh_cmd, text=True, capture_output=True)
if process.returncode != 0:
logger.error(f"SSH command failed with return code {process.returncode}")
logger.error(f"SSH stderr: {process.stderr}")
logger.error(f"SSH stdout: {process.stdout}")
raise Exception(f"Failed to execute shutdown script: {process.stderr}")
logger.info("Shutdown script executed successfully. Stopping instance...")
logger.info(f"SSH stdout: {process.stdout}")
# Stop the instance
stop_cmd = [gcloud_path, "compute", "instances", "stop", instance,
f"--zone={zone}", f"--project={project}", "--quiet"]
logger.info(f"Executing stop command: {' '.join(stop_cmd)}")
process = subprocess.run(stop_cmd, text=True, capture_output=True)
if process.returncode != 0:
logger.error(f"Stop command failed with return code {process.returncode}")
logger.error(f"Stop stderr: {process.stderr}")
logger.error(f"Stop stdout: {process.stdout}")
raise Exception(f"Failed to stop instance: {process.stderr}")
logger.info(f"Stop command stdout: {process.stdout}")
msg = f'Successfully executed shutdown script and stopped instance {instance} (Triggered by: {trigger_source})'
logger.info(msg)
return jsonify({'status': 'success', 'message': msg})
except Exception as e:
error_msg = f'Error during shutdown process: {str(e)}'
logger.error(error_msg)
logger.error(f"Full stack trace: {traceback.format_exc()}")
return jsonify({'status': 'error', 'message': error_msg}), 500
if __name__ == "__main__":
# For local testing with Flask
app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 9099)), debug=False)
5.3. Cloud Scheduler¶
Cloud Scheduler is used to trigger the Cloud Function to start and stop the VM daily at 4:00 PM EST and 6:00 PM EST respectively.
# Service account for Cloud Scheduler
resource "google_service_account" "scheduler_sa" {
account_id = "vm-scheduler-sa"
display_name = "Service Account for VM Scheduler"
lifecycle {
ignore_changes = [
account_id,
display_name
]
}
}
# Grant necessary permissions
resource "google_project_iam_member" "scheduler_sa_compute_admin" {
project = var.project
role = "roles/compute.instanceAdmin.v1"
member = "serviceAccount:${google_service_account.scheduler_sa.email}"
}
# Cloud Scheduler job for starting VM (4:00 PM EST)
resource "google_cloud_scheduler_job" "start_vm_schedule" {
name = "start-vm-daily"
description = "Starts the Airflow VM daily at 4:00 PM EST"
schedule = "0 16 * * *" # 16:00 = 4:00 PM
time_zone = "America/New_York"
attempt_deadline = "900s" # 15 minutes
http_target {
http_method = "POST"
uri = google_cloud_run_service.start_vm.status[0].url
body = base64encode(jsonencode({
trigger_source = "scheduler"
}))
headers = {
"Content-Type" = "application/json"
}
}
}
# Cloud Scheduler job for stopping VM (6:00 PM EST)
resource "google_cloud_scheduler_job" "stop_vm_schedule" {
name = "stop-vm-daily"
description = "Stops the Airflow VM daily at 6:00 PM EST (backup)"
schedule = "0 18 * * *" # 18:00 = 6:00 PM
time_zone = "America/New_York"
attempt_deadline = "900s" # 15 minutes
http_target {
http_method = "POST"
uri = google_cloud_run_service.stop_vm.status[0].url
body = base64encode(jsonencode({
trigger_source = "scheduler"
}))
headers = {
"Content-Type" = "application/json"
}
}
}
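Outside the daily schedule, the same Cloud Run endpoints can be invoked manually with the JSON body the scheduler sends. A hedged sketch follows; it assumes the service URL is available as START_VM_FUNCTION_URL (as in the .env) and that the service expects an ID token, so adjust if your deployment allows unauthenticated calls:

```python
import os
import requests
import google.auth.transport.requests
import google.oauth2.id_token

def trigger_start_vm() -> dict:
    """POST to the start-VM Cloud Run service, mimicking the scheduler payload."""
    url = os.environ["START_VM_FUNCTION_URL"]
    # Fetch an ID token for the Cloud Run audience (assumes authenticated invocation).
    auth_req = google.auth.transport.requests.Request()
    token = google.oauth2.id_token.fetch_id_token(auth_req, url)
    resp = requests.post(
        url,
        json={"trigger_source": "manual"},
        headers={"Authorization": f"Bearer {token}"},
        timeout=120,
    )
    resp.raise_for_status()
    return resp.json()
```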
Observability Stack¶
We use Grafana, Prometheus, and MLflow for comprehensive pipeline observability, with each component containerized and integrated through a centralized metrics system.
1. Metrics Collection Architecture¶
The metrics collection is structured in layers:
- Source Metrics (via StatsD)
- Prometheus Targets
2. Task-Specific Metrics¶
Each pipeline component has dedicated metrics tracking:
- Gemini Analysis Metrics
- Sentiment Analysis Performance
- Pipeline Processing Volumes
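Beyond Airflow's built-in StatsD metrics, task-specific counters and timings can be pushed to the same statsd-exporter. A minimal sketch using the statsd client; the metric names and prefix are illustrative, while the host and port match the exporter settings in the Airflow environment above:

```python
import statsd

# Host and port match the statsd-exporter configured for Airflow.
client = statsd.StatsClient(host="statsd-exporter", port=9125, prefix="reddit_pipeline")

def report_gemini_metrics(subreddit: str, files_written: int, duration_s: float) -> None:
    """Push per-subreddit Gemini analysis metrics (illustrative names)."""
    client.incr(f"gemini.{subreddit}.files_written", files_written)
    client.timing(f"gemini.{subreddit}.duration_ms", int(duration_s * 1000))
```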
3. Performance Dashboards¶
Our Grafana dashboard is designed to visualize processing efficiency (task durations, volumes, success rates, resource utilization), data quality (DBT test results, validation metrics, processing status, error rates), and provide real-time monitoring.
4. MLflow Integration¶
MLflow's model versioning, experiment tracking, model serving, and artifact management features aid in model selection by allowing us to identify the best-performing models.
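For example, the best-performing run can be looked up programmatically from the tracking server; a hedged sketch, where the metric name used for ordering is an assumption:

```python
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri="http://mlflow:5000")
experiment = client.get_experiment_by_name("reddit_sentiment_analysis_experiments")

# Order runs by an assumed logged metric and keep the top one.
best_runs = client.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["metrics.avg_sentiment_score DESC"],  # assumed metric name
    max_results=1,
)
if best_runs:
    best = best_runs[0]
    print(best.info.run_id, best.data.params.get("model_name"), best.data.metrics)
```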
5. Alert Configuration¶
The system monitors critical performance thresholds, including task duration, error rate spikes, resource exhaustion, and pipeline stalls. It also monitors quality thresholds, including data validation failures, model performance degradation, processing anomalies, and system health issues.
6. Cloud Monitoring¶
We use Cloud Monitoring to monitor the health of our VM and Cloud Run service, and to set up alerts for critical issues.
# Cloud Monitoring Dashboard
resource "google_monitoring_dashboard" "airflow_dashboard" {
dashboard_json = jsonencode({
displayName = "Reddit Pipeline Dashboard"
gridLayout = {
columns = "2"
widgets = [
# VM Metrics
{
title = "VM CPU Utilization"
xyChart = {
dataSets = [{
timeSeriesQuery = {
timeSeriesFilter = {
filter = "metric.type=\"compute.googleapis.com/instance/cpu/utilization\" resource.type=\"gce_instance\""
}
}
}]
}
},
{
title = "VM Memory Usage"
xyChart = {
dataSets = [{
timeSeriesQuery = {
timeSeriesFilter = {
filter = "metric.type=\"compute.googleapis.com/instance/memory/usage\" resource.type=\"gce_instance\""
}
}
}]
}
},
# Airflow Task Metrics
{
title = "DAG Success Rate"
xyChart = {
dataSets = [{
timeSeriesQuery = {
timeSeriesFilter = {
filter = "metric.type=\"custom.googleapis.com/airflow/dag_success_rate\""
}
}
}]
}
},
{
title = "Task Duration"
xyChart = {
dataSets = [{
timeSeriesQuery = {
timeSeriesFilter = {
filter = "metric.type=\"custom.googleapis.com/airflow/task_duration\""
}
}
}]
}
},
# Pipeline Metrics
{
title = "Records Processed"
xyChart = {
dataSets = [{
timeSeriesQuery = {
timeSeriesFilter = {
filter = "metric.type=\"custom.googleapis.com/reddit/records_processed\""
}
}
}]
}
},
{
title = "Processing Errors"
xyChart = {
dataSets = [{
timeSeriesQuery = {
timeSeriesFilter = {
filter = "metric.type=\"custom.googleapis.com/reddit/processing_errors\""
}
}
}]
}
}
]
}
})
}
# Alert Policies
resource "google_monitoring_alert_policy" "vm_cpu_utilization" {
display_name = "High CPU Utilization Alert"
combiner = "OR"
conditions {
display_name = "CPU Usage > 80%"
condition_threshold {
filter = "metric.type=\"compute.googleapis.com/instance/cpu/utilization\" resource.type=\"gce_instance\""
duration = "300s"
comparison = "COMPARISON_GT"
threshold_value = 0.8
trigger {
count = 1
}
}
}
notification_channels = [google_monitoring_notification_channel.email.id]
}
resource "google_monitoring_alert_policy" "airflow_task_failures" {
display_name = "Airflow Task Failures Alert"
combiner = "OR"
conditions {
display_name = "Task Failure Rate > 20%"
condition_threshold {
filter = "metric.type=\"${google_monitoring_metric_descriptor.airflow_task_failure_rate.type}\" AND resource.type=\"gce_instance\""
duration = "300s"
comparison = "COMPARISON_GT"
threshold_value = 0.2
trigger {
count = 1
}
}
}
notification_channels = [google_monitoring_notification_channel.email.id]
enabled = true
depends_on = [google_monitoring_metric_descriptor.airflow_task_failure_rate]
}
resource "google_monitoring_alert_policy" "pipeline_errors" {
display_name = "Pipeline Processing Errors Alert"
combiner = "OR"
conditions {
display_name = "Processing Error Rate"
condition_threshold {
filter = "metric.type=\"${google_monitoring_metric_descriptor.processing_errors.type}\" AND resource.type=\"gce_instance\""
duration = "300s"
comparison = "COMPARISON_GT"
threshold_value = 10
trigger {
count = 1
}
}
}
notification_channels = [google_monitoring_notification_channel.email.id]
enabled = true
depends_on = [google_monitoring_metric_descriptor.processing_errors]
}
resource "google_monitoring_alert_policy" "vm_automation" {
display_name = "VM Automation Alert"
combiner = "OR"
conditions {
display_name = "VM Start/Stop Failures"
condition_threshold {
filter = "metric.type=\"${google_monitoring_metric_descriptor.vm_automation_failures.type}\" AND resource.type=\"gce_instance\""
duration = "0s"
comparison = "COMPARISON_GT"
threshold_value = 0
trigger {
count = 1
}
}
}
notification_channels = [google_monitoring_notification_channel.email.id]
enabled = true
depends_on = [google_monitoring_metric_descriptor.vm_automation_failures]
}
# Notification Channels
resource "google_monitoring_notification_channel" "email" {
display_name = "Email Notification Channel"
type = "email"
labels = {
email_address = var.alert_email_address
}
}
# Optional: Slack notification channel
resource "google_monitoring_notification_channel" "slack" {
count = var.slack_webhook_url != "" ? 1 : 0
display_name = "Slack Notification Channel"
type = "slack"
labels = {
channel_name = "#airflow-alerts"
}
sensitive_labels {
auth_token = var.slack_webhook_url
}
}
# First, define the custom metric descriptors
resource "google_monitoring_metric_descriptor" "airflow_task_failure_rate" {
description = "Rate of Airflow task failures"
display_name = "Airflow Task Failure Rate"
type = "custom.googleapis.com/airflow/task_failure_rate"
metric_kind = "GAUGE"
value_type = "DOUBLE"
unit = "1"
labels {
key = "task_id"
value_type = "STRING"
description = "The ID of the Airflow task"
}
}
resource "google_monitoring_metric_descriptor" "processing_errors" {
description = "Number of processing errors in the pipeline"
display_name = "Pipeline Processing Errors"
type = "custom.googleapis.com/reddit/processing_errors"
metric_kind = "GAUGE"
value_type = "INT64"
unit = "1"
}
resource "google_monitoring_metric_descriptor" "vm_automation_failures" {
description = "Number of VM automation failures"
display_name = "VM Automation Failures"
type = "custom.googleapis.com/airflow/vm_automation_failures"
metric_kind = "GAUGE"
value_type = "INT64"
unit = "1"
}
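On the VM side, points for these custom metrics can be written with the Cloud Monitoring client library. A minimal sketch for the processing_errors descriptor defined above; the instance labels are placeholders and would come from the VM's metadata in practice:

```python
import os
import time
from google.cloud import monitoring_v3

def report_processing_errors(error_count: int) -> None:
    """Write one data point to custom.googleapis.com/reddit/processing_errors (sketch)."""
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{os.environ['GCP_PROJECT_ID']}"

    series = monitoring_v3.TimeSeries()
    series.metric.type = "custom.googleapis.com/reddit/processing_errors"
    series.resource.type = "gce_instance"
    series.resource.labels["instance_id"] = os.environ.get("VM_INSTANCE_ID", "0")  # placeholder
    series.resource.labels["zone"] = os.environ.get("GCP_ZONE", "us-east1-b")      # placeholder

    now = time.time()
    interval = monitoring_v3.TimeInterval(
        {"end_time": {"seconds": int(now), "nanos": int((now % 1) * 1e9)}}
    )
    series.points = [
        monitoring_v3.Point({"interval": interval, "value": {"int64_value": error_count}})
    ]
    client.create_time_series(name=project_name, time_series=[series])
```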
Results and Impact¶
The pipeline successfully processes around a thousand Reddit posts and comments daily, generates concise and meaningful summaries, identifies key trends and discussions, maintains high accuracy in content filtering, and publishes daily insights through a web interface.
Future Improvements¶
- Enhanced Analysis
- Integration with additional AI models
- Advanced visualization features
- Real-time analysis capabilities
- System Scalability
- Distributed processing implementation
- Enhanced caching mechanisms
- API endpoint for analysis access
- User Experience
- Interactive dashboard development
- Customizable analysis parameters
- Trend prediction features
Conclusion¶
This project showcases the power of automation in extracting valuable insights from social media using AI and data engineering, while handling real-world challenges. The open-source code is available on GitHub, and community contributions are welcome.