Skip to main content
Complete working examples for common use cases using the Lens Events API.
Prerequisites: Set your API key as an environment variable, then pass it to the example scripts below with --api_key "$ATAI_API_KEY":
export ATAI_API_KEY="your-api-key-here"

CSV File Streaming Example

Stream data from a CSV file through a lens for time-series analysis.

Step 1: Download Sample Data

# Download example CSV file
# (-L follows HTTP redirects; -o writes the archive to ~/Downloads/jena-climate.zip)
curl -L -o ~/Downloads/jena-climate.zip \
  https://www.kaggle.com/api/v1/datasets/download/mnassrib/jena-climate

# Extract the CSV file
# (no -d option is given, so the files are extracted into the current directory)
unzip ~/Downloads/jena-climate.zip

Step 2: Upload CSV File

import requests
import argparse

def upload_csv_file(api_key, filename, timeout=60):
    """Upload a CSV file to the Newton platform.

    Args:
        api_key: API key, sent as a Bearer token in the Authorization header.
        filename: Path of the CSV file to upload.
        timeout: Timeout in seconds handed to ``requests.post`` so that a
            stalled connection cannot block the script indefinitely.

    Returns:
        The ``file_id`` field of the JSON response when the server answers
        with HTTP 200, otherwise ``None``. Failures (network errors or a
        non-200 status) are printed rather than raised.
    """
    try:
        with open(filename, 'rb') as f:
            response = requests.post(
                'https://api.u1.archetypeai.app/v0.5/files/upload',
                headers={'Authorization': f'Bearer {api_key}'},
                files={'file': f},
                data={
                    'description': f'Climate data CSV: {filename}',
                    'tags': ['climate', 'time-series', 'example']
                },
                timeout=timeout,
            )
    except requests.RequestException as exc:
        # Network-level failures follow the same "print and return None"
        # contract as HTTP-level failures below.
        print(f"❌ Upload failed: {exc}")
        return None

    if response.status_code != 200:
        print(f"❌ Upload failed: {response.status_code}")
        print(response.text)
        return None

    file_data = response.json()
    print("✅ File uploaded successfully!")
    print(f"   File ID: {file_data['file_id']}")
    # The size is informational only; do not lose the file_id if it is absent.
    size_bytes = file_data.get('size_bytes')
    if size_bytes is not None:
        print(f"   Size: {size_bytes / 1024 / 1024:.2f} MB")
    return file_data['file_id']

if __name__ == "__main__":
    import os
    import sys

    # Fall back to the ATAI_API_KEY environment variable so the key does not
    # have to be typed on the command line; --api_key still overrides it.
    env_api_key = os.environ.get("ATAI_API_KEY") or None

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--api_key",
        default=env_api_key,
        required=env_api_key is None,
        help="Your Newton API key (defaults to $ATAI_API_KEY)",
    )
    parser.add_argument("--filename", required=True, help="CSV file to upload")
    args = parser.parse_args()

    file_id = upload_csv_file(args.api_key, args.filename)
    if not file_id:
        # Signal the failure to the calling shell instead of exiting with 0.
        sys.exit(1)
    print(f"\n🎯 Use this file_id in your lens streaming: {file_id}")

Step 3: Stream CSV Through Lens

import websocket
import json
import argparse
import time
import requests

def create_lens_session(api_key, lens_id="lns-fd669361822b07e2-237ab3ffd79199b0", timeout=30):
    """Create a new lens session.

    Args:
        api_key: API key, sent as a Bearer token in the Authorization header.
        lens_id: Identifier of the lens to open a session for.
        timeout: Timeout in seconds handed to ``requests.post`` so that a
            stalled connection cannot block the script indefinitely.

    Returns:
        The decoded JSON body of the response. Callers in this script read
        its ``session_id`` and ``session_endpoint`` keys.

    Raises:
        RuntimeError: If the server answers with a status other than 200.
    """
    response = requests.post(
        "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={"lens_id": lens_id},
        timeout=timeout,
    )

    if response.status_code != 200:
        raise RuntimeError(f"Failed to create session: {response.text}")
    return response.json()

