Async database
Dari provides asynchronous database support for efficient batch processing in high-volume environments. AsyncDatabaseReader reads from a database to a queue, and AsyncDatabaseWriter writes to a database from a queue.
AsyncDatabaseReader and AsyncDatabaseWriter are implemented as background tasks, extending AsyncProducer and AsyncConsumer respectively. AsyncProducer produces items into an output queue; a subclass of AsyncProducer implements how data is produced into the queue. AsyncConsumer consumes all items from an input queue; a subclass of AsyncConsumer implements how the queue data is consumed. You can create your own queue processing background tasks by subclassing AsyncProducer or AsyncConsumer.
When a background task is created, it appears in the Task Status tool.
AsyncDatabaseReader enables a queue to be produced from a database query. In the following example, a queue is created and populated with Article objects retrieved from the database.
/* Instantiate an AsyncQueue object. */
AsyncQueue<Article> articleQueue = new AsyncQueue<Article>();
/* Retrieve all Articles. */
Query<Article> articleQuery = Query.from(Article.class);
/* Instantiate and submit a AsyncDatabaseReader object.
The first argument is the executor. If null, the default executor is used.
The second argument identifies the queue where the Article objects are written. If null, the constructor creates a new queue.
The third argument is the database from which the objects are retrieved.
The fourth argument is the database query to execute. */
new AsyncDatabaseReader(
"Article Reader",
articleQueue,
Database.Static.getDefault(),
articleQuery)
.submit();
Typically when a queue is populated by an AsyncDatabaseReader task, it is processed by another background task, one that reads objects from an input queue, processes them, and writes the processed objects to an output queue. The following code creates such a task, which reads from the articleQueue populated by Article Reader, processes the articles, then writes them to an output queue processedQueue. This task could, for example, change the lead image that every article references.
/* Create processing task that reads from articleQueue, processes articles, and writes to processedQueue */
AsyncQueue<Article> processedQueue = new AsyncQueue<Article>(new ArrayBlockingQueue<Article>(10000));
new ProcessorTask(
"Article Processor",
articleQueue,
processedQueue)
.submit();
AsyncDatabaseWriter commits many objects to a database in batches. Continuing with the above scenario, the following example creates an AsyncDatabaseWriter task, which commits the articles in “processedQueue” to the database.
new AsyncDatabaseWriter(
"Article Writer", /* Executor. If null, default executor is used. */
processedQueue, /* Input queue where Article objects are stored in memory. */
Database.Static.getDefault(), /* Database to which objects are written. */
WriteOperation.SAVE, /* Operation to use. */
100, /* Commit size. */
true) /* commitEventually flag. If true uses commitWritesEventually method, otherwise uses commitWrites method. *
.submit();