WebSocket Events - Real-time Updates

What This Is: A live connection that streams updates as your workflow executes.

URL Pattern: wss://lao-socket.studio.lyzr.ai/ws/{flow_name}/{run_name}

Why You Need This: Instead of waiting for the entire workflow to finish, you get instant notifications as each step completes, fails, or progresses. Critical for enterprise applications where users need immediate feedback.

How It Works:
  1. Start your workflow with run-dag API
  2. Immediately connect to WebSocket with the same flow_name/run_name
  3. Receive real-time events as each node executes
  4. Handle events in your application (update UI, trigger alerts, etc.)

Key Events You Care About

| Event Type | When It Fires | What You Should Do | Why It Matters |
| --- | --- | --- | --- |
| flow_started | Workflow begins | Update UI: “Processing…” | User knows their request is being handled |
| task_started | Node begins execution | Show progress: “Step 1 of 3” | Real-time progress indication |
| task_completed | Node finishes | Update progress: “Step 2 of 3” | User sees continuous progress |
| flow_completed | Workflow done | Process results, notify user | Handle final results immediately |
| task_failed | Node fails | Handle error, maybe retry | Immediate error response, no waiting |
| flow_error | Workflow fails | Show error, log for debugging | Critical failure handling |

Enterprise Benefits:
  • Immediate Response: Users see progress instantly, not after 30+ seconds
  • Error Handling: Failed steps are caught immediately, not at the end
  • Better UX: Real-time progress bars instead of loading spinners
  • Monitoring: Operations teams get instant alerts on failures

Real WebSocket Event Examples

Enterprise Customer Support Pipeline

Flow Started:
{
  "event_type": "flow_started",
  "task_id": "cust-support-001",
  "flow_name": "Customer_Support_Pipeline",
  "run_name": "ticket_78453",
  "timestamp": 1756806125.043,
  "message": "Starting customer support workflow"
}
Task Started - Sentiment Analysis:
{
  "event_type": "task_started",
  "task_id": "b29aa07f-9341-4ac5-9160-4f7e1f594654",
  "task_name": "sentiment_analyzer",
  "flow_name": "Customer_Support_Pipeline",
  "run_name": "ticket_78453",
  "timestamp": 1756806127.234,
  "input": {
    "config": {
      "agent_id": "sentiment-v2",
      "api_key": "sk-prod-sentiment-key"
    },
    "customer_message": "Your new update broke our entire dashboard! Our team can't access reports and we have a board meeting in 2 hours. This is completely unacceptable!",
    "customer_email": "cto@enterprise-client.com",
    "priority": "urgent"
  }
}
Task Completed - Sentiment Analysis:
{
  "event_type": "task_completed",
  "task_id": "b29aa07f-9341-4ac5-9160-4f7e1f594654", 
  "task_name": "sentiment_analyzer",
  "flow_name": "Customer_Support_Pipeline",
  "run_name": "ticket_78453",
  "timestamp": 1756806132.567,
  "execution_time": 5.33,
  "output": {
    "sentiment": "highly_negative",
    "confidence": 0.94,
    "urgency_score": 9.2,
    "key_issues": ["dashboard_broken", "report_access", "time_sensitive"],
    "escalation_recommended": true,
    "customer_tier": "enterprise"
  }
}
Task Started - CRM Update:
{
  "event_type": "task_started",
  "task_id": "crm-update-567",
  "task_name": "update_salesforce", 
  "flow_name": "Customer_Support_Pipeline",
  "run_name": "ticket_78453",
  "timestamp": 1756806133.012,
  "input": {
    "config": {
      "url": "https://enterprise.salesforce.com/services/data/v54.0/sobjects/Case",
      "method": "POST",
      "headers": {
        "Authorization": "Bearer sf_prod_token_xyz"
      }
    },
    "case_data": {
      "subject": "URGENT: Dashboard access failure - Enterprise Client",
      "priority": "High",
      "origin": "API",
      "status": "New",
      "account_id": "001234567890ABC",
      "sentiment_score": 9.2,
      "auto_escalated": true
    }
  }
}
Task Completed - CRM Update:
{
  "event_type": "task_completed",
  "task_id": "crm-update-567",
  "task_name": "update_salesforce",
  "flow_name": "Customer_Support_Pipeline", 
  "run_name": "ticket_78453",
  "timestamp": 1756806134.789,
  "execution_time": 1.777,
  "output": {
    "case_id": "5003000001abCDEF",
    "case_number": "00012345",
    "assigned_to": "senior-support-team",
    "sla_breach_warning": false,
    "estimated_resolution": "2024-02-15T14:30:00Z"
  }
}
Task Started - Notification Service:
{
  "event_type": "task_started",
  "task_id": "notify-001",
  "task_name": "send_notifications",
  "flow_name": "Customer_Support_Pipeline",
  "run_name": "ticket_78453", 
  "timestamp": 1756806135.123,
  "input": {
    "config": {
      "url": "https://api.enterprise-notif.com/v1/alerts",
      "method": "POST"
    },
    "notifications": [
      {
        "type": "slack",
        "channel": "#critical-support",
        "message": "🚨 URGENT: Enterprise client dashboard failure - Case #00012345"
      },
      {
        "type": "email", 
        "recipients": ["support-lead@company.com", "engineering-oncall@company.com"],
        "subject": "URGENT: Enterprise Client Issue - Immediate Action Required"
      },
      {
        "type": "pagerduty",
        "service_key": "prod-support-incidents",
        "severity": "critical"
      }
    ]
  }
}
Flow Completed - Full Pipeline:
{
  "event_type": "flow_completed",
  "task_id": "pipeline-complete",
  "task_name": "flow", 
  "flow_name": "Customer_Support_Pipeline",
  "run_name": "ticket_78453",
  "timestamp": 1756806138.456,
  "total_execution_time": 13.41,
  "output": {
    "sentiment_analyzer": {
      "sentiment": "highly_negative",
      "urgency_score": 9.2,
      "escalation_recommended": true
    },
    "update_salesforce": {
      "case_id": "5003000001abCDEF",
      "case_number": "00012345", 
      "assigned_to": "senior-support-team"
    },
    "send_notifications": {
      "slack_sent": true,
      "emails_sent": 2,
      "pagerduty_incident": "PD-12345",
      "all_notifications_successful": true
    },
    "summary": {
      "customer_notified": true,
      "internal_team_alerted": true,
      "case_created": true,
      "escalation_complete": true,
      "estimated_resolution_time": "2 hours"
    }
  }
}

