Before and after Example
Before Prefect (plain ETL script):
import requests
def extract() -> dict:
    """Fetch current Boston weather from the OpenWeatherMap API.

    Returns:
        dict: The decoded JSON response containing many weather measurements.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status code.
        requests.RequestException: On connection failures or timeouts.
    """
    url = "https://api.openweathermap.org/data/2.5/weather"
    # NOTE(review): the API key is hard-coded in source; it should be moved to
    # configuration / a secret store and rotated.
    params = {"q": "Boston", "appid": "e5ecbcc49e3debeead24d0fe012fb46e"}
    # Without a timeout, requests waits indefinitely on a stalled connection.
    response = requests.get(url, params=params, timeout=10)
    # Surface HTTP errors instead of returning the error payload, which would
    # otherwise be recorded downstream as a wind speed of 0.0.
    response.raise_for_status()
    return response.json()
def transform(response: dict) -> float:
    """Pull the current wind speed out of the weather response.

    Args:
        response (dict): JSON payload produced by the extract step.

    Returns:
        float: The value at ``response["wind"]["speed"]``, or 0.0 when either
        key is absent.
    """
    wind_section = response.get("wind", {})
    return wind_section.get("speed", 0.0)
def load(speed: float):
    """Record one wind speed reading.

    The value is appended as its own line to ``windspeed.txt`` in the current
    working directory; the file is created if it does not exist.

    Args:
        speed (float): Wind speed produced by the transform step.
    """
    line = f"{speed}\n"
    with open("windspeed.txt", "a") as sink:
        sink.write(line)
if __name__ == "__main__":
    # Run the pipeline once: extract -> transform -> load.
    load(transform(extract()))
After Prefect: the functions become tasks, and the tasks that can fail are configured to retry at fixed intervals until they succeed (up to a retry limit):
import requests
from prefect import task, Flow
from datetime import timedelta
@task(max_retries=3, retry_delay=timedelta(minutes=3))
def extract() -> dict:
    """Prefect task for the extract step.

    The decorator arguments ask Prefect to retry a failed run up to 3 times,
    waiting 3 minutes between attempts. The body is elided in this example;
    presumably it is the same API call as the pre-Prefect ``extract``.
    """
    ...
@task
def transform(response: dict) -> float:
    """Prefect task for the transform step.

    No retry settings are passed to the decorator. The body is elided in this
    example; presumably it is the same lookup as the pre-Prefect
    ``transform``.
    """
    ...
# timedelta's first positional argument is days, so the unit is spelled out:
# retry up to 3 times, waiting 5 minutes between attempts.
@task(max_retries=3, retry_delay=timedelta(minutes=5))
def load(speed: float):
    """Prefect task for the load step.

    The body is elided in this example; presumably it is the same file append
    as the pre-Prefect ``load``.

    Args:
        speed (float): Wind speed produced by the transform task.
    """
    ...
# Calling tasks inside the Flow context registers them on the flow and wires
# each task's output to the next one: extract -> transform -> load.
with Flow("Windspeed Tracker") as flow:
    load(transform(extract()))

if __name__ == "__main__":
    # Execute the flow once when run as a script.
    flow.run()