def send_and_receive_event(socket, event_data):
    """Send one event over the socket and return the decoded JSON reply.

    The event is serialized to JSON, UTF-8 encoded and sent as a binary
    frame; the next frame received is parsed as JSON and returned.
    """
    payload = json.dumps(event_data).encode()
    socket.send_binary(payload)
    reply = socket.recv()
    return json.loads(reply)

def stream_csv_data(api_key, file_id, max_run_time_sec=60):
    """Stream CSV data through a Newton lens.

    Creates a lens session, connects to its WebSocket endpoint, validates
    the session, points the input stream at the uploaded CSV file and then
    polls for messages every 2 seconds until ``max_run_time_sec`` elapses.
    Messages of type ``inference.result``, ``log.info`` and ``error`` are
    printed; other types are ignored. The session is destroyed and the
    socket closed on the way out, even when an error occurs.

    Args:
        api_key: API key used for both the HTTP and WebSocket requests.
        file_id: Identifier of the uploaded CSV file to stream.
        max_run_time_sec: How long to keep polling, in seconds.

    Raises:
        Exception: If the session does not report itself as valid, or if
            session creation / socket communication fails.
    """
    print("🚀 Creating lens session...")
    session = create_lens_session(api_key)
    print(f"   Session ID: {session['session_id']}")

    # Connect to WebSocket
    print("🔌 Connecting to WebSocket...")
    socket = websocket.create_connection(
        session['session_endpoint'],
        header={"Authorization": f"Bearer {api_key}"}
    )
    print("   Connected!")

    try:
        # Validate session
        print("✅ Validating session...")
        response = send_and_receive_event(socket, {"type": "session.validate"})
        if not response.get("valid"):
            raise Exception(f"Session validation failed: {response}")
        print("   Session is valid!")

        # Configure CSV input stream
        print(f"📊 Configuring CSV stream for file: {file_id}")
        event_message = {
            "type": "input_stream.set",
            "event_data": {
                "stream_type": "csv_file_reader",
                "stream_config": {
                    "file_id": file_id
                }
            }
        }
        response = send_and_receive_event(socket, event_message)
        print(f"   Stream configured: {response}")

        # Read streaming results
        print("📖 Reading streaming results...")
        client_id = f"csv_client_{int(time.time())}"
        read_event = {
            "type": "session.read",
            "event_data": {"client_id": client_id}
        }
        # A monotonic clock keeps the run-time limit immune to wall-clock
        # adjustments while the loop is running.
        deadline = time.monotonic() + max_run_time_sec

        while time.monotonic() < deadline:
            # Read any pending messages
            response = send_and_receive_event(socket, read_event)

            # Process messages ("or []" tolerates an explicit null value)
            for message in response.get("messages") or []:
                timestamp = message.get("timestamp", "unknown")
                msg_type = message.get("type", "unknown")
                data = message.get("data", {})

                if msg_type == "inference.result":
                    print(f"🔬 [{timestamp}] Analysis: {data.get('result', 'No result')}")
                elif msg_type == "log.info":
                    print(f"ℹ️  [{timestamp}] Info: {data}")
                elif msg_type == "error":
                    print(f"❌ [{timestamp}] Error: {data}")

            time.sleep(2)  # Poll every 2 seconds

        print("⏰ Max runtime reached, stopping...")

    finally:
        # Clean up
        print("🧹 Cleaning up...")
        try:
            send_and_receive_event(socket, {"type": "session.destroy"})
            print("   Session destroyed")
        except Exception as exc:
            # Destroying the session is best-effort, but report the failure
            # instead of hiding it (and let Ctrl-C propagate).
            print(f"   Session destroy failed: {exc}")
        finally:
            socket.close()
            print("   WebSocket closed")

