Class AsyncRecordPipeLoop

java.lang.Object
com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop

public class AsyncRecordPipeLoop extends Object
Class that knows how to Run an asynchronous job (lambda, supplier) that writes into a RecordPipe, with another lambda (consumer) that consumes records from the pipe. Takes care of the job status monitoring, blocking when the pipe is empty, etc. There is a feature flag (System property "qqq.AsyncRecordPipeLoop.doFinalFlushInSupplierThread") to control where a finalFlush call on a BufferedRecordPipe takes place. Originally this call happened after the supplier job was complete, but it was (we believe) incorrectly done on the same thread as the consumer. This feature flag (which defaults to true - the now-believed correct behavior) moves that finalFlush call to be on the supplier thread to avoid deadlocks. But, if this fix/change turns out to not be correct, one can set this system property to anything other than "true" (recommended "false") to revert to the old behavior.
  • Constructor Details

    • AsyncRecordPipeLoop

      public AsyncRecordPipeLoop()
  • Method Details

    • run

      public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, UnsafeFunction<AsyncJobCallback,? extends Serializable,QException> supplier, UnsafeSupplier<Integer,QException> consumer) throws QException
      Run an async-record-pipe-loop.
      Parameters:
      jobName - name for the async job thread
      recordLimit - optionally, cancel the supplier/job after this number of records. e.g., for a preview step.
      recordPipe - constructed before this call, and used in both of the lambdas
      supplier - lambda that adds records into the pipe. e.g., a query or extract step.
      consumer - lambda that consumes records from the pipe e.g., a transform/load step.
      Throws:
      QException
    • getMinRecordsToConsume

      public Integer getMinRecordsToConsume()
      Getter for minRecordsToConsume
    • setMinRecordsToConsume

      public void setMinRecordsToConsume(Integer minRecordsToConsume)
      Setter for minRecordsToConsume
    • withMinRecordsToConsume

      public AsyncRecordPipeLoop withMinRecordsToConsume(Integer minRecordsToConsume)
      Fluent setter for minRecordsToConsume
    • getForcedJobUUID

      public String getForcedJobUUID()
      Getter for forcedJobUUID
    • setForcedJobUUID

      public void setForcedJobUUID(String forcedJobUUID)
      Setter for forcedJobUUID
    • withForcedJobUUID

      public AsyncRecordPipeLoop withForcedJobUUID(String forcedJobUUID)
      Fluent setter for forcedJobUUID