Batteries-Included Workflow Orchestration Tool: Flyte

https://odsc.com/california/#register

About Flyte

import pandas as pd
from flytekit import Resources, task


@task(limits=Resources(cpu="2", mem="150Mi"))
def total_pay(hourly_pay: float, hours_worked: int, df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(total_pay=hourly_pay * hours_worked)

Key Benefits of Flyte

Type Checking

def concat(a, b):
    return a + b

def concat(a: int, b: int) -> int:
    return a + b

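Plain Python does not enforce annotations at runtime, so the annotated `concat` on its own is not a safety net. The sketch below shows this, then adds a hypothetical `enforce_types` decorator (an illustration only, not part of Flyte or flytekit) that checks simple class annotations before the call. It is meant to convey the kind of guarantee a typed task interface aims for, not how Flyte implements it.

```python
import functools
import inspect
import typing


def concat(a: int, b: int) -> int:
    return a + b


def enforce_types(fn):
    """Hypothetical helper: reject arguments that do not match simple class annotations."""
    hints = typing.get_type_hints(fn)
    signature = inspect.signature(fn)

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            expected = hints.get(name)
            # Only plain classes (int, str, ...) are checked in this sketch.
            if isinstance(expected, type) and not isinstance(value, expected):
                raise TypeError(
                    f"{name} must be {expected.__name__}, got {type(value).__name__}"
                )
        return fn(*args, **kwargs)

    return wrapper


checked_concat = enforce_types(concat)

# Annotations alone are not enforced: strings pass through unchecked.
print(concat("1", "2"))  # prints 12 (string concatenation, not addition)
print(checked_concat(1, 2))  # prints 3

try:
    checked_concat("1", "2")
except TypeError as err:
    print(f"rejected: {err}")
```

The check happens before the function body runs, so a mismatched input is reported at the call boundary rather than surfacing later as a confusing downstream error.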
import typing
from dataclasses import dataclass

from dataclasses_json import dataclass_json
from flytekit import task, workflow


@dataclass_json
@dataclass
class Datum(object):
    """
    Example of a simple custom class that is modeled as a dataclass.
    """

    x: int
    y: str
    z: typing.Dict[int, str]


@task
def stringify(x: int) -> Datum:
    """
    A dataclass return will be regarded as a complex single JSON return.
    """
    return Datum(x=x, y=str(x), z={x: str(x)})


@task
def add(x: Datum, y: Datum) -> Datum:
    """
    Flytekit will automatically convert the passed-in JSON to a dataclass.
    If the structures do not match, it will raise a runtime failure.
    """
    x.z.update(y.z)
    return Datum(x=x.x + y.x, y=x.y + y.y, z=x.z)


@workflow
def wf(x: int, y: int) -> Datum:
    """
    Dataclasses (JSON) can be returned from a workflow.
    """
    return add(x=stringify(x=x), y=stringify(x=y))


if __name__ == "__main__":
    """
    This workflow can be run locally. During local execution, the dataclasses
    are marshalled to and from JSON.
    """
    wf(x=10, y=20)

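To make "marshalled to and from JSON" concrete, here is a minimal standard-library sketch of a round trip for a `Datum`-shaped dataclass. The helpers `to_json` and `from_json` are hypothetical names for illustration; in the example above the `dataclass_json` decorator and flytekit handle serialization, and this sketch does not claim to mirror their internals.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict


@dataclass
class Datum:
    x: int
    y: str
    z: Dict[int, str]


def to_json(d: Datum) -> str:
    """Hypothetical helper: serialize the dataclass to a JSON string."""
    return json.dumps(asdict(d))


def from_json(payload: str) -> Datum:
    """Hypothetical helper: rebuild the dataclass from a JSON string."""
    raw = json.loads(payload)
    # JSON object keys are always strings, so restore the int keys of `z`.
    z = {int(k): v for k, v in raw["z"].items()}
    return Datum(x=raw["x"], y=raw["y"], z=z)


original = Datum(x=10, y="10", z={10: "10"})
payload = to_json(original)
restored = from_json(payload)

print(payload)
print(restored == original)  # prints True
```

The key conversion is the detail worth noticing: a plain JSON round trip turns the `int` keys of `z` into strings, so the declared field types are what allow the original structure to be recovered.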
from flytekit import task, workflow
from flytekit.types.file import FlyteFile


@task
def t1(f: FlyteFile) -> str:
    with open(f) as fp:
        data = fp.readlines()
    return data[0]


@workflow
def wf() -> str:
    return t1(
        f="https://raw.githubusercontent.com/mwaskom/seaborndata/master/iris.csv"
    )


if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running wf(), first line of the file is '{wf()}'")

flytekit.common.exceptions.user.FlyteAssertion: Failed to get data from
https://raw.githubusercontent.com/mwaskom/seaborndata/master/iris.csv to
/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyteabhpuq8t/20211011_163211/local_flytekit/d50f36f4119018dda42d601f76ea0999/iris.csv (recursive=False).
Original exception: Value error! Received: 404. Request for data
@ https://raw.githubusercontent.com/mwaskom/seaborndata/master/iris.csv failed.
Expected status code 200

Reproducibility and Fault Tolerance

Reproducibility

Fault Tolerance
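
Flyte tasks can declare a retry budget through the `retries` argument of `@task`. The underlying idea can be sketched in plain Python. The names `with_retries` and `flaky_fetch` below are hypothetical and exist only for illustration; the sketch does not reflect how Flyte schedules or implements retries.

```python
import functools


def with_retries(retries: int):
    """Hypothetical decorator: re-run a function up to `retries` extra times on failure."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            last_error = None
            for _ in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as err:
                    last_error = err
            # Every attempt failed: surface the last error to the caller.
            raise last_error

        return wrapper

    return decorator


calls = {"count": 0}


@with_retries(retries=2)
def flaky_fetch() -> str:
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = flaky_fetch()
print(result, calls["count"])  # prints: ok 3
```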

Incremental Development

Caching

@task(cache=True, cache_version="1.0")
def fetch_dataset(...) -> ...:
    ...

Conclusion


Our passion is bringing thousands of the best and brightest data scientists together under one roof for an incredible learning and networking experience.

ODSC - Open Data Science