if __name__ == "__main__":
    import os
    import sys

    # Fall back to the ATAI_API_KEY environment variable so the key does not
    # have to be typed on the command line; --api_key still overrides it.
    env_api_key = os.environ.get("ATAI_API_KEY") or None

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--api_key",
        default=env_api_key,
        required=env_api_key is None,
        help="Your Newton API key (defaults to $ATAI_API_KEY)",
    )
    parser.add_argument("--file_id", required=True, help="CSV file ID from upload")
    parser.add_argument("--max_run_time_sec", type=int, default=60, help="Max runtime in seconds")
    args = parser.parse_args()

    try:
        stream_csv_data(args.api_key, args.file_id, args.max_run_time_sec)
    except Exception as e:
        print(f"❌ Error: {e}")
        # Signal the failure to the calling shell instead of exiting with 0.
        sys.exit(1)
Expected Output:
🚀 Creating lens session...
   Session ID: lsn-abc123def456
🔌 Connecting to WebSocket...
   Connected!
✅ Validating session...
   Session is valid!
📊 Configuring CSV stream for file: jena_climate_2009_2016.csv
   Stream configured: {'status': 'configured', 'stream_id': 'stream_789'}
📖 Reading streaming results...
🔬 [1741843200.123] Analysis: Temperature trend shows seasonal variation with winter lows of -10°C
🔬 [1741843202.456] Analysis: Humidity levels correlate with temperature patterns
ℹ️  [1741843204.789] Info: Processed 1000 data points
⏰ Max runtime reached, stopping...
🧹 Cleaning up...
   Session destroyed
   WebSocket closed

RTSP Camera Streaming Example

Stream real-time video from an RTSP camera for live analysis.
import websocket
import json
import argparse
import time
import requests
import uuid

def create_lens_session(api_key, lens_id="lns-fd669361822b07e2-237ab3ffd79199b0", timeout=30):
    """Create a new lens session for video analysis.

    Args:
        api_key: API key, sent as a Bearer token in the Authorization header.
        lens_id: Identifier of the lens to open a session for.
        timeout: Timeout in seconds handed to ``requests.post`` so that a
            stalled connection cannot block the script indefinitely.

    Returns:
        The decoded JSON body of the response. Callers in this script read
        its ``session_id`` and ``session_endpoint`` keys.

    Raises:
        RuntimeError: If the server answers with a status other than 200.
    """
    response = requests.post(
        "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={"lens_id": lens_id},
        timeout=timeout,
    )

    if response.status_code != 200:
        raise RuntimeError(f"Failed to create session: {response.text}")
    return response.json()

def send_and_receive_event(socket, event_data):
    """Send one event over the socket and return the decoded JSON reply.

    The event is serialized to JSON, UTF-8 encoded and sent as a binary
    frame; the next frame received is parsed as JSON and returned.
    """
    encoded_event = json.dumps(event_data).encode()
    socket.send_binary(encoded_event)
    raw_reply = socket.recv()
    return json.loads(raw_reply)

