Episode 3 — NodeJS MongoDB Backend Architecture / 3.11 — Database Optimization

3.11.e — Advanced Aggregation

Advanced aggregation stages unlock capabilities that go far beyond simple grouping — parallel faceted searches, automatic bucketing, recursive graph traversals, and writing pipeline results directly to collections. These are the tools that power real-world analytics dashboards.




1. $facet — Parallel Pipelines

$facet runs multiple aggregation pipelines in parallel on the same input documents and returns the results as separate fields:

// Run three analyses at once
db.products.aggregate([
  { $match: { status: "active" } },
  { $facet: {
    // Pipeline 1: Category breakdown
    byCategory: [
      { $group: { _id: "$category", count: { $sum: 1 } } },
      { $sort: { count: -1 } }
    ],
    // Pipeline 2: Price statistics
    priceStats: [
      { $group: {
        _id: null,
        avgPrice: { $avg: "$price" },
        minPrice: { $min: "$price" },
        maxPrice: { $max: "$price" }
      }}
    ],
    // Pipeline 3: Pagination metadata
    totalCount: [
      { $count: "count" }
    ]
  }}
]);

// Output:
// {
//   byCategory: [{ _id: "electronics", count: 245 }, ...],
//   priceStats: [{ _id: null, avgPrice: 89.50, minPrice: 1.99, maxPrice: 2499 }],
//   totalCount: [{ count: 1200 }]
// }
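Because every facet comes back as an array — even single-value aggregates like the count — application code typically unwraps the result before using it. A minimal sketch; the helper name, page size, and response shape are assumptions, not part of the pipeline above:

```javascript
// Unwrap the $facet output shown above (unwrapFacets and pageSize are
// hypothetical). Note that totalCount is an EMPTY array when nothing matched,
// so the count must be defaulted to 0.
function unwrapFacets(facetDoc, pageSize = 20) {
  const total = facetDoc.totalCount[0]?.count ?? 0;
  return {
    byCategory: facetDoc.byCategory,
    priceStats: facetDoc.priceStats[0] ?? null,
    total,
    totalPages: Math.ceil(total / pageSize),
  };
}

const shaped = unwrapFacets({
  byCategory: [{ _id: "electronics", count: 245 }],
  priceStats: [{ _id: null, avgPrice: 89.5 }],
  totalCount: [{ count: 1200 }],
});
// shaped.total === 1200, shaped.totalPages === 60
```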

Faceted Search (like e-commerce filters)

// Product listing with filter counts — a single query
const results = await Product.aggregate([
  { $match: { $text: { $search: "headphones" } } },
  { $facet: {
    // The actual paginated results
    results: [
      { $sort: { score: { $meta: "textScore" } } },
      { $skip: 0 },
      { $limit: 20 },
      { $project: { name: 1, price: 1, category: 1, rating: 1 } }
    ],
    // Filter counts for the sidebar
    brands: [
      { $group: { _id: "$brand", count: { $sum: 1 } } },
      { $sort: { count: -1 } }
    ],
    priceRanges: [
      { $bucket: {
        groupBy: "$price",
        boundaries: [0, 25, 50, 100, 250, 500, Infinity],
        default: "Other",
        output: { count: { $sum: 1 } }
      }}
    ],
    totalCount: [
      { $count: "total" }
    ]
  }}
]);

Note: The sub-pipelines inside $facet cannot use indexes — only stages before the $facet (such as the initial $match) can. Each sub-pipeline receives the complete set of documents produced by the preceding stage. Also, $facet returns its entire result as a single document, so the combined output must fit within the 16MB BSON document limit.


2. $bucket and $bucketAuto — Data Bucketing

$bucket — Manual boundaries

Group documents into buckets with explicit boundaries:

db.products.aggregate([
  { $bucket: {
    groupBy: "$price",
    boundaries: [0, 25, 50, 100, 250, 500],   // 5 buckets
    default: "500+",                            // Catch-all for values >= 500
    output: {
      count: { $sum: 1 },
      avgPrice: { $avg: "$price" },
      products: { $push: "$name" }
    }
  }}
]);

// Output:
// [
//   { _id: 0,    count: 45,  avgPrice: 12.50, ... }   // $0 - $24.99
//   { _id: 25,   count: 82,  avgPrice: 37.20, ... }   // $25 - $49.99
//   { _id: 50,   count: 120, avgPrice: 72.80, ... }   // $50 - $99.99
//   { _id: 100,  count: 95,  avgPrice: 168.40, ... }  // $100 - $249.99
//   { _id: 250,  count: 38,  avgPrice: 345.60, ... }  // $250 - $499.99
//   { _id: "500+", count: 20, avgPrice: 899.99, ... } // $500+
// ]
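The boundary semantics are easy to misread: lower bounds are inclusive, upper bounds are exclusive, and anything outside all boundaries falls into the default bucket. The assignment rule, sketched in plain JS (the function name is hypothetical):

```javascript
// How $bucket assigns documents, as plain JS (bucketOf is hypothetical):
// a value lands in the bucket where boundaries[i] <= value < boundaries[i+1];
// out-of-range values fall into the default bucket.
function bucketOf(value, boundaries, defaultBucket) {
  for (let i = 0; i < boundaries.length - 1; i++) {
    if (value >= boundaries[i] && value < boundaries[i + 1]) return boundaries[i];
  }
  return defaultBucket;
}

const bounds = [0, 25, 50, 100, 250, 500];
bucketOf(24.99, bounds, "500+"); // 0     — upper bounds are exclusive
bucketOf(25, bounds, "500+");    // 25    — lower bounds are inclusive
bucketOf(899, bounds, "500+");   // "500+"
```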

$bucketAuto — Automatic boundaries

MongoDB automatically determines bucket boundaries to evenly distribute documents:

db.orders.aggregate([
  { $bucketAuto: {
    groupBy: "$total",
    buckets: 5,              // Create 5 evenly distributed buckets
    output: {
      count: { $sum: 1 },
      avgTotal: { $avg: "$total" }
    }
  }}
]);

// Output: MongoDB chooses boundaries to distribute docs evenly
// [
//   { _id: { min: 5.00,   max: 42.50 },  count: 200, avgTotal: 22.30 },
//   { _id: { min: 42.50,  max: 89.99 },  count: 198, avgTotal: 64.10 },
//   { _id: { min: 89.99,  max: 175.00 }, count: 201, avgTotal: 128.50 },
//   { _id: { min: 175.00, max: 450.00 }, count: 199, avgTotal: 290.80 },
//   { _id: { min: 450.00, max: 2500.00 }, count: 202, avgTotal: 890.20 }
// ]

3. $graphLookup — Recursive Graph Traversal

$graphLookup performs recursive lookups — perfect for tree structures, org charts, social networks, and category hierarchies:

// Syntax
{
  $graphLookup: {
    from: "collection",           // Collection to search
    startWith: "$field",          // Starting value(s)
    connectFromField: "field",    // Field to recurse from
    connectToField: "field",      // Field to match against
    as: "outputField",            // Output array name
    maxDepth: 5,                  // Maximum recursion depth (optional)
    depthField: "depth"           // Field name for depth level (optional)
  }
}

Example: Organization Hierarchy

// employees collection:
// { _id: 1, name: "CEO", managerId: null }
// { _id: 2, name: "VP Engineering", managerId: 1 }
// { _id: 3, name: "Tech Lead", managerId: 2 }
// { _id: 4, name: "Developer", managerId: 3 }

// Find all reports (direct and indirect) under a manager
db.employees.aggregate([
  { $match: { name: "VP Engineering" } },
  { $graphLookup: {
    from: "employees",
    startWith: "$_id",
    connectFromField: "_id",
    connectToField: "managerId",
    as: "allReports",
    maxDepth: 10,
    depthField: "level"
  }}
]);

// Output:
// {
//   name: "VP Engineering",
//   allReports: [
//     { name: "Tech Lead", level: 0 },
//     { name: "Developer", level: 1 }
//   ]
// }
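Under the hood, this traversal is essentially a breadth-first walk from the startWith value, following connectFromField to connectToField at each step. A plain-JS sketch of the org-chart lookup above (the function name is hypothetical; like depthField, the level counts from 0 at the first hop):

```javascript
// Breadth-first sketch of what $graphLookup computes for the org chart above
// (allReports is a hypothetical helper, not a MongoDB API).
const employees = [
  { _id: 1, name: "CEO", managerId: null },
  { _id: 2, name: "VP Engineering", managerId: 1 },
  { _id: 3, name: "Tech Lead", managerId: 2 },
  { _id: 4, name: "Developer", managerId: 3 },
];

function allReports(startId, maxDepth = 10) {
  const results = [];
  let frontier = [startId]; // startWith: the _id(s) to expand from
  for (let depth = 0; depth <= maxDepth && frontier.length; depth++) {
    // connectToField ("managerId") matched against connectFromField ("_id")
    const next = employees.filter((e) => frontier.includes(e.managerId));
    next.forEach((e) => results.push({ name: e.name, level: depth }));
    frontier = next.map((e) => e._id);
  }
  return results;
}

allReports(2);
// → [{ name: "Tech Lead", level: 0 }, { name: "Developer", level: 1 }]
```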

Example: Category Tree

// categories: { _id: "electronics", parent: null }
//             { _id: "phones", parent: "electronics" }
//             { _id: "smartphones", parent: "phones" }

db.categories.aggregate([
  { $match: { _id: "smartphones" } },
  { $graphLookup: {
    from: "categories",
    startWith: "$parent",
    connectFromField: "parent",
    connectToField: "_id",
    as: "ancestors"
  }}
]);
// "ancestors" contains the full ancestor documents ("phones", "electronics") —
// the breadcrumb path. Order is not guaranteed; add a depthField and sort on it
// if you need root-to-leaf order.

4. $merge and $out — Writing Results to Collections

$out — Replace entire collection

// Write aggregation results to a new collection (replaces if exists)
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: {
    _id: { month: { $month: "$createdAt" }, year: { $year: "$createdAt" } },
    revenue: { $sum: "$total" },
    orderCount: { $sum: 1 }
  }},
  { $out: "monthly_reports" }     // Creates/replaces "monthly_reports" collection
]);

Warning: $out replaces the entire target collection. Use $merge for more control.

$merge — Upsert into collection

// Merge results into an existing collection
db.orders.aggregate([
  { $group: {
    _id: "$customerId",
    totalSpent: { $sum: "$total" },
    orderCount: { $sum: 1 },
    lastOrder: { $max: "$createdAt" }
  }},
  { $merge: {
    into: "customer_stats",
    on: "_id",                        // Match field
    whenMatched: "merge",             // Update existing docs
    whenNotMatched: "insert"          // Insert new docs
  }}
]);

whenMatched options:

  • "merge" — merge fields (keep existing, update matching)
  • "replace" — replace the entire document
  • "keepExisting" — do nothing if document exists
  • "fail" — throw error if document exists
  • [pipeline] — run a custom update pipeline
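The last option deserves a sketch. Inside a whenMatched pipeline, bare field paths refer to the existing document in the target collection, while the $$new variable refers to the incoming document from the aggregation. A hedged example that accumulates totals instead of overwriting them (collection and field names assumed to match the $merge example above):

```javascript
// Custom whenMatched pipeline: add the incoming totals ($$new.*) to the
// existing customer_stats fields rather than replacing them.
// Collection and field names are assumptions carried over from the example above.
const mergeStage = {
  $merge: {
    into: "customer_stats",
    on: "_id",
    whenMatched: [
      { $set: {
        totalSpent: { $add: ["$totalSpent", "$$new.totalSpent"] },
        orderCount: { $add: ["$orderCount", "$$new.orderCount"] },
        updatedAt: "$$NOW"   // aggregation system variable: current datetime
      }}
    ],
    whenNotMatched: "insert"
  }
};
```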

5. $expr — Using Expressions in Queries

$expr allows aggregation expressions inside find() and $match:

// Compare two fields in the same document
db.products.find({
  $expr: { $gt: ["$price", "$cost"] }
});
// Products where price > cost (both are fields, not constants)

// Compare field with computed value
db.orders.find({
  $expr: {
    $gt: ["$total", { $multiply: ["$subtotal", 1.5] }]
  }
});
// Orders where total > 1.5x subtotal (unusually high taxes/shipping)

// Use in $match within aggregation
db.inventory.aggregate([
  { $match: {
    $expr: { $lt: ["$stock", "$reorderLevel"] }
  }},
  { $project: { name: 1, stock: 1, reorderLevel: 1 } }
]);
// Items where stock has dropped below the reorder threshold
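In effect, $expr evaluates an aggregation expression against each document and keeps those where it is truthy. The reorder check above is equivalent to this plain-JS filter (the sample data is hypothetical):

```javascript
// Plain-JS equivalent of the $expr reorder check (sample data hypothetical):
const inventory = [
  { name: "cable", stock: 3, reorderLevel: 10 },
  { name: "mouse", stock: 25, reorderLevel: 5 },
];
const needsReorder = inventory.filter((item) => item.stock < item.reorderLevel);
// needsReorder → [{ name: "cable", stock: 3, reorderLevel: 10 }]
```

The difference is where the work happens: $expr runs server-side, before documents cross the network.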

6. $cond and $switch — Conditional Logic

$cond — If/else

db.orders.aggregate([
  { $project: {
    total: 1,
    shippingType: {
      $cond: {
        if: { $gte: ["$total", 100] },
        then: "FREE",
        else: "STANDARD"
      }
    }
  }}
]);

// Shorthand array syntax: $cond: [condition, trueValue, falseValue]
db.orders.aggregate([
  { $addFields: {
    discount: { $cond: [{ $gte: ["$total", 200] }, 20, 0] }
  }}
]);

$switch — Multiple conditions

db.students.aggregate([
  { $addFields: {
    grade: {
      $switch: {
        branches: [
          { case: { $gte: ["$score", 90] }, then: "A" },
          { case: { $gte: ["$score", 80] }, then: "B" },
          { case: { $gte: ["$score", 70] }, then: "C" },
          { case: { $gte: ["$score", 60] }, then: "D" }
        ],
        default: "F"
      }
    }
  }}
]);
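Branch order matters: $switch evaluates branches top to bottom and takes the first match, which is why the thresholds above run from highest to lowest. The same logic in plain JS (the function name is hypothetical):

```javascript
// Plain-JS equivalent of the $switch grading logic above: branches are
// checked in order and the first matching case wins (grade is hypothetical).
function grade(score) {
  const branches = [
    { case: (s) => s >= 90, then: "A" },
    { case: (s) => s >= 80, then: "B" },
    { case: (s) => s >= 70, then: "C" },
    { case: (s) => s >= 60, then: "D" },
  ];
  for (const b of branches) if (b.case(score)) return b.then;
  return "F"; // the default branch
}

grade(85); // "B" — matches the s >= 80 branch before s >= 70 is tested
```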

7. Performance Considerations

100MB Memory Limit

By default, each stage of an aggregation pipeline is limited to 100MB of RAM. If a stage (typically a blocking stage like $sort or $group) exceeds this, MongoDB throws an error rather than silently spilling to disk.

// Enable disk usage for large datasets
db.orders.aggregate(
  [
    { $group: { _id: "$customerId", orders: { $push: "$$ROOT" } } },
    { $addFields: { orderCount: { $size: "$orders" } } },  // arrays have no sortable "length" field
    { $sort: { orderCount: -1 } }
  ],
  { allowDiskUse: true }    // Spill to disk if a stage exceeds 100MB
);

// Mongoose
const results = await Order.aggregate([
  { $group: { _id: "$customerId", total: { $sum: "$total" } } },
  { $sort: { total: -1 } }
]).allowDiskUse(true);

Optimization Strategies

  • $match first — reduces documents for all subsequent stages
  • $project early — drop unneeded fields to reduce memory
  • Use indexes — $match and $sort at the start of the pipeline can use indexes
  • Avoid $unwind on large arrays — creates N documents per array element
  • $limit after $sort — MongoDB optimizes this into a top-N sort
  • allowDiskUse: true — for datasets exceeding 100MB per stage
  • Consider $merge — materialize frequently-run reports into collections

Pipeline Explain

// Analyze aggregation performance
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: { _id: "$customerId", total: { $sum: "$amount" } } }
]).explain("executionStats");

8. MongoDB Atlas Aggregation Setup

Using Atlas UI

  1. Navigate to your cluster in MongoDB Atlas
  2. Click Collections > select your database and collection
  3. Click the Aggregation tab
  4. Build pipeline stages visually — Atlas shows a preview after each stage
  5. Export the pipeline as code (Node.js, Python, Java, etc.)

Atlas Charts

Atlas Charts can consume aggregation pipeline output for dashboards:

Atlas Dashboard Setup:
┌─────────────────────────────────────────────┐
│ 1. Cluster > Charts > Add Dashboard         │
│ 2. Add Data Source > Select collection      │
│ 3. Choose chart type (bar, line, pie, etc.) │
│ 4. Drag fields to axes                      │
│ 5. Add filters (these become $match stages) │
│ 6. Customize and save                       │
└─────────────────────────────────────────────┘

Atlas Triggers with Aggregation

// Atlas Scheduled Trigger — runs daily at midnight
// Generates a daily report using aggregation and writes to a reports collection
exports = async function() {
  const orders = context.services.get("mongodb-atlas")
    .db("myapp").collection("orders");

  const today = new Date();
  today.setHours(0, 0, 0, 0);
  const tomorrow = new Date(today);
  tomorrow.setDate(tomorrow.getDate() + 1);

  await orders.aggregate([
    { $match: { createdAt: { $gte: today, $lt: tomorrow } } },
    { $group: {
      _id: null,
      totalRevenue: { $sum: "$total" },
      orderCount: { $sum: 1 },
      avgOrderValue: { $avg: "$total" }
    }},
    { $addFields: { date: today } },
    { $merge: { into: "daily_reports", on: "date", whenMatched: "replace" } }
  ]).toArray();
};

9. Putting It All Together — Analytics Dashboard Pipeline

// Complete analytics dashboard in a single query
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);

const dashboard = await Order.aggregate([
  // Filter to relevant time period
  { $match: {
    createdAt: { $gte: thirtyDaysAgo },
    status: { $ne: "cancelled" }
  }},

  // Run all dashboard queries in parallel
  { $facet: {
    // Revenue over time (daily)
    revenueTimeline: [
      { $group: {
        _id: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } },
        revenue: { $sum: "$total" },
        orders: { $sum: 1 }
      }},
      { $sort: { _id: 1 } }
    ],

    // Revenue by category
    categoryBreakdown: [
      { $unwind: "$items" },
      { $lookup: { from: "products", localField: "items.productId", foreignField: "_id", as: "product" } },
      { $unwind: "$product" },
      { $group: {
        _id: "$product.category",
        revenue: { $sum: { $multiply: ["$items.qty", "$items.price"] } }
      }},
      { $sort: { revenue: -1 } }
    ],

    // Order value distribution
    orderDistribution: [
      { $bucketAuto: { groupBy: "$total", buckets: 6, output: { count: { $sum: 1 } } } }
    ],

    // Summary stats
    summary: [
      { $group: {
        _id: null,
        totalRevenue: { $sum: "$total" },
        totalOrders: { $sum: 1 },
        avgOrderValue: { $avg: "$total" },
        maxOrder: { $max: "$total" }
      }}
    ]
  }}
]).allowDiskUse(true);

Key Takeaways

  1. $facet runs multiple pipelines in parallel — perfect for dashboards and faceted search
  2. $bucket/$bucketAuto automatically group data into ranges (price ranges, age groups, etc.)
  3. $graphLookup handles recursive/tree structures — org charts, categories, social graphs
  4. $merge is preferred over $out — it supports upsert behavior without replacing the entire collection
  5. $expr bridges queries and aggregation expressions — compare fields within the same document
  6. $cond/$switch add conditional logic to projections and computed fields
  7. 100MB memory limit per stage — use allowDiskUse: true for large datasets
  8. Atlas provides visual pipeline builders and can schedule aggregation triggers

Explain-It Challenge

Your company needs a weekly analytics report that shows: revenue by product category, customer segments (VIP/Regular/New) with spending summaries, geographic distribution of orders using a nested address structure, and month-over-month growth percentages. Design a complete aggregation pipeline using $facet, $bucket, $graphLookup (for category hierarchy), and $merge. Explain how you would schedule this report, handle the 100MB memory limit, and set it up in Atlas.