Call Types
Outbound Calls
Agent-initiated calls to customers:
curl -X POST http://localhost:8010/api/calls \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_TOKEN" \
-d '{
"agent_id": "agent-123",
"to_phone": "+1234567890",
"from_phone": "+0987654321",
"customer_data": {
"name": "John Doe",
"order_id": "ORD-123"
}
}'
Copy
{
"call_id": "call-456",
"status": "initiated",
"created_at": "2024-01-15T10:30:00Z",
"estimated_start_time": "2024-01-15T10:30:05Z"
}
Incoming Calls
Calls from customers to your system:
- Webhook Receiver receives the Twilio/Plivo webhook
- Router determines appropriate agent
- Call Stream connects caller to agent
Copy
from fastapi import Request, Response
from twilio.twiml.voice_response import VoiceResponse


@app.post("/webhook/twilio/incoming")
async def handle_incoming_call(request: Request):
    """Handle incoming Twilio calls.

    Reads the caller and called numbers from the form-encoded webhook body,
    picks an agent for the called number, and answers with TwiML that
    connects the call to that agent's audio stream.
    """
    # request.form() is a coroutine method: it must be called and awaited
    # before the form fields can be read.
    form = await request.form()
    from_phone = form.get("From")
    to_phone = form.get("To")

    # Determine agent based on called number
    agent_id = await route_to_agent(to_phone)

    # Return TwiML response
    response = VoiceResponse()
    response.say("Connecting you to an agent...")
    # connect() takes no child element as an argument; it returns the
    # <Connect> verb, and the <Stream> noun is added on that.
    connect = response.connect()
    connect.stream(url=f"wss://yourdomain.com/ws/stream/{agent_id}")
    return Response(content=str(response), media_type="application/xml")
WebSocket Streaming
Real-time audio streaming for AI processing:
// Client-side WebSocket
// The server route is /ws/stream/{agent_id}, so the agent ID must be part
// of the URL.
const agentId = 'agent-123';
const ws = new WebSocket(`ws://localhost:8010/ws/stream/${agentId}`);

ws.addEventListener('open', () => {
  console.log('Connected to stream');
});

ws.addEventListener('message', (event) => {
  const audioChunk = event.data;
  // Play audio chunk
  audioContext.play(audioChunk);
});

ws.addEventListener('close', () => {
  console.log('Stream closed');
});

// Send audio frames
navigator.mediaDevices.getUserMedia({ audio: true })
  .then(stream => {
    const processor = new AudioProcessor(stream);
    processor.onDataAvailable((chunk) => {
      // Audio can arrive before the socket has opened or after it has
      // closed; send() throws while the socket is still connecting.
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(chunk);
      }
    });
  })
  .catch((err) => {
    // The user denied microphone access or no input device is available.
    console.error('Microphone access failed:', err);
  });
Call Lifecycle
Copy
Call Creation
↓
[INITIATED] → Establish connection
↓
[CONNECTING] → Dial phone number
↓
[CONNECTED] → Agent processes audio
↓
[ACTIVE] → Ongoing conversation
↓
[ENDING] → User hangs up or timeout
↓
[COMPLETED] → Call logged & recorded
Making Calls
Simple Outbound Call
Copy
from Service.CallService import CallService
from models.models import OutboundCallRequest
call_service = CallService()
call_request = OutboundCallRequest(
agent_id="agent-123",
to_phone="+1234567890",
from_phone="+0987654321"
)
call = await call_service.initiate_call(call_request)
print(f"Call initiated: {call.id}")
Call with Context Data
Copy
call_request = OutboundCallRequest(
agent_id="agent-123",
to_phone="+1234567890",
from_phone="+0987654321",
customer_context={
"customer_id": "cust-123",
"customer_name": "John Doe",
"order_id": "ORD-456",
"reason": "Order status inquiry",
},
metadata={
"campaign_id": "camp-789",
"priority": "high",
}
)
call = await call_service.initiate_call(call_request)
Scheduled Calls
Copy
from scheduler.scheduler import ScheduledCallService
scheduler = ScheduledCallService()
# Schedule a single call
await scheduler.schedule_call(
agent_id="agent-123",
phone_number="+1234567890",
scheduled_time="2024-01-20 14:30:00",
timezone="America/New_York"
)
# Schedule recurring calls
await scheduler.schedule_recurring_call(
agent_id="agent-123",
phone_number="+1234567890",
cron_expression="0 9 * * MON", # Every Monday at 9 AM
timezone="America/New_York"
)
Receiving Calls
Webhook Configuration
Configure your Twilio/Plivo webhook to point to:
https://yourdomain.com/webhook/twilio/incoming
TwiML Response
Copy
from twilio.twiml.voice_response import VoiceResponse, Stream, Say
def generate_twiml(agent_id: str) -> str:
    """Build the TwiML that greets the caller and streams audio to an agent.

    Args:
        agent_id: ID of the agent; it becomes the last path segment of the
            WebSocket stream URL.

    Returns:
        The TwiML document serialized as an XML string.
    """
    response = VoiceResponse()
    # Play greeting
    response.say("Thank you for calling. Please wait while we connect you.")
    # Start stream to AI agent. VoiceResponse has no stream() verb of its
    # own: <Stream> has to be nested inside <Connect> (bidirectional) or
    # <Start> (receive-only). <Connect> is used so audio can be sent back
    # to the caller.
    connect = response.connect()
    connect.stream(url=f"wss://yourdomain.com/ws/stream/{agent_id}")
    return str(response)
