Episode 3 — NodeJS MongoDB Backend Architecture / 3.11 — Database Optimization
3.11.e — Advanced Aggregation
Advanced aggregation stages unlock capabilities that go far beyond simple grouping — parallel faceted searches, automatic bucketing, recursive graph traversals, and writing pipeline results directly to collections. These are the tools that power real-world analytics dashboards.
1. $facet — Parallel Pipelines
$facet runs multiple aggregation pipelines in parallel on the same input documents and returns the results as separate fields:
// Run three analyses at once
db.products.aggregate([
{ $match: { status: "active" } },
{ $facet: {
// Pipeline 1: Category breakdown
byCategory: [
{ $group: { _id: "$category", count: { $sum: 1 } } },
{ $sort: { count: -1 } }
],
// Pipeline 2: Price statistics
priceStats: [
{ $group: {
_id: null,
avgPrice: { $avg: "$price" },
minPrice: { $min: "$price" },
maxPrice: { $max: "$price" }
}}
],
// Pipeline 3: Pagination metadata
totalCount: [
{ $count: "count" }
]
}}
]);
// Output:
// {
// byCategory: [{ _id: "electronics", count: 245 }, ...],
// priceStats: [{ _id: null, avgPrice: 89.50, minPrice: 1.99, maxPrice: 2499 }],
// totalCount: [{ count: 1200 }]
// }
Faceted Search (like e-commerce filters)
// Product listing with filter counts — a single query
const results = await Product.aggregate([
{ $match: { $text: { $search: "headphones" } } },
{ $facet: {
// The actual paginated results
results: [
{ $sort: { score: { $meta: "textScore" } } },
{ $skip: 0 },
{ $limit: 20 },
{ $project: { name: 1, price: 1, category: 1, rating: 1 } }
],
// Filter counts for the sidebar
brands: [
{ $group: { _id: "$brand", count: { $sum: 1 } } },
{ $sort: { count: -1 } }
],
priceRanges: [
{ $bucket: {
groupBy: "$price",
boundaries: [0, 25, 50, 100, 250, 500, Infinity],
default: "Other",
output: { count: { $sum: 1 } }
}}
],
totalCount: [
{ $count: "total" }
]
}}
]);
Note: Each sub-pipeline inside $facet cannot use indexes (only the $match before $facet uses indexes). Each sub-pipeline receives the full output of the previous stage.
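The $skip/$limit values in the listing above are hard-coded; a real search endpoint usually builds the $facet stage from request parameters. A hedged sketch of such a builder (the page/pageSize parameter names are assumptions, not part of any MongoDB API):

```javascript
// Build a $facet stage with paginated results plus a total count,
// from 1-based page number and page size.
function buildFacetStage(page, pageSize) {
  return {
    $facet: {
      results: [
        { $skip: (page - 1) * pageSize },
        { $limit: pageSize }
      ],
      totalCount: [{ $count: "total" }]
    }
  };
}
```

The returned object can be dropped straight into an aggregate() pipeline array after the initial $match.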
2. $bucket and $bucketAuto — Data Bucketing
$bucket — Manual boundaries
Group documents into buckets with explicit boundaries:
db.products.aggregate([
{ $bucket: {
groupBy: "$price",
boundaries: [0, 25, 50, 100, 250, 500], // 5 buckets
default: "500+", // Catch-all for values outside the boundaries (here, >= 500)
output: {
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
products: { $push: "$name" }
}
}}
]);
// Output:
// [
// { _id: 0, count: 45, avgPrice: 12.50, ... } // $0 - $24.99
// { _id: 25, count: 82, avgPrice: 37.20, ... } // $25 - $49.99
// { _id: 50, count: 120, avgPrice: 72.80, ... } // $50 - $99.99
// { _id: 100, count: 95, avgPrice: 168.40, ... } // $100 - $249.99
// { _id: 250, count: 38, avgPrice: 345.60, ... } // $250 - $499.99
// { _id: "500+", count: 20, avgPrice: 899.99, ... } // $500+
// ]
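$bucket assigns each document to the bucket whose range [boundaries[i], boundaries[i+1]) contains the groupBy value, and anything outside the boundaries lands in default. That assignment rule, mirrored in plain JavaScript purely for illustration (MongoDB does this server-side):

```javascript
// Mirror $bucket's assignment rule: boundaries form half-open
// ranges [b[i], b[i+1]); out-of-range values go to the default bucket.
function bucketFor(value, boundaries, defaultId) {
  for (let i = 0; i < boundaries.length - 1; i++) {
    if (value >= boundaries[i] && value < boundaries[i + 1]) {
      return boundaries[i]; // the bucket _id is its lower boundary
    }
  }
  return defaultId;
}
```

Running this against the boundaries above shows why a $37.20 product lands in the bucket with _id: 25.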
$bucketAuto — Automatic boundaries
MongoDB automatically determines bucket boundaries to evenly distribute documents:
db.orders.aggregate([
{ $bucketAuto: {
groupBy: "$total",
buckets: 5, // Create 5 evenly distributed buckets
output: {
count: { $sum: 1 },
avgTotal: { $avg: "$total" }
}
}}
]);
// Output: MongoDB chooses boundaries to distribute docs evenly
// [
// { _id: { min: 5.00, max: 42.50 }, count: 200, avgTotal: 22.30 },
// { _id: { min: 42.50, max: 89.99 }, count: 198, avgTotal: 64.10 },
// { _id: { min: 89.99, max: 175.00 }, count: 201, avgTotal: 128.50 },
// { _id: { min: 175.00, max: 450.00 }, count: 199, avgTotal: 290.80 },
// { _id: { min: 450.00, max: 2500.00 },count: 202, avgTotal: 890.20 }
// ]
3. $graphLookup — Recursive Graph Traversal
$graphLookup performs recursive lookups — perfect for tree structures, org charts, social networks, and category hierarchies:
// Syntax
{
$graphLookup: {
from: "collection", // Collection to search
startWith: "$field", // Starting value(s)
connectFromField: "field", // Field to recurse from
connectToField: "field", // Field to match against
as: "outputField", // Output array name
maxDepth: 5, // Maximum recursion depth (optional)
depthField: "depth" // Field name for depth level (optional)
}
}
Example: Organization Hierarchy
// employees collection:
// { _id: 1, name: "CEO", managerId: null }
// { _id: 2, name: "VP Engineering", managerId: 1 }
// { _id: 3, name: "Tech Lead", managerId: 2 }
// { _id: 4, name: "Developer", managerId: 3 }
// Find all reports (direct and indirect) under a manager
db.employees.aggregate([
{ $match: { name: "VP Engineering" } },
{ $graphLookup: {
from: "employees",
startWith: "$_id",
connectFromField: "_id",
connectToField: "managerId",
as: "allReports",
maxDepth: 10,
depthField: "level"
}}
]);
// Output:
// {
// name: "VP Engineering",
// allReports: [
// { name: "Tech Lead", level: 0 },
// { name: "Developer", level: 1 }
// ]
// }
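What $graphLookup computes here can be mirrored with a recursive walk over the same four sample documents. This in-memory sketch only illustrates the semantics (match connectToField against the previous level's connectFromField values, recording depth); MongoDB performs the traversal server-side:

```javascript
const employees = [
  { _id: 1, name: "CEO", managerId: null },
  { _id: 2, name: "VP Engineering", managerId: 1 },
  { _id: 3, name: "Tech Lead", managerId: 2 },
  { _id: 4, name: "Developer", managerId: 3 }
];

// Recurse: match managerId (connectToField) against the current _id
// (connectFromField), incrementing the depth at each level.
function allReports(startId, depth = 0) {
  const direct = employees.filter(e => e.managerId === startId);
  return direct.flatMap(e => [
    { name: e.name, level: depth },
    ...allReports(e._id, depth + 1)
  ]);
}
```

Calling allReports(2) reproduces the output shown above: Tech Lead at level 0, Developer at level 1.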
Example: Category Tree
// categories: { _id: "electronics", parent: null }
// { _id: "phones", parent: "electronics" }
// { _id: "smartphones", parent: "phones" }
db.categories.aggregate([
{ $match: { _id: "smartphones" } },
{ $graphLookup: {
from: "categories",
startWith: "$parent",
connectFromField: "parent",
connectToField: "_id",
as: "ancestors"
}}
]);
// ancestors contains the "phones" and "electronics" documents — the full breadcrumb path
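One caveat: $graphLookup does not guarantee the order of the output array, so breadcrumb paths are usually sorted client-side by a depthField. A sketch, assuming the pipeline above also set depthField: "depth":

```javascript
// Order ancestors nearest-first using the depth recorded by $graphLookup,
// then reduce each document to its _id for display.
function breadcrumb(ancestors) {
  return ancestors
    .slice() // avoid mutating the query result
    .sort((a, b) => a.depth - b.depth)
    .map(a => a._id);
}
```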
4. $merge and $out — Writing Results to Collections
$out — Replace entire collection
// Write aggregation results to a new collection (replaces if exists)
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: {
_id: { month: { $month: "$createdAt" }, year: { $year: "$createdAt" } },
revenue: { $sum: "$total" },
orderCount: { $sum: 1 }
}},
{ $out: "monthly_reports" } // Creates/replaces "monthly_reports" collection
]);
Warning: $out replaces the entire target collection. Use $merge for more control.
$merge — Upsert into collection
// Merge results into an existing collection
db.orders.aggregate([
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$total" },
orderCount: { $sum: 1 },
lastOrder: { $max: "$createdAt" }
}},
{ $merge: {
into: "customer_stats",
on: "_id", // Match field
whenMatched: "merge", // Update existing docs
whenNotMatched: "insert" // Insert new docs
}}
]);
whenMatched options:
- "merge" — merge fields (keep existing, update matching)
- "replace" — replace the entire document
- "keepExisting" — do nothing if the document exists
- "fail" — throw an error if the document exists
- [pipeline] — run a custom update pipeline
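The difference between "merge" and "replace" matters when the existing document carries fields the pipeline result lacks (flags set by other processes, for example). Their document-level semantics, sketched in plain JavaScript for illustration:

```javascript
// whenMatched: "merge" keeps existing fields and overwrites matching ones.
function whenMatchedMerge(existing, incoming) {
  return { ...existing, ...incoming };
}

// whenMatched: "replace" discards the existing document entirely.
function whenMatchedReplace(existing, incoming) {
  return { ...incoming };
}
```

With "merge", a vip flag set by another job survives the stats refresh; with "replace", it is lost.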
5. $expr — Using Expressions in Queries
$expr allows aggregation expressions inside find() and $match:
// Compare two fields in the same document
db.products.find({
$expr: { $gt: ["$price", "$cost"] }
});
// Products where price > cost (both are fields, not constants)
// Compare field with computed value
db.orders.find({
$expr: {
$gt: ["$total", { $multiply: ["$subtotal", 1.5] }]
}
});
// Orders where total > 1.5x subtotal (unusually high taxes/shipping)
// Use in $match within aggregation
db.inventory.aggregate([
{ $match: {
$expr: { $lt: ["$stock", "$reorderLevel"] }
}},
{ $project: { name: 1, stock: 1, reorderLevel: 1 } }
]);
// Items where stock has dropped below the reorder threshold
6. $cond and $switch — Conditional Logic
$cond — If/else
db.orders.aggregate([
{ $project: {
total: 1,
shippingType: {
$cond: {
if: { $gte: ["$total", 100] },
then: "FREE",
else: "STANDARD"
}
}
}}
]);
// Shorthand array syntax: $cond: [condition, trueValue, falseValue]
db.orders.aggregate([
{ $addFields: {
discount: { $cond: [{ $gte: ["$total", 200] }, 20, 0] }
}}
]);
$switch — Multiple conditions
db.students.aggregate([
{ $addFields: {
grade: {
$switch: {
branches: [
{ case: { $gte: ["$score", 90] }, then: "A" },
{ case: { $gte: ["$score", 80] }, then: "B" },
{ case: { $gte: ["$score", 70] }, then: "C" },
{ case: { $gte: ["$score", 60] }, then: "D" }
],
default: "F"
}
}
}}
]);
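$switch evaluates branches in order and returns the first matching case, exactly like an if/else-if chain. The grading logic above, mirrored in plain JavaScript to make the top-down evaluation explicit:

```javascript
// Branches are checked top-down; the first $gte match wins,
// so a score of 95 returns "A" even though it is also >= 80.
function gradeFor(score) {
  if (score >= 90) return "A";
  if (score >= 80) return "B";
  if (score >= 70) return "C";
  if (score >= 60) return "D";
  return "F"; // the $switch `default`
}
```

This also shows why branch order matters: reversing the branches would let the >= 60 case shadow all the others.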
7. Performance Considerations
100MB Memory Limit
By default, each aggregation pipeline stage has a 100MB RAM limit. If a stage exceeds this, MongoDB throws an error.
// Enable disk usage for large datasets
db.orders.aggregate(
[
{ $group: { _id: "$customerId", orders: { $push: "$$ROOT" } } },
{ $addFields: { orderCount: { $size: "$orders" } } }, // arrays have no sortable "length" field
{ $sort: { orderCount: -1 } }
],
{ allowDiskUse: true } // Spill to disk if > 100MB
);
// Mongoose
const results = await Order.aggregate([
{ $group: { _id: "$customerId", total: { $sum: "$total" } } },
{ $sort: { total: -1 } }
]).allowDiskUse(true);
Optimization Strategies
| Strategy | Details |
|---|---|
| $match first | Reduces documents for all subsequent stages |
| $project early | Drop unneeded fields to reduce memory |
| Use indexes | $match and $sort at the start can use indexes |
| Avoid $unwind on large arrays | Creates N documents per array element |
| $limit after $sort | MongoDB optimizes this into a top-N sort |
| allowDiskUse: true | For datasets exceeding 100MB per stage |
| Consider $merge | Materialize frequently-run reports into collections |
Pipeline Explain
// Analyze aggregation performance
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$customerId", total: { $sum: "$amount" } } }
]).explain("executionStats");
8. MongoDB Atlas Aggregation Setup
Using Atlas UI
- Navigate to your cluster in MongoDB Atlas
- Click Collections > select your database and collection
- Click the Aggregation tab
- Build pipeline stages visually — Atlas shows a preview after each stage
- Export the pipeline as code (Node.js, Python, Java, etc.)
Atlas Charts
Atlas Charts can consume aggregation pipeline output for dashboards:
Atlas Dashboard Setup:
┌─────────────────────────────────────────────────┐
│ 1. Cluster > Charts > Add Dashboard │
│ 2. Add Data Source > Select collection │
│ 3. Choose chart type (bar, line, pie, etc.) │
│ 4. Drag fields to axes │
│ 5. Add filters (these become $match stages) │
│ 6. Customize and save │
└─────────────────────────────────────────────────┘
Atlas Triggers with Aggregation
// Atlas Scheduled Trigger — runs daily at midnight
// Generates a daily report using aggregation and writes to a reports collection
exports = async function() {
const orders = context.services.get("mongodb-atlas")
.db("myapp").collection("orders");
const today = new Date();
today.setHours(0, 0, 0, 0);
const tomorrow = new Date(today);
tomorrow.setDate(tomorrow.getDate() + 1);
await orders.aggregate([
{ $match: { createdAt: { $gte: today, $lt: tomorrow } } },
{ $group: {
_id: null,
totalRevenue: { $sum: "$total" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$total" }
}},
{ $addFields: { date: today } },
{ $merge: { into: "daily_reports", on: "date", whenMatched: "replace" } }
]).toArray();
};
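Note that setHours(0, 0, 0, 0) in the trigger uses the function runtime's local timezone, which may not match the calendar date your reports are keyed by. When the report day is defined in UTC (an assumption; adjust to your business timezone), the boundaries can be computed explicitly:

```javascript
// Return the half-open UTC day range [start, end) containing `now`.
function utcDayRange(now) {
  const start = new Date(Date.UTC(
    now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()
  ));
  const end = new Date(start);
  end.setUTCDate(end.getUTCDate() + 1);
  return { start, end };
}
```

The returned start/end values drop directly into the trigger's $match stage in place of today/tomorrow.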
9. Putting It All Together — Analytics Dashboard Pipeline
// Complete analytics dashboard in a single query
const dashboard = await Order.aggregate([
// Filter to relevant time period
{ $match: {
createdAt: { $gte: thirtyDaysAgo },
status: { $ne: "cancelled" }
}},
// Run all dashboard queries in parallel
{ $facet: {
// Revenue over time (daily)
revenueTimeline: [
{ $group: {
_id: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } },
revenue: { $sum: "$total" },
orders: { $sum: 1 }
}},
{ $sort: { _id: 1 } }
],
// Revenue by category
categoryBreakdown: [
{ $unwind: "$items" },
{ $lookup: { from: "products", localField: "items.productId", foreignField: "_id", as: "product" } },
{ $unwind: "$product" },
{ $group: {
_id: "$product.category",
revenue: { $sum: { $multiply: ["$items.qty", "$items.price"] } }
}},
{ $sort: { revenue: -1 } }
],
// Order value distribution
orderDistribution: [
{ $bucketAuto: { groupBy: "$total", buckets: 6, output: { count: { $sum: 1 } } } }
],
// Summary stats
summary: [
{ $group: {
_id: null,
totalRevenue: { $sum: "$total" },
totalOrders: { $sum: 1 },
avgOrderValue: { $avg: "$total" },
maxOrder: { $max: "$total" }
}}
]
}}
]).allowDiskUse(true);
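As with any $facet query, the pipeline resolves to a single-element array of facet arrays, which the API layer typically reshapes before sending to the client. A sketch of that reshaping (facet names follow the pipeline above; the response shape itself is an assumption):

```javascript
// `dashboard` is the array an aggregate() call resolves to: zero or
// one documents, each field a facet array.
function shapeDashboard(dashboard) {
  const d = dashboard[0] ?? {};
  return {
    timeline: d.revenueTimeline ?? [],
    categories: d.categoryBreakdown ?? [],
    distribution: d.orderDistribution ?? [],
    // `summary` is a single-group facet: unwrap it, defaulting to zeros.
    summary: d.summary?.[0] ?? { totalRevenue: 0, totalOrders: 0 }
  };
}
```

Defaulting every facet keeps the endpoint stable when the $match window contains no orders at all.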
Key Takeaways
- $facet runs multiple pipelines in parallel — perfect for dashboards and faceted search
- $bucket / $bucketAuto group data into ranges (price ranges, age groups, etc.)
- $graphLookup handles recursive/tree structures — org charts, categories, social graphs
- $merge is preferred over $out — it supports upsert behavior without replacing the entire collection
- $expr bridges queries and aggregation expressions — compare fields within the same document
- $cond / $switch add conditional logic to projections and computed fields
- 100MB memory limit per stage — use allowDiskUse: true for large datasets
- Atlas provides visual pipeline builders and can schedule aggregation triggers
Explain-It Challenge
Your company needs a weekly analytics report that shows: revenue by product category, customer segments (VIP/Regular/New) with spending summaries, geographic distribution of orders using a nested address structure, and month-over-month growth percentages. Design a complete aggregation pipeline using $facet, $bucket, $graphLookup (for the category hierarchy), and $merge. Explain how you would schedule this report, handle the 100MB memory limit, and set it up in Atlas.