2.9. Examples
This section provides complete, working plugin examples that can be used as starting points for your own implementations.
2.9.1. Email Notification
This plugin sends email notifications at the end of a workflow. It accepts configurable recipients, subject, sender, and SMTP server via workflow fields or hardcoded action parameters.
2.9.1.1. Task Implementation
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from smtplib import SMTP
from typing import List, Dict, Any

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

logger = logging.getLogger(__name__)


def send_email(email_to, email_from, subject, message, server="localhost"):
    """Send an email via SMTP."""
    msg = MIMEMultipart()
    msg['From'] = email_from
    msg['To'] = ', '.join(email_to)
    msg['Date'] = formatdate(localtime=True)
    msg['Subject'] = subject
    msg.attach(MIMEText(message))

    try:
        smtp = SMTP(server)
        smtp.sendmail(email_from, email_to, msg.as_string())
        smtp.close()
    except OSError as error:
        logger.error("Error sending notification email: %s", error)
        raise


@app.task(bind=True, name="dynamo.custom.email_staff")
def email_staff(
    self,
    *args,
    paths: List[Dict[str, Any]] = None,
    jobid: int = None,
    staff_members: List[str] = None,
    message: str = None,
    subject: str = None,
    server: str = None,
    from_address: str = None,
    **kwargs,
):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())

    # Fall back to sensible defaults for any parameters the workflow did not supply
    if not message:
        message = f"Job {jobid} has completed processing {len(paths)} paths"
    if not from_address:
        from_address = "ngenea-worker@example.com"
    if not subject:
        subject = "Ngenea Worker Job completed"
    if not server:
        server = "localhost"

    try:
        send_email(
            email_to=staff_members,
            email_from=from_address,
            subject=subject,
            message=message,
            server=server,
        )
        status.add_log(f"Notification sent to {', '.join(staff_members)}")
        # Mark every path in the batch as successfully processed
        for item in paths:
            status.add("processed", item["path"])
    except Exception as e:
        status.add_log(f"Failed to send email: {e}")
        for item in paths:
            status.add("failures", item["path"], message=str(e))

    return status.dump()
2.9.1.2. Entry Point (setup.py)
entry_points={
    "worker_plugin": [
        "email_staff = email_staff.email_staff:email_staff",
    ],
},
2.9.1.3. Workflow Definition
This workflow migrates files and then sends an email notification:
{
    "name": "migrate_notif",
    "label": "Migrate with Notification",
    "icon_classes": [
        "fa fa-cloud fa-stack-2x text-primary",
        "fa fa-refresh fa-stack-1x text-light"
    ],
    "discovery": null,
    "enabled": true,
    "visible": true,
    "fields": [],
    "filter_rules": [
        {
            "type": "all",
            "state": "all",
            "action": [
                {
                    "name": "dynamo.tasks.migrate"
                },
                {
                    "name": "dynamo.custom.email_staff",
                    "staff_members": [
                        "admin@organisation.com",
                        "ops@organisation.com"
                    ],
                    "subject": "Migration complete",
                    "from_address": "notifications@organisation.com"
                }
            ]
        }
    ]
}
2.9.2. Script Execution
This plugin executes a shell script on the Ngenea Worker host. It supports configurable script paths, timeouts, and optional path arguments.
Danger
Scripts executed by this plugin run with the full permissions of the ngenea-worker service, which is typically root. A user-supplied script_location field allows arbitrary command execution as root on the Worker host. Never expose script_location as a runtime workflow field unless access to the workflow is strictly controlled. Hardcode the script path in the workflow action definition whenever possible, and ensure the script itself is not world-writable.
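As an additional safeguard, the task could verify the script's permissions before executing it. The helper below is a minimal sketch of such a check; refuse_world_writable is an illustrative name and is not part of the plugin shown later in this section:

import os
import stat


def refuse_world_writable(script_path):
    """Refuse to execute a script that any user on the host could have modified."""
    mode = os.stat(script_path).st_mode
    if mode & stat.S_IWOTH:
        raise PermissionError(f"Refusing to run world-writable script: {script_path}")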
Warning
While the plugin uses subprocess.run with a list (avoiding direct shell injection), the target script receives additional_args and file paths as raw arguments. If the script passes these to a shell or interprets them unsafely, injection attacks are still possible. Validate and sanitize all inputs before passing them to external scripts.
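One simple mitigation is to reject arguments containing anything other than plain path-like characters before the command is built. The following is a sketch only, assuming a conservative allowlist is acceptable for your scripts (SAFE_ARG and validate_args are illustrative names, not part of the plugin below):

import re

SAFE_ARG = re.compile(r"^[\w./-]+$")


def validate_args(args):
    """Reject arguments containing whitespace or shell metacharacters."""
    for arg in args or []:
        if not SAFE_ARG.match(arg):
            raise ValueError(f"Unsafe argument rejected: {arg!r}")
    return args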
Warning
If timeout is exposed as a runtime field, a user could set it to an arbitrarily large value, causing a hung script to block a Worker slot indefinitely. Always enforce a maximum timeout either in the task code or by hardcoding the value in the workflow definition.
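One way to enforce a ceiling in the task code is to clamp whatever value the workflow supplies. A minimal sketch, where MAX_TIMEOUT is an illustrative constant rather than anything defined by the product:

MAX_TIMEOUT = 3600  # hard ceiling in seconds, chosen for illustration


def clamp_timeout(timeout, default=600, maximum=MAX_TIMEOUT):
    """Return a timeout that never exceeds the configured maximum."""
    return min(int(timeout or default), maximum)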
2.9.2.1. Task Implementation
import logging
from subprocess import run as run_script, TimeoutExpired, SubprocessError, PIPE, STDOUT
from typing import List, Dict, Any

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

logger = logging.getLogger(__name__)

DEFAULT_SCRIPT = "/opt/cloud_script.sh"


@app.task(bind=True, name="dynamo.custom.run_cloud_script")
def run_cloud_script(
    self,
    *args,
    paths: List[Dict[str, Any]] = None,
    jobid: int = None,
    script_location: str = None,
    timeout: int = 600,
    additional_args: List[str] = None,
    use_paths: bool = False,
    **kwargs,
):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())

    try:
        # Build the command: the script, any extra arguments, then the
        # file paths themselves if use_paths was requested
        cmd = [script_location if script_location else DEFAULT_SCRIPT]
        if additional_args:
            cmd = cmd + additional_args
        if use_paths:
            cmd = cmd + [item["path"] for item in paths]

        result = run_script(
            cmd,
            stdout=PIPE,
            stderr=STDOUT,
            check=True,
            timeout=timeout,
        )

        status.add_log(str(result.stdout))
        for item in paths:
            status.add("processed", item["path"])
    except TimeoutExpired:
        status.add_log("Script timed out")
        for item in paths:
            status.add("failures", item["path"], message="Script timed out")
    except SubprocessError as e:
        status.add_log(str(e))
        for item in paths:
            status.add("failures", item["path"], message=str(e))

    return status.dump()
2.9.2.2. Entry Point (setup.py)
entry_points={
    "worker_plugin": [
        "run_cloud_script = run_cloud_script.run_cloud_script:run_cloud_script",
    ],
},
2.9.2.3. Workflow Definition
Run the default cloud script after migration:
{
    "name": "migrate_cloud_script",
    "label": "Migrate and Run Script",
    "icon_classes": [
        "fa fa-cloud fa-stack-2x text-primary",
        "fa fa-refresh fa-stack-1x text-light"
    ],
    "discovery": null,
    "enabled": true,
    "visible": true,
    "fields": [],
    "filter_rules": [
        {
            "type": "all",
            "state": "all",
            "action": [
                {
                    "name": "dynamo.tasks.migrate"
                },
                {
                    "name": "dynamo.custom.run_cloud_script"
                }
            ]
        }
    ]
}
2.9.3. File Mover
This plugin moves files to a configurable destination directory. It demonstrates using workflow fields to parameterise a task.
Danger
shutil.move deletes the source file after copying. This operation cannot be undone. If destination_path is incorrect, inaccessible, or on a volatile filesystem (e.g., /tmp), the source files will be permanently lost. Always verify the destination path before running this workflow on production data. Consider testing with a small set of non-critical files first.
Warning
shutil.move will silently overwrite any existing file at the destination with the same name. The overwritten file is permanently lost. If collisions are possible, add an existence check before moving and mark collisions as failures or generate unique filenames.
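A minimal sketch of such an existence check, written as a standalone helper rather than as part of the plugin below; the caller can catch FileExistsError and record the path as a failure:

import os
import shutil


def move_without_overwrite(src, dest):
    """Move src to dest, refusing to clobber an existing file."""
    if os.path.exists(dest):
        raise FileExistsError(f"Destination already exists: {dest}")
    shutil.move(src, dest)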
Warning
This example uses only the filename (os.path.basename) to construct the destination path, discarding the source directory structure. If multiple source files share the same filename (e.g., /data/project_a/config.yaml and /data/project_b/config.yaml), later files will silently overwrite earlier ones. To preserve directory structure, compute a relative path from a common root instead.
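A sketch of the relative-path approach, assuming a hypothetical source_root that all source paths share (this parameter does not exist in the plugin below):

import os


def dest_for(src, source_root, destination_path):
    """Mirror the source directory layout under destination_path instead of flattening it."""
    rel = os.path.relpath(src, source_root)  # e.g. "project_a/config.yaml"
    return os.path.join(destination_path, rel)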
Warning
The return payload reports the original source path as processed, but the file has been moved to a new location. If a downstream task in the chain attempts to access the file at the original path, it will fail. To support chaining, update the path in the return payload to reflect the new destination, or use this task only as the final step in a chain.
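If downstream tasks do need to see the moved files, one option is to report each file's new location in the return payload. A sketch of the change inside the per-path loop of the task below, assuming the next task in the chain reads its inputs from the previous task's processed entries:

# after shutil.move(src, dest) succeeds:
status.add("processed", dest)  # report the new location rather than item["path"]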
2.9.3.1. Task Implementation
import logging
import os
import shutil
from typing import List, Dict, Any

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

logger = logging.getLogger(__name__)


@app.task(bind=True, name="dynamo.custom.file_mover")
def file_mover(
    self,
    *args,
    paths: List[Dict[str, Any]] = None,
    jobid: int = None,
    destination_path: str = None,
    **kwargs,
):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())

    if not destination_path:
        # Without a destination there is nothing safe to do - fail every path
        for item in paths:
            status.add("failures", item["path"], message="No destination_path provided")
        return status.dump()

    for item in paths:
        src = item["path"]
        filename = os.path.basename(src)
        dest = os.path.join(destination_path, filename)
        try:
            os.makedirs(destination_path, exist_ok=True)
            shutil.move(src, dest)
            logger.info("Moved %s to %s", src, dest)
            status.add("processed", item["path"])
        except FileNotFoundError:
            status.add("skipped", item["path"], message="Source file not found")
        except Exception as e:
            status.add("failures", item["path"], message=str(e))

    return status.dump()
2.9.3.2. Entry Point (setup.py)
entry_points={
    "worker_plugin": [
        "file_mover = file_mover.file_mover:file_mover",
    ],
},
2.9.3.3. Workflow Definition
This workflow uses a runtime field so users can specify the destination when triggering the workflow:
{
    "name": "move_files",
    "label": "Move Files to Destination",
    "icon_classes": [
        "fa fa-arrows fa-stack-2x text-primary"
    ],
    "discovery": null,
    "enabled": true,
    "visible": true,
    "fields": [
        {
            "name": "dest",
            "label": "Destination Directory",
            "type": "string"
        }
    ],
    "filter_rules": [
        {
            "type": "file",
            "state": "all",
            "action": [
                {
                    "name": "dynamo.custom.file_mover",
                    "destination_path": "*dest"
                }
            ]
        }
    ]
}
The *dest syntax references the runtime field value. When a user triggers this workflow, they will be prompted to enter the destination directory. See Workflow Fields for more details.