Prerequisites: Set your API key as an environment variable:
export ATAI_API_KEY="your-api-key-here"
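The example scripts below take the key via an --api_key flag; if you prefer to read it from the environment instead, here is a minimal sketch (the variable handling is illustrative):

import os

api_key = os.environ.get("ATAI_API_KEY")
if not api_key:
    raise SystemExit("ATAI_API_KEY is not set")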
CSV File Streaming Example
Stream data from a CSV file through a lens for time-series analysis.
Step 1: Download Sample Data
# Download example CSV file
curl -L -o ~/Downloads/jena-climate.zip \
https://www.kaggle.com/api/v1/datasets/download/mnassrib/jena-climate
# Extract the CSV file
unzip ~/Downloads/jena-climate.zip
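If you prefer to stay in Python, a rough equivalent of the two commands above using requests and zipfile (this assumes the Kaggle URL is downloadable without authentication, as the curl command implies):

import zipfile
import requests

# Download the archive (same URL as the curl command above)
url = "https://www.kaggle.com/api/v1/datasets/download/mnassrib/jena-climate"
archive_path = "jena-climate.zip"
with requests.get(url, stream=True, allow_redirects=True) as response:
    response.raise_for_status()
    with open(archive_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=1 << 20):
            f.write(chunk)

# Extract the CSV file into the current directory
with zipfile.ZipFile(archive_path) as archive:
    archive.extractall()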
Step 2: Upload CSV File
import requests
import argparse
def upload_csv_file(api_key, filename):
"""Upload a CSV file to the Newton platform."""
with open(filename, 'rb') as f:
response = requests.post(
'https://api.u1.archetypeai.app/v0.5/files/upload',
headers={'Authorization': f'Bearer {api_key}'},
files={'file': f},
data={
'description': f'Climate data CSV: {filename}',
'tags': ['climate', 'time-series', 'example']
}
)
if response.status_code == 200:
file_data = response.json()
print(f"✅ File uploaded successfully!")
print(f" File ID: {file_data['file_id']}")
print(f" Size: {file_data['size_bytes'] / 1024 / 1024:.2f} MB")
return file_data['file_id']
else:
print(f"❌ Upload failed: {response.status_code}")
print(response.text)
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--api_key", required=True, help="Your Newton API key")
parser.add_argument("--filename", required=True, help="CSV file to upload")
args = parser.parse_args()
file_id = upload_csv_file(args.api_key, args.filename)
if file_id:
print(f"\n🎯 Use this file_id in your lens streaming: {file_id}")
Step 3: Stream CSV Through Lens
import websocket
import json
import argparse
import time
import requests
def create_lens_session(api_key, lens_id="lns-fd669361822b07e2-237ab3ffd79199b0"):
"""Create a new lens session."""
response = requests.post(
"https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json={"lens_id": lens_id}
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to create session: {response.text}")
def send_and_receive_event(socket, event_data):
"""Send event and wait for response."""
socket.send_binary(json.dumps(event_data).encode())
response = json.loads(socket.recv())
return response
def stream_csv_data(api_key, file_id, max_run_time_sec=60):
"""Stream CSV data through a Newton lens."""
print("🚀 Creating lens session...")
session = create_lens_session(api_key)
print(f" Session ID: {session['session_id']}")
# Connect to WebSocket
print("🔌 Connecting to WebSocket...")
socket = websocket.create_connection(
session['session_endpoint'],
header={"Authorization": f"Bearer {api_key}"}
)
print(" Connected!")
try:
# Validate session
print("✅ Validating session...")
response = send_and_receive_event(socket, {"type": "session.validate"})
if not response.get("valid"):
raise Exception(f"Session validation failed: {response}")
print(" Session is valid!")
# Configure CSV input stream
print(f"📊 Configuring CSV stream for file: {file_id}")
event_message = {
"type": "input_stream.set",
"event_data": {
"stream_type": "csv_file_reader",
"stream_config": {
"file_id": file_id
}
}
}
response = send_and_receive_event(socket, event_message)
print(f" Stream configured: {response}")
# Read streaming results
print("📖 Reading streaming results...")
client_id = f"csv_client_{int(time.time())}"
start_time = time.time()
while time.time() - start_time < max_run_time_sec:
# Read any pending messages
read_event = {
"type": "session.read",
"event_data": {"client_id": client_id}
}
response = send_and_receive_event(socket, read_event)
# Process messages
messages = response.get("messages", [])
for message in messages:
timestamp = message.get("timestamp", "unknown")
msg_type = message.get("type", "unknown")
data = message.get("data", {})
if msg_type == "inference.result":
print(f"🔬 [{timestamp}] Analysis: {data.get('result', 'No result')}")
elif msg_type == "log.info":
print(f"ℹ️ [{timestamp}] Info: {data}")
elif msg_type == "error":
print(f"❌ [{timestamp}] Error: {data}")
time.sleep(2) # Poll every 2 seconds
print("⏰ Max runtime reached, stopping...")
finally:
# Clean up
print("🧹 Cleaning up...")
try:
send_and_receive_event(socket, {"type": "session.destroy"})
print(" Session destroyed")
except:
pass
socket.close()
print(" WebSocket closed")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--api_key", required=True, help="Your Newton API key")
parser.add_argument("--file_id", required=True, help="CSV file ID from upload")
parser.add_argument("--max_run_time_sec", type=int, default=60, help="Max runtime in seconds")
args = parser.parse_args()
try:
stream_csv_data(args.api_key, args.file_id, args.max_run_time_sec)
except Exception as e:
print(f"❌ Error: {e}")
🚀 Creating lens session...
Session ID: lsn-abc123def456
🔌 Connecting to WebSocket...
Connected!
✅ Validating session...
Session is valid!
📊 Configuring CSV stream for file: jena_climate_2009_2016.csv
Stream configured: {'status': 'configured', 'stream_id': 'stream_789'}
📖 Reading streaming results...
🔬 [1741843200.123] Analysis: Temperature trend shows seasonal variation with winter lows of -10°C
🔬 [1741843202.456] Analysis: Humidity levels correlate with temperature patterns
ℹ️ [1741843204.789] Info: Processed 1000 data points
⏰ Max runtime reached, stopping...
🧹 Cleaning up...
Session destroyed
WebSocket closed
RTSP Camera Streaming Example
Stream real-time video from an RTSP camera for live analysis.
import websocket
import json
import argparse
import time
import requests
import uuid
def create_lens_session(api_key, lens_id="lns-fd669361822b07e2-237ab3ffd79199b0"):
"""Create a new lens session for video analysis."""
response = requests.post(
"https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json={"lens_id": lens_id}
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to create session: {response.text}")
def send_and_receive_event(socket, event_data):
"""Send event and wait for response."""
socket.send_binary(json.dumps(event_data).encode())
response = json.loads(socket.recv())
return response
def stream_rtsp_camera(api_key, rtsp_url, max_run_time_sec=60):
"""Stream RTSP camera feed through Newton for real-time analysis."""
print("🚀 Creating lens session for video analysis...")
session = create_lens_session(api_key)
print(f" Session ID: {session['session_id']}")
# Connect to WebSocket
print("🔌 Connecting to WebSocket...")
socket = websocket.create_connection(
session['session_endpoint'],
header={"Authorization": f"Bearer {api_key}"}
)
print(" Connected!")
try:
# Validate session
print("✅ Validating session...")
response = send_and_receive_event(socket, {"type": "session.validate"})
if not response.get("valid"):
raise Exception(f"Session validation failed: {response}")
print(" Session is valid!")
# Configure RTSP input stream
print(f"📹 Configuring RTSP stream: {rtsp_url}")
event_message = {
"type": "input_stream.set",
"event_data": {
"stream_type": "rtsp_video_streamer",
"stream_config": {
"rtsp_url": rtsp_url,
"target_image_size": [360, 640], # [height, width]
"target_frame_rate_hz": 0.2 # Process 1 frame every 5 seconds
}
}
}
response = send_and_receive_event(socket, event_message)
print(f" RTSP stream configured: {response}")
# Read streaming analysis results
print("🔬 Reading real-time analysis...")
client_id = str(uuid.uuid4())[:8]
start_time = time.time()
while time.time() - start_time < max_run_time_sec:
# Read any pending messages
read_event = {
"type": "session.read",
"event_data": {"client_id": client_id}
}
response = send_and_receive_event(socket, read_event)
# Process messages
messages = response.get("messages", [])
for message in messages:
timestamp = message.get("timestamp", "unknown")
msg_type = message.get("type", "unknown")
data = message.get("data", {})
if msg_type == "inference.result":
result = data.get('result', 'No analysis')
confidence = data.get('confidence', 0)
processing_time = data.get('processing_time_ms', 0)
print(f"🎥 [{timestamp}] Analysis: {result}")
print(f" Confidence: {confidence:.2f}, Processing: {processing_time}ms")
elif msg_type == "frame.processed":
frame_num = data.get('frame_number', 0)
print(f"🖼️ [{timestamp}] Frame {frame_num} processed")
elif msg_type == "stream.status":
status = data.get('status', 'unknown')
print(f"📡 [{timestamp}] Stream status: {status}")
elif msg_type == "log.info":
print(f"ℹ️ [{timestamp}] {data}")
elif msg_type == "error":
print(f"❌ [{timestamp}] Error: {data}")
if not messages:
print("⏳ Waiting for video frames...")
time.sleep(3) # Poll every 3 seconds
print("⏰ Max runtime reached, stopping stream...")
finally:
# Clean up
print("🧹 Cleaning up...")
try:
send_and_receive_event(socket, {"type": "session.destroy"})
print(" Session destroyed")
except:
pass
socket.close()
print(" WebSocket closed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Stream RTSP camera through Newton")
parser.add_argument("--api_key", required=True, help="Your Newton API key")
parser.add_argument("--rtsp_url", required=True, help="RTSP camera URL")
parser.add_argument("--max_run_time_sec", type=int, default=60, help="Max runtime in seconds")
args = parser.parse_args()
try:
stream_rtsp_camera(args.api_key, args.rtsp_url, args.max_run_time_sec)
except Exception as e:
print(f"❌ Error: {e}")
🚀 Creating lens session for video analysis...
Session ID: lsn-xyz789abc123
🔌 Connecting to WebSocket...
Connected!
✅ Validating session...
Session is valid!
📹 Configuring RTSP stream: rtsp://123456789.stream
RTSP stream configured: {'status': 'configured', 'stream_id': 'rtsp_456'}
🔬 Reading real-time analysis...
📡 [1741843200.123] Stream status: connected
🖼️ [1741843205.456] Frame 1 processed
🎥 [1741843205.789] Analysis: Multiple vehicles visible on the road including cars and trucks
Confidence: 0.89, Processing: 1240ms
🖼️ [1741843210.123] Frame 2 processed
🎥 [1741843210.456] Analysis: Traffic is flowing normally with vehicles maintaining safe distances
Confidence: 0.92, Processing: 1180ms
⏰ Max runtime reached, stopping stream...
🧹 Cleaning up...
Session destroyed
WebSocket closed
Direct Image Analysis Example
Send individual images for immediate analysis without setting up streams.
import websocket
import json
import base64
import argparse
import requests
def analyze_image_direct(api_key, image_path, focus="Describe what you see"):
"""Analyze a single image using direct model query."""
# Create session
print("🚀 Creating lens session...")
response = requests.post(
"https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json={"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b0"}
)
session = response.json()
# Connect WebSocket
print("🔌 Connecting to WebSocket...")
socket = websocket.create_connection(
session['session_endpoint'],
header={"Authorization": f"Bearer {api_key}"}
)
try:
# Load and encode image
print(f"📸 Loading image: {image_path}")
with open(image_path, "rb") as img_handle:
base64_img = base64.b64encode(img_handle.read()).decode("utf-8")
# Send direct query
print("🔬 Analyzing image...")
event_message = {
"type": "model.query",
"event_data": {
"model_version": "Newton::c1_6_241117d4362cc9",
"template_name": "image_qa_template_task",
"instruction": "Answer the following question about the image:",
"focus": focus,
"max_new_tokens": 512,
"data": [{
"type": "base64_img",
"base64_img": base64_img
}],
"sensor_metadata": {}
}
}
socket.send_binary(json.dumps(event_message).encode())
response = json.loads(socket.recv())
# Display results
print("\n🎯 Analysis Results:")
print("=" * 50)
print(f"Focus: {focus}")
print(f"Result: {response.get('result', 'No result')}")
if 'confidence' in response:
print(f"Confidence: {response['confidence']:.2f}")
if 'processing_time_ms' in response:
print(f"Processing Time: {response['processing_time_ms']}ms")
print("=" * 50)
return response
finally:
# Cleanup
try:
socket.send_binary(json.dumps({"type": "session.destroy"}).encode())
socket.recv() # Wait for destroy confirmation
except:
pass
socket.close()
def detect_objects_in_image(api_key, image_path, objects_to_find=["person", "car", "truck"]):
"""Detect specific objects in an image with bounding boxes."""
# Create session
response = requests.post(
"https://api.u1.archetypeai.app/v0.5/lens/sessions/create",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json={"lens_id": "lns-fd669361822b07e2-237ab3ffd79199b0"}
)
session = response.json()
# Connect WebSocket
socket = websocket.create_connection(
session['session_endpoint'],
header={"Authorization": f"Bearer {api_key}"}
)
try:
# Load and encode image
with open(image_path, "rb") as img_handle:
base64_img = base64.b64encode(img_handle.read()).decode("utf-8")
# Object detection query
objects_str = ",".join(objects_to_find)
event_message = {
"type": "model.query",
"event_data": {
"model_version": "Newton::c1_6_241117d4362cc9",
"template_name": "image_bbox_template_task",
"instruction": "Localize all objects in the picture and build bounding box around them.",
"focus": f"Input: [{objects_str}]",
"max_new_tokens": 512,
"data": [{
"type": "base64_img",
"base64_img": base64_img
}],
"sensor_metadata": {}
}
}
socket.send_binary(json.dumps(event_message).encode())
response = json.loads(socket.recv())
# Parse and display bounding boxes
print(f"\n🎯 Object Detection Results:")
print("=" * 50)
result = response.get('result', '')
if result:
for line in result.split('\n'):
if ':' in line:
obj_name, bbox = line.split(':', 1)
print(f"📦 {obj_name.strip()}: {bbox.strip()}")
else:
print("No objects detected")
print("=" * 50)
return response
finally:
# Cleanup
try:
socket.send_binary(json.dumps({"type": "session.destroy"}).encode())
socket.recv()
except:
pass
socket.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--api_key", required=True, help="Your Newton API key")
parser.add_argument("--image_path", required=True, help="Path to image file")
parser.add_argument("--mode", choices=["describe", "detect"], default="describe",
help="Analysis mode: describe or detect objects")
parser.add_argument("--focus", default="Describe what you see in detail",
help="Focus question for description mode")
parser.add_argument("--objects", nargs="+", default=["person", "car", "truck"],
help="Objects to detect in detect mode")
args = parser.parse_args()
try:
if args.mode == "describe":
analyze_image_direct(args.api_key, args.image_path, args.focus)
else:
detect_objects_in_image(args.api_key, args.image_path, args.objects)
except Exception as e:
print(f"❌ Error: {e}")
Repository and Additional Examples
- Python Client Examples: complete example repository with more use cases
- WebSocket Events API: detailed WebSocket event documentation
- File Upload API: upload files for CSV streaming
- Troubleshooting: common issues and solutions
Common Use Cases
Security Monitoring
Application: Real-time security camera analysis
Setup: RTSP camera stream with person/vehicle detection
Focus Examples (see the sketch after these lists for passing a focus string to a query):
- “Detect unauthorized personnel”
- “Monitor for safety violations”
- “Count people in restricted areas”
Quality Control
Application: Manufacturing defect detection
Setup: Direct image analysis of product photos
Focus Examples:
- “Identify product defects”
- “Verify assembly completeness”
- “Check packaging quality”
Environmental Monitoring
Application: Climate and sensor data analysis
Setup: CSV file streaming of sensor readings
Focus Examples:
- “Analyze temperature trends”
- “Detect anomalous readings”
- “Predict equipment failures”
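Each of these focus strings can be passed as the focus field of a model.query event. As a minimal sketch, reusing the analyze_image_direct helper from the Direct Image Analysis example above (the image path is illustrative):

import os

api_key = os.environ["ATAI_API_KEY"]
analyze_image_direct(api_key, "warehouse_snapshot.jpg", focus="Count people in restricted areas")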