
Service Architecture

┌─────────────────────────────────────────────┐
│         FastAPI Application                 │
│         (app.py)                            │
└─────────────────┬───────────────────────────┘
                  │
    ┌─────────────┼─────────────┐
    │             │             │
    ▼             ▼             ▼
┌─────────┐ ┌─────────┐ ┌─────────────┐
│ Agent   │ │  Call   │ │Organization │
│Service  │ │Service  │ │  Service    │
└─────────┘ └─────────┘ └─────────────┘
    │             │             │
    └─────────────┼─────────────┘
                  │
    ┌─────────────┼─────────────┐
    │             │             │
    ▼             ▼             ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Batch    │ │Scheduler │ │Knowledge │
│  Call    │ │ Service  │ │  Base    │
│ Service  │ │          │ │ Service  │
└──────────┘ └──────────┘ └──────────┘

Core Services

Agent Service

File: Service/AgentService.py
Purpose: Create, configure, and manage AI agents.
Key Methods:
  • create_agent() - Create new agent
  • get_agent() - Retrieve agent
  • update_agent() - Update agent configuration
  • delete_agent() - Remove agent
  • list_agents() - List all agents
Example:
from Service.AgentService import AgentService

service = AgentService()
agent = await service.create_agent({
    "name": "Support Agent",
    "ai_provider": "gemini",
    "system_prompt": "You are helpful..."
})

Call Service

File: Service/CallService.py
Purpose: Handle voice call lifecycle and routing.
Key Methods:
  • initiate_call() - Start outbound call
  • handle_incoming_call() - Receive incoming call
  • process_call_stream() - Handle WebSocket audio
  • end_call() - Disconnect call
  • get_call_status() - Check call state
Example:
from Service.CallService import CallService

service = CallService()
call = await service.initiate_call({
    "agent_id": "agent-123",
    "to_phone": "+1234567890"
})

Organization Service

File: Service/OrganizationService.py
Purpose: Manage organizations and multi-tenancy.
Key Methods:
  • create_organization() - Create new org
  • get_organization() - Retrieve org
  • add_member() - Add user to org
  • get_organization_agents() - List org agents
  • update_organization() - Update org settings
Example:
from Service.OrganizationService import OrganizationService

service = OrganizationService()
org = await service.create_organization({
    "name": "My Company",
    "industry": "retail"
})

Batch Call Service

File: Service/BatchCallService.py
Purpose: Process multiple calls efficiently.
Features:
  • CSV/JSON upload
  • Phone number validation
  • Parallel call execution
  • Progress tracking
  • Result reporting
Example:
from Service.BatchCallService import BatchCallService

service = BatchCallService()

# Upload batch
batch = await service.create_batch({
    "agent_id": "agent-123",
    "file_url": "s3://bucket/calls.csv",
    "column_mapping": {
        "phone": "phone_number",
        "name": "customer_name"
    }
})

# Monitor progress
status = await service.get_batch_status(batch.id)
print(f"Progress: {status.completed}/{status.total}")
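
Parallel call execution with progress tracking could be sketched roughly as follows. This is a minimal illustration, not the BatchCallService implementation: `place_call`, `run_batch`, `BatchProgress`, and `max_parallel` are hypothetical names, and a semaphore is only one common way to cap concurrency.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class BatchProgress:
    total: int
    completed: int = 0
    failed: int = 0


async def place_call(phone: str) -> str:
    """Hypothetical stand-in for a real call; fails on empty numbers."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    if not phone:
        raise ValueError("missing phone number")
    return f"called {phone}"


async def run_batch(phones: list[str], max_parallel: int = 3) -> BatchProgress:
    progress = BatchProgress(total=len(phones))
    limit = asyncio.Semaphore(max_parallel)  # cap the number of concurrent calls

    async def worker(phone: str) -> None:
        async with limit:
            try:
                await place_call(phone)
                progress.completed += 1
            except ValueError:
                progress.failed += 1

    await asyncio.gather(*(worker(p) for p in phones))
    return progress


progress = asyncio.run(run_batch(["+1234567890", "", "+1111111111"]))
print(f"Progress: {progress.completed}/{progress.total}, failed: {progress.failed}")
```

Catching failures inside each worker keeps one bad row from cancelling the rest of the batch, which is usually what a bulk upload wants.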

Scheduler Service

File: scheduler/scheduler.py
Purpose: Schedule calls for future execution.
Features:
  • One-time scheduled calls
  • Recurring calls with cron
  • Timezone support
  • Automatic execution
  • Retry logic
Example:
from scheduler.scheduler import ScheduledCallService

service = ScheduledCallService()

# Schedule single call
await service.schedule_call({
    "agent_id": "agent-123",
    "phone_number": "+1234567890",
    "scheduled_time": "2024-02-01 14:00:00"
})

# Schedule recurring
await service.schedule_recurring({
    "agent_id": "agent-123",
    "phone_number": "+1234567890",
    "cron_expression": "0 9 * * MON"
})
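
The cron expression "0 9 * * MON" reads as minute 0, hour 9, any day of month, any month, on Mondays. As a rough sketch of what that means for the next run time, the hypothetical helper `next_weekly_run` below computes it with the standard library only. It is not how the scheduler itself evaluates cron expressions, and it uses UTC for simplicity.

```python
from datetime import datetime, timedelta, timezone


def next_weekly_run(now: datetime, weekday: int, hour: int, minute: int) -> datetime:
    """Next occurrence of weekday/hour/minute strictly after `now` (Monday is 0)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    if candidate <= now:
        # Today is the right weekday but the time has already passed.
        candidate += timedelta(days=7)
    return candidate


# "0 9 * * MON" -> minute 0, hour 9, every Monday
now = datetime(2024, 2, 1, 14, 0, tzinfo=timezone.utc)  # a Thursday
next_run = next_weekly_run(now, weekday=0, hour=9, minute=0)
print(next_run.isoformat())
```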

Knowledge Base Service

File: Service/KnowldegeBaseService.py
Purpose: Store and retrieve knowledge for agents.
Features:
  • Document storage
  • Semantic search
  • Embedding support
  • Version control
Example:
from Service.KnowldegeBaseService import KnowledgeBaseService

service = KnowledgeBaseService()

# Add document
await service.add_document({
    "kb_id": "kb-123",
    "title": "Product FAQ",
    "content": "Q: How do I...? A: ..."
})

# Search knowledge
results = await service.search(
    "kb-123",
    query="product warranty",
    top_k=5
)
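
To give a feel for what a semantic search with `top_k` does, here is a minimal sketch that ranks documents by cosine similarity between embedding vectors. The function names and the toy three-dimensional embeddings are made up for illustration; the service's actual embedding model and storage are not described here.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, documents, k=5):
    """Return the titles of the k documents most similar to the query vector."""
    ranked = sorted(
        documents,
        key=lambda doc: cosine_similarity(query_vec, doc["embedding"]),
        reverse=True,
    )
    return [doc["title"] for doc in ranked[:k]]


# Toy embeddings, invented for this example.
docs = [
    {"title": "Product FAQ", "embedding": [0.9, 0.1, 0.0]},
    {"title": "Shipping Policy", "embedding": [0.1, 0.9, 0.0]},
    {"title": "Warranty Terms", "embedding": [0.8, 0.2, 0.1]},
]
results = top_k([1.0, 0.0, 0.0], docs, k=2)
print(results)
```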

Phone Service

File: Service/PhoneService.py
Purpose: Manage phone numbers and routing.
Features:
  • DID number management
  • Phone number validation
  • Call routing rules
  • SMS capabilities
Example:
from Service.PhoneService import PhoneService

service = PhoneService()

# Get agent for DID
agent = await service.get_agent_for_number("+1234567890")

# Route call
agent_id = await service.route_call({
    "from_number": "+1111111111",
    "to_number": "+1234567890"
})
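
Phone number validation might look something like the sketch below. It assumes numbers are expected in E.164 form (a leading "+" followed by digits), which matches the examples on this page but is not confirmed as the rule PhoneService enforces; `is_valid_e164` is a hypothetical helper.

```python
import re

# E.164: a leading "+", then 2 to 15 digits, the first of which is non-zero.
E164_PATTERN = re.compile(r"\+[1-9][0-9]{1,14}")


def is_valid_e164(number: str) -> bool:
    """Check the shape of the number only; it does not prove the number exists."""
    return E164_PATTERN.fullmatch(number) is not None


print(is_valid_e164("+1234567890"))
print(is_valid_e164("1234567890"))
```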

Authentication Service

File: utils/auth.py
Purpose: Handle API authentication and authorization.
Features:
  • Supabase JWT verification
  • Token validation
  • User context extraction
  • Role-based access
Example:
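from fastapi import Depends, FastAPI

from utils.auth import verify_supabase_token, AuthenticatedUser

app = FastAPI()  # in the real application, use the instance defined in app.py

@app.get("/api/protected")
async def protected(user: AuthenticatedUser = Depends(verify_supabase_token)):
    # FastAPI calls verify_supabase_token first and injects its result as `user`
    return {"user_id": user.id, "org_id": user.org_id}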
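
For readers curious what JWT verification involves, the sketch below checks the signature and expiry of an HS256-signed token using only the standard library. It assumes a shared-secret HS256 token; the actual logic in utils/auth.py is not shown on this page and may differ. `sign_hs256` and `verify_hs256` are hypothetical helpers, and production code would normally rely on a maintained JWT library instead.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url_decode(segment: str) -> bytes:
    # JWT segments drop base64 padding, so restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def sign_hs256(claims: dict, secret: str) -> str:
    """Build a token for the demo below; a real token comes from the auth provider."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url_encode(json.dumps(claims).encode())
    mac = hmac.new(secret.encode(), f"{header}.{payload}".encode(), hashlib.sha256)
    return f"{header}.{payload}.{b64url_encode(mac.digest())}"


def verify_hs256(token: str, secret: str) -> dict:
    """Return the claims if the signature and expiry check out, else raise ValueError."""
    header_b64, payload_b64, signature_b64 = token.split(".")
    if json.loads(b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    mac = hmac.new(secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256)
    if not hmac.compare_digest(mac.digest(), b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    claims = json.loads(b64url_decode(payload_b64))
    if claims.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return claims


token = sign_hs256({"sub": "user-1", "exp": int(time.time()) + 60}, "demo-secret")
claims = verify_hs256(token, "demo-secret")
print(claims["sub"])
```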

MCP Service

File: Service/McpService.py
Purpose: Handle external integrations via MCP.
Integrated Services:
  • Telegram (messaging)
  • Gmail (email)
  • WhatsApp (messaging)
  • Shopify (e-commerce)
Example:
from Service.McpService import McpService

service = McpService()

# Send Telegram message
await service.execute_tool(
    "telegram",
    "send_message",
    {
        "chat_id": "123456",
        "text": "Hello!"
    }
)

# Send email
await service.execute_tool(
    "gmail",
    "send_email",
    {
        "to": "user@example.com",
        "subject": "Hello",
        "body": "Hi there!"
    }
)

Data Access Service

File: Service/Supabase.py
Purpose: Database access and persistence.
Features:
  • Connection management
  • Query execution
  • Transaction support
  • Authentication
Example:
from Service.Supabase import get_supabase_client

supabase = get_supabase_client()

# Query data
response = supabase.table("calls").select("*").execute()

# Insert data
supabase.table("calls").insert({
    "agent_id": "agent-123",
    "from_phone": "+1111111111",
    "to_phone": "+1234567890"
}).execute()

# Update data
supabase.table("calls").update({
    "status": "completed"
}).eq("id", "call-456").execute()

Service Dependencies

AgentService
  ├── Supabase (database)
  ├── AI Bridges (Gemini, OpenAI, Hume)
  └── McpService (integrations)

CallService
  ├── AgentService
  ├── Supabase
  ├── PhoneService
  └── Twilio/Plivo

BatchCallService
  ├── CallService
  ├── AgentService
  └── Supabase

SchedulerService
  ├── CallService
  └── APScheduler

OrganizationService
  └── Supabase

PhoneService
  └── Supabase
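
One way to reason about this tree is as a directed graph: a service can only start once everything it depends on is available. The sketch below derives one valid startup order with Python's `graphlib`. The dictionary is transcribed from the listing above, with external libraries and providers (AI Bridges, Twilio/Plivo, APScheduler) left out, and the idea of a "startup order" is an illustration, not something the application is known to compute.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of services it depends on.
dependencies = {
    "AgentService": {"Supabase", "McpService"},
    "CallService": {"AgentService", "Supabase", "PhoneService"},
    "BatchCallService": {"CallService", "AgentService", "Supabase"},
    "SchedulerService": {"CallService"},
    "OrganizationService": {"Supabase"},
    "PhoneService": {"Supabase"},
}

# static_order() yields dependencies before their dependents
# and raises graphlib.CycleError if the graph contains a cycle.
startup_order = list(TopologicalSorter(dependencies).static_order())
print(startup_order)
```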

Service Configuration

Each service can be configured via environment variables:
# Agent Service
GEMINI_API_KEY=key
OPENAI_API_KEY=key
HUME_CONFIG_ID=id

# Call Service
TWILIO_ACCOUNT_ID=sid
TWILIO_AUTH_TOKEN=token

# Database
SUPABASE_URL=url
SUPABASE_KEY=key

# MCP Services
TELE_MCP_SERVER_URL=http://localhost:8100
GMAIL_MCP_SERVER_URL=http://localhost:8101
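
Reading these variables at startup and failing fast when one is missing tends to surface misconfiguration early. The sketch below shows the idea for the database settings; `DatabaseConfig` and `load_database_config` are hypothetical names, not part of the codebase.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class DatabaseConfig:
    url: str
    key: str


def load_database_config(env: Mapping[str, str] = os.environ) -> DatabaseConfig:
    """Read the database settings, raising if any required variable is unset."""
    missing = [name for name in ("SUPABASE_URL", "SUPABASE_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return DatabaseConfig(url=env["SUPABASE_URL"], key=env["SUPABASE_KEY"])


# Passing a plain dict stands in for the real environment in this example.
config = load_database_config({"SUPABASE_URL": "url", "SUPABASE_KEY": "key"})
print(config.url)
```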

Error Handling

All services implement consistent error handling:
try:
    agent = await service.get_agent(agent_id)
except AgentNotFound:
    return {"error": "Agent not found", "status": 404}
except DatabaseError as e:
    logger.error(f"Database error: {e}")
    return {"error": "Internal server error", "status": 500}
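
The exception classes used above are not defined on this page. As a minimal sketch of how such a hierarchy could keep responses consistent, the example below attaches a status code and a public message to each exception type. The base class `ServiceError`, its attributes, and `to_error_response` are assumptions made for illustration; only the names `AgentNotFound` and `DatabaseError` come from the example above.

```python
import logging

logger = logging.getLogger("services")


class ServiceError(Exception):
    """Hypothetical base class carrying an HTTP-style status code."""

    status = 500
    public_message = "Internal server error"


class AgentNotFound(ServiceError):
    status = 404
    public_message = "Agent not found"


class DatabaseError(ServiceError):
    pass  # inherits the generic 500 response


def to_error_response(exc: ServiceError) -> dict:
    """Build the error payload; log details only for server-side failures."""
    if exc.status >= 500:
        logger.error("Service failure: %s", exc)
    return {"error": exc.public_message, "status": exc.status}


response = to_error_response(AgentNotFound("agent-123"))
print(response)
```

Keeping the public message on the class means internal details, such as a database error string, never leak into the response body.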

Logging

Services use structured logging:
from utils.logger import logger

logger.info(f"Created agent: {agent_id}")
logger.warning(f"Agent not responding: {agent_id}")
logger.error(f"Call failed: {call_id} - {error}")
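
The f-string calls above produce plain text messages. If machine-readable output is wanted, one option is to pass identifiers through `extra=` and render each record as JSON. The sketch below uses only the standard `logging` module; `JsonFormatter` and the field names are hypothetical and do not describe what utils/logger.py does.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record.
        for field in ("agent_id", "call_id"):
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


stream = io.StringIO()  # in-memory sink so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

log = logging.getLogger("services.example")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False  # keep the demo output out of the root logger

log.info("Created agent", extra={"agent_id": "agent-123"})
entry = json.loads(stream.getvalue())
print(entry)
```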

Testing Services

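# Mock a service
import asyncio
from unittest.mock import AsyncMock

mock_service = AsyncMock()
mock_service.get_agent.return_value = {
    "id": "agent-123",
    "name": "Test Agent"
}

# Use in test (await is only valid inside a coroutine)
async def test_get_agent():
    agent = await mock_service.get_agent("agent-123")
    assert agent["id"] == "agent-123"
    mock_service.get_agent.assert_awaited_once_with("agent-123")

asyncio.run(test_get_agent())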

Performance Considerations

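  • Connection Pooling: Reuse database connections instead of opening a new one per request
  • Caching: Cache agent configurations to avoid repeated database reads
  • Async Operations: Use async/await for I/O so slow calls do not block other requests
  • Rate Limiting: Limit request rates to prevent abuse
  • Monitoring: Track service health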
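
As an illustration of the caching point, the sketch below keeps agent configurations in memory and expires them after a fixed time-to-live. `TTLCache` is a hypothetical helper written for this example; the page does not say which caching mechanism, if any, the services use.

```python
import time


class TTLCache:
    """Tiny in-memory cache whose entries expire after `ttl` seconds."""

    def __init__(self, ttl: float, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._entries = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None or entry[0] <= self.clock():
            self._entries.pop(key, None)  # drop expired entries lazily
            return None
        return entry[1]

    def set(self, key, value):
        self._entries[key] = (self.clock() + self.ttl, value)


# A fake clock makes expiry easy to demonstrate without sleeping.
now = [0.0]
cache = TTLCache(ttl=60, clock=lambda: now[0])
cache.set("agent-123", {"name": "Support Agent"})
fresh = cache.get("agent-123")  # still within the TTL
now[0] = 61.0
stale = cache.get("agent-123")  # past the TTL, so the entry is gone
print(fresh, stale)
```

A short TTL bounds how long a stale configuration can be served after an update, at the cost of more frequent database reads.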

Next Steps