The Lens Events API enables real-time control and data streaming for active lens sessions via WebSocket connections. This allows you to interact with Newton models, configure input streams, and receive real-time inference results.

Connection Setup

1. Create a Lens Session

First, create a lens session using the REST API:
curl https://api.u1.archetypeai.app/v0.5/lens/sessions/create \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b9"}'
The response includes a session_endpoint, the URL to use for the WebSocket connection:
{
  "session_id": "lsn-250313c2177253cbaa7a73542edd90",
  "lens_id": "lns-fd669361822b07e2-237ab3ffd79199b9",
  "session_endpoint": "wss://api.archetypeai.dev/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90",
  "status": "SESSION_STATUS_CREATED"
}
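The same request can be prepared from Python. The following is a minimal sketch using only the standard library; it builds the request without sending it. The helper names build_create_session_request and extract_session_endpoint are hypothetical, and the URL and body are copied from the curl example above.

```python
import json
import urllib.request

# URL copied from the curl example above.
CREATE_SESSION_URL = "https://api.u1.archetypeai.app/v0.5/lens/sessions/create"


def build_create_session_request(api_key, lens_id, url=CREATE_SESSION_URL):
    """Build (but do not send) the POST request that creates a lens session."""
    body = json.dumps({"lens_id": lens_id}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def extract_session_endpoint(response_text):
    """Pull the WebSocket endpoint out of the JSON response body."""
    return json.loads(response_text)["session_endpoint"]
```

To send the request, pass it to urllib.request.urlopen and hand the decoded response body to extract_session_endpoint.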

2. Connect to WebSocket

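import json

import websocket  # Provided by the websocket-client package

API_KEY = "YOUR_API_KEY"  # Same key used for the REST request above
session_endpoint = "wss://api.archetypeai.dev/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90"

# Open the WebSocket, authenticating with the API key
socket = websocket.create_connection(
    session_endpoint,
    header={"Authorization": f"Bearer {API_KEY}"}
)

def send_and_receive_event(socket, event_data):
    """Send one JSON event and block until the next message arrives."""
    socket.send_binary(json.dumps(event_data).encode())
    response = json.loads(socket.recv())
    return response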
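For JavaScript in the browser, where the WebSocket constructor cannot set custom headers, use the Sec-WebSocket-Protocol header (the constructor's protocols argument) to pass authentication and the protocol version.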

Event Message Format

All event messages are JSON objects with a required type field and an optional event_data field:
{
  "type": "event.type",
  "event_data": {
    // Event-specific data
  }
}
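A small helper can enforce this shape before anything is sent. This is a sketch only; make_event and encode_event are hypothetical names and not part of the API.

```python
import json


def make_event(event_type, event_data=None):
    """Return an event dict with a required type and optional event_data."""
    if not isinstance(event_type, str) or not event_type:
        raise ValueError("event type must be a non-empty string")
    message = {"type": event_type}
    # event_data is optional, so only include it when the caller supplies it.
    if event_data is not None:
        message["event_data"] = event_data
    return message


def encode_event(message):
    """Serialize an event message to UTF-8 JSON bytes for the socket."""
    return json.dumps(message).encode("utf-8")
```

For example, make_event("session.status") yields the same message used in the session.status example in the next section.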

Session Events

session.status

Query the current status of the active session.
event_message = {"type": "session.status"}
response = send_and_receive_event(socket, event_message)
print(f"Session status: {response}")
Response Example:
{
  "status": "SESSION_STATUS_RUNNING",
  "session_id": "lsn-250313c2177253cbaa7a73542edd90",
  "uptime_seconds": 145.2,
  "last_activity": 1741843135.1848276
}
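A status query is often repeated until the session is ready. The sketch below polls session.status until it sees SESSION_STATUS_RUNNING. It assumes a session moves from SESSION_STATUS_CREATED to SESSION_STATUS_RUNNING, the only two values shown in this document; wait_until_running and its parameters are hypothetical.

```python
import time


def wait_until_running(send_event, attempts=10, delay_s=1.0, sleep=time.sleep):
    """Poll session.status until the session reports SESSION_STATUS_RUNNING.

    send_event is any callable that takes an event dict and returns the
    decoded response, e.g. lambda e: send_and_receive_event(socket, e).
    """
    for attempt in range(attempts):
        response = send_event({"type": "session.status"})
        if response.get("status") == "SESSION_STATUS_RUNNING":
            return response
        # Wait between checks, but not after the final one.
        if attempt < attempts - 1:
            sleep(delay_s)
    raise TimeoutError(f"session not running after {attempts} status checks")
```

Passing the send function as an argument keeps the helper independent of the socket, which also makes it easy to exercise with canned responses.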

session.validate

Run validation checks on the current session, including health checks and model connectivity.
event_message = {"type": "session.validate"}
response = send_and_receive_event(socket, event_message)

if response.get("valid"):
    print("Session is valid and ready")
else:
    print(f"Validation failed: {response.get('errors')}")
Response Example:
{
  "valid": true,
  "checks": {
    "session_health": "pass",
    "model_connectivity": "pass",
    "sensor_nodes": "pass"
  },
  "errors": []
}
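When validation fails, the checks object shows which part went wrong. The helper below is a sketch that lists every check whose value is not "pass". The source only shows the "pass" value, so treating anything else as a failure is an assumption, and failed_checks is a hypothetical name.

```python
def failed_checks(validation_response):
    """Return the sorted names of checks whose outcome is not "pass"."""
    checks = validation_response.get("checks", {})
    # Assumption: any value other than "pass" counts as a failed check.
    return sorted(name for name, outcome in checks.items() if outcome != "pass")
```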

session.read

Read pending event messages from the platform, such as log messages or asynchronous inference results.
Each client should generate a unique client_id to enable parallel reading. Use a new client_id to reset message stream playback.
import uuid

client_id = str(uuid.uuid4())[:8]  # Generate unique 8-character ID

event_message = {
    "type": "session.read",
    "event_data": {"client_id": client_id}
}
response = send_and_receive_event(socket, event_message)

for message in response.get("messages", []):
    print(f"[{message['timestamp']}] {message['type']}: {message['data']}")
Response Example:
{
  "messages": [
    {
      "timestamp": 1741843200.123,
      "type": "inference.result",
      "data": {
        "result": "A construction worker wearing a hard hat and safety vest",
        "confidence": 0.95,
        "processing_time_ms": 245
      }
    },
    {
      "timestamp": 1741843201.456,
      "type": "log.info",
      "data": "Frame processed successfully"
    }
  ],
  "client_id": "a1b2c3d4"
}
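Because a single read can mix inference results with log lines, it may help to group messages by type before processing them. A minimal sketch, assuming the response shape shown above; group_messages_by_type is a hypothetical helper.

```python
from collections import defaultdict


def group_messages_by_type(read_response):
    """Group the messages of a session.read response by their type field."""
    grouped = defaultdict(list)
    for message in read_response.get("messages", []):
        # Messages without a type are collected under "unknown".
        grouped[message.get("type", "unknown")].append(message)
    return dict(grouped)
```

With the example response above, group_messages_by_type(response)["inference.result"] would hold the single inference message and "log.info" the log line.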

session.destroy

Destroy the active session directly via the control stream.
event_message = {"type": "session.destroy"}
response = send_and_receive_event(socket, event_message)
print(f"Session destroyed: {response}")

# Close WebSocket connection
socket.close()
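To make sure the session is destroyed and the socket closed even when an error occurs mid-session, the teardown can be wrapped in a context manager. This is a sketch; lens_session is a hypothetical helper, and it assumes the server sends one reply to session.destroy, as in the example above.

```python
import json
from contextlib import contextmanager


@contextmanager
def lens_session(socket):
    """Yield the socket, then send session.destroy and close it on exit."""
    try:
        yield socket
    finally:
        try:
            socket.send_binary(json.dumps({"type": "session.destroy"}).encode())
            socket.recv()  # Consume the reply to session.destroy
        finally:
            # Close the connection even if the destroy request itself failed.
            socket.close()
```

Usage would look like `with lens_session(socket) as s: ...`, with all event calls placed inside the block.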

Input Stream Configuration

input_stream.set

Configure input data streams to automatically feed data through the lens.

RTSP Video Stream

Connect a real-time video camera over RTSP:
event_message = {
    "type": "input_stream.set",
    "event_data": {
        "stream_type": "rtsp_video_streamer",
        "stream_config": {
            "rtsp_url": "rtsp://camera.example.com:554/stream",
            "target_image_size": [360, 640],
            "target_frame_rate_hz": 0.2
        }
    }
}
response = send_and_receive_event(socket, event_message)
print(f"RTSP stream configured: {response}")

CSV File Reader

Connect a CSV file for data streaming:
event_message = {
    "type": "input_stream.set",
    "event_data": {
        "stream_type": "csv_file_reader",
        "stream_config": {
            "file_id": "jena_climate_2009_2016.csv"
        }
    }
}
response = send_and_receive_event(socket, event_message)
print(f"CSV stream configured: {response}")

Available Input Stream Types

Description: Real-time video streaming from RTSP cameras
Configuration Parameters:
  • rtsp_url (string): RTSP stream URL
  • target_image_size (array): [height, width] in pixels
  • target_frame_rate_hz (float): Frames per second to process
Use Cases: Security cameras, live monitoring, real-time analysis

Description: Stream data from uploaded CSV files
Configuration Parameters:
  • file_id (string): ID of uploaded CSV file
Use Cases: Time series analysis, batch data processing, historical data analysis

Description: Direct sensor data streaming
Configuration Parameters:
  • Configuration varies by sensor type
Use Cases: IoT sensors, accelerometers, temperature sensors
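The two documented configurations (rtsp_video_streamer and csv_file_reader, as used in the examples above) can be built with small functions that check their arguments first. This is a sketch: the function names are hypothetical, and the validation rules are illustrative client-side checks, not documented API limits. The defaults mirror the values in the RTSP example.

```python
def build_rtsp_stream_event(rtsp_url, height=360, width=640, frame_rate_hz=0.2):
    """Build an input_stream.set event for the rtsp_video_streamer type."""
    # Client-side sanity checks (assumptions, not documented API limits).
    if not rtsp_url.startswith("rtsp://"):
        raise ValueError("rtsp_url must start with rtsp://")
    if height <= 0 or width <= 0:
        raise ValueError("image dimensions must be positive")
    if frame_rate_hz <= 0:
        raise ValueError("frame rate must be positive")
    return {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "rtsp_video_streamer",
            "stream_config": {
                "rtsp_url": rtsp_url,
                "target_image_size": [height, width],  # [height, width] order
                "target_frame_rate_hz": frame_rate_hz,
            },
        },
    }


def build_csv_stream_event(file_id):
    """Build an input_stream.set event for the csv_file_reader type."""
    if not file_id:
        raise ValueError("file_id must not be empty")
    return {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "csv_file_reader",
            "stream_config": {"file_id": file_id},
        },
    }
```

Note that target_frame_rate_hz is in frames per second, so the default of 0.2 corresponds to one frame every five seconds.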

Direct Model Queries

model.query

Send direct queries to Newton models with custom data payloads.

Image QA Example

import base64

# Load and encode image
with open("construction_site.jpg", "rb") as img_handle:
    base64_img = base64.b64encode(img_handle.read()).decode("utf-8")

event_message = {
    "type": "model.query",
    "event_data": {
        "model_version": "Newton::c1_6_241117d4362cc9",
        "template_name": "image_qa_template_task",
        "instruction": "Answer the following question about the image:",
        "focus": "Describe the safety equipment visible in the image.",
        "max_new_tokens": 512,
        "data": [{
            "type": "base64_img",
            "base64_img": base64_img
        }],
        "sensor_metadata": {}
    }
}

response = send_and_receive_event(socket, event_message)
print(f"Newton response: {response['result']}")

Object Detection Example

event_message = {
    "type": "model.query",
    "event_data": {
        "model_version": "Newton::c1_6_241117d4362cc9",
        "template_name": "image_bbox_template_task",
        "instruction": "Localize all objects in the picture and build bounding box around them.",
        "focus": "Input: [person,car,hard_hat,safety_vest]",
        "max_new_tokens": 512,
        "data": [{
            "type": "base64_img",
            "base64_img": base64_img
        }],
        "sensor_metadata": {}
    }
}

response = send_and_receive_event(socket, event_message)
print(f"Detected objects: {response['result']}")
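Both examples share the same envelope and differ only in template_name, instruction, and focus, so the message can be built by one function. A minimal sketch: build_image_query is a hypothetical helper, and its defaults are copied from the examples above.

```python
import base64


def build_image_query(image_bytes, template_name, instruction, focus,
                      model_version="Newton::c1_6_241117d4362cc9",
                      max_new_tokens=512):
    """Build a model.query event carrying one base64-encoded image."""
    base64_img = base64.b64encode(image_bytes).decode("utf-8")
    return {
        "type": "model.query",
        "event_data": {
            "model_version": model_version,
            "template_name": template_name,
            "instruction": instruction,
            "focus": focus,
            "max_new_tokens": max_new_tokens,
            "data": [{"type": "base64_img", "base64_img": base64_img}],
            "sensor_metadata": {},
        },
    }
```

The image QA example then reduces to reading the file in binary mode and calling build_image_query with "image_qa_template_task"; the object detection example passes "image_bbox_template_task" instead.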

Newton Model Templates

Available Templates

Template Name: image_qa_template_task
Purpose: Generate narrative descriptions of images
Parameters:
  • instruction: Question or instruction for the model
  • focus: Specific aspect to focus on (e.g., “safety equipment”, “project status”)
  • max_new_tokens: Maximum response length (typically 512)
Output: Natural language description
Example Response:
{
  "result": "The image shows construction workers wearing yellow hard hats and high-visibility safety vests. Safety equipment is properly worn according to regulations.",
  "confidence": 0.92,
  "processing_time_ms": 340
}

Error Handling

Handle WebSocket errors and connection issues:
import websocket
import json
import time

def robust_websocket_connection(session_endpoint, api_key):
    max_retries = 3
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            socket = websocket.create_connection(
                session_endpoint,
                header={"Authorization": f"Bearer {api_key}"},
                timeout=30
            )
            return socket
        except Exception as e:
            retry_count += 1
            print(f"Connection attempt {retry_count} failed: {e}")
            if retry_count < max_retries:
                time.sleep(2 ** retry_count)  # Exponential backoff
            else:
                raise e

def safe_send_and_receive(socket, event_data, timeout=30):
    try:
        socket.settimeout(timeout)
        socket.send_binary(json.dumps(event_data).encode())
        response = json.loads(socket.recv())
        return response
    except websocket.WebSocketTimeoutException:
        raise Exception("WebSocket timeout - no response received")
    except websocket.WebSocketConnectionClosedException:
        raise Exception("WebSocket connection closed unexpectedly")

Best Practices

  • Always authenticate WebSocket connections with your API key
  • Implement connection retry logic with exponential backoff
  • Set appropriate timeouts for requests (30 seconds recommended)
  • Close connections properly to free resources
  • Use unique client_id for each client to enable parallel processing
  • Handle asynchronous events properly with session.read
  • Implement error handling for malformed responses
  • Log events for debugging and monitoring
  • Use appropriate image sizes for video streams (360x640 recommended)
  • Set reasonable frame rates (0.2 Hz for most use cases)
  • Monitor processing times and adjust accordingly
  • Batch similar queries when possible
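As one way to handle malformed responses, incoming frames can be decoded defensively before use. The sketch below assumes every valid response is a JSON object, as in the examples throughout this page; MalformedResponseError and parse_response are hypothetical names.

```python
import json


class MalformedResponseError(Exception):
    """Raised when a frame cannot be decoded into a JSON object."""


def parse_response(raw):
    """Decode a received frame (str or bytes) into a dict, or raise."""
    if isinstance(raw, (bytes, bytearray)):
        try:
            raw = raw.decode("utf-8")
        except UnicodeDecodeError as exc:
            raise MalformedResponseError("frame is not valid UTF-8") from exc
    try:
        payload = json.loads(raw)
    except (json.JSONDecodeError, TypeError) as exc:
        raise MalformedResponseError(f"frame is not valid JSON: {exc}") from exc
    # Assumption: responses are always JSON objects, never bare lists or scalars.
    if not isinstance(payload, dict):
        raise MalformedResponseError("expected a JSON object")
    return payload
```

Calling parse_response(socket.recv()) in place of json.loads(socket.recv()) turns decoding problems into a single exception type that callers can log and handle.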

Next Steps

  • Examples Repository: Explore complete working examples
  • Session Management: Monitor and manage active sessions
  • File Upload API: Upload files for CSV streaming
  • Troubleshooting: Solve common WebSocket issues