Batch.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. <?php
  2. namespace Illuminate\Bus;
  3. use Carbon\CarbonImmutable;
  4. use Closure;
  5. use Illuminate\Contracts\Queue\Factory as QueueFactory;
  6. use Illuminate\Contracts\Support\Arrayable;
  7. use Illuminate\Queue\CallQueuedClosure;
  8. use Illuminate\Support\Arr;
  9. use Illuminate\Support\Collection;
  10. use JsonSerializable;
  11. use Throwable;
  12. class Batch implements Arrayable, JsonSerializable
  13. {
  14. /**
  15. * The queue factory implementation.
  16. *
  17. * @var \Illuminate\Contracts\Queue\Factory
  18. */
  19. protected $queue;
  20. /**
  21. * The repository implementation.
  22. *
  23. * @var \Illuminate\Bus\BatchRepository
  24. */
  25. protected $repository;
  26. /**
  27. * The batch ID.
  28. *
  29. * @var string
  30. */
  31. public $id;
  32. /**
  33. * The batch name.
  34. *
  35. * @var string
  36. */
  37. public $name;
  38. /**
  39. * The total number of jobs that belong to the batch.
  40. *
  41. * @var int
  42. */
  43. public $totalJobs;
  44. /**
  45. * The total number of jobs that are still pending.
  46. *
  47. * @var int
  48. */
  49. public $pendingJobs;
  50. /**
  51. * The total number of jobs that have failed.
  52. *
  53. * @var int
  54. */
  55. public $failedJobs;
  56. /**
  57. * The IDs of the jobs that have failed.
  58. *
  59. * @var array
  60. */
  61. public $failedJobIds;
  62. /**
  63. * The batch options.
  64. *
  65. * @var array
  66. */
  67. public $options;
  68. /**
  69. * The date indicating when the batch was created.
  70. *
  71. * @var \Carbon\CarbonImmutable
  72. */
  73. public $createdAt;
  74. /**
  75. * The date indicating when the batch was cancelled.
  76. *
  77. * @var \Carbon\CarbonImmutable|null
  78. */
  79. public $cancelledAt;
  80. /**
  81. * The date indicating when the batch was finished.
  82. *
  83. * @var \Carbon\CarbonImmutable|null
  84. */
  85. public $finishedAt;
  86. /**
  87. * Create a new batch instance.
  88. *
  89. * @param \Illuminate\Contracts\Queue\Factory $queue
  90. * @param \Illuminate\Bus\BatchRepository $repository
  91. * @param string $id
  92. * @param string $name
  93. * @param int $totalJobs
  94. * @param int $pendingJobs
  95. * @param int $failedJobs
  96. * @param array $failedJobIds
  97. * @param array $options
  98. * @param \Carbon\CarbonImmutable $createdAt
  99. * @param \Carbon\CarbonImmutable|null $cancelledAt
  100. * @param \Carbon\CarbonImmutable|null $finishedAt
  101. * @return void
  102. */
  103. public function __construct(QueueFactory $queue,
  104. BatchRepository $repository,
  105. string $id,
  106. string $name,
  107. int $totalJobs,
  108. int $pendingJobs,
  109. int $failedJobs,
  110. array $failedJobIds,
  111. array $options,
  112. CarbonImmutable $createdAt,
  113. ?CarbonImmutable $cancelledAt = null,
  114. ?CarbonImmutable $finishedAt = null)
  115. {
  116. $this->queue = $queue;
  117. $this->repository = $repository;
  118. $this->id = $id;
  119. $this->name = $name;
  120. $this->totalJobs = $totalJobs;
  121. $this->pendingJobs = $pendingJobs;
  122. $this->failedJobs = $failedJobs;
  123. $this->failedJobIds = $failedJobIds;
  124. $this->options = $options;
  125. $this->createdAt = $createdAt;
  126. $this->cancelledAt = $cancelledAt;
  127. $this->finishedAt = $finishedAt;
  128. }
  129. /**
  130. * Get a fresh instance of the batch represented by this ID.
  131. *
  132. * @return self
  133. */
  134. public function fresh()
  135. {
  136. return $this->repository->find($this->id);
  137. }
  138. /**
  139. * Add additional jobs to the batch.
  140. *
  141. * @param \Illuminate\Support\Enumerable|object|array $jobs
  142. * @return self
  143. */
  144. public function add($jobs)
  145. {
  146. $count = 0;
  147. $jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) {
  148. $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;
  149. if (is_array($job)) {
  150. $count += count($job);
  151. return with($this->prepareBatchedChain($job), function ($chain) {
  152. return $chain->first()
  153. ->allOnQueue($this->options['queue'] ?? null)
  154. ->allOnConnection($this->options['connection'] ?? null)
  155. ->chain($chain->slice(1)->values()->all());
  156. });
  157. } else {
  158. $job->withBatchId($this->id);
  159. $count++;
  160. }
  161. return $job;
  162. });
  163. $this->repository->transaction(function () use ($jobs, $count) {
  164. $this->repository->incrementTotalJobs($this->id, $count);
  165. $this->queue->connection($this->options['connection'] ?? null)->bulk(
  166. $jobs->all(),
  167. $data = '',
  168. $this->options['queue'] ?? null
  169. );
  170. });
  171. return $this->fresh();
  172. }
  173. /**
  174. * Prepare a chain that exists within the jobs being added.
  175. *
  176. * @param array $chain
  177. * @return \Illuminate\Support\Collection
  178. */
  179. protected function prepareBatchedChain(array $chain)
  180. {
  181. return collect($chain)->map(function ($job) {
  182. $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;
  183. return $job->withBatchId($this->id);
  184. });
  185. }
  186. /**
  187. * Get the total number of jobs that have been processed by the batch thus far.
  188. *
  189. * @return int
  190. */
  191. public function processedJobs()
  192. {
  193. return $this->totalJobs - $this->pendingJobs;
  194. }
  195. /**
  196. * Get the percentage of jobs that have been processed (between 0-100).
  197. *
  198. * @return int
  199. */
  200. public function progress()
  201. {
  202. return $this->totalJobs > 0 ? round(($this->processedJobs() / $this->totalJobs) * 100) : 0;
  203. }
  204. /**
  205. * Record that a job within the batch finished successfully, executing any callbacks if necessary.
  206. *
  207. * @param string $jobId
  208. * @return void
  209. */
  210. public function recordSuccessfulJob(string $jobId)
  211. {
  212. $counts = $this->decrementPendingJobs($jobId);
  213. if ($this->hasProgressCallbacks()) {
  214. $batch = $this->fresh();
  215. collect($this->options['progress'])->each(function ($handler) use ($batch) {
  216. $this->invokeHandlerCallback($handler, $batch);
  217. });
  218. }
  219. if ($counts->pendingJobs === 0) {
  220. $this->repository->markAsFinished($this->id);
  221. }
  222. if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
  223. $batch = $this->fresh();
  224. collect($this->options['then'])->each(function ($handler) use ($batch) {
  225. $this->invokeHandlerCallback($handler, $batch);
  226. });
  227. }
  228. if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
  229. $batch = $this->fresh();
  230. collect($this->options['finally'])->each(function ($handler) use ($batch) {
  231. $this->invokeHandlerCallback($handler, $batch);
  232. });
  233. }
  234. }
  235. /**
  236. * Decrement the pending jobs for the batch.
  237. *
  238. * @param string $jobId
  239. * @return \Illuminate\Bus\UpdatedBatchJobCounts
  240. */
  241. public function decrementPendingJobs(string $jobId)
  242. {
  243. return $this->repository->decrementPendingJobs($this->id, $jobId);
  244. }
  245. /**
  246. * Determine if the batch has finished executing.
  247. *
  248. * @return bool
  249. */
  250. public function finished()
  251. {
  252. return ! is_null($this->finishedAt);
  253. }
  254. /**
  255. * Determine if the batch has "progress" callbacks.
  256. *
  257. * @return bool
  258. */
  259. public function hasProgressCallbacks()
  260. {
  261. return isset($this->options['progress']) && ! empty($this->options['progress']);
  262. }
  263. /**
  264. * Determine if the batch has "success" callbacks.
  265. *
  266. * @return bool
  267. */
  268. public function hasThenCallbacks()
  269. {
  270. return isset($this->options['then']) && ! empty($this->options['then']);
  271. }
  272. /**
  273. * Determine if the batch allows jobs to fail without cancelling the batch.
  274. *
  275. * @return bool
  276. */
  277. public function allowsFailures()
  278. {
  279. return Arr::get($this->options, 'allowFailures', false) === true;
  280. }
  281. /**
  282. * Determine if the batch has job failures.
  283. *
  284. * @return bool
  285. */
  286. public function hasFailures()
  287. {
  288. return $this->failedJobs > 0;
  289. }
  290. /**
  291. * Record that a job within the batch failed to finish successfully, executing any callbacks if necessary.
  292. *
  293. * @param string $jobId
  294. * @param \Throwable $e
  295. * @return void
  296. */
  297. public function recordFailedJob(string $jobId, $e)
  298. {
  299. $counts = $this->incrementFailedJobs($jobId);
  300. if ($counts->failedJobs === 1 && ! $this->allowsFailures()) {
  301. $this->cancel();
  302. }
  303. if ($this->hasProgressCallbacks() && $this->allowsFailures()) {
  304. $batch = $this->fresh();
  305. collect($this->options['progress'])->each(function ($handler) use ($batch, $e) {
  306. $this->invokeHandlerCallback($handler, $batch, $e);
  307. });
  308. }
  309. if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
  310. $batch = $this->fresh();
  311. collect($this->options['catch'])->each(function ($handler) use ($batch, $e) {
  312. $this->invokeHandlerCallback($handler, $batch, $e);
  313. });
  314. }
  315. if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
  316. $batch = $this->fresh();
  317. collect($this->options['finally'])->each(function ($handler) use ($batch, $e) {
  318. $this->invokeHandlerCallback($handler, $batch, $e);
  319. });
  320. }
  321. }
  322. /**
  323. * Increment the failed jobs for the batch.
  324. *
  325. * @param string $jobId
  326. * @return \Illuminate\Bus\UpdatedBatchJobCounts
  327. */
  328. public function incrementFailedJobs(string $jobId)
  329. {
  330. return $this->repository->incrementFailedJobs($this->id, $jobId);
  331. }
  332. /**
  333. * Determine if the batch has "catch" callbacks.
  334. *
  335. * @return bool
  336. */
  337. public function hasCatchCallbacks()
  338. {
  339. return isset($this->options['catch']) && ! empty($this->options['catch']);
  340. }
  341. /**
  342. * Determine if the batch has "finally" callbacks.
  343. *
  344. * @return bool
  345. */
  346. public function hasFinallyCallbacks()
  347. {
  348. return isset($this->options['finally']) && ! empty($this->options['finally']);
  349. }
  350. /**
  351. * Cancel the batch.
  352. *
  353. * @return void
  354. */
  355. public function cancel()
  356. {
  357. $this->repository->cancel($this->id);
  358. }
  359. /**
  360. * Determine if the batch has been cancelled.
  361. *
  362. * @return bool
  363. */
  364. public function canceled()
  365. {
  366. return $this->cancelled();
  367. }
  368. /**
  369. * Determine if the batch has been cancelled.
  370. *
  371. * @return bool
  372. */
  373. public function cancelled()
  374. {
  375. return ! is_null($this->cancelledAt);
  376. }
  377. /**
  378. * Delete the batch from storage.
  379. *
  380. * @return void
  381. */
  382. public function delete()
  383. {
  384. $this->repository->delete($this->id);
  385. }
  386. /**
  387. * Invoke a batch callback handler.
  388. *
  389. * @param callable $handler
  390. * @param \Illuminate\Bus\Batch $batch
  391. * @param \Throwable|null $e
  392. * @return void
  393. */
  394. protected function invokeHandlerCallback($handler, Batch $batch, ?Throwable $e = null)
  395. {
  396. try {
  397. return $handler($batch, $e);
  398. } catch (Throwable $e) {
  399. if (function_exists('report')) {
  400. report($e);
  401. }
  402. }
  403. }
  404. /**
  405. * Convert the batch to an array.
  406. *
  407. * @return array
  408. */
  409. public function toArray()
  410. {
  411. return [
  412. 'id' => $this->id,
  413. 'name' => $this->name,
  414. 'totalJobs' => $this->totalJobs,
  415. 'pendingJobs' => $this->pendingJobs,
  416. 'processedJobs' => $this->processedJobs(),
  417. 'progress' => $this->progress(),
  418. 'failedJobs' => $this->failedJobs,
  419. 'options' => $this->options,
  420. 'createdAt' => $this->createdAt,
  421. 'cancelledAt' => $this->cancelledAt,
  422. 'finishedAt' => $this->finishedAt,
  423. ];
  424. }
  425. /**
  426. * Get the JSON serializable representation of the object.
  427. *
  428. * @return array
  429. */
  430. public function jsonSerialize(): array
  431. {
  432. return $this->toArray();
  433. }
  434. /**
  435. * Dynamically access the batch's "options" via properties.
  436. *
  437. * @param string $key
  438. * @return mixed
  439. */
  440. public function __get($key)
  441. {
  442. return $this->options[$key] ?? null;
  443. }
  444. }