4.3.1. Creating your plugin

Warning

This feature is currently in alpha.

A plugin for the hub is a Python module containing one or more Celery tasks that the Ngenea Worker can discover. These tasks can then be used in workflows in Ngenea Hub.

4.3.1.1. Example plugin creation

Using the Ngenea Hub CLI, a template plugin can be created in the plugin directory at /var/lib/ngenea-worker/plugins. All plugins persist in this location and must be in this directory to be installed correctly.

In the following example, we create a plugin named echo_args that returns the expected keyword arguments provided to the task itself.

To begin, create a plugin template for the example echo_args using:

ngenea-worker plugins create echo_args

This creates the template plugin project at /var/lib/ngenea-worker/plugins/echo_args. To implement custom logic, navigate to /var/lib/ngenea-worker/plugins/echo_args, which contains the Python module file echo_args/echo_args.py. This file contains a basic example task, example_task:

from typing import List

from celery import shared_task

from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.utils.utils import get_queue_name

@shared_task(bind=True, name="dynamo.custom.example_task")
def example_task(self, *args, paths: List = None, jobid: int = None, **kwargs):
    print(paths)
    print(jobid)
    # Collect per-file results for this task run
    status = FileActionStatus(self.name, paths, jobid, get_queue_name())
    for path in paths:
        # Record each path under the "processed" state
        status.add("processed", path["path"])
    return status.dump()

Warning

The task's return value must adhere to the format shown above; otherwise the job will remain in the Pending state indefinitely in the UI/API, and the Ngenea Hub logs will show a traceback in the task callback. Refer to the section Custom Tasks/Return Payload for the specific return format of task results.

For more information on the supported file states, refer to the supported file states section of the Custom Tasks page.

The logic within this task, along with the function name, can be replaced to create our echo task:

Note

The name provided to shared_task will be the task name that is used to call the task within a workflow.

from typing import List

from celery import shared_task

from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.utils.utils import get_queue_name

@shared_task(bind=True, name="dynamo.custom.echo_args")
def echo_args(self, *args, paths: List = None, jobid: int = None, **kwargs):
    # Collect per-file results for this task run
    status = FileActionStatus(self.name, paths, jobid, get_queue_name())
    for path in paths:
        # Record each path under the "processed" state
        status.add("processed", path["path"])
    return status.dump()
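
The loop over paths in the tasks above assumes that each entry is a dictionary with a "path" key and that paths is not left at its default of None. As a minimal sketch, the hypothetical helper below (iter_path_strings is not part of the template or of any Ngenea API) shows one way to extract the path strings defensively, so that the per-file logic can be exercised outside the worker:

```python
from typing import Dict, Iterator, List, Optional


def iter_path_strings(paths: Optional[List[Dict]] = None) -> Iterator[str]:
    """Yield the "path" value of each entry, treating None as an empty list.

    Hypothetical helper for illustration only; the entry shape
    ({"path": ...}) is assumed from the path["path"] lookup in the task.
    """
    for entry in paths or []:
        yield entry["path"]


# Example input shaped like the paths argument; the file names are made up.
sample_paths = [{"path": "/mmfs1/data/a.txt"}, {"path": "/mmfs1/data/b.txt"}]
extracted = list(iter_path_strings(sample_paths))
```

Inside a task, the loop could then iterate over iter_path_strings(paths) and pass each string to status.add("processed", ...), which avoids a TypeError when the task is called without paths.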

With the new task in place, the entry point for the plugin must be updated in setup.py.

Note

Ensure that all custom task entry points are defined under worker_plugin.

Initial setup.py entry points:

...
    "worker_plugin": [
        "echo_args=echo_args.echo_args:example_task",
    ]
...

In this case, the changes are as follows:

...
    "worker_plugin": [
        "echo_args=echo_args.echo_args:echo_args",
    ]
...
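
Each entry point string has the form name=module:attribute. As a sketch of how such a string maps onto the plugin module and the task function, the standard library's importlib.metadata.EntryPoint can split it (the module and attr properties assume Python 3.9 or later). How Ngenea Worker itself discovers entry points is not shown here and may differ:

```python
from importlib.metadata import EntryPoint

# Entry point string from setup.py, in the form "name=module:attribute".
spec = "echo_args=echo_args.echo_args:echo_args"
name, value = spec.split("=", 1)

# Build an EntryPoint in the worker_plugin group without importing the plugin.
entry_point = EntryPoint(name=name, value=value, group="worker_plugin")

module_path = entry_point.module    # the Python module: echo_args.echo_args
function_name = entry_point.attr    # the task function: echo_args
```

The attribute after the colon must match the function name in echo_args/echo_args.py, which is why renaming example_task to echo_args requires the corresponding change in setup.py.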

4.3.1.2. External Dependencies

If the logic in the plugin requires an external dependency, it can be installed into the worker through the setup.py in the template plugin generated by ngenea-worker plugins create. Additional dependencies are added to the install_requires section:

Note

Ensure that the celery package and its version remain unedited in the plugin requirements; otherwise, the functionality of Ngenea Worker may be affected in unexpected ways.

install_requires=[
    "celery<=5.4",
    "example-module==0.1.0"
]

4.3.1.3. Installing custom plugins

To install the newly created plugin, refer to the plugin installation page.