Algorithm Input/Output

How algorithms read input data and write results

Understanding how data flows into and out of your algorithm is crucial for successful development. This guide explains the input and output specifications in detail.

Overview

When your algorithm container runs:

  1. Input - The platform sets the ALGORITHM_INPUT_PATH environment variable to the path of algo_input.json
  2. Processing - Your algorithm reads the input, processes the data, and builds its results
  3. Output - Your algorithm writes algo_output.json to the output directory named by output_path in the input file
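
The three steps above can be sketched as a single entry point. This is a minimal sketch rather than the platform's reference implementation: the function name run and the empty results list are illustrative placeholders, and creating the output directory is an assumption in case it does not exist yet. Only ALGORITHM_INPUT_PATH, output_path, algo_output.json, and the top-level results key come from this guide.

```python
import json
import os
from pathlib import Path


def run(environ=os.environ):
    """Read algo_input.json, process it, and write algo_output.json."""
    # 1. Input: the platform provides the path via ALGORITHM_INPUT_PATH
    input_path = environ["ALGORITHM_INPUT_PATH"]
    with open(input_path) as f:
        algo_input = json.load(f)

    # 2. Processing: placeholder, a real algorithm builds its results here
    results = []

    # 3. Output: write algo_output.json into the directory named by output_path
    output_dir = Path(algo_input["output_path"])
    output_dir.mkdir(parents=True, exist_ok=True)  # assumption: may not exist yet
    output_file = output_dir / "algo_output.json"
    with open(output_file, "w") as f:
        json.dump({"results": results}, f, indent=2)
    return output_file
```

Passing the environment as an argument keeps the function easy to test without touching the real process environment.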

Reading Input

Step 1: Get Input File Path

The platform provides the input file path via an environment variable:

import os
import json

# Get path to input file. Indexing raises KeyError if the variable is unset,
# which is clearer than open(None) failing later.
input_path = os.environ['ALGORITHM_INPUT_PATH']

# Read input specification
with open(input_path) as f:
    algo_input = json.load(f)

Step 2: Understand Input Structure

The algo_input.json file contains:

{
  "version": "0.1.0",
  "input_data_path": "/work/input",
  "output_path": "/work/output",
  "config": {
    "parameters": {
      "look_back_time": 3600,
      "look_forward_time": 3600
    }
  },
  "input_data": [...]
}

Key fields:

  • version - Input schema version
  • input_data_path - Directory containing input data files
  • output_path - Directory where you should write output
  • config.parameters - User-specified parameter values
  • input_data - Array of data to process
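
Checking for these fields up front gives a clearer error than a KeyError deep inside processing. The sketch below is illustrative: validate_algo_input is a hypothetical helper, and treating all five fields as mandatory is an assumption, since this guide does not say which of them the platform always supplies.

```python
# Assumption: every key field listed in this guide is required
REQUIRED_FIELDS = ("version", "input_data_path", "output_path", "config", "input_data")


def validate_algo_input(algo_input):
    """Raise ValueError if the parsed algo_input.json lacks an expected field."""
    missing = [name for name in REQUIRED_FIELDS if name not in algo_input]
    if missing:
        raise ValueError(f"algo_input.json is missing fields: {missing}")
    if not isinstance(algo_input["input_data"], list):
        raise ValueError("input_data must be an array")
    return algo_input
```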

Step 3: Extract Configuration

Get user-specified parameters:

config = algo_input['config']
parameters = config['parameters']

# Access parameter values
look_back = parameters.get('look_back_time', 3600)  # Use default if not specified
look_forward = parameters.get('look_forward_time', 3600)
override = parameters.get('override_visit_time', False)

print(f"Using look_back_time: {look_back}")
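
When an algorithm has more than a couple of parameters, collecting them in one typed object keeps the defaults in a single place. This is a sketch under assumptions: the Parameters class and its from_input method are hypothetical names, and the defaults simply mirror the values used in the snippet above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Parameters:
    """Typed view of config.parameters with the defaults used in this guide."""
    look_back_time: int = 3600
    look_forward_time: int = 3600
    override_visit_time: bool = False

    @classmethod
    def from_input(cls, algo_input):
        # Missing config or parameters sections fall back to the defaults
        raw = algo_input.get("config", {}).get("parameters", {})
        return cls(
            look_back_time=int(raw.get("look_back_time", cls.look_back_time)),
            look_forward_time=int(raw.get("look_forward_time", cls.look_forward_time)),
            override_visit_time=bool(raw.get("override_visit_time", cls.override_visit_time)),
        )
```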

Step 4: Process Input Data

The input_data array contains one entry per AOI:

for input_item in algo_input['input_data']:
    aoi_version = input_item['aoi_version']
    aoi_wkb = input_item.get('aoi_wkb')  # WKB-encoded geometry
    time_ranges = input_item.get('time_ranges', [])

    print(f"Processing AOI version: {aoi_version}")

    # Process each data source
    for data_source in input_item['data_sources']:
        data_source_ids = data_source['data_source_ids']

        # Process each data file
        for data in data_source['data']:
            file_path = data['file_path']
            details = data.get('details', {})

            # Read and process the data
            process_data_file(file_path, aoi_version, details)
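
The nested loops above can also be wrapped in a generator so that the traversal is written once and the processing code receives a flat stream of files. The helper name iter_data_files is hypothetical, and falling back to empty lists for absent keys is an assumption about how lenient you want to be.

```python
def iter_data_files(algo_input):
    """Yield (aoi_version, data_source_ids, file_path, details) for every data file."""
    for input_item in algo_input.get("input_data", []):
        aoi_version = input_item["aoi_version"]
        for data_source in input_item.get("data_sources", []):
            ids = data_source.get("data_source_ids", [])
            for data in data_source.get("data", []):
                yield aoi_version, ids, data["file_path"], data.get("details", {})
```

A caller can then write `for aoi_version, ids, file_path, details in iter_data_files(algo_input):` and keep the processing logic at a single indentation level.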

Complete Input Example

Here's what a complete algo_input.json looks like:

{
  "version": "0.1.0",
  "input_data_path": "/work/input",
  "output_path": "/work/output",
  "config": {
    "parameters": {
      "look_back_time": 3600,
      "look_forward_time": 3600
    }
  },
  "input_data": [
    {
      "aoi_version": 36810008,
      "aoi_wkb": "AQMAAAABAAAABAAAANZvJqYLeV7A...",
      "time_ranges": [
        [
          {
            "start_local": "2021-03-01T08:00:00Z",
            "finish_local": "2021-03-08T08:00:00Z"
          }
        ]
      ],
      "data_sources": [
        {
          "data_source_ids": ["safegraph_pings"],
          "data": [
            {
              "file_path": "/work/input/safegraph_pings_abc123.parquet",
              "details": {
                "id": "abc123",
                "geom_wkb": "AQMAAAABAAAABAAAANZv...",
                "time_ranges": [
                  {
                    "start": "2021-03-01T08:00:00Z",
                    "end": "2021-03-08T08:00:00Z"
                  }
                ],
                "aoi_identifier": {
                  "id": "8a68ae09-f4e3-4c38-93b0-0e2cc21d7529",
                  "version": 36810008
                }
              }
            }
          ]
        }
      ]
    }
  ]
}
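
The aoi_wkb and geom_wkb values in this example look like base64 text wrapping WKB bytes. Assuming that encoding (the guide only says the geometry is WKB-encoded), the sketch below reads the WKB header with the standard library to report byte order and geometry type. The function wkb_header is a hypothetical helper; it does not parse coordinates and does not handle extended WKB flags such as an embedded SRID. For real geometry work a library such as shapely is the usual choice.

```python
import base64
import struct

# Geometry type codes from the standard WKB encoding
WKB_GEOMETRY_TYPES = {
    1: "Point",
    2: "LineString",
    3: "Polygon",
    4: "MultiPoint",
    5: "MultiLineString",
    6: "MultiPolygon",
    7: "GeometryCollection",
}


def wkb_header(wkb_b64):
    """Return (byte_order, geometry_type_name) for a base64-encoded WKB string."""
    raw = base64.b64decode(wkb_b64)  # assumption: the JSON value is base64
    if len(raw) < 5:
        raise ValueError("WKB value is too short to contain a header")
    byte_order = raw[0]  # 0 = big-endian, 1 = little-endian
    if byte_order not in (0, 1):
        raise ValueError(f"Unexpected WKB byte order: {byte_order}")
    fmt = "<I" if byte_order == 1 else ">I"
    (type_code,) = struct.unpack_from(fmt, raw, 1)
    return byte_order, WKB_GEOMETRY_TYPES.get(type_code, f"unknown({type_code})")
```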

Reading Data Files

Data files are referenced by file_path and conform to the input Data Type schema.

Reading Parquet Files

Most data is provided as Parquet files:

import pandas as pd

def read_input_data(file_path):
    """Read a Parquet file and check that the expected columns are present."""
    df = pd.read_parquet(file_path)

    # The Data Type schema should guarantee these columns.
    # For the 'pings' Data Type:
    required = ['device_id', 'latitude', 'longitude', 'timestamp']
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f"{file_path} is missing columns: {missing}")

    return df

# Usage
df = read_input_data(data['file_path'])
print(f"Loaded {len(df)} pings")

Reading CSV Files

Some data may be provided as CSV:

import pandas as pd

def read_csv_input(file_path):
    df = pd.read_csv(file_path)
    return df
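
Since a file may arrive in either format, one option is to pick the reader from the file extension. This is a sketch under the assumption that the extension reliably reflects the format, which this guide does not state; detect_format and read_any are hypothetical names. pandas is imported inside read_any so that detect_format can be used and tested on its own.

```python
from pathlib import Path


def detect_format(file_path):
    """Return 'parquet' or 'csv' based on the file extension."""
    suffix = Path(file_path).suffix.lower()
    if suffix == ".parquet":
        return "parquet"
    if suffix == ".csv":
        return "csv"
    raise ValueError(f"Unsupported input file type: {file_path}")


def read_any(file_path):
    """Read a Parquet or CSV input file into a DataFrame."""
    import pandas as pd  # imported lazily; only needed when a file is read

    if detect_format(file_path) == "parquet":
        return pd.read_parquet(file_path)
    return pd.read_csv(file_path)
```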

Handling Multiple Time Ranges

Input data can span multiple time ranges (for recurrence rules):

time_ranges = input_item.get('time_ranges', [])

# time_ranges is a list of lists:
# Each outer list element represents one observation
# Each observation can have multiple time ranges

for observation_idx, observation_ranges in enumerate(time_ranges):
    print(f"Processing observation {observation_idx}")

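    for time_range in observation_ranges:
        # Convert the ISO 8601 strings so the comparison below is between
        # timestamps rather than between a timestamp column and a string
        start = pd.to_datetime(time_range['start_local'])
        end = pd.to_datetime(time_range['finish_local'])
        print(f"  Time range: {start} to {end}")

        # Filter data to this time range (df is the DataFrame loaded earlier;
        # this assumes df['timestamp'] is a timezone-aware datetime column)
        filtered_df = df[
            (df['timestamp'] >= start) &
            (df['timestamp'] <= end)
        ]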

        # Process this time slice...
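
If you prefer to work with plain datetime objects, the list-of-lists structure can be flattened with the standard library. This is a sketch with hypothetical helper names (parse_utc, flatten_time_ranges). It assumes the strings are ISO 8601 and treats a value without an offset as UTC, which may not match how the platform defines the "local" fields.

```python
from datetime import datetime, timezone


def parse_utc(ts):
    """Parse an ISO 8601 string such as '2021-03-01T08:00:00Z' into an aware datetime."""
    # datetime.fromisoformat only accepts a trailing 'Z' from Python 3.11 on
    if ts.endswith("Z"):
        ts = ts[:-1] + "+00:00"
    parsed = datetime.fromisoformat(ts)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return parsed


def flatten_time_ranges(time_ranges):
    """Yield (observation_idx, start, end) for each range in the nested list."""
    for observation_idx, observation_ranges in enumerate(time_ranges):
        for time_range in observation_ranges:
            yield (
                observation_idx,
                parse_utc(time_range["start_local"]),
                parse_utc(time_range["finish_local"]),
            )
```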

Writing Output

Step 1: Create Results Structure

Your algorithm must write an algo_output.json file:

import json
from pathlib import Path

results = []

# Add results as you process data
results.append({
    'source_aoi_version': aoi_version,
    'data_type': 'device_visits',
    'observations': observations
})

# Write output file
output_path = Path(algo_input['output_path']) / 'algo_output.json'
with open(output_path, 'w') as f:
    json.dump({'results': results}, f, indent=2)

Step 2: Structure Results

Each result contains:

result = {
    'source_aoi_version': 36810008,  # Required: Source AOI
    'data_type': 'device_visits',    # Required: Output Data Type
    'observations': [...]             # Required: List of observations
}

Optional fields:

  • dest_aoi_version - For cross-AOI analysis
  • algo_config_class - Classification class (e.g., "car", "truck")
  • algo_config_subclass - Sub-classification (e.g., "boeing-737")

Step 3: Create Observations

Each observation represents a single time period:

# Option 1: Include summary values only
observation = {
    'observation_start_ts': 1614607200,  # Required: Unix timestamp
    'observation_values': [{
        'visit_count': 42,
        'total_dwell_time': 3600
    }]
}

# Option 2: Link to detailed measurement file
observation = {
    'observation_start_ts': 1614607200,
    'measurement_path': 'visits_abc123.parquet'
}

# Option 3: Include both
observation = {
    'observation_start_ts': 1614607200,
    'observation_values': [{'visit_count': 42}],
    'measurement_path': 'visits_abc123.parquet'
}

Requirements:

  • Must include at least one of observation_values or measurement_path
  • Can include both
  • observation_values must match observation_value_columns from manifest
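
These requirements can be checked before writing the output. The sketch below uses a hypothetical validate_observation helper. Its interpretation of "must match" is an assumption: it only rejects keys that are not in the supplied column list, and you pass that list in yourself because this guide does not show how the manifest is loaded.

```python
def validate_observation(observation, value_columns=None):
    """Raise ValueError if an observation breaks the rules listed above."""
    if "observation_start_ts" not in observation:
        raise ValueError("observation_start_ts is required")

    has_values = bool(observation.get("observation_values"))
    has_path = bool(observation.get("measurement_path"))
    if not (has_values or has_path):
        raise ValueError("Need observation_values, measurement_path, or both")

    # Optional check against the manifest's observation_value_columns
    if value_columns is not None and has_values:
        allowed = set(value_columns)
        for row in observation["observation_values"]:
            extra = set(row) - allowed
            if extra:
                raise ValueError(f"Unexpected observation value columns: {sorted(extra)}")
    return observation
```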

Step 4: Write Measurement Files

If your algorithm produces detailed results, write them as Parquet files:

import uuid
import pandas as pd
from pathlib import Path

def write_measurement_file(data_df, output_dir, data_type):
    """Write measurement data to parquet file"""
    # Generate unique filename
    filename = f"meas_{uuid.uuid4()}.parquet"
    filepath = Path(output_dir) / filename

    # Ensure data conforms to Data Type schema
    # For 'device_visits':
    required_columns = ['aoi_id', 'device_id', 'start', 'finish']
    assert all(col in data_df.columns for col in required_columns)

    # Write parquet file
    data_df.to_parquet(filepath, index=False)

    return filename

# Usage
visits_df = pd.DataFrame({
    'aoi_id': ['aoi1'] * 10,
    'device_id': device_ids,
    'start': start_times,
    'finish': end_times
})

measurement_path = write_measurement_file(
    visits_df,
    algo_input['output_path'],
    'device_visits'
)

Complete Output Examples

Example 1: Simple Aggregated Results

Algorithm that just produces summary counts:

{
  "results": [
    {
      "source_aoi_version": 36810008,
      "data_type": "device_visits",
      "observations": [
        {
          "observation_start_ts": 1614607200,
          "observation_values": [{
            "unique_device_count": 10
          }]
        }
      ]
    }
  ]
}

Example 2: Detailed Results with Files

Algorithm that outputs detailed measurements:

{
  "results": [
    {
      "source_aoi_version": 36809989,
      "data_type": "device_visits",
      "observations": [
        {
          "observation_start_ts": 1546832297,
          "measurement_path": "meas_34e2dc28-0a28-4dbd-891c-7e3ae49b614b.parquet"
        },
        {
          "observation_start_ts": 1546835897,
          "measurement_path": "meas_37549e93-00d6-473e-bfb0-00aec9d4addf.parquet"
        }
      ]
    }
  ]
}

Example 3: Combined Summary and Detailed Results

Algorithm that outputs both aggregates and detailed data:

{
  "results": [
    {
      "source_aoi_version": 36809989,
      "data_type": "object_detections",
      "algo_config_class": "car",
      "observations": [
        {
          "observation_start_ts": 1546832297,
          "measurement_path": "meas_34e2dc28-0a28-4dbd-891c-7e3ae49b614b.parquet",
          "observation_values": [{
            "count": 29
          }]
        },
        {
          "observation_start_ts": 1546832297,
          "measurement_path": "meas_37549e93-00d6-473e-bfb0-00aec9d4addf.parquet",
          "observation_values": [{
            "count": 31
          }]
        }
      ]
    }
  ]
}

Helper Utilities

The platform provides an AlgorithmOutput helper class:

from algorithm_utils.algo_output import AlgorithmOutput

# Initialize
output = AlgorithmOutput(output_path=algo_input['output_path'])

# Add a result
output.add_result(
    source_aoi_version=aoi_version,
    data_type='device_visits'
)

# Add observations
output.add_observation(
    observation_start_ts=start_timestamp,
    measurement_df=visits_df,
    observation_values={'visit_count': len(visits_df)}
)

# Write output file
output.write()

See AlgorithmOutput source code for complete documentation.

Best Practices

Input Processing

  1. Validate early - Check required fields exist before processing
  2. Handle missing data - Gracefully handle missing or empty input files
  3. Log progress - Print/log as you process each AOI and time range
  4. Fail gracefully - Write empty results rather than crashing

For example, a wrapper can catch processing errors and fall back to an empty result:

def process_safely(input_data):
    try:
        # Process data
        result = process_data(input_data)
        return result
    except Exception as e:
        print(f"Error processing data: {e}")
        # Return empty result
        return create_empty_result()

Output Writing

  1. Validate schema - Ensure output data matches Data Type schema
  2. Use unique filenames - Use UUIDs for measurement files
  3. Write atomically - Write to temp file, then rename
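  4. Include all fields - Don't skip required fields even if empty

An atomic write creates the temporary file in the destination directory, so the final rename never crosses filesystems:

import json
import os
import tempfile

def write_output_atomically(data, output_path):
    # Write to a temporary file in the same directory as the target
    out_dir = os.path.dirname(os.path.abspath(output_path))
    with tempfile.NamedTemporaryFile(mode='w', dir=out_dir, delete=False) as tmp:
        json.dump(data, tmp)
        tmp_path = tmp.name

    # Atomically replace the final file
    os.replace(tmp_path, output_path)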

Error Handling

  1. Log errors - Print to stdout/stderr for debugging
  2. Write partial results - Save what you can if processing partially fails
  3. Set appropriate exit codes - Non-zero for failures

A top-level handler along these lines logs the error and exits with a non-zero code:

import sys

try:
    main()
except Exception as e:
    print(f"Fatal error: {e}", file=sys.stderr)
    # Still try to write empty output
    write_empty_output()
    sys.exit(1)
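
To keep partial results, one approach is to isolate each item so that a single failure does not discard the work already done. This is a sketch with hypothetical names (process_all, exit_code); whether the platform treats a non-zero exit with partial output as usable is not stated in this guide.

```python
import sys


def process_all(items, process_one):
    """Process each item independently, collecting results and failed items."""
    results, failures = [], []
    for item in items:
        try:
            results.append(process_one(item))
        except Exception as exc:
            # Log to stderr and continue with the remaining items
            print(f"Error processing {item!r}: {exc}", file=sys.stderr)
            failures.append(item)
    return results, failures


def exit_code(failures):
    """Return 0 when everything succeeded, 1 when any item failed."""
    return 1 if failures else 0
```

After writing whatever is in results, the caller can finish with `sys.exit(exit_code(failures))`.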

Testing Your I/O

Create test input files to validate your implementation:

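import json
import os

# Create test algo_input.json
test_input = {
    "version": "0.1.0",
    "input_data_path": "/tmp/input",
    "output_path": "/tmp/output",
    "config": {
        "parameters": {
            "look_back_time": 1800
        }
    },
    "input_data": [...]  # Placeholder: replace with real entries before running
}

# The output directory must exist before the algorithm writes to it
os.makedirs('/tmp/output', exist_ok=True)

with open('/tmp/algo_input.json', 'w') as f:
    json.dump(test_input, f)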

# Set environment variable
os.environ['ALGORITHM_INPUT_PATH'] = '/tmp/algo_input.json'

# Run your algorithm
main()

# Verify output
with open('/tmp/output/algo_output.json') as f:
    output = json.load(f)
    assert 'results' in output
    assert len(output['results']) > 0
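
The same test can run inside a throwaway directory so that repeated runs do not share state under /tmp. The helpers below are hypothetical (write_test_input, check_output) and only assume the file layout described in this guide.

```python
import json
from pathlib import Path


def write_test_input(work_dir, parameters=None, input_data=None):
    """Create input/ and output/ under work_dir and write algo_input.json."""
    work_dir = Path(work_dir)
    (work_dir / "input").mkdir(parents=True, exist_ok=True)
    (work_dir / "output").mkdir(parents=True, exist_ok=True)
    test_input = {
        "version": "0.1.0",
        "input_data_path": str(work_dir / "input"),
        "output_path": str(work_dir / "output"),
        "config": {"parameters": parameters or {}},
        "input_data": input_data or [],
    }
    input_file = work_dir / "algo_input.json"
    input_file.write_text(json.dumps(test_input))
    return input_file


def check_output(output_dir):
    """Load algo_output.json and check that it has a top-level 'results' key."""
    with open(Path(output_dir) / "algo_output.json") as f:
        output = json.load(f)
    if "results" not in output:
        raise AssertionError("algo_output.json must contain 'results'")
    return output
```

A typical use is to call write_test_input inside `tempfile.TemporaryDirectory()`, point ALGORITHM_INPUT_PATH at the returned path, run your algorithm, and then call check_output on the output directory.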

Next Steps