Test Lens
The Test Lens is a pass-through lens that repeats input data for testing and debugging data streaming pipelines. It uses the lens_echo_processor to buffer and echo data without modification, making it ideal for validating data flow through the system. Use the Test Lens when you need to:
  • Test data streaming pipelines (CSV, Kafka, SSE)
  • Validate that data correctly flows from input to output
  • Debug stream configurations without processing overhead
  • Test blocking session-read timeout behavior against a processor that really buffers data
  • Verify integration between different stream types
  • Test data flow end-to-end without transformation

Echo Processor

The Test Lens uses the lens_echo_processor, which implements a pass-through mechanism for data flow testing.

Processor Behavior

The Echo processor:
  • Stores incoming data in an internal queue
  • Returns all queued data unchanged when read. Events come back as a list in arrival order, or None if the queue is empty.
  • Acts as a buffer between input and output streams
  • Updates the read/write statistics inherited from LensProcessorBase (num_reads, num_writes, last_read_timestamp, last_write_timestamp)

Implementation Details

@registry.register_lens_processor("lens_echo_processor")
class LensEchoProcessor(LensProcessorBase):
    """A simple echo processor that repeats the input for testing and debugging.

    Events passed to ``write`` are buffered in an in-memory FIFO queue and are
    handed back unmodified, in arrival order, by the next ``read``.
    """

    def __init__(self, config: config_dict.ConfigDict, redis_db_cls: RedisDatabase) -> None:
        """Initialize the processor with an empty event queue.

        Args:
            config: Processor configuration, forwarded to ``LensProcessorBase``.
            redis_db_cls: Database class/handle, forwarded to ``LensProcessorBase``.
        """
        super().__init__(config, redis_db_cls)
        # FIFO buffer of events that have been written but not yet read.
        self.data_queue = queue.Queue()

    async def read(self, timestamp: float = -1.0) -> "list[dict] | None":
        """Drain and return every buffered event.

        Args:
            timestamp: Read time in seconds since the epoch; any negative value
                means "now" (``time.time()``).

        Returns:
            All queued events in FIFO order, or ``None`` when nothing is queued.
        """
        timestamp = timestamp if timestamp >= 0.0 else time.time()
        self.num_reads += 1
        self.last_read_timestamp = timestamp

        events = []
        # Non-blocking drain: avoids the check-then-act gap between empty()
        # and a blocking get(), which could otherwise stall the event loop.
        while True:
            try:
                events.append(self.data_queue.get_nowait())
            except queue.Empty:
                break
        # Preserve the original contract: None (not []) when there was no data.
        return events or None

    async def write(self, event_data: dict, timestamp: float = -1.0) -> dict:
        """Buffer one event for a later ``read`` and echo it back.

        Args:
            event_data: The event to buffer; it is stored without modification.
            timestamp: Write time in seconds since the epoch; any negative value
                means "now" (``time.time()``).

        Returns:
            ``event_data`` itself, unchanged.
        """
        timestamp = timestamp if timestamp >= 0.0 else time.time()
        self.data_queue.put(event_data)
        self.num_writes += 1
        self.last_write_timestamp = timestamp
        return event_data

Creating a Test Lens

To create a Test Lens, register a lens whose model pipeline contains the lens_echo_processor:
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)

# A lens whose model pipeline is a single echo-processor stage with no options.
echo_pipeline = [{"processor_name": "lens_echo_processor", "processor_config": {}}]
lens_config = {"model_pipeline": echo_pipeline, "model_parameters": {}}
lens_metadata = client.lens.register({"lens_name": "Echo Lens", "lens_config": lens_config})

# The lens_id is returned in the metadata
lens_id = lens_metadata["lens_id"]

Use Cases

The Test Lens is designed for testing data streaming and pipeline functionality:
  1. CSV File Streaming - Test streaming CSV files through the lens to validate file reader integration.
  2. Kafka Integration Testing - Test reading from and writing to Kafka topics through the lens.
  3. Server-Sent Events (SSE) Testing - Validate that Server-Sent Events streams correctly output data.
  4. Read Timeout Testing - Test blocking read behavior with configurable timeout parameters.
  5. Data Flow Validation - Verify that data correctly flows from input streams to output streams without modification.

