Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.archetypeai.app/llms.txt

Use this file to discover all available pages before exploring further.

Complete working examples for common use cases using the Lens Events API.
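Prerequisites: Install the `requests` and `websocket-client` packages (`pip install requests websocket-client`), then set your API key as an environment variable so you can pass it to each script below as `--api_key "$ATAI_API_KEY"`:
export ATAI_API_KEY="your-api-key-here"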

CSV File Streaming Example

Stream data from a CSV file through a lens for time-series analysis.

Step 1: Download Sample Data

# Download the example CSV dataset (Jena climate) from Kaggle.
# -f makes curl fail on an HTTP error instead of saving the error page
# as if it were the zip; -L follows Kaggle's redirect to the file.
curl -fL -o ~/Downloads/jena-climate.zip \
  https://www.kaggle.com/api/v1/datasets/download/mnassrib/jena-climate

# Extract the CSV next to the zip (-d) rather than into the current
# directory, and overwrite an earlier copy without prompting (-o) so
# re-running this step stays non-interactive.
unzip -o ~/Downloads/jena-climate.zip -d ~/Downloads

Step 2: Upload CSV File

import requests
import argparse

def upload_csv_file(api_key, filename, timeout=300):
    """Upload a CSV file to the Newton platform.

    Args:
        api_key: Newton API key, sent as a Bearer token.
        filename: Path of the local CSV file to upload.
        timeout: Seconds ``requests`` may wait to connect, or between bytes
            of the response, before giving up. This is not a cap on the
            total upload time.

    Returns:
        The ``file_id`` from the upload response on HTTP 200; otherwise
        ``None`` (the status code and response body are printed).

    Raises:
        OSError: If ``filename`` cannot be opened.
        requests.RequestException: On network errors or a timeout.
    """
    with open(filename, 'rb') as f:
        response = requests.post(
            'https://api.u1.archetypeai.app/v0.5/files/upload',
            headers={'Authorization': f'Bearer {api_key}'},
            files={'file': f},
            data={
                'description': f'Climate data CSV: {filename}',
                'tags': ['climate', 'time-series', 'example']
            },
            # Without a timeout, requests can block forever on a stalled server.
            timeout=timeout,
        )

    if response.status_code != 200:
        print(f"❌ Upload failed: {response.status_code}")
        print(response.text)
        return None

    file_data = response.json()
    print("✅ File uploaded successfully!")
    print(f"   File ID: {file_data['file_id']}")
    # size_bytes is informational only; a missing field must not turn a
    # successful upload into a crash.
    size_bytes = file_data.get('size_bytes')
    if size_bytes is not None:
        print(f"   Size: {size_bytes / 1024 / 1024:.2f} MB")
    return file_data['file_id']

if __name__ == "__main__":
    import os

    # Fall back to the ATAI_API_KEY environment variable (see Prerequisites),
    # so the key does not have to be typed on the command line.
    env_api_key = os.environ.get("ATAI_API_KEY") or None
    parser = argparse.ArgumentParser()
    parser.add_argument("--api_key", default=env_api_key, required=env_api_key is None,
                        help="Your Newton API key (defaults to $ATAI_API_KEY)")
    parser.add_argument("--filename", required=True, help="CSV file to upload")
    args = parser.parse_args()

    file_id = upload_csv_file(args.api_key, args.filename)
    if not file_id:
        # Exit non-zero so shell scripts and CI notice the failed upload.
        raise SystemExit(1)
    print(f"\n🎯 Use this file_id in your lens streaming: {file_id}")

Step 3: Stream CSV Through Lens

import websocket
import json
import argparse
import time
import requests

def create_lens_session(api_key, lens_id="lns-fd669361822b07e2-237ab3ffd79199b0", timeout=60):
    """Create a new lens session.

    Args:
        api_key: Newton API key, sent as a Bearer token.
        lens_id: ID of the lens to open a session against.
        timeout: Seconds ``requests`` may wait to connect, or between bytes
            of the response, before giving up.

    Returns:
        The decoded JSON response. Callers read its ``session_id`` and
        ``session_endpoint`` keys.

    Raises:
        Exception: If the API does not answer with HTTP 200.
        requests.RequestException: On network errors or a timeout.
    """
    response = requests.post(
        "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={"lens_id": lens_id},
        # Without a timeout, requests can block forever on a stalled server.
        timeout=timeout,
    )

    if response.status_code != 200:
        raise Exception(f"Failed to create session: {response.text}")
    return response.json()

def send_and_receive_event(socket, event_data):
    """Send one JSON event over the WebSocket and return the decoded reply.

    The event is serialized to JSON and sent as a binary frame. The very next
    frame received is parsed as JSON and returned as-is.
    """
    payload = json.dumps(event_data).encode()
    socket.send_binary(payload)
    raw_reply = socket.recv()
    return json.loads(raw_reply)

def _event_data(response):
    """Return the ``event_data`` dict of a server event, or ``{}`` if absent/null."""
    event_data = response.get("event_data")
    return event_data if isinstance(event_data, dict) else {}


def _read_messages(response):
    """Return the messages carried by a ``session.read`` response as a list of dicts.

    Accepts the legacy top-level ``messages`` list as well as the documented
    envelope, where the payload sits under ``event_data`` (``null`` when the
    lens has produced nothing yet).
    NOTE(review): the non-null ``event_data`` shape is not documented on this
    page. A list, a dict holding ``messages``, or a single message are all
    handled here; confirm against the WebSocket Events API reference.
    """
    messages = response.get("messages")
    if messages is None:
        event_data = response.get("event_data")
        if event_data is None:
            messages = []
        elif isinstance(event_data, dict) and "messages" in event_data:
            messages = event_data["messages"] or []
        elif isinstance(event_data, list):
            messages = event_data
        else:
            messages = [event_data]
    # Wrap non-dict entries so callers can always use .get() on each message.
    return [m if isinstance(m, dict) else {"data": m} for m in messages]


def stream_csv_data(api_key, file_id, max_run_time_sec=60):
    """Stream an uploaded CSV file through a Newton lens and print the results.

    Creates a lens session, opens its WebSocket, and validates the session.
    It then points the session's input stream at the uploaded CSV file and
    polls ``session.read`` every 2 seconds until ``max_run_time_sec`` has
    elapsed. The session is destroyed and the WebSocket closed on exit,
    including on error.

    Args:
        api_key: Newton API key, sent as a Bearer token.
        file_id: ID returned by the file-upload step.
        max_run_time_sec: How long to keep polling for results, in seconds.

    Raises:
        Exception: If session creation, session validation, or stream
            configuration is rejected by the server.
    """
    print("🚀 Creating lens session...")
    session = create_lens_session(api_key)
    print(f"   Session ID: {session['session_id']}")

    # Connect to WebSocket
    print("🔌 Connecting to WebSocket...")
    socket = websocket.create_connection(
        session['session_endpoint'],
        header={"Authorization": f"Bearer {api_key}"}
    )
    print("   Connected!")

    try:
        # Validate session. The server replies {"type": "session.validate.response",
        # "event_data": {"is_valid": ..., "error_messages": [...]}}; a top-level
        # "valid" key is still accepted for compatibility.
        print("✅ Validating session...")
        response = send_and_receive_event(socket, {"type": "session.validate"})
        if not (response.get("valid") or _event_data(response).get("is_valid")):
            raise Exception(f"Session validation failed: {response}")
        print("   Session is valid!")

        # Configure CSV input stream
        print(f"📊 Configuring CSV stream for file: {file_id}")
        event_message = {
            "type": "input_stream.set",
            "event_data": {
                "stream_type": "csv_file_reader",
                "stream_config": {
                    "file_id": file_id
                }
            }
        }
        response = send_and_receive_event(socket, event_message)
        # Fail fast on an explicit rejection instead of polling a session
        # whose input stream was never configured.
        if _event_data(response).get("is_valid") is False:
            raise Exception(f"Stream configuration failed: {response}")
        print(f"   Stream configured: {response}")

        # Read streaming results
        print("📖 Reading streaming results...")
        client_id = f"csv_client_{int(time.time())}"
        start_time = time.time()

        while time.time() - start_time < max_run_time_sec:
            read_event = {
                "type": "session.read",
                "event_data": {"client_id": client_id}
            }
            response = send_and_receive_event(socket, read_event)

            for message in _read_messages(response):
                timestamp = message.get("timestamp", "unknown")
                msg_type = message.get("type", "unknown")
                data = message.get("data", message.get("event_data", {}))

                if msg_type == "inference.result":
                    result = data.get('result', 'No result') if isinstance(data, dict) else data
                    print(f"🔬 [{timestamp}] Analysis: {result}")
                elif msg_type == "log.info":
                    print(f"ℹ️  [{timestamp}] Info: {data}")
                elif msg_type == "error":
                    print(f"❌ [{timestamp}] Error: {data}")
                else:
                    # Surface unrecognized events rather than dropping them silently.
                    print(f"📨 [{timestamp}] {msg_type}: {data}")

            time.sleep(2)  # Poll every 2 seconds

        print("⏰ Max runtime reached, stopping...")

    finally:
        # Clean up (best-effort: always close the socket, never mask the real error)
        print("🧹 Cleaning up...")
        try:
            send_and_receive_event(socket, {"type": "session.destroy"})
            print("   Session destroyed")
        except Exception as exc:
            print(f"   ⚠️  Could not destroy session: {exc}")

        socket.close()
        print("   WebSocket closed")

if __name__ == "__main__":
    import os

    # Fall back to the ATAI_API_KEY environment variable (see Prerequisites)
    # when --api_key is not given on the command line.
    env_api_key = os.environ.get("ATAI_API_KEY") or None
    parser = argparse.ArgumentParser()
    parser.add_argument("--api_key", default=env_api_key, required=env_api_key is None,
                        help="Your Newton API key (defaults to $ATAI_API_KEY)")
    parser.add_argument("--file_id", required=True, help="CSV file ID from upload")
    parser.add_argument("--max_run_time_sec", type=int, default=60, help="Max runtime in seconds")
    args = parser.parse_args()

    try:
        stream_csv_data(args.api_key, args.file_id, args.max_run_time_sec)
    except Exception as e:
        print(f"❌ Error: {e}")
        # Exit non-zero so callers (shell scripts, CI) can detect the failure.
        raise SystemExit(1)
What you should expect:
  • Creating lens session… then Connecting to WebSocket… → both succeed in under 1s.
  • Validating session… → the server replies with {"type": "session.validate.response", "event_data": {"is_valid": true, "error_messages": []}} and the script prints Session is valid!.
  • Configuring CSV stream… → Stream configured: {'type': 'input_stream.set.response', 'event_data': {'is_valid': True, 'error_messages': []}} (the script prints the Python dict, so booleans appear as True/False).
  • Reading streaming results… → the content depends entirely on the lens you connect the stream to. A lens whose pipeline matches the CSV’s schema (column names, sampling rate) will emit inference.result messages over time; an incompatible lens will return {"type": "session.read.response", "event_data": null} on each poll. Confirm the lens’s expected schema from GET /lens/metadata before running.

RTSP Camera Streaming Example

Stream real-time video from an RTSP camera for live analysis.
import websocket
import json
import argparse
import time
import requests
import uuid

def create_lens_session(api_key, lens_id="lns-fd669361822b07e2-237ab3ffd79199b0", timeout=60):
    """Create a new lens session for video analysis.

    Args:
        api_key: Newton API key, sent as a Bearer token.
        lens_id: ID of the lens to open a session against.
        timeout: Seconds ``requests`` may wait to connect, or between bytes
            of the response, before giving up.

    Returns:
        The decoded JSON response. Callers read its ``session_id`` and
        ``session_endpoint`` keys.

    Raises:
        Exception: If the API does not answer with HTTP 200.
        requests.RequestException: On network errors or a timeout.
    """
    response = requests.post(
        "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={"lens_id": lens_id},
        # Without a timeout, requests can block forever on a stalled server.
        timeout=timeout,
    )

    if response.status_code != 200:
        raise Exception(f"Failed to create session: {response.text}")
    return response.json()

def send_and_receive_event(socket, event_data):
    """Send one JSON event over the WebSocket and return the decoded reply.

    The event is serialized to JSON and sent as a binary frame. The very next
    frame received is parsed as JSON and returned as-is.
    """
    payload = json.dumps(event_data).encode()
    socket.send_binary(payload)
    raw_reply = socket.recv()
    return json.loads(raw_reply)

def _event_data(response):
    """Return the ``event_data`` dict of a server event, or ``{}`` if absent/null."""
    event_data = response.get("event_data")
    return event_data if isinstance(event_data, dict) else {}


def _read_messages(response):
    """Return the messages carried by a ``session.read`` response as a list of dicts.

    Accepts the legacy top-level ``messages`` list as well as the documented
    envelope, where the payload sits under ``event_data`` (``null`` when the
    lens has produced nothing yet).
    NOTE(review): the non-null ``event_data`` shape is not documented on this
    page. A list, a dict holding ``messages``, or a single message are all
    handled here; confirm against the WebSocket Events API reference.
    """
    messages = response.get("messages")
    if messages is None:
        event_data = response.get("event_data")
        if event_data is None:
            messages = []
        elif isinstance(event_data, dict) and "messages" in event_data:
            messages = event_data["messages"] or []
        elif isinstance(event_data, list):
            messages = event_data
        else:
            messages = [event_data]
    # Wrap non-dict entries so callers can always use .get() on each message.
    return [m if isinstance(m, dict) else {"data": m} for m in messages]


def stream_rtsp_camera(api_key, rtsp_url, max_run_time_sec=60):
    """Stream an RTSP camera feed through Newton and print the live analysis.

    Creates a lens session, opens its WebSocket, and validates the session.
    It then configures an ``rtsp_video_streamer`` input (360x640 frames at
    0.2 Hz) and polls ``session.read`` every 3 seconds until
    ``max_run_time_sec`` has elapsed. The session is destroyed and the
    WebSocket closed on exit, including on error.

    Args:
        api_key: Newton API key, sent as a Bearer token.
        rtsp_url: URL of the RTSP camera stream.
        max_run_time_sec: How long to keep polling for results, in seconds.

    Raises:
        Exception: If session creation, session validation, or stream
            configuration is rejected by the server.
    """
    print("🚀 Creating lens session for video analysis...")
    session = create_lens_session(api_key)
    print(f"   Session ID: {session['session_id']}")

    # Connect to WebSocket
    print("🔌 Connecting to WebSocket...")
    socket = websocket.create_connection(
        session['session_endpoint'],
        header={"Authorization": f"Bearer {api_key}"}
    )
    print("   Connected!")

    try:
        # Validate session. The server replies {"type": "session.validate.response",
        # "event_data": {"is_valid": ..., "error_messages": [...]}}; a top-level
        # "valid" key is still accepted for compatibility.
        print("✅ Validating session...")
        response = send_and_receive_event(socket, {"type": "session.validate"})
        if not (response.get("valid") or _event_data(response).get("is_valid")):
            raise Exception(f"Session validation failed: {response}")
        print("   Session is valid!")

        # Configure RTSP input stream
        print(f"📹 Configuring RTSP stream: {rtsp_url}")
        event_message = {
            "type": "input_stream.set",
            "event_data": {
                "stream_type": "rtsp_video_streamer",
                "stream_config": {
                    "rtsp_url": rtsp_url,
                    "target_image_size": [360, 640],  # [height, width]
                    "target_frame_rate_hz": 0.2       # Process 1 frame every 5 seconds
                }
            }
        }
        response = send_and_receive_event(socket, event_message)
        # Fail fast on an explicit rejection. An unreachable camera is not
        # reported here; it shows up later as an error in session.read polls.
        if _event_data(response).get("is_valid") is False:
            raise Exception(f"Stream configuration failed: {response}")
        print(f"   RTSP stream configured: {response}")

        # Read streaming analysis results
        print("🔬 Reading real-time analysis...")
        client_id = str(uuid.uuid4())[:8]
        start_time = time.time()

        while time.time() - start_time < max_run_time_sec:
            read_event = {
                "type": "session.read",
                "event_data": {"client_id": client_id}
            }
            response = send_and_receive_event(socket, read_event)

            messages = _read_messages(response)
            for message in messages:
                timestamp = message.get("timestamp", "unknown")
                msg_type = message.get("type", "unknown")
                data = message.get("data", message.get("event_data", {}))
                # A non-dict payload (e.g. a bare string) is treated as the result text.
                details = data if isinstance(data, dict) else {"result": data}

                if msg_type == "inference.result":
                    print(f"🎥 [{timestamp}] Analysis: {details.get('result', 'No analysis')}")
                    # confidence / processing_time_ms are not guaranteed by the
                    # model, so report only the fields that are actually present
                    # instead of printing made-up zeros.
                    extras = []
                    confidence = details.get('confidence')
                    if isinstance(confidence, (int, float)):
                        extras.append(f"Confidence: {confidence:.2f}")
                    processing_time = details.get('processing_time_ms')
                    if processing_time is not None:
                        extras.append(f"Processing: {processing_time}ms")
                    if extras:
                        print("   " + ", ".join(extras))

                elif msg_type == "frame.processed":
                    frame_num = details.get('frame_number', 0)
                    print(f"🖼️  [{timestamp}] Frame {frame_num} processed")

                elif msg_type == "stream.status":
                    status = details.get('status', 'unknown')
                    print(f"📡 [{timestamp}] Stream status: {status}")

                elif msg_type == "log.info":
                    print(f"ℹ️  [{timestamp}] {data}")

                elif msg_type == "error":
                    print(f"❌ [{timestamp}] Error: {data}")

                else:
                    # Surface unrecognized events rather than dropping them silently.
                    print(f"📨 [{timestamp}] {msg_type}: {data}")

            if not messages:
                print("⏳ Waiting for video frames...")

            time.sleep(3)  # Poll every 3 seconds

        print("⏰ Max runtime reached, stopping stream...")

    finally:
        # Clean up (best-effort: always close the socket, never mask the real error)
        print("🧹 Cleaning up...")
        try:
            send_and_receive_event(socket, {"type": "session.destroy"})
            print("   Session destroyed")
        except Exception as exc:
            print(f"   ⚠️  Could not destroy session: {exc}")

        socket.close()
        print("   WebSocket closed")

if __name__ == "__main__":
    import os

    # Fall back to the ATAI_API_KEY environment variable (see Prerequisites)
    # when --api_key is not given on the command line.
    env_api_key = os.environ.get("ATAI_API_KEY") or None
    parser = argparse.ArgumentParser(description="Stream RTSP camera through Newton")
    parser.add_argument("--api_key", default=env_api_key, required=env_api_key is None,
                        help="Your Newton API key (defaults to $ATAI_API_KEY)")
    parser.add_argument("--rtsp_url", required=True, help="RTSP camera URL")
    parser.add_argument("--max_run_time_sec", type=int, default=60, help="Max runtime in seconds")
    args = parser.parse_args()

    try:
        stream_rtsp_camera(args.api_key, args.rtsp_url, args.max_run_time_sec)
    except Exception as e:
        print(f"❌ Error: {e}")
        # Exit non-zero so callers (shell scripts, CI) can detect the failure.
        raise SystemExit(1)
What you should expect:
  • Session creation, WebSocket connection, and session.validate all succeed (same envelope shapes as the CSV example above).
  • RTSP stream configured: {'type': 'input_stream.set.response', 'event_data': {'is_valid': True, ...}}.
  • inference.result payloads arrive on session.read polls — typical content is a short natural-language description of the latest frame batch (no confidence or processing_time_ms fields are guaranteed by the model). Frequency depends on the lens’s camera_buffer_size and the target_frame_rate_hz you configured on the stream.
  • An unreachable RTSP URL surfaces as an error message in subsequent session.read polls rather than a synchronous error on input_stream.set.

Direct Image Analysis Example

Send individual images for immediate analysis without setting up streams.
model.query only works against lenses whose pipeline includes a model-query processor — not, for example, the Activity Monitor lens, which uses lens_camera_processor and expects camera input. If model.query returns {"type": "model.query.response", "message": "Response timed out for query"}, the lens is not configured to handle direct queries: register a model-query lens first via POST /lens/register, or use a streaming approach with input_stream.set instead.
import websocket
import json
import base64
import argparse
import requests

def analyze_image_direct(api_key, image_path, focus="Describe what you see"):
    """Analyze a single image with a direct ``model.query`` event.

    Reads and base64-encodes the image, then opens a lens session and sends
    an ``image_qa_template_task`` query asking ``focus`` about the image.
    It prints the answer and destroys the session afterwards.

    Args:
        api_key: Newton API key, sent as a Bearer token.
        image_path: Path of the image file to analyze.
        focus: The question to ask about the image.

    Returns:
        The decoded ``model.query`` response dict.

    Raises:
        OSError: If the image cannot be read.
        Exception: If the lens session cannot be created.
    """
    # Read the image first so a bad path fails before any session is created.
    print(f"📸 Loading image: {image_path}")
    with open(image_path, "rb") as img_handle:
        base64_img = base64.b64encode(img_handle.read()).decode("utf-8")

    # Create session
    print("🚀 Creating lens session...")
    response = requests.post(
        "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b0"},
        timeout=60,
    )
    # Without this check a failed request surfaces later as a confusing
    # KeyError on 'session_endpoint'.
    if response.status_code != 200:
        raise Exception(f"Failed to create session: {response.text}")
    session = response.json()

    # Connect WebSocket
    print("🔌 Connecting to WebSocket...")
    socket = websocket.create_connection(
        session['session_endpoint'],
        header={"Authorization": f"Bearer {api_key}"}
    )

    try:
        # Send direct query
        print("🔬 Analyzing image...")
        event_message = {
            "type": "model.query",
            "event_data": {
                "model_version": "Newton::c2_4_7b_251215a172f6d7",
                "template_name": "image_qa_template_task",
                "instruction": "Answer the following question about the image:",
                "focus": focus,
                "max_new_tokens": 512,
                "data": [{
                    "type": "base64_img",
                    "base64_img": base64_img
                }],
                "sensor_metadata": {}
            }
        }

        socket.send_binary(json.dumps(event_message).encode())
        response = json.loads(socket.recv())

        # model.query responses are wrapped in event_data. A lens that cannot
        # serve direct queries instead answers with a top-level "message"
        # (e.g. "Response timed out for query"); show it instead of hiding it.
        result_data = response.get('event_data') or {}
        result = result_data.get('result')
        if result is None:
            result = response.get('message', 'No result')
        print("\n🎯 Analysis Results:")
        print("=" * 50)
        print(f"Focus: {focus}")
        print(f"Result: {result}")
        print("=" * 50)

        return response

    finally:
        # Best-effort cleanup: never let a cleanup failure mask the real error.
        try:
            socket.send_binary(json.dumps({"type": "session.destroy"}).encode())
            socket.recv()  # Wait for destroy confirmation
        except Exception as exc:
            print(f"⚠️  Could not destroy session cleanly: {exc}")
        socket.close()

def detect_objects_in_image(api_key, image_path, objects_to_find=("person", "car", "truck")):
    """Detect specific objects in an image and print their bounding boxes.

    Reads and base64-encodes the image, then opens a lens session and sends
    an ``image_bbox_template_task`` query for ``objects_to_find``. It prints
    each ``name: bbox`` line of the result and destroys the session
    afterwards.

    Args:
        api_key: Newton API key, sent as a Bearer token.
        image_path: Path of the image file to analyze.
        objects_to_find: Iterable of object names to localize. The default
            is a tuple rather than a list to avoid a shared mutable default.

    Returns:
        The decoded ``model.query`` response dict.

    Raises:
        OSError: If the image cannot be read.
        Exception: If the lens session cannot be created.
    """
    # Read the image first so a bad path fails before any session is created.
    with open(image_path, "rb") as img_handle:
        base64_img = base64.b64encode(img_handle.read()).decode("utf-8")

    # Create session
    response = requests.post(
        "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b0"},
        timeout=60,
    )
    # Without this check a failed request surfaces later as a confusing
    # KeyError on 'session_endpoint'.
    if response.status_code != 200:
        raise Exception(f"Failed to create session: {response.text}")
    session = response.json()

    # Connect WebSocket
    socket = websocket.create_connection(
        session['session_endpoint'],
        header={"Authorization": f"Bearer {api_key}"}
    )

    try:
        # Object detection query
        objects_str = ",".join(objects_to_find)
        event_message = {
            "type": "model.query",
            "event_data": {
                "model_version": "Newton::c2_4_7b_251215a172f6d7",
                "template_name": "image_bbox_template_task",
                "instruction": "Localize all objects in the picture and build bounding box around them.",
                "focus": f"Input: [{objects_str}]",
                "max_new_tokens": 512,
                "data": [{
                    "type": "base64_img",
                    "base64_img": base64_img
                }],
                "sensor_metadata": {}
            }
        }

        socket.send_binary(json.dumps(event_message).encode())
        response = json.loads(socket.recv())

        # Parse and display bounding boxes (model.query response is wrapped in event_data)
        print("\n🎯 Object Detection Results:")
        print("=" * 50)
        result = (response.get('event_data') or {}).get('result', '')
        if result:
            # splitlines() also copes with CRLF-terminated output.
            for line in result.splitlines():
                if ':' in line:
                    obj_name, bbox = line.split(':', 1)
                    print(f"📦 {obj_name.strip()}: {bbox.strip()}")
        elif response.get('message'):
            # e.g. "Response timed out for query" from a lens without a
            # model-query processor. That is not the same as "no objects".
            print(f"⚠️  {response['message']}")
        else:
            print("No objects detected")
        print("=" * 50)

        return response

    finally:
        # Best-effort cleanup: never let a cleanup failure mask the real error.
        try:
            socket.send_binary(json.dumps({"type": "session.destroy"}).encode())
            socket.recv()
        except Exception as exc:
            print(f"⚠️  Could not destroy session cleanly: {exc}")
        socket.close()

if __name__ == "__main__":
    import os

    # Fall back to the ATAI_API_KEY environment variable (see Prerequisites)
    # when --api_key is not given on the command line.
    env_api_key = os.environ.get("ATAI_API_KEY") or None
    parser = argparse.ArgumentParser()
    parser.add_argument("--api_key", default=env_api_key, required=env_api_key is None,
                        help="Your Newton API key (defaults to $ATAI_API_KEY)")
    parser.add_argument("--image_path", required=True, help="Path to image file")
    parser.add_argument("--mode", choices=["describe", "detect"], default="describe",
                        help="Analysis mode: describe or detect objects")
    parser.add_argument("--focus", default="Describe what you see in detail",
                        help="Focus question for description mode")
    parser.add_argument("--objects", nargs="+", default=["person", "car", "truck"],
                        help="Objects to detect in detect mode")
    args = parser.parse_args()

    try:
        if args.mode == "describe":
            analyze_image_direct(args.api_key, args.image_path, args.focus)
        else:
            detect_objects_in_image(args.api_key, args.image_path, args.objects)
    except Exception as e:
        print(f"❌ Error: {e}")
        # Exit non-zero so callers (shell scripts, CI) can detect the failure.
        raise SystemExit(1)

Repository and Additional Examples

Python Client Examples

Complete example repository with more use cases

WebSocket Events API

Detailed WebSocket event documentation

File Upload API

Upload files for CSV streaming

Troubleshooting

Common issues and solutions

Common Use Cases

Application: Real-time security camera analysis
Setup: RTSP camera stream with person/vehicle detection
Focus examples:
  • “Detect unauthorized personnel”
  • “Monitor for safety violations”
  • “Count people in restricted areas”