Running Algorithms
Complete workflow to execute algorithm computations
Running Algorithms
Once your algorithm is registered, you can run it by creating and executing Algorithm Computations. This guide walks through the complete workflow.
Overview
Running an algorithm requires four components:
- Algorithm Configuration - Specifies parameter values and data sources
- AOI Collection - Defines where to run the algorithm
- TOI - Defines when to run the algorithm
- Algorithm Computation - Combines the above and executes
Quick Start Example
Here's a complete example of running an algorithm:
from elements.sdk.elements_sdk import ElementsSDK
sdk = ElementsSDK()
# 1. Create Algorithm Configuration
config = await sdk.algorithm_config.create(
algorithm_version_id="algo-version-id-here",
name="my-device-visits-config",
params={
'data_sources': [{
'data_type_name': 'pings',
'data_source_ids': ['safegraph_pings']
}],
'parameters': {
'look_back_time': 1800,
'look_forward_time': 1800
}
}
)
# 2. Create AOI Collection
aoi_collection = await sdk.aoi_collection.create(
name="Downtown Palo Alto"
)
# Add an AOI to the collection
aoi = await sdk.aoi.create(
aoi_collection_id=aoi_collection.id,
aoi_inputs=[{
'name': 'University Avenue',
'geom_wkt': 'POLYGON((-122.161 37.445, -122.160 37.445, ...))'
}]
)
# 3. Create TOI
from google.protobuf.timestamp_pb2 import Timestamp
start = Timestamp()
start.FromJsonString("2023-01-01T00:00:00Z")
finish = Timestamp()
finish.FromJsonString("2023-01-07T23:59:59Z")
toi = await sdk.toi.create(
start_local=start,
finish_local=finish
)
# 4. Create Computation
computation = await sdk.algorithm_computation.create(
algorithm_config_id=config.id,
aoi_collection_id=aoi_collection.id,
toi_id=toi.id
)
# 5. Run Computation
await sdk.algorithm_computation.run(
ids=[computation.id]
)
print(f"Computation started: {computation.id}")
# 6. Wait for completion and get results
# (See "Viewing Results" section below)Step-by-Step Guide
Let's break down each step in detail:
Step 1: Create Algorithm Configuration
See Algorithm Configurations for complete details.
# Find your algorithm version
versions = await sdk.algorithm_version.list(
algorithm_id=algorithm_id
)
version_id = versions[0].id
# Create configuration
config = await sdk.algorithm_config.create(
algorithm_version_id=version_id,
name="production-config",
params={
'data_sources': [
{
'data_type_name': 'pings',
'data_source_ids': ['safegraph_pings', 'cuebiq_pings']
}
],
'parameters': {
'look_back_time': 3600,
'look_forward_time': 3600,
'override_visit_time': False
}
}
)
print(f"Config created: {config.id}")Step 2: Create AOI Collection
See AOI API Reference for complete details.
# Create collection
collection = await sdk.aoi_collection.create(
name="Bay Area Retail Locations"
)
# Add multiple AOIs
aois = await sdk.aoi.create(
aoi_collection_id=collection.id,
aoi_inputs=[
{
'name': 'Store 1',
'geom_wkt': 'POLYGON((...))'
},
{
'name': 'Store 2',
'geom_wkt': 'POLYGON((...))'
},
{
'name': 'Store 3',
'geom_wkt': 'POLYGON((...))'
}
]
)
print(f"Collection created with {len(aois)} AOIs")AOI formats:
You can specify AOI geometries using:
- WKT (Well-Known Text):
'POLYGON((lon1 lat1, lon2 lat2, ...))' - GeoJSON: Can be converted to WKT using standard libraries
Step 3: Create TOI
See TOI API Reference for complete details.
Simple TOI (single time range):
from google.protobuf.timestamp_pb2 import Timestamp
start = Timestamp()
start.FromJsonString("2023-03-01T00:00:00Z")
finish = Timestamp()
finish.FromJsonString("2023-03-31T23:59:59Z")
toi = await sdk.toi.create(
start_local=start,
finish_local=finish
)TOI with recurrence (weekdays only, 9am-5pm):
from terrascope_api.models.toi_pb2 import Recurrence, Cadence
start = Timestamp()
start.FromJsonString("2023-03-01T09:00:00Z")
finish = Timestamp()
finish.FromJsonString("2023-03-31T17:00:00Z")
# Recurrence rule: daily, weekdays only
recurrence_rule = "FREQ=DAILY;INTERVAL=1;BYDAY=MO,TU,WE,TH,FR"
recurrences = [
Recurrence(
rule=recurrence_rule,
duration=Cadence(
frequency=Cadence.Frequency.HOURLY,
value=8 # 8 hours per day
)
)
]
toi = await sdk.toi.create(
start_local=start,
finish_local=finish,
recurrences=recurrences
)Step 4: Create Computation
Combine the configuration, AOI collection, and TOI:
computation = await sdk.algorithm_computation.create(
algorithm_config_id=config.id,
aoi_collection_id=collection.id,
toi_id=toi.id
)
print(f"Computation created: {computation.id}")
print(f"Status: {computation.status}")The computation is created but not yet running.
Step 5: Run Computation
Start the computation:
# Run the computation
await sdk.algorithm_computation.run(
ids=[computation.id]
)
print(f"Computation started")
# You can run multiple computations at once
await sdk.algorithm_computation.run(
ids=[comp1_id, comp2_id, comp3_id]
)Monitoring Progress
Check Computation Status
# Get current status
computation = await sdk.algorithm_computation.get(
algorithm_computation_id=computation.id
)
print(f"Status: {computation.status}")
print(f"Progress: {computation.progress}%")Possible statuses:
CREATED- Computation created but not startedRUNNING- Currently executingCOMPLETED- Successfully finishedFAILED- Encountered an errorPAUSED- Temporarily pausedCANCELLED- Cancelled by user
Poll for Completion
import asyncio
async def wait_for_completion(computation_id, timeout=3600):
"""Poll until computation completes or times out"""
start_time = asyncio.get_event_loop().time()
while True:
computation = await sdk.algorithm_computation.get(
algorithm_computation_id=computation_id
)
status = computation.status
if status == 'COMPLETED':
print("✅ Computation completed successfully")
return True
elif status == 'FAILED':
print("❌ Computation failed")
return False
elif status == 'CANCELLED':
print("⚠️ Computation was cancelled")
return False
# Check timeout
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed > timeout:
print("⏱️ Timeout waiting for completion")
return False
# Wait before polling again
await asyncio.sleep(30) # Poll every 30 seconds
# Usage
success = await wait_for_completion(computation.id)View Computation History
# Get execution history
history = await sdk.algorithm_computation.history(
algorithm_computation_id=computation.id
)
for event in history:
print(f"{event.timestamp}: {event.status} - {event.message}")Managing Computations
Pause a Computation
await sdk.algorithm_computation.pause(
ids=[computation.id]
)
print("Computation paused")Resume a Paused Computation
await sdk.algorithm_computation.resume(
ids=[computation.id]
)
print("Computation resumed")Update Computation
You can update certain computation properties:
await sdk.algorithm_computation.update(
algorithm_computation_id=computation.id,
name="Updated computation name",
description="New description"
)Delete a Computation
await sdk.algorithm_computation.delete(
ids=[computation.id]
)
print("Computation deleted")Note: This deletes the computation record but not the results (if any were generated).
Viewing Results
Once a computation completes, you can access the results. See Algorithm Results for complete details.
Get Results Summary
from terrascope_api.models.result_pb2 import Pagination
# Get results
result_request = client.models.result_pb2.ResultGetRequest(
algorithm_computation_id=computation.id,
pagination=Pagination(page_size=100),
include_measurements=False # Don't include detailed measurements
)
result_response = await client.api.result.get(result_request)
# View observations
for result in result_response.results:
aoi_version = result.source_aoi_version
print(f"\nResults for AOI {aoi_version}:")
for observation in result.observations:
timestamp = observation.observation_start_ts
values = observation.observation_values
print(f" Time: {timestamp}")
print(f" Values: {values}")Export Results
# Export to CSV
export_request = client.models.result_pb2.ResultExportRequest(
algorithm_computation_id=computation.id,
file_type=client.models.result_pb2.ResultExportRequest.FileType.CSV,
export_type=client.models.result_pb2.ResultExportRequest.ExportType.STANDARD
)
export_response = await client.api.result.export(export_request, timeout=300)
download_url = export_response.download_url
print(f"Results available at: {download_url}")
# Download and process
import wget
import zipfile
zip_file = wget.download(download_url)
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
zip_ref.extractall('./results')
print("Results extracted to ./results")Open Computations
Open Computations are computations with a TOI that extends into the future. They automatically process new data as it becomes available.
Create an Open Computation
from datetime import datetime, timedelta
# TOI that extends 30 days into the future
start = Timestamp()
start.FromDatetime(datetime.now())
finish = Timestamp()
finish.FromDatetime(datetime.now() + timedelta(days=30))
toi = await sdk.toi.create(
start_local=start,
finish_local=finish
)
# Create and run computation
computation = await sdk.algorithm_computation.create(
algorithm_config_id=config.id,
aoi_collection_id=collection.id,
toi_id=toi.id
)
await sdk.algorithm_computation.run(ids=[computation.id])
print("Open computation started - will process data as it arrives")Open Computations continue running until:
- The TOI finish time is reached
- You pause or cancel the computation
- The computation encounters an error
Troubleshooting
Computation Stays in CREATED Status
The computation wasn't started:
# Make sure to call run()
await sdk.algorithm_computation.run(ids=[computation.id])Computation Fails Immediately
Check the computation history for errors:
history = await sdk.algorithm_computation.history(
algorithm_computation_id=computation.id
)
for event in history:
if event.status == 'FAILED':
print(f"Error: {event.message}")Common causes:
- Invalid parameter values
- Data source not available
- Container image not accessible
- Insufficient resources
No Results Generated
Check if:
- AOI collection is empty
- TOI has no data available
- Data sources don't have data for the specified time/location
# Check AOI collection
aois = await sdk.aoi.list(aoi_collection_id=collection.id)
print(f"AOI count: {len(aois)}")
# Check TOI
toi = await sdk.toi.get(toi_id=toi.id)
print(f"TOI: {toi.start_local} to {toi.finish_local}")Best Practices
- Start small - Test with one AOI and short time range first
- Monitor progress - Check status periodically during long-running computations
- Handle failures - Implement retry logic for transient failures
- Clean up - Delete test computations when done
- Use appropriate resources - Ensure algorithm manifest matches actual needs
Next Steps
- Learn more about Algorithm Configurations
- Understand Creating Computations in depth
- See Algorithm Results for accessing output data
- Review the SDK API Reference for all available methods
Updated 5 months ago