Documentation Index
Fetch the complete documentation index at: https://docs.archetypeai.app/llms.txt
Use this file to discover all available pages before exploring further.
The Test Lens is a pass-through lens that repeats input data for testing and debugging data streaming pipelines. It uses the lens_echo_processor to buffer and echo data without modification, making it ideal for validating data flow through the system.
Use the Test Lens when you need to:
- Test data streaming pipelines (CSV, Kafka, SSE)
- Validate that data correctly flows from input to output
- Debug stream configurations without processing overhead
- Test read timeout behavior with actual data buffering
- Verify integration between different stream types
- Test data flow end-to-end without transformation
Echo Processor
The Test Lens uses the lens_echo_processor, which implements a pass-through mechanism for data flow testing.
Processor Behavior
The Echo processor:
- Stores incoming data in an internal queue
- Returns all buffered events unchanged, in arrival order, when read (or None if nothing is buffered)
- Acts as a buffer between input and output streams
- Updates the read/write statistics inherited from LensProcessorBase (num_reads, num_writes, last_read_timestamp, last_write_timestamp)
Implementation Details
@registry.register_lens_processor("lens_echo_processor")
class LensEchoProcessor(LensProcessorBase):
    """A simple echo processor that repeats the input for testing and debugging.

    Events passed to ``write`` are buffered in an in-memory FIFO queue and
    handed back, unchanged and in arrival order, by the next ``read``.
    """

    def __init__(self, config: config_dict.ConfigDict, redis_db_cls: RedisDatabase) -> None:
        super().__init__(config, redis_db_cls)
        # FIFO buffer of events that have been written but not yet read.
        self.data_queue = queue.Queue()

    async def read(self, timestamp: float = -1.0) -> "list[dict] | None":
        """Drain and return every buffered event.

        Args:
            timestamp: Read time in seconds since the epoch; a negative value
                (the default) means "now".

        Returns:
            The buffered events in arrival order, or ``None`` if nothing was
            buffered.
        """
        timestamp = timestamp if timestamp >= 0.0 else time.time()
        self.num_reads += 1
        self.last_read_timestamp = timestamp
        events = []
        # get_nowait() never blocks. It raises queue.Empty once the buffer is
        # drained, whereas an empty()-then-get() pair blocks forever if the
        # queue is emptied between the two calls.
        while True:
            try:
                event = self.data_queue.get_nowait()
            except queue.Empty:
                break
            events.append(event)
        # Callers rely on None (not an empty list) meaning "no data".
        return events or None

    async def write(self, event_data: dict, timestamp: float = -1.0) -> dict:
        """Buffer ``event_data`` for the next ``read`` and return it unchanged.

        Args:
            event_data: The event to echo back later.
            timestamp: Write time in seconds since the epoch; a negative value
                (the default) means "now".

        Returns:
            The same ``event_data`` object that was passed in.
        """
        timestamp = timestamp if timestamp >= 0.0 else time.time()
        self.data_queue.put(event_data)
        self.num_writes += 1
        self.last_write_timestamp = timestamp
        return event_data
Creating a Test Lens
To create a Test Lens, register it with the Echo processor:
# Connect to the API, then register a lens whose pipeline consists solely of
# the echo processor (no processor config, no model parameters).
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register(
    {
        "lens_name": "Echo Lens",
        "lens_config": {
            "model_pipeline": [
                {"processor_name": "lens_echo_processor", "processor_config": {}},
            ],
            "model_parameters": {},
        },
    }
)
# The registration metadata carries the new lens's identifier.
lens_id = lens_metadata["lens_id"]
Use Cases
The Test Lens is designed for testing data streaming and pipeline functionality:
- CSV File Streaming - Test streaming CSV files through the lens to validate file reader integration.
- Kafka Integration Testing - Test reading from and writing to Kafka topics through the lens.
- Server-Sent Events (SSE) Testing - Validate that Server-Sent Events streams correctly output data.
- Read Timeout Testing - Test blocking read behavior with configurable timeout parameters.
- Data Flow Validation - Verify that data correctly flows from input streams to output streams without modification.
Examples
CSV Reader Test
This example demonstrates streaming a CSV file through a Test Lens:
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> list[str]:
    """Upload a CSV file and attach it to the session as the lens input stream.

    Returns:
        The list of error messages collected by this example (always empty).

    NOTE(review): the ``input_stream.set`` response is not inspected, so this
    only confirms that the upload and event calls do not raise. ``config`` is
    annotated ``dict`` but is read via attribute access (``config.csv_filename``).
    """
    # Upload the test file and keep the id the server assigned to it.
    file_id = client.files.local.upload(config.csv_filename)["file_id"]

    # Attach a CSV file reader as the lens input.
    csv_stream_config = {
        "file_id": file_id,
        "window_size": 1,
        "step_size": 1,
        "loop_recording": 0,
    }
    client.lens.sessions.process_event(
        session_id,
        {
            "type": "input_stream.set",
            "event_data": {
                "stream_type": "csv_file_reader",
                "stream_config": csv_stream_config,
            },
        },
    )
    return []
# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]
try:
    # Create and run the session (auto_destroy=True presumably tears the
    # session down once test_fn returns).
    test_errors = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised, so a failed
    # run does not leave a registered lens behind.
    client.lens.delete(lens_id)
Kafka Reader Test
This example demonstrates reading from Kafka topics through a Test Lens:
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> list[str]:
    """Send messages to a fresh Kafka topic and check the lens ingested all of them.

    Attaches a ``kafka_reader`` input on a uniquely named topic, produces 16
    messages to it, then polls ``session.status`` (for up to 10 seconds)
    until ``num_inputs`` reaches the number of messages sent.

    Returns:
        A list of human-readable error messages; empty on success.
    """
    test_errors = []
    test_topic = secrets.token_hex(8)  # Unique topic so concurrent runs don't collide
    num_events_to_send = 16
    max_wait_sec = 10.0
    poll_interval_sec = 0.5

    # Attach a Kafka input stream
    event = {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "kafka_reader",
            "stream_config": {
                "topic_ids": [test_topic],
                "max_messages_per_read": 1,
            },
        },
    }
    client.lens.sessions.process_event(session_id, event)

    # Send messages to the lens
    producer = client.kafka.create_producer(topic_ids=[test_topic])
    for i in range(num_events_to_send):
        producer.send(topic_id=test_topic, value={"counter": i})
    producer.flush()

    # Poll the session status until every message has been ingested or the
    # deadline passes. A fixed sleep always costs the full wait and can still
    # be too short; polling returns as soon as the lens catches up while
    # keeping the same upper bound.
    status_event = {"type": "session.status"}
    deadline = time.monotonic() + max_wait_sec
    while True:
        response = client.lens.sessions.process_event(session_id, status_event)
        num_events_received = response["num_inputs"]
        if num_events_received >= num_events_to_send or time.monotonic() >= deadline:
            break
        time.sleep(poll_interval_sec)

    if num_events_received != num_events_to_send:
        test_errors.append(f"Number of lens inputs ({num_events_received}) does not match expected ({num_events_to_send})!")
    return test_errors
# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]
try:
    # Create and run the session (auto_destroy=True presumably tears the
    # session down once test_fn returns).
    test_errors = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised, so a failed
    # run does not leave a registered lens behind.
    client.lens.delete(lens_id)
Kafka Reader/Writer Test
This example demonstrates streaming data through a Test Lens from Kafka input to Kafka output:
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> list[str]:
    """Stream messages from a Kafka input topic through the lens to a Kafka output topic.

    Attaches a ``kafka_reader`` input and a ``kafka_writer`` output on fresh,
    uniquely named topics and produces 16 messages to the input topic. It then
    checks that at least one message arrived on the output topic and that the
    session counted every input.

    Returns:
        A list of human-readable error messages; empty on success.
    """
    test_errors = []
    test_uid = secrets.token_hex(8)
    input_test_topic = f"input_{test_uid}"
    output_test_topic = f"output_{test_uid}"
    num_events_to_send = 16

    # Attach Kafka input stream
    event = {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "kafka_reader",
            "stream_config": {
                "topic_ids": [input_test_topic],
                "max_messages_per_read": 1,
            },
        },
    }
    client.lens.sessions.process_event(session_id, event)

    # Attach Kafka output stream
    event = {
        "type": "output_stream.set",
        "event_data": {
            "stream_type": "kafka_writer",
            "stream_config": {
                "topic_ids": [output_test_topic],
                "max_messages_per_read": 1,
            },
        },
    }
    response = client.lens.sessions.process_event(session_id, event)
    if response["type"] != "output_stream.set.response":
        test_errors.append("Failed to receive output_stream.set response!")
        return test_errors

    # Send messages to input topic
    producer = client.kafka.create_producer(topic_ids=[input_test_topic])
    for i in range(num_events_to_send):
        producer.send(topic_id=input_test_topic, value={"counter": i})
    producer.flush()

    # Create consumer for output topic
    consumer = client.kafka.create_consumer(
        topic_id=output_test_topic,
        auto_offset_reset="earliest",
        consumer_timeout_ms=1000,
    )
    # Count what the lens wrote to the output topic. Iteration presumably ends
    # after consumer_timeout_ms without a new message (kafka-python semantics).
    num_output_messages = sum(1 for _ in consumer)
    # Without this check the output half of the pipeline would go unverified.
    if num_output_messages == 0:
        test_errors.append(f"No messages received on output topic ({output_test_topic})!")

    # Verify session processed messages
    event = {"type": "session.status"}
    response = client.lens.sessions.process_event(session_id, event)
    num_events_received = response["num_inputs"]
    if num_events_received != num_events_to_send:
        test_errors.append(f"Number of lens inputs ({num_events_received}) does not match expected ({num_events_to_send})!")
    return test_errors
# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]
try:
    # Create and run the session (auto_destroy=True presumably tears the
    # session down once test_fn returns).
    test_errors = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised, so a failed
    # run does not leave a registered lens behind.
    client.lens.delete(lens_id)
Server-Sent Events Reader Test
This example demonstrates reading Lens output via Server-Sent Events (SSE):
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> list[str]:
    """Drive the lens with an event emitter and read its output over SSE.

    Attaches an ``event_emitter`` input (10 events) and a
    ``server_sent_events_writer`` output. It then consumes the session's SSE
    stream until 10 events have been seen.

    Returns:
        A list of human-readable error messages; empty on success.

    NOTE(review): the SSE loop has no timeout of its own; if fewer than 10
    events arrive and the stream stays open, it waits indefinitely.
    """
    errors = []

    # Input side: a basic event emitter.
    emitter_event = {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "event_emitter",
            "stream_config": { "max_events": 10 },
        },
    }
    client.lens.sessions.process_event(session_id, emitter_event)

    # Output side: publish lens results as Server-Sent Events.
    sse_writer_event = {
        "type": "output_stream.set",
        "event_data": {
            "stream_type": "server_sent_events_writer",
            "stream_config": {"max_messages_per_read": 1},
        },
    }
    reply = client.lens.sessions.process_event(session_id, sse_writer_event)
    if reply["type"] != "output_stream.set.response":
        errors.append("Failed to receive output_stream.set response!")
        return errors

    # Subscribe to this session's SSE consumer endpoint.
    sse_url = f"{config.api_endpoint}/lens/sessions/consumer/{session_id}"
    sse_event_stream = SSEClient(sse_url, headers={"Authorization": f"Bearer {config.api_key}"})

    # Count incoming events, stopping once the target is reached.
    max_num_events = 10
    received = 0
    for received, _sse_event in enumerate(sse_event_stream, start=1):
        if received >= max_num_events:
            break

    if not received:
        errors.append("Failed to receive SSE events!")
    return errors
# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]
try:
    # Create and run the session (auto_destroy=True presumably tears the
    # session down once test_fn returns).
    test_errors = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised, so a failed
    # run does not leave a registered lens behind.
    client.lens.delete(lens_id)
Read Timeout Test
This example demonstrates testing read timeout functionality with the Test Lens:
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> bool:
    """Check that ``session.read`` honours ``read_timeout_sec`` when no data is queued.

    First drains any pending output (at most 10 reads). It then issues blocking
    reads with 2 s and 5 s timeouts, checking that each returns an empty
    ``session.read.response`` after more than the requested time but less than
    one extra second.

    Returns:
        True if every check passes.

    Raises:
        AssertionError: If a response has the wrong type, carries data, or the
            measured wait falls outside ``(timeout, timeout + 1)`` seconds.
    """
    client_id = "test_client"

    # Clear any pending messages: read until an empty response, at most 10 times.
    drain_message = {"type": "session.read", "event_data": {"client_id": client_id}}
    for _ in range(10):
        response = client.lens.sessions.process_event(session_id, drain_message)
        if response["event_data"] is None:
            break

    def check_read_timeout(timeout_sec: float) -> None:
        """Issue one blocking read and verify it times out after ~timeout_sec."""
        event_message = {
            "type": "session.read",
            "event_data": {"read_timeout_sec": timeout_sec, "client_id": client_id},
        }
        # monotonic() is unaffected by wall-clock adjustments, unlike time.time().
        timer_start = time.monotonic()
        response = client.lens.sessions.process_event(session_id, event_message)
        timer_delta = time.monotonic() - timer_start
        # Explicit raises instead of `assert`: asserts are stripped under
        # `python -O`, which would make this test pass unconditionally.
        if response["type"] != "session.read.response":
            raise AssertionError(f"Unexpected response type: {response['type']!r}")
        if response["event_data"] is not None:
            raise AssertionError("Expected no data from an empty session read")
        if not timeout_sec < timer_delta < timeout_sec + 1.0:
            raise AssertionError(
                f"Read with {timeout_sec}s timeout took {timer_delta:.2f}s")

    # Test with 2 second timeout, then with 5 second timeout
    check_read_timeout(2.0)
    check_read_timeout(5.0)
    return True
# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]
try:
    # Create and run the session (auto_destroy=True presumably tears the
    # session down once test_fn returns).
    success = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised (e.g. a failed
    # timing check), so a failed run does not leave a registered lens behind.
    client.lens.delete(lens_id)
Supported Input Streams
The Test Lens can accept the following input stream types:
- csv_file_reader - Read data from CSV files
- kafka_reader - Read messages from Kafka topics
- event_emitter - Generate test events
Supported Output Streams
The Test Lens can output to the following stream types:
- kafka_writer - Write messages to Kafka topics
- server_sent_events_writer - Stream events via SSE