def stream_rtsp_camera(api_key, rtsp_url, max_run_time_sec=60):
    """Stream RTSP camera feed through Newton for real-time analysis.

    Creates a lens session, connects to its WebSocket endpoint, validates
    the session, configures an ``rtsp_video_streamer`` input stream and then
    polls for messages every 3 seconds until ``max_run_time_sec`` elapses.
    Messages of type ``inference.result``, ``frame.processed``,
    ``stream.status``, ``log.info`` and ``error`` are printed; other types
    are ignored. The session is destroyed and the socket closed on the way
    out, even when an error occurs.

    Args:
        api_key: API key used for both the HTTP and WebSocket requests.
        rtsp_url: URL of the RTSP camera stream.
        max_run_time_sec: How long to keep polling, in seconds.

    Raises:
        Exception: If the session does not report itself as valid, or if
            session creation / socket communication fails.
    """
    print("🚀 Creating lens session for video analysis...")
    session = create_lens_session(api_key)
    print(f"   Session ID: {session['session_id']}")

    # Connect to WebSocket
    print("🔌 Connecting to WebSocket...")
    socket = websocket.create_connection(
        session['session_endpoint'],
        header={"Authorization": f"Bearer {api_key}"}
    )
    print("   Connected!")

    try:
        # Validate session
        print("✅ Validating session...")
        response = send_and_receive_event(socket, {"type": "session.validate"})
        if not response.get("valid"):
            raise Exception(f"Session validation failed: {response}")
        print("   Session is valid!")

        # Configure RTSP input stream
        print(f"📹 Configuring RTSP stream: {rtsp_url}")
        event_message = {
            "type": "input_stream.set",
            "event_data": {
                "stream_type": "rtsp_video_streamer",
                "stream_config": {
                    "rtsp_url": rtsp_url,
                    "target_image_size": [360, 640],  # [height, width]
                    "target_frame_rate_hz": 0.2       # Process 1 frame every 5 seconds
                }
            }
        }
        response = send_and_receive_event(socket, event_message)
        print(f"   RTSP stream configured: {response}")

        # Read streaming analysis results
        print("🔬 Reading real-time analysis...")
        client_id = str(uuid.uuid4())[:8]
        read_event = {
            "type": "session.read",
            "event_data": {"client_id": client_id}
        }
        # A monotonic clock keeps the run-time limit immune to wall-clock
        # adjustments while the loop is running.
        deadline = time.monotonic() + max_run_time_sec

        while time.monotonic() < deadline:
            # Read any pending messages
            response = send_and_receive_event(socket, read_event)

            # Process messages ("or []" tolerates an explicit null value)
            messages = response.get("messages") or []
            for message in messages:
                timestamp = message.get("timestamp", "unknown")
                msg_type = message.get("type", "unknown")
                data = message.get("data", {})

                if msg_type == "inference.result":
                    result = data.get('result', 'No analysis')
                    confidence = data.get('confidence', 0)
                    processing_time = data.get('processing_time_ms', 0)
                    print(f"🎥 [{timestamp}] Analysis: {result}")
                    print(f"   Confidence: {confidence:.2f}, Processing: {processing_time}ms")

                elif msg_type == "frame.processed":
                    frame_num = data.get('frame_number', 0)
                    print(f"🖼️  [{timestamp}] Frame {frame_num} processed")

                elif msg_type == "stream.status":
                    status = data.get('status', 'unknown')
                    print(f"📡 [{timestamp}] Stream status: {status}")

                elif msg_type == "log.info":
                    print(f"ℹ️  [{timestamp}] {data}")

                elif msg_type == "error":
                    print(f"❌ [{timestamp}] Error: {data}")

            if not messages:
                print("⏳ Waiting for video frames...")

            time.sleep(3)  # Poll every 3 seconds

        print("⏰ Max runtime reached, stopping stream...")

    finally:
        # Clean up
        print("🧹 Cleaning up...")
        try:
            send_and_receive_event(socket, {"type": "session.destroy"})
            print("   Session destroyed")
        except Exception as exc:
            # Destroying the session is best-effort, but report the failure
            # instead of hiding it (and let Ctrl-C propagate).
            print(f"   Session destroy failed: {exc}")
        finally:
            socket.close()
            print("   WebSocket closed")

if __name__ == "__main__":
    import os
    import sys

    # Fall back to the ATAI_API_KEY environment variable so the key does not
    # have to be typed on the command line; --api_key still overrides it.
    env_api_key = os.environ.get("ATAI_API_KEY") or None

    parser = argparse.ArgumentParser(description="Stream RTSP camera through Newton")
    parser.add_argument(
        "--api_key",
        default=env_api_key,
        required=env_api_key is None,
        help="Your Newton API key (defaults to $ATAI_API_KEY)",
    )
    parser.add_argument("--rtsp_url", required=True, help="RTSP camera URL")
    parser.add_argument("--max_run_time_sec", type=int, default=60, help="Max runtime in seconds")
    args = parser.parse_args()

    try:
        stream_rtsp_camera(args.api_key, args.rtsp_url, args.max_run_time_sec)
    except Exception as e:
        print(f"❌ Error: {e}")
        # Signal the failure to the calling shell instead of exiting with 0.
        sys.exit(1)
Expected Output:
🚀 Creating lens session for video analysis...
   Session ID: lsn-xyz789abc123
🔌 Connecting to WebSocket...
   Connected!
✅ Validating session...
   Session is valid!