Incoming Call Handler
Copy
@app.post("/webhook/twilio/incoming")
async def handle_incoming_call(request: Request):
    """Answer an inbound Twilio webhook: route, build TwiML, log, respond."""
    form = await request.form()
    caller = form.get("From")
    called = form.get("To")
    sid = form.get("CallSid")

    # Pick the agent for the dialed number.
    agent_id = await route_to_agent(called)

    # Build the TwiML document for that agent.
    twiml_body = generate_twiml(agent_id)

    # Record the inbound call before answering.
    call_record = {
        "direction": "inbound",
        "from": caller,
        "to": called,
        "twilio_sid": sid,
        "agent_id": agent_id,
    }
    await log_call(call_record)

    return Response(content=twiml_body, media_type="application/xml")
Audio Streaming
WebSocket Streaming Handler
Copy
@app.websocket("/ws/stream/{agent_id}")
async def websocket_endpoint(websocket: WebSocket, agent_id: str):
    """Relay call audio between a WebSocket client and an AI agent.

    Accepts the connection, assigns the stream a fresh call ID, then loops:
    receive an audio chunk, pass it to the agent, send the agent's reply
    back. The call is ended however the loop exits.

    Args:
        websocket: The incoming WebSocket connection.
        agent_id: ID of the agent that processes the audio, taken from the
            URL path.
    """
    await websocket.accept()
    # Start audio stream processing
    call_id = str(uuid.uuid4())
    try:
        while True:
            # Receive audio chunk from Twilio
            # NOTE(review): Twilio Media Streams deliver JSON text frames
            # with base64-encoded payloads, not raw binary frames. Confirm
            # what actually connects to this endpoint.
            data = await websocket.receive_bytes()
            # Process with AI agent
            response = await agent_service.process_audio(
                agent_id=agent_id,
                audio_chunk=data,
                call_id=call_id,
            )
            # Send response back
            await websocket.send_bytes(response)
    except WebSocketDisconnect:
        # Call ended normally; cleanup happens in the finally block.
        pass
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Stream error: %s", e)
        await websocket.close(code=1011)
    finally:
        # End the call on every exit path. Previously an unexpected error
        # skipped this, so the call was never ended.
        await call_service.end_call(call_id)
Audio Processing
Copy
from utils.audio_processing import AudioProcessor
processor = AudioProcessor(
sample_rate=16000,
channels=1,
format="pcm"
)
# Encode/decode audio
audio_bytes = processor.encode(raw_audio)
raw_audio = processor.decode(audio_bytes)
# Add silence detection
has_speech = processor.detect_speech(audio_chunk)
Call Monitoring
Real-time Call Status
Copy
# Get call status
curl -X GET http://localhost:8010/api/calls/{call_id} \
-H "Authorization: Bearer YOUR_TOKEN"
Copy
{
"call_id": "call-456",
"agent_id": "agent-123",
"status": "active",
"duration": 45,
"transcript": "Agent: Hello... Customer: Hi...",
"recording_url": "https://storage.../call-456.wav",
"metrics": {
"transcription_latency_ms": 150,
"response_latency_ms": 250,
"audio_quality": 0.92
}
}
Call Metrics
Track important metrics:
call_metrics = {
"call_id": call_id,
"duration": 180, # seconds
"speaker_changes": 5,
"ai_response_time": 250, # ms
"audio_quality": 0.95, # 0-1
"customer_sentiment": "positive",
"resolution_achieved": True,
}
await call_service.log_metrics(call_metrics)
Call Recording
Enable Recording
Copy
call_request = OutboundCallRequest(
agent_id="agent-123",
to_phone="+1234567890",
from_phone="+0987654321",
record_call=True,
record_storage="s3", # or "local"
)
call = await call_service.initiate_call(call_request)
Recording Retrieval
Copy
# Get recording URL
curl -X GET http://localhost:8010/api/calls/{call_id}/recording \
-H "Authorization: Bearer YOUR_TOKEN"
Copy
{
"call_id": "call-456",
"recording_url": "https://storage.s3.amazonaws.com/calls/call-456.wav",
"duration": 180,
"size_bytes": 5242880,
"format": "wav",
"created_at": "2024-01-15T10:35:00Z"
}
Call Transcription
Automatic Transcription
Copy
call_request = OutboundCallRequest(
agent_id="agent-123",
to_phone="+1234567890",
from_phone="+0987654321",
enable_transcription=True,
transcription_provider="deepgram", # or "google"
)
call = await call_service.initiate_call(call_request)
Get Transcript
Copy
curl -X GET http://localhost:8010/api/calls/{call_id}/transcript \
-H "Authorization: Bearer YOUR_TOKEN"
Copy
{
"call_id": "call-456",
"transcript": [
{
"speaker": "agent",
"text": "Thank you for calling, how can I help?",
"timestamp": "0:00:00",
"confidence": 0.99
},
{
"speaker": "customer",
"text": "Hi, I'd like to check my order status",
"timestamp": "0:00:03",
"confidence": 0.97
}
]
}
Call Routing
Intelligent Routing
Copy
from Service.PhoneService import PhoneService
phone_service = PhoneService()
# Route based on DID
async def route_to_agent(called_number: str) -> str:
    """Pick the agent ID that should answer a call to ``called_number``.

    The agent mapped to the number wins when it is available. If it is
    mapped but unavailable, the backup route is used. If no agent is mapped
    at all, the default route is used.
    """
    assigned = await phone_service.get_agent_for_number(called_number)

    # No agent for this DID: fall back to the default agent.
    if not assigned:
        return await route_to_default_agent()

    # Agent exists but cannot take the call: queue or route to backup.
    if not assigned.is_available():
        return await route_to_backup_agent()

    return assigned.id
