Author: Juan Angel Giraldo
As programmers or data professionals, we routinely use tools to automate our clients’ processes or specific flows within our own work. But what happens when writing code itself becomes repetitive? In this article, we look at code generators for data tasks, their variations, and how these ideas apply to data product development.
Generative programs
Automatic programming is a technique in which computer programs are created using different abstraction mechanisms. These mechanisms let us lift routine software development tasks into abstract, high-level programs. In this way, we can create programs that help us “manufacture” other software components in an automated and systematic way.
These generative programs serve different purposes and are implemented with various methods and tools. For example, a code generator can be used to build database access code or external service access layers.
Some of the ways these generative programs are implemented are:
– Code Templates: predefined code snippets that can be reused and customized for specific functionalities or patterns.
– Code Scaffolding (based on code templates)
– Domain Specific Languages (DSL)
– Model Driven Development tools (MDD)
– Metaprogramming (run-time modification)
Code generators using templates
This exploration focuses on the code template technique. We present four use cases where it speeds up development in projects that aim to build data-driven products.
The strategy for creating programs with code templates is as follows:
– Establish a file template: You develop the template from which you want to create more programs systematically.
– Create a config file (optional): The template must leave room for configuration through parameters defined when it is designed. Values can be assigned to these parameters directly in the code generator program or kept separately in a config file.
– Create the code generation program: This program acts as a factory to create new programs from the established template and the determined parameters. This program receives the configuration values and returns a program that is ready to run.
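The three steps above can be sketched with nothing but the Python standard library. This is a minimal illustration, not a prescribed implementation: the template text, the config values, and the `generate_program` helper are hypothetical names chosen for the example, and `string.Template` stands in for whichever substitution mechanism a project actually uses.

```python
import json
from string import Template

# Step 1: a file template with named placeholders (hypothetical example).
TEMPLATE_TEXT = '''\
def greet():
    print("Hello from $job_name, running $schedule")

greet()
'''

# Step 2: configuration values, inlined here as JSON text instead of a file.
CONFIG_TEXT = '{"job_name": "daily_report", "schedule": "@daily"}'


def generate_program(template_text: str, config: dict) -> str:
    """Step 3: act as a factory that fills the template with config values."""
    # substitute() raises KeyError when a placeholder has no value,
    # so a misconfigured run fails early instead of emitting broken code.
    return Template(template_text).substitute(config)


generated = generate_program(TEMPLATE_TEXT, json.loads(CONFIG_TEXT))
print(generated)
```

Keeping the template, the configuration, and the generator separate means a new program only requires a new set of config values.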
Dynamic DAG generation for Airflow
Data engineering and task orchestration
Occasionally, crafting DAGs manually proves impractical. Perhaps you’re dealing with hundreds or thousands of DAGs that perform analogous tasks and differ in only a few parameters. In such a scenario, generating the DAGs dynamically is the more logical approach.
File template
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from pendulum import datetime


@dag(
    dag_id=dag_id_to_replace,
    start_date=datetime(2023, 7, 1),
    schedule=schedule_to_replace,
    catchup=False,
)
def dag_from_config():
    BashOperator(
        task_id="say_hello",
        bash_command=bash_command_to_replace,
        env={"ENVVAR": env_var_to_replace},
    )


dag_from_config()
Config file
{
    "dag_id": "dag_file_1",
    "schedule": "'@daily'",
    "bash_command": "'echo $ENVVAR'",
    "env_var": "'Hello! :)'"
}
Code generator
import fileinput
import json
import os
import shutil

config_filepath = "include/dag-config/"
dag_template_filename = "include/dag-template.py"

for filename in os.listdir(config_filepath):
    # Load the parameters for one DAG and close the file afterwards
    with open(os.path.join(config_filepath, filename)) as f:
        config = json.load(f)

    # Copy the template into the dags folder under the new DAG's name
    new_filename = "dags/" + config["dag_id"] + ".py"
    shutil.copyfile(dag_template_filename, new_filename)

    # Rewrite the copy in place, swapping each placeholder for its configured value
    for line in fileinput.input(new_filename, inplace=True):
        line = line.replace("dag_id_to_replace", "'" + config["dag_id"] + "'")
        line = line.replace("schedule_to_replace", config["schedule"])
        line = line.replace("bash_command_to_replace", config["bash_command"])
        line = line.replace("env_var_to_replace", config["env_var"])
        print(line, end="")
In this case, the parameter substitution logic was set up manually in the generator program. However, to make the templates more flexible and improve the consistency of the generator programs, we can use tools such as Jinja.
Jinja is a popular template engine for Python. It lets you embed Python-like expressions and statements in templates, which are then rendered into dynamic content. Jinja templates contain placeholders, or variables, enclosed in double curly braces {{ }}, along with control structures such as loops and conditionals, which are written inside {% %} tags.
When the template is rendered, these placeholders are replaced with actual values based on the context provided. This enables dynamic content generation for web pages, emails, configuration files, and more, making Jinja templating widely used in web development, automation, and content generation tasks.
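As a sketch of how the DAG example might look with Jinja, the listing below renders the same template with placeholders instead of manual string replacement. Several things here are assumptions made for illustration: the template is inlined as a string rather than read from a file, the `py_literal` filter is a hypothetical helper (not part of Jinja) that quotes values as Python literals, and the config values are therefore written without the nested quotes used in the JSON file above.

```python
from jinja2 import Environment

# Hypothetical Jinja version of the DAG template shown earlier.
DAG_TEMPLATE = '''\
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from pendulum import datetime


@dag(
    dag_id={{ dag_id | py_literal }},
    start_date=datetime(2023, 7, 1),
    schedule={{ schedule | py_literal }},
    catchup=False,
)
def dag_from_config():
    BashOperator(
        task_id="say_hello",
        bash_command={{ bash_command | py_literal }},
        env={"ENVVAR": {{ env_var | py_literal }} },
    )


dag_from_config()
'''


def py_literal(value):
    """Hypothetical filter: render a simple value as a Python literal."""
    # Rejects anything else, including Jinja's Undefined for missing keys.
    if not isinstance(value, (str, int, float, bool, type(None))):
        raise TypeError(f"Cannot render {type(value).__name__} as a literal")
    return repr(value)


env = Environment()
env.filters["py_literal"] = py_literal

config = {
    "dag_id": "dag_file_1",
    "schedule": "@daily",
    "bash_command": "echo $ENVVAR",
    "env_var": "Hello! :)",
}

rendered_dag = env.from_string(DAG_TEMPLATE).render(**config)
print(rendered_dag)
```

Letting a filter handle the quoting keeps the config file free of Python syntax, which is one possible design choice among several.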
Data quality checks for dataframes
Data engineering and data quality
An essential part of our work as data stewards is ensuring that consistency and quality are maintained when moving and transforming data. This task is extensive and sometimes repetitive, depending on the source of the data and the processing involved. We can design a template to automate a first approximation. For example, the following template runs standard checks on a data frame:
File Template
import pandas as pd

data = pd.read_csv('{{ input_file }}')


# Data validation functions
def check_completeness(column_name):
    if data[column_name].isnull().any():
        print(f"Validation failed: {column_name} contains missing values.")
    else:
        print(f"Validation passed: {column_name} is complete.")


def check_duplicates(column_name):
    if data[column_name].duplicated().any():
        print(f"Validation failed: {column_name} contains duplicate values.")
    else:
        print(f"Validation passed: {column_name} has no duplicates.")


def check_numeric_range(column_name, min_value, max_value):
    if (data[column_name] < min_value).any() or (data[column_name] > max_value).any():
        print(f"Validation failed: {column_name} values are outside the range ({min_value}, {max_value}).")
    else:
        print(f"Validation passed: {column_name} values are within the range.")


def check_string_length(column_name, max_length):
    if data[column_name].str.len().max() > max_length:
        print(f"Validation failed: {column_name} contains values exceeding the maximum length ({max_length}).")
    else:
        print(f"Validation passed: {column_name} values are within the length limit.")


# Perform validations
# Note: every check runs on every column; the range check expects numeric
# columns and the length check expects string columns.
{% for column in columns_to_validate %}
check_completeness('{{ column }}')
check_duplicates('{{ column }}')
check_numeric_range('{{ column }}', 0, 100)
check_string_length('{{ column }}', 50)
{% endfor %}
Code generator
from jinja2 import Template

# Define the validation template
with open("Template_file.template") as f:
    validation_template = f.read()

# Define parameters for template rendering
validation_params = {
    'input_file': 'data_to_validate.csv',
    'columns_to_validate': ['column1', 'column2', 'column3']
}

# Render the template with parameters
rendered_validation_code = Template(validation_template).render(validation_params)
print(rendered_validation_code)
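The generator above prints the rendered code. One possible next step, sketched here under assumptions, is to check that the output is syntactically valid Python before saving it as a script. The `save_generated_code` helper, the output file name, and the shortened stand-in for the rendered code are all hypothetical.

```python
import tempfile
from pathlib import Path


def save_generated_code(source: str, target: Path) -> Path:
    """Write generated code to `target` after checking that it parses."""
    # compile() only parses the source; it does not run it or import pandas.
    # A SyntaxError here means the template or its parameters need fixing.
    compile(source, str(target), "exec")
    target.write_text(source, encoding="utf-8")
    return target


# Shortened stand-in for the output of the template rendering step.
rendered_validation_code = (
    "import pandas as pd\n"
    "data = pd.read_csv('data_to_validate.csv')\n"
    "print(data['column1'].isnull().any())\n"
)

output_dir = Path(tempfile.mkdtemp())
script_path = save_generated_code(rendered_validation_code, output_dir / "validate_data.py")
print(f"Generated script written to {script_path}")
```

Because the check happens before the file is written, a broken render never replaces a previously generated script.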
Pipeline Scaffolding
Data engineering and Data pipelines
The steps needed to design a pipeline that feeds a statistical analysis or machine learning model vary from project to project. However, certain steps can be standardized and follow a pattern like this:
File template
import pandas as pd
from mymodules import create_engine  # universal database connection

# Extract
{% if extract_from_csv %}
data = pd.read_csv('{{ source_file }}')
{% elif extract_from_database %}
{% if database_engine == 'postgresql' %}
engine = create_engine('{{ source_database_url }}', database="postgresql")
{% elif database_engine == 'mysql' %}
engine = create_engine('{{ source_database_url }}', database="mysql")
{% endif %}
# Double quotes here so the query itself can contain single-quoted SQL literals
query = "{{ source_query }}"
data = pd.read_sql(query, con=engine)
{% endif %}

# Transform
###

# Load
{% if target_database_engine == 'postgresql' %}
engine = create_engine('{{ target_database_url }}', database="postgresql")
{% elif target_database_engine == 'mysql' %}
engine = create_engine('{{ target_database_url }}', database="mysql")
{% endif %}
data.to_sql('{{ target_table }}', con=engine, index=False, if_exists='replace')
Code generator
from jinja2 import Template

with open("Template_file.template") as f:
    etl_template = f.read()

# Define parameters for template rendering
etl_params = {
    'extract_from_csv': False,
    'extract_from_database': True,
    'database_engine': 'postgresql',
    'source_database_url': 'postgresql://username:password@localhost:5432/source_database',
    'source_query': "SELECT * FROM source_table WHERE date > '2022-01-01'",
    'target_database_engine': 'postgresql',
    'target_database_url': 'postgresql://username:password@localhost:5432/target_database',
    'target_table': 'target_table'
}

# Render the template with parameters
rendered_etl_code = Template(etl_template).render(etl_params)
print(rendered_etl_code)
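The generator above defines its parameters inline. Following the optional config file step of the strategy, the parameters could instead be loaded from a JSON file and checked before rendering. The sketch below is one way to do that, under assumptions: the `load_etl_params` helper, the file name `etl_config.json`, and the rule that exactly one extract source must be enabled are choices made for this example, not requirements of the template.

```python
import json
import tempfile
from pathlib import Path

# Keys that the load step of the ETL template above always reads.
REQUIRED_KEYS = {"target_database_engine", "target_database_url", "target_table"}


def load_etl_params(config_path: Path) -> dict:
    """Read ETL parameters from a JSON file and run basic sanity checks."""
    params = json.loads(config_path.read_text(encoding="utf-8"))
    missing = REQUIRED_KEYS - params.keys()
    if missing:
        raise KeyError(f"Missing config keys: {sorted(missing)}")
    # Assumption for this sketch: exactly one extract source is enabled.
    from_csv = bool(params.get("extract_from_csv"))
    from_db = bool(params.get("extract_from_database"))
    if from_csv == from_db:
        raise ValueError("Enable exactly one of extract_from_csv or extract_from_database")
    return params


# Demo: write a sample config to a temporary folder, then load it back.
config_path = Path(tempfile.mkdtemp()) / "etl_config.json"
config_path.write_text(
    json.dumps(
        {
            "extract_from_csv": True,
            "extract_from_database": False,
            "source_file": "input.csv",
            "target_database_engine": "postgresql",
            "target_database_url": "postgresql://username:password@localhost:5432/target_database",
            "target_table": "target_table",
        }
    ),
    encoding="utf-8",
)

etl_params = load_etl_params(config_path)
print(etl_params["target_table"])
```

The returned dictionary can be passed to the template's `render` call in place of the inline `etl_params`.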
Documentation Scaffolding
Finally, generating project documentation from a template can also save us time and repetitive effort. A simple version of such a template could be:
Template file
# {{ script_name }} Documentation
## Purpose
{{ purpose }}
## Usage
```bash
python {{ script_name }} {{ usage_arguments }}
```
## Dependencies
- Python 3.x
- Dependencies: {{ dependencies }}
## Configuration
- Describe any configuration settings or environment variables.
## Example
- Provide an example of how to run the script with sample inputs.
## Contributing
- Explain how others can contribute to the development of the script.
## License
- Specify the license information for the script.
Code generator
from jinja2 import Template


def get_dependencies():
    try:
        with open('requirements.txt', 'r') as file:
            return file.read().strip()
    except FileNotFoundError:
        return 'No dependencies found'


def generate_documentation(script_name, purpose, usage_arguments):
    with open("Template_file.template") as f:
        documentation_template = f.read()

    # Define parameters for template rendering
    documentation_params = {
        'script_name': script_name,
        'purpose': purpose,
        'usage_arguments': usage_arguments,
        'dependencies': get_dependencies()
    }

    # Render the template with parameters
    rendered_documentation = Template(documentation_template).render(documentation_params)
    print(rendered_documentation)


if __name__ == "__main__":
    generate_documentation(
        script_name='example_script.py',
        purpose='This script performs a specific task.',
        usage_arguments='input_file.txt output_file.txt'
    )
Conclusion
Although simple in implementation, these demonstrations bring to the table a whole world of possibilities for process standardization in the data-driven product or service creation space. The use of these techniques allows us to take ownership of the procedures and mold them to the needs of the project or our organization.
References
– https://en.wikipedia.org/wiki/Automatic_programming (Automatic programming)
– https://docs.astronomer.io/learn/dynamically-generating-dags (Dynamically generate DAGs in Airflow)
Juan Angel Giraldo – Data Engineer