5.3.1. Creating your plugin

Warning

This feature is currently in alpha.

Ngenea Hub allows the integration of custom Celery tasks through a plugin system. These plugins are Python modules recognized by the Ngenea Worker and can be used within defined workflows in Ngenea Hub.

5.3.1.1. Creating a Custom Plugin: echo_args

Ngenea Hub supports plugin development through its CLI, allowing users to extend functionality via custom task plugins. This section walks through creating a sample plugin named echo_args, which simply returns the arguments provided to it.

Plugin Directory

All custom plugins must reside in the following directory for correct installation and execution:

/var/lib/ngenea-worker/plugins

Each plugin will persist in this location, and Ngenea Hub will reference this path to discover and load available plugins.

Step 1: Create the Plugin Template

To generate the basic structure for the echo_args plugin, use the following command:

ngenea-worker plugins create echo_args

This command creates the plugin at: /var/lib/ngenea-worker/plugins/echo_args.

Inside this directory, you will find a Python module at: /var/lib/ngenea-worker/plugins/echo_args/echo_args/echo_args.py.

This module contains a default task function named example_task, which we will customize in the next step.

Step 2: Implement the Plugin Logic

The default example_task function is provided as a template. Here’s the original sample code:

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

@app.task(bind=True, name="dynamo.custom.example_task")
def example_task(self, *args, paths: List = None, jobid: int = None, **kwargs):
    print(paths)
    print(jobid)
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    for path in paths:
        status.add("processed", path["path"])
    return status.dump()

Important

Custom tasks must return their results using the FileActionStatus object, as shown above. Failure to follow this return structure will result in the task being stuck in a Pending state, with tracebacks visible in the Ngenea Hub logs.

Refer to Custom Tasks > Return Payload for required return formats.

Step 3: Modify the Task for echo_args

Update the function and task name to reflect the custom behavior of our plugin. The revised implementation should look like this:

Note

The name provided to app.task will be the task name that is used to call the task within a workflow.

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

@app.task(bind=True, name="dynamo.custom.echo_args")
def echo_args(self, *args, paths: List = None, jobid: int = None, **kwargs):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    for path in paths:
        status.add("processed", path["path"])
    return status.dump()

Note

The value provided to the name parameter in the @shared_task decorator determines the task name that will be used within workflow definitions.

Step 4: Update the setup.py Entry Point

For Ngenea Hub to recognize and register your custom task, you must define an appropriate entry point in your plugin’s setup.py file. Entry points for custom tasks should be declared under the worker_plugin group.

Original setup:

"worker_plugin": [
    "echo_args=echo_args.echo_args:example_task",
]

Updated setup (to reflect the new function name):

"worker_plugin": [
    "echo_args=echo_args.echo_args:echo_args",
]

5.3.1.2. External Dependencies

If your plugin relies on external Python packages, you can manage these dependencies through the setup.py file, which is automatically included in the plugin template created using:

ngenea-worker plugins create

To specify additional dependencies, add them to the install_requires list within setup.py. This ensures the required packages are installed into the Ngenea Worker environment when the plugin is deployed.

Example:

install_requires = [
    "celery<=5.4",
    "example-module==0.1.0"
]

Note

Do not modify the version or presence of the celery package in this list. Altering celery may lead to compatibility issues or disrupt core Ngenea Worker functionality.

By managing dependencies in this way, you ensure plugin compatibility, reduce runtime errors, and maintain seamless integration with the Ngenea Worker framework.