Algorithm Input/Output
How algorithms read input data and write results
Understanding how data flows into and out of your algorithm is crucial for successful development. This guide explains the input and output specifications in detail.
Overview
When your algorithm container runs:
- Input - Platform sets the ALGORITHM_INPUT_PATH environment variable pointing to algo_input.json
- Processing - Your algorithm reads input, processes data, writes output
- Output - Your algorithm writes algo_output.json to the specified output directory
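The three stages above can be sketched as a minimal entry point. This is an illustrative skeleton rather than the platform's reference implementation: the function names `run` and `process` are hypothetical, and `process` returns an empty list where your algorithm's logic would go.

```python
import json
import os
from pathlib import Path


def process(algo_input):
    """Hypothetical placeholder: turn the parsed input into a list of results."""
    return []


def run():
    # Input: the platform sets ALGORITHM_INPUT_PATH to the location of algo_input.json
    input_path = os.environ["ALGORITHM_INPUT_PATH"]
    with open(input_path) as f:
        algo_input = json.load(f)

    # Processing: replace process() with your algorithm's logic
    results = process(algo_input)

    # Output: write algo_output.json into the directory named by output_path
    output_file = Path(algo_input["output_path"]) / "algo_output.json"
    with open(output_file, "w") as f:
        json.dump({"results": results}, f, indent=2)
    return output_file
```

The sections below cover each stage in detail.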
Reading Input
Step 1: Get Input File Path
The platform provides the input file path via an environment variable:
import os
import json
# Get path to input file
input_path = os.environ.get('ALGORITHM_INPUT_PATH')
# Read input specification
with open(input_path) as f:
    algo_input = json.load(f)
Step 2: Understand Input Structure
The algo_input.json file contains:
{
  "version": "0.1.0",
  "input_data_path": "/work/input",
  "output_path": "/work/output",
  "config": {
    "parameters": {
      "look_back_time": 3600,
      "look_forward_time": 3600
    }
  },
  "input_data": [...]
}
Key fields:
- version - Input schema version
- input_data_path - Directory containing input data files
- output_path - Directory where you should write output
- config.parameters - User-specified parameter values
- input_data - Array of data to process
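A quick check for these fields right after loading the file can catch malformed input before any processing starts. The sketch below is illustrative only: `missing_input_fields` is a hypothetical helper, and treating every listed field as required is an assumption, so consult the Algorithm Input Schema reference for the authoritative rules.

```python
# Assumption: all of the key fields listed above are treated as required.
REQUIRED_FIELDS = ("version", "input_data_path", "output_path", "config", "input_data")


def missing_input_fields(algo_input):
    """Return the names of expected fields that are absent from the input."""
    missing = [name for name in REQUIRED_FIELDS if name not in algo_input]
    # config.parameters is nested, so check it separately
    if "config" in algo_input and "parameters" not in algo_input["config"]:
        missing.append("config.parameters")
    return missing


example = {
    "version": "0.1.0",
    "input_data_path": "/work/input",
    "output_path": "/work/output",
    "config": {"parameters": {"look_back_time": 3600}},
    "input_data": [],
}
print(missing_input_fields(example))                # → []
print(missing_input_fields({"version": "0.1.0"}))
# → ['input_data_path', 'output_path', 'config', 'input_data']
```

Returning a list of names, rather than raising on the first gap, lets you log every problem in one pass.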
Step 3: Extract Configuration
Get user-specified parameters:
config = algo_input['config']
parameters = config['parameters']
# Access parameter values
look_back = parameters.get('look_back_time', 3600) # Use default if not specified
look_forward = parameters.get('look_forward_time', 3600)
override = parameters.get('override_visit_time', False)
print(f"Using look_back_time: {look_back}")
Step 4: Process Input Data
The input_data array contains one entry per AOI:
for input_item in algo_input['input_data']:
    aoi_version = input_item['aoi_version']
    aoi_wkb = input_item.get('aoi_wkb')  # WKB-encoded geometry
    time_ranges = input_item.get('time_ranges', [])
    print(f"Processing AOI version: {aoi_version}")
    # Process each data source
    for data_source in input_item['data_sources']:
        data_source_ids = data_source['data_source_ids']
        # Process each data file
        for data in data_source['data']:
            file_path = data['file_path']
            details = data.get('details', {})
            # Read and process the data
            process_data_file(file_path, aoi_version, details)
Complete Input Example
Here's what a complete algo_input.json looks like:
{
  "version": "0.1.0",
  "input_data_path": "/work/input",
  "output_path": "/work/output",
  "config": {
    "parameters": {
      "look_back_time": 3600,
      "look_forward_time": 3600
    }
  },
  "input_data": [
    {
      "aoi_version": 36810008,
      "aoi_wkb": "AQMAAAABAAAABAAAANZvJqYLeV7A...",
      "time_ranges": [
        [
          {
            "start_local": "2021-03-01T08:00:00Z",
            "finish_local": "2021-03-08T08:00:00Z"
          }
        ]
      ],
      "data_sources": [
        {
          "data_source_ids": ["safegraph_pings"],
          "data": [
            {
              "file_path": "/work/input/safegraph_pings_abc123.parquet",
              "details": {
                "id": "abc123",
                "geom_wkb": "AQMAAAABAAAABAAAANZv...",
                "time_ranges": [
                  {
                    "start": "2021-03-01T08:00:00Z",
                    "end": "2021-03-08T08:00:00Z"
                  }
                ],
                "aoi_identifier": {
                  "id": "8a68ae09-f4e3-4c38-93b0-0e2cc21d7529",
                  "version": 36810008
                }
              }
            }
          ]
        }
      ]
    }
  ]
}
Reading Data Files
Data files are referenced by file_path and conform to the input Data Type schema.
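To collect every referenced file in one pass, the nested input structure can be flattened with a small generator. This is a sketch under stated assumptions: `iter_data_files` and `file_format` are hypothetical helpers, and inferring the format from the file extension is a convenience assumption rather than a platform guarantee.

```python
from pathlib import Path


def iter_data_files(algo_input):
    """Yield (aoi_version, file_path, details) for every data file in the input."""
    for input_item in algo_input.get("input_data", []):
        aoi_version = input_item["aoi_version"]
        for data_source in input_item.get("data_sources", []):
            for data in data_source.get("data", []):
                yield aoi_version, data["file_path"], data.get("details", {})


def file_format(file_path):
    """Guess the format from the file extension (an assumption, not a platform rule)."""
    suffix = Path(file_path).suffix.lower()
    return {".parquet": "parquet", ".csv": "csv"}.get(suffix, "unknown")


algo_input = {
    "input_data": [{
        "aoi_version": 36810008,
        "data_sources": [{
            "data_source_ids": ["safegraph_pings"],
            "data": [{
                "file_path": "/work/input/safegraph_pings_abc123.parquet",
                "details": {"id": "abc123"},
            }],
        }],
    }]
}
for aoi_version, file_path, details in iter_data_files(algo_input):
    print(aoi_version, file_format(file_path), file_path)
```

The format string can then be used to choose between the Parquet and CSV readers described in the following subsections.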
Reading Parquet Files
Most data is provided as Parquet files:
import pandas as pd
def read_input_data(file_path):
    """Read a Parquet file and check that the expected columns are present"""
    df = pd.read_parquet(file_path)
    # Data Types guarantee these columns exist
    # For 'pings' Data Type:
    assert 'device_id' in df.columns
    assert 'latitude' in df.columns
    assert 'longitude' in df.columns
    assert 'timestamp' in df.columns
    return df
# Usage
df = read_input_data(data['file_path'])
print(f"Loaded {len(df)} pings")
Reading CSV Files
Some data may be provided as CSV:
import pandas as pd
def read_csv_input(file_path):
    df = pd.read_csv(file_path)
    return df
Handling Multiple Time Ranges
Input data can span multiple time ranges (for recurrence rules):
time_ranges = input_item.get('time_ranges', [])
# time_ranges is a list of lists:
# Each outer list element represents one observation
# Each observation can have multiple time ranges
for observation_idx, observation_ranges in enumerate(time_ranges):
    print(f"Processing observation {observation_idx}")
    for time_range in observation_ranges:
        start = time_range['start_local']
        end = time_range['finish_local']
        print(f"  Time range: {start} to {end}")
        # Filter data to this time range
        filtered_df = df[
            (df['timestamp'] >= start) &
            (df['timestamp'] <= end)
        ]
        # Process this time slice...
Writing Output
Step 1: Create Results Structure
Your algorithm must write an algo_output.json file:
import json
from pathlib import Path
results = []
# Add results as you process data
results.append({
    'source_aoi_version': aoi_version,
    'data_type': 'device_visits',
    'observations': observations
})
# Write output file
output_path = Path(algo_input['output_path']) / 'algo_output.json'
with open(output_path, 'w') as f:
    json.dump({'results': results}, f, indent=2)
Step 2: Structure Results
Each result contains:
result = {
    'source_aoi_version': 36810008,  # Required: Source AOI
    'data_type': 'device_visits',    # Required: Output Data Type
    'observations': [...]            # Required: List of observations
}
Optional fields:
- dest_aoi_version - For cross-AOI analysis
- algo_config_class - Classification class (e.g., "car", "truck")
- algo_config_subclass - Sub-classification (e.g., "boeing-737")
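One way to keep required and optional fields apart is a small builder that adds an optional field only when it is supplied. The sketch below is illustrative: `make_result` is a hypothetical helper and not part of the platform utilities, and the sample values are taken from the output examples in this guide.

```python
OPTIONAL_RESULT_FIELDS = {"dest_aoi_version", "algo_config_class", "algo_config_subclass"}


def make_result(source_aoi_version, data_type, observations, **optional):
    """Build a result dict, adding only the optional fields that were supplied."""
    unknown = set(optional) - OPTIONAL_RESULT_FIELDS
    if unknown:
        raise ValueError(f"Unknown result fields: {sorted(unknown)}")
    result = {
        "source_aoi_version": source_aoi_version,
        "data_type": data_type,
        "observations": observations,
    }
    # Skip optional fields that were passed as None
    result.update({k: v for k, v in optional.items() if v is not None})
    return result


result = make_result(
    36809989,
    "object_detections",
    observations=[],
    algo_config_class="car",
)
print(sorted(result))
# → ['algo_config_class', 'data_type', 'observations', 'source_aoi_version']
```

Rejecting unknown keyword names catches misspelled field names before they reach the output file.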
Step 3: Create Observations
Each observation represents a single time period:
# Every observation requires observation_start_ts (Unix timestamp)
# Option 1: Include summary values only
observation = {
    'observation_start_ts': 1614607200,
    'observation_values': [{
        'visit_count': 42,
        'total_dwell_time': 3600
    }]
}
# Option 2: Link to detailed measurement file
observation = {
    'observation_start_ts': 1614607200,
    'measurement_path': 'visits_abc123.parquet'
}
# Option 3: Include both
observation = {
    'observation_start_ts': 1614607200,
    'observation_values': [{'visit_count': 42}],
    'measurement_path': 'visits_abc123.parquet'
}
Requirements:
- Must include at least one of observation_values or measurement_path
- Can include both
- observation_values must match observation_value_columns from manifest
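These requirements can be checked before the output file is written. The following is a minimal sketch, not the platform's validator: `check_observation` is a hypothetical helper, and it assumes that "match" means each values dict uses only column names declared in the manifest's observation_value_columns.

```python
def check_observation(observation, observation_value_columns):
    """Return a list of problems found in one observation dict (empty if none)."""
    problems = []
    if "observation_start_ts" not in observation:
        problems.append("missing observation_start_ts")
    has_values = "observation_values" in observation
    has_path = "measurement_path" in observation
    if not (has_values or has_path):
        problems.append("need observation_values or measurement_path")
    # Assumption: every key in a values dict must be a declared manifest column
    for values in observation.get("observation_values", []):
        extra = set(values) - set(observation_value_columns)
        if extra:
            problems.append(f"unexpected columns: {sorted(extra)}")
    return problems


columns = ["visit_count", "total_dwell_time"]
ok = {"observation_start_ts": 1614607200, "observation_values": [{"visit_count": 42}]}
bad = {"observation_start_ts": 1614607200}
print(check_observation(ok, columns))   # → []
print(check_observation(bad, columns))  # → ['need observation_values or measurement_path']
```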
Step 4: Write Measurement Files
If your algorithm produces detailed results, write them as Parquet files:
import uuid
import pandas as pd
from pathlib import Path
def write_measurement_file(data_df, output_dir, data_type):
    """Write measurement data to parquet file"""
    # Generate unique filename
    filename = f"meas_{uuid.uuid4()}.parquet"
    filepath = Path(output_dir) / filename
    # Ensure data conforms to Data Type schema
    # For 'device_visits':
    required_columns = ['aoi_id', 'device_id', 'start', 'finish']
    assert all(col in data_df.columns for col in required_columns)
    # Write parquet file
    data_df.to_parquet(filepath, index=False)
    return filename
# Usage
visits_df = pd.DataFrame({
    'aoi_id': ['aoi1'] * 10,
    'device_id': device_ids,
    'start': start_times,
    'finish': end_times
})
measurement_path = write_measurement_file(
    visits_df,
    algo_input['output_path'],
    'device_visits'
)
Complete Output Examples
Example 1: Simple Aggregated Results
Algorithm that just produces summary counts:
{
  "results": [
    {
      "source_aoi_version": 36810008,
      "data_type": "device_visits",
      "observations": [
        {
          "observation_start_ts": 1614607200,
          "observation_values": [{
            "unique_device_count": 10
          }]
        }
      ]
    }
  ]
}
Example 2: Detailed Results with Files
Algorithm that outputs detailed measurements:
{
  "results": [
    {
      "source_aoi_version": 36809989,
      "data_type": "device_visits",
      "observations": [
        {
          "observation_start_ts": 1546832297,
          "measurement_path": "meas_34e2dc28-0a28-4dbd-891c-7e3ae49b614b.parquet"
        },
        {
          "observation_start_ts": 1546835897,
          "measurement_path": "meas_37549e93-00d6-473e-bfb0-00aec9d4addf.parquet"
        }
      ]
    }
  ]
}
Example 3: Combined Summary and Detailed Results
Algorithm that outputs both aggregates and detailed data:
{
  "results": [
    {
      "source_aoi_version": 36809989,
      "data_type": "object_detections",
      "algo_config_class": "car",
      "observations": [
        {
          "observation_start_ts": 1546832297,
          "measurement_path": "meas_34e2dc28-0a28-4dbd-891c-7e3ae49b614b.parquet",
          "observation_values": [{
            "count": 29
          }]
        },
        {
          "observation_start_ts": 1546832297,
          "measurement_path": "meas_37549e93-00d6-473e-bfb0-00aec9d4addf.parquet",
          "observation_values": [{
            "count": 31
          }]
        }
      ]
    }
  ]
}
Helper Utilities
The platform provides an AlgorithmOutput helper class:
from algorithm_utils.algo_output import AlgorithmOutput
# Initialize
output = AlgorithmOutput(output_path=algo_input['output_path'])
# Add a result
output.add_result(
    source_aoi_version=aoi_version,
    data_type='device_visits'
)
# Add observations
output.add_observation(
    observation_start_ts=start_timestamp,
    measurement_df=visits_df,
    observation_values={'visit_count': len(visits_df)}
)
# Write output file
output.write()
See AlgorithmOutput source code for complete documentation.
Best Practices
Input Processing
- Validate early - Check required fields exist before processing
- Handle missing data - Gracefully handle missing or empty input files
- Log progress - Print/log as you process each AOI and time range
- Fail gracefully - Write empty results rather than crashing
def process_safely(input_data):
    try:
        # Process data
        result = process_data(input_data)
        return result
    except Exception as e:
        print(f"Error processing data: {e}")
        # Return empty result
        return create_empty_result()
Output Writing
- Validate schema - Ensure output data matches Data Type schema
- Use unique filenames - Use UUIDs for measurement files
- Write atomically - Write to temp file, then rename
- Include all fields - Don't skip required fields even if empty
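import json
import os
import tempfile
def write_output_atomically(data, output_path):
    # Write to a temporary file in the destination directory so the
    # final rename stays on the same filesystem
    output_dir = os.path.dirname(os.path.abspath(output_path))
    with tempfile.NamedTemporaryFile(mode='w', dir=output_dir, delete=False) as tmp:
        json.dump(data, tmp)
        tmp_path = tmp.name
    # Atomically replace the final file
    os.replace(tmp_path, output_path)
Error Handling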
- Log errors - Print to stdout/stderr for debugging
- Write partial results - Save what you can if processing partially fails
- Set appropriate exit codes - Non-zero for failures
import sys
try:
    main()
except Exception as e:
    print(f"Fatal error: {e}", file=sys.stderr)
    # Still try to write empty output
    write_empty_output()
    sys.exit(1)
Testing Your I/O
Create test input files to validate your implementation:
import json
import os
# Create test algo_input.json
test_input = {
    "version": "0.1.0",
    "input_data_path": "/tmp/input",
    "output_path": "/tmp/output",
    "config": {
        "parameters": {
            "look_back_time": 1800
        }
    },
    "input_data": [...]  # Placeholder: replace with real input items before running
}
# Make sure the output directory exists for the local test run
os.makedirs('/tmp/output', exist_ok=True)
with open('/tmp/algo_input.json', 'w') as f:
    json.dump(test_input, f)
# Set environment variable
os.environ['ALGORITHM_INPUT_PATH'] = '/tmp/algo_input.json'
# Run your algorithm
main()
# Verify output
with open('/tmp/output/algo_output.json') as f:
    output = json.load(f)
assert 'results' in output
assert len(output['results']) > 0
Next Steps
- Review the complete Algorithm Input Schema reference
- Review the complete Algorithm Output Schema reference
- Learn about Container Images to package your algorithm
- See Creating Algorithms for the full workflow