Core Functions

request_notification

Creates and sends a notification to merchant admins.

async def request_notification(
    notification_type: NotificationType,
    conversation_id: int,
    context: dict,
    session: Session,
) -> Notification

Purpose: Request merchant admin input for customer questions or support tickets.

Parameters: - notification_type: Type of notification (user_question_request | user_support_request) - conversation_id: ID of the conversation where notification was triggered - context: Dictionary containing required fields for the notification type - session: Async SQLAlchemy session for database operations

Returns: Created Notification object

Process Flow:

  1. Validate Input
  2. Check notification type exists in NOTIFICATION_CONFIG
  3. Verify all requested_fields present in context
  4. Load conversation and conversation_user

  5. Type-Specific Processing

For user_question_request: - Create Document (category: UserQuestion) with question as title - Store document_id in context - Validate context against UserQuestionContext schema

For user_support_request: - Fetch merchant's support classification documents - AI-classify support request using GPT-4.1: - Determine appropriate merchant user type (sales, support, returns, etc.) - Extract readable support_request_type for display - Set target merchant user types for routing - Fallback to all admins if no classification or docs

  1. Create Notification Record
  2. Store in database with status=requested
  3. Include answer_model_json_schema for later validation
  4. Add allow_multiple_answers flag from config

  5. Build Assistant Instruction Message

  6. Extract notification content from requested_fields
  7. Generate prefixed notification ID via add_prefix
  8. Create instruction text with:
    • Notification context (question/reason)
    • Prefixed notification ID
    • Usage instructions for AnswerNotificationTool
    • JSON schema with required fields
  9. Store notification IDs in message other_data

  10. Send Campaign to Admins

  11. Add notification IDs to context for message tagging
  12. Call send_notification_messages
  13. Campaign sent to targeted merchant users
  14. Assistant instruction inserted per recipient conversation

  15. Persist Recipient List

  16. Store merchant_user_conversation_user_ids in notification context
  17. Used later to send closure notifications to other admins
  18. Silent failure (logged warning) if persist fails

Example Usage:

from app.api.notification.service import request_notification
from app.api.notification.models import NotificationType

# Request question notification
notification = await request_notification(
    notification_type=NotificationType.user_question_request,
    conversation_id=123,
    context={"question": "What are your return hours?"},
    session=session
)

# Request support notification
notification = await request_notification(
    notification_type=NotificationType.user_support_request,
    conversation_id=456,
    context={"support_request_reason": "Item arrived damaged"},
    session=session
)

Error Cases: - Raises exception if notification type invalid - Raises exception if required fields missing - Raises exception if conversation not found


answer_notification

Process merchant admin's answer and send response to customer.

async def answer_notification(
    session: Session,
    context: dict,
    notification_id: int = None,
    notification_type: NotificationType = None,
    conversation_id: int = None,
)

Purpose: Process admin answer, localize response, and deliver to customer via campaign or Voyager.

Parameters: - session: Async SQLAlchemy session for database operations - context: Answer dictionary with all answered_fields populated - notification_id: ID of notification to answer (primary method) - notification_type: Type if creating new answered notification - conversation_id: Conversation ID if creating new notification

Returns: None (updates notification in-place)

Process Flow:

  1. Load Notification
  2. Fetch by notification_id OR create new answered notification
  3. Merge answer context into notification context
  4. Load associated conversation, merchant, and user

  5. Validate Answer

  6. Convert notification_type string to Enum for downstream use
  7. Verify all answered_fields present in context
  8. Check single-answer enforcement (allow_multiple_answers=False)
  9. Raise exception if already answered

  10. Type-Specific Processing

For user_question_request: - Fetch Document by document_id from notification context - Update Document content with answer - Add question to answer context for campaign variable

  1. Localize Response
  2. Extract primary admin response from first answered_field
  3. Generate AI localization using GPT-4.1 Mini:
    • Target language from conversation.language (default: "en")
    • Create continuation_fragment for template variables
    • Create full_sentence for direct messaging
  4. Different prompts for questions vs. support requests

  5. Determine 24-Hour Policy

  6. Query last inbound user message timestamp
  7. Calculate: use_campaign = (now - last_user_message_time) > 24 hours
  8. Default to True if no user messages found

  9. Send Response to Customer

If use_campaign=True (>24 hours): - Populate context with localized text: - Support: customer_response_text = continuation_fragment - Question: answer = full_sentence - Call send_notification_messages with status=answered - Sends customer_support_response or merchant_user_question_answer campaign

