The Lens Events API enables real-time control and data streaming for active lens sessions via WebSocket connections. This allows you to interact with Newton models, configure input streams, and receive real-time inference results.
Connection Setup
1. Create a Lens Session
First, create a lens session using the REST API:
curl https://api.u1.archetypeai.app/v0.5/lens/sessions/create \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b9"}'
The response includes a session_endpoint URL to use for the WebSocket connection:
{
  "session_id": "lsn-250313c2177253cbaa7a73542edd90",
  "lens_id": "lns-fd669361822b07e2-237ab3ffd79199b9",
  "session_endpoint": "wss://api.archetypeai.dev/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90",
  "status": "SESSION_STATUS_CREATED"
}
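The same request can be made from Python. The following is a minimal sketch using only the standard library; the helper names `build_create_session_request` and `create_session` are illustrative and not part of the API, and the base URL is copied from the curl example above. Building the request separately from sending it makes it easy to inspect before any network call is made.

```python
import json
import urllib.request

# Base URL copied from the curl example; adjust it if your deployment differs.
API_BASE = "https://api.u1.archetypeai.app/v0.5"


def build_create_session_request(api_key, lens_id, api_base=API_BASE):
    """Build (but do not send) the POST request that creates a lens session."""
    body = json.dumps({"lens_id": lens_id}).encode("utf-8")
    return urllib.request.Request(
        f"{api_base}/lens/sessions/create",
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def create_session(api_key, lens_id):
    """Send the request and return the parsed JSON response as a dict."""
    request = build_create_session_request(api_key, lens_id)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `create_session("YOUR_API_KEY", "lns-fd669361822b07e2-237ab3ffd79199b9")["session_endpoint"]` would then give the URL to pass to the WebSocket client.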
2. Connect to WebSocket
import json

import websocket  # provided by the websocket-client package

API_KEY = "YOUR_API_KEY"  # the same key used for the REST call above
session_endpoint = "wss://api.archetypeai.dev/v0.5/lens/sessions/lsn-250313c2177253cbaa7a73542edd90"

socket = websocket.create_connection(
    session_endpoint,
    header={"Authorization": f"Bearer {API_KEY}"},
)


def send_and_receive_event(socket, event_data):
    """Send one JSON event and return the parsed JSON response."""
    socket.send_binary(json.dumps(event_data).encode())
    return json.loads(socket.recv())
For JavaScript clients, where the browser WebSocket API cannot set an Authorization header, use the Sec-WebSocket-Protocol header to pass authentication and the protocol version.
All event messages are JSON objects with a required type field and optional event_data field:
{
  "type": "event.type",
  "event_data": {
    // Event-specific data
  }
}
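As a small illustration of this message shape, the sketch below builds and serializes event messages. The helper names `make_event` and `encode_event` are hypothetical conveniences, not part of the API; they only enforce what is stated above, namely that `type` is required and `event_data` is optional.

```python
import json


def make_event(event_type, event_data=None):
    """Return an event message with a required type and optional event_data."""
    if not isinstance(event_type, str) or not event_type:
        raise ValueError("event type must be a non-empty string")
    message = {"type": event_type}
    if event_data is not None:
        message["event_data"] = event_data
    return message


def encode_event(message):
    """Serialize an event message to UTF-8 JSON bytes, ready for send_binary()."""
    return json.dumps(message).encode("utf-8")
```

For example, `make_event("session.read", {"client_id": "a1b2c3d4"})` produces the same dictionary that the session.read example on this page writes out by hand.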
Session Events
session.status
Query the current status of the active session.
event_message = {"type": "session.status"}
response = send_and_receive_event(socket, event_message)
print(f"Session status: {response}")
Response Example:
{
  "status": "SESSION_STATUS_RUNNING",
  "session_id": "lsn-250313c2177253cbaa7a73542edd90",
  "uptime_seconds": 145.2,
  "last_activity": 1741843135.1848276
}
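A common use of session.status is to wait until a newly created session is running before sending other events. The sketch below polls for that state. It is an assumption-laden example: `wait_until_running` is a hypothetical helper, and it only knows the two status values that appear on this page (`SESSION_STATUS_CREATED` and `SESSION_STATUS_RUNNING`). The `sleep` and `clock` parameters exist only so the loop can be exercised without real delays.

```python
import time


def wait_until_running(send_event, timeout_s=30.0, poll_interval_s=1.0,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll session.status until the session reports SESSION_STATUS_RUNNING.

    send_event is any callable that takes an event dict and returns the parsed
    response, e.g. lambda e: send_and_receive_event(socket, e).
    """
    deadline = clock() + timeout_s
    while True:
        response = send_event({"type": "session.status"})
        if response.get("status") == "SESSION_STATUS_RUNNING":
            return response
        if clock() >= deadline:
            raise TimeoutError(
                f"session not running after {timeout_s}s "
                f"(last status: {response.get('status')})"
            )
        sleep(poll_interval_s)
```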
session.validate
Run validation checks on the current session, including health checks and model connectivity.
event_message = {"type": "session.validate"}
response = send_and_receive_event(socket, event_message)
if response.get("valid"):
    print("Session is valid and ready")
else:
    print(f"Validation failed: {response.get('errors')}")
Response Example:
{
  "valid": true,
  "checks": {
    "session_health": "pass",
    "model_connectivity": "pass",
    "sensor_nodes": "pass"
  },
  "errors": []
}
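When validation fails, the `checks` object can show which check was responsible. The sketch below lists every check that did not pass. Note the assumption: this page only shows the value `"pass"`, so the hypothetical helper `failed_checks` treats any other value as a failure.

```python
def failed_checks(validation_response):
    """Return the sorted names of checks whose outcome is not "pass"."""
    checks = validation_response.get("checks", {})
    return sorted(name for name, outcome in checks.items() if outcome != "pass")


def describe_validation(validation_response):
    """Return a one-line, human-readable summary of a session.validate response."""
    if validation_response.get("valid"):
        return "Session is valid and ready"
    failed = ", ".join(failed_checks(validation_response)) or "none reported"
    errors = validation_response.get("errors", [])
    return f"Validation failed (checks: {failed}; errors: {errors})"
```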
session.read
Read pending event messages from the platform, such as log messages or asynchronous inference results.
Each client should generate a unique client_id to enable parallel reading. Use a new client_id to reset message stream playback.
import uuid

client_id = str(uuid.uuid4())[:8]  # Generate unique 8-character ID
event_message = {
    "type": "session.read",
    "event_data": {"client_id": client_id},
}
response = send_and_receive_event(socket, event_message)
for message in response.get("messages", []):
    print(f"[{message['timestamp']}] {message['type']}: {message['data']}")
Response Example:
{
  "messages": [
    {
      "timestamp": 1741843200.123,
      "type": "inference.result",
      "data": {
        "result": "A construction worker wearing a hard hat and safety vest",
        "confidence": 0.95,
        "processing_time_ms": 245
      }
    },
    {
      "timestamp": 1741843201.456,
      "type": "log.info",
      "data": "Frame processed successfully"
    }
  ],
  "client_id": "a1b2c3d4"
}
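Because a single session.read response can mix inference results with log messages, it is often convenient to separate them by `type`. The sketch below does that for a response shaped like the example above. The helper names are hypothetical, and only the two message types shown on this page (`inference.result` and `log.info`) are known here; any other type is simply grouped under its own name.

```python
from collections import defaultdict


def group_messages_by_type(read_response):
    """Group the messages of a session.read response by their type field."""
    grouped = defaultdict(list)
    for message in read_response.get("messages", []):
        grouped[message.get("type", "unknown")].append(message)
    return dict(grouped)


def inference_results(read_response):
    """Return the data payloads of all inference.result messages, oldest first."""
    messages = group_messages_by_type(read_response).get("inference.result", [])
    return [m["data"] for m in sorted(messages, key=lambda m: m["timestamp"])]
```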
session.destroy
Destroy the active session directly via the control stream.
event_message = {"type": "session.destroy"}
response = send_and_receive_event(socket, event_message)
print(f"Session destroyed: {response}")

# Close WebSocket connection
socket.close()
Configure input data streams to automatically feed data through the lens.
RTSP Video Stream
Connect a real-time video camera using RTSP protocol:
event_message = {
    "type": "input_stream.set",
    "event_data": {
        "stream_type": "rtsp_video_streamer",
        "stream_config": {
            "rtsp_url": "rtsp://camera.example.com:554/stream",
            "target_image_size": [360, 640],
            "target_frame_rate_hz": 0.2,
        },
    },
}
response = send_and_receive_event(socket, event_message)
print(f"RTSP stream configured: {response}")
CSV File Reader
Connect a CSV file for data streaming:
event_message = {
    "type": "input_stream.set",
    "event_data": {
        "stream_type": "csv_file_reader",
        "stream_config": {
            "file_id": "jena_climate_2009_2016.csv",
        },
    },
}
response = send_and_receive_event(socket, event_message)
print(f"CSV stream configured: {response}")
Description: Real-time video streaming from RTSP cameras
Configuration Parameters:
  rtsp_url (string): RTSP stream URL
  target_image_size (array): [height, width] in pixels
  target_frame_rate_hz (float): Frames per second to process
Use Cases: Security cameras, live monitoring, real-time analysis

Description: Stream data from uploaded CSV files
Configuration Parameters:
  file_id (string): ID of uploaded CSV file
Use Cases: Time series analysis, batch data processing, historical data analysis

Description: Direct sensor data streaming
Configuration Parameters:
  Configuration varies by sensor type
Use Cases: IoT sensors, accelerometers, temperature sensors
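The RTSP parameters above are easy to get subtly wrong: `target_image_size` is `[height, width]`, not `[width, height]`, and `target_frame_rate_hz` is a rate, so the value 0.2 used in the earlier example means one frame every five seconds. The sketch below builds the `input_stream.set` event with a few basic checks. The helper names are hypothetical, and accepting the `rtsps` scheme alongside `rtsp` is an assumption, since this page only shows `rtsp://` URLs.

```python
from urllib.parse import urlparse


def build_rtsp_stream_event(rtsp_url, height, width, frame_rate_hz):
    """Build an input_stream.set event for the rtsp_video_streamer stream type."""
    # Assumption: only rtsp:// (and possibly rtsps://) URLs are meaningful here.
    if urlparse(rtsp_url).scheme not in ("rtsp", "rtsps"):
        raise ValueError(f"not an RTSP URL: {rtsp_url!r}")
    if height <= 0 or width <= 0:
        raise ValueError("height and width must be positive pixel counts")
    if frame_rate_hz <= 0:
        raise ValueError("frame_rate_hz must be positive")
    return {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "rtsp_video_streamer",
            "stream_config": {
                "rtsp_url": rtsp_url,
                "target_image_size": [height, width],  # [height, width] order
                "target_frame_rate_hz": frame_rate_hz,
            },
        },
    }


def seconds_between_frames(frame_rate_hz):
    """Convert a processing rate in Hz to the interval between frames in seconds."""
    return 1.0 / frame_rate_hz
```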
Direct Model Queries
model.query
Send direct queries to Newton models with custom data payloads.
Image QA Example
import base64

# Load and encode image
with open("construction_site.jpg", "rb") as img_handle:
    base64_img = base64.b64encode(img_handle.read()).decode("utf-8")

event_message = {
    "type": "model.query",
    "event_data": {
        "model_version": "Newton::c1_6_241117d4362cc9",
        "template_name": "image_qa_template_task",
        "instruction": "Answer the following question about the image:",
        "focus": "Describe the safety equipment visible in the image.",
        "max_new_tokens": 512,
        "data": [{
            "type": "base64_img",
            "base64_img": base64_img,
        }],
        "sensor_metadata": {},
    },
}
response = send_and_receive_event(socket, event_message)
print(f"Newton response: {response['result']}")
Object Detection Example
# Reuses base64_img from the Image QA example
event_message = {
    "type": "model.query",
    "event_data": {
        "model_version": "Newton::c1_6_241117d4362cc9",
        "template_name": "image_bbox_template_task",
        "instruction": "Localize all objects in the picture and build bounding box around them.",
        "focus": "Input: [person,car,hard_hat,safety_vest]",
        "max_new_tokens": 512,
        "data": [{
            "type": "base64_img",
            "base64_img": base64_img,
        }],
        "sensor_metadata": {},
    },
}
response = send_and_receive_event(socket, event_message)
print(f"Detected objects: {response['result']}")
Newton Model Templates
Available Templates
Template Name: image_qa_template_task
Purpose: Generate narrative descriptions of images
Parameters:
  instruction: Question or instruction for the model
  focus: Specific aspect to focus on (e.g., “safety equipment”, “project status”)
  max_new_tokens: Maximum response length (typically 512)
Output: Natural language description
Example Response:
{
  "result": "The image shows construction workers wearing yellow hard hats and high-visibility safety vests. Safety equipment is properly worn according to regulations.",
  "confidence": 0.92,
  "processing_time_ms": 340
}

Template Name: image_bbox_template_task
Purpose: Detect and localize objects with bounding boxes
Parameters:
  instruction: Object detection instruction
  focus: List of objects to detect (e.g., “Input: [person,car,dog]”)
  max_new_tokens: Maximum response length
Output: Structured bounding box coordinates
Example Response:
{
  "result": "person: [0.15, 0.20, 0.45, 0.80] \n hard_hat: [0.25, 0.15, 0.35, 0.25] \n safety_vest: [0.20, 0.35, 0.40, 0.65]",
  "confidence": 0.88,
  "processing_time_ms": 280
}
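The bounding box template returns its boxes as text inside `result`, one `label: [a, b, c, d]` entry per line, so client code typically needs to parse it. The sketch below is one way to do that, assuming the format matches the example response above. It deliberately does not interpret the four numbers, because the coordinate order and normalization are not stated on this page. `parse_bounding_boxes` is a hypothetical helper, and it returns a list of pairs rather than a dictionary so that repeated labels (for example two people) are preserved.

```python
import re

# One "label: [n, n, n, n]" entry per line; surrounding whitespace is ignored.
_BOX_LINE = re.compile(r"^\s*([^:\[\]]+?)\s*:\s*\[([^\]]*)\]\s*$")


def parse_bounding_boxes(result_text):
    """Parse 'label: [a, b, c, d]' lines into a list of (label, [floats]) pairs."""
    boxes = []
    for line in result_text.splitlines():
        match = _BOX_LINE.match(line)
        if not match:
            continue  # skip blank or unrecognized lines
        label, numbers = match.groups()
        try:
            coords = [float(value) for value in numbers.split(",")]
        except ValueError:
            continue  # skip entries whose values are not numeric
        if len(coords) == 4:
            boxes.append((label, coords))
    return boxes
```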
Error Handling
Handle WebSocket errors and connection issues:
import json
import time

import websocket


def robust_websocket_connection(session_endpoint, api_key):
    """Open the WebSocket, retrying with exponential backoff on failure."""
    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            return websocket.create_connection(
                session_endpoint,
                header={"Authorization": f"Bearer {api_key}"},
                timeout=30,
            )
        except Exception as e:
            retry_count += 1
            print(f"Connection attempt {retry_count} failed: {e}")
            if retry_count >= max_retries:
                raise  # out of attempts: re-raise with the original traceback
            time.sleep(2 ** retry_count)  # Exponential backoff: 2 s, then 4 s


def safe_send_and_receive(socket, event_data, timeout=30):
    """Send one event and return the parsed response, with a receive timeout."""
    try:
        socket.settimeout(timeout)
        socket.send_binary(json.dumps(event_data).encode())
        return json.loads(socket.recv())
    except websocket.WebSocketTimeoutException as e:
        raise TimeoutError("WebSocket timeout - no response received") from e
    except websocket.WebSocketConnectionClosedException as e:
        raise ConnectionError("WebSocket connection closed unexpectedly") from e
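The retry loop above sleeps `2 ** retry_count` seconds between attempts, which with three attempts means waits of 2 and 4 seconds and no wait after the final failure. The sketch below isolates that pattern into a generic helper so the schedule can be checked without a network. `retry_with_backoff` and `backoff_delays` are hypothetical names, and the injectable `sleep` parameter is there only to make the timing observable.

```python
import time


def backoff_delays(max_retries, base=2):
    """Delays slept between attempts: base**1, base**2, ... (none after the last)."""
    return [base ** attempt for attempt in range(1, max_retries)]


def retry_with_backoff(operation, max_retries=3, sleep=time.sleep):
    """Call operation() until it succeeds, sleeping 2**n seconds between attempts."""
    for attempt in range(1, max_retries + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_retries:
                raise  # out of attempts: surface the last error
            sleep(2 ** attempt)
```

With this helper, opening the connection could be written as `retry_with_backoff(lambda: websocket.create_connection(session_endpoint, header=..., timeout=30))`.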
Best Practices
Always authenticate WebSocket connections with your API key
Implement connection retry logic with exponential backoff
Set appropriate timeouts for requests (30 seconds recommended)
Close connections properly to free resources
Use unique client_id for each client to enable parallel processing
Handle asynchronous events properly with session.read
Implement error handling for malformed responses
Log events for debugging and monitoring
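One way to make sure connections are always closed is to wrap the session in a context manager. The following is a minimal sketch under stated assumptions: `lens_session` is a hypothetical helper, it assumes you want the session destroyed whenever the block exits, and it relies only on the `send_binary`, `recv`, and `close` methods already used on this page.

```python
import json
from contextlib import contextmanager


@contextmanager
def lens_session(socket):
    """Yield the socket, then send session.destroy and close it on exit."""
    try:
        yield socket
    finally:
        try:
            socket.send_binary(json.dumps({"type": "session.destroy"}).encode())
            socket.recv()  # wait for the acknowledgement
        except Exception:
            pass  # best effort: the connection may already be gone
        finally:
            socket.close()  # always release the connection
```

Used as `with lens_session(socket) as s: ...`, the cleanup runs even if the body raises. If a session should outlive the connection, close the socket without sending session.destroy instead.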