documentation¶
here’s a practical documentation, for source documentation please see source documentation
quickstart¶
to create a command line tool with clichain
you need:
to create a factory:
from clichain import cli tasks = cli.Tasks()
to implement task types using coroutine functions:
See also
The easiest way of implementing a task type is to use the task decorator:
from clichain import pipeline import logging import ast @pipeline.task def add_offset(ctrl, offset): logger = logging.getLogger(f'{__name__}.{ctrl.name}') logger.info(f'starting, offset = {offset}') with ctrl as push: while True: value = yield push(value + offset) logger.info('offset task finished, no more value') @pipeline.task def parse(ctrl): _parse = ast.literal_eval with ctrl as push: while True: push(_parse((yield)))
See also
to register task types into the factory:
tasks are integrated into the command line tool using
click
commandsThe simplest way of registering a task type is to decorate it with the factory:
import click @tasks @click.command(name='offset') @click.argument('offset') def offset_cli(offset): "add offset to value" offset = ast.literal_eval(offset) return add_offset(offset) @tasks @click.command(name='parse') def parse_cli(): "parse input data with ast.literal_eval" return parse()
See also
click
documentation for more details about commandsNote
it’s up to you to determine where and how you want the tasks to be registered into the factory, one way of doing this is to make the factory a module attribute and use it into separate scripts…
to start the main command from your main entry point:
if __name__ == '__main__': cli.app(tasks)
If we combine all the previous code into a single script, we get this:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from clichain import cli, pipeline
import click
import logging
import ast
tasks = cli.Tasks()
# -------------------------------------------------------------------- #
# implement tasks #
# -------------------------------------------------------------------- #
@pipeline.task
def add_offset(ctrl, offset):
logger = logging.getLogger(f'{__name__}.{ctrl.name}')
logger.info(f'starting, offset = {offset}')
with ctrl as push:
while True:
value = yield
push(value + offset)
logger.info('offset task finished, no more value')
@pipeline.task
def parse(ctrl):
_parse = ast.literal_eval
with ctrl as push:
while True:
push(_parse((yield)))
# -------------------------------------------------------------------- #
# register tasks #
# -------------------------------------------------------------------- #
@tasks
@click.command(name='offset')
@click.argument('offset')
def offset_cli(offset):
"add offset to value"
offset = ast.literal_eval(offset)
return add_offset(offset)
@tasks
@click.command(name='parse')
def parse_cli():
"parse input data with ast.literal_eval"
return parse()
# -------------------------------------------------------------------- #
# run cli #
# -------------------------------------------------------------------- #
if __name__ == '__main__':
cli.app(tasks)
if our script is called ‘dummy.py’, we can use --help option to get a full description:
$ ./dummy.py --help
Usage: dummy.py [OPTIONS] COMMAND1 [ARGS]... [COMMAND2 [ARGS]...]...
create a pipeline of tasks, read text data from the standard input
and send results to the standard output: ::
stdin(text) --> tasks... --> stdout(text)
[...]
Options:
-l, --logfile PATH use a logfile instead of stderr
-v, --verbose set the log level: None=WARNING, -v=INFO, -vv=DEBUG
--help Show this message and exit.
Commands:
offset add offset to value
parse parse input data with ast.literal_eval
[ begin fork
] end fork
, new branch
{ begin debug name group
} end debug name group
we can see our two task types are availables, we can use --help option as well on it:
$ ./dummy.py offset --help
Usage: dummy.py offset [OPTIONS] OFFSET
add offset to value
Options:
--help Show this message and exit.
See also
assuming we want to run this:
+--> +1 --+
+--> +10 --| +-----+
| +--> +2 --+ |
inp >> parse--| +--> >> out
+--> +100 --> +1 ----------+
we can use our tool as followings (sh):
$ PIPELINE="parse [ offset 10 [ offset 1 , offset 2 ] , offset 100 offset 1 ]"
$ python -c 'print("\n".join("123456789"))' | ./dummy.py $PIPELINE
12
13
102
13
14
103
14
15
104
15
16
105
16
17
106
17
18
107
18
19
108
19
20
109
20
21
110
Note
everything is run into a single process and thread
creating a factory¶
Task types are integrated into the command line tool using click
commands.
In order to achieve this we register commands into a factory and then use that factory when running the main command line interface.
from clichain import cli tasks = cli.Tasks()
The created factory will register all the commands into a dict
, which
can be accessed via the commands attribute.
See also
It’s up to the user to define a strategy about where to create the factory and how to access it from different parts of the program.
implementing a task¶
Task types are implemented using coroutine functions:
See also
Though you can implement a coroutine by yourself, the framework provides two ways of implementing a coroutine as expected by the framework:
clichain.pipeline.coroutine
decoratorthis is simply a trivial decorator which creates a coroutine function that will be primed when called, i.e advanced to the first yield.
example:
from clichain import pipeline @pipeline.coroutine def cr(*args, **kw): print('starting...') try: while True: item = yield print(f'processing: {item}') except GeneratorExit: print('ending...')
When used in a pipeline, the coroutine function will be called with specific keyword arguments:
See also
clichain.pipeline.create
for more detailscontext: a
clichain.pipeline.Context
object shared by all the coroutines of the pipeline.targets: an iterable containing the following coroutines in the pipeline (default is no targets).
debug: an optional name (used by
clichain.pipeline.task
, see below)Note
Default value is the coroutine’s key in the pipeline definition (default will be used if value is
None
or an empty string).
clichain.pipeline.task
decoratorthis is the easiest way of implementing a task type because the decorated function won’t have to worry about the input args and
GeneratorExit
handling, in addition automatic exception handling will be performed if an exception occurs (seeclichain.pipeline.task
for details).The
clichain.pipeline.Control
object will provide a push function to directly send data to next stages of the pipeline, and a name attribute can be used to identify the coroutine instance (when logging for example). The name is optionally given when creating the pipeline object usingclichain.pipeline.create
.example:
from clichain import pipeline @pipeline.task def add_offset(ctrl, offset): with ctrl as push: while True: value = yield push(value + offset)
registering a task¶
Task types are integrated into the command line tool using click
commands.
See also
click
documentation for more details about commands
The factory (see creating a factory) is a callable object meant to be used
as a decorator to register a new click
command into its commands
dictionary.
import click
@tasks
@click.command(name='offset')
@click.argument('offset')
def offset_cli(offset):
"add offset to value"
offset = ast.literal_eval(offset)
return add_offset(offset)
The click
command function is expected to return a coroutine function
that can be integrated into the created pipeline, see
implementing a task section for details.
Note
in the previous example we can access the registered task through the commands attribute of the factory:
assert tasks.commands['offset'] is offset_cli
Note the offset_cli callback function is a decorated version of the original callback function (defined by the user).
running the command line tool¶
The main command is executed by click
framework. Use the
clichain.cli.app
function to run it with the factory, example:
if __name__ == '__main__':
cli.app(tasks)
Note
additional args and kwargs will be kept in the click
context object, see clichain.cli.app
for details.
testing¶
In order to perform automated tests you can run the clichain.cli.test
function, which will run the main command using click.testing
framework.
example:
from clichain import cli
tasks = cli.Tasks()
# register the 'compute' task
[...]
# test
args = ['compute', '--help']
inputs = [1, 2, 3]
result = cli.test(tasks, args, inputs=inputs)
assert result.output == "foo"
assert result.exit_code == 0
assert not result.exception
Note
the test function supports additional arguments, see
clichain.cli.test
for details.
logging¶
Automatic logging is performed for registered tasks by
clichain.pipeline
framework when an unhandled exception occurs.
In this case the exception will be logged at ERROR level with
exception info (see logging.error
), using the optional name of the
coroutine to determine the logger path.
Todo
custom kwargs cannot be passed to clichain.pipeline.create
when using clichain.cli
framework (clichain.cli.app
or
clichain.cli.test
), such as custom root logger.
Note
when creating the pipeline the root logger to use can be
specified, see clichain.pipeline.create
for details. The default
root logger will be clichain.pipeline.logger
.
Note
an optional name can be given by the user (using a specific
command defined in clichain.cli
) to coroutines when creating the
pipeline, see clichain.pipeline.create
and implementing a task
for more details.
Using the optional name to perform logging in tasks implementation is advised, example:
from clichain import pipeline
import logging
logger = logging.getLogger(__name__)
@pipeline.task
def add_offset(ctrl, offset):
log = logger.getChild(ctrl.name)
log.info(f'starting, offset = {offset}')
with ctrl as push:
while True:
value = yield
push(value + offset)
log.info('offset task finished, no more value')
exceptions handling¶
Exceptions in click
commands should be handled using click
exception
handling framework.
example:
@tasks
@click.command(name='offset')
@click.argument('offset')
def offset_cli(offset):
"add offset to value"
try:
offset = ast.literal_eval(offset)
except:
raise click.BadParameter(f'wrong value: {offset}')
return add_offset(offset)
If an unhandled exception occurs in a task when the pipeline is running,
then the exception will be logged (see logging) and the
main command will abort (using click.Abort
) after all the coroutines
have been closed.
example:
@pipeline.task
def add_offset(ctrl, offset):
with ctrl as push:
while True:
value = yield
if value > 0:
push(value + offset)
else:
raise NotImplementedError(value)
Note
In the above example, all the tasks after ‘add_offset’ in the pipeline will be terminated, all the tasks before ‘add_offset’ will fail. This behaviour is the native behaviour of coroutines, since coroutines following ‘add_offset’ will have no more values and coroutines before ‘add_offset’ will face a StopIteration.
Whatever the exit state of the process (fail or completed), all the coroutines of the pipeline will be closed (i.e coroutine.close() will be called), that means the following coroutine will close the file as soon as the pipeline stops anyways:
@coroutine
def cr(*args, **kw):
with open('foo/bar') as f:
while True:
data = yield
[...]
See also