4.7. Queues

Custom job queues can be created with dedicated settings, and jobs can be assigned to run on those queues.

For example:

  • a highpriority queue with a high number of threads, allowing for faster task processing

  • a lowpriority queue with a low number of threads for jobs which do not need to complete promptly

  • a dedicated queue for transparent recall jobs, so that those jobs are never blocked by other workflows

In addition to custom queues, there is a default queue which always exists for all workers. The default queue is used when a job is submitted without specifying a target queue.

4.7.1. Configuration

Queues are configured in the worker configuration file located at /etc/ngenea/ngenea-worker.conf

A queue is defined by adding a new Queue section to the config file. The following creates a queue named highpriority with 20 threads.

[Queue highpriority]
threads = 20

Queue names may contain letters, numbers, underscores, and hyphens. Queue names may not contain whitespace.
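The naming rule above can be checked before a queue is added to the config file. The following is a minimal sketch; the helper name is_valid_queue_name is hypothetical, and the pattern assumes that the permitted characters are exactly those listed above (the worker's own validation may differ).

```python
import re

# Assumption: only ASCII letters, digits, underscores, and hyphens are allowed,
# as described in the text. This is not the worker's actual validation code.
QUEUE_NAME_RE = re.compile(r"[A-Za-z0-9_-]+")


def is_valid_queue_name(name: str) -> bool:
    """Return True if the name uses only the permitted characters."""
    return QUEUE_NAME_RE.fullmatch(name) is not None
```

For instance, is_valid_queue_name("transparent_recall") is accepted, while a name containing a space, such as "high priority", is rejected.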

Queues support the following settings:

  • threads: how many tasks can run in parallel on the queue

If no settings are provided under the queue section, the queue will inherit the global settings.

If no threads count is set in the global settings section, then a default of 10 is used.

For example, in the following configuration:

[settings]
...
threads = 4

[Queue transparent_recall]

[Queue highpriority]
threads = 10

The highpriority queue explicitly sets the number of threads to 10.

The transparent_recall queue does not explicitly set a number of threads, so it inherits threads = 4 from global settings.
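The inheritance rule described above can be illustrated with a short Python sketch. It reads a config with the standard configparser module and resolves the thread count for each queue. The function name effective_threads is hypothetical and this is not how the worker itself loads its configuration; it only mirrors the documented rules (queue value, then global value, then 10).

```python
import configparser

DEFAULT_THREADS = 10  # documented fallback when [settings] has no threads value


def effective_threads(config_text: str) -> dict:
    """Map each queue name to the thread count it would end up with."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    # The global value applies when a queue section does not set its own.
    global_threads = parser.getint("settings", "threads", fallback=DEFAULT_THREADS)
    result = {}
    for section in parser.sections():
        if section.startswith("Queue "):
            name = section[len("Queue "):]
            result[name] = parser.getint(section, "threads", fallback=global_threads)
    return result


EXAMPLE = """
[settings]
threads = 4

[Queue transparent_recall]

[Queue highpriority]
threads = 10
"""

resolved = effective_threads(EXAMPLE)
```

With the example config, resolved maps transparent_recall to 4 and highpriority to 10.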

4.7.2. Functions

Custom job queues have three functions:

  • worker where core workflow tasks run

  • discovery where discovery tasks (recursive, snapdiff) run

  • custom where custom plugin tasks run

The default queue, which always exists, has those same three functions plus two additional functions:

  • interactive where internal tasks such as file browsing are run

  • settings where settings tasks, such as space creation and policies, are run

These two additional functions are not available on custom queues.

By default, each function gets the same number of threads. The number of threads can be configured per function:

[Queue highpriority]
threads = 20
custom = {"threads": 10}

In this example, the worker and discovery functions will get 20 threads, but the custom function will only get 10 threads.

Per-function settings under the main [settings] section will only apply to functions of the default queue; they are not inherited.
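As an illustration of per-function thread counts on a custom queue, the sketch below parses a queue section and resolves the threads for each of the three functions. It assumes the per-function value is a JSON object, as in the example above; the function name function_threads is hypothetical and the worker's actual parsing may differ.

```python
import configparser
import json

CUSTOM_QUEUE_FUNCTIONS = ("worker", "discovery", "custom")


def function_threads(config_text: str, queue: str, default_threads: int = 10) -> dict:
    """Resolve the thread count of each function on a custom queue."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    section = f"Queue {queue}"
    queue_threads = parser.getint(section, "threads", fallback=default_threads)
    result = {}
    for function in CUSTOM_QUEUE_FUNCTIONS:
        raw = parser.get(section, function, fallback=None)
        settings = json.loads(raw) if raw else {}
        # Only JSON objects are handled here; any other value is left out.
        if isinstance(settings, dict):
            result[function] = int(settings.get("threads", queue_threads))
    return result


EXAMPLE = """
[Queue highpriority]
threads = 20
custom = {"threads": 10}
"""

per_function = function_threads(EXAMPLE, "highpriority")
```

Here per_function gives 20 threads for worker and discovery, and 10 for custom.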

4.7.2.1. Disabling Functions

Functions can be disabled for a queue by setting the function to false.

For example, to disable the custom function, so that custom plugin tasks cannot run on that queue:

[Queue no-custom]
custom = false

In the same way, default queue functions can be disabled.

For example, in a cluster, you may set the management node to only run settings tasks

# management node
[settings]
discovery = false
worker = false
custom = false

You may then set the ngenea nodes to only run workflow tasks, and not run settings tasks:

# ngenea node
[settings]
settings = false

Warning

Each default queue function must be enabled on at least one node within a cluster, or else Hub will not function correctly.
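The condition in this warning can be checked offline before rolling out configs to a cluster. The sketch below is a rough illustration only: the helper names are hypothetical, it assumes a function counts as enabled unless it is set to false in [settings], and it is not part of Ngenea Hub.

```python
import configparser

DEFAULT_QUEUE_FUNCTIONS = ("worker", "discovery", "custom", "interactive", "settings")


def enabled_functions(config_text: str) -> set:
    """Return the default queue functions not set to false in [settings]."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    enabled = set()
    for function in DEFAULT_QUEUE_FUNCTIONS:
        value = parser.get("settings", function, fallback="").strip().lower()
        if value != "false":
            enabled.add(function)
    return enabled


def missing_functions(node_configs: dict) -> set:
    """Return the functions that no node in the cluster has enabled."""
    covered = set()
    for config_text in node_configs.values():
        covered |= enabled_functions(config_text)
    return set(DEFAULT_QUEUE_FUNCTIONS) - covered


MANAGEMENT = "# management node\n[settings]\ndiscovery = false\nworker = false\ncustom = false\n"
NGENEA = "# ngenea node\n[settings]\nsettings = false\n"

gaps = missing_functions({"management": MANAGEMENT, "ngenea": NGENEA})
```

For the two node configs shown above, gaps is empty, because every function is enabled on at least one node. Checking the ngenea node config alone would report settings as missing.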

4.7.3. Queue discovery

When a new queue is added to the worker config file, the change will be detected and the new queue will start up automatically. When a queue is removed from the worker config, the change will be detected and the queue will be shut down automatically.

If this does not occur, or does not occur fast enough, a reload can be forced with

systemctl reload ngenea-worker

This will reload the worker config and start up or shut down queues according to the worker config, without interrupting any existing queues which have not been changed.

Ngenea Hub uses worker heartbeats to automatically discover and register newly started queues. New queues will be visible and ready for use in Ngenea Hub workflows shortly after being added to the worker config.

When a queue is removed and shut down, it will not be automatically de-registered from Ngenea Hub.

Removed queues must be manually de-registered using

ngeneahubctl manage remove_queue <site> <queue>

To re-use the name of a previously removed queue, the --offline-only flag can be used. This must be preceded by a complete worker restart:

systemctl restart ngenea-worker

--offline-only will remove the queue only if the queue is offline. Note: if the queue is still alive, it will be recreated. To disable a queue and ensure that it is then removed entirely from the hub, use --offline-only.

Warning

If you submit a job to a queue which has been shut down and not de-registered, the job will not be processed unless the queue is configured and started up again.

4.7.4. Memory considerations

Queues use roughly 40MB of RAM per thread when no tasks are running.

Since each queue has three functions, if all three functions in a given queue are configured with the same number of threads, then the memory requirement is roughly threads * 125MB, where threads is the number of threads per function.

For example, creating a high priority queue with 20 threads per function (60 threads total) will use ~2.5GB of memory when no tasks are running.

Similarly, creating a low priority queue with 5 threads per function (15 threads total) will use ~625MB of memory when no tasks are running.

Task execution will consume additional memory on top of this baseline memory usage. Since more threads mean more tasks can run at the same time, a queue with a large number of threads can lead to significant memory usage at peak.
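The baseline estimate above can be written as a small helper for capacity planning. This is only a sketch of the rule of thumb given in this section (threads per function * 125MB); the function name is hypothetical, and the result excludes the additional memory used while tasks are running.

```python
# Rule of thumb from this section: a queue with the same thread count on all
# three functions needs roughly 125MB per configured thread when idle.
IDLE_MB_PER_THREAD_SETTING = 125


def idle_queue_memory_mb(threads_per_function: int) -> int:
    """Estimate the idle memory of one queue in MB (task memory not included)."""
    return threads_per_function * IDLE_MB_PER_THREAD_SETTING


high_priority_mb = idle_queue_memory_mb(20)  # 2500 MB, i.e. ~2.5GB
low_priority_mb = idle_queue_memory_mb(5)    # 625 MB
```

Summing this estimate over all configured queues gives a rough idle baseline for a worker node.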

4.7.5. Workflows

A queue can be passed when submitting a workflow. If no queue is passed, the default queue will be used.

If a workflow contains tasks which run on multiple sites, such as send or sync workflows, the same queue will be used for both sites. If the named queue doesn't exist on one of the sites, the default queue will be used instead.
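The fallback behaviour for multi-site workflows can be sketched as follows. This illustrates the rule only and is not Ngenea Hub's implementation: the function name resolve_queue is hypothetical, and the string "default" is a placeholder for the default queue, whose internal name is not stated here.

```python
DEFAULT_QUEUE = "default"  # placeholder label for the default queue (assumption)


def resolve_queue(requested, queues_by_site: dict) -> dict:
    """Pick the queue used on each site for a workflow submitted with `requested`.

    queues_by_site maps a site name to the set of custom queues registered there.
    """
    resolved = {}
    for site, queues in queues_by_site.items():
        if requested and requested in queues:
            resolved[site] = requested
        else:
            # No queue passed, or the named queue does not exist on this site.
            resolved[site] = DEFAULT_QUEUE
    return resolved


sites = {"london": {"highpriority"}, "dublin": {"lowpriority"}}
chosen = resolve_queue("highpriority", sites)
```

In this example, london uses highpriority while dublin falls back to the default queue, because highpriority is not defined there.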

A different queue can be specified for each task in a workflow by providing the queue as a parameter in the task steps, for example:

{
    "name": "dynamo.tasks.migrate",
    "queue": "highpriority"
}

Workflow 'fields' can be used to allow selecting per-task queues at runtime.

For example, the built-in send workflow provides a destinationqueue field, which can be passed at runtime in a similar way to destinationsite:

{
    "paths": [
        "/mmfs1/data/project_one"
    ],
    "site": "london",
    "queue": "highpriority",
    "workflow": "send_to_site",
    "fields": {
        "destinationsite": "dublin",
        "destinationqueue": "lowpriority"
    }
}
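A request body like the one above can also be built programmatically, which avoids hand-written JSON mistakes such as trailing commas. The sketch below only constructs and serializes the body; the helper name build_send_request is hypothetical, and how the body is submitted to Ngenea Hub is not covered here.

```python
import json


def build_send_request(paths, site, queue, destination_site, destination_queue):
    """Build the body for a send_to_site workflow with separate per-site queues."""
    return {
        "paths": list(paths),
        "site": site,
        "queue": queue,  # queue used on the source site
        "workflow": "send_to_site",
        "fields": {
            "destinationsite": destination_site,
            "destinationqueue": destination_queue,  # queue used on the destination
        },
    }


body = json.dumps(
    build_send_request(
        ["/mmfs1/data/project_one"], "london", "highpriority", "dublin", "lowpriority"
    ),
    indent=4,
)
```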

For more information on constructing and running workflows, see Custom Workflows.