Data Processing Pipeline Events

Flow Started - Financial Data Pipeline:
{
  "event_type": "flow_started",
  "flow_name": "Financial_Data_Pipeline",
  "run_name": "daily_batch_20240215",
  "timestamp": 1756806200.123,
  "message": "Processing 45,000 financial records"
}
Task Started - Data Validation:
{
  "event_type": "task_started",
  "task_name": "validate_financial_data",
  "flow_name": "Financial_Data_Pipeline", 
  "run_name": "daily_batch_20240215",
  "timestamp": 1756806201.456,
  "input": {
    "source_file": "s3://financial-data/2024/02/15/transactions.csv",
    "record_count": 45000,
    "validation_rules": [
      "amount_positive",
      "valid_account_format", 
      "date_within_range",
      "currency_code_valid"
    ]
  }
}
Task Failed - Data Quality Issue:
{
  "event_type": "task_failed",
  "task_name": "validate_financial_data",
  "flow_name": "Financial_Data_Pipeline",
  "run_name": "daily_batch_20240215", 
  "timestamp": 1756806245.789,
  "execution_time": 44.33,
  "error": {
    "type": "DataValidationError",
    "message": "1,247 records failed validation checks",
    "code": "VALIDATION_FAILED",
    "details": {
      "total_records": 45000,
      "failed_records": 1247,
      "failure_rate": 0.0277,
      "common_errors": [
        {"type": "invalid_amount", "count": 834},
        {"type": "missing_currency", "count": 413}
      ]
    }
  },
  "partial_results": {
    "valid_records": 43753,
    "invalid_records_quarantined": true,
    "quarantine_location": "s3://quarantine/2024/02/15/"
  }
}
Flow Error - Pipeline Stopped:
{
  "event_type": "flow_error",
  "flow_name": "Financial_Data_Pipeline",
  "run_name": "daily_batch_20240215",
  "timestamp": 1756806246.123,
  "error": {
    "type": "PipelineHaltedError",
    "message": "Data quality threshold not met, stopping pipeline",
    "code": "QUALITY_THRESHOLD_FAILED",
    "threshold": 0.02,
    "actual_failure_rate": 0.0277,
    "action_taken": "quarantine_and_alert"
  },
  "partial_results": {
    "processed_records": 43753,
    "quarantined_records": 1247,
    "notifications_sent": ["data-quality-team@company.com"]
  }
}

Production WebSocket Code

