Platform MCPs

The Koveria Platform provides 4 remote MCP services for core infrastructure operations. These services run as separate Kubernetes pods, providing scalable, secure access to cloud storage, HTTP APIs, email delivery, and PDF processing.

Overview

Complete Service Catalog

| Service | Purpose | Tools | Latency | Cost per Call |
|---|---|---|---|---|
| cloud_storage | File operations on S3/GCS/NFS | 6 tools | 10-30ms | $0.001 |
| http_client | Generic HTTP client for external APIs | 4 tools | 20-100ms | $0.001 |
| email_send | Email delivery via SendGrid/AWS SES | 2 tools | 50-200ms | $0.01 |
| pdf_processing | PDF text extraction and metadata | 3 tools | 100-500ms | $0.01 |

Classification

All platform MCPs share the same classification:

| Dimension | Value | Meaning |
|---|---|---|
| Execution | remote | Separate K8s pod (5-50ms latency) |
| Category | platform | Core infrastructure services |
| Ownership | core | Koveria-maintained (official support) |

Full classification: Core Platform Service (Remote)

See 3D Classification for complete framework.

Common Characteristics

Performance

  • Latency: 5-50ms base (+ external service latency)
  • Throughput: Scales horizontally (1-10+ replicas per service)
  • Cost: $0.001-$0.01 per call (tracked to team_id)

Deployment

  • Namespace: koveria-mcp (isolated from agents)
  • Scaling: Independent horizontal pod autoscaler
  • Health Checks: Automatic health monitoring and circuit breaker
  • Updates: Rolling updates, zero-downtime deployments
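
The circuit-breaker behavior mentioned above can be sketched as a small state machine: trip after consecutive failures, reject calls during a cooldown, then allow a trial call. This is an illustrative model only, not the platform's actual implementation; the `threshold` and `cooldown` values are hypothetical.

```python
import time

class CircuitBreaker:
    """Illustrative circuit breaker: opens after `threshold` consecutive
    failures, rejects calls until `cooldown` seconds elapse, then half-opens."""

    def __init__(self, threshold: int = 3, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None  # monotonic timestamp when the breaker tripped

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown:
            # Cooldown elapsed: half-open, permit one trial call
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record(self, success: bool) -> None:
        if success:
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()
```

The MCP Router uses this pattern to fail fast instead of queuing calls against an unhealthy pod.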

Security

  • Network Policies: Strict ingress/egress rules
  • Secrets: API keys stored in Vault, injected at runtime
  • Audit Logging: All calls logged with org_id, team_id, agent_id
  • Rate Limiting: Per-team quotas enforced by MCP Router
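
Because the MCP Router enforces per-team quotas, agent code should expect occasional rate-limit rejections (the troubleshooting section below shows them surfacing as `429 Rate limit exceeded` errors). A hedged retry wrapper might look like this; it assumes rate-limit failures arrive as exceptions whose message contains "429", which may differ from the real error type.

```python
import asyncio

async def call_with_backoff(func, *args, max_retries: int = 3,
                            base_delay: float = 1.0, **kwargs):
    """Retry a coroutine on rate-limit errors with exponential backoff.

    Illustrative sketch: detects rate limits by looking for '429' in the
    exception text, which is an assumption about the error format.
    """
    for attempt in range(max_retries + 1):
        try:
            return await func(*args, **kwargs)
        except Exception as exc:
            if "429" not in str(exc) or attempt == max_retries:
                raise
            # Backoff schedule: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * (2 ** attempt))
```

Usage: `await call_with_backoff(self.mcp.http_client.get, url="https://api.example.com/data")`.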

Usage Pattern

# Agent code
from koveria import Agent, agent_action

class MyAgent(Agent):
    @agent_action(action_type="process_document")
    async def process_document(self, file_path: str) -> dict:
        # All platform MCPs are available via self.mcp

        # 1. Read file from cloud storage (remote call ~20ms)
        file_content = await self.mcp.cloud_storage.read_file(file_path)

        # 2. Extract text from PDF (remote call ~200ms)
        text = await self.mcp.pdf_processing.extract_text(file_content)

        # 3. Call external API (remote call ~100ms)
        api_result = await self.mcp.http_client.post(
            url="https://api.example.com/analyze",
            data={"text": text}
        )

        # 4. Send confirmation email (remote call ~150ms)
        await self.mcp.email_send.send_email(
            to="user@example.com",
            subject="Document processed",
            body=f"Processed {file_path}"
        )

        # Total latency: ~470ms (all remote calls)
        # Total cost: $0.022 ($0.001 + $0.01 + $0.001 + $0.01)

        return {"status": "complete"}

Service Details

cloud_storage

Purpose: Manage files in object storage (S3, Google Cloud Storage, NFS).

Classification: remote + platform + core

Supported Providers:

  • AWS S3 - Amazon S3 or S3-compatible storage (MinIO, etc.)
  • Google Cloud Storage (GCS) - GCS buckets via Service Account credentials
  • NFS - Network File System (user-space via libnfs)

Tools (6):

# Read file
content = await mcp.cloud_storage.read_file(
    path="/workspace/teams/team-123/documents/invoice.pdf"
)
# Returns: {"content": "base64...", "size": 12345, "mime_type": "application/pdf"}

# Write file
await mcp.cloud_storage.write_file(
    path="/workspace/teams/team-123/output/report.txt",
    content="Report text content...",
    overwrite=True
)

# List directory
files = await mcp.cloud_storage.list_directory(
    path="/workspace/teams/team-123/invoices",
    pattern="*.pdf",
    recursive=False
)
# Returns: [{"path": "invoice_001.pdf", "size": 12345, "modified": "2025-01-03T10:00:00Z"}]

# Check file exists
exists = await mcp.cloud_storage.file_exists(
    path="/workspace/teams/team-123/data.csv"
)
# Returns: True/False

# Get file metadata
metadata = await mcp.cloud_storage.get_file_info(
    path="/workspace/teams/team-123/contract.pdf"
)
# Returns: {"size": 54321, "modified": "...", "mime_type": "application/pdf"}

# Delete file
await mcp.cloud_storage.delete_file(
    path="/workspace/teams/team-123/temp.txt",
    confirm=True
)

Real-world example:

# Invoice processing workflow
@agent_action(action_type="process_invoices")
async def process_invoices(self) -> dict:
    # 1. List all unprocessed invoices
    invoices = await self.mcp.cloud_storage.list_directory(
        path="/workspace/teams/team-finance/invoices/inbox",
        pattern="*.pdf"
    )

    processed = []
    for invoice in invoices:
        # 2. Read invoice file
        file_content = await self.mcp.cloud_storage.read_file(
            path=f"/workspace/teams/team-finance/invoices/inbox/{invoice['path']}"
        )

        # 3. Extract text from PDF
        text = await self.mcp.pdf_processing.extract_text(
            pdf_base64=file_content["content"]
        )

        # 4. Parse invoice data (using LLM)
        invoice_data = await self.llm.extract_structured(
            text=text["text"],
            schema={"invoice_id": "string", "total": "number", "date": "date"}
        )

        # 5. Save structured data
        await self.mcp.cloud_storage.write_file(
            path=f"/workspace/teams/team-finance/invoices/processed/{invoice_data['invoice_id']}.json",
            content=await self.mcp.json_yaml.to_json(invoice_data, pretty=True)
        )

        # 6. Move original to archive
        original_content = await self.mcp.cloud_storage.read_file(
            path=f"/workspace/teams/team-finance/invoices/inbox/{invoice['path']}"
        )
        await self.mcp.cloud_storage.write_file(
            path=f"/workspace/teams/team-finance/invoices/archive/{invoice['path']}",
            content=original_content["content"]
        )
        await self.mcp.cloud_storage.delete_file(
            path=f"/workspace/teams/team-finance/invoices/inbox/{invoice['path']}",
            confirm=True
        )

        processed.append(invoice_data)

    return {"processed_count": len(processed), "invoices": processed}

Configuration:

# Admin GUI - Org-level config
mcps:
  cloud_storage:
    provider: s3  # s3, gcs, nfs
    region: us-east-1
    bucket_prefix: koveria-data-
    allowed_base_paths:
      - /workspace/teams/{team_id}
    max_file_size_mb: 100
    allowed_extensions:
      - txt
      - pdf
      - json
      - csv
    blocked_extensions:
      - exe
      - sh
    rate_limit:
      calls_per_minute: 100
      calls_per_day: 10000

File path structure:

/workspace/teams/{team_id}/            # Team-scoped storage
/workspace/teams/team-123/documents/   # Documents folder
/workspace/teams/team-123/uploads/     # Uploads folder
/workspace/teams/team-123/archive/     # Archive folder

Security:

  • ✅ Team-scoped access (agents can only access their team's folder)
  • ✅ Path validation (prevents directory traversal attacks)
  • ✅ Extension filtering (blocks dangerous file types)
  • ✅ Size limits (prevents abuse)
  • ✅ Audit logging (all operations logged)
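
The team-scoping and traversal checks above can be illustrated with a small validator. This is a sketch of the idea, not the service's actual validation code; `validate_team_path` is a hypothetical helper.

```python
from pathlib import PurePosixPath

def validate_team_path(path: str, team_id: str) -> bool:
    """Illustrative check: the path must resolve inside the team's base
    directory, and '..' segments must not escape it."""
    base = PurePosixPath(f"/workspace/teams/{team_id}")
    parts = []
    for part in PurePosixPath(path).parts:
        if part == "..":
            if not parts:
                return False  # traversal above the filesystem root
            parts.pop()
        else:
            parts.append(part)
    resolved = PurePosixPath(*parts)
    # Accept the base directory itself or anything nested under it
    return resolved == base or base in resolved.parents
```

Note the pure-path normalization: resolving `..` lexically (without touching the filesystem) is what blocks paths like `/workspace/teams/team-123/../team-999/data.txt`.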

Files: mcp-services/cloud-storage/


http_client

Purpose: Make HTTP requests to external APIs and web services.

Classification: remote + platform + core

Tools (4):

# GET request
result = await mcp.http_client.get(
    url="https://api.github.com/repos/anthropics/koveria",
    headers={"Authorization": "token ghp_..."}
)
# Returns: {"status": 200, "text": "{...}", "json": {...}, "headers": {...}}

# POST request
result = await mcp.http_client.post(
    url="https://api.example.com/webhooks",
    data={"event": "order_created", "order_id": "12345"},
    headers={"X-API-Key": "secret"}
)

# PUT request
result = await mcp.http_client.put(
    url="https://api.example.com/users/123",
    data={"name": "Alice", "email": "alice@example.com"}
)

# DELETE request
result = await mcp.http_client.delete(
    url="https://api.example.com/orders/456",
    headers={"Authorization": "Bearer token"}
)

Real-world example:

# Webhook notification workflow
@agent_action(action_type="send_webhook_notification")
async def send_webhook_notification(self, event: dict) -> dict:
    # 1. Call customer's webhook endpoint
    response = await self.mcp.http_client.post(
        url=event["webhook_url"],
        data={
            "event_type": event["type"],
            "timestamp": await self.mcp.datetime.now(),
            "data": event["data"]
        },
        headers={
            # Serialize the payload before hashing (string + dict would raise TypeError)
            "X-Koveria-Signature": await self.mcp.hashing.sha256(
                event["webhook_secret"] + str(event["data"])
            )
        }
    )

    # 2. Handle response
    if response["status"] == 200:
        # Webhook succeeded
        await self.memory.set(
            f"webhook:{event['id']}",
            {"status": "delivered", "timestamp": await self.mcp.datetime.now()}
        )
        return {"status": "success"}
    else:
        # Webhook failed - retry logic
        retry_count = await self.memory.get(f"webhook:{event['id']}:retries", default=0)

        if retry_count < 3:
            # Schedule retry with exponential backoff
            await self.memory.set(f"webhook:{event['id']}:retries", retry_count + 1)

            # Calculate backoff delay: 2^retry_count minutes (1min, 2min, 4min)
            backoff_seconds = 60 * (2 ** retry_count)
            retry_at = await self.mcp.datetime.add_seconds(
                await self.mcp.datetime.now(),
                backoff_seconds
            )

            # Store retry schedule in memory
            await self.memory.set(
                f"webhook:{event['id']}:next_retry",
                {"scheduled_at": retry_at, "attempt": retry_count + 1},
                ttl=backoff_seconds + 300  # TTL slightly longer than backoff
            )

            return {
                "status": "retry_scheduled",
                "attempt": retry_count + 1,
                "next_retry_at": retry_at
            }
        else:
            # Max retries exceeded - alert
            await self.mcp.email_send.send_email(
                to="ops@company.com",
                subject=f"Webhook delivery failed: {event['id']}",
                body=f"Failed to deliver webhook to {event['webhook_url']} after 3 attempts"
            )
            return {"status": "failed", "retries": retry_count}
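
On the receiving end, the customer verifies the `X-Koveria-Signature` header by recomputing the hash. The sketch below mirrors the scheme used in the example (hex SHA-256 over the secret concatenated with the payload); it is illustrative only, and a hardened scheme would typically use HMAC rather than plain concatenation.

```python
import hashlib
import hmac

def verify_signature(secret: str, payload: str, signature: str) -> bool:
    """Recompute the hex SHA-256 of secret + payload and compare it to the
    received signature. compare_digest avoids timing side channels."""
    expected = hashlib.sha256((secret + payload).encode()).hexdigest()
    return hmac.compare_digest(expected, signature)
```

A receiver would call this with the raw request body before trusting the event.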

Configuration:

mcps:
  http_client:
    allowed_domains:
      - "*.example.com"
      - "api.stripe.com"
      - "api.salesforce.com"
    blocked_domains:
      - "*.malicious.com"
    max_request_size_mb: 10
    max_response_size_mb: 50
    timeout_seconds: 30
    follow_redirects: true
    max_redirects: 5
    verify_ssl: true
    rate_limit:
      calls_per_minute: 100
      calls_per_day: 10000

Security:

  • ✅ Domain allowlist (only approved domains)
  • ✅ SSL verification (enforce HTTPS and valid certificates)
  • ✅ Size limits (prevent abuse)
  • ✅ Timeout protection (prevent hanging requests)
  • ✅ Rate limiting (per-team quotas)
  • ✅ Audit logging (all HTTP calls logged with full URL)

URL normalization: Automatically prepends https:// if missing

| Input | Normalized |
|---|---|
| https://example.com | https://example.com |
| www.example.com | https://www.example.com |
| example.com/api | https://example.com/api |
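
The normalization rule amounts to a one-line check. This sketch covers only the documented cases; the service's actual handling of edge cases (other schemes, malformed URLs) may differ.

```python
def normalize_url(url: str) -> str:
    """Prepend https:// when no HTTP scheme is present, per the table above."""
    if not url.startswith(("http://", "https://")):
        return "https://" + url
    return url
```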

Files: mcp-services/http-client/


email_send

Purpose: Send transactional emails via SendGrid or AWS SES.

Classification: remote + platform + core

Supported Providers:

  • SendGrid - Via API (recommended)
  • AWS SES - Via SMTP or API

Tools (2):

# Send plain text email
await mcp.email_send.send_email(
    to="customer@example.com",
    subject="Order Confirmation",
    body="Your order #12345 has been confirmed."
)

# Send HTML email with attachments
await mcp.email_send.send_html_email(
    to=["customer@example.com", "manager@example.com"],
    cc=["support@company.com"],
    subject="Invoice #12345",
    html_body="""
        <h1>Invoice #12345</h1>
        <p>Thank you for your order!</p>
        <p>Total: $250.00</p>
    """,
    attachments=[
        {
            "filename": "invoice_12345.pdf",
            "content": invoice_pdf_base64,
            "mime_type": "application/pdf"
        }
    ],
    reply_to="billing@company.com"
)

Real-world example:

# Order confirmation workflow
@agent_action(action_type="send_order_confirmation")
async def send_order_confirmation(self, order: dict) -> dict:
    # 1. Format the order date
    formatted_date = await self.mcp.datetime.format(
        date_string=order["date"],
        format="%B %d, %Y"
    )

    # 2. Calculate delivery date
    delivery_date = await self.mcp.business_date.add_business_days(
        start_date=order["date"],
        days=5
    )

    # 3. Send confirmation email
    await self.mcp.email_send.send_html_email(
        to=order["customer_email"],
        subject=f"Order Confirmation #{order['id']}",
        html_body=f"""
            <h2>Thank you for your order!</h2>
            <p>Dear {order['customer_name']},</p>
            <p>Your order #{order['id']} has been confirmed.</p>
            <p>Order date: {formatted_date}</p>
            <p>Order total: ${order['total']:.2f}</p>
            <p>Expected delivery: {delivery_date}</p>
            <p>Best regards,<br/>The Team</p>
        """,
        reply_to="support@company.com"
    )

    # 4. Log email sent
    await self.memory.set(
        f"order:{order['id']}:confirmation_sent",
        {"timestamp": await self.mcp.datetime.now(), "to": order["customer_email"]}
    )

    return {"status": "sent", "order_id": order["id"]}

Configuration:

mcps:
  email_send:
    provider: sendgrid  # sendgrid, aws_ses
    sendgrid_api_key: sg.xxx  # From Vault
    from_email: noreply@company.com
    from_name: Koveria Platform
    max_recipients: 50
    max_attachment_size_mb: 25
    rate_limit:
      calls_per_minute: 60
      calls_per_day: 5000

Security:

  • ✅ API key stored in Vault (never in code)
  • ✅ SPF/DKIM/DMARC validation (sender authentication)
  • ✅ Recipient limits (prevent spam)
  • ✅ Attachment size limits (prevent abuse)
  • ✅ Rate limiting (per-team quotas)
  • ✅ Audit logging (all emails logged with recipients)

Cost: ~$0.01 per email (SendGrid charges apply)

Files: mcp-services/email-send/


pdf_processing

Purpose: Extract text and metadata from PDF documents.

Classification: remote + platform + core

Dependencies:

  • Poppler (poppler-utils 22.02) - PDF rendering and text extraction
  • PyPDF2 - PDF metadata and structure parsing

Tools (3):

# Extract full text from PDF
result = await mcp.pdf_processing.extract_text(
    pdf_base64=invoice_file_content
)
# Returns: {
#     "text": "INVOICE #12345\nDate: 2025-01-03...",
#     "pages": 3,
#     "character_count": 1247
# }

# Get PDF metadata
metadata = await mcp.pdf_processing.get_metadata(
    pdf_base64=contract_file_content
)
# Returns: {
#     "title": "Service Agreement",
#     "author": "Legal Department",
#     "pages": 12,
#     "created": "2025-01-01T10:00:00Z",
#     "modified": "2025-01-03T14:30:00Z"
# }

# Search for text in PDF
search_results = await mcp.pdf_processing.search_text(
    pdf_base64=report_file_content,
    query="revenue"
)
# Returns: {
#     "matches": [
#         {"page": 2, "context": "Q4 revenue increased by 25%..."},
#         {"page": 5, "context": "Total revenue for FY2024..."}
#     ],
#     "total_matches": 5
# }

Real-world example:

# Contract analysis workflow
@agent_action(action_type="analyze_contract")
async def analyze_contract(self, contract_path: str) -> dict:
    # 1. Read contract PDF from cloud storage
    contract_file = await self.mcp.cloud_storage.read_file(
        path=f"/workspace/teams/team-legal/contracts/{contract_path}"
    )

    # 2. Get metadata (check page count, author)
    metadata = await self.mcp.pdf_processing.get_metadata(
        pdf_base64=contract_file["content"]
    )

    # 3. Extract full text
    content = await self.mcp.pdf_processing.extract_text(
        pdf_base64=contract_file["content"]
    )

    # 4. Search for key terms
    liability_matches = await self.mcp.pdf_processing.search_text(
        pdf_base64=contract_file["content"],
        query="liability"
    )

    termination_matches = await self.mcp.pdf_processing.search_text(
        pdf_base64=contract_file["content"],
        query="termination"
    )

    # 5. Use LLM to extract structured data
    contract_data = await self.llm.extract_structured(
        text=content["text"],
        schema={
            "contract_number": "string",
            "parties": ["string"],
            "effective_date": "date",
            "expiration_date": "date",
            "payment_terms": "string",
            "liability_cap": "number",
            "auto_renewal": "boolean",
            "termination_notice_days": "number"
        }
    )

    # 6. Flag high-risk contracts
    is_high_risk = (
        contract_data["liability_cap"] > 1_000_000 or
        contract_data["termination_notice_days"] < 30
    )

    if is_high_risk:
        await self.mcp.email_send.send_email(
            to="legal@company.com",
            subject=f"High-Risk Contract Alert: {contract_data['contract_number']}",
            body=f"""
            Contract {contract_data['contract_number']} requires review:
            - Liability cap: ${contract_data['liability_cap']:,}
            - Termination notice: {contract_data['termination_notice_days']} days
            - Found {len(liability_matches['matches'])} liability clauses
            """
        )

    # 7. Save analysis results
    await self.mcp.cloud_storage.write_file(
        path=f"/workspace/teams/team-legal/analysis/{contract_path}.json",
        content=await self.mcp.json_yaml.to_json({
            "metadata": metadata,
            "contract_data": contract_data,
            "risk_flags": {
                "is_high_risk": is_high_risk,
                "liability_clauses": len(liability_matches["matches"]),
                "termination_clauses": len(termination_matches["matches"])
            }
        }, pretty=True)
    )

    return {
        "contract_number": contract_data["contract_number"],
        "is_high_risk": is_high_risk,
        "pages": metadata["pages"],
        "analysis_saved": True
    }

Configuration:

mcps:
  pdf_processing:
    max_file_size_mb: 50
    max_pages: 500
    timeout_seconds: 120
    rate_limit:
      calls_per_minute: 20
      calls_per_day: 1000

Limitations:

  • ❌ Password-protected PDFs (fails with clear error)
  • ❌ Scanned documents without OCR (returns empty text)
  • ❌ Complex tables (text extraction loses structure - use LLM to parse)
  • ❌ Embedded images (not extracted)
  • ❌ Form filling (not supported)
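
Since scanned documents come back with empty text rather than an error, it is worth guarding extractions before feeding them to an LLM. A heuristic sketch (the `min_chars_per_page` threshold is an arbitrary assumption):

```python
def looks_scanned(extraction: dict, min_chars_per_page: int = 20) -> bool:
    """Flag extract_text results whose text is suspiciously short for the
    page count, which usually indicates a scan with no text layer."""
    pages = max(extraction.get("pages", 1), 1)
    return extraction.get("character_count", 0) / pages < min_chars_per_page
```

An agent could route flagged files to a human queue instead of producing empty analyses.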

Performance: ~200ms for small PDFs (<10 pages), ~500ms for large PDFs (50+ pages)

Cost: $0.01 per PDF processed

Files: mcp-services/pdf-processing/


Best Practices

1. Use Platform MCPs for External Integrations

Platform MCPs handle external dependencies with proper error handling:

# Good: Use platform MCP for HTTP calls
try:
    result = await self.mcp.http_client.get(
        url="https://api.example.com/data"
    )
    # Platform handles timeouts, retries, circuit breaker
except Exception as e:
    # Handle API errors gracefully
    pass

# Bad: Direct HTTP calls from agent (no rate limiting, no audit)
# import httpx
# result = await httpx.get("https://api.example.com/data")

2. Combine Platform MCPs for Multi-Step Workflows

Chain platform calls for complex operations:

@agent_action(action_type="process_and_notify")
async def process_and_notify(self, file_path: str, recipient: str) -> dict:
    # Step 1: Read file (cloud_storage)
    file_content = await self.mcp.cloud_storage.read_file(file_path)

    # Step 2: Extract text (pdf_processing)
    text = await self.mcp.pdf_processing.extract_text(file_content["content"])

    # Step 3: Call external API (http_client)
    api_result = await self.mcp.http_client.post(
        url="https://api.example.com/analyze",
        data={"text": text["text"]}
    )

    # Step 4: Send notification (email_send)
    await self.mcp.email_send.send_email(
        to=recipient,
        subject="Analysis Complete",
        body=f"Analysis result: {api_result['json']['result']}"
    )

    return {"status": "complete"}

3. Handle Rate Limits Gracefully

Platform MCPs have rate limits - implement batching and retry logic:

import asyncio

@agent_action(action_type="batch_process_invoices")
async def batch_process_invoices(self, invoice_paths: list[str]) -> dict:
    results = []
    errors = []

    # Process in batches of 10 (respect rate limits)
    for i in range(0, len(invoice_paths), 10):
        batch = invoice_paths[i:i+10]

        for path in batch:
            try:
                # Each PDF processing call costs $0.01
                file_content = await self.mcp.cloud_storage.read_file(path)
                text = await self.mcp.pdf_processing.extract_text(
                    file_content["content"]
                )
                results.append({"path": path, "status": "success"})
            except Exception as e:
                errors.append({"path": path, "error": str(e)})

        # Brief pause between batches to avoid rate limits
        if i + 10 < len(invoice_paths):
            await asyncio.sleep(1)

    return {
        "processed": len(results),
        "errors": len(errors),
        "total_cost": len(results) * 0.01  # $0.01 per PDF
    }

4. Use Caching for Repeated Reads

Cache frequently accessed files to reduce costs:

@agent_action(action_type="get_cached_file")
async def get_cached_file(self, file_path: str) -> str:
    # Check agent memory cache first
    cached = await self.memory.get(f"file:{file_path}")

    if cached:
        return cached["content"]

    # Cache miss - fetch from cloud storage
    file_content = await self.mcp.cloud_storage.read_file(file_path)

    # Cache for 1 hour (3600 seconds)
    await self.memory.set(
        f"file:{file_path}",
        {"content": file_content["content"]},
        ttl=3600
    )

    return file_content["content"]

Performance & Cost Analysis

Latency Breakdown

| Operation | Base Latency | External Service | Total |
|---|---|---|---|
| cloud_storage.read_file | 10ms | +10ms (S3) | 20ms |
| http_client.get | 5ms | +50-500ms (API) | 55-505ms |
| email_send.send_email | 50ms | +100ms (SendGrid) | 150ms |
| pdf_processing.extract_text | 100ms | +100ms (Poppler) | 200ms |

Cost per Call

| Service | Cost | Notes |
|---|---|---|
| cloud_storage | $0.001 | Per read/write operation |
| http_client | $0.001 | Per HTTP request |
| email_send | $0.01 | Per email sent (+ SendGrid fees) |
| pdf_processing | $0.01 | Per PDF processed |

Cost Optimization Tips

  1. Batch operations - Process multiple files in a single workflow
  2. Cache aggressively - Store frequently accessed data in agent memory
  3. Use stdlib for local ops - Use local MCPs for JSON parsing, date formatting, etc. (free)
  4. Compress files - Smaller files = faster transfers = lower costs
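
To budget a workflow before running it, multiply the per-call prices from the catalog by the expected call counts. A small helper (`estimate_workflow_cost` is a hypothetical name, but the prices come from the tables above):

```python
# Published per-call prices from the service catalog
COST_PER_CALL = {
    "cloud_storage": 0.001,
    "http_client": 0.001,
    "email_send": 0.01,
    "pdf_processing": 0.01,
}

def estimate_workflow_cost(calls: dict[str, int]) -> float:
    """Estimate total MCP cost in dollars from per-service call counts."""
    return round(sum(COST_PER_CALL[svc] * n for svc, n in calls.items()), 6)
```

For the document-processing workflow shown earlier (one call to each service), this yields $0.022.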

Troubleshooting

MCP Service Unavailable

Error:

MCPError: Service cloud_storage unavailable

Cause: MCP pod is down or unhealthy.

Solution:

  1. Check MCP pod status:

    kubectl get pods -n koveria-mcp
    kubectl logs -n koveria-mcp cloud-storage-xxx
  2. Check circuit breaker status in Admin GUI

  3. Restart MCP pod if needed:

    kubectl rollout restart deployment/cloud-storage -n koveria-mcp

Rate Limit Exceeded

Error:

HTTPException: 429 Rate limit exceeded for team team-123

Cause: Team has exceeded rate limits.

Solution:

  1. Check current usage in Admin GUI:

    • Go to Cost Tracker → MCP Costs
    • View per-team usage graphs
  2. Increase rate limits (if justified):

    • Go to Admin GUI → MCP Services → [Service Name]
    • Adjust Calls per minute and Calls per day
  3. Implement batching in agent code (see best practices)

File Not Found

Error:

FileNotFoundError: Path /workspace/teams/team-123/missing.txt does not exist

Cause: File path is incorrect or file doesn't exist.

Solution:

  1. Verify file path is correct

  2. Check file exists before reading:

    exists = await self.mcp.cloud_storage.file_exists(file_path)
    if exists:
        content = await self.mcp.cloud_storage.read_file(file_path)
  3. List directory to see available files:

    files = await self.mcp.cloud_storage.list_directory("/workspace/teams/team-123")

Next Steps