Luigi Tips and Tricks
Luigi is a great library for managing data science workflows. Here is a collection of tricks I found useful. This post assumes basic familiarity with Luigi. If you want more details on how I use Luigi day-to-day, consider reading my previous post!
Pickled Targets
`Target`s are the core of Luigi. Luigi already provides a set of useful `Target`s, but I was missing a simple way to pass arbitrary objects between tasks. To this end, I use Python's built-in serialization mechanism, `pickle`:
```python
import logging
import os.path
import pickle
from typing import Optional, Sequence, Union

import luigi
import luigi.format

_log = logging.getLogger(__name__)


class PickledTarget(luigi.LocalTarget):
    # Class-level in-memory cache, shared by every PickledTarget instance.
    _data = {}

    def __init__(self, id: Union[str, Sequence[str]],
                 cache_dir: Optional[str] = None):
        # Treat both None and '' as "use the default cache directory".
        if not cache_dir:
            cache_dir = 'luigi_cache/'
        # A sequence of strings becomes a path of nested directories.
        self._id = id if isinstance(id, str) else os.path.join(*id)
        # The Nop format is needed to open the file in binary mode.
        super().__init__(os.path.join(cache_dir, self._id),
                         format=luigi.format.Nop)

    def put(self, data):
        self._data[self._id] = data
        _log.debug('caching data for %s in %s', self._id, self.path)
        with self.open('wb') as f:
            pickle.dump(data, f)

    def get(self):
        # Serve from memory when possible to avoid deserializing again.
        if self._id in self._data:
            return self._data[self._id]
        if self.exists():
            _log.debug('found cached data for %s in %s', self._id, self.path)
            with self.open('rb') as f:
                self._data[self._id] = pickle.load(f)
            return self._data[self._id]
        raise Exception(f'no value stored for {self._id}')
```
A few points are worth noting:
- the target stores the data in a class-level field, which minimizes the number of deserializations. This assumes that client code treats the data as immutable, which mine always does. The field is class-level rather than an instance field because in typical Luigi code `output()` creates a new target instance on every call, so an instance field would not survive from one call to the next.
- the data is associated with an ID, which corresponds to the serialization location. This ID should be specific to a particular task and its set of parameters.
- the ID can be either a single `str`, in which case it is the name of the file in the cache directory, or a sequence of `str` (tuple or list), in which case it becomes a path of nested directories. This helps organize the cache with a structure like `task_name/param1/param2/serialized_data`, which is handy when the logic of a task has changed and its part of the cache needs to be cleared.
- the superclass needs to be initialized with the `luigi.format.Nop` format to be able to write in binary mode. A look at the code shows that Luigi's formats are mostly aimed at handling line endings, so the default one fails when opening a file in non-text mode. More details can be found in this GitHub issue.
- even though the data is cached in memory, everything still has to be serialized to disk when running Luigi with multiple workers, as the `_data` field is not shared between processes.
- as it can be useful to avoid persisting results while performing experiments, the target has a `cache_dir` parameter that allows caching to a temporary directory. You might be surprised by how its default value is handled: both `None` and the empty string fall back to `luigi_cache/`. This plays nicer with Luigi task parameters, where an empty string is a convenient default. I do not claim this is the best solution; it just happened to fit nicely in my workflow.
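The first three points can be illustrated without Luigi or pickle. `MemoryTarget` below is a hypothetical, stripped-down stand-in for `PickledTarget`. It keeps only the class-level cache and the ID-to-path mapping. The paths shown assume a POSIX system.

```python
import os.path
from typing import Optional, Sequence, Union


class MemoryTarget:
    # Class-level dict: shared by every instance, like PickledTarget._data.
    _data = {}

    def __init__(self, id: Union[str, Sequence[str]],
                 cache_dir: Optional[str] = None):
        if not cache_dir:
            cache_dir = 'luigi_cache/'
        # A sequence of strings becomes nested directories.
        self._id = id if isinstance(id, str) else os.path.join(*id)
        self.path = os.path.join(cache_dir, self._id)

    def put(self, data):
        self._data[self._id] = data

    def get(self):
        return self._data[self._id]


# output() builds a fresh target on every call; the cached value survives anyway.
MemoryTarget(('cook_meal', 'eggs', 'spam')).put('eggs and spam')
target = MemoryTarget(('cook_meal', 'eggs', 'spam'))
print(target.path)   # luigi_cache/cook_meal/eggs/spam
print(target.get())  # eggs and spam
```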
Using that target looks like this:
```python
class CookMeal(luigi.Task):
    ingredients = luigi.ListParameter(default=[
        'eggs', 'spam', 'bacon', 'spam', 'spam'
    ])
    # An empty string means "use PickledTarget's default cache directory".
    cache_dir = luigi.Parameter(default='')

    def run(self):
        out = self.output()
        out.put(' and '.join(self.ingredients))

    def output(self):
        # One nested cache directory per ingredient.
        return PickledTarget(
            ('cook_meal', *self.ingredients),
            self.cache_dir)
```
Printing the Dependency Graph
It is often useful to visualize the dependency graph one is about to run. Luigi's central scheduler provides a web interface to this end, but I am not always able or willing to use the central scheduler. No big deal! Luigi can also produce a string representation of the dependency graph through the `luigi.tools.deps_tree.print_tree(task)` function. Note that, contrary to what its name suggests, the function does not print anything. Instead, it returns a string, which you are responsible for printing using any method you like (`print`, `logging` or what-have-you).
It is arguably less nice to look at than the web version, but extremely useful! And if you keep log files, the tree stays there for good, which can be useful even when you are able to use the central scheduler.
Parameter Mixins
A pain point with Luigi is working with tasks whose dependencies have a lot of parameters: you have to add those parameters to every task that depends on them, which quickly becomes tedious and is very error-prone.
I solve this by splitting my tasks into mixin classes that define the parameters, and actual tasks that inherit from those mixins. This way, adding the parameters of a dependency is as easy as inheriting from the corresponding mixin.
I define the inheritance hierarchy at the mixin level, so that each task only ever extends one parameter mixin and one `Task` type.
I also add getters that return the parameters relevant to a given task. This is better demonstrated with an example:
```python
from collections import OrderedDict

import luigi
import numpy as np

# PickledTarget is the class defined in the previous section.


def _parameter_dict(task, *names):
    # Collect the named parameter values, always in the same order.
    d = OrderedDict()
    for name in names:
        d[name] = getattr(task, name)
    return d


class IngredientsMixin:
    ingredients = luigi.ListParameter(default=[
        'eggs', 'bacon', 'spam'
    ])

    @property
    def ingredients_parameters(self):
        return _parameter_dict(self, 'ingredients')


class CookMixin(IngredientsMixin):
    cook_random_seed = luigi.IntParameter(default=42)
    n_ingredients = luigi.IntParameter(default=10)

    @property
    def cooking_parameters(self):
        parameters = _parameter_dict(
            self,
            'cook_random_seed', 'n_ingredients'
        )
        # Include the parameters of the upstream mixin as well.
        parameters.update(self.ingredients_parameters)
        return parameters


class ProvideIngredientsTask(IngredientsMixin, luigi.Task):
    def run(self):
        self.output().put(self.ingredients)

    def output(self):
        return PickledTarget(('ingredients', *self.ingredients))


class CookMeal(CookMixin, luigi.Task):
    def requires(self):
        # Forward exactly the parameters the dependency needs.
        return ProvideIngredientsTask(**self.ingredients_parameters)

    def run(self):
        ingredients = self.input().get()
        r = np.random.RandomState(self.cook_random_seed)
        # Draw n_ingredients items (with replacement) to build the menu.
        menu = r.choice(ingredients, self.n_ingredients)
        self.output().put(' and '.join(menu))

    def output(self):
        return PickledTarget(
            ('meal', *map(str, self.cooking_parameters.values()))
        )
```
Now imagine what this looks like when the tree is more complex and you need to add parameters to one task: there is much less work to do, and much less risk of errors. Notice how I use an `OrderedDict` to make sure I always list the parameters in the same order (since Python 3.7, a plain `dict` preserves insertion order too).
Always Use Dictionaries
Finally, a small piece of advice: Luigi allows you to return any collection of `Target`s from `output()`. I have found it always better to return a dictionary, even when there is only one `Target` to return. It makes it much easier to extend the data returned by a task in the future, which I find myself doing quite often. In particular, having a low-cost way to return additional tables helps keep all data tidy. If returning a new table is much harder than adding a column, you quickly fall into the trap of “just adding a few columns now to clean up later”.
An alternative is to never call `output()` directly, but to always create getters for all individual outputs.
Anything Else?
That’s all for now. If you have your own tips and tricks, make sure to mention them in the comments, and if I like them I will include them here (and give you the credit)!