Core Functions
request_notification
Creates and sends a notification to merchant admins.
async def request_notification(
notification_type: NotificationType,
conversation_id: int,
context: dict,
session: Session,
) -> Notification
Purpose: Request merchant admin input for customer questions or support tickets.
Parameters:
- notification_type: Type of notification (user_question_request | user_support_request)
- conversation_id: ID of the conversation where notification was triggered
- context: Dictionary containing required fields for the notification type
- session: Async SQLAlchemy session for database operations
Returns: Created Notification object
Process Flow:
- Validate Input
  - Check notification type exists in NOTIFICATION_CONFIG
  - Verify all requested_fields present in context
  - Load conversation and conversation_user
- Type-Specific Processing
  For user_question_request:
  - Create Document (category: UserQuestion) with question as title
  - Store document_id in context
  - Validate context against UserQuestionContext schema
  For user_support_request:
  - Fetch merchant's support classification documents
  - AI-classify support request using GPT-4.1:
    - Determine appropriate merchant user type (sales, support, returns, etc.)
    - Extract readable support_request_type for display
  - Set target merchant user types for routing
  - Fallback to all admins if no classification or docs
- Create Notification Record
  - Store in database with status=requested
  - Include answer_model_json_schema for later validation
  - Add allow_multiple_answers flag from config
- Build Assistant Instruction Message
  - Extract notification content from requested_fields
  - Generate prefixed notification ID via add_prefix
  - Create instruction text with:
    - Notification context (question/reason)
    - Prefixed notification ID
    - Usage instructions for AnswerNotificationTool
    - JSON schema with required fields
  - Store notification IDs in message other_data
- Send Campaign to Admins
  - Add notification IDs to context for message tagging
  - Call send_notification_messages
  - Campaign sent to targeted merchant users
  - Assistant instruction inserted per recipient conversation
- Persist Recipient List
  - Store merchant_user_conversation_user_ids in notification context
  - Used later to send closure notifications to other admins
  - Silent failure (logged warning) if persist fails
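The input-validation step can be sketched as follows. This is a minimal illustration rather than the service's own code: the shape of NOTIFICATION_CONFIG shown here (a dict keyed by type name with a requested_fields list) and the helper name validate_request_context are assumptions made for the example, and ValueError stands in for whatever exception the service actually raises.

```python
from typing import Any

# Hypothetical config shape; the real NOTIFICATION_CONFIG may be structured differently.
NOTIFICATION_CONFIG: dict[str, dict[str, Any]] = {
    "user_question_request": {"requested_fields": ["question"]},
    "user_support_request": {"requested_fields": ["support_request_reason"]},
}


def validate_request_context(notification_type: str, context: dict[str, Any]) -> None:
    """Raise if the type is unknown or any requested field is absent from context."""
    config = NOTIFICATION_CONFIG.get(notification_type)
    if config is None:
        raise ValueError(f"Unknown notification type: {notification_type}")

    missing = [name for name in config["requested_fields"] if name not in context]
    if missing:
        raise ValueError(f"Missing required field: {', '.join(missing)}")


# Passes silently: the only requested field is present.
validate_request_context("user_question_request", {"question": "What are your return hours?"})
```

Validating before any database work means an invalid request fails without creating a Document or Notification row.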
Example Usage:
from app.api.notification.service import request_notification
from app.api.notification.models import NotificationType
# Request question notification
notification = await request_notification(
notification_type=NotificationType.user_question_request,
conversation_id=123,
context={"question": "What are your return hours?"},
session=session
)
# Request support notification
notification = await request_notification(
notification_type=NotificationType.user_support_request,
conversation_id=456,
context={"support_request_reason": "Item arrived damaged"},
session=session
)
Error Cases:
- Raises exception if notification type invalid
- Raises exception if required fields missing
- Raises exception if conversation not found
answer_notification
Process merchant admin's answer and send response to customer.
async def answer_notification(
session: Session,
context: dict,
notification_id: int = None,
notification_type: NotificationType = None,
conversation_id: int = None,
)
Purpose: Process admin answer, localize response, and deliver to customer via campaign or Voyager.
Parameters:
- session: Async SQLAlchemy session for database operations
- context: Answer dictionary with all answered_fields populated
- notification_id: ID of notification to answer (primary method)
- notification_type: Type if creating new answered notification
- conversation_id: Conversation ID if creating new notification
Returns: None (updates notification in-place)
Process Flow:
- Load Notification
  - Fetch by notification_id OR create new answered notification
  - Merge answer context into notification context
  - Load associated conversation, merchant, and user
- Validate Answer
  - Convert notification_type string to Enum for downstream use
  - Verify all answered_fields present in context
  - Check single-answer enforcement (allow_multiple_answers=False)
  - Raise exception if already answered
- Type-Specific Processing
  For user_question_request:
  - Fetch Document by document_id from notification context
  - Update Document content with answer
  - Add question to answer context for campaign variable
- Localize Response
  - Extract primary admin response from first answered_field
  - Generate AI localization using GPT-4.1 Mini:
    - Target language from conversation.language (default: "en")
    - Create continuation_fragment for template variables
    - Create full_sentence for direct messaging
  - Different prompts for questions vs. support requests
- Determine 24-Hour Policy
  - Query last inbound user message timestamp
  - Calculate: use_campaign = (now - last_user_message_time) > 24 hours
  - Default to True if no user messages found
- Send Response to Customer
  If use_campaign=True (>24 hours):
  - Populate context with localized text:
    - Support: customer_response_text = continuation_fragment
    - Question: answer = full_sentence
  - Call send_notification_messages with status=answered
  - Sends customer_support_response or merchant_user_question_answer campaign
  If use_campaign=False (≤24 hours):
  - Build notification context summary from requested_fields
  - Build admin answer summary from answered_fields
  - Create assistant message with full context for Voyager
  - Fetch merchant's billing plan
  - Build Voyager instance
  - Generate response via voyager.navigate()
  - Send messages directly via Twilio (WhatsApp) or Instagram
  - No campaign template needed
- Mark as Answered
  - Update notification status to answered
  - Update updated_at timestamp
  - Use DBClient.db_update_rows with external session
- Notify Other Admins (Single-Answer Only)
  - Fetch merchant_user_conversation_user_ids from notification context
  - Build closure message: "Notification NOTIF_XXX has been answered and is now closed."
  - Find conversations for each recipient
  - Insert assistant messages to prevent duplicate answers
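The closure step for single-answer notifications can be sketched as below. The add_prefix shown here is a hypothetical stand-in for the service helper of the same name (the "NOTIF_" prefix is inferred from the closure message format), and skipping the admin who answered is an assumption, not something the flow above states.

```python
from typing import Any, Optional


def add_prefix(notification_id: int, prefix: str = "NOTIF_") -> str:
    """Hypothetical stand-in for the service's add_prefix helper."""
    return f"{prefix}{notification_id}"


def build_closure_message(notification_id: int) -> str:
    """Text inserted as an assistant message into each other admin's conversation."""
    return f"Notification {add_prefix(notification_id)} has been answered and is now closed."


def closure_recipients(
    notification_context: dict[str, Any],
    answering_conversation_user_id: Optional[int] = None,
) -> list[int]:
    """Read the persisted recipient list, skipping the answering admin (assumed behavior)."""
    ids = notification_context.get("merchant_user_conversation_user_ids") or []
    return [i for i in ids if i != answering_conversation_user_id]


print(build_closure_message(123))
```

Because the recipient list comes from the notification context, a notification whose recipient list failed to persist simply yields no closure messages.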
Example Usage:
from app.api.notification.service import answer_notification
# Answer question
await answer_notification(
session=db_session,
notification_id=123,
context={"answer": "We are open Mon-Fri 9am-5pm EST"}
)
# Answer support request
await answer_notification(
session=db_session,
notification_id=456,
context={
"agent_response_to_customer": "Please send your order number",
"resolution_notes": "Customer eligible for refund"
}
)
Error Cases:
- Raises if notification not found
- Raises if answered_fields missing
- Raises if already answered (single-answer notifications)
- Raises if document not found (questions)
- Raises if conversation/merchant/user not found
Edge Cases Handled:
- No billing plan: Voyager still works (can be None)
- No last user message: Defaults to campaign send
- Failed closure notifications: Logged warning, doesn't fail main flow
send_notification_messages
Send notification campaigns to appropriate recipients (merchant admins or customers).
async def send_notification_messages(
merchant_id: int,
notification_type: NotificationType,
notification_status: NotificationStatus,
conversation_id: int,
context: dict = {},
session: Session = None,
target_merchant_users: Optional[List[str]] = None,
assistant_message_override: Message = None,
) -> List[int]
Purpose: Route and send notification campaigns based on status and type.
Parameters:
- merchant_id: ID of merchant
- notification_type: Type of notification (Enum)
- notification_status: Current status (requested | answered)
- conversation_id: Associated conversation
- context: Campaign variable context
- session: Database session
- target_merchant_users: Optional list of merchant user types to target
- assistant_message_override: Optional assistant message to insert per recipient
Returns: List of conversation_user_ids that received the campaign
Process Flow:
- Determine Recipients
  - Look up <status>_to in config: merchant_user or conversation_user
  - Look up <status>_campaign in config
- Route Based on Target
  If target=conversation_user:
  - Use conversation's conversation_user_id directly
  - Send campaign to customer
  If target=merchant_user:
  - If target_merchant_users specified:
    - Fetch merchant users of each specified type
    - Collect phone numbers
    - Fallback to 'admin' type if no users found
  - If no target_merchant_users:
    - Fetch all merchant users
    - Collect phone numbers
- Create ConversationUser Records
  - Check which merchant phone numbers have ConversationUser records
  - Create missing ones automatically
  - Build mapping of phone → conversation_user_id
- Send Campaign
  - Call send_service_campaign_messages
  - Pass conversation_user_ids, context, and assistant override
  - Campaign creates leads, sends messages, inserts assistant instructions
- Return Recipients
  - Fetch conversation_user_ids from created CampaignLead records
  - Returned for storage in notification context
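The merchant_user routing branch can be sketched with an in-memory lookup. This is an illustration only: users_by_type stands in for the database queries, the function name is hypothetical, and the fallback is interpreted as applying when none of the requested types yields a user.

```python
from typing import Optional


def resolve_target_phones(
    users_by_type: dict[str, list[str]],
    target_merchant_users: Optional[list[str]] = None,
) -> list[str]:
    """Collect recipient phone numbers, falling back to the 'admin' type when needed."""
    if target_merchant_users:
        phones = [
            phone
            for user_type in target_merchant_users
            for phone in users_by_type.get(user_type, [])
        ]
        if not phones:
            # None of the requested types has users: fall back to admins.
            phones = list(users_by_type.get("admin", []))
    else:
        # No targeting requested: every merchant user receives the campaign.
        phones = [phone for group in users_by_type.values() for phone in group]

    # De-duplicate while preserving order (a user may hold several types).
    return list(dict.fromkeys(phones))


users = {"admin": ["+15550001"], "support": ["+15550002"], "returns": []}
print(resolve_target_phones(users, ["returns"]))
```

An empty result corresponds to the "no merchant users found" edge case, where the real function logs a warning and returns an empty list.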
Example Usage:
# Send request to merchant admins
recipient_ids = await send_notification_messages(
merchant_id=140,
notification_type=NotificationType.user_support_request,
notification_status=NotificationStatus.requested,
conversation_id=789,
context={...},
session=session,
target_merchant_users=["support", "returns"],
assistant_message_override=instruction_message
)
# Send answer to customer (>24h)
await send_notification_messages(
merchant_id=140,
notification_type=NotificationType.user_question_request,
notification_status=NotificationStatus.answered,
conversation_id=123,
context={"answer": "Localized answer text"},
session=session
)
Edge Cases:
- No campaign configured: Logs warning, returns early
- No merchant users found: Logs warning, returns empty
- Failed to create ConversationUser: Logs error, continues with existing users
- Target type has no users: Falls back to 'admin' type
Helper Functions
request_notification_job
Enqueue notification request as background job.
async def request_notification_job(
notification_type: NotificationType,
conversation_id: int,
context: dict,
)
Purpose: Queue notification request for async processing.
Parameters:
- notification_type: Type of notification
- conversation_id: Conversation ID
- context: Notification context
Returns: None (enqueues job)
Implementation:
- Serializes context as JSON string
- Enqueues request_notification_job to SAQ queue
- 120 second timeout
- Job handled by saq/functions.py
Usage: Called by Voyager tools to avoid blocking customer responses.
answer_notification_job
Enqueue notification answer as background job.
async def answer_notification_job(
notification_id: int = None,
notification_type: NotificationType = None,
conversation_id: int = None,
context: dict = {},
)
Purpose: Queue notification answer for async processing.
Parameters:
- notification_id: ID of notification to answer
- notification_type: Type (if creating new notification)
- conversation_id: Conversation ID (if creating new)
- context: Answer context
Returns: None (enqueues job)
Implementation:
- Serializes context as JSON string
- Converts enum to string for queue serialization
- Enqueues answer_notification_job to SAQ queue
- 120 second timeout
- Job handled by saq/functions.py
Usage: Called by AnswerNotificationTool to process admin responses asynchronously.
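The serialization steps listed under Implementation can be sketched as a pair of helpers. The enum below is a stand-in for the real NotificationType, and both function names are hypothetical; the sketch covers only how the payload is made queue-safe and decoded again, not the SAQ enqueue call itself.

```python
import json
from enum import Enum
from typing import Any, Optional


class NotificationType(str, Enum):
    """Stand-in for the real enum in app.api.notification.models."""
    user_question_request = "user_question_request"
    user_support_request = "user_support_request"


def build_answer_job_kwargs(
    context: dict[str, Any],
    notification_id: Optional[int] = None,
    notification_type: Optional[NotificationType] = None,
    conversation_id: Optional[int] = None,
) -> dict[str, Any]:
    """Flatten arguments into JSON-friendly values before enqueueing."""
    return {
        "notification_id": notification_id,
        # Enums are sent as their plain string value.
        "notification_type": notification_type.value if notification_type else None,
        "conversation_id": conversation_id,
        # Context travels as a JSON string.
        "context": json.dumps(context),
    }


def decode_answer_job_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Worker-side inverse: restore the dict and the enum."""
    decoded = dict(kwargs)
    decoded["context"] = json.loads(kwargs["context"])
    if kwargs["notification_type"] is not None:
        decoded["notification_type"] = NotificationType(kwargs["notification_type"])
    return decoded
```

Keeping every value a string, number, or None avoids depending on how the queue backend serializes custom Python objects.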
get_notifications
Fetch notifications for a merchant with optional filtering.
async def get_notifications(
merchant_id: int,
status: Optional[NotificationStatus] = None,
notification_types: Optional[List[NotificationType]] = [],
) -> List[NotificationSchema]
Purpose: Query notifications for display in merchant dashboard.
Parameters:
- merchant_id: Merchant to fetch notifications for
- status: Optional status filter (requested | answered)
- notification_types: Optional list of types to include
Returns: List of NotificationSchema objects with populated requested/answered fields
Process:
1. Build query joining Notification and Conversation
2. Filter by merchant_id via conversation relationship
3. Apply optional status filter
4. Apply optional type filter
5. Execute query via DBClient.db_run_query
6. Map to NotificationSchema and populate config-derived fields
Example:
# Get all unanswered notifications
notifications = await get_notifications(
merchant_id=140,
status=NotificationStatus.requested
)
# Get support tickets only
support_tickets = await get_notifications(
merchant_id=140,
notification_types=[NotificationType.user_support_request]
)
can_answer_notification
Check if a notification can be answered.
async def can_answer_notification(
notification_id: int,
) -> bool
Purpose: Validate whether a notification is answerable before attempting to answer.
Parameters:
- notification_id: ID of notification to check
Returns: True if answerable, False otherwise
Logic:
1. Fetch notification from database
2. Return False if not found
3. Check status:
- requested: Always answerable → True
- answered: Check allow_multiple_answers flag → True if flag set, else False
- Other statuses: False
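The decision table above reduces to a small pure function. This is a sketch under assumptions: statuses are represented as plain strings, None models "notification not found", and the name is_answerable is hypothetical.

```python
from typing import Optional


def is_answerable(status: Optional[str], allow_multiple_answers: bool) -> bool:
    """Mirror the documented logic without touching the database."""
    if status is None:
        # Notification not found.
        return False
    if status == "requested":
        return True
    if status == "answered":
        # Only re-answerable when the notification type allows it.
        return allow_multiple_answers
    return False


print(is_answerable("answered", allow_multiple_answers=False))  # False
```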
Example:
if await can_answer_notification(123):
    # answer_notification requires the database session as well
    await answer_notification(session=session, notification_id=123, context={...})
else:
    print("Notification cannot be answered")
Usage: Can be called before attempting answer to provide better UX in dashboard.
get_notifications_by_document_id
Fetch notifications associated with a specific document.
def get_notifications_by_document_id(
session: Session,
document_id: int
) -> List[Notification]
Purpose: Find all question notifications linked to a document.
Parameters:
- session: SQLAlchemy session (synchronous)
- document_id: ID of document to find notifications for
Returns: List of Notification objects
Implementation:
- Uses JSON operator to query context['document_id']
- Casts to Integer for comparison
- Synchronous session (not async)
Usage: Link documents to their originating notifications for tracking.
Note: Only works for user_question_request notifications (only type with document_id).
Support Functions
AI Classification
For user_support_request notifications, support classification determines routing:
Process:
1. Fetch merchant's support classification documents (category: SupportClassification)
2. Format documents for AI:
   Support classifications:
   User type: sales
   Type description: Handles product inquiries, pricing, availability
   User type: returns
   Type description: Processes returns, exchanges, refunds
3. Call ai_client.get_structured_output with:
   - Model: GPT-4.1
   - Schema: SupportClassificationOutput
   - Max tokens: 100
   - Messages: System prompt + support request reason + classifications
4. Extract results:
   - target_user_type: Merchant user type key (e.g., "returns", "sales")
   - support_request_type: Readable type (e.g., "Returns", "Product Inquiry")
Output Schema:
class SupportClassificationOutput(BaseModel):
    target_user_type: Optional[str]  # Key for merchant user type
    support_request_type: Optional[str]  # Display-friendly type
Fallback Behavior:
- No documents: Uses base classification, sets type to "Admin"
- No target_user_type: Sends to all merchant users
- No users for target type: Falls back to 'admin' type
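The document-formatting step and the routing fallback can be sketched as follows. Which Document fields hold the user type and its description is not stated here, so the example takes plain (user_type, description) pairs; both function names are hypothetical.

```python
from typing import Optional


def format_classifications(docs: list[tuple[str, str]]) -> str:
    """Render classification documents in the prompt layout shown above."""
    lines = ["Support classifications:"]
    for user_type, description in docs:
        lines.append(f"User type: {user_type}")
        lines.append(f"Type description: {description}")
    return "\n".join(lines)


def routing_targets(target_user_type: Optional[str]) -> Optional[list[str]]:
    """None means 'no targeting', which routes to all merchant users."""
    return [target_user_type] if target_user_type else None


prompt_block = format_classifications(
    [
        ("sales", "Handles product inquiries, pricing, availability"),
        ("returns", "Processes returns, exchanges, refunds"),
    ]
)
print(prompt_block)
```

Returning None rather than an empty list keeps "the classifier chose nothing" distinct from "the chosen type has no users", which is handled later by the 'admin' fallback.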
Response Localization
All customer-facing responses are localized via AI:
Input:
- Admin's response text (from answered_fields)
- Customer's language (from conversation.language)
- Customer name
- Notification context
Output Schema:
class LocalizedResponse(BaseModel):
    continuation_fragment: str  # Fits after template clause
    full_sentence: str  # Complete standalone response
Process:
1. Build system prompt based on notification type:
   - Questions: "Rewrite into clear reply... suitable for template and standalone"
   - Support: "Rewrite... fits after 'has reviewed your request and '"
2. Call ai_client.get_structured_output:
   - Model: GPT-4.1 Mini
   - Schema: LocalizedResponse
   - Context: Target language, customer name, notification context, admin response
3. Extract both response variants:
   - continuation_fragment: For campaign template variables
   - full_sentence: For Voyager-generated messages
Example:
Admin input: "we accept returns mon-fri 9-5"
Localized output (Spanish):
{
"continuation_fragment": "aceptamos devoluciones de lunes a viernes de 9am a 5pm",
"full_sentence": "Aceptamos devoluciones de lunes a viernes de 9am a 5pm."
}
Used in:
- Campaign path (>24h): Uses continuation_fragment (support requests) or full_sentence (questions) in template
- Voyager path (≤24h): Uses full_sentence as context for natural response generation
24-Hour Policy Implementation
WhatsApp enforces a 24-hour messaging window for non-template messages.
Determination Logic
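from datetime import datetime, timedelta, timezone
from sqlalchemy import select

# Most recent inbound customer message (role == user).
# Await the execute() call first, then read the scalar from the result.
result = await session.execute(
    select(Message.created_at)
    .where(
        Message.conversation_id == conversation_id,
        Message.role == MessageRole.user,
    )
    .order_by(Message.created_at.desc())
    .limit(1)
)
last_user_message_time = result.scalar_one_or_none()

now_utc = datetime.now(timezone.utc)
use_campaign = True  # Default to template
if last_user_message_time:
    use_campaign = (now_utc - last_user_message_time) > timedelta(hours=24)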
Logic:
- Query last inbound user message (ignoring assistant, response, admin roles)
- Compare to current time
- If difference >24 hours OR no user messages: use campaign template
- If difference ≤24 hours: use Voyager for free-form messaging
Why Default to True:
- Safe for conversations with no user messages (e.g., new notifications)
- Ensures compliance with WhatsApp policies
- Template messages always allowed
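The same decision can be isolated as a pure function, which makes the 24-hour boundary easy to test without a database. This is a sketch, not the service's code: the function name is hypothetical, and treating timezone-naive timestamps as UTC is an assumption about how created_at is stored.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MESSAGING_WINDOW = timedelta(hours=24)


def should_use_campaign(
    last_user_message_time: Optional[datetime],
    now: Optional[datetime] = None,
) -> bool:
    """Return True when a template campaign must be used instead of free-form text."""
    if last_user_message_time is None:
        # No inbound message on record: default to the template path.
        return True
    if last_user_message_time.tzinfo is None:
        # Assumption: naive timestamps are stored in UTC.
        last_user_message_time = last_user_message_time.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - last_user_message_time) > MESSAGING_WINDOW


reference = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
print(should_use_campaign(reference - timedelta(hours=23), reference))  # False
```

The comparison is strict, so a message exactly 24 hours old still takes the Voyager path, matching the ">24 hours" rule above.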
Campaign Path (>24h)
Support Request:
context["customer_response_text"] = localized.continuation_fragment
# Template: "Our team has reviewed your request and {{customer_response_text}}"
Question:
context["answer"] = localized.full_sentence
# Template: "Thank you for waiting... The team says '{{answer}}'"
Voyager Path (≤24h)
Context Message:
Notification context (ID: NOTIF_123): support_request_reason=damaged item.
Admin answer: agent_response_to_customer=send order number for refund
Process:
1. Create assistant message with notification + admin answer context
2. Insert into conversation (Voyager will see it in history)
3. Build Voyager with fresh conversation state
4. Generate response maintaining tone
5. Send directly via Twilio or Instagram
Advantages:
- More natural, conversational responses
- Can use emoji, formatting, multiple messages
- Incorporates full conversation history
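The context message shown above can be assembled from the requested and answered field lists. This is a sketch only: the function names are hypothetical, and joining multiple fields with ", " is an assumption, since the example shows a single field on each line.

```python
from typing import Any


def summarize_fields(field_names: list[str], context: dict[str, Any]) -> str:
    """Render 'name=value' pairs for the fields that are present in context."""
    return ", ".join(f"{name}={context[name]}" for name in field_names if name in context)


def build_voyager_context_message(
    prefixed_id: str,
    requested_fields: list[str],
    answered_fields: list[str],
    context: dict[str, Any],
) -> str:
    """Combine the notification summary and the admin answer into one assistant message."""
    return (
        f"Notification context (ID: {prefixed_id}): "
        f"{summarize_fields(requested_fields, context)}.\n"
        f"Admin answer: {summarize_fields(answered_fields, context)}"
    )


message = build_voyager_context_message(
    "NOTIF_123",
    ["support_request_reason"],
    ["agent_response_to_customer"],
    {
        "support_request_reason": "damaged item",
        "agent_response_to_customer": "send order number for refund",
    },
)
print(message)
```

Since the message is inserted into the conversation history, Voyager sees both the original request and the admin's answer when generating its reply.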
Integration Points
Campaign Integration
Uses campaign system for message delivery:
Campaigns Used:
- merchant_user_question_request: Notify admins of question
- merchant_user_override_request: Notify admins of support ticket
- merchant_user_question_answer: Send answer to customer (>24h)
- customer_support_response: Send support response to customer (>24h)
Service Campaigns:
- All notification campaigns are CampaignType.service
- Auto-enabled when created via /campaign/create-default route
- Support dynamic recipient lists (not pre-defined leads)
See: Campaign Service for campaign details
Document Integration
Question notifications create and update documents:
Flow:
1. Request creates Document with question as title
2. Answer updates Document content with answer
3. Document becomes part of merchant knowledge base
4. Category: DocumentCategory.UserQuestion
5. Source: DocumentSource.FreeInput
See: Document Service for document management
Merchant User Routing
Support requests route to specific merchant user types:
Classification:
- AI reads support classification documents
- Matches request reason to user type descriptions
- Returns target type key (e.g., "returns", "sales", "billing")
Routing:
- Calls get_merchant_users with type key
- Fetches users with matching type
- Falls back to 'admin' type if no users found
See: Merchant Service for user type management
Voyager Integration
For ≤24h responses, uses Voyager to generate contextual replies:
Context Provided:
- Assistant message with notification summary + admin answer
- Full conversation history
- Template response messages now included (via whatsapp_payload.type == "template")
Tool Available:
- AnswerNotificationTool accessible to admins in their conversations
- Tool description updated to consider notification context from history
See: Voyager Documentation for navigation system
Performance Characteristics
Database Operations
Request Notification (read-heavy):
- 1 query: Fetch conversation + user
- 1-2 queries: Fetch support docs (support requests only)
- 1 query: Fetch target merchant users
- 1 query: Create ConversationUsers (if needed)
- Total: 3-5 queries
Answer Notification (write-heavy):
- 1 query: Fetch notification
- 1 query: Fetch conversation
- 1 query: Fetch merchant
- 1 query: Fetch conversation_user
- 1 query: Fetch/update document (questions only)
- 1 query: Fetch last user message time
- 1 query: Fetch billing plan (≤24h only)
- 1 query: Update notification status
- 1-2 queries: Fetch merchant user conversations for closure (single-answer)
- Total: 7-11 queries
AI API Calls
Per Notification:
- Support classification: 1 call (GPT-4.1, ~100 tokens)
- Response localization: 1 call (GPT-4.1 Mini, ~200 tokens)
- Voyager response (≤24h only): 1-3 calls (varies with tool usage)
Cost Optimization:
- Use GPT-4.1 Mini for localization (cheaper)
- Use GPT-4.1 for classification (better accuracy needed)
- Cache classification results via logging_params
Async Processing
- Both request and answer run in SAQ background jobs
- Don't block customer-facing API calls
- Allow retry on failure
- 120 second timeout per job
Error Handling Patterns
Service-Level Errors
All service functions raise exceptions on failure:
- Allow caller to handle errors appropriately
- Include descriptive error messages
- Rollback handled by caller (routes or SAQ jobs)
Common Exception Types:
- Exception("Notification not found")
- Exception("Missing required field: <field>")
- Exception("Notification already answered")
- Exception("Document not found")
Logged Warnings (Non-Fatal)
Some errors are logged but don't fail the operation:
- Failed to persist recipient list
- Failed to insert closure assistant messages
- No merchant users found for type
- No campaign configured
Pattern:
try:
    # Optional operation
    ...
except Exception as e:
    logger.warning(f"Non-critical failure: {e}")
    # Continue execution
Silent Fallbacks
Some operations have built-in fallbacks:
- No users for classified type → Use 'admin' type
- No billing plan → Pass None to Voyager (works fine)
- No last user message → Default to campaign send
Testing Considerations
Unit Test Coverage
Key functions to test:
1. request_notification with all notification types
2. answer_notification for both >24h and ≤24h paths
3. send_notification_messages for merchant_user and conversation_user routing
4. AI classification with various support reasons
5. Response localization for different languages
6. Single-answer enforcement
7. Closure notification delivery
Integration Test Scenarios
- Full question flow: Request → Admin answers → Customer receives answer
- Support ticket flow: Request → AI classifies → Routed admin answers → Customer receives
- 24h boundary: Answer just before 24h (Voyager) vs. just after (campaign)
- Multi-admin: Multiple admins receive request, one answers, others get closure
- Fallback routing: Classified type has no users, falls back to admin
- Missing ConversationUser: Merchant phone doesn't have ConversationUser, auto-created
Edge Cases to Test
- Notification already answered (single-answer)
- Invalid notification ID format/range
- Answer doesn't match JSON schema
- No merchant users configured
- No billing plan configured
- No support classification documents
- Conversation deleted during notification processing
- Empty answer string
- Very long question/answer text
- Special characters in answers
Debugging Tips
Enable Debug Logging
Temporarily add print statements in key locations:
- After notification creation
- Before/after AI classification
- Before campaign send
- After Voyager response
Check Notification Status
notification = await DBClient.db_run_query(
select(Notification).where(Notification.id == notification_id)
)
print(f"Status: {notification[0].status}")
print(f"Context: {notification[0].context}")
Verify Campaign Delivery
Check if campaign is enabled:
campaign = await DBClient.db_run_query(
select(Campaign).where(
Campaign.name == "merchant_user_override_request",
Campaign.merchant_id == merchant_id
)
)
print(f"Enabled: {campaign[0].enabled}")
Trace Message History
messages = await DBClient.get_messages(
conversation_id=conversation_id,
n=20
)
for msg in messages:
    print(f"{msg.role}: {msg.content[:50]}... | other_data: {msg.other_data}")
Look for:
- response messages with whatsapp_payload.type == "template"
- assistant messages with notification_id in other_data
- Message roles and content for Voyager history
Check 24h Calculation
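# Await execute() first, then read the scalar from the result
result = await session.execute(
    select(Message.created_at)
    .where(
        Message.conversation_id == conversation_id,
        Message.role == MessageRole.user
    )
    .order_by(Message.created_at.desc())
    .limit(1)
)
last_user_msg = result.scalar_one_or_none()
now = datetime.now(timezone.utc)
if last_user_msg is None:
    # No inbound user message: the service defaults to the campaign path
    print("No user messages found. Use campaign: True")
else:
    hours_since = (now - last_user_msg).total_seconds() / 3600
    use_campaign = hours_since > 24
    print(f"Hours since last user message: {hours_since}")
    print(f"Use campaign: {use_campaign}")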