The lifecycle of a job begins when the application contained in the job’s package is executed with the run command and a config. After the application starts, a service container is configured and the job listed in the config is instantiated. The stage is now set for the job to begin running.
Execution
When building a job, it is useful to envision the execution implemented as a for-each loop with a block of code before and after the loop. The following code shows the order in which the methods on a job are called.
var job = new MyJob();
await job.InitializeAsync();
await foreach (var item in job.GetItemsAsync())
    await job.ProcessAsync(item);
await job.FinalizeAsync();
In every job there are two methods that must be implemented: GetItemsAsync and ProcessAsync. These two methods make up the core of the job’s execution. All of the other methods in a job have a default implementation that can be overridden when needed.
GetItemsAsync
GetItemsAsync returns a collection of items to pass to ProcessAsync, as an IAsyncEnumerable<TItem>. Usually this method runs a database query, calls an API, reads records from a file, or reads from a queue. Wherever the data comes from, there are some important aspects of how the data is used that can impact how you implement this method.
Serializability
The items returned from GetItemsAsync should be JSON serializable. This makes it possible to provide detailed reporting and advanced processing scenarios such as distributing items to be processed on other nodes.
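As a rough illustration of what "JSON serializable" means in practice, the sketch below round-trips a plain data item through System.Text.Json. The OrgItem record is a hypothetical item type invented for this example, and the choice of System.Text.Json is an assumption; this page does not say which serializer Runly uses.

```csharp
using System;
using System.Text.Json;

// Hypothetical item type: plain data exposed through public properties.
var item = new OrgItem(Guid.NewGuid(), "Contoso");

// Serialize the item and read it back again.
string json = JsonSerializer.Serialize(item);
var roundTripped = JsonSerializer.Deserialize<OrgItem>(json);

Console.WriteLine(json);
// Records compare by value, so a faithful round trip compares equal.
Console.WriteLine(roundTripped == item);

public record OrgItem(Guid Id, string Name);
```

Items that hold open connections, streams, or other live resources generally do not survive a round trip like this, which is a reasonable signal that they make poor items.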
Yielding Items
A feature that IAsyncEnumerable<T> enables is the ability to yield items from an async method. This makes it very easy to write code that allows the job to begin processing items before the entire collection of items has been read, avoiding the need to keep a large number of items in memory.
public override async IAsyncEnumerable<Environment> GetItemsAsync()
{
    foreach (var orgId in await api.GetOrgs())
        yield return await api.GetOrgDetails(orgId);
}
Using IEnumerable<T>
Although no cast from IEnumerable<T> to IAsyncEnumerable<T> exists, Runly provides an extension method with two overloads to wrap IEnumerable<T> and Task<IEnumerable<T>> in an IAsyncEnumerable<T>.
Use ToAsyncEnumerable() to convert an IEnumerable<T>, such as an array, List<T>, or other in-memory enumerable collection:
public override IAsyncEnumerable<string> GetItemsAsync() =>
    new string[] { "Item1", "Item2" }.ToAsyncEnumerable();
ToAsyncEnumerable() also converts async methods that return Task<IEnumerable<T>>, such as database queries:
public override IAsyncEnumerable<Environment> GetItemsAsync() =>
    db.GetUsersInRole("Owner").ToAsyncEnumerable();
When used in conjunction with a Task<IEnumerable<T>>, the Task is awaited when the first item in the collection is accessed by the enumerator.
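The deferred await described above can be pictured with an async iterator. The Deferred.Wrap helper below is a hypothetical stand-in written for this example, not Runly's actual implementation. It relies on a documented property of C# async iterators: the body does not run until the enumerator is first advanced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hand an incomplete task to the wrapper. Nothing awaits it yet.
var source = new TaskCompletionSource<IEnumerable<string>>();
IAsyncEnumerable<string> items = Deferred.Wrap(source.Task);

// Completing the task afterwards is fine because enumeration has not started.
source.SetResult(new[] { "Item1", "Item2" });

await foreach (var item in items)
    Console.WriteLine(item);

public static class Deferred
{
    // Hypothetical wrapper: the await below only happens when the
    // first item is requested through MoveNextAsync.
    public static async IAsyncEnumerable<T> Wrap<T>(Task<IEnumerable<T>> task)
    {
        foreach (var item in await task)
            yield return item;
    }
}
```

Note that the task itself may already be running when it is handed to the wrapper; only the point at which its result is awaited is deferred.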
Reading Files
When possible, avoid using methods that will read a large amount of data into memory before it is returned from GetItemsAsync. The following code reads a file into memory, parsing rows into FileRecords and adding them to a list before returning the list.
// Don't use this approach, there's a better way!
public override IAsyncEnumerable<FileRecord> GetItemsAsync()
{
    using var reader = new StreamReader(File.OpenRead("a-big-file.txt"));
    var records = new List<FileRecord>();
    string line;
    while ((line = reader.ReadLine()) != null)
        records.Add(new FileRecord(line));
    return records.ToAsyncEnumerable();
}
The previous example fails to take advantage of IAsyncEnumerable and reads the entire file into memory before returning it. A better solution is to read the data asynchronously using ReadLineAsync on the StreamReader and yield a FileRecord for each line read. This allows the Job to start processing items as the file is being read. Eliminating the list and yielding items makes the code simpler and able to scale with large files:
public override async IAsyncEnumerable<FileRecord> GetItemsAsync()
{
    using var reader = new StreamReader(File.OpenRead("a-big-file.txt"));
    string line;
    while ((line = await reader.ReadLineAsync()) != null)
        yield return new FileRecord(line);
}
Total Count
IAsyncEnumerable is inherently different from IEnumerable. An IAsyncEnumerable is a stream of data where the operation to get the next item in the stream is awaited. In general, streams should only be enumerated once. Some can only be enumerated once, like messages from a queue. Because of this, IAsyncEnumerable doesn’t lend itself to operations like Count, which must enumerate the whole stream. For this reason, Runly doesn’t attempt to count the items in an IAsyncEnumerable to determine the percentage of items that are processed in a job.
The exception to this is when ToAsyncEnumerable is used to wrap an IEnumerable in an AsyncEnumerableWrapper. When this is the case, Runly performs a Count on the underlying collection to get a total item count. This behavior can be turned off by passing false into ToAsyncEnumerable(bool canBeCounted). This should be done when the IEnumerable is reading from an underlying stream that can’t be enumerated more than once.
public override IAsyncEnumerable<FileRecord> GetItemsAsync()
{
    // Set canBeCounted to false for IEnumerables with underlying streams
    return new MyFileReader("some-file.txt").ToAsyncEnumerable(canBeCounted: false);
}
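To see why counting is unsafe for stream-backed enumerables, consider the sketch below. SinglePass.ReadLines is a hypothetical helper written for this example; it yields lines from a reader that cannot rewind, so a Count performed before processing would leave nothing for the job to process.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical single-pass source backed by a reader that cannot rewind.
var reader = new StringReader("row1\nrow2\nrow3");
IEnumerable<string> lines = SinglePass.ReadLines(reader);

Console.WriteLine(lines.Count()); // the first pass consumes the reader: 3
Console.WriteLine(lines.Count()); // the second pass finds nothing left: 0

public static class SinglePass
{
    // Each enumeration pulls from the same reader, so only the first one sees data.
    public static IEnumerable<string> ReadLines(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
            yield return line;
    }
}
```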
Identifying Items
Items are identified in the results of a run using a human readable string that’s created by passing the item to GetItemIdAsync. Overriding this method in your job is optional. The default implementation calls the item’s ToString method, so if TItem.ToString returns a string that identifies the item, no override is needed. If TItem has a name, ID, or similar data, you should override GetItemIdAsync to return this data instead.
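For item types you control, one way to get a useful identifier without overriding anything in the job is to give the item a descriptive ToString. The User class below is a hypothetical item type used only for illustration.

```csharp
using System;

var user = new User(42, "Ada Lovelace");
Console.WriteLine(user.ToString()); // Ada Lovelace (42)

// Hypothetical item type whose ToString() already identifies the item.
public class User
{
    public User(int id, string name)
    {
        Id = id;
        Name = name;
    }

    public int Id { get; }
    public string Name { get; }

    public override string ToString() => $"{Name} ({Id})";
}
```

When the item type comes from a library and its ToString cannot be changed, overriding GetItemIdAsync in the job is the alternative described above.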
ProcessAsync
ProcessAsync is the method where the job does its work, being called once for each item returned from GetItemsAsync. When the method is complete, it returns a Result indicating success or failure and optionally includes output.
Parallel Execution
By default, ProcessAsync can be called in parallel using multiple asynchronous Tasks. Access to the collection of items is synchronized, so even if the enumerator is not thread-safe it will be accessed in a thread-safe manner, making it possible to call ProcessAsync concurrently. This behavior can be prevented by setting Options.CanProcessInParallel to false in the constructor.
The number of tasks used is determined by the config parameter Execution.ParallelTaskCount.
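The general idea of sharing one enumerator between several workers can be sketched as follows. This is a simplified illustration using a lock around a synchronous enumerator, not Runly's actual scheduler; ParallelSketch, RunAsync, and taskCount are names invented for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var processed = await ParallelSketch.RunAsync(Enumerable.Range(1, 20), taskCount: 4);
Console.WriteLine(processed.Count); // every item is processed exactly once: 20

public static class ParallelSketch
{
    public static async Task<ConcurrentBag<int>> RunAsync(IEnumerable<int> items, int taskCount)
    {
        var results = new ConcurrentBag<int>();
        using var enumerator = items.GetEnumerator();
        var gate = new object();

        // Only one worker at a time may advance the shared enumerator.
        bool TryTakeNext(out int item)
        {
            lock (gate)
            {
                if (enumerator.MoveNext())
                {
                    item = enumerator.Current;
                    return true;
                }
                item = default;
                return false;
            }
        }

        // Each worker keeps taking items until the enumerator is exhausted.
        async Task WorkerAsync()
        {
            while (TryTakeNext(out var item))
            {
                await Task.Delay(1); // stands in for the real work of ProcessAsync
                results.Add(item);
            }
        }

        var workers = Enumerable.Range(0, taskCount).Select(_ => WorkerAsync()).ToList();
        await Task.WhenAll(workers);
        return results;
    }
}
```

The enumerator is never touched outside the lock, which is why it does not need to be thread-safe itself. The work done per item, however, runs concurrently and must be safe to run that way.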
Dependency Injection
ProcessAsync can accept between zero and sixteen dependencies in addition to TItem, depending on which Process base class is extended. Dependencies that are not thread-safe, such as a database connection, should be taken through the ProcessAsync method and not the constructor. A dependency taken through the constructor would be shared across multiple parallel executions of ProcessAsync, likely leading to hard-to-diagnose errors.
Read more about dependency injection in jobs.
Transactions
Transactions should begin and end within a single call to ProcessAsync. Transactions should never start in InitializeAsync and end in FinalizeAsync, or span different calls to ProcessAsync. Doing so can create long-running transactions, unpredictable behavior, and bugs that are hard to diagnose.
Take a scenario in which a job sends an alert as an email to a group of users:
public override IAsyncEnumerable<User> GetItemsAsync()
{
    return db.GetUsersToAlert();
}
public override async Task<Result> ProcessAsync(User user, IDbConnection db)
{
    using (var tx = db.BeginTransaction())
    {
        await mailApi.SendAlertToUser(user);
        user.MarkUserAsAlerted();
        await db.SaveUser(user);
        tx.Commit();
        return Result.Success();
    }
}
While this code will work, it may break down in some situations.
Using Object-Relational Mappers
When using an object-relational mapper (ORM) within a transaction, it may be necessary to fetch the entity being modified in the same transaction in which it is modified. In this case, that would mean getting the User from the database in ProcessAsync rather than GetItemsAsync.
Changing Conditions
Generally, as the number of items returned from GetItemsAsync increases, the length of time that the job runs also increases. The conditions that existed at the beginning of the job may change before a particular user is sent an alert. When ProcessAsync is called for a user that no longer needs an alert, the alert may be sent again without issue, but if db.SaveUser() checks for concurrency, an exception would be thrown.
Concurrency in ProcessAsync
A more resilient approach is to keep the items returned from GetItemsAsync lightweight and query the entity in ProcessAsync. Returning user IDs instead of full User objects allows you to keep the transaction isolated to ProcessAsync. By fetching the User and checking the status of the user’s alert in the transaction, any changes to the User in the time between GetItemsAsync and ProcessAsync are accounted for.
public override IAsyncEnumerable<Guid> GetItemsAsync()
{
    return db.GetUserIdsToAlert();
}
public override async Task<Result> ProcessAsync(Guid userId, IDbConnection db)
{
    using (var tx = db.BeginTransaction())
    {
        var user = await db.GetUserById(userId);
        if (!user.HasBeenAlerted)
        {
            await mailApi.SendAlertToUser(user);
            user.MarkUserAsAlerted();
            await db.SaveUser(user);
            tx.Commit();
            return Result.Success();
        }
        else
        {
            return Result.Success("No alert sent");
        }
    }
}
Cancellation
The Job class contains a CancellationToken that should be used in ProcessAsync. When calling methods that accept a CancellationToken, simply pass the CancellationToken property to the method, like the CopyDirectory job in Examples.GettingStarted does when copying a file:
public override async Task<Result> ProcessAsync(string file)
{
    // ...
    using (var source = File.Open(sourceFile, FileMode.Open))
    {
        var destDir = Path.GetDirectoryName(destFile);
        if (!Directory.Exists(destDir))
            Directory.CreateDirectory(destDir);
        using (var dest = File.Create(destFile))
        {
            await source.CopyToAsync(dest, CancellationToken);
        }
    }
    // ...
}
When a job is cancelled, the CancellationToken property’s IsCancellationRequested flag will be set to true, enabling the job to abort work and return a Result indicating the method was cancelled. Once cancellation has been requested, all currently executing ProcessAsync tasks are allowed to complete and no further items will be processed. FinalizeAsync will then be called with a disposition of Disposition.Cancelled.
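The "finish what is in flight, start nothing new" behavior can be illustrated with a small sequential loop. CancelSketch and its parameters are hypothetical names for this example, and the loop only approximates the behavior described above; it is not how Runly is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var done = await CancelSketch.RunAsync(new[] { "a", "b", "c", "d" }, "b", cts);
Console.WriteLine(string.Join(",", done)); // a,b

public static class CancelSketch
{
    public static async Task<List<string>> RunAsync(
        IEnumerable<string> items, string cancelDuring, CancellationTokenSource cts)
    {
        var processed = new List<string>();
        foreach (var item in items)
        {
            // No new item is started once cancellation has been requested.
            if (cts.Token.IsCancellationRequested)
                break;

            // Simulate a user cancelling while this item is being processed.
            if (item == cancelDuring)
                cts.Cancel();

            await Task.Yield(); // stands in for the work done by ProcessAsync
            processed.Add(item); // the in-flight item is still allowed to finish
        }
        return processed;
    }
}
```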
Result
When ProcessAsync is complete, it must return a Result which indicates success or failure. If the job can produce multiple types of success or failure, a category can be provided to group the results. For instance, an item that has been previously processed could be put in a “Skipped” category, so that the results contain both a “Successful” category (the default category name for successfully processed items) and a “Skipped” category.
A Result should be created using one of the static methods on the class, such as Success or Failure. If ProcessAsync uses the CancellationToken, then SuccessOrCancelled can be used to create a successful result unless the cancellation flag has been raised.
public override async Task<Result> ProcessAsync(string file)
{
    var sourceFile = Path.Combine(Config.Source, file);
    var destFile = Path.Combine(Config.Destination, file);
    // Report files that already exist under their own category
    if (File.Exists(destFile))
        return Result.Success("Already Copied");
    try
    {
        using (var source = File.Open(sourceFile, FileMode.Open))
        {
            var destDir = Path.GetDirectoryName(destFile);
            if (!Directory.Exists(destDir))
                Directory.CreateDirectory(destDir);
            using (var dest = File.Create(destFile))
            {
                await source.CopyToAsync(dest, CancellationToken);
            }
        }
    }
    catch (UnauthorizedAccessException) when (Config.IgnoreUnauthorizedAccessException)
    {
        return Result.Success("Skipped - Unauthorized", "Skipping file copy due to an UnauthorizedAccessException being thrown. Set IgnoreUnauthorizedAccessException = false to treat as an error.");
    }
    return Result.SuccessOrCancelled(CancellationToken);
}
Exceptions
If an unhandled exception occurs in ProcessAsync, the exception is caught and recorded in a failed Result with the exception type name as the category. The full exception is stored with the result so that it can be inspected from the dashboard.
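The mapping from an unhandled exception to a categorized failure can be sketched like this. ItemOutcome and ExceptionSketch are hypothetical types written for this example and are not Runly's Result class; the sketch only shows how the exception type name might become the category.

```csharp
using System;
using System.Threading.Tasks;

var outcome = await ExceptionSketch.RunItemAsync(
    () => throw new InvalidOperationException("boom"));
Console.WriteLine($"{outcome.IsSuccessful} {outcome.Category}");

// Hypothetical stand-in for a recorded result.
public record ItemOutcome(bool IsSuccessful, string Category, Exception Error);

public static class ExceptionSketch
{
    public static async Task<ItemOutcome> RunItemAsync(Func<Task> processAsync)
    {
        try
        {
            await processAsync();
            return new ItemOutcome(true, "Successful", null);
        }
        catch (Exception ex)
        {
            // The exception type name becomes the category, and the full
            // exception is kept alongside it for later inspection.
            return new ItemOutcome(false, ex.GetType().Name, ex);
        }
    }
}
```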
InitializeAsync
As the name suggests, InitializeAsync is called at the beginning of a job’s lifetime, before GetItemsAsync. This method gives an opportunity for the job to do any async initialization that it might need to do. In general, any work that needs to be done before processing starts should be done here instead of in the constructor, including inspection and validation of the config and other constructor parameters. If the job cannot start due to invalid config, an exception should be thrown in this method.
Constructor vs. InitializeAsync
The constructor should only set fields using arguments passed into the constructor. An exception thrown in the constructor should indicate a design-time error, such as an ArgumentNullException being thrown because a dependency was not supplied. Validation and initialization should occur in InitializeAsync. When in doubt, do the work in InitializeAsync.
public class MyJob : Process<MyConfig, string>
{
    readonly IDownloader downloader;
    public MyJob(MyConfig config, IDownloader downloader)
        : base(config)
    {
        // validate dependencies in constructor
        this.downloader = downloader ?? throw new ArgumentNullException(nameof(downloader));
    }
    public override async Task InitializeAsync()
    {
        // validate configuration in InitializeAsync
        if (String.IsNullOrWhiteSpace(Config.DownloadUrl))
            throw new ArgumentException("Missing download URL", nameof(Config.DownloadUrl));
        var data = await downloader.DownloadFile(Config.DownloadUrl);
        // ...
    }
    // ...
}
FinalizeAsync
FinalizeAsync gives jobs an opportunity to do something after all items have been processed, such as cleanup or publishing an event to alert another application that the job’s task is finished. A Disposition parameter indicates whether the job was successful, failed, or was cancelled by a user. This information can be used to determine how FinalizeAsync behaves and can also determine whether other jobs run in response to the job completing.
public override async Task<object> FinalizeAsync(Disposition disposition)
{
    await db.CleanUpAfterJob();
    if (disposition == Disposition.Successful)
    {
        return new { Message = "Job completed successfully!" };
    }
    else
    {
        return new { Message = "Job failed to complete successfully." };
    }
}