Queue pipeline example


An asynchronous queue pipeline uses parallel tasks and concurrent queues to process input values sequentially in stages. For the pipeline to run efficiently, allocate an appropriate number of producer and consumer task instances to each queue; otherwise a queue can become overloaded and turn into a pipeline bottleneck.

This example shows how to use the Dari asynchronous classes to implement a simple pipeline that processes Book objects. The purpose of the pipeline is to retrieve all Book objects stored in a database and identify the longest word across all of the books.

The diagram shows the pipeline process flow. The pipeline consists of four queues that are produced and consumed by five background tasks. The queues and tasks are created and started in the BookAnalyzer class.

Queue pipeline example.png

In the diagram above:

  1. The Dari-supplied AsyncDatabaseReader implementation retrieves all Book objects from the database and produces them into bookQueue.
  2. The BookReader implementation consumes from bookQueue and produces to rawBookWordQueue, which contains BookWord objects.
  3. The BookWordSanitizer implementation consumes from rawBookWordQueue and produces to cleanedBookWordQueue, which also contains BookWord objects.
  4. The BookWordFilter implementation consumes from cleanedBookWordQueue and produces to longestWordQueue, which contains String items.
  5. The LongestWordCalculator implementation consumes from longestWordQueue and writes the longest detected word to a log.
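
The staged flow in the list above can be sketched with plain JDK classes. This is a minimal illustration, not the Dari API: MiniPipeline, its method, and the END sentinel are hypothetical names, and the sentinel only stands in for whatever mechanism tells a downstream stage that no more items are coming (the Dari example marks its queues with closeAutomatically instead).

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniPipeline {

    /* Sentinel compared by identity, so a real text or word equal to "END" is still processed. */
    private static final String END = new String("END");

    /* Runs a three-stage pipeline over the given texts and returns the longest word found. */
    public static String longestWord(List<String> texts) throws InterruptedException {
        BlockingQueue<String> textQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> wordQueue = new LinkedBlockingQueue<>();
        String[] result = new String[1];

        /* Stage 1: produce the texts, then signal the end of input. */
        Thread producer = new Thread(() -> {
            textQueue.addAll(texts);
            textQueue.add(END);
        });

        /* Stage 2: consume texts and produce individual words. */
        Thread splitter = new Thread(() -> {
            try {
                for (String text = textQueue.take(); text != END; text = textQueue.take()) {
                    for (String word : text.trim().split("\\s+")) {
                        if (!word.isEmpty()) {
                            wordQueue.add(word);
                        }
                    }
                }
                wordQueue.add(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        /* Stage 3: consume words and keep the longest one seen so far. */
        Thread calculator = new Thread(() -> {
            try {
                String longest = "";
                for (String word = wordQueue.take(); word != END; word = wordQueue.take()) {
                    if (word.length() > longest.length()) {
                        longest = word;
                    }
                }
                result[0] = longest;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        /* Start consumers before producers, mirroring the order used in BookAnalyzer. */
        calculator.start();
        splitter.start();
        producer.start();

        producer.join();
        splitter.join();
        calculator.join();
        return result[0];
    }
}
```

Each stage only knows about the queue it reads from and the queue it writes to, which is the property that lets the real pipeline run several instances of a stage side by side.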

The processing sequence shown in the diagram can be tracked in the Task Status tool.

Book Analyzer queues.png

In the screenshot above:

  • The “Book Analyzer Queues” section lists the queue processors in the context of producing and consuming queues. The “Production” column lists the tasks that produce items into a queue; the “Consumption” column lists the tasks that consume items in a queue. Tasks that function as both a producer and a consumer are listed in both columns.
  • The “Book Analyzer Queues” section reflects the pipeline flow in a bottom-up fashion. For example, in the first step, AsyncDatabaseReader reads from the database and produces to bookQueue. In the second step, BookReader, listed in the “Consumption” column, consumes from bookQueue (2a). BookReader then produces to rawBookWordQueue (2b), and is thus listed in the “Production” column as well.
  • The “Successes” and “Failures” columns show the number of items successfully or unsuccessfully produced into or consumed from the queue. The “Wait” column shows the average time it took a task to produce or consume each item in the queue.


You can have multiple instances of a task running if necessary, which would be reflected in the Task Status tool. For example, if you instantiate three BookReader tasks, then three instances would be listed:

Queue Array List.png
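
To illustrate what several instances of the same task mean for a shared queue, here is a minimal sketch using plain JDK classes rather than the Dari task classes. ParallelConsumers and consumeAll are hypothetical names. The point it shows is that each item is handed to exactly one of the competing consumers, so adding instances spreads the work instead of duplicating it.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelConsumers {

    /* Drains a pre-filled queue with workerCount competing consumers
       and returns the total number of items consumed. */
    public static int consumeAll(List<String> items, int workerCount) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(items);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            pool.submit(() -> {
                /* poll() returns null once the queue is empty, which ends this worker. */
                while (queue.poll() != null) {
                    consumed.incrementAndGet();
                }
            });
        }

        /* No new work is accepted; wait for the workers to finish draining. */
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return consumed.get();
    }
}
```

With four items and three workers, the total consumed is still four: the workers compete for items rather than each seeing every item.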


Book represents a book stored in the database. The text of a book is referenced with a URL.

public class Book extends Content {

    @Required
    @Indexed(unique = true)
    private String name;

    /* e.g. http://classics.mit.edu/Homer/odyssey.mb.txt */
    private String textUrl;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTextUrl() {
        return textUrl;
    }

    public void setTextUrl(String textUrl) {
        this.textUrl = textUrl;
    }
}


BookWord represents a word from a book, consisting of the record UUID of the book with which the word is associated, the word’s sequential position in the book, and the text of the word.

public class BookWord {

    private UUID bookId;
    private String word;
    private int index;

    public BookWord(UUID bookId, String word, int index) {
        this.bookId = bookId;
        this.word = word;
        this.index = index;
    }

    public UUID getBookId() {
        return bookId;
    }

    public String getWord() {
        return word;
    }

    public int getIndex() {
        return index;
    }
}


The analyze method of BookAnalyzer creates the queues and creates and starts the consumer and producer tasks. The producer and consumer classes are included as inner classes in BookAnalyzer, but are shown in separate listings below.

public class BookAnalyzer {

    protected static final Logger LOGGER = LoggerFactory.getLogger(BookAnalyzer.class);
    public static final String EXECUTOR = "Book Analyzer";

    public static void analyze() {
        /* Create the queues */
        AsyncQueue<Book> bookQueue = new AsyncQueue<>();
        AsyncQueue<BookWord> rawBookWordQueue = new AsyncQueue<>(new ArrayBlockingQueue<>(500));
        AsyncQueue<BookWord> cleanedBookWordQueue = new AsyncQueue<>();
        AsyncQueue<String> longestWordQueue = new AsyncQueue<>();

        /* Mark the queues to close automatically */
        longestWordQueue.closeAutomatically();
        cleanedBookWordQueue.closeAutomatically();
        rawBookWordQueue.closeAutomatically();
        bookQueue.closeAutomatically();

        /* Create the consumers and producers.
           Only 1 database reader */
        AsyncDatabaseReader<Book> bookFinder = new AsyncDatabaseReader<>(EXECUTOR,
                bookQueue, Database.Static.getDefault(), Query.from(Book.class));

        /* For the following tasks, only 1 instance is created. To create multiple tasks,
           increase the task's count field (e.g. bookReaderCount). */

        /* Implements AsyncConsumer, but also behaves like a Processor. See BookReader listing below. */
        final int bookReaderCount = 1;
        List<AsyncConsumer<Book>> bookReaders = IntStream
                .range(0, bookReaderCount)
                .mapToObj(i -> new BookReader(
                        bookQueue, rawBookWordQueue))
                .collect(Collectors.toList());

        final int wordSanitizersCount = 1;
        List<AsyncProcessor<BookWord, BookWord>> wordSanitizers = IntStream
                .range(0, wordSanitizersCount)
                .mapToObj(i -> new BookWordSanitizer(
                        rawBookWordQueue, cleanedBookWordQueue))
                .collect(Collectors.toList());

        final int wordFiltersCount = 1;
        List<AsyncProcessor<BookWord, String>> wordFilters = IntStream
                .range(0, wordFiltersCount)
                .mapToObj(i -> new BookWordFilter(
                        cleanedBookWordQueue, longestWordQueue))
                .collect(Collectors.toList());

        /* Only 1 longest word calculator (unless you want to share the running tally across threads). */
        final int longestWordCalculatorsCount = 1;
        List<AsyncConsumer<String>> longestWordCalculators = IntStream
                .range(0, longestWordCalculatorsCount)
                .mapToObj(i -> new LongestWordCalculator(longestWordQueue))
                .collect(Collectors.toList());

        /* Start the consumers and producers, from the end of the pipeline back to the start */
        longestWordCalculators.forEach(Task::submit);
        wordFilters.forEach(Task::submit);
        wordSanitizers.forEach(Task::submit);
        bookReaders.forEach(Task::submit);
        bookFinder.submit(); // runs once and stops

        LOGGER.info("Started Analyzing Books...");
    }
    /* Inner Task classes for queue processing, shown below. */
}
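
In the analyze method, rawBookWordQueue is the only queue constructed with an explicit backing queue, an ArrayBlockingQueue with a capacity of 500. The following sketch uses only the JDK to show what a capacity bound means for a producer when no consumer keeps up. BoundedQueueDemo and accepted are hypothetical names, and how AsyncQueue itself reacts to a full backing queue is not covered by this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    /* Tries to add `attempts` items to a queue of the given capacity while
       nothing consumes from it, and returns how many items were accepted. */
    public static int accepted(int capacity, int attempts) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            /* offer with a timeout waits briefly for free space and then gives up;
               put() would instead block until a consumer made room. */
            if (queue.offer(i, 10, TimeUnit.MILLISECONDS)) {
                accepted++;
            }
        }
        return accepted;
    }
}
```

A bounded queue applies back pressure: a fast producer is slowed to the pace of its consumers instead of filling memory with pending items, which is one reason to size task counts so that no stage falls far behind.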


As a subclass of AsyncConsumer, BookReader implements the consume method. Using the java.util.Scanner class, the method consumes Book objects from bookQueue, splits the text of each book into word tokens, and produces to rawBookWordQueue, which contains BookWord objects.

Even though BookReader extends AsyncConsumer, the constructor makes BookReader a producer as well. The AsyncQueue#addProducer method enables a consumer to also take on a producer role.

public static class BookReader extends AsyncConsumer<Book> {

    private AsyncQueue<BookWord> output;

    public BookReader(AsyncQueue<Book> input, AsyncQueue<BookWord> output) {
        super(EXECUTOR, input);
        this.output = output;
        this.output.addProducer(this);
    }

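    @Override
    protected void consume(Book book) throws Exception {
        /* Book exposes only getTextUrl(), so the text stream is opened from that URL.
           try-with-resources closes the scanner and the underlying stream. */
        try (Scanner sc = new Scanner(new java.net.URL(book.getTextUrl()).openStream())) {
            int index = 0;
            while (sc.hasNext()) {
                String word = sc.next();
                output.add(new BookWord(book.getId(), word, index));
                index++;
            }
        }
    }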

    @Override
    protected void finished() {
        try {
            super.finished();
        } finally {
            this.output.removeProducer(this);
        }
    }
}
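
The tokenizing loop in consume can be tried in isolation. The sketch below applies the same Scanner loop to an in-memory string; TokenizeDemo and tokenize are hypothetical names used only for illustration. It pairs each token with its index, as BookReader does when it builds BookWord objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

public class TokenizeDemo {

    /* Splits text on whitespace (the Scanner default) and returns "index:token" entries. */
    public static List<String> tokenize(String text) {
        List<String> tokens = new ArrayList<>();
        try (Scanner sc = new Scanner(text)) {
            int index = 0;
            while (sc.hasNext()) {
                tokens.add(index + ":" + sc.next());
                index++;
            }
        }
        return tokens;
    }
}
```

Because the default delimiter is whitespace, punctuation stays attached to its word ("me," and "muse," in the test input below), which is why the pipeline needs a sanitizing stage after this one.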


As a subclass of AsyncProcessor, BookWordSanitizer implements the process method. The method consumes BookWord objects from rawBookWordQueue, trims punctuation from the word text, converts it to lower case, and blanks out text that looks like an email address or a URL. The resulting BookWord objects, whether transformed or unchanged, are produced to cleanedBookWordQueue. (The clean-up methods are not shown in the code example.)

public static class BookWordSanitizer extends AsyncProcessor<BookWord, BookWord> {

    private static final Set<Integer> PUNCTUATION_CHARACTERS = new HashSet<>(
             Arrays.asList(
                     (int) '~',   (int) '&',   (int) '{',   (int) '\'',
                     (int) '`',   (int) '*',   (int) '}',   (int) '"',
                     (int) '!',   (int) '(',   (int) '[',   (int) '<',
                     (int) '@',   (int) ')',   (int) ']',   (int) '>',
                     (int) '#',   (int) '-',   (int) '\\',  (int) ',',
                     (int) '$',   (int) '_',   (int) '|',   (int) '.',
                     (int) '%',   (int) '+',   (int) ';',   (int) '?',
                     (int) '^',   (int) '=',   (int) ':',   (int) '/'
             )
    );

    public BookWordSanitizer(AsyncQueue<BookWord> input, AsyncQueue<BookWord> output) {
        super(EXECUTOR, input, output);
    }

    @Override
    protected BookWord process(BookWord bookWord) throws Exception {
        String word = bookWord.getWord();

        Thread.sleep(1);

        /* Trim punctuation and lower case it */
        String sanitizedWord = trimPunctuation(word, PUNCTUATION_CHARACTERS::contains).toLowerCase();

        /* Ignore email address like text and URLs */
        if (sanitizedWord.contains("@") || sanitizedWord.startsWith("http")) {
            sanitizedWord = "";
        }

        if (!sanitizedWord.equals(word)) {
            return new BookWord(bookWord.getBookId(), sanitizedWord, bookWord.getIndex());

        } else {
            return bookWord;
        }
    }
    /* Clean-up methods not shown. */
}
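
The trimPunctuation helper called in process is not shown in the listing. One way it could look is sketched below. This is an assumption rather than the original implementation: the class name, the Predicate<Integer> parameter type (chosen so that PUNCTUATION_CHARACTERS::contains fits), and the decision to strip only leading and trailing characters are all hypothetical.

```java
import java.util.function.Predicate;

public class TrimPunctuationSketch {

    /* Removes leading and trailing characters for which isPunctuation returns true.
       Characters inside the word, such as the apostrophe in "don't", are kept. */
    public static String trimPunctuation(String word, Predicate<Integer> isPunctuation) {
        int start = 0;
        int end = word.length();
        while (start < end && isPunctuation.test((int) word.charAt(start))) {
            start++;
        }
        while (end > start && isPunctuation.test((int) word.charAt(end - 1))) {
            end--;
        }
        return word.substring(start, end);
    }
}
```

A token made up entirely of punctuation trims to the empty string, which the later BookWordFilter stage would discard because it is shorter than the minimum word length.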


As a subclass of AsyncProcessor, BookWordFilter implements the process method. The method consumes BookWord objects from cleanedBookWordQueue, verifies that the object’s word text is at least one character in length, and produces the word text to longestWordQueue, a String queue.

public static class BookWordFilter extends AsyncProcessor<BookWord, String> {

    private static final int MINIMUM_WORD_LENGTH = 1;

    public BookWordFilter(AsyncQueue<BookWord> input, AsyncQueue<String> output) {
        super(EXECUTOR, input, output);
    }

    @Override
    protected String process(BookWord bookWord) throws Exception {
        String word = bookWord.getWord();

        if (word.length() < MINIMUM_WORD_LENGTH) {
            /* Or you can throw an exception and handle it gracefully. */
            return null;
        }
        return word;
    }

    @Override
    protected void handleError(BookWord item, Exception error) {
        /* Handle errors here... */
    }
}


As a subclass of AsyncConsumer, LongestWordCalculator implements the consume method. The method consumes each word string from longestWordQueue, compares the length of the current word to that of the longest word found so far, and stores the current word if it is longer. Upon completion, the task writes the longest word found across all of the Book objects to the log.

public static class LongestWordCalculator extends AsyncConsumer<String> {

    private String longestWord = null;

    public LongestWordCalculator(AsyncQueue<String> input) {
        super(EXECUTOR, input);
    }

    @Override
    protected void consume(String item) throws Exception {
        if (longestWord == null || longestWord.length() < item.length()) {
            longestWord = item;
        }
    }

    @Override
    protected void finished() {
        super.finished();
        LOGGER.info("Longest word is: " + longestWord);
    }
}
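
LongestWordCalculator keeps its tally in a plain instance field, which is safe only while a single instance consumes the queue; a comment in the analyze method notes that running more than one would require sharing the running tally across threads. A minimal sketch of such a shared tally, using only JDK classes, follows. SharedLongestWord and its methods are hypothetical names and not part of Dari.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class SharedLongestWord {

    private final AtomicReference<String> longest = new AtomicReference<>("");

    /* Atomically replaces the stored word if the candidate is strictly longer. */
    public void offer(String word) {
        longest.accumulateAndGet(word, (current, candidate) ->
                candidate.length() > current.length() ? candidate : current);
    }

    public String get() {
        return longest.get();
    }

    /* Feeds the words to one shared tally from multiple threads. */
    public static String longestOf(List<String> words) {
        SharedLongestWord tally = new SharedLongestWord();
        words.parallelStream().forEach(tally::offer);
        return tally.get();
    }
}
```

When several words tie for the greatest length, which one wins depends on thread timing, so a multi-instance design would need a tie-breaking rule if a deterministic result matters.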