📹 Configuring RTSP stream: rtsp://123456789.stream
   RTSP stream configured: {'status': 'configured', 'stream_id': 'rtsp_456'}
🔬 Reading real-time analysis...
📡 [1741843200.123] Stream status: connected
🖼️  [1741843205.456] Frame 1 processed
🎥 [1741843205.789] Analysis: Multiple vehicles visible on the road including cars and trucks
   Confidence: 0.89, Processing: 1240ms
🖼️  [1741843210.123] Frame 2 processed  
🎥 [1741843210.456] Analysis: Traffic is flowing normally with vehicles maintaining safe distances
   Confidence: 0.92, Processing: 1180ms
⏰ Max runtime reached, stopping stream...
🧹 Cleaning up...
   Session destroyed
   WebSocket closed

Direct Image Analysis Example

Send individual images for immediate analysis without setting up streams.
import websocket
import json
import base64
import argparse
import requests

def analyze_image_direct(api_key, image_path, focus="Describe what you see"):
    """Analyze a single image using direct model query.

    Creates a lens session, connects to its WebSocket endpoint, sends the
    base64-encoded image in a ``model.query`` event using the
    ``image_qa_template_task`` template, prints the reply and returns it.
    The session is destroyed and the socket closed on the way out.

    Args:
        api_key: API key used for both the HTTP and WebSocket requests.
        image_path: Path of the image file to analyze.
        focus: Question or instruction sent as the query's ``focus`` field.

    Returns:
        The decoded JSON reply to the ``model.query`` event.

    Raises:
        Exception: If the session cannot be created (non-200 response).
    """
    # Create session
    print("🚀 Creating lens session...")
    response = requests.post(
        "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b0"},
        timeout=30,  # do not hang forever on a stalled connection
    )
    # Fail with the server's message instead of a KeyError on
    # 'session_endpoint' further down.
    if response.status_code != 200:
        raise Exception(f"Failed to create session: {response.text}")
    session = response.json()

    # Connect WebSocket
    print("🔌 Connecting to WebSocket...")
    socket = websocket.create_connection(
        session['session_endpoint'],
        header={"Authorization": f"Bearer {api_key}"}
    )

    try:
        # Load and encode image
        print(f"📸 Loading image: {image_path}")
        with open(image_path, "rb") as img_handle:
            base64_img = base64.b64encode(img_handle.read()).decode("utf-8")

        # Send direct query
        print("🔬 Analyzing image...")
        event_message = {
            "type": "model.query",
            "event_data": {
                "model_version": "Newton::c1_6_241117d4362cc9",
                "template_name": "image_qa_template_task",
                "instruction": "Answer the following question about the image:",
                "focus": focus,
                "max_new_tokens": 512,
                "data": [{
                    "type": "base64_img",
                    "base64_img": base64_img
                }],
                "sensor_metadata": {}
            }
        }

        socket.send_binary(json.dumps(event_message).encode())
        response = json.loads(socket.recv())

        # Display results
        print("\n🎯 Analysis Results:")
        print("=" * 50)
        print(f"Focus: {focus}")
        print(f"Result: {response.get('result', 'No result')}")
        if 'confidence' in response:
            print(f"Confidence: {response['confidence']:.2f}")
        if 'processing_time_ms' in response:
            print(f"Processing Time: {response['processing_time_ms']}ms")
        print("=" * 50)

        return response

    finally:
        # Cleanup: destroying the session is best-effort, but the socket is
        # always closed and Ctrl-C is no longer swallowed.
        try:
            socket.send_binary(json.dumps({"type": "session.destroy"}).encode())
            socket.recv()  # Wait for destroy confirmation
        except Exception as exc:
            print(f"⚠️  Session destroy failed: {exc}")
        finally:
            socket.close()

