ConcurrencyLimiter.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. <?php
  2. namespace Illuminate\Redis\Limiters;
  3. use Illuminate\Contracts\Redis\LimiterTimeoutException;
  4. use Illuminate\Support\Sleep;
  5. use Illuminate\Support\Str;
  6. use Throwable;
  7. class ConcurrencyLimiter
  8. {
  9. /**
  10. * The Redis factory implementation.
  11. *
  12. * @var \Illuminate\Redis\Connections\Connection
  13. */
  14. protected $redis;
  15. /**
  16. * The name of the limiter.
  17. *
  18. * @var string
  19. */
  20. protected $name;
  21. /**
  22. * The allowed number of concurrent tasks.
  23. *
  24. * @var int
  25. */
  26. protected $maxLocks;
  27. /**
  28. * The number of seconds a slot should be maintained.
  29. *
  30. * @var int
  31. */
  32. protected $releaseAfter;
  33. /**
  34. * Create a new concurrency limiter instance.
  35. *
  36. * @param \Illuminate\Redis\Connections\Connection $redis
  37. * @param string $name
  38. * @param int $maxLocks
  39. * @param int $releaseAfter
  40. * @return void
  41. */
  42. public function __construct($redis, $name, $maxLocks, $releaseAfter)
  43. {
  44. $this->name = $name;
  45. $this->redis = $redis;
  46. $this->maxLocks = $maxLocks;
  47. $this->releaseAfter = $releaseAfter;
  48. }
  49. /**
  50. * Attempt to acquire the lock for the given number of seconds.
  51. *
  52. * @param int $timeout
  53. * @param callable|null $callback
  54. * @param int $sleep
  55. * @return mixed
  56. *
  57. * @throws \Illuminate\Contracts\Redis\LimiterTimeoutException
  58. * @throws \Throwable
  59. */
  60. public function block($timeout, $callback = null, $sleep = 250)
  61. {
  62. $starting = time();
  63. $id = Str::random(20);
  64. while (! $slot = $this->acquire($id)) {
  65. if (time() - $timeout >= $starting) {
  66. throw new LimiterTimeoutException;
  67. }
  68. Sleep::usleep($sleep * 1000);
  69. }
  70. if (is_callable($callback)) {
  71. try {
  72. return tap($callback(), function () use ($slot, $id) {
  73. $this->release($slot, $id);
  74. });
  75. } catch (Throwable $exception) {
  76. $this->release($slot, $id);
  77. throw $exception;
  78. }
  79. }
  80. return true;
  81. }
  82. /**
  83. * Attempt to acquire the lock.
  84. *
  85. * @param string $id A unique identifier for this lock
  86. * @return mixed
  87. */
  88. protected function acquire($id)
  89. {
  90. $slots = array_map(function ($i) {
  91. return $this->name.$i;
  92. }, range(1, $this->maxLocks));
  93. return $this->redis->eval(...array_merge(
  94. [$this->lockScript(), count($slots)],
  95. array_merge($slots, [$this->name, $this->releaseAfter, $id])
  96. ));
  97. }
  98. /**
  99. * Get the Lua script for acquiring a lock.
  100. *
  101. * KEYS - The keys that represent available slots
  102. * ARGV[1] - The limiter name
  103. * ARGV[2] - The number of seconds the slot should be reserved
  104. * ARGV[3] - The unique identifier for this lock
  105. *
  106. * @return string
  107. */
  108. protected function lockScript()
  109. {
  110. return <<<'LUA'
  111. for index, value in pairs(redis.call('mget', unpack(KEYS))) do
  112. if not value then
  113. redis.call('set', KEYS[index], ARGV[3], "EX", ARGV[2])
  114. return ARGV[1]..index
  115. end
  116. end
  117. LUA;
  118. }
  119. /**
  120. * Release the lock.
  121. *
  122. * @param string $key
  123. * @param string $id
  124. * @return void
  125. */
  126. protected function release($key, $id)
  127. {
  128. $this->redis->eval($this->releaseScript(), 1, $key, $id);
  129. }
  130. /**
  131. * Get the Lua script to atomically release a lock.
  132. *
  133. * KEYS[1] - The name of the lock
  134. * ARGV[1] - The unique identifier for this lock
  135. *
  136. * @return string
  137. */
  138. protected function releaseScript()
  139. {
  140. return <<<'LUA'
  141. if redis.call('get', KEYS[1]) == ARGV[1]
  142. then
  143. return redis.call('del', KEYS[1])
  144. else
  145. return 0
  146. end
  147. LUA;
  148. }
  149. }