The Lens Events API enables real-time control and data streaming for active lens sessions via WebSocket connections. This allows you to interact with Newton models, configure input streams, and receive real-time inference results.

Connection Setup

1. Create a Lens Session

First, create a lens session using the REST API:
curl https://api.u1.archetypeai.app/v0.5/lens/sessions/create \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b9"}'
The response includes a session_endpoint for the WebSocket connection:
{
  "session_id": "lsn-250313c2177253cbaa7a73542edd90",
  "lens_id": "lns-fd669361822b07e2-237ab3ffd79199b9",
  "session_endpoint": "wss://api.archetypeai.dev/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90",
  "status": "SESSION_STATUS_CREATED"
}
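If you prefer Python over curl, the same request can be made with the third-party requests package. This is a minimal sketch of the create call shown above; it assumes only the endpoint, headers, and JSON body from the curl example:
import requests

API_KEY = "YOUR_API_KEY"

# Create a lens session via the REST API (equivalent to the curl example above).
response = requests.post(
    "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
    headers={
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    },
    json={"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b9"},
)
session = response.json()
session_endpoint = session["session_endpoint"]  # Used for the WebSocket connection below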

2. Connect to WebSocket

import websocket
import json

session_endpoint = "wss://api.archetypeai.dev/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90"
socket = websocket.create_connection(
    session_endpoint, 
    header={"Authorization": f"Bearer {API_KEY}"}
)

def send_and_receive_event(socket, event_data):
    socket.send_binary(json.dumps(event_data).encode())
    response = json.loads(socket.recv())
    return response
For JavaScript, use the Sec-WebSocket-Protocol header to pass authentication and protocol version.

Event Message Format

All event messages are JSON objects with a required type field and an optional event_data field:
{
  "type": "event.type",
  "event_data": {
    // Event-specific data
  }
}
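If you send many different events, a tiny helper keeps this structure consistent. This is just a local convenience on top of the format above, not part of the API:
def make_event(event_type, event_data=None):
    # Build an event message with the required "type" field and optional "event_data" field.
    event = {"type": event_type}
    if event_data is not None:
        event["event_data"] = event_data
    return event

# Example usage:
#   make_event("session.status")
#   make_event("session.read", {"client_id": client_id})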

Session Events

session.status

Query the current status of the active session.
event_message = {"type": "session.status"}
response = send_and_receive_event(socket, event_message)
print(f"Session status: {response}")
Response Example:
{
  "status": "SESSION_STATUS_RUNNING",
  "session_id": "lsn-250313c2177253cbaa7a73542edd90",
  "uptime_seconds": 145.2,
  "last_activity": 1741843135.1848276
}
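A newly created session may still be starting up, so one common pattern is to poll session.status until it reports SESSION_STATUS_RUNNING. The status values come from the examples in this guide; the timeout, poll interval, and TimeoutError choice below are illustrative:
import time

def wait_until_running(socket, timeout_seconds=60, poll_interval=2.0):
    # Poll session.status until the session reports SESSION_STATUS_RUNNING.
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        response = send_and_receive_event(socket, {"type": "session.status"})
        if response.get("status") == "SESSION_STATUS_RUNNING":
            return response
        time.sleep(poll_interval)
    raise TimeoutError("Session did not reach SESSION_STATUS_RUNNING in time")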

session.validate

Run validation checks on the current session, including health checks and model connectivity.
event_message = {"type": "session.validate"}
response = send_and_receive_event(socket, event_message)

if response.get("valid"):
    print("Session is valid and ready")
else:
    print(f"Validation failed: {response.get('errors')}")
Response Example:
{
  "valid": true,
  "checks": {
    "session_health": "pass",
    "model_connectivity": "pass",
    "sensor_nodes": "pass"
  },
  "errors": []
}

session.read

Read pending event messages from the platform, such as log messages or asynchronous inference results.
Each client should generate a unique client_id to enable parallel reading. Use a new client_id to reset message stream playback.
import uuid

client_id = str(uuid.uuid4())[:8]  # Generate unique 8-character ID

event_message = {
    "type": "session.read",
    "event_data": {"client_id": client_id}
}
response = send_and_receive_event(socket, event_message)

for message in response.get("messages", []):
    print(f"[{message['timestamp']}] {message['type']}: {message['data']}")
Response Example:
{
  "messages": [
    {
      "timestamp": 1741843200.123,
      "type": "inference.result",
      "data": {
        "result": "A construction worker wearing a hard hat and safety vest",
        "confidence": 0.95,
        "processing_time_ms": 245
      }
    },
    {
      "timestamp": 1741843201.456,
      "type": "log.info",
      "data": "Frame processed successfully"
    }
  ],
  "client_id": "a1b2c3d4"
}
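Because inference results can arrive asynchronously, a typical consumer polls session.read in a loop with the same client_id. The sketch below builds on the read example above; the stop condition and sleep interval are illustrative:
import time

def poll_session_messages(socket, client_id, poll_interval=1.0):
    # Continuously read pending messages for this client_id and yield them one by one.
    while True:
        response = send_and_receive_event(socket, {
            "type": "session.read",
            "event_data": {"client_id": client_id},
        })
        for message in response.get("messages", []):
            yield message
        time.sleep(poll_interval)

# Example usage: print inference results as they arrive.
# for message in poll_session_messages(socket, client_id):
#     if message["type"] == "inference.result":
#         print(message["data"]["result"])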

session.destroy

Destroy the active session directly via the control stream.
event_message = {"type": "session.destroy"}
response = send_and_receive_event(socket, event_message)
print(f"Session destroyed: {response}")

# Close WebSocket connection
socket.close()

Input Stream Configuration

input_stream.set

Configure input data streams to automatically feed data through the lens.

RTSP Video Stream

Connect a real-time video camera using the RTSP protocol:
event_message = {
    "type": "input_stream.set",
    "event_data": {
        "stream_type": "rtsp_video_streamer",
        "stream_config": {
            "rtsp_url": "rtsp://camera.example.com:554/stream",
            "target_image_size": [360, 640],
            "target_frame_rate_hz": 0.2
        }
    }
}
response = send_and_receive_event(socket, event_message)
print(f"RTSP stream configured: {response}")

CSV File Reader

Connect a CSV file for data streaming:
event_message = {
    "type": "input_stream.set",
    "event_data": {
        "stream_type": "csv_file_reader",
        "stream_config": {
            "file_id": "jena_climate_2009_2016.csv"
        }
    }
}
response = send_and_receive_event(socket, event_message)
print(f"CSV stream configured: {response}")

Available Input Stream Types

rtsp_video_streamer
Description: Real-time video streaming from RTSP cameras
Configuration Parameters:
  • rtsp_url (string): RTSP stream URL
  • target_image_size (array): [height, width] in pixels
  • target_frame_rate_hz (float): Frames per second to process
Use Cases: Security cameras, live monitoring, real-time analysis

csv_file_reader
Description: Stream data from uploaded CSV files
Configuration Parameters:
  • file_id (string): ID of uploaded CSV file
Use Cases: Time series analysis, batch data processing, historical data analysis

Sensor Streamers
Description: Direct sensor data streaming
Configuration Parameters:
  • Configuration varies by sensor type
Use Cases: IoT sensors, accelerometers, temperature sensors
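
Every input_stream.set message shares the same shape, so a small wrapper can cover all of the stream types above. This is a convenience sketch built on the send_and_receive_event helper defined earlier, not a separate API:
def set_input_stream(socket, stream_type, stream_config):
    # Configure an input stream of the given type with the given configuration.
    event_message = {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": stream_type,
            "stream_config": stream_config,
        },
    }
    return send_and_receive_event(socket, event_message)

# Example usage with the RTSP configuration shown above.
response = set_input_stream(socket, "rtsp_video_streamer", {
    "rtsp_url": "rtsp://camera.example.com:554/stream",
    "target_image_size": [360, 640],
    "target_frame_rate_hz": 0.2,
})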

Direct Model Queries

model.query

Send direct queries to Newton models with custom data payloads.

Image QA Example

import base64

# Load and encode image
with open("construction_site.jpg", "rb") as img_handle:
    base64_img = base64.b64encode(img_handle.read()).decode("utf-8")

event_message = {
    "type": "model.query",
    "event_data": {
        "model_version": "Newton::c1_6_241117d4362cc9",
        "template_name": "image_qa_template_task",
        "instruction": "Answer the following question about the image:",
        "focus": "Describe the safety equipment visible in the image.",
        "max_new_tokens": 512,
        "data": [{
            "type": "base64_img",
            "base64_img": base64_img
        }],
        "sensor_metadata": {}
    }
}

response = send_and_receive_event(socket, event_message)
print(f"Newton response: {response['result']}")

Object Detection Example

event_message = {
    "type": "model.query",
    "event_data": {
        "model_version": "Newton::c1_6_241117d4362cc9",
        "template_name": "image_bbox_template_task",
        "instruction": "Localize all objects in the picture and build bounding box around them.",
        "focus": "Input: [person,car,hard_hat,safety_vest]",
        "max_new_tokens": 512,
        "data": [{
            "type": "base64_img",
            "base64_img": base64_img
        }],
        "sensor_metadata": {}
    }
}

response = send_and_receive_event(socket, event_message)
print(f"Detected objects: {response['result']}")

Newton Model Templates

Available Templates

Template Name: image_qa_template_task
Purpose: Generate narrative descriptions of images
Parameters:
  • instruction: Question or instruction for the model
  • focus: Specific aspect to focus on (e.g., “safety equipment”, “project status”)
  • max_new_tokens: Maximum response length (typically 512)
Output: Natural language description
Example Response:
{
  "result": "The image shows construction workers wearing yellow hard hats and high-visibility safety vests. Safety equipment is properly worn according to regulations.",
  "confidence": 0.92,
  "processing_time_ms": 340
}
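Both model.query examples above share the same envelope, so a small wrapper can switch between templates. The sketch below reuses the model version, template names, and base64_img variable shown earlier in this guide:
def query_newton(socket, template_name, instruction, focus, base64_img,
                 model_version="Newton::c1_6_241117d4362cc9", max_new_tokens=512):
    # Send a model.query event with a single base64-encoded image payload.
    event_message = {
        "type": "model.query",
        "event_data": {
            "model_version": model_version,
            "template_name": template_name,
            "instruction": instruction,
            "focus": focus,
            "max_new_tokens": max_new_tokens,
            "data": [{"type": "base64_img", "base64_img": base64_img}],
            "sensor_metadata": {},
        },
    }
    return send_and_receive_event(socket, event_message)

# Example: the same image QA call as above.
response = query_newton(
    socket,
    template_name="image_qa_template_task",
    instruction="Answer the following question about the image:",
    focus="Describe the safety equipment visible in the image.",
    base64_img=base64_img,
)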

Error Handling

Handle WebSocket errors and connection issues:
import websocket
import json
import time

def robust_websocket_connection(session_endpoint, api_key):
    max_retries = 3
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            socket = websocket.create_connection(
                session_endpoint,
                header={"Authorization": f"Bearer {api_key}"},
                timeout=30
            )
            return socket
        except Exception as e:
            retry_count += 1
            print(f"Connection attempt {retry_count} failed: {e}")
            if retry_count < max_retries:
                time.sleep(2 ** retry_count)  # Exponential backoff
            else:
                raise e

def safe_send_and_receive(socket, event_data, timeout=30):
    try:
        socket.settimeout(timeout)
        socket.send_binary(json.dumps(event_data).encode())
        response = json.loads(socket.recv())
        return response
    except websocket.WebSocketTimeoutException:
        raise Exception("WebSocket timeout - no response received")
    except websocket.WebSocketConnectionClosedException:
        raise Exception("WebSocket connection closed unexpectedly")
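The helpers above cover connection retries, timeouts, and dropped connections. To also guard against malformed responses (one of the best practices below), json.JSONDecodeError can be caught at the parsing step; this small guard is a sketch, not part of the API:
def parse_event_response(raw_message):
    # Defensively parse a raw WebSocket message, surfacing malformed payloads clearly.
    try:
        return json.loads(raw_message)
    except json.JSONDecodeError as e:
        raise Exception(f"Malformed response received: {e}")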

Best Practices

  • Always authenticate WebSocket connections with your API key
  • Implement connection retry logic with exponential backoff
  • Set appropriate timeouts for requests (30 seconds recommended)
  • Close connections properly to free resources
  • Use unique client_id for each client to enable parallel processing
  • Handle asynchronous events properly with session.read
  • Implement error handling for malformed responses
  • Log events for debugging and monitoring
  • Use appropriate image sizes for video streams (360x640 recommended)
  • Set reasonable frame rates (0.2 Hz for most use cases)
  • Monitor processing times and adjust accordingly
  • Batch similar queries when possible
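
Several of these practices can be combined into a single session lifecycle. The sketch below reuses the robust_websocket_connection and safe_send_and_receive helpers defined in Error Handling and assumes the session was created via the REST API as shown at the start of this guide:
def run_lens_session(session_endpoint, api_key):
    # Connect with retries, validate the session, and always clean up on exit.
    socket = robust_websocket_connection(session_endpoint, api_key)
    try:
        validation = safe_send_and_receive(socket, {"type": "session.validate"})
        if not validation.get("valid"):
            raise Exception(f"Validation failed: {validation.get('errors')}")
        # ... send model.query or input_stream.set events here ...
    finally:
        # Destroy the session and close the connection to free resources.
        safe_send_and_receive(socket, {"type": "session.destroy"})
        socket.close()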

Next Steps