Queueable.php 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. <?php
  2. namespace Illuminate\Bus;
  3. use Closure;
  4. use Illuminate\Queue\CallQueuedClosure;
  5. use Illuminate\Support\Arr;
  6. use PHPUnit\Framework\Assert as PHPUnit;
  7. use RuntimeException;
  8. trait Queueable
  9. {
  10. /**
  11. * The name of the connection the job should be sent to.
  12. *
  13. * @var string|null
  14. */
  15. public $connection;
  16. /**
  17. * The name of the queue the job should be sent to.
  18. *
  19. * @var string|null
  20. */
  21. public $queue;
  22. /**
  23. * The number of seconds before the job should be made available.
  24. *
  25. * @var \DateTimeInterface|\DateInterval|array|int|null
  26. */
  27. public $delay;
  28. /**
  29. * Indicates whether the job should be dispatched after all database transactions have committed.
  30. *
  31. * @var bool|null
  32. */
  33. public $afterCommit;
  34. /**
  35. * The middleware the job should be dispatched through.
  36. *
  37. * @var array
  38. */
  39. public $middleware = [];
  40. /**
  41. * The jobs that should run if this job is successful.
  42. *
  43. * @var array
  44. */
  45. public $chained = [];
  46. /**
  47. * The name of the connection the chain should be sent to.
  48. *
  49. * @var string|null
  50. */
  51. public $chainConnection;
  52. /**
  53. * The name of the queue the chain should be sent to.
  54. *
  55. * @var string|null
  56. */
  57. public $chainQueue;
  58. /**
  59. * The callbacks to be executed on chain failure.
  60. *
  61. * @var array|null
  62. */
  63. public $chainCatchCallbacks;
  64. /**
  65. * Set the desired connection for the job.
  66. *
  67. * @param string|null $connection
  68. * @return $this
  69. */
  70. public function onConnection($connection)
  71. {
  72. $this->connection = $connection;
  73. return $this;
  74. }
  75. /**
  76. * Set the desired queue for the job.
  77. *
  78. * @param string|null $queue
  79. * @return $this
  80. */
  81. public function onQueue($queue)
  82. {
  83. $this->queue = $queue;
  84. return $this;
  85. }
  86. /**
  87. * Set the desired connection for the chain.
  88. *
  89. * @param string|null $connection
  90. * @return $this
  91. */
  92. public function allOnConnection($connection)
  93. {
  94. $this->chainConnection = $connection;
  95. $this->connection = $connection;
  96. return $this;
  97. }
  98. /**
  99. * Set the desired queue for the chain.
  100. *
  101. * @param string|null $queue
  102. * @return $this
  103. */
  104. public function allOnQueue($queue)
  105. {
  106. $this->chainQueue = $queue;
  107. $this->queue = $queue;
  108. return $this;
  109. }
  110. /**
  111. * Set the desired delay in seconds for the job.
  112. *
  113. * @param \DateTimeInterface|\DateInterval|array|int|null $delay
  114. * @return $this
  115. */
  116. public function delay($delay)
  117. {
  118. $this->delay = $delay;
  119. return $this;
  120. }
  121. /**
  122. * Set the delay for the job to zero seconds.
  123. *
  124. * @return $this
  125. */
  126. public function withoutDelay()
  127. {
  128. $this->delay = 0;
  129. return $this;
  130. }
  131. /**
  132. * Indicate that the job should be dispatched after all database transactions have committed.
  133. *
  134. * @return $this
  135. */
  136. public function afterCommit()
  137. {
  138. $this->afterCommit = true;
  139. return $this;
  140. }
  141. /**
  142. * Indicate that the job should not wait until database transactions have been committed before dispatching.
  143. *
  144. * @return $this
  145. */
  146. public function beforeCommit()
  147. {
  148. $this->afterCommit = false;
  149. return $this;
  150. }
  151. /**
  152. * Specify the middleware the job should be dispatched through.
  153. *
  154. * @param array|object $middleware
  155. * @return $this
  156. */
  157. public function through($middleware)
  158. {
  159. $this->middleware = Arr::wrap($middleware);
  160. return $this;
  161. }
  162. /**
  163. * Set the jobs that should run if this job is successful.
  164. *
  165. * @param array $chain
  166. * @return $this
  167. */
  168. public function chain($chain)
  169. {
  170. $this->chained = collect($chain)->map(function ($job) {
  171. return $this->serializeJob($job);
  172. })->all();
  173. return $this;
  174. }
  175. /**
  176. * Prepend a job to the current chain so that it is run after the currently running job.
  177. *
  178. * @param mixed $job
  179. * @return $this
  180. */
  181. public function prependToChain($job)
  182. {
  183. $this->chained = Arr::prepend($this->chained, $this->serializeJob($job));
  184. return $this;
  185. }
  186. /**
  187. * Append a job to the end of the current chain.
  188. *
  189. * @param mixed $job
  190. * @return $this
  191. */
  192. public function appendToChain($job)
  193. {
  194. $this->chained = array_merge($this->chained, [$this->serializeJob($job)]);
  195. return $this;
  196. }
  197. /**
  198. * Serialize a job for queuing.
  199. *
  200. * @param mixed $job
  201. * @return string
  202. *
  203. * @throws \RuntimeException
  204. */
  205. protected function serializeJob($job)
  206. {
  207. if ($job instanceof Closure) {
  208. if (! class_exists(CallQueuedClosure::class)) {
  209. throw new RuntimeException(
  210. 'To enable support for closure jobs, please install the illuminate/queue package.'
  211. );
  212. }
  213. $job = CallQueuedClosure::create($job);
  214. }
  215. return serialize($job);
  216. }
  217. /**
  218. * Dispatch the next job on the chain.
  219. *
  220. * @return void
  221. */
  222. public function dispatchNextJobInChain()
  223. {
  224. if (! empty($this->chained)) {
  225. dispatch(tap(unserialize(array_shift($this->chained)), function ($next) {
  226. $next->chained = $this->chained;
  227. $next->onConnection($next->connection ?: $this->chainConnection);
  228. $next->onQueue($next->queue ?: $this->chainQueue);
  229. $next->chainConnection = $this->chainConnection;
  230. $next->chainQueue = $this->chainQueue;
  231. $next->chainCatchCallbacks = $this->chainCatchCallbacks;
  232. }));
  233. }
  234. }
  235. /**
  236. * Invoke all of the chain's failed job callbacks.
  237. *
  238. * @param \Throwable $e
  239. * @return void
  240. */
  241. public function invokeChainCatchCallbacks($e)
  242. {
  243. collect($this->chainCatchCallbacks)->each(function ($callback) use ($e) {
  244. $callback($e);
  245. });
  246. }
  247. /**
  248. * Assert that the job has the given chain of jobs attached to it.
  249. *
  250. * @param array $expectedChain
  251. * @return void
  252. */
  253. public function assertHasChain($expectedChain)
  254. {
  255. PHPUnit::assertTrue(
  256. collect($expectedChain)->isNotEmpty(),
  257. 'The expected chain can not be empty.'
  258. );
  259. if (collect($expectedChain)->contains(fn ($job) => is_object($job))) {
  260. $expectedChain = collect($expectedChain)->map(fn ($job) => serialize($job))->all();
  261. } else {
  262. $chain = collect($this->chained)->map(fn ($job) => get_class(unserialize($job)))->all();
  263. }
  264. PHPUnit::assertTrue(
  265. $expectedChain === ($chain ?? $this->chained),
  266. 'The job does not have the expected chain.'
  267. );
  268. }
  269. /**
  270. * Assert that the job has no remaining chained jobs.
  271. *
  272. * @return void
  273. */
  274. public function assertDoesntHaveChain()
  275. {
  276. PHPUnit::assertEmpty($this->chained, 'The job has chained jobs.');
  277. }
  278. }