Examples

CSV Reader Test

This example demonstrates streaming a CSV file through a Test Lens:
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> list[str]:
    """Upload a CSV file and attach it to the session as a csv_file_reader input.

    The file named by ``config.csv_filename`` is uploaded. Its ``file_id`` is
    then used to configure a ``csv_file_reader`` input stream with
    ``window_size=1``, ``step_size=1`` and ``loop_recording=0``.

    Returns:
        A list of error messages. This example records none, so it is always
        empty.
    """
    test_errors = []

    # Upload first so the stream configuration can reference the file by id.
    uploaded = client.files.local.upload(config.csv_filename)

    reader_config = {
        "file_id": uploaded["file_id"],
        "window_size": 1,
        "step_size": 1,
        "loop_recording": 0,
    }
    set_input_event = {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "csv_file_reader",
            "stream_config": reader_config,
        },
    }
    # NOTE(review): the response is not inspected, so this example cannot
    # detect a failed input_stream.set; consider validating it.
    client.lens.sessions.process_event(session_id, set_input_event)

    return test_errors

# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]

try:
    # Create and run the session
    test_errors = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised, so a failed
    # run does not leave the registered lens behind.
    client.lens.delete(lens_id)

Kafka Reader Test

This example demonstrates reading from Kafka topics through a Test Lens:
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> list[str]:
    """Produce messages to a fresh Kafka topic and check the lens consumed them all.

    A ``kafka_reader`` input stream is attached on a randomly named topic, and
    ``num_events_to_send`` messages are produced to it. The test then polls
    ``session.status`` until ``num_inputs`` reaches that count or the wait
    budget expires.

    Returns:
        A list of error messages. It is empty when the lens reported exactly
        as many inputs as messages sent.
    """
    test_errors = []

    test_topic = secrets.token_hex(8)  # Generate a unique test topic

    # Attach a Kafka input stream
    num_events_to_send = 16
    event = {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "kafka_reader",
            "stream_config": {
                "topic_ids": [test_topic],
                "max_messages_per_read": 1,
            }
        }
    }
    client.lens.sessions.process_event(session_id, event)

    # Send messages to the lens
    producer = client.kafka.create_producer(topic_ids=[test_topic])
    for i in range(num_events_to_send):
        producer.send(topic_id=test_topic, value={"counter": i})
    producer.flush()

    # Poll the session status instead of sleeping a fixed 10 s. The check
    # finishes as soon as every message has been consumed, and still gives up
    # after the same 10 s budget if they never arrive. monotonic() is immune
    # to wall-clock adjustments.
    max_wait_sec = 10.0
    poll_interval_sec = 0.5
    deadline = time.monotonic() + max_wait_sec
    status_event = {"type": "session.status"}
    while True:
        response = client.lens.sessions.process_event(session_id, status_event)
        num_events_received = response["num_inputs"]
        if num_events_received >= num_events_to_send or time.monotonic() >= deadline:
            break
        time.sleep(poll_interval_sec)

    if num_events_received != num_events_to_send:
        test_errors.append(f"Number of lens inputs ({num_events_received}) does not match expected ({num_events_to_send})!")

    return test_errors

# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]

try:
    # Create and run the session
    test_errors = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised, so a failed
    # run does not leave the registered lens behind.
    client.lens.delete(lens_id)

Kafka Reader/Writer Test

