Using data engineering tools to improve control over data has never been easier.

Thanks to a massive effort from the data science community, we are in the midst of a data revolution. The machine learning and artificial intelligence sectors keep producing more advanced solutions, and one thing they have in common is the adoption of data pipelines to make better use of their data.

In computing, a pipeline (also known as a data pipeline) is a set of data processing elements connected in series, where the output of one element is the input of the next one.
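
As a minimal sketch of that definition in plain Python (no Kedro involved), a pipeline can be modelled as a list of functions applied in order. The step names `strip_whitespace`, `drop_empty` and `capitalise` and the `run_pipeline` helper are hypothetical and exist only for this illustration:

```python
from functools import reduce
from typing import Callable, List

# A step takes a list of strings and returns a new list of strings.
Step = Callable[[List[str]], List[str]]


def strip_whitespace(rows: List[str]) -> List[str]:
    return [row.strip() for row in rows]


def drop_empty(rows: List[str]) -> List[str]:
    return [row for row in rows if row]


def capitalise(rows: List[str]) -> List[str]:
    return [row.capitalize() for row in rows]


def run_pipeline(steps: List[Step], rows: List[str]) -> List[str]:
    # Feed the output of each step into the next one, in order.
    return reduce(lambda data, step: step(data), steps, rows)


result = run_pipeline(
    [strip_whitespace, drop_empty, capitalise],
    ["  basketball ", "", " tennis ball"],
)
print(result)
```

The order of the steps matters: each one only ever sees what the previous step produced.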

How data flows through operations

Applying data engineering concepts and technologies to our applications creates another level of control over the data. We can build transformation processes that are easily implemented, maintained and reused.

In this post, we show you how to complete a simple data manipulation task using an open source data engineering solution called Kedro.

The problem: A simple day-to-day task

For the sake of simplicity, let’s define a simple product dataset that contains these fields:

  • Item: our database ID
  • Product: the product name
  • Category: the category of the product
  • Units: the number of units in stock
  • UnitCost: the base cost of the product
  • Margin: the percentage of profit we want to add on top of the product cost
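
To make the layout concrete, here is a small pandas sketch of such a dataset. The rows are hypothetical values invented for illustration, not the contents of the example spreadsheet; a missing name, lowercase names and a zero stock level are included on purpose as examples of untidy data.

```python
import pandas as pd

# Hypothetical sample rows; the real spreadsheet's contents are not shown in this post.
products = pd.DataFrame(
    {
        "Item": [1, 2, 3, 4],
        "Product": ["basketball", "tennis ball", None, "football"],
        "Category": ["Sports", "Sports", "Sports", "Sports"],
        "Units": [100, 100, 50, 0],
        "UnitCost": [5.0, 3.2, 1.0, 8.0],
        "Margin": [0.2, 0.25, 0.1, 0.3],  # stored as a fraction of the unit cost
    }
)
print(products)
```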

And a few very simple TODO list tasks:

  1. Read some data from a spreadsheet.
  2. Do some cleaning (missing data) and fixes (names not capitalised).
  3. Calculate the final price (unit cost + unit cost * margin).
  4. Save the results to a JSON file.

Using Kedro makes it easier to manage large workflows and to build and reuse pluggable nodes.

Preparing the working environment

Kedro is an open source Python framework for creating reproducible, maintainable and modular data science code.

To run Kedro on our computer, we need to have the following installed:

  • Python 3.6 or later
  • A virtual environment activated (try Anaconda if you are not familiar with venvs)
  • The project example downloaded to our computer

As with any Python package, we install Kedro with:

> pip install kedro

The easiest way to verify if Kedro has been installed correctly is to run this command:

> kedro info

We should see Kedro printed as an ASCII art graphic:

Once Kedro is installed, we can continue installing our project dependencies, which have been declared in our repository already. Simply run this command to install them:

> kedro install

A brief overview of Kedro

The framework has plenty of documentation and examples, but for this example, we need to understand just three main concepts:

  • Nodes are the building blocks and represent tasks or operations.
  • Pipelines organise the dependencies and execution order of our collection of nodes.
  • DataCatalog is a registry of the datasets our project loads and saves, keyed by name.

Our implementation

The scaffolding of this example has been generated using the basic Kedro starter template. This step is not mandatory, but it is useful if we want to have good separation of concerns.

If you are starting a new project, Kedro provides a command to kickstart it, but you won’t need it for this demo:

> kedro new

Kedro works with some very powerful libraries, including NumPy and pandas, which simplify the processing and do the heavy lifting for us. However, their scope is beyond this article. Let’s learn it in a practical way.

As we explained previously, our Data Catalog will contain dataset definitions. Datasets can be defined either via YAML or programmatically, using an API. Both methods allow you to specify the dataset name, type, location, save and load arguments, as well as credentials.

Dataset type

In our example, we define our input and output datasets in catalog.yml. The first dataset will point to our spreadsheet, and the second dataset will point to our final JSON file.

example_spreadsheet:                         # Dataset name
  type: pandas.ExcelDataSet                  # Dataset class that handles the file operations
  filepath: data/01_raw/kedro_example.xlsx   # Relative file path
  load_args:                                 # Arguments used when we load the data
    engine: openpyxl                         # A Python library to read/write Excel 2010 xlsx/xlsm files

product_data:                                          # Dataset name
  type: pandas.JSONDataSet                             # Dataset class that handles the file operations
  filepath: data/03_primary/calculate_pricing.json     # File to read/write data
  save_args:                                           # Arguments used when we save the dataset
    orient: 'records'                                  # List-like [{column -> value}, ... , {column -> value}]
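
The `orient` value under `save_args` is, as far as this configuration suggests, handed to pandas when the dataset is written. A minimal sketch of what `orient='records'` produces, calling pandas directly on hypothetical rows rather than going through Kedro:

```python
import json

import pandas as pd

df = pd.DataFrame({"Item": [1, 2], "Product": ["Basketball", "Tennis ball"]})

# orient="records" serialises each row as one JSON object inside a list.
records_json = df.to_json(orient="records")
print(records_json)

# For comparison, orient="columns" nests the values by column, then by row index.
columns_json = df.to_json(orient="columns")
print(columns_json)
```

The records layout is usually the easier one for a client application to consume, because every element of the list is a complete product.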

Kedro is an opinionated framework, and it encourages good practices for folder structure. In this implementation, our spreadsheet file lives in data/01_raw, and the output dataset is saved in data/03_primary.

If we want to inspect or validate the result of an interim step, we can define that node output in our Data Catalog so that it is saved. Otherwise, Kedro keeps the data in memory using a MemoryDataSet.

Defining our nodes or operations based on our TODO list example

We need to read data from a spreadsheet. By defining the dataset in the Data Catalog, we have taken care of this already.

Applying data transformation to prepare the data for clients

Each node will be a function that may accept some parameters and may return some data. We declare all inputs and outputs using Python type hints.

Our example nodes expect a pandas DataFrame, which is a two-dimensional, size-mutable, tabular data structure with both row and column labels, and they return a modified DataFrame.

Step 1: Let’s remove any row that has an empty product name. The pandas method dropna removes rows with missing values, and its subset parameter restricts the check to the column labels we select.

import pandas as pd

def remove_missing_label(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna(subset=['Product'])
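
A quick way to check this behaviour outside Kedro is to call the function on a tiny hand-made dataframe (hypothetical values). Only rows whose `Product` is missing are removed; a gap in another column is left alone:

```python
import pandas as pd


def remove_missing_label(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna(subset=["Product"])


sample = pd.DataFrame(
    {
        "Product": ["basketball", None, "tennis ball"],
        # The last row has no category, but its product name is present.
        "Category": ["Sports", "Sports", None],
    }
)

cleaned = remove_missing_label(sample)
print(cleaned)
```

Note that dropna returns a new dataframe, so `sample` itself still has three rows.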

Step 2: We also want to remove any products that have been sold out (no more units). Indexing the dataframe with a boolean condition is a convenient way to keep only the rows that satisfy it.

def remove_no_units(data: pd.DataFrame) -> pd.DataFrame:
    return data[data['Units'] > 0]
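
The expression inside the brackets is a boolean mask with one True/False value per row. The sketch below (hypothetical values) builds that mask explicitly. One detail to be aware of: a missing `Units` value compares as False, so such rows are dropped as well.

```python
import pandas as pd


def remove_no_units(data: pd.DataFrame) -> pd.DataFrame:
    return data[data["Units"] > 0]


sample = pd.DataFrame(
    {
        "Product": ["basketball", "football", "tennis ball"],
        "Units": [100, 0, None],  # None becomes NaN in a numeric column
    }
)

mask = sample["Units"] > 0  # one boolean per row
print(mask.tolist())

in_stock = remove_no_units(sample)
print(in_stock)
```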

Step 3: Product names are not well formatted either. Following the same approach, we create another function in which we apply a lambda function to capitalise each name.

def capitalize_product_names(data: pd.DataFrame) -> pd.DataFrame:
    # Work on a copy so the caller's dataframe is not modified in place
    data = data.copy()
    data['Product'] = data.Product.apply(lambda x: x.capitalize())
    return data
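
Two details of Python's `str.capitalize` are worth knowing here: it upper-cases the first character and lower-cases the rest, and it is a string method, so the lambda fails on a missing (NaN) name. That is one reason to drop missing names in an earlier step. A small check on hypothetical values, with a copy taken so the sample is left untouched:

```python
import pandas as pd


def capitalize_product_names(data: pd.DataFrame) -> pd.DataFrame:
    data = data.copy()  # avoid modifying the caller's dataframe
    data["Product"] = data["Product"].apply(lambda x: x.capitalize())
    return data


sample = pd.DataFrame({"Product": ["basketball", "tennis BALL"]})
result = capitalize_product_names(sample)
print(result["Product"].tolist())

# A missing name is a float NaN, which has no capitalize() method.
try:
    capitalize_product_names(pd.DataFrame({"Product": ["ok", float("nan")]}))
    failed_on_missing = False
except AttributeError:
    failed_on_missing = True
print(failed_on_missing)
```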

Step 4: Calculate the final price of each product. Adding an extra column is simple. In a couple of lines, we can calculate the final price, which basically is the cost of the product plus the benefit (unit cost * margin). Finally, we drop these internal fields.

def calculate_pricing(data: pd.DataFrame) -> pd.DataFrame:
    # Work on a copy, add the final price, then drop the internal cost fields
    data = data.copy()
    data['Price'] = data.UnitCost + data.UnitCost * data.Margin
    return data.drop(['UnitCost', 'Margin'], axis=1)
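
As a worked example with hypothetical figures, a unit cost of 5.0 and a margin of 0.2 give 5.0 + 5.0 * 0.2 = 6.0. The sketch below runs the same arithmetic through the function and confirms that the internal columns are gone from the result:

```python
import pandas as pd


def calculate_pricing(data: pd.DataFrame) -> pd.DataFrame:
    data = data.copy()  # leave the input dataframe untouched
    data["Price"] = data.UnitCost + data.UnitCost * data.Margin
    return data.drop(["UnitCost", "Margin"], axis=1)


# Hypothetical row: the margin is stored as a fraction of the unit cost.
sample = pd.DataFrame(
    {"Product": ["Basketball"], "UnitCost": [5.0], "Margin": [0.2]}
)

priced = calculate_pricing(sample)
print(priced)
```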

Save the final output into a JSON file

Again, this has been done already. We defined this output in our data catalog, so Kedro will export it in the format selected.

Connecting our nodes through a pipeline

At this point, we have defined our data inputs and outputs and our operations or nodes, and now we are going to connect them, creating a data pipeline as follows:

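from kedro.pipeline import Pipeline, node

# The four node functions defined above are assumed to be imported into this module.

def create_pipeline(**kwargs):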
    return Pipeline(
        [
            node(
                remove_missing_label,
                "example_spreadsheet",
                "remove_missing_label",
            ),
            node(
                remove_no_units,
                "remove_missing_label",
                "remove_no_units",
            ),
            node(
                capitalize_product_names,
                "remove_no_units",
                "capitalize_product_names",
            ),
            node(
                calculate_pricing,
                "capitalize_product_names",
                "product_data",
            ),
        ]
    )
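
Because this pipeline is a straight line, the data flow it describes is equivalent to calling the four functions one after another. The sketch below does that without Kedro, on hypothetical sample rows, which can be a handy way to test the nodes together. It only illustrates the flow of data and is not how Kedro executes a pipeline internally.

```python
import pandas as pd


def remove_missing_label(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna(subset=["Product"])


def remove_no_units(data: pd.DataFrame) -> pd.DataFrame:
    return data[data["Units"] > 0]


def capitalize_product_names(data: pd.DataFrame) -> pd.DataFrame:
    data = data.copy()
    data["Product"] = data["Product"].apply(lambda x: x.capitalize())
    return data


def calculate_pricing(data: pd.DataFrame) -> pd.DataFrame:
    data = data.copy()
    data["Price"] = data.UnitCost + data.UnitCost * data.Margin
    return data.drop(["UnitCost", "Margin"], axis=1)


# Hypothetical input standing in for the "example_spreadsheet" dataset.
raw = pd.DataFrame(
    {
        "Item": [1, 2, 3, 4],
        "Product": ["basketball", "tennis ball", None, "football"],
        "Category": ["Sports"] * 4,
        "Units": [100, 100, 50, 0],
        "UnitCost": [5.0, 3.2, 1.0, 8.0],
        "Margin": [0.2, 0.25, 0.1, 0.3],
    }
)

# DataFrame.pipe passes the dataframe to each function in turn.
product_data = (
    raw.pipe(remove_missing_label)
    .pipe(remove_no_units)
    .pipe(capitalize_product_names)
    .pipe(calculate_pricing)
)
print(product_data.to_dict(orient="records"))
```

Row 3 is dropped for its missing name and row 4 for having no units, so two products remain.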

Registering and running our pipeline

The final step is to register our new pipeline into Kedro’s main execution. Reusing the default class ProjectHooks, we import and instantiate our new pipeline. Basically, it will return all defined nodes, and Kedro will organise them depending on their inputs and outputs.

from typing import Dict

from kedro.framework.hooks import hook_impl
from kedro.pipeline import Pipeline

from kedro_nearform_example.pipelines import data_engineering as de


class ProjectHooks:
    # The hook_impl decorator marks this method as a hook implementation;
    # register_pipelines is the method in which we define our pipelines.
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        """Register the project's pipelines.

        Returns: A mapping from a pipeline name to a ``Pipeline`` object.
        """
        data_engineering_pipeline = de.create_pipeline()

        return {
            "de": data_engineering_pipeline,
            "__default__": data_engineering_pipeline,
        }
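
To illustrate the idea of ordering nodes by their inputs and outputs, here is a toy dependency resolver in plain Python. It is a simplified sketch of the general technique (a topological sort), not Kedro's actual implementation; the `order_nodes` helper and the tuple layout are hypothetical.

```python
from typing import List, Set, Tuple

# (node name, input dataset, output dataset), deliberately listed out of order.
NODES: List[Tuple[str, str, str]] = [
    ("calculate_pricing", "capitalize_product_names", "product_data"),
    ("remove_no_units", "remove_missing_label", "remove_no_units"),
    ("remove_missing_label", "example_spreadsheet", "remove_missing_label"),
    ("capitalize_product_names", "remove_no_units", "capitalize_product_names"),
]


def order_nodes(nodes: List[Tuple[str, str, str]], available: Set[str]) -> List[str]:
    """Return node names in an order where every input exists before it is used."""
    ready = set(available)   # datasets that already exist
    pending = list(nodes)    # nodes that have not been scheduled yet
    ordered: List[str] = []
    while pending:
        runnable = [n for n in pending if n[1] in ready]
        if not runnable:
            raise ValueError("some nodes have inputs that nothing produces")
        for name, _input, output in runnable:
            ordered.append(name)
            ready.add(output)  # this output can now feed later nodes
        pending = [n for n in pending if n not in runnable]
    return ordered


execution_order = order_nodes(NODES, {"example_spreadsheet"})
print(execution_order)
```

The order in which the nodes are declared does not matter here; only the dataset names that link them do.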

And that’s it. Let’s run our program:

> kedro run

If everything goes well, Kedro will generate a JSON data file like this:

 
[
   {
       "Item": 1,
       "Product": "Basketball",
       "Category": "Sports",
       "Units": 100,
       "Price": 6.0
   },
   {
       "Item": 2,
       "Product": "Tennis ball",
       "Category": "Sports",
       "Units": 100,
       "Price": 4.0
   },
...
]

If we install Kedro Viz, we can graphically view the full process:

This was just a brief introduction to Kedro, which has many additional helpful features. We can continue building applications using data as a part of our objects, functions and modules, but we can also try this new paradigm of building data-oriented architectures using reusable, testable, high-performance data science tools.
