source documentation

cli

cli focuses on the command line tool aspect, using click.
class clichain.cli.Cli(name=None, invoke_without_command=False, no_args_is_help=None, subcommand_metavar=None, chain=False, result_callback=None, **attrs)

Implements the root command using click.MultiCommand.
See also

Cli provides the implementation for the root command by extending click.MultiCommand. The click command is then created specifying Cli as the "cls" parameter in the click.command decorator.
get_command(ctx, name)

Given a context and a command name, this returns a Command object if it exists, or returns None.
list_commands(ctx)

Returns a list of subcommand names in the order they should appear.
class clichain.cli.Tasks(commands=None)

Defines a factory to register task types.

Tasks provides a factory to register and hold implemented tasks so they become available to the user. Tasks are implemented by coroutines.
See also

The command line interface is implemented using click. In order to make a task usable from the command line interface, you need to define a click command (in most cases using the click.command decorator). example:

@click.command(name='compute')
@click.option('--approximate', '-a', help='compute with approximation')
@click.argument('x')
def my_compute_task(approximate, x):
    " the task doc that will appear as help "
    # process input parameters and options...
See also

click for details on how to implement commands.

The command is expected to return a coroutine function such as clichain.pipeline.coroutine, see clichain.pipeline.create for details.

full example:
from clichain import pipeline, cli
import click
import ast

# creates the factory here but should be
# common to all task types...
tasks = cli.Tasks()

@pipeline.task
def divide(ctrl, den):
    print(f'{ctrl.name} is starting')
    with ctrl as push:
        while True:
            value = yield
            push(value / den)
    print(f'{ctrl.name} has finished with no error')

# the task will be made available as 'compute' in the
# command line interface
@tasks
@click.command(name='compute')
@click.option('--approximate', '-a', help='compute with approximation')
@click.argument('den')
def compute_task_cli(approximate, den):
    " the task doc that will appear as help "
    den = float(den)  # click passes arguments as strings
    if den == 0:
        raise click.BadParameter("can't divide by 0")
    if approximate:
        den = round(den)
    return divide(den)

@pipeline.task
def parse(ctrl):
    _parse = ast.literal_eval
    with ctrl as push:
        while True:
            try:
                push(_parse((yield)))
            except (SyntaxError, ValueError):
                pass

@tasks
@click.command(name='parse')
def parse_cli():
    " performs literal_eval on data "
    return parse()
__call__(cmd)

Wraps and registers a click.command object into the factory.

Tasks object is intended to be used as a decorator:

tasks = cli.Tasks()

# register 'compute' command into 'tasks'
@tasks
@click.command(name='compute')
[...]
The command is expected to return a coroutine function such as clichain.pipeline.coroutine, see clichain.pipeline.create for details.

Note

every time the command is called, a log message will be emitted to indicate this task is created
__init__(commands=None)

Initializes a new factory with commands.

commands is a dict containing all registered commands; it will be set to a new empty dict if None.

context is the current click context; it's set to None until the main command is called (see app), which will set context to the current context value.
_prepare_cmd(cmd)

Wraps the click.command callback function and replaces it.

The wrapper function will:

- log a 'create' message (using logger.info)
- use the callback function's result to create the next coroutine in the pipeline. The coroutine function created by the callback function is stored in the current click context. A stack is used to process the pipeline's branches.

See also

clichain.pipeline for details on how the pipeline is specified

The wrapper function does not return anything.
See also

this method is called by Tasks.__call__
clichain.cli._get_obj(tasks, args, kwargs)

Get the obj parameter for the click context.

The created obj is a dict; it's used internally when processing commands.
clichain.cli.app(tasks, *args, **kw)

Run the click main command: this will start the CLI tool.

tasks is the Tasks factory to use (containing user commands).

extra args and kwargs are added to the click context's obj (a dict) as 'args' and 'kwargs'; they're not used by the framework.

app uses Cli, which extends click.MultiCommand, to create the main command as a multicommand interface. This main command holds all the user-defined commands and is the main entry point of the created tool.

The pipeline itself is created and run by the process function, which is called when the main command itself returns, i.e. when all the input args have been processed by click.

See also

the main command itself only sets up logging, see also usage
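For illustration, a minimal entry-point sketch (an assumption of typical use, not part of the module itself), where tasks holds the commands registered on a Tasks factory:

from clichain import cli

tasks = cli.Tasks()

# ... register @tasks-decorated click commands here ...

if __name__ == '__main__':
    cli.app(tasks)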
clichain.cli.process(obj, rv, logfile, verbose)

Callback of the main command, called by click.

process creates the pipeline (using clichain.pipeline.create), then runs it with inputs from stdin and sending outputs to stdout (getting stdin and stdout from click.get_text_stream).

If an exception occurs, the exception is logged and click.Abort is raised.
clichain.cli.test(tasks, clargs, args=None, kwargs=None, **kw)

Run the CLI using click.testing; intended for automated tests.

The main command is run with click.testing.CliRunner; this is roughly equivalent to:

>>> runner = click.testing.CliRunner()
>>> obj = cli._get_obj(tasks, args, kwargs)
>>> result = runner.invoke(cli._app, clargs, obj=obj, **kw)

tasks is the Tasks factory to use (containing user commands).

clargs is a list containing the command line arguments, i.e. what the user would send in interactive mode.

optional args and kwargs will be sent to the click context (in context.obj['args'] and context.obj['kwargs']); they will not be used by the framework.

extra kw will be forwarded to click.testing.CliRunner.invoke, for example:

input=[1,2,3], catch_exceptions=False

Creates a click.testing.CliRunner to invoke the main command and returns the runner.invoke result.

See also

click.testing
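For illustration, a minimal sketch of an automated test (assuming a 'parse' command is registered in tasks, as in the full example above):

from clichain import cli

result = cli.test(tasks, ['parse'], input='1\n2\n3\n',
                  catch_exceptions=False)
assert result.exit_code == 0    # result is a click.testing.Result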
clichain.cli.usage()

Create a pipeline of tasks, read text data from the standard input and send results to the standard output:
stdin(text) --> tasks... --> stdout(text)
The overall principle is to run a data stream processor by chaining different kinds of tasks (the available tasks depend on the implementation, see the list below).
you can create a single branch pipeline as a sequence of tasks, for instance:
inp >>> A -> B -> C >>> out
or you can create a more complex pipeline defining multiple branches, for instance:
            +--> B --> C --+
inp >>> A --|              +--> F >>> out
            +--> D --> E --+
tasks are implemented by coroutine functions as described by David Beazley (see http://www.dabeaz.com/coroutines/ for details).
Specifying pipeline workflow:
the basic syntax allows you to specify the workflow of the pipeline.
A single sequence of tasks such as the following:
inp >>> A -> B -> C >>> out
is specified as:
A B C
Note
plus parameters and options of the tasks themselves, i.e:
A -x -y arg1 B -z ...
Creating branches requires ‘workflow commands’, for instance the following example:
            +--> B --> C --+
inp >>> A --|              +--> F >>> out
            +--> D --> E --+
would be specified as:
A [ B C , D E ] F
in the same way we can define sub-branches, for instance:
                 +--> C1 --+
        +--> B --|         +-----+
        |        +--> C2 --+     |
inp >>> A--|                     +--> F >>> out
        +--> D --> E ------------+
would be specified as:
A [ B [ C1 , C2 ] , D E ] F
Execution order:
When parallel branches are defined (as ‘C1’ and ‘C2’ in the previous example) they are processed in the same order as they are defined in the command line arguments, that means in this example:
A [ B [ C1 , C2 ] , D E ] F
If the input data is:
1
2
[...]
Then the workflow will be as follows:
# data will go through C1 then C2
1 -> A -> B -> C1 -> F
1 -> A -> B -> C2 -> F
2 -> A -> B -> C1 -> F
2 -> A -> B -> C2 -> F
[...]
And the order is reproducible.
Attaching a name to branches or tasks:
You can attach a name to coroutines when defining the pipeline, which will be used as a suffix to get the logger if an exception occurs in the coroutine, i.e:
base_logger.getChild(<name>)
Note
This is useful in particular if you're using the same task type in several branches. The name could be used as well in the coroutine, depending on its implementation (see clichain.pipeline.create for more details).

example:
            +--> B --> C --+
inp >>> A --|              +--> B >>> out
            +--> B --> D --+
you could specify the name of the branches (i.e all the coroutines of those branches) with:
A [ { 'b1' B C } , { 'b2' B D } ] { 'b3' B }
Note
the name specification is valid for every coroutine whose definition starts within the braces, for example:
A { 'b1' [ B C , B D ] } B
is equivalent to:
A [ { 'b1' B C , B D } ] B
which is also equivalent to:
A [ { 'b1' B C , B D ] } B
which is equivalent to:
A [ { 'b1' B C } , { 'b1' B D } ] B
note the output ‘B’ coroutine will have no name
And using the following specification:
A [ { 'b1' B } C , { 'b2' B } D ] B
will only give ‘b1’ and ‘b2’ names to the ‘B’ coroutines (and not to the ‘C’ and ‘D’ coroutines as in the previous example).
Then note the following:
A { 'b1' [ B C , { 'b2' B } D ] } B
is equivalent to:
A [ { 'b1' B C } , { 'b2' B } { 'b1' D } ] B
Then note that the following:
A [ { 'b1' } B , { 'b2' } B ] B
will have no effect at all.
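As a concrete invocation, a sketch (assuming 'mytool' stands for the hypothetical entry point created with app, and A...F stand for registered task commands):

$ cat data.txt | mytool A [ B C , D E ] F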
pipeline

pipeline module provides tools to create a pipeline of tasks.

a pipeline can be composed of one or several branches, but everything runs in a single thread. The initial goal of this framework is to provide a simple and direct way of defining task types and reusing them in different pipeline configurations.

The motivation is not to parallelize tasks, yet tasks could be parallelized in some configurations, depending on the exact use case and the context…
tasks are implemented by coroutine functions as described by David Beazley (see http://www.dabeaz.com/coroutines/ for details).
This module is used by the clichain.cli module.
class clichain.pipeline.Context(logger=<Logger clichain.pipeline (WARNING)>, obj=None)

Will be passed to all `coroutine`s in the pipeline.

Context object is a common object shared by all coroutines in the pipeline.

attributes:

- exceptions is a list which remains empty until an exception occurs within a task and is handled by the module. exceptions then contains each exception caught by the module. Each exception is logged only once, with its traceback, when it's caught by the Control context manager.

  Note

  if an exception is caught by the module it will be "re-raised", thus terminating the process, but user code could still raise other exceptions, for example if a coroutine is not implemented using task or GeneratorExit is handled within the user loop…

- logger will be used for every message logged by the module, and can be used by the user. The default is to use the module's logger.

- obj attribute is an arbitrary object provided by the user when creating the pipeline. The default is None.
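For illustration, a minimal sketch of a task reading the shared obj through the Control.context attribute (the string prefix passed as obj here is a hypothetical user object, not part of the module):

from clichain import pipeline

@pipeline.task
def tag(ctrl):
    # ctrl.context is the shared Context object; 'obj' is the
    # arbitrary user object described above
    prefix = ctrl.context.obj
    with ctrl as push:
        while True:
            value = yield
            push(f'{prefix}{value}')

pl = pipeline.create({'tag': tag()}, obj='result: ')
pl.run(['a', 'b'])   # prints 'result: a' then 'result: b' via the default output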
class clichain.pipeline.Control(context, name, targets)

Internal: the 'control' obj received by task-decorated functions.

Control is a context manager.
__enter__()

Returns the push function.
__exit__(tpe, value, tb)

Handles the GeneratorExit exception and logs unhandled exceptions.

The Control object is created by the task decorator; the decorated function gets the Control object as first arg, and is expected to use it as a context manager, which will handle GeneratorExit and log any unhandled exception.

The context attribute (Context object) will be used if the exception is not None or GeneratorExit, in order to:

- determine if the exception traceback should be logged: if the exception has already been logged by another Control object (i.e. in another coroutine), then only an error message will be logged, otherwise the full exception will be recorded.
- get the base logger to use
__init__(context, name, targets)

Initialize a new Control object.

context is a Context object; the Control object will use it to log the exception if an exception occurs in the coroutine while used as a context manager. The Context object is also accessible through the context attribute.

name will be accessible as the name attribute (rw), ex:

logger = logging.getLogger(f'{__name__}.{ctrl.name}')

targets will be accessible through the targets property (ro).

targets is read-only to ensure consistency with the push function returned by Control.__enter__: the Control object is expected to be used as a context manager:

with ctrl as push:
    while True:
        data = yield
        push(data)  # send data to next coroutines

a push function is defined and returned when Control is used as a context manager, but it can actually be created using the Control.push property.

The purpose is to force using an efficient solution avoiding attribute lookups (using self) for every call, which has an impact given this function is likely to be called a lot (usually once for each processed item). This way we define the function and reference it once in the user function (as 'push' in the example).

See also

task decorator
push

Returns a 'push' function sending data to the next coroutines.

targets

Next coroutines in the pipeline.
class clichain.pipeline.Pipeline(targets)

User interface returned by the create function.

Pipeline object contains the `coroutine`s of a pipeline. When used as a context manager, it ensures that the coroutines will be closed immediately at the end of the process.

Pipeline also has an additional Pipeline.run method, which can be called to run the pipeline over an input stream and wait until the process completes.
__enter__()

Returns a function sending data through the pipeline, ex:

with pipeline as process:
    for data in stream:
        process(data)

Note

this is equivalent to:

with pipeline:
    target = pipeline.target
    for data in stream:
        target.send(data)
__exit__(tpe, value, tb)

Closes all the coroutines of the pipeline, raising the exception if any.

The purpose of using the Pipeline object as a context manager is essentially to make sure all the coroutines will be terminated (closed) at the end of the process. This can be critical if user functions are expected to do some teardown jobs after processing data, for instance:

# file won't be closed until the coroutine is closed
# (see while loop...)
@coroutine
def cr(targets, *args, file, **kw):
    with open(file) as f:
        while True:
            data = yield
            [...]
__init__(targets)

Initialize a new pipeline with `coroutine`s.

targets is an iterable containing the coroutines of the pipeline; the first item must be the input coroutine.
run(inputs)

Run the pipeline over the inputs iterator.

Sends data from inputs to the pipeline until there is no more data in inputs or an exception occurs.
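For illustration, a minimal sketch (assuming a 'parse' task defined with the task decorator, as in the examples below):

pl = pipeline.create({'parse': parse()})
pl.run(['1', '2', '3'])   # results go to stdout via the default 'print' output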
clichain.pipeline.coroutine(func)

Coroutine decorator: 'primes' the coroutine function.

This function is intended to be used as a decorator to create a basic coroutine function, for instance:

@coroutine
def cr(*args, **kw):
    print('starting...')
    try:
        while True:
            item = yield
            print(f'processing: {item}')
    except GeneratorExit:
        print('ending...')

calling the decorated function will automatically advance it to the first yield statement.
>>> cr()
starting...
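sending data and closing the coroutine then behave as expected (a sketch following the example above):

>>> c = cr()
starting...
>>> c.send('hello')
processing: hello
>>> c.close()
ending...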
Note

the decorated function is wrapped using functools.wraps
clichain.pipeline.create(tasks, output=<built-in function print>, **kw)

Create a pipeline of coroutines from a specification.

a pipeline is a succession of coroutines organized into one or several branches.
output is a strategy to use for the pipeline output; the default strategy is print. output will be called for each data item reaching the pipeline's output; it takes a single argument.

extra keyword args will be used to initialize the Context object that will be sent to all the coroutines of the pipeline.

the tasks argument describes the pipeline and consists of a mapping of coroutines as key: value pairs, where each single key identifies a single coroutine.

each coroutine is defined either by a single coroutine function (see the task field below) or a dictionary, which contains the following fields:

- task: the coroutine function to use

  See also

  task decorator

  the coroutine function will be called with the following keyword arguments:

  - context: the Context object shared by all the coroutines of the pipeline.
  - targets: an iterable containing the following coroutines in the pipeline (default is no targets).
  - debug: an optional name, used by task to get a child logger from the Context logger, which will be used to log an error if an exception occurs. The exception will be logged at error level and the exc_info will be passed to the log record. The value will be accessible (and writeable) through the Control.name attribute, which can be useful for logging, ex:

    logger = logging.getLogger(f'{__name__}.{ctrl.name}')

    Note

    Default value is the coroutine's key in the pipeline definition (the default will be used if the value is None or an empty string).
- input: (optional) set this coroutine as a 'target' of the coroutine(s) defined by input. input can be a single key or an iterable containing keys of other coroutines defined in the pipeline dictionary.

  Note

  a None value will be interpreted as the pipeline's main input. No value or an empty list is equivalent to None if this coroutine is not specified as output of another coroutine in the pipeline.

- output: (optional) set the coroutine(s) whose keys are defined in output as a 'target' of this coroutine. output can be a single key or an iterable containing keys of other coroutines defined in the pipeline dictionary.

  Note

  a None value will be interpreted as the pipeline's main output. No value or an empty list is equivalent to None if this coroutine is not specified as input of another coroutine in the pipeline.

- debug: (optional) a debug name to use in logging if an unhandled exception occurs; see the description above.
Note

specifying a coroutine by a coroutine function is equivalent to providing a dictionary containing only the task field.

examples:
given we have the following declarations:
@coroutine
def output(targets, **kw):
    try:
        while True:
            for t in targets:
                t.send('RESULT: {}'.format((yield)))
    except GeneratorExit:
        return

@task
def parse(ctrl):
    with ctrl as push:
        while True:
            try:
                value = ast.literal_eval((yield))
            except (SyntaxError, ValueError):
                continue
            push(value)

@task
def mytask(ctrl, param):
    logger = logging.getLogger(f'{__name__}.{ctrl.name}')
    logger.info('starting task')
    with ctrl as push:
        while True:
            [...]
    logger.info('finishing task')
defining a pipeline composed of a single sequence:
example:
inp >>> a --> b --> c --> d >>> out
here’s how we could define it:
pipeline = pipeline.create({
    'a': parse(),
    'b': {'task': mytask(1), 'input': 'a'},
    'c': {'task': mytask(2), 'input': 'b'},
    'd': {'task': output, 'input': 'c'},
})
the created pipeline is a Pipeline object; it can be run over any input generator using its Pipeline.run method, sending data to stdout by default.
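For instance, a sketch running the pipeline defined above ('foo.txt' is a hypothetical input file, one value per line):

with open('foo.txt') as inputs:
    pipeline.run(inputs)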
define a pipeline with branches:
example:
            +--> B --> C >>> out
inp >>> A --|
            +--> D --> E >>> out
here’s how we could define it:
pipeline = pipeline.create({
    'a': {'task': A, 'output': ('b', 'd')},
    'b': B,
    'd': D,
    'c': {'task': C, 'input': 'b'},
    'e': {'task': E, 'input': 'd'},
})
redundant specification is not an issue, and the following example is equivalent to the previous one:
pipeline = pipeline.create({
    'a': {'task': A, 'output': ('b', 'd')},
    'b': {'task': B, 'input': 'a', 'output': 'c'},
    'd': {'task': D, 'input': 'a', 'output': 'e'},
    'c': {'task': C, 'input': 'b', 'output': None},
    'e': {'task': E, 'input': 'd', 'output': ()},
})
join branches
example: given we want to implement this:
            +--> B --> C --+
inp >>> A --|              +--> F >>> out
            +--> D --> E --+
here’s how we could define it:
pipeline = pipeline.create({
    'a': {'task': A, 'output': ('b', 'd')},
    'b': B,
    'c': {'task': C, 'input': 'b', 'output': 'f'},
    'd': D,
    'e': {'task': E, 'input': 'd', 'output': 'f'},
    'f': F,
})
control branches order
the order in which coroutines are initialized, called and closed is reproducible.
to control the data flow order between several branches just use the keys in the pipeline definition, as they will be sorted, like in the following example:
            +--> (1) X --+
            +--> (2) X --+
inp >>> A --+--> (3) X --+--> B >>> out
            +--> (4) X --+
            +--> (5) X --+
here’s how we could define it:
pipeline = pipeline.create({
    'a': A,
    1: {'task': X, 'input': 'a', 'output': 'b'},
    2: {'task': X, 'input': 'a', 'output': 'b'},
    3: {'task': X, 'input': 'a', 'output': 'b'},
    4: {'task': X, 'input': 'a', 'output': 'b'},
    5: {'task': X, 'input': 'a', 'output': 'b'},
    'b': B,
})
the ‘X’ coroutines will be initialized and processed in the expected order: 1, 2, 3, 4, 5 (they will be closed, if no exception occurs, in the opposite order).
loop back
Warning

looping is currently not implemented and will raise a NotImplementedError when creating the pipeline.

example: given we want to implement this:
            +--> B --> C --+          + >>> out
inp >>> A --|              +--> N -- +
            +--> D --> E --+   |     |
                               |     +--> F --+
                               |              |
                               +--------------+
here’s how we could define it:
pipeline = pipeline.create({
    'a': {'task': A, 'output': ('b', 'd')},
    'b': {'task': B, 'output': 'c'},
    'c': {'task': C},
    'd': {'task': D, 'output': 'e'},
    'e': {'task': E},
    'n': {'task': N, 'input': ('c', 'e'), 'output': None},
    'f': {'task': F, 'input': 'n', 'output': 'n'},
})
Warning

defining a loop can end up in infinite recursion; no control is done on this, so it's up to the task implementations to handle it…
specify coroutine names
in some contexts we may want to define a name for a coroutine which is different from its key.
example: the previous example with ordered branches was:
            +--> (1) X --+
            +--> (2) X --+
inp >>> A --+--> (3) X --+--> B >>> out
            +--> (4) X --+
            +--> (5) X --+
here’s how we could define it:
pl = {
    'a': A,
    'b': B,
}

pl.update({
    i: {'task': X, 'input': 'a', 'output': 'b',
        'debug': f"the X task number {i}"}
    for i in range(1, 6)
})

pl = pipeline.create(pl)
clichain.pipeline.task(func)

Make "partial" coroutines expected to be used with create.

task will create a "partial" function, which when called with args and keyword args will actually return a coroutine function designed to be used with the create function.

example:

a basic coroutine adding an offset to input data could be defined as follows using task:

@task
def offset(ctrl, offset):
    print('pre-processing')
    with ctrl as push:
        while True:
            value = yield
            push(value + offset)

    # will be executed unless an exception occurs in
    # the 'while' loop
    print('post_processing')
- ctrl will handle the GeneratorExit exception and log any unhandled exception.
- the push function sends data to the next coroutines in the pipeline.
the resulting function is called with the original function’s args and keyword args:
off = offset(offset=1)
off is a partial coroutine function expected to be used in a pipeline definition with create.

the coroutine will eventually be created by calling this new function with specific arguments depending on the pipeline specification (see create for details), ex:

# create the coroutine
off = off(targets=[t1, t2...])
Note

as for coroutine, all the functions (partial or final) are wrapped using functools.wraps
example:
@task
def output(ctrl):
    with ctrl:
        while True:
            print((yield))

@task
def parse(ctrl):
    with ctrl as push:
        while True:
            try:
                value = ast.literal_eval((yield))
            except (SyntaxError, ValueError):
                continue
            push(value)

@task
def offset(ctrl, offset):
    offset = int(offset)
    logger = logging.getLogger(f'{__name__}.{ctrl.name}')
    logger.info(f'offset: {offset}')
    with ctrl as push:
        while True:
            value = yield
            push(value + offset)
    logger.info('offset task finished, no more value')

if __name__ == '__main__':
    out = output()
    off1 = offset(10)
    off2 = offset(offset=100)
    parse = parse()

    # the previous results (out, off1, off2, parse) should
    # be used in the pipeline definition and the following
    # steps should be performed by "create"
    out = out()
    off1 = off1((out,))
    off2 = off2((out,))
    parse = parse([off1, off2])

    with open('foo.txt') as inputs:
        for data in inputs:
            parse.send(data)

    out.close()
    off1.close()
    off2.close()
    parse.close()