This example demonstrates streaming data through a Test Lens from Kafka input to Kafka output:
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> list[str]:
    """Stream messages Kafka -> Echo lens -> Kafka and check both ends.

    The test attaches a ``kafka_reader`` input and a ``kafka_writer`` output on
    per-run topic names, then produces ``num_events_to_send`` messages to the
    input topic. It verifies that:
      * at least one message appears on the output topic, and
      * ``session.status`` reports exactly ``num_events_to_send`` inputs.

    Returns:
        A list of error messages. It is empty when every check passed.
    """
    test_errors = []

    test_uid = secrets.token_hex(8)
    input_test_topic = f"input_{test_uid}"
    output_test_topic = f"output_{test_uid}"

    # Attach Kafka input stream
    event = {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "kafka_reader",
            "stream_config": {
                "topic_ids": [input_test_topic],
                "max_messages_per_read": 1,
            }
        }
    }
    client.lens.sessions.process_event(session_id, event)

    # Attach Kafka output stream
    event = {
        "type": "output_stream.set",
        "event_data": {
            "stream_type": "kafka_writer",
            "stream_config": {
                "topic_ids": [output_test_topic],
                "max_messages_per_read": 1,
            }
        }
    }
    response = client.lens.sessions.process_event(session_id, event)

    if response["type"] != "output_stream.set.response":
        test_errors.append("Failed to receive output_stream.set response!")
        return test_errors

    # Send messages to input topic
    producer = client.kafka.create_producer(topic_ids=[input_test_topic])
    num_events_to_send = 16
    for i in range(num_events_to_send):
        producer.send(topic_id=input_test_topic, value={"counter": i})
    producer.flush()

    # Create consumer for output topic. The idle timeout is generous because
    # the lens must read, echo and write before anything appears here; a 1 s
    # timeout could end iteration before the first echoed message arrives.
    # NOTE(review): assumes kafka-python-style semantics, where iteration
    # stops after consumer_timeout_ms without a new message; confirm.
    consumer = client.kafka.create_consumer(
        topic_id=output_test_topic,
        auto_offset_reset="earliest",
        consumer_timeout_ms=10000,
    )

    # Previously the received messages were never checked, so a broken
    # output path could not fail the test.
    num_output_messages = sum(1 for _ in consumer)
    if num_output_messages == 0:
        test_errors.append("Failed to receive messages on the output Kafka topic!")

    # Verify session processed messages
    event = {"type": "session.status"}
    response = client.lens.sessions.process_event(session_id, event)
    num_events_received = response["num_inputs"]

    if num_events_received != num_events_to_send:
        test_errors.append(f"Number of lens inputs ({num_events_received}) does not match expected ({num_events_to_send})!")

    return test_errors

# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]

try:
    # Create and run the session
    test_errors = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised, so a failed
    # run does not leave the registered lens behind.
    client.lens.delete(lens_id)

Server-Sent Events Reader Test

This example demonstrates reading lens output via Server-Sent Events (SSE):
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> list[str]:
    """Feed emitter events through the lens and read them back over SSE.

    An ``event_emitter`` input (capped at ``num_emitted`` events) and a
    ``server_sent_events_writer`` output are attached. The session's consumer
    endpoint is then read until ``num_emitted`` SSE events have arrived.

    Returns:
        A list of error messages. It is empty when the output stream was set
        and at least one SSE event was received.
    """
    test_errors = []
    # Used both as the emitter's max_events and as the SSE read cap.
    num_emitted = 10

    # Attach a basic event emitter as input
    emitter_event = {
        "type": "input_stream.set",
        "event_data": {
            "stream_type": "event_emitter",
            "stream_config": {"max_events": num_emitted},
        },
    }
    client.lens.sessions.process_event(session_id, emitter_event)

    # Attach SSE writer for output
    sse_writer_event = {
        "type": "output_stream.set",
        "event_data": {
            "stream_type": "server_sent_events_writer",
            "stream_config": {"max_messages_per_read": 1},
        },
    }
    writer_response = client.lens.sessions.process_event(session_id, sse_writer_event)
    if writer_response["type"] != "output_stream.set.response":
        test_errors.append("Failed to receive output_stream.set response!")
        return test_errors

    # Connect to the session's SSE consumer endpoint.
    consumer_url = f"{config.api_endpoint}/lens/sessions/consumer/{session_id}"
    auth_headers = {"Authorization": f"Bearer {config.api_key}"}
    sse_event_stream = SSEClient(consumer_url, headers=auth_headers)

    # NOTE(review): if fewer than num_emitted events are ever delivered, this
    # loop may block indefinitely waiting on the stream; consider a timeout.
    received = 0
    for _ in sse_event_stream:
        received += 1
        if received >= num_emitted:
            break

    if not received:
        test_errors.append("Failed to receive SSE events!")
    return test_errors

# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]

try:
    # Create and run the session
    test_errors = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised, so a failed
    # run does not leave the registered lens behind.
    client.lens.delete(lens_id)

Read Timeout Test

This example demonstrates testing read timeout functionality with the Test Lens:
def test_fn(session_id: str, session_endpoint: str, client: ArchetypeAI, config: dict) -> bool:
    """Check that a ``session.read`` with no pending data waits for its timeout.

    Any pending output for ``client_id`` is drained first (at most 10 reads).
    Then, for each timeout, a ``session.read`` is issued and checked to return
    a ``session.read.response`` with no data after more than the timeout but
    less than one second beyond it.

    Returns:
        True when every check passes.

    Raises:
        AssertionError: If a response has the wrong type, carries data, or the
            measured wait falls outside ``(timeout, timeout + 1.0)`` seconds.
    """
    client_id = "test_client"

    # Clear any pending messages
    drain_message = {"type": "session.read", "event_data": {"client_id": client_id}}
    max_read = 10
    while max_read > 0:
        response = client.lens.sessions.process_event(session_id, drain_message)
        max_read = 0 if response["event_data"] is None else max_read - 1

    # Test with 2 second and 5 second timeouts
    for read_timeout_sec in (2.0, 5.0):
        _check_read_timeout(client, session_id, client_id, read_timeout_sec)

    return True


def _check_read_timeout(
    client: ArchetypeAI,
    session_id: str,
    client_id: str,
    read_timeout_sec: float,
    tolerance_sec: float = 1.0,
) -> None:
    """Issue one timed ``session.read`` and raise AssertionError if it misbehaves."""
    event_message = {
        "type": "session.read",
        "event_data": {"read_timeout_sec": read_timeout_sec, "client_id": client_id},
    }
    # monotonic() measures elapsed time without wall-clock adjustments
    # (NTP, DST), unlike time.time().
    timer_start = time.monotonic()
    response = client.lens.sessions.process_event(session_id, event_message)
    timer_delta = time.monotonic() - timer_start

    # Explicit raises rather than `assert`, which `python -O` strips out.
    if response["type"] != "session.read.response":
        raise AssertionError(f"Unexpected response type: {response['type']!r}")
    if response["event_data"] is not None:
        raise AssertionError(f"Expected no data, got: {response['event_data']!r}")
    if not read_timeout_sec < timer_delta < read_timeout_sec + tolerance_sec:
        raise AssertionError(
            f"Read with timeout {read_timeout_sec}s took {timer_delta:.3f}s; "
            f"expected between {read_timeout_sec}s and {read_timeout_sec + tolerance_sec}s"
        )

# Create the Echo lens
client = ArchetypeAI(api_key, api_endpoint=api_endpoint)
lens_metadata = client.lens.register({
    "lens_name": "Echo Lens",
    "lens_config": {
        "model_pipeline": [{"processor_name": "lens_echo_processor", "processor_config": {}}],
        "model_parameters": {},
    },
})
lens_id = lens_metadata["lens_id"]

try:
    # Create and run the session
    success = client.lens.create_and_run_session(
        lens_id, test_fn, auto_destroy=True, client=client, config=config)
finally:
    # Cleanup: delete the lens even if the session run raised (for example,
    # when a timeout check fails), so the registered lens is not leaked.
    client.lens.delete(lens_id)

Supported Input Streams

The Test Lens can accept the following input stream types:
  • csv_file_reader - Read data from CSV files
  • kafka_reader - Read messages from Kafka topics
  • event_emitter - Generate test events

Supported Output Streams

The Test Lens can output to the following stream types:
  • kafka_writer - Write messages to Kafka topics
  • server_sent_events_writer - Stream events via SSE