Agent Skill · MongoDB

mongodb-atlas-stream-processing

Manages MongoDB Atlas Stream Processing (ASP) workflows. Handles workspace provisioning, data source/sink connections, processor lifecycle operations, debugging diagnostics, and tier sizing. Supports Kafka, Atlas clusters, S3, HTTPS, and Lambda integrations for streaming data workloads and event processing. NOT for general MongoDB queries or Atlas cluster management. Requires MongoDB MCP Server with Atlas API credentials.

Provider: MongoDB
Path in repo: skills/mongodb-atlas-stream-processing/SKILL.md

Skill body

MongoDB Atlas Streams

Build, operate, and debug Atlas Stream Processing (ASP) pipelines using four MCP tools from the MongoDB MCP Server.

Prerequisites

This skill requires the MongoDB MCP Server connected with:

The 4 tools: atlas-streams-discover, atlas-streams-build, atlas-streams-manage, atlas-streams-teardown.

All operations require an Atlas project ID. If unknown, call atlas-list-projects first to find your project ID.

If MCP tools are unavailable

If the MongoDB MCP Server is not connected or the streams tools are missing, see references/mcp-troubleshooting.md for diagnostic steps and fallback options.

Tool Selection Matrix

atlas-streams-discover — ALL read operations

| Action | Use when |
|--------|----------|
| list-workspaces | See all workspaces in a project |
| inspect-workspace | Review workspace config, state, region |
| list-connections | See all connections in a workspace |
| inspect-connection | Check connection state, config, health |
| list-processors | See all processors in a workspace |
| inspect-processor | Check processor state, pipeline, config |
| diagnose-processor | Full health report: state, stats, errors |
| get-networking | PrivateLink and VPC peering details. Optional: cloudProvider + region to get Atlas account details for PrivateLink setup |

Pagination (all list actions): limit (1-100, default 20), pageNum (default 1). Response format: responseFormat is "concise" (default for list actions) or "detailed" (default for inspect/diagnose).
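The pagination rules above can be sketched as a small argument builder. This is only an illustration: `list_args` is a hypothetical helper, not part of the MCP server, and the lower bound of 1 for pageNum is an assumption (the text only states the default).

```python
def list_args(limit: int = 20, page_num: int = 1,
              response_format: str = "concise") -> dict:
    """Build pagination arguments for a list action (hypothetical helper)."""
    # Documented bounds: limit is 1-100, default 20.
    if not 1 <= limit <= 100:
        raise ValueError("limit must be between 1 and 100")
    # Assumption: pages are 1-based, since the documented default is 1.
    if page_num < 1:
        raise ValueError("pageNum must be at least 1")
    if response_format not in ("concise", "detailed"):
        raise ValueError('responseFormat must be "concise" or "detailed"')
    return {"limit": limit, "pageNum": page_num,
            "responseFormat": response_format}
```

Calling `list_args()` with no arguments yields the documented defaults for list actions.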

atlas-streams-build — ALL create operations

| Resource | Key parameters |
|----------|----------------|
| workspace | cloudProvider, region, tier (default SP10), includeSampleData |
| connection | connectionName, connectionType (Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample), connectionConfig |
| processor | processorName, pipeline (must start with $source, end with $merge/$emit), dlq, autoStart |
| privatelink | privateLinkConfig (project-level, not tied to a specific workspace) |

Field mapping — only fill fields for the selected resource type:

atlas-streams-manage — ALL update/state operations

| Action | Notes |
|--------|-------|
| start-processor | Begins billing. Optional tier override, resumeFromCheckpoint |
| stop-processor | Stops billing. Retains state 45 days |
| modify-processor | Processor must be stopped first. Change pipeline, DLQ, or name |
| update-workspace | Change tier or region |
| update-connection | Update config (networking is immutable; must delete and recreate) |
| accept-peering / reject-peering | VPC peering management |

Field mapping — always fill projectId, workspaceName, then by action:

State pre-checks:

Processor states: CREATED → STARTED (via start) → STOPPED (via stop). A processor can also enter FAILED on runtime errors. Modify requires STOPPED or CREATED state.
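As a rough sketch, the state rules can be written as a lookup table. The restart edge (STOPPED → STARTED) is inferred from the stop/modify/start workflow in this skill; transitions out of FAILED are not described here, so they are deliberately left unmodeled. `next_state` and `can_modify` are hypothetical names.

```python
# Transitions named in the text, plus the restart edge implied by the
# stop -> modify -> start workflow. FAILED is entered on runtime errors,
# which no action triggers, so it has no entry here.
TRANSITIONS = {
    ("CREATED", "start-processor"): "STARTED",
    ("STOPPED", "start-processor"): "STARTED",
    ("STARTED", "stop-processor"): "STOPPED",
}

# Modify requires STOPPED or CREATED state.
MODIFIABLE = {"CREATED", "STOPPED"}


def next_state(state: str, action: str) -> str:
    """Return the state after applying action, or raise if not modeled."""
    try:
        return TRANSITIONS[(state, action)]
    except KeyError:
        raise ValueError(f"{action} is not modeled from state {state}") from None


def can_modify(state: str) -> bool:
    """True when modify-processor is allowed in the given state."""
    return state in MODIFIABLE
```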

Teardown safety checks:

atlas-streams-teardown — ALL delete operations

| Resource | Safety behavior |
|----------|-----------------|
| processor | Auto-stops before deleting |
| connection | Blocks if referenced by running processor |
| workspace | Cascading delete of all connections and processors |
| privatelink / peering | Remove networking resources |

Field mapping — always fill projectId, resource, then:

Before deleting a workspace, inspect it first:

  1. atlas-streams-discover → inspect-workspace: get connection/processor counts
  2. Present to user: “Workspace X contains N connections and M processors. Deleting permanently removes all. Proceed?”
  3. Wait for confirmation before calling atlas-streams-teardown

CRITICAL: Validate Before Creating Processors

You MUST call search-knowledge before composing any processor pipeline. This is not optional.

Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with example_processors/README.md for the full pattern catalog.

Key quickstarts:

| Quickstart | Pattern |
|------------|---------|
| 00_hello_world.json | Inline $source.documents with $match (zero infra, ephemeral) |
| 01_changestream_basic.json | Change stream → tumbling window → $merge to Atlas |
| 03_kafka_to_mongo.json | Kafka source → tumbling window rollup → $merge to Atlas |
| 04_mongo_to_mongo.json | Chained processors: rollup → archive to separate collection |
| 05_kafka_tail.json | Real-time Kafka topic monitoring (sinkless, like tail -f) |

Pipeline Rules & Warnings

Invalid constructs — these are NOT valid in streaming pipelines:

Required fields by stage:

See references/pipeline-patterns.md for stage field examples with JSON syntax.

SchemaRegistry connection: connectionType must be "SchemaRegistry" (not "Kafka"). Schema type values are case-sensitive (use lowercase avro, not AVRO). See references/connection-configs.md for required fields and auth types.

MCP Tool Behaviors

Elicitation: When creating connections, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers) via MCP elicitation. Do NOT ask the user for these — let the tool collect them.

Auto-normalization:

Workspace creation: includeSampleData defaults to true, which auto-creates the sample_stream_solar connection.

Region naming: The region field uses Atlas-specific names that differ by cloud provider. Using the wrong format returns a cryptic dataProcessRegion error.

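| Provider | Cloud region | Streams region value |
|----------|--------------|----------------------|
| AWS | us-east-1 | VIRGINIA_USA |
| AWS | us-east-2 | OHIO_USA |
| AWS | eu-west-1 | DUBLIN_IRL |
| GCP | us-central1 | US_CENTRAL1 |
| GCP | europe-west1 | EUROPE_WEST1 |
| Azure | eastus | eastus |
| Azure | westeurope | westeurope |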
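A minimal sketch of the mapping above as a lookup, covering only the seven rows listed here. `streams_region` is a hypothetical helper; for any region outside this table it raises instead of guessing a value.

```python
# Only the rows from the table above; the full mapping lives in
# references/connection-configs.md.
STREAMS_REGIONS = {
    ("AWS", "us-east-1"): "VIRGINIA_USA",
    ("AWS", "us-east-2"): "OHIO_USA",
    ("AWS", "eu-west-1"): "DUBLIN_IRL",
    ("GCP", "us-central1"): "US_CENTRAL1",
    ("GCP", "europe-west1"): "EUROPE_WEST1",
    ("AZURE", "eastus"): "eastus",
    ("AZURE", "westeurope"): "westeurope",
}


def streams_region(provider: str, cloud_region: str) -> str:
    """Translate a cloud-provider region into the Streams region value."""
    key = (provider.upper(), cloud_region.lower())
    if key not in STREAMS_REGIONS:
        raise KeyError(
            f"no mapping for {provider}/{cloud_region}; "
            "check references/connection-configs.md"
        )
    return STREAMS_REGIONS[key]
```

Failing loudly on an unknown region is a deliberate choice here, since a wrong region value only surfaces later as the dataProcessRegion error.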

See references/connection-configs.md for the full region mapping table. If unsure, inspect an existing workspace with atlas-streams-discover → inspect-workspace and check dataProcessRegion.region.

Connection Capabilities — Source/Sink Reference

Know what each connection type can do before creating pipelines:

| Connection Type | As Source ($source) | As Sink ($merge / $emit) | Mid-Pipeline | Notes |
|-----------------|---------------------|--------------------------|--------------|-------|
| Cluster | ✅ Change streams | ✅ $merge to collections | ✅ $lookup | Change streams monitor insert/update/delete/replace operations |
| Kafka | ✅ Topic consumer | ✅ $emit to topics | | Source MUST include topic field |
| Sample Stream | ✅ Sample data | ❌ Not valid | | Testing/demo only |
| S3 | ❌ Not valid | ✅ $emit to buckets | | Sink only - use path, format, compression. Supports AWS PrivateLink. |
| Https | ❌ Not valid | ✅ $https as sink | ✅ $https enrichment | Can be used mid-pipeline for enrichment OR as final sink stage |
| AWSLambda | ❌ Not valid | ✅ $externalFunction (async only) | ✅ $externalFunction (sync or async) | Sink: execution: "async" required. Mid-pipeline: execution: "sync" or "async" |
| AWS Kinesis | ✅ Stream consumer | ✅ $emit to streams | | Similar to Kafka pattern |
| SchemaRegistry | ❌ Not valid | ❌ Not valid | ✅ Schema resolution | Metadata only - used by Kafka connections for Avro schemas |
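The capability table can be encoded as a lookup for a quick pre-flight check before composing a pipeline. This is a sketch only: `supports` is a hypothetical helper, and cells the table leaves empty are treated as "not supported", which is an assumption rather than a documented fact.

```python
# Roles per connection type, transcribed from the table above.
# Empty table cells are treated as unsupported (assumption).
CAPABILITIES = {
    "Cluster": {"source", "sink", "mid"},
    "Kafka": {"source", "sink"},
    "Sample Stream": {"source"},
    "S3": {"sink"},
    "Https": {"sink", "mid"},
    "AWSLambda": {"sink", "mid"},
    "AWS Kinesis": {"source", "sink"},
    "SchemaRegistry": {"mid"},
}


def supports(connection_type: str, role: str) -> bool:
    """Return True if the connection type can play the role.

    role is one of "source", "sink", or "mid" (mid-pipeline).
    """
    if role not in ("source", "sink", "mid"):
        raise ValueError(f"unknown role: {role}")
    if connection_type not in CAPABILITIES:
        raise KeyError(f"unknown connection type: {connection_type}")
    return role in CAPABILITIES[connection_type]
```

For example, `supports("S3", "source")` is false, which matches the "Sink only" note in the table.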

Common connection usage mistakes to avoid:

See references/connection-configs.md for detailed connection configuration schemas by type.

Core Workflows

Setup from scratch

  1. atlas-streams-discover → list-workspaces (check existing)
  2. atlas-streams-build → resource: "workspace" (region near data, SP10 for dev)
  3. atlas-streams-build → resource: "connection" (for each source/sink/enrichment)
  4. Validate connections: atlas-streams-discover → list-connections + inspect-connection for each; verify names match targets, present summary to user
  5. Call search-knowledge to validate field names. Fetch relevant examples from https://github.com/mongodb/ASP_example
  6. atlas-streams-build → resource: "processor" (with DLQ configured)
  7. atlas-streams-manage → start-processor (warn about billing)

Workflow Patterns

Incremental pipeline development (recommended): See references/development-workflow.md for the full 5-phase lifecycle.

  1. Start with basic $source → $merge pipeline (validate connectivity)
  2. Add $match stages (validate filtering)
  3. Add $addFields / $project transforms (validate reshaping)
  4. Add windowing or enrichment (validate aggregation logic)
  5. Add error handling / DLQ configuration
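The five steps above might look like this when the processor definition is assembled in code. Everything concrete here is hypothetical: the connection names, database, collection, field names, and the inner shape of the dlq object are illustrative assumptions, and stage fields should still be validated with search-knowledge before building a real processor.

```python
def processor_definition(step: int) -> dict:
    """Return a processor definition as it would look after a given step (1-5)."""
    if not 1 <= step <= 5:
        raise ValueError("step must be between 1 and 5")

    # Hypothetical connection names and fields throughout.
    source = {"$source": {"connectionName": "my_kafka", "topic": "events"}}
    sink = {"$merge": {"into": {"connectionName": "my_cluster",
                                "db": "analytics", "coll": "events"}}}
    middle = [
        {"$match": {"status": "active"}},                  # step 2: filtering
        {"$addFields": {"source_system": "kafka"}},        # step 3: reshaping
        {"$tumblingWindow": {                              # step 4: windowing
            "interval": {"size": 1, "unit": "minute"},
            "pipeline": [{"$group": {"_id": "$device_id",
                                     "count": {"$sum": 1}}}],
        }},
    ]

    # Steps 2-4 each add one stage between the source and the sink.
    stages = middle[: min(step, 4) - 1]
    definition = {"processorName": "events_rollup",
                  "pipeline": [source, *stages, sink]}

    # Step 5: the DLQ is processor-level config, not a pipeline stage.
    if step == 5:
        definition["dlq"] = {"connectionName": "my_cluster",
                             "db": "analytics", "coll": "events_dlq"}
    return definition
```

Each step changes one thing, so a failure after a change points at the stage that was just added.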

Modify a processor pipeline:

  1. atlas-streams-manage → action: "stop-processor" (processor MUST be stopped first)
  2. atlas-streams-manage → action: "modify-processor" (provide new pipeline)
  3. atlas-streams-manage → action: "start-processor" (restart)
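One way to picture the sequence is as an ordered list of argument sets for atlas-streams-manage. This is a sketch under assumptions: `modify_plan` is a hypothetical helper, and while projectId and workspaceName are named in this skill, the use of processorName and pipeline as manage-tool argument names is assumed from the build table.

```python
def modify_plan(project_id: str, workspace_name: str, processor_name: str,
                new_pipeline: list, current_state: str) -> list:
    """Return the ordered atlas-streams-manage calls for a pipeline change."""
    base = {
        "projectId": project_id,
        "workspaceName": workspace_name,
        "processorName": processor_name,  # assumed argument name
    }
    calls = []
    if current_state == "STARTED":
        # A running processor must be stopped before it can be modified.
        calls.append({"action": "stop-processor", **base})
    elif current_state not in ("STOPPED", "CREATED"):
        # e.g. FAILED: this sketch does not guess; diagnose first.
        raise ValueError(
            f"cannot plan a modify from state {current_state}; "
            "run diagnose-processor first"
        )
    calls.append({"action": "modify-processor", **base,
                  "pipeline": new_pipeline})  # assumed argument name
    calls.append({"action": "start-processor", **base})
    return calls
```

The stop call is skipped when the processor is already STOPPED or CREATED, since modify is allowed in those states.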

Debug a failing processor:

  1. atlas-streams-discover → diagnose-processor: one-shot health report. Always call this first.
  2. Commit to a specific root cause. Match symptoms to diagnostic patterns:
    • Error 419 + “no partitions found” → Kafka topic doesn’t exist or is misspelled
    • State: FAILED + multiple restarts → connection-level error (bypasses DLQ), check connection config
    • State: STARTED + zero output + windowed pipeline → likely idle Kafka partitions blocking window closure; add partitionIdleTimeout to Kafka $source (e.g., {"size": 30, "unit": "second"})
    • State: STARTED + zero output + non-windowed → check if source has data; inspect Kafka offset lag
    • High memoryUsageBytes approaching tier limit → OOM risk; recommend higher tier
    • DLQ count increasing → per-document errors; use MongoDB find on DLQ collection
    See references/output-diagnostics.md for the full pattern table.
  3. Classify processor type before interpreting output volume (alert vs transformation vs filter).
  4. Provide concrete, ordered fix steps specific to the diagnosed root cause. Do NOT present a list of hypothetical scenarios.
  5. If detailed logs are needed, direct the user to the Atlas UI: Atlas → Stream Processing → Workspace → Processor → Logs tab.
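The symptom patterns in step 2 can be sketched as a first-match triage function. The input names, the evaluation order, and the 0.8 memory threshold are assumptions made for illustration; the diagnose-processor report may expose these signals under different field names.

```python
def triage(state: str, output_count: int, windowed: bool = False,
           error_code: int = 0, error_message: str = "",
           restarts: int = 0, memory_ratio: float = 0.0,
           dlq_growing: bool = False) -> str:
    """Map diagnose-processor symptoms to a single root cause (sketch)."""
    if error_code == 419 and "no partitions found" in error_message.lower():
        return "Kafka topic doesn't exist or is misspelled"
    if state == "FAILED" and restarts > 1:
        return "connection-level error (bypasses DLQ); check connection config"
    if state == "STARTED" and output_count == 0:
        if windowed:
            return ("idle Kafka partitions blocking window closure; "
                    "add partitionIdleTimeout to the Kafka $source")
        return "check whether the source has data; inspect Kafka offset lag"
    # Assumption: 80% of the tier limit counts as "approaching".
    if memory_ratio >= 0.8:
        return "OOM risk; recommend a higher tier"
    if dlq_growing:
        return "per-document errors; query the DLQ collection"
    return "no pattern matched; see references/output-diagnostics.md"
```

Returning one cause rather than a list mirrors the instruction to commit to a specific root cause.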

Chained processors (multi-sink pattern)

CRITICAL: A single pipeline can only have ONE terminal sink ($merge or $emit). When users request multiple output destinations (e.g., “write to Atlas AND emit to Kafka”), you MUST acknowledge the single-sink constraint and propose chained processors using an intermediate destination. See references/pipeline-patterns.md for the full pattern with examples.
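A minimal structural check for the single-sink constraint, assuming a pipeline is represented as a list of single-key stage dicts. `check_pipeline_shape` is a hypothetical helper. It only counts $merge and $emit as terminal sinks, as named above, and it accepts sinkless pipelines because the quickstarts include one.

```python
TERMINAL_SINKS = {"$merge", "$emit"}


def check_pipeline_shape(pipeline: list) -> list:
    """Return a list of structural problems; an empty list means none found."""
    problems = []
    if not pipeline or "$source" not in pipeline[0]:
        problems.append("pipeline must start with $source")

    # Indexes of every stage that is a terminal sink.
    sink_positions = [i for i, stage in enumerate(pipeline)
                      if stage.keys() & TERMINAL_SINKS]
    if len(sink_positions) > 1:
        problems.append("more than one terminal sink; use chained processors")
    elif sink_positions and sink_positions[0] != len(pipeline) - 1:
        problems.append("terminal sink must be the last stage")
    return problems
```

A request such as "write to Atlas AND emit to Kafka" fails this check, which is the cue to propose two chained processors with an intermediate destination.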

Pre-Deploy & Post-Deploy Checklists

See references/development-workflow.md for the complete pre-deploy quality checklist (connection validation, pipeline validation) and post-deploy verification workflow.

Tier Sizing & Performance

See references/sizing-and-parallelism.md for tier specifications, parallelism formulas, complexity scoring, and performance optimization strategies.

Troubleshooting

See references/development-workflow.md for the complete troubleshooting table covering processor failures, API errors, configuration issues, and performance problems.

Billing & Cost

Atlas Stream Processing has no free tier. All deployed processors incur continuous charges while running.

Safety Rules

Reference Files

| File | Read when… |
|------|------------|
| references/pipeline-patterns.md | Building or modifying processor pipelines |
| references/connection-configs.md | Creating connections (type-specific schemas) |
| references/development-workflow.md | Following lifecycle management or debugging decision trees |
| references/output-diagnostics.md | Processor output is unexpected (zero, low, or wrong) |
| references/sizing-and-parallelism.md | Choosing tiers, tuning parallelism, or optimizing cost |

Skill frontmatter

license: Apache-2.0
metadata: {"version": "1.0.0", "user-invocable": "true"}