/**
 * Streams real-time execution events for a single workflow run over a
 * WebSocket and dispatches each event to an overridable handler hook.
 *
 * Lifecycle:
 *   - `connect()` opens the socket and starts a 30-second keep-alive ping.
 *   - `flow_completed` and `flow_error` are treated as terminal: the socket is
 *     closed and no reconnect is attempted.
 *   - Any other close is treated as unexpected and retried up to 3 times with
 *     a linearly increasing delay (2s, 4s, 6s). The attempt counter resets
 *     every time a connection opens successfully.
 *   - `disconnect()` stops monitoring manually.
 *
 * Override the `on*` hooks (by subclassing or by assigning on the instance)
 * to react to events.
 */
class WorkflowMonitor {
  /**
   * @param {string} flowName - Flow name used when the run was started.
   * @param {string} runName - Run name used when the run was started.
   */
  constructor(flowName, runName) {
    // Escape both path segments so names containing "/", "?", "#" or spaces
    // cannot change the shape of the URL.
    const flowSegment = encodeURIComponent(flowName);
    const runSegment = encodeURIComponent(runName);
    this.url = `wss://lao-socket.studio.lyzr.ai/ws/${flowSegment}/${runSegment}`;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.pingInterval = null;
    this.reconnectTimer = null;
    // True once monitoring has ended on purpose (terminal event or
    // disconnect()); suppresses the reconnect that onclose would otherwise do.
    this.isFinished = false;
  }

  /** Opens the WebSocket and wires up all event handlers. */
  connect() {
    this.isFinished = false;
    const socket = new WebSocket(this.url);
    this.ws = socket;

    socket.onopen = () => {
      console.log('✅ Connected to workflow');
      this.reconnectAttempts = 0;

      // Keep connection alive. Skip the ping if the socket is no longer
      // open: send() on a non-open socket throws or silently drops the frame.
      this.stopPing();
      this.pingInterval = setInterval(() => {
        if (socket.readyState === WebSocket.OPEN) {
          socket.send(JSON.stringify({ type: 'ping' }));
        }
      }, 30000);
    };

    socket.onmessage = (event) => {
      let data;
      try {
        data = JSON.parse(event.data);
      } catch (err) {
        // Non-JSON frames (e.g. a plain-text keep-alive reply) must not crash
        // the handler; report and move on.
        console.warn('Ignoring non-JSON WebSocket message:', err);
        return;
      }
      if (data === null || typeof data !== 'object') {
        return;
      }

      switch (data.event_type) {
        case 'flow_started':
          this.onWorkflowStart(data);
          break;
        case 'task_started':
          this.onTaskStart(data);
          break;
        case 'task_completed':
          this.onTaskComplete(data);
          break;
        case 'flow_completed':
          // Terminal: always shut down, even if the hook throws.
          try {
            this.onWorkflowComplete(data);
          } finally {
            this.disconnect();
          }
          break;
        case 'task_failed':
          // A single failed task does not necessarily end the flow, so the
          // connection stays open.
          this.onError(data);
          break;
        case 'flow_error':
          // Terminal: the workflow itself failed, nothing more will arrive.
          try {
            this.onError(data);
          } finally {
            this.disconnect();
          }
          break;
        default:
          // Unknown event types are ignored.
          break;
      }
    };

    socket.onerror = (event) => {
      // A close event follows a socket error, so reconnection is handled in
      // onclose; this handler only surfaces the failure.
      console.error('WebSocket error:', event);
    };

    socket.onclose = () => {
      this.stopPing();
      this.handleReconnect();
    };
  }

  /**
   * Stops monitoring: cancels the keep-alive ping and any pending reconnect,
   * then closes the socket. Safe to call more than once.
   */
  disconnect() {
    this.isFinished = true;
    this.stopPing();
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.ws) {
      this.ws.close();
    }
  }

  /** Cancels the keep-alive ping timer, if one is running. */
  stopPing() {
    if (this.pingInterval !== null) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  /**
   * Schedules a reconnect after an unexpected close. Does nothing once
   * monitoring has finished or after 3 consecutive failed attempts.
   */
  handleReconnect() {
    if (this.isFinished) {
      return;
    }
    if (this.reconnectAttempts < 3) {
      this.reconnectAttempts++;
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.connect();
      }, 2000 * this.reconnectAttempts);
    }
  }

  // Implement these based on your needs
  onWorkflowStart(data) { /* Update UI */ }
  onTaskStart(data) { /* Show progress */ }
  onTaskComplete(data) { /* Update progress */ }
  onWorkflowComplete(data) { /* Process results */ }
  onError(data) { /* Handle errors */ }
}

// Usage: create a monitor for the 'MyWorkflow' flow and 'run_001' run (use the
// same flow_name/run_name the workflow was started with), then open the
// WebSocket connection.
const monitor = new WorkflowMonitor('MyWorkflow', 'run_001');
monitor.connect();