If use_campaign=False (≤24 hours): - Build notification context summary from requested_fields - Build admin answer summary from answered_fields - Create assistant message with full context for Voyager - Fetch merchant's billing plan - Build Voyager instance - Generate response via voyager.navigate() - Send messages directly via Twilio (WhatsApp) or Instagram - No campaign template needed

  1. Mark as Answered
  2. Update notification status to answered
  3. Update updated_at timestamp
  4. Use DBClient.db_update_rows with external session

  5. Notify Other Admins (Single-Answer Only)

  6. Fetch merchant_user_conversation_user_ids from notification context
  7. Build closure message: "Notification NOTIF_XXX has been answered and is now closed."
  8. Find conversations for each recipient
  9. Insert assistant messages to prevent duplicate answers

Example Usage:

from app.api.notification.service import answer_notification

# Answer question
await answer_notification(
    session=db_session,
    notification_id=123,
    context={"answer": "We are open Mon-Fri 9am-5pm EST"}
)

# Answer support request
await answer_notification(
    session=db_session,
    notification_id=456,
    context={
        "agent_response_to_customer": "Please send your order number",
        "resolution_notes": "Customer eligible for refund"
    }
)

Error Cases: - Raises if notification not found - Raises if answered_fields missing - Raises if already answered (single-answer notifications) - Raises if document not found (questions) - Raises if conversation/merchant/user not found

Edge Cases Handled: - No billing plan: Voyager still works (can be None) - No last user message: Defaults to campaign send - Failed closure notifications: Logged warning, doesn't fail main flow


send_notification_messages

Send notification campaigns to appropriate recipients (merchant admins or customers).

async def send_notification_messages(
    merchant_id: int,
    notification_type: NotificationType,
    notification_status: NotificationStatus,
    conversation_id: int,
    context: dict = {},
    session: Session = None,
    target_merchant_users: Optional[List[str]] = None,
    assistant_message_override: Message = None,
) -> List[int]

Purpose: Route and send notification campaigns based on status and type.

Parameters: - merchant_id: ID of merchant - notification_type: Type of notification (Enum) - notification_status: Current status (requested | answered) - conversation_id: Associated conversation - context: Campaign variable context - session: Database session - target_merchant_users: Optional list of merchant user types to target - assistant_message_override: Optional assistant message to insert per recipient

Returns: List of conversation_user_ids that received the campaign

Process Flow:

  1. Determine Recipients
  2. Look up <status>_to in config: merchant_user or conversation_user
  3. Look up <status>_campaign in config

  4. Route Based on Target

If target=conversation_user: - Use conversation's conversation_user_id directly - Send campaign to customer

If target=merchant_user: - If target_merchant_users specified: - Fetch merchant users of each specified type - Collect phone numbers - Fallback to 'admin' type if no users found - If no target_merchant_users: - Fetch all merchant users - Collect phone numbers

  1. Create ConversationUser Records
  2. Check which merchant phone numbers have ConversationUser records
  3. Create missing ones automatically
  4. Build mapping of phone → conversation_user_id

  5. Send Campaign

  6. Call send_service_campaign_messages
  7. Pass conversation_user_ids, context, and assistant override
  8. Campaign creates leads, sends messages, inserts assistant instructions

  9. Return Recipients

  10. Fetch conversation_user_ids from created CampaignLead records
  11. Returned for storage in notification context

Example Usage:

# Send request to merchant admins
recipient_ids = await send_notification_messages(
    merchant_id=140,
    notification_type=NotificationType.user_support_request,
    notification_status=NotificationStatus.requested,
    conversation_id=789,
    context={...},
    session=session,
    target_merchant_users=["support", "returns"],
    assistant_message_override=instruction_message
)

# Send answer to customer (>24h)
await send_notification_messages(
    merchant_id=140,
    notification_type=NotificationType.user_question_request,
    notification_status=NotificationStatus.answered,
    conversation_id=123,
    context={"answer": "Localized answer text"},
    session=session
)

Edge Cases: - No campaign configured: Logs warning, returns early - No merchant users found: Logs warning, returns empty - Failed to create ConversationUser: Logs error, continues with existing users - Target type has no users: Falls back to 'admin' type


Helper Functions

request_notification_job

Enqueue notification request as background job.

async def request_notification_job(
    notification_type: NotificationType,
    conversation_id: int,
    context: dict,
)

Purpose: Queue notification request for async processing.

Parameters: - notification_type: Type of notification - conversation_id: Conversation ID - context: Notification context

Returns: None (enqueues job)