Custom Routing Logic
Copy
async def custom_route(call_data: dict) -> str:
    """Choose an agent ID from caller status, time of day, then load.

    ``call_data`` must contain both ``"from_phone"`` and ``"to_phone"``;
    a missing key raises ``KeyError``.
    """
    caller, _called = call_data["from_phone"], call_data["to_phone"]

    # VIP callers always go to the premium agent.
    if is_vip_customer(caller):
        return "agent-vip"

    # Before 09:00 (server local time) calls go to night support.
    current_hour = datetime.now().hour
    if current_hour < 9:
        return "agent-night-support"

    # Otherwise, load balance across agents.
    return await get_least_busy_agent()
Ending Calls
Disconnect Call
Copy
curl -X POST http://localhost:8010/api/calls/{call_id}/disconnect \
-H "Authorization: Bearer YOUR_TOKEN"
Timeout Configuration
Copy
call_request = OutboundCallRequest(
agent_id="agent-123",
to_phone="+1234567890",
from_phone="+0987654321",
timeout_seconds=600, # 10 minutes
silence_timeout_seconds=30, # 30 seconds of silence
)
Call Cleanup
Copy
async def end_call(call_id: str):
    """Tear down a call: run each cleanup step for ``call_id`` in order.

    The steps are awaited one after another, so an exception in one step
    prevents the later ones from running.
    """
    # 1. Drop the WebSocket connections tied to the call.
    await websocket_manager.disconnect(call_id)

    # 2. Stop the recording.
    await recording_service.stop_recording(call_id)

    # 3. Hand the call to the transcription service.
    await transcription_service.process(call_id)

    # 4. Log the final metrics.
    await metrics_service.log_call_end(call_id)

    # 5. Mark the call record as completed.
    await call_service.mark_completed(call_id)
Error Handling
Common Errors
Copy
# Invalid phone number
try:
validate_phone_number("+1234567890")
except InvalidPhoneNumber:
return {"error": "Invalid phone number format"}
# Agent not found
try:
agent = await agent_service.get_agent(agent_id)
except AgentNotFound:
return {"error": "Agent does not exist"}
# Call connection failed
try:
await call_service.initiate_call(call_request)
except CallConnectionError as e:
logger.error(f"Failed to connect call: {e}")
# Retry or escalate
Retry Logic
Copy
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


@retry(
    # Retry connection failures only. Other errors (for example an invalid
    # phone number) would fail the same way on every attempt.
    retry=retry_if_exception_type(CallConnectionError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # After the last attempt, raise the original exception instead of
    # tenacity.RetryError so callers can still catch CallConnectionError.
    reraise=True,
)
async def initiate_call_with_retry(call_request):
    """Initiate a call, retrying connection failures up to three attempts.

    Waits between attempts grow exponentially, bounded to 2-10 seconds.

    Raises:
        CallConnectionError: If the final attempt also fails to connect.
    """
    return await call_service.initiate_call(call_request)
Performance Optimization
Connection Pooling
Copy
# Reuse Twilio client
from twilio.http.http_client import TwilioHttpClient

# Create the client once and share it across requests. http_client expects
# a Twilio HttpClient rather than a raw requests.Session; TwilioHttpClient
# keeps its own pooled session when pool_connections=True.
twilio_client = Client(
    account_sid,
    auth_token,
    http_client=TwilioHttpClient(pool_connections=True),
)
Streaming Optimization
- Use appropriate sample rate (8000 Hz for phone, 16000 Hz for clarity)
- Implement audio buffering
- Compress audio frames
- Minimize latency with CDN
Parallel Call Processing
Copy
import asyncio
async def process_multiple_calls(call_requests: list):
    """Initiate every call in ``call_requests`` concurrently.

    Returns the results in the same order as the requests. The first
    failure propagates to the caller.
    """
    pending = (call_service.initiate_call(request) for request in call_requests)
    return await asyncio.gather(*pending)