The Lens Events API enables real-time control and data streaming for active lens sessions via WebSocket connections. This allows you to interact with Newton models, configure input streams, and receive real-time inference results.

Connection Setup

1. Create a Lens Session

First, create a lens session using the REST API:
curl https://api.u1.archetypeai.app/v0.5/lens/sessions/create \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"lens_id": "lns-fd669361822b07e2-bc608aa3fdf8b4f9"}'
The response includes a session_endpoint for WebSocket connection (see Create Lens Session for the full response shape):
{
  "session_id": "lsn-250313c2177253cbaa7a73542edd90",
  "lens_id": "lns-fd669361822b07e2-bc608aa3fdf8b4f9",
  "session_endpoint": "wss://api.u1.archetypeai.app/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90",
  "session_status": "LensSessionStatus.SESSION_STATUS_REGISTERED"
}
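The same request can be issued from Python. The sketch below uses only the standard library and mirrors the curl call above; the helper names build_create_session_request and create_session are illustrative and not part of any official SDK.

```python
import json
import urllib.request

# Endpoint copied from the curl example above.
CREATE_SESSION_URL = "https://api.u1.archetypeai.app/v0.5/lens/sessions/create"


def build_create_session_request(api_key, lens_id):
    """Build the POST request that mirrors the curl call (hypothetical helper)."""
    body = json.dumps({"lens_id": lens_id}).encode("utf-8")
    return urllib.request.Request(
        CREATE_SESSION_URL,
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def create_session(api_key, lens_id, timeout=30):
    """Send the request and return the parsed JSON response as a dict."""
    request = build_create_session_request(api_key, lens_id)
    with urllib.request.urlopen(request, timeout=timeout) as reply:
        return json.loads(reply.read().decode("utf-8"))
```

Calling create_session(api_key, lens_id)["session_endpoint"] would then give the WebSocket URL used in the next step.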

2. Connect to WebSocket

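import json
import os

import websocket  # provided by the websocket-client package

# Assumption: the key is read from an environment variable; the name is illustrative.
API_KEY = os.environ["ARCHETYPE_API_KEY"]

session_endpoint = "wss://api.u1.archetypeai.app/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90"
socket = websocket.create_connection(
    session_endpoint,
    header={"Authorization": f"Bearer {API_KEY}"},
)

def send_and_receive_event(socket, event_data):
    """Send one JSON event as a binary frame and return the parsed reply."""
    socket.send_binary(json.dumps(event_data).encode())
    return json.loads(socket.recv())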
In JavaScript, where the browser WebSocket API cannot set custom headers such as Authorization, use the Sec-WebSocket-Protocol header to pass authentication and protocol version.

Event Message Format

All event messages are JSON objects with a required type field and an optional event_data field:
{
  "type": "event.type",
  "event_data": {
    // Event-specific data
  }
}
Response envelope: with the single exception of session.status (see below), every server response carries a type field suffixed with .response and wraps the result in event_data:
{
  "type": "event.type.response",
  "event_data": {
    "is_valid": true,
    "error_messages": []
  }
}
When event_data would be empty, it is returned as null.
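The envelope rules above can be captured in two small helpers. This is a minimal sketch: make_event and unwrap_response are hypothetical names, and the type check simply follows the ".response" suffix convention described above.

```python
def make_event(event_type, event_data=None):
    """Build a request message; event_data is omitted when not provided."""
    message = {"type": event_type}
    if event_data is not None:
        message["event_data"] = event_data
    return message


def unwrap_response(request_type, response):
    """Return the event_data payload of an enveloped response as a dict.

    session.status is the documented exception: it returns the raw session
    record, so it is passed through unchanged.
    """
    if request_type == "session.status":
        return response
    expected = f"{request_type}.response"
    if response.get("type") != expected:
        raise ValueError(f"expected {expected!r}, got {response.get('type')!r}")
    # event_data may be null (None) when there is nothing to report.
    return response.get("event_data") or {}
```

Normalizing a null event_data to an empty dict lets callers chain .get() calls without checking for None each time.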

Session Events

session.status

Query the current status of the active session.
event_message = {"type": "session.status"}
response = send_and_receive_event(socket, event_message)
print(f"Session status: {response}")
Unlike other events, session.status returns the full session metadata record (the same shape as Create Lens Session), not the {type, event_data} envelope.
Warning: the response currently includes an api_key field that echoes back the caller’s real API key. Treat this field as a secret: do not log, persist, or expose it in client-side code. A server-side scrub is in progress.
Response Example:
{
  "session_id": "lsn-250313c2177253cbaa7a73542edd90",
  "lens_id": "lns-fd669361822b07e2-bc608aa3fdf8b4f9",
  "lens_name": "Activity Monitor",
  "session_status": "LensSessionStatus.SESSION_STATUS_RUNNING",
  "session_endpoint": "wss://api.u1.archetypeai.app/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90",
  "session_ttl_sec": 86400,
  "registration_timestamp": 1779430869.7525718,
  "last_event_timestamp": 1779430869.7525718,
  "last_update_timestamp": 1779430869.9271567,
  "session_start_timestamp": 1779430869.8115742,
  "session_destroyed_timestamp": null,
  "num_active_connections": 0,
  "num_updates": 1,
  "num_inputs": 0,
  "num_outputs": 0,
  "num_events": 0,
  "input_data_stream_id": null,
  "output_data_stream_id": null,
  "lens_config": { /* full lens configuration */ }
}
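Because the record can carry the api_key field, it is safer to redact it before printing or logging. A minimal sketch, assuming the key appears only as a top-level field (nested fields such as lens_config are not inspected); redact_session_status is a hypothetical helper:

```python
def redact_session_status(record, secret_fields=("api_key",)):
    """Return a shallow copy of the session record that is safe to log."""
    return {
        key: ("***REDACTED***" if key in secret_fields else value)
        for key, value in record.items()
    }
```

For example, print(redact_session_status(response)) can replace a direct print of the session.status response.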

session.validate

Run validation checks on the current session, including health checks and model connectivity.
event_message = {"type": "session.validate"}
response = send_and_receive_event(socket, event_message)

# event_data can be null, so fall back to an empty dict before reading fields.
event_data = response.get("event_data") or {}
if event_data.get("is_valid"):
    print("Session is valid and ready")
else:
    print(f"Validation failed: {event_data.get('error_messages')}")
Response Example:
{
  "type": "session.validate.response",
  "event_data": {
    "is_valid": true,
    "error_messages": []
  }
}
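If validation can fail transiently while a session is starting (an assumption; the reference does not say so), a small retry loop around session.validate may help. The sketch below takes any callable that sends an event and returns the parsed response, so it stays independent of the socket object; wait_until_valid is a hypothetical name.

```python
import time


def wait_until_valid(send_event, attempts=5, delay_sec=1.0, sleep=time.sleep):
    """Send session.validate until is_valid is true or attempts run out.

    Returns (True, []) on success, or (False, last_error_messages).
    """
    errors = []
    for attempt in range(attempts):
        response = send_event({"type": "session.validate"})
        event_data = response.get("event_data") or {}
        if event_data.get("is_valid"):
            return True, []
        errors = event_data.get("error_messages") or []
        if attempt < attempts - 1:
            sleep(delay_sec)  # wait before the next check
    return False, errors
```

With the helper from the connection step, this could be called as wait_until_valid(functools.partial(send_and_receive_event, socket)).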

session.read

Read pending event messages from the platform, such as log messages or asynchronous inference results.
session.read only returns messages for lenses whose model_pipeline writes to the WebSocket mailbox. Lenses that use a server_sent_events_writer output processor (Activity Monitor and most cookbook lenses) route inference results to the SSE consumer endpoint instead — for those, session.read returns event_data: null even when GET /lens/sessions/metadata shows non-zero num_outputs. Inspect the lens’s model_pipeline via GET /lens/metadata to confirm which output channel it uses.
Each client should generate a unique client_id to enable parallel reading. Use a new client_id to reset message stream playback.
import uuid

client_id = str(uuid.uuid4())[:8]  # Generate unique 8-character ID

event_message = {
    "type": "session.read",
    "event_data": {"client_id": client_id}
}
response = send_and_receive_event(socket, event_message)

messages = (response.get("event_data") or {}).get("messages", [])
for message in messages:
    print(f"[{message['timestamp']}] {message['type']}: {message['data']}")
Response Example (no pending messages):
{
  "type": "session.read.response",
  "event_data": null
}
Response Example (with pending messages):
{
  "type": "session.read.response",
  "event_data": {
    "messages": [
      {
        "timestamp": 1741843200.123,
        "type": "inference.result",
        "data": {
          "result": "A construction worker wearing a hard hat and safety vest"
        }
      },
      {
        "timestamp": 1741843201.456,
        "type": "log.info",
        "data": "Frame processed successfully"
      }
    ],
    "client_id": "a1b2c3d4"
  }
}
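To collect everything that is pending, session.read can be polled with the same client_id until a poll comes back empty. This sketch assumes that each read returns only messages not yet delivered to that client_id, which is how the playback note above reads but is not stated explicitly; drain_messages is a hypothetical helper.

```python
def drain_messages(send_event, client_id, max_polls=10):
    """Poll session.read with one client_id until a poll returns no messages."""
    collected = []
    for _ in range(max_polls):
        response = send_event(
            {"type": "session.read", "event_data": {"client_id": client_id}}
        )
        # event_data is null when nothing is pending.
        messages = (response.get("event_data") or {}).get("messages") or []
        if not messages:
            break
        collected.extend(messages)
    return collected
```

The max_polls cap keeps the loop bounded if the platform keeps producing messages faster than they are read.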

session.destroy

Destroy the active session directly via the control stream.
event_message = {"type": "session.destroy"}
response = send_and_receive_event(socket, event_message)
print(f"Session destroyed: {response}")

# Close WebSocket connection
socket.close()
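When a session should not outlive the script that created it, the destroy-then-close sequence can be wrapped in a context manager so it also runs on errors. A minimal sketch; lens_session is a hypothetical name and send_event is expected to have the signature of send_and_receive_event.

```python
from contextlib import contextmanager


@contextmanager
def lens_session(socket, send_event):
    """Yield the socket, then destroy the session and close the socket."""
    try:
        yield socket
    finally:
        try:
            send_event(socket, {"type": "session.destroy"})
        finally:
            # Close even if the destroy request itself failed.
            socket.close()
```

Usage would look like: with lens_session(socket, send_and_receive_event) as s: followed by the events to send.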

Input Stream Configuration

input_stream.set

Configure input data streams to automatically feed data through the lens.

RTSP Video Stream

Connect a real-time video camera using RTSP protocol:
event_message = {
    "type": "input_stream.set",
    "event_data": {
        "stream_type": "rtsp_video_streamer",
        "stream_config": {
            "rtsp_url": "rtsp://camera.example.com:554/stream",
            "target_image_size": [360, 640],
            "target_frame_rate_hz": 0.2
        }
    }
}
response = send_and_receive_event(socket, event_message)
print(f"RTSP stream configured: {response}")

CSV File Reader

Connect a CSV file for data streaming:
event_message = {
    "type": "input_stream.set",
    "event_data": {
        "stream_type": "csv_file_reader",
        "stream_config": {
            "file_id": "jena_climate_2009_2016.csv"
        }
    }
}
response = send_and_receive_event(socket, event_message)
print(f"CSV stream configured: {response}")

Available Input Stream Types

RTSP Video Streamer (stream_type: rtsp_video_streamer)
Description: Real-time video streaming from RTSP cameras
Configuration Parameters:
  • rtsp_url (string): RTSP stream URL
  • target_image_size (array): [height, width] in pixels
  • target_frame_rate_hz (float): Frames per second to process
Use Cases: Security cameras, live monitoring, real-time analysis

CSV File Reader (stream_type: csv_file_reader)
Description: Stream data from uploaded CSV files
Configuration Parameters:
  • file_id (string): ID of uploaded CSV file
Use Cases: Time series analysis, batch data processing, historical data analysis

Video File Reader
Description: Stream a pre-recorded video file (e.g. .mp4) that was previously uploaded via the Files API
Configuration Parameters:
  • file_id (string): file_id returned by the upload; typically the original filename (e.g. "my_video.mp4"), not the file_uid UUID
Use Cases: Offline analysis of recorded footage, replaying dashcam / camera archives through a vision lens
See Video File Reader for the full data-connector reference.

Sensor Streams
Description: Direct sensor data streaming
Configuration Parameters:
  • Configuration varies by sensor type
Use Cases: IoT sensors, accelerometers, temperature sensors
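Building the input_stream.set payload through a small function keeps the [height, width] order and the frame-rate unit in one place. The sketch below covers only the RTSP case; the function names and the client-side check are illustrative assumptions, not server-side rules.

```python
def make_rtsp_stream_event(rtsp_url, height=360, width=640, frame_rate_hz=0.2):
    """Build an input_stream.set event for an RTSP camera."""
    if frame_rate_hz <= 0:
        raise ValueError("frame_rate_hz must be positive")
    return {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "rtsp_video_streamer",
            "stream_config": {
                "rtsp_url": rtsp_url,
                "target_image_size": [height, width],  # order is [height, width]
                "target_frame_rate_hz": frame_rate_hz,
            },
        },
    }


def seconds_between_frames(frame_rate_hz):
    """Convert a frame rate in Hz to the interval between processed frames."""
    return 1.0 / frame_rate_hz
```

At the 0.2 Hz used in the RTSP example, seconds_between_frames gives 5 seconds, so one frame is processed every five seconds.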

Direct Model Queries

model.query

Send direct queries to Newton models with custom data payloads.
model.query is only handled by lenses whose pipeline includes a model-query processor. Lenses that wrap a streaming processor (e.g. lens_camera_processor, lens_timeseries_state_processor) expect input via input_stream.set and will time out on model.query. Responses arrive asynchronously — long-running queries return a timeout response with message: "Response timed out for query" if the model has not replied within the WebSocket read window.

Image QA Example

import base64

# Load and encode image
with open("construction_site.jpg", "rb") as img_handle:
    base64_img = base64.b64encode(img_handle.read()).decode("utf-8")

event_message = {
    "type": "model.query",
    "event_data": {
        "model_version": "Newton::c2_4_7b_251215a172f6d7",
        "template_name": "image_qa_template_task",
        "instruction": "Answer the following question about the image:",
        "focus": "Describe the safety equipment visible in the image.",
        "max_new_tokens": 512,
        "data": [{
            "type": "base64_img",
            "base64_img": base64_img
        }],
        "sensor_metadata": {}
    }
}

response = send_and_receive_event(socket, event_message)
print(f"Newton response: {(response.get('event_data') or {}).get('result')}")

Object Detection Example

event_message = {
    "type": "model.query",
    "event_data": {
        "model_version": "Newton::c2_4_7b_251215a172f6d7",
        "template_name": "image_bbox_template_task",
        "instruction": "Localize all objects in the picture and build bounding box around them.",
        "focus": "Input: [person,car,hard_hat,safety_vest]",
        "max_new_tokens": 512,
        "data": [{
            "type": "base64_img",
            "base64_img": base64_img
        }],
        "sensor_metadata": {}
    }
}

response = send_and_receive_event(socket, event_message)
print(f"Detected objects: {(response.get('event_data') or {}).get('result')}")
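The focus string in the object detection example can be generated from a list of class labels. The format here is inferred from that single example ("Input: [a,b,c]" with no spaces) and is not a documented grammar; build_bbox_focus is a hypothetical helper.

```python
def build_bbox_focus(labels):
    """Format class labels the way the object detection example passes them."""
    cleaned = [label.strip() for label in labels if label.strip()]
    if not cleaned:
        raise ValueError("at least one label is required")
    return "Input: [" + ",".join(cleaned) + "]"
```

Calling build_bbox_focus(["person", "car", "hard_hat", "safety_vest"]) reproduces the focus value used in the example.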

Newton Model Templates

Available Templates

Template Name: image_qa_template_task
Purpose: Generate narrative descriptions of images
Parameters:
  • instruction: Question or instruction for the model
  • focus: Specific aspect to focus on (e.g., “safety equipment”, “project status”)
  • max_new_tokens: Maximum response length (typically 512)
Output: Natural language description, wrapped in the WebSocket response envelope.
Example Response:
{
  "type": "model.query.response",
  "event_data": {
    "result": "The image shows construction workers wearing yellow hard hats and high-visibility safety vests."
  }
}
Timeout Response (model did not reply within the read window):
{
  "type": "model.query.response",
  "message": "Response timed out for query"
}
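Since a timeout reply has a message field and no event_data, client code can separate the two cases before reading the result. A minimal sketch based only on the two response shapes shown above; ModelQueryTimeout and extract_query_result are hypothetical names.

```python
class ModelQueryTimeout(Exception):
    """Raised when the server reports that the model did not reply in time."""


# Text copied from the timeout response example.
TIMEOUT_MESSAGE = "Response timed out for query"


def extract_query_result(response):
    """Return the result string, or raise ModelQueryTimeout on a timeout reply."""
    if response.get("message") == TIMEOUT_MESSAGE:
        raise ModelQueryTimeout(response["message"])
    event_data = response.get("event_data") or {}
    if "result" not in event_data:
        raise ValueError(f"response has no result: {response!r}")
    return event_data["result"]
```

Raising a dedicated exception lets the caller decide whether to retry the query or fall back to session.read for a late result.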

Error Handling

Handle WebSocket errors and connection issues:
import json
import time

import websocket  # provided by the websocket-client package

def robust_websocket_connection(session_endpoint, api_key, max_retries=3):
    """Open the WebSocket, retrying failed attempts with exponential backoff."""
    for attempt in range(1, max_retries + 1):
        try:
            return websocket.create_connection(
                session_endpoint,
                header={"Authorization": f"Bearer {api_key}"},
                timeout=30,
            )
        except Exception as e:
            print(f"Connection attempt {attempt} failed: {e}")
            if attempt == max_retries:
                raise  # Re-raise the last error with its original traceback
            time.sleep(2 ** attempt)  # Exponential backoff: 2 s, then 4 s

def safe_send_and_receive(socket, event_data, timeout=30):
    """Send one event and return the parsed reply, raising descriptive errors."""
    try:
        socket.settimeout(timeout)
        socket.send_binary(json.dumps(event_data).encode())
        return json.loads(socket.recv())
    except websocket.WebSocketTimeoutException as e:
        raise TimeoutError("WebSocket timeout - no response received") from e
    except websocket.WebSocketConnectionClosedException as e:
        raise ConnectionError("WebSocket connection closed unexpectedly") from e
    except json.JSONDecodeError as e:
        raise ValueError("Received a response that is not valid JSON") from e
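To see what the retry loop above costs in waiting time, the delay schedule can be computed on its own. This sketch reproduces the 2 ** attempt rule from the connection helper; backoff_delays is a hypothetical name.

```python
def backoff_delays(max_retries=3, base=2):
    """Delays slept between attempts: base**1, base**2, ... (none after the last)."""
    return [base ** attempt for attempt in range(1, max_retries)]
```

With three attempts the helper sleeps 2 seconds and then 4 seconds, so a connection that never succeeds fails after about 6 seconds of waiting plus the time spent in each attempt.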

Best Practices

  • Always authenticate WebSocket connections with your API key
  • Implement connection retry logic with exponential backoff
  • Set appropriate timeouts for requests (30 seconds recommended)
  • Close connections properly to free resources
  • Use unique client_id for each client to enable parallel processing
  • Handle asynchronous events properly with session.read
  • Implement error handling for malformed responses
  • Log events for debugging and monitoring
  • Use appropriate image sizes for video streams (target_image_size [360, 640], i.e. height 360 by width 640, recommended)
  • Set reasonable frame rates (0.2 Hz, one frame every 5 seconds, for most use cases)
  • Monitor processing times and adjust accordingly
  • Batch similar queries when possible
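For the point about malformed responses, one option is to validate every frame before using it. A minimal sketch, assuming responses are always JSON objects as in the examples on this page; parse_response is a hypothetical helper.

```python
import json


def parse_response(raw):
    """Decode one WebSocket frame into a dict, or raise ValueError."""
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode("utf-8", errors="replace")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"response is not valid JSON: {raw[:80]!r}") from exc
    if not isinstance(parsed, dict):
        raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
    return parsed
```

Calling parse_response(socket.recv()) in place of a bare json.loads gives a single error type to catch and log.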

Next Steps

  • Examples Repository: Explore complete working examples
  • Session Management: Monitor and manage active sessions
  • File Upload API: Upload files for CSV streaming
  • Troubleshooting: Solve common WebSocket issues