Implementation: - Serializes context as JSON string - Enqueues request_notification_job to SAQ queue - 120 second timeout - Job handled by saq/functions.py

Usage: Called by Voyager tools to avoid blocking customer responses.


answer_notification_job

Enqueue notification answer as background job.

async def answer_notification_job(
    notification_id: int = None,
    notification_type: NotificationType = None,
    conversation_id: int = None,
    context: dict = {},
)

Purpose: Queue notification answer for async processing.

Parameters: - notification_id: ID of notification to answer - notification_type: Type (if creating new notification) - conversation_id: Conversation ID (if creating new) - context: Answer context

Returns: None (enqueues job)

Implementation: - Serializes context as JSON string - Converts enum to string for queue serialization - Enqueues answer_notification_job to SAQ queue - 120 second timeout - Job handled by saq/functions.py

Usage: Called by AnswerNotificationTool to process admin responses asynchronously.


get_notifications

Fetch notifications for a merchant with optional filtering.

async def get_notifications(
    merchant_id: int,
    status: Optional[NotificationStatus] = None,
    notification_types: Optional[List[NotificationType]] = [],
) -> List[NotificationSchema]

Purpose: Query notifications for display in merchant dashboard.

Parameters: - merchant_id: Merchant to fetch notifications for - status: Optional status filter (requested | answered) - notification_types: Optional list of types to include

Returns: List of NotificationSchema objects with populated requested/answered fields

Process: 1. Build query joining Notification and Conversation 2. Filter by merchant_id via conversation relationship 3. Apply optional status filter 4. Apply optional type filter 5. Execute query via DBClient.db_run_query 6. Map to NotificationSchema and populate config-derived fields

Example:

# Get all unanswered notifications
notifications = await get_notifications(
    merchant_id=140,
    status=NotificationStatus.requested
)

# Get support tickets only
support_tickets = await get_notifications(
    merchant_id=140,
    notification_types=[NotificationType.user_support_request]
)

can_answer_notification

Check if a notification can be answered.

async def can_answer_notification(
    notification_id: int,
) -> bool

Purpose: Validate whether a notification is answerable before attempting to answer.

Parameters: - notification_id: ID of notification to check

Returns: True if answerable, False otherwise

Logic: 1. Fetch notification from database 2. Return False if not found 3. Check status: - requested: Always answerable → True - answered: Check allow_multiple_answers flag → True if flag set, else False - Other statuses: False

Example:

if await can_answer_notification(123):
    await answer_notification(notification_id=123, context={...})
else:
    print("Notification cannot be answered")

Usage: Can be called before attempting answer to provide better UX in dashboard.


get_notifications_by_document_id

Fetch notifications associated with a specific document.

def get_notifications_by_document_id(
    session: Session,
    document_id: int
) -> List[Notification]

Purpose: Find all question notifications linked to a document.

Parameters: - session: SQLAlchemy session (synchronous) - document_id: ID of document to find notifications for

Returns: List of Notification objects

Implementation: - Uses JSON operator to query context['document_id'] - Casts to Integer for comparison - Synchronous session (not async)

Usage: Link documents to their originating notifications for tracking.

Note: Only works for user_question_request notifications (only type with document_id).


Support Functions

AI Classification

For user_support_request notifications, support classification determines routing:

Process: 1. Fetch merchant's support classification documents (category: SupportClassification) 2. Format documents for AI:

Support classifications:
User type: sales
Type description: Handles product inquiries, pricing, availability

User type: returns
Type description: Processes returns, exchanges, refunds

  1. Call ai_client.get_structured_output with:
  2. Model: GPT-4.1
  3. Schema: SupportClassificationOutput
  4. Max tokens: 100
  5. Messages: System prompt + support request reason + classifications

  6. Extract results:

  7. target_user_type: Merchant user type key (e.g., "returns", "sales")
  8. support_request_type: Readable type (e.g., "Returns", "Product Inquiry")

Output Schema:

class SupportClassificationOutput(BaseModel):
    target_user_type: Optional[str]  # Key for merchant user type
    support_request_type: Optional[str]  # Display-friendly type

Fallback Behavior: - No documents: Uses base classification, sets type to "Admin" - No target_user_type: Sends to all merchant users - No users for target type: Falls back to 'admin' type


Response Localization

All customer-facing responses are localized via AI:

Input: - Admin's response text (from answered_fields) - Customer's language (from conversation.language) - Customer name - Notification context

Output Schema:

class LocalizedResponse(BaseModel):
    continuation_fragment: str  # Fits after template clause
    full_sentence: str  # Complete standalone response

