Running Algorithms

Complete workflow to execute algorithm computations

Once your algorithm is registered, you can run it by creating and executing Algorithm Computations. This guide walks through the complete workflow.

Overview

Running an algorithm requires four components:

  1. Algorithm Configuration - Specifies parameter values and data sources
  2. AOI Collection - Defines where to run the algorithm
  3. TOI - Defines when to run the algorithm
  4. Algorithm Computation - Combines the above and executes

Quick Start Example

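Here's a complete example of running an algorithm. The snippets in this guide use await at the top level, so run them inside an async function (for example, one started with asyncio.run) or in a notebook or REPL that supports top-level await: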

from elements.sdk.elements_sdk import ElementsSDK

sdk = ElementsSDK()

# 1. Create Algorithm Configuration
config = await sdk.algorithm_config.create(
    algorithm_version_id="algo-version-id-here",
    name="my-device-visits-config",
    params={
        'data_sources': [{
            'data_type_name': 'pings',
            'data_source_ids': ['safegraph_pings']
        }],
        'parameters': {
            'look_back_time': 1800,
            'look_forward_time': 1800
        }
    }
)

# 2. Create AOI Collection
aoi_collection = await sdk.aoi_collection.create(
    name="Downtown Palo Alto"
)

# Add an AOI to the collection
aoi = await sdk.aoi.create(
    aoi_collection_id=aoi_collection.id,
    aoi_inputs=[{
        'name': 'University Avenue',
        'geom_wkt': 'POLYGON((-122.161 37.445, -122.160 37.445, ...))'
    }]
)

# 3. Create TOI
from google.protobuf.timestamp_pb2 import Timestamp

start = Timestamp()
start.FromJsonString("2023-01-01T00:00:00Z")

finish = Timestamp()
finish.FromJsonString("2023-01-07T23:59:59Z")

toi = await sdk.toi.create(
    start_local=start,
    finish_local=finish
)

# 4. Create Computation
computation = await sdk.algorithm_computation.create(
    algorithm_config_id=config.id,
    aoi_collection_id=aoi_collection.id,
    toi_id=toi.id
)

# 5. Run Computation
await sdk.algorithm_computation.run(
    ids=[computation.id]
)

print(f"Computation started: {computation.id}")

# 6. Wait for completion and get results
# (See "Viewing Results" section below)

Step-by-Step Guide

Let's break down each step in detail:

Step 1: Create Algorithm Configuration

See Algorithm Configurations for complete details.

# Find your algorithm version
versions = await sdk.algorithm_version.list(
    algorithm_id=algorithm_id
)
version_id = versions[0].id

# Create configuration
config = await sdk.algorithm_config.create(
    algorithm_version_id=version_id,
    name="production-config",
    params={
        'data_sources': [
            {
                'data_type_name': 'pings',
                'data_source_ids': ['safegraph_pings', 'cuebiq_pings']
            }
        ],
        'parameters': {
            'look_back_time': 3600,
            'look_forward_time': 3600,
            'override_visit_time': False
        }
    }
)

print(f"Config created: {config.id}")

Step 2: Create AOI Collection

See AOI API Reference for complete details.

# Create collection
collection = await sdk.aoi_collection.create(
    name="Bay Area Retail Locations"
)

# Add multiple AOIs
aois = await sdk.aoi.create(
    aoi_collection_id=collection.id,
    aoi_inputs=[
        {
            'name': 'Store 1',
            'geom_wkt': 'POLYGON((...))'
        },
        {
            'name': 'Store 2',
            'geom_wkt': 'POLYGON((...))'
        },
        {
            'name': 'Store 3',
            'geom_wkt': 'POLYGON((...))'
        }
    ]
)

print(f"Collection created with {len(aois)} AOIs")

AOI formats:

You can specify AOI geometries using:

  • WKT (Well-Known Text): 'POLYGON((lon1 lat1, lon2 lat2, ...))'
  • GeoJSON: Can be converted to WKT using standard libraries
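
If your AOIs start out as GeoJSON, the conversion to WKT is mechanical. The following is a minimal sketch using only the standard library; the function name polygon_geojson_to_wkt and the example coordinates are illustrative and not part of the SDK. It handles only the Polygon type. Libraries such as Shapely can do the same conversion for all geometry types.

```python
def polygon_geojson_to_wkt(geometry: dict) -> str:
    """Convert a GeoJSON Polygon geometry (a dict) to a WKT POLYGON string."""
    if geometry.get("type") != "Polygon":
        raise ValueError("only GeoJSON Polygon geometries are handled here")

    rings = []
    for ring in geometry["coordinates"]:
        # Polygon rings must be closed (first point == last point).
        if list(ring[0]) != list(ring[-1]):
            ring = list(ring) + [ring[0]]
        # GeoJSON positions are [lon, lat, (optional altitude)];
        # WKT writes each point as "lon lat".
        points = ", ".join(f"{lon} {lat}" for lon, lat, *_ in ring)
        rings.append(f"({points})")

    return f"POLYGON({', '.join(rings)})"


# Hypothetical square near Palo Alto (coordinates are illustrative only).
example = {
    "type": "Polygon",
    "coordinates": [[
        [-122.161, 37.445],
        [-122.159, 37.445],
        [-122.159, 37.447],
        [-122.161, 37.447],
        [-122.161, 37.445],
    ]],
}
wkt = polygon_geojson_to_wkt(example)
print(wkt)
```

The resulting string can be passed as the geom_wkt value of an AOI input.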

Step 3: Create TOI

See TOI API Reference for complete details.

Simple TOI (single time range):

from google.protobuf.timestamp_pb2 import Timestamp

start = Timestamp()
start.FromJsonString("2023-03-01T00:00:00Z")

finish = Timestamp()
finish.FromJsonString("2023-03-31T23:59:59Z")

toi = await sdk.toi.create(
    start_local=start,
    finish_local=finish
)

TOI with recurrence (weekdays only, 9am-5pm):

from terrascope_api.models.toi_pb2 import Recurrence, Cadence

start = Timestamp()
start.FromJsonString("2023-03-01T09:00:00Z")

finish = Timestamp()
finish.FromJsonString("2023-03-31T17:00:00Z")

# Recurrence rule: daily, weekdays only
recurrence_rule = "FREQ=DAILY;INTERVAL=1;BYDAY=MO,TU,WE,TH,FR"

recurrences = [
    Recurrence(
        rule=recurrence_rule,
        duration=Cadence(
            frequency=Cadence.Frequency.HOURLY,
            value=8  # 8 hours per day
        )
    )
]

toi = await sdk.toi.create(
    start_local=start,
    finish_local=finish,
    recurrences=recurrences
)
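
To sanity-check what a weekday recurrence covers before submitting it, you can enumerate the windows locally. This sketch is independent of the SDK: weekday_windows is a hypothetical helper, and it assumes that each occurrence starts at the TOI start's time of day and lasts for the recurrence duration. How the platform actually expands recurrences may differ.

```python
from datetime import datetime, timedelta


def weekday_windows(start: datetime, finish: datetime, hours: int = 8):
    """List (window_start, window_end) pairs for Monday-Friday within [start, finish]."""
    windows = []
    day_start = start
    while day_start <= finish:
        if day_start.weekday() < 5:  # 0 = Monday ... 4 = Friday
            # Clip the last window so it never runs past the TOI finish.
            window_end = min(day_start + timedelta(hours=hours), finish)
            windows.append((day_start, window_end))
        day_start += timedelta(days=1)
    return windows


windows = weekday_windows(
    datetime(2023, 3, 1, 9, 0),
    datetime(2023, 3, 31, 17, 0),
)
print(len(windows), "windows; first:", windows[0])
```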

Step 4: Create Computation

Combine the configuration, AOI collection, and TOI:

computation = await sdk.algorithm_computation.create(
    algorithm_config_id=config.id,
    aoi_collection_id=collection.id,
    toi_id=toi.id
)

print(f"Computation created: {computation.id}")
print(f"Status: {computation.status}")

The computation is created but not yet running.

Step 5: Run Computation

Start the computation:

# Run the computation
await sdk.algorithm_computation.run(
    ids=[computation.id]
)

print("Computation started")

# You can run multiple computations at once
await sdk.algorithm_computation.run(
    ids=[comp1_id, comp2_id, comp3_id]
)

Monitoring Progress

Check Computation Status

# Get current status
computation = await sdk.algorithm_computation.get(
    algorithm_computation_id=computation.id
)

print(f"Status: {computation.status}")
print(f"Progress: {computation.progress}%")

Possible statuses:

  • CREATED - Computation created but not started
  • RUNNING - Currently executing
  • COMPLETED - Successfully finished
  • FAILED - Encountered an error
  • PAUSED - Temporarily paused
  • CANCELLED - Cancelled by user
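
When writing automation around these statuses, it helps to separate the ones that end a computation from the ones that do not. The sketch below assumes statuses are available as the plain strings listed above (the SDK may expose them as an enum instead) and treats PAUSED as non-terminal because a paused computation can be resumed. The names TERMINAL_STATUSES, ACTIVE_STATUSES, and is_terminal are illustrative.

```python
# Statuses after which the computation will not change state again.
TERMINAL_STATUSES = frozenset({"COMPLETED", "FAILED", "CANCELLED"})
# Statuses from which the computation can still progress.
ACTIVE_STATUSES = frozenset({"CREATED", "RUNNING", "PAUSED"})


def is_terminal(status: str) -> bool:
    """Return True if no further status changes are expected."""
    if status in TERMINAL_STATUSES:
        return True
    if status in ACTIVE_STATUSES:
        return False
    # Fail loudly on anything unexpected rather than guessing.
    raise ValueError(f"unknown computation status: {status!r}")
```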

Poll for Completion

import asyncio

async def wait_for_completion(computation_id, timeout=3600):
    """Poll until the computation completes or `timeout` seconds elapse."""
    start_time = asyncio.get_running_loop().time()

    while True:
        computation = await sdk.algorithm_computation.get(
            algorithm_computation_id=computation_id
        )

        status = computation.status

        if status == 'COMPLETED':
            print("✅ Computation completed successfully")
            return True
        elif status == 'FAILED':
            print("❌ Computation failed")
            return False
        elif status == 'CANCELLED':
            print("⚠️ Computation was cancelled")
            return False

        # Check timeout
        elapsed = asyncio.get_running_loop().time() - start_time
        if elapsed > timeout:
            print("⏱️ Timeout waiting for completion")
            return False

        # Wait before polling again
        await asyncio.sleep(30)  # Poll every 30 seconds

# Usage
success = await wait_for_completion(computation.id)
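
A fixed 30-second interval is slow for short computations and chatty for long ones. One alternative is to poll with a growing delay. The sketch below is one way to do that, not the SDK's own mechanism: poll_until_terminal is a hypothetical helper that takes any async callable returning a status string, so it can be tried without a live backend.

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


async def poll_until_terminal(
    fetch_status: Callable[[], Awaitable[str]],
    timeout: float = 3600.0,
    initial_delay: float = 5.0,
    max_delay: float = 60.0,
) -> str:
    """Poll fetch_status() until a terminal status, or raise TimeoutError."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    delay = initial_delay

    while True:
        status = await fetch_status()
        if status in TERMINAL:
            return status

        remaining = deadline - loop.time()
        if remaining <= 0:
            raise TimeoutError(f"still {status} after {timeout} seconds")

        # Sleep, but never past the deadline; then double the delay up to max_delay.
        await asyncio.sleep(min(delay, remaining))
        delay = min(delay * 2, max_delay)


# Hypothetical wiring to the SDK call used in this guide:
#
# async def fetch_status():
#     computation = await sdk.algorithm_computation.get(
#         algorithm_computation_id=computation_id
#     )
#     return computation.status
#
# final_status = await poll_until_terminal(fetch_status)
```

Returning the final status instead of a boolean lets the caller distinguish FAILED from CANCELLED, and raising on timeout keeps a timeout from being mistaken for a failure.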

View Computation History

# Get execution history
history = await sdk.algorithm_computation.history(
    algorithm_computation_id=computation.id
)

for event in history:
    print(f"{event.timestamp}: {event.status} - {event.message}")

Managing Computations

Pause a Computation

await sdk.algorithm_computation.pause(
    ids=[computation.id]
)

print("Computation paused")

Resume a Paused Computation

await sdk.algorithm_computation.resume(
    ids=[computation.id]
)

print("Computation resumed")

Update Computation

You can update certain computation properties:

await sdk.algorithm_computation.update(
    algorithm_computation_id=computation.id,
    name="Updated computation name",
    description="New description"
)

Delete a Computation

await sdk.algorithm_computation.delete(
    ids=[computation.id]
)

print("Computation deleted")

Note: This deletes the computation record but not the results (if any were generated).

Viewing Results

Once a computation completes, you can access the results. See Algorithm Results for complete details.

Get Results Summary

from terrascope_api.models.result_pb2 import Pagination

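# Get results
# NOTE: `client` is not created anywhere in this guide. It is assumed to be an
# already-initialized API client that exposes `models` and `api`.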
result_request = client.models.result_pb2.ResultGetRequest(
    algorithm_computation_id=computation.id,
    pagination=Pagination(page_size=100),
    include_measurements=False  # Don't include detailed measurements
)

result_response = await client.api.result.get(result_request)

# View observations
for result in result_response.results:
    aoi_version = result.source_aoi_version
    print(f"\nResults for AOI {aoi_version}:")

    for observation in result.observations:
        timestamp = observation.observation_start_ts
        values = observation.observation_values

        print(f"  Time: {timestamp}")
        print(f"  Values: {values}")

Export Results

# Export to CSV
export_request = client.models.result_pb2.ResultExportRequest(
    algorithm_computation_id=computation.id,
    file_type=client.models.result_pb2.ResultExportRequest.FileType.CSV,
    export_type=client.models.result_pb2.ResultExportRequest.ExportType.STANDARD
)

export_response = await client.api.result.export(export_request, timeout=300)

download_url = export_response.download_url
print(f"Results available at: {download_url}")

# Download and process
import wget
import zipfile

zip_file = wget.download(download_url)

with zipfile.ZipFile(zip_file, 'r') as zip_ref:
    zip_ref.extractall('./results')

print("Results extracted to ./results")
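
Once the archive is on disk, the exported files can be loaded with the standard library. This sketch assumes the archive contains one or more CSV files with a header row; the exact file names and columns depend on the algorithm and are not assumed here. extract_and_read_csvs is a hypothetical helper.

```python
import csv
import zipfile
from pathlib import Path


def extract_and_read_csvs(zip_path, dest_dir="./results"):
    """Extract an export archive and return {csv_file_name: list_of_row_dicts}."""
    dest = Path(dest_dir)
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(dest)

    tables = {}
    for csv_path in sorted(dest.rglob("*.csv")):
        with csv_path.open(newline="", encoding="utf-8") as handle:
            # DictReader uses the first row of each file as the column names.
            tables[csv_path.name] = list(csv.DictReader(handle))
    return tables
```

For example, tables = extract_and_read_csvs(zip_file) followed by a loop over tables.items() prints the row count per exported file.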

Open Computations

Open Computations are computations with a TOI that extends into the future. They automatically process new data as it becomes available.

Create an Open Computation

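from datetime import datetime, timedelta, timezone

from google.protobuf.timestamp_pb2 import Timestamp

# TOI that extends 30 days into the future.
# Use an explicit UTC time: FromDatetime() treats naive datetimes as UTC,
# so a naive local datetime.now() would be shifted by the local UTC offset.
now = datetime.now(timezone.utc)

start = Timestamp()
start.FromDatetime(now)

finish = Timestamp()
finish.FromDatetime(now + timedelta(days=30))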

toi = await sdk.toi.create(
    start_local=start,
    finish_local=finish
)

# Create and run computation
computation = await sdk.algorithm_computation.create(
    algorithm_config_id=config.id,
    aoi_collection_id=collection.id,
    toi_id=toi.id
)

await sdk.algorithm_computation.run(ids=[computation.id])

print("Open computation started - will process data as it arrives")

Open Computations continue running until:

  • The TOI finish time is reached
  • You pause or cancel the computation
  • The computation encounters an error
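
Whether a computation counts as open comes down to comparing the TOI finish time with the current time. A minimal sketch of that check follows; is_open_toi is a hypothetical helper that works on timezone-aware datetimes, so a protobuf Timestamp would need to be converted first.

```python
from datetime import datetime, timezone
from typing import Optional


def is_open_toi(finish: datetime, now: Optional[datetime] = None) -> bool:
    """Return True if the TOI finish time is still in the future."""
    if finish.tzinfo is None:
        # Comparing naive and aware datetimes raises TypeError, so reject early.
        raise ValueError("finish must be timezone-aware")
    now = now or datetime.now(timezone.utc)
    return finish > now
```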

Troubleshooting

Computation Stays in CREATED Status

The computation wasn't started:

# Make sure to call run()
await sdk.algorithm_computation.run(ids=[computation.id])

Computation Fails Immediately

Check the computation history for errors:

history = await sdk.algorithm_computation.history(
    algorithm_computation_id=computation.id
)

for event in history:
    if event.status == 'FAILED':
        print(f"Error: {event.message}")

Common causes:

  • Invalid parameter values
  • Data source not available
  • Container image not accessible
  • Insufficient resources

No Results Generated

Check if:

  • AOI collection is empty
  • TOI has no data available
  • Data sources don't have data for the specified time/location

# Check AOI collection
aois = await sdk.aoi.list(aoi_collection_id=collection.id)
print(f"AOI count: {len(aois)}")

# Check TOI
toi = await sdk.toi.get(toi_id=toi.id)
print(f"TOI: {toi.start_local} to {toi.finish_local}")

Best Practices

  1. Start small - Test with one AOI and short time range first
  2. Monitor progress - Check status periodically during long-running computations
  3. Handle failures - Implement retry logic for transient failures
  4. Clean up - Delete test computations when done
  5. Use appropriate resources - Ensure algorithm manifest matches actual needs
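
For the retry logic mentioned in item 3, one common pattern is exponential backoff with jitter around an async call. The sketch below is generic and not tied to the SDK: TransientError and retry_async are hypothetical names, and which SDK exceptions are actually transient is something you would need to decide for your environment.

```python
import asyncio
import random


class TransientError(Exception):
    """Hypothetical marker for errors that are worth retrying."""


async def retry_async(operation, attempts=3, base_delay=1.0, retry_on=(TransientError,)):
    """Await operation() up to `attempts` times, backing off between failures."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Exponential backoff (base, 2*base, 4*base, ...) scaled by random
            # jitter so that parallel clients do not retry in lockstep.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
```

A call such as await retry_async(lambda: sdk.algorithm_computation.run(ids=[computation.id])) would wrap the run step, provided retry_on is set to the exception types you consider transient.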

Next Steps