Creating Algorithms

Step-by-step guide to building custom algorithms

This guide walks you through creating a custom algorithm, using a device visits algorithm as an example.

Overview

Creating an algorithm involves five steps:

  1. Define Requirements - Specify inputs, outputs, and parameters
  2. Create Manifest - Document the algorithm in a JSON manifest
  3. Implement Algorithm - Write the algorithm code
  4. Build Container - Package as a Docker image
  5. Register Algorithm - Publish to the platform

Example: Device Visits Algorithm

We'll build a device visits algorithm that processes geolocation ping data to determine how long devices spend in user-specified locations (AOIs).

Requirements

Input:

  • Data Type: pings (device_id, latitude, longitude, timestamp)
  • Filtered by AOI (area of interest) and TOI (time of interest)

Processing:

  • Group consecutive pings from the same device
  • Consider pings part of a visit if they're within a specified time threshold
  • Support configurable look-back and look-forward times

Output:

  • Data Type: device_visits (aoi_id, device_id, start_time, end_time)
  • Summary: Count of unique visitors per AOI
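As an illustration of the summary value, the following minimal sketch counts distinct devices per AOI from a list of visit records. The record fields mirror the device_visits columns listed above; the function name count_unique_visitors and the sample data are hypothetical.

```python
from collections import defaultdict


def count_unique_visitors(visits):
    """Return {aoi_id: number of distinct device_ids} for a list of visit records."""
    devices_by_aoi = defaultdict(set)
    for visit in visits:
        # A set ignores repeat visits by the same device to the same AOI.
        devices_by_aoi[visit["aoi_id"]].add(visit["device_id"])
    return {aoi_id: len(devices) for aoi_id, devices in devices_by_aoi.items()}


# Hypothetical sample data: dev-a visits aoi-1 twice but is counted once.
sample_visits = [
    {"aoi_id": "aoi-1", "device_id": "dev-a", "start_time": 0, "end_time": 600},
    {"aoi_id": "aoi-1", "device_id": "dev-a", "start_time": 7200, "end_time": 7800},
    {"aoi_id": "aoi-1", "device_id": "dev-b", "start_time": 100, "end_time": 400},
    {"aoi_id": "aoi-2", "device_id": "dev-a", "start_time": 900, "end_time": 1200},
]
visitor_counts = count_unique_visitors(sample_visits)
```

Counting devices rather than visit rows is what makes this a "unique visitors" figure; a device that returns several times still contributes one to its AOI's count.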

Parameters:

  • look_back_time - Seconds to look back before first ping (default: 3600)
  • look_forward_time - Seconds to look forward after last ping (default: 3600)
  • override_visit_time - Boolean to use observation time boundaries (default: false)

Parallelization:

  • By AOI - Each AOI can be processed independently
  • By time - If grouping by hour/day, each time period can be processed independently

Step 1: Define Requirements

Before writing any code, clearly define:

Input Data

What data does your algorithm need?

Data Types: pings
Data Sources: safegraph_pings, cuebiq_pings (or specific sources)
Spatial filter: AOI boundaries
Temporal filter: TOI time ranges

Processing Logic

What does your algorithm do?

1. Read filtered ping data
2. Group pings by device_id
3. Sort pings by timestamp
4. Identify visit boundaries using time thresholds
5. Calculate visit start and end times
6. Write visit records
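Steps 2 through 5 can be sketched roughly as follows. This is a minimal illustration, not the platform's reference implementation: it assumes pings are plain dictionaries already filtered to a single AOI, and it uses a hypothetical max_gap_seconds threshold to decide where one visit ends and the next begins. How look_back_time and look_forward_time feed into that decision is not covered here.

```python
from itertools import groupby
from operator import itemgetter


def pings_to_visits(pings, aoi_id, max_gap_seconds):
    """Split each device's time-ordered pings into visits.

    A new visit starts whenever the gap since the previous ping exceeds
    max_gap_seconds (a hypothetical threshold used only for this sketch).
    """
    visits = []
    # Sorting by (device_id, timestamp) both groups and orders the pings.
    ordered = sorted(pings, key=itemgetter("device_id", "timestamp"))
    for device_id, device_pings in groupby(ordered, key=itemgetter("device_id")):
        start = end = None
        for ping in device_pings:
            ts = ping["timestamp"]
            if start is None:
                start = end = ts  # first ping opens the first visit
            elif ts - end > max_gap_seconds:
                # Gap too large: close the current visit and open a new one.
                visits.append({"aoi_id": aoi_id, "device_id": device_id,
                               "start_time": start, "end_time": end})
                start = end = ts
            else:
                end = ts  # ping extends the current visit
        # Close the device's final visit (every group has at least one ping).
        visits.append({"aoi_id": aoi_id, "device_id": device_id,
                       "start_time": start, "end_time": end})
    return visits


# Hypothetical sample: timestamps are epoch seconds, input order is arbitrary.
sample_pings = [
    {"device_id": "dev-a", "latitude": 40.0, "longitude": -74.0, "timestamp": 600},
    {"device_id": "dev-b", "latitude": 40.0, "longitude": -74.0, "timestamp": 50},
    {"device_id": "dev-a", "latitude": 40.0, "longitude": -74.0, "timestamp": 0},
    {"device_id": "dev-a", "latitude": 40.0, "longitude": -74.0, "timestamp": 10000},
    {"device_id": "dev-a", "latitude": 40.0, "longitude": -74.0, "timestamp": 300},
]
sample_visits = pings_to_visits(sample_pings, aoi_id="aoi-1", max_gap_seconds=3600)
```

With a 3600-second threshold, dev-a's first three pings form one visit and its ping at 10000 starts a second one, while dev-b's single ping yields a zero-length visit.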

Output Data

What results does your algorithm produce?

Data Type: device_visits
Observation values: visit_count (number of unique devices)
Output files: Parquet files with detailed visit records

Parameters

What can users configure?

look_back_time: integer (0-2592000 seconds)
look_forward_time: integer (0-2592000 seconds)
override_visit_time: boolean
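One way to enforce these ranges inside the algorithm is a small validation helper. The sketch below is an assumption about how you might structure it: the function name validate_parameters is hypothetical, and the defaults (3600, 3600, false) are taken from the parameter list earlier in this guide.

```python
MAX_LOOK_SECONDS = 2592000  # upper bound from the ranges above (30 days)


def validate_parameters(params):
    """Return a copy of params with defaults applied, or raise ValueError."""
    validated = {}
    for name in ("look_back_time", "look_forward_time"):
        value = params.get(name, 3600)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError(f"{name} must be an integer, got {value!r}")
        if not 0 <= value <= MAX_LOOK_SECONDS:
            raise ValueError(f"{name} must be between 0 and {MAX_LOOK_SECONDS}, got {value}")
        validated[name] = value

    override = params.get("override_visit_time", False)
    if not isinstance(override, bool):
        raise ValueError(f"override_visit_time must be a boolean, got {override!r}")
    validated["override_visit_time"] = override
    return validated


# Missing parameters fall back to their defaults.
checked = validate_parameters({"look_back_time": 1800})
```

Validating up front means a bad value fails with a clear message before any data is read, rather than surfacing as an obscure error midway through processing.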

Resource Requirements

What resources does your algorithm need?

CPU: 200 millicores (0.2 cores)
Memory: 5 GB
GPU: 0

Step 2: Create a Manifest

Create a manifest that describes your algorithm. See Algorithm Manifests for more details.

The basic manifest structure is:

{
  "manifest_version": "0.1.0",
  "metadata": {
    "description": "Produce a list of AOI visits per device",
    "tags": ["device_visits"],
    "version": "0.0.1"
  },
  "inputs": [...],
  "outputs": {...},
  "parameters": [...],
  "container_parameters": {...},
  "interface": {...}
}

Step 3: Implement the Algorithm

Write your algorithm in your preferred programming language. It must:

  1. Read input from the path specified in algo_input.json
  2. Process data according to your logic
  3. Write output to the path specified in algo_input.json

See Algorithm Input/Output for complete details on reading and writing data.

Sample Implementation Structure (Python)

import json
import os
import uuid  # used by save_visits() to generate unique file names
import pandas as pd
from pathlib import Path

def main():
    # 1. Read algorithm input
    input_path = os.environ['ALGORITHM_INPUT_PATH']  # raises KeyError if the variable is unset
    with open(input_path) as f:
        algo_input = json.load(f)

    # 2. Extract configuration
    config = algo_input['config']
    look_back = config['parameters']['look_back_time']
    look_forward = config['parameters']['look_forward_time']

    # 3. Process each AOI
    results = []
    for input_data in algo_input['input_data']:
        aoi_version = input_data['aoi_version']

        # Read input data
        for data_source in input_data['data_sources']:
            for data in data_source['data']:
                pings_df = pd.read_parquet(data['file_path'])

                # Process pings into visits
                visits_df = process_pings_to_visits(
                    pings_df,
                    aoi_version,
                    look_back,
                    look_forward
                )

                # Save visit details
                measurement_path = save_visits(visits_df, algo_input['output_path'])

                # Create observation
                observation = {
                    'observation_start_ts': int(data['details']['time_ranges'][0]['start']),
                    'measurement_path': measurement_path,
                    'observation_values': [{
                        'visit_count': len(visits_df['device_id'].unique())
                    }]
                }

                results.append({
                    'source_aoi_version': aoi_version,
                    'data_type': 'device_visits',
                    'observations': [observation]
                })

    # 4. Write algorithm output
    output = {'results': results}
    output_path = Path(algo_input['output_path']) / 'algo_output.json'
    with open(output_path, 'w') as f:
        json.dump(output, f)

def process_pings_to_visits(pings_df, aoi_version, look_back, look_forward):
    # Your visit detection logic here
    visits = []
    # ... implementation ...
    return pd.DataFrame(visits)

def save_visits(visits_df, output_dir):
    # Save to parquet file
    filename = f"visits_{uuid.uuid4()}.parquet"
    filepath = Path(output_dir) / filename
    visits_df.to_parquet(filepath)
    return filename

if __name__ == '__main__':
    main()

Step 4: Build a Container

Package your algorithm as a Docker container. See Container Images for complete details.

Sample Dockerfile

FROM python:3.9-slim

# Install dependencies
COPY requirements.txt /app/
RUN pip install -r /app/requirements.txt

# Copy algorithm code
COPY device_visits.py /app/

# Set working directory
WORKDIR /app

# Algorithm will be invoked via manifest command

Build and Push

# Build the image
docker build -t myorg/device-visits:1.0 .

# Push to registry
docker push myorg/device-visits:1.0

Step 5: Register Algorithm

Register your algorithm with the platform. See Registering Algorithms for complete details.

import asyncio
import json

from elements.sdk.elements_sdk import ElementsSDK

async def register():
    sdk = ElementsSDK()

    # Load manifest
    with open('manifest.json') as f:
        manifest = json.load(f)

    # Create algorithm
    algorithm = await sdk.algorithm.create(
        name="device-visits",
        display_name="Device Visits",
        author="Your Organization"
    )

    # Register version with manifest
    version = await sdk.algorithm_version.create(
        algorithm_id=algorithm.id,
        manifest=manifest
    )

    print(f"Algorithm version registered: {version.id}")

# 'await' is only valid inside a coroutine, so a script must run it through asyncio.
# In a notebook that supports top-level await, call 'await register()' instead.
asyncio.run(register())

Testing Your Algorithm

Before running at scale, test your algorithm:

  1. Unit test your processing logic with sample data
  2. Test locally using Docker with sample input files
  3. Create a test computation with a small AOI and short time range
  4. Validate output format and correctness
  5. Check resource usage to ensure it matches your manifest
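For step 4, a lightweight structural check can catch missing fields before you submit a computation. The sketch below only checks the keys that appear in the sample implementation in this guide; it is not a full schema, and the function name validate_algo_output is hypothetical. See Algorithm Input/Output for the authoritative format.

```python
RESULT_KEYS = ("source_aoi_version", "data_type", "observations")
OBSERVATION_KEYS = ("observation_start_ts", "measurement_path", "observation_values")


def validate_algo_output(output):
    """Return a list of problems found; an empty list means no issues were detected."""
    results = output.get("results")
    if not isinstance(results, list):
        return ["'results' must be a list"]

    problems = []
    for i, result in enumerate(results):
        for key in RESULT_KEYS:
            if key not in result:
                problems.append(f"results[{i}] is missing '{key}'")
        for j, observation in enumerate(result.get("observations", [])):
            for key in OBSERVATION_KEYS:
                if key not in observation:
                    problems.append(f"results[{i}].observations[{j}] is missing '{key}'")
    return problems


# Hypothetical outputs: one well-formed, one with missing fields.
good_output = {"results": [{
    "source_aoi_version": 1,
    "data_type": "device_visits",
    "observations": [{
        "observation_start_ts": 0,
        "measurement_path": "visits.parquet",
        "observation_values": [{"visit_count": 2}],
    }],
}]}
bad_output = {"results": [{
    "data_type": "device_visits",
    "observations": [{"measurement_path": "visits.parquet"}],
}]}
good_problems = validate_algo_output(good_output)
bad_problems = validate_algo_output(bad_output)
```

Returning a list of problems instead of raising on the first one makes it easier to fix everything in a single pass when a test run produces malformed output.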

Local Testing

Docker provides a convenient way of testing an algorithm:

# Create sample algo_input.json
cat > /tmp/algo_input.json << EOF
{
  "version": "0.1.0",
  "input_data_path": "/work/input",
  "output_path": "/work/output",
  "config": {
    "parameters": {
      "look_back_time": 3600,
      "look_forward_time": 3600
    }
  },
  "input_data": [...]
}
EOF

# Run container locally
docker run \
  -v /tmp:/work \
  -e ALGORITHM_INPUT_PATH=/work/algo_input.json \
  myorg/device-visits:1.0 \
  python /app/device_visits.py

# Check output (the container's /work is the host's /tmp)
cat /tmp/output/algo_output.json

Debugging

Local testing does not always reveal problems that occur in production. For those cases, Elements provides an API that creates a docker_compose.yaml file, which sets up an environment similar to the one an algorithm runs in production.

Common Patterns

Reading Multiple Data Sources

for data_source in input_data['data_sources']:
    data_source_id = data_source['data_source_id']

    for data in data_source['data']:
        df = pd.read_parquet(data['file_path'])
        # Process data...

Handling Multiple Time Ranges

for time_range in data['details']['time_ranges']:
    start = time_range['start']
    end = time_range['end']

    # Filter data to time range
    filtered_df = df[(df['timestamp'] >= start) & (df['timestamp'] <= end)]
    # Process...

Writing Multiple Observations

observations = []
for time_period in time_periods:
    # Process this time period
    result_df = process_time_period(time_period)

    # Save measurements
    measurement_path = save_measurement(result_df)

    # Add observation
    observations.append({
        'observation_start_ts': time_period.start,
        'measurement_path': measurement_path,
        'observation_values': [{'count': len(result_df)}]
    })

Best Practices

  1. Validate input early - Check that required fields exist before processing
  2. Handle errors gracefully - Log errors and write empty results rather than crashing
  3. Be memory efficient - Process data in chunks if working with large datasets
  4. Use standard libraries - Prefer pandas, numpy, and other well-tested libraries
  5. Log progress - Write logs to help debug issues
  6. Clean up resources - Close file handles and free memory when done
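Practices 1, 2, and 5 can be combined in a thin wrapper around your processing function. The following is a minimal sketch under stated assumptions: the wrapper name run_safely, the logger name, and the list of required fields are illustrative choices based on the sample algo_input.json in this guide, not a platform requirement.

```python
import json
import logging
import tempfile
from pathlib import Path

logger = logging.getLogger("device_visits")

# Fields this sketch treats as required, based on the sample algo_input.json.
REQUIRED_FIELDS = ("output_path", "config", "input_data")


def run_safely(algo_input, process):
    """Validate input, run process(algo_input), and always write algo_output.json."""
    missing = [field for field in REQUIRED_FIELDS if field not in algo_input]
    if "output_path" in missing:
        # Without an output path there is nowhere to write even empty results.
        raise ValueError("algo_input is missing 'output_path'")
    try:
        if missing:
            raise ValueError(f"algo_input is missing fields: {missing}")
        results = process(algo_input)
    except Exception:
        # Log the full traceback, then fall back to empty results.
        logger.exception("Processing failed; writing empty results")
        results = []

    output_file = Path(algo_input["output_path"]) / "algo_output.json"
    output_file.write_text(json.dumps({"results": results}))
    return output_file


def failing_process(algo_input):
    """Stand-in for real processing that always fails."""
    raise RuntimeError("simulated failure")


# Usage: even when processing fails, an empty result file is written.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_input = {"output_path": tmp_dir, "config": {}, "input_data": []}
    output_file = run_safely(sample_input, failing_process)
    written = json.loads(output_file.read_text())
```

Whether an empty result is the right fallback depends on the failure: for errors that indicate a bug rather than bad input, you may prefer to log and re-raise so the run is marked as failed.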

Next Steps