Process: 1. Build system prompt based on notification type: - Questions: "Rewrite into clear reply... suitable for template and standalone" - Support: "Rewrite... fits after 'has reviewed your request and '"

  1. Call ai_client.get_structured_output:
  2. Model: GPT-4.1 Mini
  3. Schema: LocalizedResponse
  4. Context: Target language, customer name, notification context, admin response

  5. Extract both response variants:

  6. continuation_fragment: For campaign template variables
  7. full_sentence: For Voyager-generated messages

Example:

Admin input: "we accept returns mon-fri 9-5"

Localized output (Spanish):

{
    "continuation_fragment": "aceptamos devoluciones de lunes a viernes de 9am a 5pm",
    "full_sentence": "Aceptamos devoluciones de lunes a viernes de 9am a 5pm."
}

Used in: - Campaign path (>24h): Uses continuation_fragment in template - Voyager path (≤24h): Uses full_sentence as context for natural response generation


24-Hour Policy Implementation

WhatsApp enforces a 24-hour messaging window for non-template messages.

Determination Logic

last_user_message_time = await session.execute(
    select(Message.created_at)
    .where(
        Message.conversation_id == conversation_id,
        Message.role == MessageRole.user,
    )
    .order_by(Message.created_at.desc())
    .limit(1)
).scalar_one_or_none()

now_utc = datetime.now(timezone.utc)
use_campaign = True  # Default to template
if last_user_message_time:
    use_campaign = (now_utc - last_user_message_time) > timedelta(hours=24)

Logic: - Query last inbound user message (ignoring assistant, response, admin roles) - Compare to current time - If difference >24 hours OR no user messages: use campaign template - If difference ≤24 hours: use Voyager for free-form messaging

Why Default to True: - Safe for conversations with no user messages (e.g., new notifications) - Ensures compliance with WhatsApp policies - Template messages always allowed

Campaign Path (>24h)

Support Request:

context["customer_response_text"] = localized.continuation_fragment
# Template: "Our team has reviewed your request and {{customer_response_text}}"

Question:

context["answer"] = localized.full_sentence
# Template: "Thank you for waiting... The team says '{{answer}}'"

Voyager Path (≤24h)

Context Message:

Notification context (ID: NOTIF_123): support_request_reason=damaged item. 
Admin answer: agent_response_to_customer=send order number for refund

Process: 1. Create assistant message with notification + admin answer context 2. Insert into conversation (Voyager will see it in history) 3. Build Voyager with fresh conversation state 4. Generate response maintaining tone 5. Send directly via Twilio or Instagram

Advantages: - More natural, conversational responses - Can use emoji, formatting, multiple messages - Incorporates full conversation history


Integration Points

Campaign Integration

Uses campaign system for message delivery:

Campaigns Used: - merchant_user_question_request: Notify admins of question - merchant_user_override_request: Notify admins of support ticket - merchant_user_question_answer: Send answer to customer (>24h) - customer_support_response: Send support response to customer (>24h)

Service Campaigns: - All notification campaigns are CampaignType.service - Auto-enabled when created via /campaign/create-default route - Support dynamic recipient lists (not pre-defined leads)

See: Campaign Service for campaign details

Document Integration

Question notifications create and update documents:

Flow: 1. Request creates Document with question as title 2. Answer updates Document content with answer 3. Document becomes part of merchant knowledge base 4. Category: DocumentCategory.UserQuestion 5. Source: DocumentSource.FreeInput

See: Document Service for document management

Merchant User Routing

Support requests route to specific merchant user types:

Classification: - AI reads support classification documents - Matches request reason to user type descriptions - Returns target type key (e.g., "returns", "sales", "billing")

Routing: - Calls get_merchant_users with type key - Fetches users with matching type - Falls back to 'admin' type if no users found

See: Merchant Service for user type management

Voyager Integration

For ≤24h responses, uses Voyager to generate contextual replies:

Context Provided: - Assistant message with notification summary + admin answer - Full conversation history - Template response messages now included (via whatsapp_payload.type == "template")

Tool Available: - AnswerNotificationTool accessible to admins in their conversations - Tool description updated to consider notification context from history

See: Voyager Documentation for navigation system


Performance Characteristics

Database Operations

Request Notification (read-heavy): - 1 query: Fetch conversation + user - 1-2 queries: Fetch support docs (support requests only) - 1 query: Fetch target merchant users - 1 query: Create ConversationUsers (if needed) - Total: 3-5 queries