def detect_objects_in_image(api_key, image_path, objects_to_find=("person", "car", "truck")):
    """Detect specific objects in an image with bounding boxes.

    Creates a lens session, connects to its WebSocket endpoint, sends the
    base64-encoded image in a ``model.query`` event using the
    ``image_bbox_template_task`` template, prints each ``name: bbox`` line
    of the reply's ``result`` text and returns the reply. The session is
    destroyed and the socket closed on the way out.

    Args:
        api_key: API key used for both the HTTP and WebSocket requests.
        image_path: Path of the image file to analyze.
        objects_to_find: Iterable of object names; they are comma-joined
            into the query's ``focus`` field. The default is an immutable
            tuple so it cannot be modified between calls.

    Returns:
        The decoded JSON reply to the ``model.query`` event.

    Raises:
        Exception: If the session cannot be created (non-200 response).
    """
    # Create session
    response = requests.post(
        "https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b0"},
        timeout=30,  # do not hang forever on a stalled connection
    )
    # Fail with the server's message instead of a KeyError on
    # 'session_endpoint' further down.
    if response.status_code != 200:
        raise Exception(f"Failed to create session: {response.text}")
    session = response.json()

    # Connect WebSocket
    socket = websocket.create_connection(
        session['session_endpoint'],
        header={"Authorization": f"Bearer {api_key}"}
    )

    try:
        # Load and encode image
        with open(image_path, "rb") as img_handle:
            base64_img = base64.b64encode(img_handle.read()).decode("utf-8")

        # Object detection query
        objects_str = ",".join(objects_to_find)
        event_message = {
            "type": "model.query",
            "event_data": {
                "model_version": "Newton::c1_6_241117d4362cc9",
                "template_name": "image_bbox_template_task",
                "instruction": "Localize all objects in the picture and build bounding box around them.",
                "focus": f"Input: [{objects_str}]",
                "max_new_tokens": 512,
                "data": [{
                    "type": "base64_img",
                    "base64_img": base64_img
                }],
                "sensor_metadata": {}
            }
        }

        socket.send_binary(json.dumps(event_message).encode())
        response = json.loads(socket.recv())

        # Parse and display bounding boxes
        print("\n🎯 Object Detection Results:")
        print("=" * 50)
        result = response.get('result', '')
        if result:
            for line in result.split('\n'):
                # Only lines shaped like "name: bbox" are printed.
                if ':' in line:
                    obj_name, bbox = line.split(':', 1)
                    print(f"📦 {obj_name.strip()}: {bbox.strip()}")
        else:
            print("No objects detected")
        print("=" * 50)

        return response

    finally:
        # Cleanup: destroying the session is best-effort, but the socket is
        # always closed and Ctrl-C is no longer swallowed.
        try:
            socket.send_binary(json.dumps({"type": "session.destroy"}).encode())
            socket.recv()
        except Exception as exc:
            print(f"⚠️  Session destroy failed: {exc}")
        finally:
            socket.close()

if __name__ == "__main__":
    import os
    import sys

    # Fall back to the ATAI_API_KEY environment variable so the key does not
    # have to be typed on the command line; --api_key still overrides it.
    env_api_key = os.environ.get("ATAI_API_KEY") or None

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--api_key",
        default=env_api_key,
        required=env_api_key is None,
        help="Your Newton API key (defaults to $ATAI_API_KEY)",
    )
    parser.add_argument("--image_path", required=True, help="Path to image file")
    parser.add_argument("--mode", choices=["describe", "detect"], default="describe",
                        help="Analysis mode: describe or detect objects")
    parser.add_argument("--focus", default="Describe what you see in detail",
                        help="Focus question for description mode")
    parser.add_argument("--objects", nargs="+", default=["person", "car", "truck"],
                        help="Objects to detect in detect mode")
    args = parser.parse_args()

    try:
        if args.mode == "describe":
            analyze_image_direct(args.api_key, args.image_path, args.focus)
        else:
            detect_objects_in_image(args.api_key, args.image_path, args.objects)
    except Exception as e:
        print(f"❌ Error: {e}")
        # Signal the failure to the calling shell instead of exiting with 0.
        sys.exit(1)

Repository and Additional Examples

Common Use Cases

Application: Real-time security camera analysis
Setup: RTSP camera stream with person/vehicle detection
Focus Examples:
  • “Detect unauthorized personnel”
  • “Monitor for safety violations”
  • “Count people in restricted areas”