Creating Algorithms
Step-by-step guide to building custom algorithms
This guide walks you through creating a custom algorithm. We'll use a device visits algorithm as an example.
Overview
Creating an algorithm involves five steps:
- Define Requirements - Specify inputs, outputs, and parameters
- Create Manifest - Document the algorithm in a JSON manifest
- Implement Algorithm - Write the algorithm code
- Build Container - Package as a Docker image
- Register Algorithm - Publish to the platform
Example: Device Visits Algorithm
We'll build a device visits algorithm that processes geolocation ping data to determine how long devices spend in user-specified locations (AOIs).
Requirements
Input:
- Data Type: pings(device_id, latitude, longitude, timestamp)
- Filtered by AOI and TOI
Processing:
- Group consecutive pings from the same device
- Consider pings part of a visit if they're within a specified time threshold
- Support configurable look-back and look-forward times
Output:
- Data Type: device_visits(aoi_id, device_id, start_time, end_time)
- Summary: Count of unique visitors per AOI
Parameters:
- look_back_time - Seconds to look back before first ping (default: 3600)
- look_forward_time - Seconds to look forward after last ping (default: 3600)
- override_visit_time - Boolean to use observation time boundaries (default: false)
Parallelization:
- By AOI - Each AOI can be processed independently
- By time - If grouping by hour/day, each time period can be processed independently
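To illustrate time-based parallelization, the sketch below splits pings into per-day buckets that could each be processed independently. It assumes timestamps are Unix epoch seconds in UTC, and the helper name partition_by_day is hypothetical; the platform specifies neither. Visits that span a day boundary would need extra handling that is not shown here.

```python
from collections import defaultdict

SECONDS_PER_DAY = 86_400


def partition_by_day(pings):
    """Bucket (device_id, timestamp) pairs by UTC day index.

    Assumes timestamps are Unix epoch seconds (an assumption for this
    sketch, not a platform requirement).
    """
    buckets = defaultdict(list)
    for device_id, timestamp in pings:
        # Integer division maps every second of a UTC day to the same key.
        buckets[timestamp // SECONDS_PER_DAY].append((device_id, timestamp))
    return dict(buckets)
```

Each bucket can then be passed to the same visit-detection routine without sharing state with the others.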
Step 1: Define Requirements
Before writing any code, clearly define:
Input Data
What data does your algorithm need?
Data Types: pings
Data Sources: safegraph_pings, cuebiq_pings (or specific sources)
Spatial filter: AOI boundaries
Temporal filter: TOI time ranges
Processing Logic
What does your algorithm do?
1. Read filtered ping data
2. Group pings by device_id
3. Sort pings by timestamp
4. Identify visit boundaries using time thresholds
5. Calculate visit start and end times
6. Write visit records
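Steps 2 through 5 can be sketched in plain Python as follows. This is a minimal illustration, not the platform's visit logic: it assumes timestamps are Unix epoch seconds, the function name detect_visits and the max_gap_seconds threshold are hypothetical, and it deliberately leaves out look_back_time and look_forward_time.

```python
from collections import defaultdict


def detect_visits(pings, max_gap_seconds):
    """Turn (device_id, timestamp) pairs into visit records.

    A new visit starts whenever the gap between consecutive pings of the
    same device exceeds max_gap_seconds (a hypothetical threshold).
    """
    # Group pings by device_id.
    by_device = defaultdict(list)
    for device_id, timestamp in pings:
        by_device[device_id].append(timestamp)

    visits = []
    for device_id, stamps in by_device.items():
        # Sort each device's pings by timestamp.
        stamps.sort()
        start = previous = stamps[0]
        for timestamp in stamps[1:]:
            # A large gap closes the current visit and opens a new one.
            if timestamp - previous > max_gap_seconds:
                visits.append({"device_id": device_id,
                               "start_time": start,
                               "end_time": previous})
                start = timestamp
            previous = timestamp
        # Close the last open visit for this device.
        visits.append({"device_id": device_id,
                       "start_time": start,
                       "end_time": previous})
    return visits
```

For example, with a one-hour threshold, pings from one device at 0, 100, and 5000 seconds would yield two visits: one from 0 to 100 and one that starts and ends at 5000.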
Output Data
What results does your algorithm produce?
Data Type: device_visits
Observation values: visit_count (number of unique devices)
Output files: Parquet files with detailed visit records
Parameters
What can users configure?
look_back_time: integer (0-2592000 seconds)
look_forward_time: integer (0-2592000 seconds)
override_visit_time: boolean
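The ranges above can be enforced inside the algorithm before any data is read. The sketch below is one possible check; the function names are hypothetical, and whether the platform already validates parameters against the manifest is not something this guide states. The defaults are the ones listed in the requirements (3600, 3600, false).

```python
MAX_WINDOW_SECONDS = 2_592_000  # upper bound from the parameter list (30 days)
DEFAULT_WINDOW_SECONDS = 3600


def _check_seconds(name, value):
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"{name} must be an integer, got {value!r}")
    if not 0 <= value <= MAX_WINDOW_SECONDS:
        raise ValueError(f"{name} must be between 0 and {MAX_WINDOW_SECONDS}")
    return value


def validate_parameters(params):
    """Return a validated copy of the parameters with defaults filled in."""
    override = params.get("override_visit_time", False)
    if not isinstance(override, bool):
        raise ValueError("override_visit_time must be a boolean")
    return {
        "look_back_time": _check_seconds(
            "look_back_time",
            params.get("look_back_time", DEFAULT_WINDOW_SECONDS)),
        "look_forward_time": _check_seconds(
            "look_forward_time",
            params.get("look_forward_time", DEFAULT_WINDOW_SECONDS)),
        "override_visit_time": override,
    }
```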
Resource Requirements
What resources does your algorithm need?
CPU: 200 millicores (0.2 cores)
Memory: 5 GB
GPU: 0
Step 2: Create A Manifest
Create a manifest that describes your algorithm. See Algorithm Manifests for more details.
The basic manifest structure is:
{
  "manifest_version": "0.1.0",
  "metadata": {
    "description": "Produce a list of AOI visits per device",
    "tags": ["device_visits"],
    "version": "0.0.1"
  },
  "inputs": [...],
  "outputs": {...},
  "parameters": [...],
  "container_parameters": {...},
  "interface": {...}
}

Step 3: Implement the Algorithm
Write your algorithm in your preferred programming language. It must:
- Read input from the path specified in algo_input.json
- Process data according to your logic
- Write output to the path specified in algo_input.json
See Algorithm Input/Output for complete details on reading and writing data.
Sample Implementation Structure (Python)
import json
import os
import uuid  # needed by save_visits to generate unique file names
from pathlib import Path

import pandas as pd


def main():
    # 1. Read algorithm input
    input_path = os.environ.get('ALGORITHM_INPUT_PATH')
    with open(input_path) as f:
        algo_input = json.load(f)

    # 2. Extract configuration
    config = algo_input['config']
    look_back = config['parameters']['look_back_time']
    look_forward = config['parameters']['look_forward_time']

    # 3. Process each AOI
    results = []
    for input_data in algo_input['input_data']:
        aoi_version = input_data['aoi_version']

        # Read input data
        for data_source in input_data['data_sources']:
            for data in data_source['data']:
                pings_df = pd.read_parquet(data['file_path'])

                # Process pings into visits
                visits_df = process_pings_to_visits(
                    pings_df,
                    aoi_version,
                    look_back,
                    look_forward
                )

                # Save visit details
                measurement_path = save_visits(visits_df, algo_input['output_path'])

                # Create observation
                observation = {
                    'observation_start_ts': int(data['details']['time_ranges'][0]['start']),
                    'measurement_path': measurement_path,
                    'observation_values': [{
                        'visit_count': len(visits_df['device_id'].unique())
                    }]
                }
                results.append({
                    'source_aoi_version': aoi_version,
                    'data_type': 'device_visits',
                    'observations': [observation]
                })

    # 4. Write algorithm output
    output = {'results': results}
    output_path = Path(algo_input['output_path']) / 'algo_output.json'
    with open(output_path, 'w') as f:
        json.dump(output, f)


def process_pings_to_visits(pings_df, aoi_version, look_back, look_forward):
    # Your visit detection logic here
    visits = []
    # ... implementation ...
    return pd.DataFrame(visits)


def save_visits(visits_df, output_dir):
    # Save to parquet file and return its name relative to output_dir
    filename = f"visits_{uuid.uuid4()}.parquet"
    filepath = Path(output_dir) / filename
    visits_df.to_parquet(filepath)
    return filename


if __name__ == '__main__':
    main()

Step 4: Build A Container
Package your algorithm as a Docker container. See Container Images for complete details.
Sample Dockerfile
FROM python:3.9-slim
# Install dependencies
COPY requirements.txt /app/
RUN pip install -r /app/requirements.txt
# Copy algorithm code
COPY device_visits.py /app/
# Set working directory
WORKDIR /app
# Algorithm will be invoked via manifest command

Build and Push
# Build the image
docker build -t myorg/device-visits:1.0 .
# Push to registry
docker push myorg/device-visits:1.0

Step 5: Register Algorithm
Register your algorithm with the platform. See Registering Algorithms for complete details.
import asyncio
import json

from elements.sdk.elements_sdk import ElementsSDK


async def main():
    sdk = ElementsSDK()

    # Load manifest
    with open('manifest.json') as f:
        manifest = json.load(f)

    # Create algorithm
    algorithm = await sdk.algorithm.create(
        name="device-visits",
        display_name="Device Visits",
        author="Your Organization"
    )

    # Register version with manifest
    version = await sdk.algorithm_version.create(
        algorithm_id=algorithm.id,
        manifest=manifest
    )
    print(f"Algorithm version registered: {version.id}")


# await is only valid inside a coroutine, so run main() on an event loop
asyncio.run(main())

Testing Your Algorithm
Before running at scale, test your algorithm:
- Unit test your processing logic with sample data
- Test locally using Docker with sample input files
- Create a test computation with a small AOI and short time range
- Validate output format and correctness
- Check resource usage to ensure it matches your manifest
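For the output validation step, a small structural check can catch missing fields before a test computation is submitted. The sketch below only looks for the keys used in the sample implementation in Step 3; the authoritative schema is the one described in Algorithm Input/Output, and the function name find_output_problems is hypothetical.

```python
REQUIRED_RESULT_KEYS = {"source_aoi_version", "data_type", "observations"}
REQUIRED_OBSERVATION_KEYS = {
    "observation_start_ts", "measurement_path", "observation_values",
}


def find_output_problems(output):
    """Return a list of readable problems; an empty list means none found."""
    results = output.get("results")
    if not isinstance(results, list):
        return ["'results' must be a list"]

    problems = []
    for i, result in enumerate(results):
        missing = REQUIRED_RESULT_KEYS - set(result)
        if missing:
            problems.append(f"results[{i}] is missing {sorted(missing)}")
        for j, observation in enumerate(result.get("observations", [])):
            missing = REQUIRED_OBSERVATION_KEYS - set(observation)
            if missing:
                problems.append(
                    f"results[{i}].observations[{j}] is missing {sorted(missing)}")
    return problems
```

Loading algo_output.json with json.load and passing the result to this function gives a quick pass or fail signal in a unit test.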
Local Testing
Docker provides a convenient way of testing an algorithm:
# Create sample algo_input.json
cat > /tmp/algo_input.json << EOF
{
  "version": "0.1.0",
  "input_data_path": "/work/input",
  "output_path": "/work/output",
  "config": {
    "parameters": {
      "look_back_time": 3600,
      "look_forward_time": 3600
    }
  },
  "input_data": [...]
}
EOF

# Run container locally
docker run \
  -v /tmp:/work \
  -e ALGORITHM_INPUT_PATH=/work/algo_input.json \
  myorg/device-visits:1.0 \
  python /app/device_visits.py

# Check output (the container's /work/output is /tmp/output on the host)
cat /tmp/output/algo_output.json

Debugging
Local testing does not always reveal problems that occur in production. Elements therefore provides an API that creates a docker_compose.yaml file, which reproduces a setup similar to how an algorithm runs in production.
Common Patterns
Reading Multiple Data Sources
for data_source in input_data['data_sources']:
    data_source_id = data_source['data_source_id']
    for data in data_source['data']:
        df = pd.read_parquet(data['file_path'])
        # Process data...

Handling Multiple Time Ranges
for time_range in data['details']['time_ranges']:
    start = time_range['start']
    end = time_range['end']
    # Filter data to time range
    filtered_df = df[(df['timestamp'] >= start) & (df['timestamp'] <= end)]
    # Process...

Writing Multiple Observations
observations = []
for time_period in time_periods:
    # Process this time period
    result_df = process_time_period(time_period)
    # Save measurements
    measurement_path = save_measurement(result_df)
    # Add observation
    observations.append({
        'observation_start_ts': time_period.start,
        'measurement_path': measurement_path,
        'observation_values': [{'count': len(result_df)}]
    })

Best Practices
- Validate input early - Check that required fields exist before processing
- Handle errors gracefully - Log errors and write empty results rather than crashing
- Be memory efficient - Process data in chunks if working with large datasets
- Use standard libraries - Prefer pandas, numpy, and other well-tested libraries
- Log progress - Write logs to help debug issues
- Clean up resources - Close file handles and free memory when done
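The "handle errors gracefully" practice can be sketched as a thin wrapper around the processing step. This is an illustration under assumptions: write_results_safely and its process argument are hypothetical names, and whether the platform prefers empty results or a non-zero exit code on failure is not stated in this guide.

```python
import json
import logging
from pathlib import Path

logger = logging.getLogger("device_visits")


def write_results_safely(process, output_dir):
    """Run process() and always write algo_output.json, even on failure.

    `process` is a hypothetical zero-argument callable that returns the
    list of results to place under the 'results' key.
    """
    try:
        results = process()
    except Exception:
        # Log the full traceback, then fall back to empty results.
        logger.exception("Processing failed; writing empty results")
        results = []
    output_path = Path(output_dir) / "algo_output.json"
    output_path.write_text(json.dumps({"results": results}))
    return output_path
```

Catching the exception at a single outer boundary keeps the processing code free of error-handling branches while still leaving a log entry for debugging.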
Next Steps
- Learn more about Algorithm Manifests
- Understand Algorithm Input/Output in detail
- See how to create Container Images
- Follow the guide for Registering Algorithms
- Once registered, learn Running Algorithms