Answer Notification (write-heavy): - 1 query: Fetch notification - 1 query: Fetch conversation - 1 query: Fetch merchant - 1 query: Fetch conversation_user - 1 query: Fetch/update document (questions only) - 1 query: Fetch last user message time - 1 query: Fetch billing plan (≤24h only) - 1 query: Update notification status - 1-2 queries: Fetch merchant user conversations for closure (single-answer) - Total: 7-11 queries

AI API Calls

Per Notification: - Support classification: 1 call (GPT-4.1, ~100 tokens) - Response localization: 1 call (GPT-4.1 Mini, ~200 tokens) - Voyager response (≤24h only): 1-3 calls (varies with tool usage)

Cost Optimization: - Use GPT-4.1 Mini for localization (cheaper) - Use GPT-4.1 for classification (better accuracy needed) - Cache classification results via logging_params

Async Processing

  • Both request and answer run in SAQ background jobs
  • Don't block customer-facing API calls
  • Allow retry on failure
  • 120 second timeout per job

Error Handling Patterns

Service-Level Errors

All service functions raise exceptions on failure: - Allow caller to handle errors appropriately - Include descriptive error messages - Rollback handled by caller (routes or SAQ jobs)

Common Exception Types: - Exception("Notification not found") - Exception("Missing required field: <field>") - Exception("Notification already answered") - Exception("Document not found")

Logged Warnings (Non-Fatal)

Some errors are logged but don't fail the operation: - Failed to persist recipient list - Failed to insert closure assistant messages - No merchant users found for type - No campaign configured

Pattern:

try:
    # Optional operation
    ...
except Exception as e:
    logger.warning(f"Non-critical failure: {e}")
    # Continue execution

Silent Fallbacks

Some operations have built-in fallbacks: - No users for classified type → Use 'admin' type - No billing plan → Pass None to Voyager (works fine) - No last user message → Default to campaign send


Testing Considerations

Unit Test Coverage

Key functions to test: 1. request_notification with all notification types 2. answer_notification for both >24h and ≤24h paths 3. send_notification_messages for merchant_user and conversation_user routing 4. AI classification with various support reasons 5. Response localization for different languages 6. Single-answer enforcement 7. Closure notification delivery

Integration Test Scenarios

  1. Full question flow: Request → Admin answers → Customer receives answer
  2. Support ticket flow: Request → AI classifies → Routed admin answers → Customer receives
  3. 24h boundary: Answer just before 24h (Voyager) vs. just after (campaign)
  4. Multi-admin: Multiple admins receive request, one answers, others get closure
  5. Fallback routing: Classified type has no users, falls back to admin
  6. Missing ConversationUser: Merchant phone doesn't have ConversationUser, auto-created

Edge Cases to Test

  1. Notification already answered (single-answer)
  2. Invalid notification ID format/range
  3. Answer doesn't match JSON schema
  4. No merchant users configured
  5. No billing plan configured
  6. No support classification documents
  7. Conversation deleted during notification processing
  8. Empty answer string
  9. Very long question/answer text
  10. Special characters in answers

Debugging Tips

Enable Debug Logging

Temporarily add print statements in key locations: - After notification creation - Before/after AI classification - Before campaign send - After Voyager response

Check Notification Status

notification = await DBClient.db_run_query(
    select(Notification).where(Notification.id == notification_id)
)
print(f"Status: {notification[0].status}")
print(f"Context: {notification[0].context}")

Verify Campaign Delivery

Check if campaign is enabled:

campaign = await DBClient.db_run_query(
    select(Campaign).where(
        Campaign.name == "merchant_user_override_request",
        Campaign.merchant_id == merchant_id
    )
)
print(f"Enabled: {campaign[0].enabled}")

Trace Message History

messages = await DBClient.get_messages(
    conversation_id=conversation_id,
    n=20
)
for msg in messages:
    print(f"{msg.role}: {msg.content[:50]}... | other_data: {msg.other_data}")

Look for: - response messages with whatsapp_payload.type == "template" - assistant messages with notification_id in other_data - Message roles and content for Voyager history

Check 24h Calculation

last_user_msg = await session.execute(
    select(Message.created_at)
    .where(
        Message.conversation_id == conversation_id,
        Message.role == MessageRole.user
    )
    .order_by(Message.created_at.desc())
    .limit(1)
).scalar_one_or_none()

now = datetime.now(timezone.utc)
hours_since = (now - last_user_msg).total_seconds() / 3600
use_campaign = hours_since > 24
print(f"Hours since last user message: {hours_since}")
print(f"Use campaign: {use_campaign}")