Luigi Tips and Tricks

Luigi is a great library for managing data science workflows. Here is a collection of tricks I have found useful. This post assumes basic familiarity with Luigi. If you want more details on how I use Luigi day-to-day, consider reading my previous post!

Pickled Targets

Targets are the core of Luigi. Luigi already provides a set of useful Targets, but I missed a simple way to pass arbitrary objects between tasks.

To this end, I use Python’s built-in serialization mechanism, pickle:

import logging
import os.path
import pickle

from typing import Optional, Sequence, Union

import luigi
import luigi.format

_log = logging.getLogger(__name__)


class PickledTarget(luigi.LocalTarget):
    # Class-level, in-memory cache shared by all instances in this process
    _data = {}

    def __init__(self, id: Union[str, Sequence[str]],
                 cache_dir: Optional[str] = None):
        # An empty string also selects the default cache directory
        if cache_dir is None or len(cache_dir) == 0:
            cache_dir = 'luigi_cache/'

        # A sequence of strings becomes a path of nested directories
        self._id = id if isinstance(id, str) else os.path.join(*id)
        super().__init__(
            os.path.join(cache_dir, self._id),
            format=luigi.format.Nop)

    def put(self, data):
        self._data[self._id] = data

        _log.debug(f'caching data for {self._id} in {self.path}')
        with self.open('wb') as tmp:
            pickle.dump(data, tmp)

    def get(self):
        if self._id in self._data:
            return self._data[self._id]

        if self.exists():
            _log.debug(f'found cached data for {self._id} in {self.path}')
            # Close the file handle once the data has been loaded
            with self.open('rb') as tmp:
                self._data[self._id] = pickle.load(tmp)
            return self._data[self._id]

        raise Exception(f'no value stored for {self._id}')

A few points are noteworthy:

  • the target stores the data in a class-level field, which minimizes the number of deserializations. This assumes that the data is immutable (or at least handled by the client code as if it were), which is how I always treat it. Keeping this field at class level rather than instance level is the natural choice in Luigi, where output() typically builds a new target instance on every call.
  • the data is associated with an ID, which corresponds to the serialization location. This ID should be specific to a particular task and its set of parameters.
  • the ID can be either a single str, in which case it will be the name of the file in the cache directory, or a sequence of str (tuple or list), in which case it will be a sequence of nested directories. This helps organize the cache with a structure of the type task_name/param1/param2/serialized_data. This can be helpful when the logic of a task has been changed and the cache needs to be cleared.
  • the super class needs to be initialized with the luigi.format.Nop format parameter to be able to write in binary mode. A look at the code shows that the default format is mostly aimed at handling line endings, and so it fails when opening a file in non-text mode. More details can be found in this GitHub issue.
  • even though the data is cached in memory, it is necessary to serialize everything to disk when running Luigi with multiple workers, as the _data field is not shared between processes. As it might be useful to avoid cluttering the main cache while performing experiments, the target has the cache_dir parameter to allow caching to a temporary directory.
  • you might be surprised by the way the default value for that parameter is handled: both None and the empty string select the default directory. This is to work more nicely with Luigi task parameters. I do not claim this is the best solution; it just happened to fit nicely in my workflow.
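
The mapping from ID to file path described above can be tried out without Luigi. The following is a minimal sketch: cache_path is a hypothetical helper that only mirrors the path logic of the constructor, and the /tmp/experiment directory is an arbitrary example value.

```python
import os.path
from typing import Optional, Sequence, Union


def cache_path(id: Union[str, Sequence[str]],
               cache_dir: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the path logic of PickledTarget.__init__."""
    # None and '' both fall back to the default cache directory
    if not cache_dir:
        cache_dir = 'luigi_cache/'
    # A single string is a file name; a sequence becomes nested directories
    relative = id if isinstance(id, str) else os.path.join(*id)
    return os.path.join(cache_dir, relative)


# A single string: one file directly in the default cache directory
single = cache_path('meal')

# A sequence with an empty cache_dir: nested directories under the default
nested = cache_path(('cook_meal', 'eggs', 'spam'), cache_dir='')

# An explicit cache_dir: same structure, in a separate location
temporary = cache_path(('cook_meal', 'eggs'), cache_dir='/tmp/experiment')

print(single, nested, temporary, sep='\n')
```

Clearing the cache of a single task then amounts to deleting its top-level directory, here luigi_cache/cook_meal.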

Using that target in a task looks like this:

class CookMeal(luigi.Task):
    ingredients = luigi.ListParameter([
        'eggs', 'spam', 'bacon', 'spam', 'spam'
    ])
    cache_dir = luigi.Parameter('')

    def run(self):
        out = self.output()

        out.put(' and '.join(self.ingredients))

    def output(self):
        return PickledTarget(
            ('cook_meal', *self.ingredients),
            self.cache_dir)

Printing the Dependency Graph

It is often useful to visualize the dependency graph one is about to run. Luigi’s central scheduler provides a web interface to this end, but I am not always able or willing to use the central scheduler. No big deal! Luigi can also produce a string representation of the dependency graph, through the luigi.tools.deps_tree.print_tree(task) function. Note that, contrary to what the name suggests, the function does not actually print anything: it returns a string, which you are responsible for printing using any method you like (print, logging or what-have-you).

Arguably less nice to look at than the web version, but extremely useful! And if you happen to store logfiles, it will stay there forever, which might be useful even if you are able to use the central scheduler.

Parameter Mixins

A pain point with Luigi is working with tasks whose dependencies have a lot of parameters: you have to repeat those parameters in every task that depends on them, which quickly becomes tedious and is very error prone.

I solve that by splitting my tasks into mixin classes that define the parameters, and the actual tasks that inherit from those mixins. This way, adding the parameters of a dependency is as easy as inheriting from the corresponding mixin.

I define that inheritance hierarchy at the mixin level, such that each task only ever extends one parameter mixin and one Task type.

I also add getters to easily get the parameters relevant to one task. This is better demonstrated with an example:

import luigi
import hashlib
import numpy as np

from collections import OrderedDict

def _parameter_dict(task, *names):
    d = OrderedDict()

    for name in names:
        d[name] = getattr(task, name)

    return d

class IngredientsMixin:
    ingredients = luigi.ListParameter([
        'eggs', 'bacon', 'spam'
    ])

    @property
    def ingredients_parameters(self):
        return _parameter_dict(self, 'ingredients')

class CookMixin(IngredientsMixin):
    cook_random_seed = luigi.IntParameter(42)
    n_ingredients = luigi.IntParameter(10)

    @property
    def cooking_parameters(self):
        parameters = _parameter_dict(
            self,
            'cook_random_seed', 'n_ingredients'
        )
        parameters.update(self.ingredients_parameters)
        return parameters

class ProvideIngredientsTask(IngredientsMixin, luigi.Task):
    def run(self):
        self.output().put(self.ingredients)

    def output(self):
        # PickledTarget is the class defined in the first section
        return PickledTarget(('ingredients', *self.ingredients))

class CookMeal(CookMixin, luigi.Task):
    def requires(self):
        return ProvideIngredientsTask(**self.ingredients_parameters)

    def run(self):
        # self.input() is the output target of the required task
        ingredients = self.input().get()

        r = np.random.RandomState(self.cook_random_seed)
        menu = r.choice(ingredients, self.n_ingredients)

        self.output().put(' and '.join(menu))

    def output(self):
        return PickledTarget(
            (
                'meal',
                *list(map(str, self.cooking_parameters.values()))
            )
        )

Now imagine what it looks like if the tree is more complex and you need to add parameters to one task: you have much less work to do, and much less risk of errors. Notice how I use an OrderedDict to make sure I always list the parameters in the same order.

Always Use Dictionaries

Finally, a small piece of advice: Luigi allows you to return any collection of Targets from output(). I have found it is always better to return a dictionary, even when there is only one Target to return. It makes it much easier to extend the data returned by a task in the future, which I find myself doing quite often. In particular, having a low-cost way to return additional tables helps keep all data tidy. If returning a new table is too hard compared to adding a column, you quickly fall into the trap of “just adding a few columns now to clean up later”.

An alternative is to avoid ever calling output() directly, but always create getters for all individual outputs.

Anything Else?

That’s all for now. If you have your own tips and tricks, make sure to mention them in the comments, and if I like them I will include them here (and give you the credit)!
