connection_pool.rb 39.5 KB
Newer Older
1 2
# frozen_string_literal: true

3 4 5
require "thread"
require "concurrent/map"
require "monitor"
6

N
Nick 已提交
7
module ActiveRecord
8
  # Raised when a connection could not be obtained within the connection
9
  # acquisition timeout period: because max connections in pool
10
  # are in use.
11 12 13
  class ConnectionTimeoutError < ConnectionNotEstablished; end

14
  # Raised when a pool was unable to get ahold of all its connections
15 16 17
  # to perform a "group" action such as
  # {ActiveRecord::Base.connection_pool.disconnect!}[rdoc-ref:ConnectionAdapters::ConnectionPool#disconnect!]
  # or {ActiveRecord::Base.clear_reloadable_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_reloadable_connections!].
18 19 20
  class ExclusiveConnectionTimeoutError < ConnectionTimeoutError; end

N
Nick 已提交
21
  module ConnectionAdapters
22
    # Connection pool base class for managing Active Record database
23 24
    # connections.
    #
P
Pratik Naik 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
40 41 42
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
43 44
    # 1. Simply use {ActiveRecord::Base.connection}[rdoc-ref:ConnectionHandling.connection]
    #    as with Active Record 2.1 and
45 46
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
47 48
    #    {ActiveRecord::Base.clear_active_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_active_connections!].
    #    This will be the default behavior for Active Record when used in conjunction with
49
    #    Action Pack's request handling cycle.
50
    # 2. Manually check out a connection from the pool with
51
    #    {ActiveRecord::Base.connection_pool.checkout}[rdoc-ref:#checkout]. You are responsible for
52
    #    returning this connection to the pool when finished by calling
53 54
    #    {ActiveRecord::Base.connection_pool.checkin(connection)}[rdoc-ref:#checkin].
    # 3. Use {ActiveRecord::Base.connection_pool.with_connection(&block)}[rdoc-ref:#with_connection], which
55 56
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
57
    #
P
Pratik Naik 已提交
58 59 60 61 62
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
63
    # There are several connection-pooling-related options that you can add to
64 65
    # your database connection configuration:
    #
M
Matthew Draper 已提交
66 67 68 69 70 71 72
    # * +pool+: maximum number of connections the pool may manage (default 5).
    # * +idle_timeout+: number of seconds that a connection will be kept
    #   unused in the pool before it is automatically disconnected (default
    #   300 seconds). Set this to zero to keep connections forever.
    # * +checkout_timeout+: number of seconds to wait for a connection to
    #   become available before giving up and raising a timeout error (default
    #   5 seconds).
73 74 75 76
    #
    #--
    # Synchronization policy:
    # * all public methods can be called outside +synchronize+
77
    # * access to these instance variables needs to be in +synchronize+:
78 79 80 81
    #   * @connections
    #   * @now_connecting
    # * private methods that must be called within a +synchronize+ block
    #   are explicitly documented as such
82
    class ConnectionPool
D
Devin Christensen 已提交
83
      # Threadsafe, fair, LIFO queue.  Meant to be used by ConnectionPool
84
      # with which it shares a Monitor.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
      class Queue
        # Creates an empty queue guarded by +lock+. ConnectionPool passes
        # itself as +lock+ so that the pool and the queue share one monitor.
        def initialize(lock = Monitor.new)
          @lock = lock
          @cond = @lock.new_cond
          @num_waiting = 0
          @queue = []
        end

        # Test if any threads are currently waiting on the queue.
        def any_waiting?
          synchronize do
            @num_waiting > 0
          end
        end

        # Returns the number of threads currently waiting on this
        # queue.
        def num_waiting
          synchronize do
            @num_waiting
          end
        end

        # Add +element+ to the queue and wake up one waiting thread (if any).
        # Never blocks.
        def add(element)
          synchronize do
            @queue.push element
            @cond.signal
          end
        end

        # If +element+ is in the queue, remove and return it, or +nil+.
        def delete(element)
          synchronize do
            @queue.delete(element)
          end
        end

        # Remove all elements from the queue.
        def clear
          synchronize do
            @queue.clear
          end
        end

        # Remove and return the most recently added element of the queue.
        #
        # If +timeout+ is not given, remove and return an element only if
        # the number of available elements is strictly greater than the
        # number of threads currently waiting (that is, don't jump ahead
        # in line).  Otherwise, return +nil+.
        #
        # If +timeout+ is given, block if there is no element
        # available, waiting up to +timeout+ seconds for an element to
        # become available.
        #
        # Raises:
        # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no
        #   element becomes available within +timeout+ seconds.
        def poll(timeout = nil)
          synchronize { internal_poll(timeout) }
        end

        private
          # Must be called while holding the lock. Tries the non-blocking
          # path first and only waits when a +timeout+ was supplied.
          def internal_poll(timeout)
            no_wait_poll || (timeout && wait_poll(timeout))
          end

          def synchronize(&block)
            @lock.synchronize(&block)
          end

          # Test if the queue currently contains any elements.
          def any?
            !@queue.empty?
          end

          # A thread can remove an element from the queue without
          # waiting if and only if the number of currently available
          # connections is strictly greater than the number of waiting
          # threads.
          def can_remove_no_wait?
            @queue.size > @num_waiting
          end

          # Removes and returns the most recently added element (the queue
          # is LIFO), or +nil+ if the queue is empty.
          def remove
            @queue.pop
          end

          # Remove and return an element if the number of
          # available elements is strictly greater than the number of
          # threads currently waiting.  Otherwise, return +nil+.
          def no_wait_poll
            remove if can_remove_no_wait?
          end

          # Waits on the queue up to +timeout+ seconds, then removes and
          # returns an element. Raises ConnectionTimeoutError when nothing
          # became available in time.
          def wait_poll(timeout)
            @num_waiting += 1

            t0 = Concurrent.monotonic_time
            elapsed = 0
            loop do
              # Let other threads load code while this one is parked on the
              # condition variable.
              ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
                @cond.wait(timeout - elapsed)
              end

              return remove if any?

              # Woken up (or timed out) with nothing to take: either retry
              # with the remaining budget or give up.
              elapsed = Concurrent.monotonic_time - t0
              if elapsed >= timeout
                msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" %
                  [timeout, elapsed]
                raise ConnectionTimeoutError, msg
              end
            end
          ensure
            @num_waiting -= 1
          end
      end

209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
      # Adds the ability to turn a basic fair FIFO queue into one
      # biased to some thread.
      module BiasableQueue # :nodoc:
        # Condition variable that routes wake-ups to one preferred thread
        # first: the preferred thread waits on its own private condition,
        # every other thread waits on the wrapped +other_cond+.
        class BiasedConditionVariable # :nodoc:
          # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+,
          # +signal+ and +wait+ methods are only called while holding a lock
          def initialize(lock, other_cond, preferred_thread)
            @real_cond = lock.new_cond
            @other_cond = other_cond
            @preferred_thread = preferred_thread
            @num_waiting_on_real_cond = 0
          end

          # Wakes up everybody: the preferred thread and all other waiters.
          def broadcast
            broadcast_on_biased
            @other_cond.broadcast
          end

          # Wakes up only waiters parked on the private condition.
          def broadcast_on_biased
            @num_waiting_on_real_cond = 0
            @real_cond.broadcast
          end

          # Signals the private condition while the preferred thread is
          # recorded as waiting on it, the wrapped condition otherwise.
          def signal
            if @num_waiting_on_real_cond > 0
              @num_waiting_on_real_cond -= 1
              @real_cond.signal
            else
              @other_cond.signal
            end
          end

          # Parks the calling thread on the condition that matches it.
          def wait(timeout)
            if Thread.current == @preferred_thread
              @num_waiting_on_real_cond += 1
              @real_cond.wait(timeout)
            else
              @other_cond.wait(timeout)
            end
          end
        end

        # Runs the block with the queue's condition variable temporarily
        # replaced by one biased towards +thread+, restoring the previous
        # condition variable afterwards.
        def with_a_bias_for(thread)
          saved_cond  = nil
          biased_cond = nil
          synchronize do
            saved_cond = @cond
            biased_cond = BiasedConditionVariable.new(@lock, saved_cond, thread)
            @cond = biased_cond
          end
          yield
        ensure
          synchronize do
            @cond = saved_cond if saved_cond
            # wake up any remaining sleepers
            biased_cond.broadcast_on_biased if biased_cond
          end
        end
      end

267 268 269
      # Connections must be leased while holding the main pool mutex. This is
      # an internal subclass that also +.leases+ returned connections while
      # still in queue's critical section (queue synchronizes with the same
270
      # <tt>@lock</tt> as the main pool) so that a returned connection is already
271 272
      # leased and there is no need to re-enter synchronized block.
      class ConnectionLeasingQueue < Queue # :nodoc:
        include BiasableQueue

        private
          # Leases the polled connection before returning it, while still
          # inside the queue's critical section. Returns whatever the parent
          # implementation returned (a connection, or a falsy value when
          # nothing was available).
          def internal_poll(timeout)
            conn = super
            conn.lease if conn
            conn
          end
      end

M
Matthew Draper 已提交
283 284 285
      # Every +frequency+ seconds, the reaper will call +reap+ and +flush+ on
      # +pool+. A reaper instantiated with a zero frequency will never reap
      # the connection pool.
286
      #
M
Matthew Draper 已提交
287 288
      # Configure the frequency by setting +reaping_frequency+ in your database
      # yaml file (default 60 seconds).
289 290 291 292 293 294 295 296
      class Reaper
        attr_reader :pool, :frequency

        # +pool+ is the object to reap and flush; +frequency+ is the number of
        # seconds to sleep between passes (+nil+ or a non-positive value
        # disables the reaper).
        def initialize(pool, frequency)
          @pool      = pool
          @frequency = frequency
        end

        # Starts a background thread that sleeps +frequency+ seconds, then
        # calls +reap+ and +flush+ on the pool, forever. Returns the thread,
        # or +nil+ when reaping is disabled.
        def run
          return unless frequency && frequency > 0
          Thread.new(frequency, pool) { |interval, target|
            loop do
              sleep interval
              target.reap
              target.flush
            end
          }
        end
      end

309
      include MonitorMixin
310
      include QueryCache::ConnectionPoolConfiguration
311

312
      attr_accessor :automatic_reconnect, :checkout_timeout, :schema_cache
313
      attr_reader :spec, :connections, :size, :reaper
314

P
Pratik Naik 已提交
315 316 317 318 319 320
      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
N
Nick 已提交
321
      def initialize(spec)
        super()

        @spec = spec

        # Seconds to wait for a free connection before giving up (default 5).
        @checkout_timeout = (spec.config[:checkout_timeout] && spec.config[:checkout_timeout].to_f) || 5
        # Seconds an unused connection is kept (default 300); +nil+, zero or
        # a negative value disables idle flushing.
        if @idle_timeout = spec.config.fetch(:idle_timeout, 300)
          @idle_timeout = @idle_timeout.to_f
          @idle_timeout = nil if @idle_timeout <= 0
        end

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

        # This variable tracks the cache of threads mapped to reserved connections, with the
        # sole purpose of speeding up the +connection+ method. It is not the authoritative
        # registry of which thread owns which connection. Connection ownership is tracked by
        # the +connection.owner+ attr on each +connection+ instance.
        # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
        # then that +thread+ does indeed own that +conn+. However, the absence of such a
        # mapping does not mean that the +thread+ doesn't own the said connection. In
        # that case +conn.owner+ attr should be consulted.
        # Access and modification of <tt>@thread_cached_conns</tt> does not require
        # synchronization.
        @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size)

        @connections         = []
        @automatic_reconnect = true

        # Connection pool allows for concurrent (outside the main +synchronize+ section)
        # establishment of new connections. This variable tracks the number of threads
        # currently in the process of independently establishing connections to the DB.
        @now_connecting = 0

        @threads_blocking_new_connections = 0

        # The queue shares this pool's monitor (the pool itself is the lock).
        @available = ConnectionLeasingQueue.new self

        @lock_thread = false

        # +reaping_frequency+ is configurable mostly for historical reasons, but it could
        # also be useful if someone wants a very low +idle_timeout+.
        reaping_frequency = spec.config.fetch(:reaping_frequency, 60)
        @reaper = Reaper.new(self, reaping_frequency && reaping_frequency.to_f)
        @reaper.run
      end

      # Pins the pool to the calling thread when +lock_thread+ is truthy, so
      # that #connection uses the calling thread's cache entry regardless of
      # which thread asks; a falsy value removes the pin.
      def lock_thread=(lock_thread)
        @lock_thread = lock_thread ? Thread.current : nil
      end

376 377 378 379
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
380
      # held in a cache keyed by a thread.
381
      def connection
        # Keyed by the pinned thread (see #lock_thread=) when one is set,
        # otherwise by the calling thread; checks a connection out on a miss.
        @thread_cached_conns[connection_cache_key(@lock_thread || Thread.current)] ||= checkout
      end

385
      # Returns true if there is an open connection being used for the current thread.
386
      #
387
      # This method only works for connections that have been obtained through
388
      # #connection or #with_connection methods. Connections obtained through
389
      # #checkout will not be detected by #active_connection?
390
      def active_connection?
        # Returns the cached connection itself (truthy) or +nil+.
        # NOTE(review): this looks up the calling thread only and ignores
        # @lock_thread, unlike #connection -- confirm that is intended.
        @thread_cached_conns[connection_cache_key(Thread.current)]
      end

394
      # Signal that the thread is finished with the current connection.
395
      # #release_connection releases the connection-thread association
396
      # and returns the connection to the pool.
397 398 399 400 401 402 403
      #
      # This method only works for connections that have been obtained through
      # #connection or #with_connection methods, connections obtained through
      # #checkout will not be automatically released.
      def release_connection(owner_thread = Thread.current)
        # Drop the thread => connection association first; only check the
        # connection in when there actually was one.
        if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
          checkin conn
        end
      end

407 408
      # If a connection obtained through #connection or #with_connection methods
      # already exists yield it to the block. If no such connection
409
      # exists checkout a connection, yield it to the block, and checkin the
410
      # connection when finished.
411
      def with_connection
        unless conn = @thread_cached_conns[connection_cache_key(Thread.current)]
          conn = connection
          # Remember that this call obtained the connection, so that only
          # connections obtained here are released afterwards.
          fresh_connection = true
        end
        yield conn
      ensure
        release_connection if fresh_connection
      end

421 422
      # Returns true if a connection has already been opened.
      def connected?
        # True as soon as the pool manages at least one connection.
        synchronize { @connections.any? }
      end

P
Pratik Naik 已提交
426
      # Disconnects all connections in the pool, and clears the pool.
427 428
      #
      # Raises:
429
      # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
430
      #   connections in the pool within a timeout interval (default duration is
431
      #   <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
432 433 434 435
      def disconnect(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
              # A connection still leased at this point is taken over and
              # checked back in before being disconnected.
              if conn.in_use?
                conn.steal!
                checkin conn
              end
              conn.disconnect!
            end
            @connections = []
            @available.clear
          end
        end
      end

448 449
      # Disconnects all connections in the pool, and clears the pool.
      #
450
      # The pool first tries to gain ownership of all connections. If unable to
451
      # do so within a timeout interval (default duration is
452
      # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool is forcefully
453
      # disconnected without any regard for other connection owning threads.
454 455 456 457
      def disconnect!
        # +false+: do not raise when exclusive ownership of all connections
        # cannot be obtained in time.
        disconnect(false)
      end

M
Matthew Draper 已提交
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
      # Discards all connections in the pool (even if they're currently
      # leased!), along with the pool itself. Any further interaction with the
      # pool (except #spec and #schema_cache) is undefined.
      #
      # See AbstractAdapter#discard!
      def discard! # :nodoc:
        synchronize do
          return if @connections.nil? # already discarded

          @connections.each(&:discard!)

          # Drop every reference so the pool can no longer hand anything out.
          @thread_cached_conns = nil
          @available = nil
          @connections = nil
        end
      end

473 474 475 476
      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
      # Raises:
477
      # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
478
      #   connections in the pool within a timeout interval (default duration is
479
      #   <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
480 481 482 483
      def clear_reloadable_connections(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
              # A connection still leased at this point is taken over and
              # checked back in first.
              if conn.in_use?
                conn.steal!
                checkin conn
              end
              conn.disconnect! if conn.requires_reloading?
            end
            @connections.delete_if(&:requires_reloading?)
            @available.clear
          end
        end
      end

      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
499
      # The pool first tries to gain ownership of all connections. If unable to
500
      # do so within a timeout interval (default duration is
501
      # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool forcefully
502 503 504 505
      # clears the cache and reloads connections without any regard for other
      # connection owning threads.
      def clear_reloadable_connections!
        # +false+: do not raise when exclusive ownership of all connections
        # cannot be obtained in time.
        clear_reloadable_connections(false)
      end

P
Pratik Naik 已提交
508 509 510
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
511 512 513 514 515
      # This is done by either returning and leasing existing connection, or by
      # creating a new connection and leasing it.
      #
      # If all connections are leased and the pool is at capacity (meaning the
      # number of currently leased connections is greater than or equal to the
516
      # size limit set), an ActiveRecord::ConnectionTimeoutError exception will be raised.
P
Pratik Naik 已提交
517 518 519 520
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
521
      # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
522 523
      def checkout(checkout_timeout = @checkout_timeout)
        # Acquire a connection first, then hand it to checkout_and_verify.
        checkout_and_verify(acquire_connection(checkout_timeout))
      end

P
Pratik Naik 已提交
526 527 528 529
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained earlier by
530
      # calling #checkout on this pool.
531
      def checkin(conn)
        # Lock order: the connection's own lock first, then the pool monitor.
        conn.lock.synchronize do
          synchronize do
            remove_connection_from_thread_cache conn

            conn._run_checkin_callbacks do
              conn.expire
            end

            # Make the connection available again; this also signals one
            # thread waiting on the queue.
            @available.add conn
          end
        end
      end
544

545
      # Remove a connection from the connection pool. The connection will
546 547
      # remain open and active but will no longer be managed by this pool.
      def remove(conn)
        needs_new_connection = false

        synchronize do
          remove_connection_from_thread_cache conn

          @connections.delete conn
          @available.delete conn

          # @available.any_waiting? => true means that prior to removing this
          # conn, the pool was at its max size (@connections.size == @size).
          # This would mean that any threads stuck waiting in the queue wouldn't
          # know they could checkout_new_connection, so let's do it for them.
          # Because condition-wait loop is encapsulated in the Queue class
          # (that in turn is oblivious to ConnectionPool implementation), threads
          # that are "stuck" there are helpless. They have no way of creating
          # new connections and are completely reliant on us feeding available
          # connections into the Queue.
          needs_new_connection = @available.any_waiting?
        end

        # This is intentionally done outside of the synchronized section as we
        # would like not to hold the main mutex while checking out new connections.
        # Thus there is some chance that needs_new_connection information is now
        # stale, we can live with that (bulk_make_new_connections will make
        # sure not to exceed the pool's @size limit).
        bulk_make_new_connections(1) if needs_new_connection
      end

576
      # Recover lost connections for the pool. A lost connection can occur if
577
      # a programmer forgets to checkin a connection at the end of a thread
578 579
      # or a thread dies unexpectedly.
      def reap
        stale_connections = synchronize do
          # A discarded pool (see #discard!) has no connections left to
          # recover; the reaper thread may still call in afterwards.
          return if @connections.nil?

          # Take over every leased connection whose owning thread is dead.
          @connections.select do |conn|
            conn.in_use? && !conn.owner.alive?
          end.each do |conn|
            conn.steal!
          end
        end

        # Done outside the monitor: usable connections are reset and returned
        # to the pool, broken ones are dropped from it.
        stale_connections.each do |conn|
          if conn.active?
            conn.reset!
            checkin conn
          else
            remove conn
          end
        end
      end

M
Matthew Draper 已提交
598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626
      # Disconnect all connections that have been idle for at least
      # +minimum_idle+ seconds. Connections currently checked out, or that were
      # checked in less than +minimum_idle+ seconds ago, are unaffected.
      def flush(minimum_idle = @idle_timeout)
        # +nil+ means idle flushing is disabled.
        return if minimum_idle.nil?

        idle_connections = synchronize do
          # A discarded pool (see #discard!) has nothing left to flush; the
          # reaper thread may still call in afterwards.
          return if @connections.nil?

          @connections.select do |conn|
            !conn.in_use? && conn.seconds_idle >= minimum_idle
          end.each do |conn|
            # Lease the connection before taking it out of the pool.
            conn.lease

            @available.delete conn
            @connections.delete conn
          end
        end

        # Disconnect outside the monitor so the pool is not blocked meanwhile.
        idle_connections.each do |conn|
          conn.disconnect!
        end
      end

      # Disconnect all currently idle connections. Connections currently checked
      # out are unaffected.
      def flush!
        # Recover connections held by dead threads first, so that they are
        # handled before the flush below.
        reap
        # A negative threshold: presumably every connection that is not in
        # use qualifies, assuming seconds_idle is never negative -- confirm.
        flush(-1)
      end

627 628 629 630
      # Number of threads currently blocked waiting on the queue of
      # available connections.
      def num_waiting_in_queue # :nodoc:
        @available.num_waiting
      end

631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648
      # Return connection pool's usage statistic
      # Example:
      #
      #    ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
      def stat
        synchronize do
          # Split once into leased and idle connections, then split the
          # leased ones by whether their owning thread is still alive.
          leased, unused = @connections.partition(&:in_use?)
          busy_count = leased.count { |c| c.owner.alive? }

          {
            size: size,
            connections: @connections.size,
            busy: busy_count,
            dead: leased.size - busy_count,
            idle: unused.size,
            waiting: num_waiting_in_queue,
            checkout_timeout: checkout_timeout
          }
        end
      end

649
      private
650 651
        #--
        # this is unfortunately not concurrent
652 653 654 655 656 657 658
        def bulk_make_new_connections(num_new_conns_needed)
          num_new_conns_needed.times do
            # try_to_checkout_new_connection will not exceed pool's @size limit
            if new_conn = try_to_checkout_new_connection
              # make the new_conn available to the starving threads stuck @available Queue
              checkin(new_conn)
            end
          end
        end

662 663 664 665 666
        #--
        # From the discussion on GitHub:
        #  https://github.com/rails/rails/pull/14938#commitcomment-6601951
        # This hook-in method allows for easier monkey-patching fixes needed by
        # JRuby users that use Fibers.
667 668 669
        # Returns the key under which +thread+'s connection is stored in
        # <tt>@thread_cached_conns</tt>; by default the thread itself.
        def connection_cache_key(thread)
          thread
        end
670

671 672 673 674
        # Take control of all existing connections so a "group" action such as
        # reload/disconnect can be performed safely. It is no longer enough to
        # wrap it in +synchronize+ because some pool's actions are allowed
        # to be performed outside of the main +synchronize+ block.
675 676 677 678 679
        def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
          with_new_connections_blocked do
            # Collect every existing connection before running the group action.
            attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)
            yield
          end
        end

682 683 684
        def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
          collected_conns = synchronize do
            # account for our own connections
            @connections.select { |conn| conn.owner == Thread.current }
          end

          newly_checked_out = []
          # Overall deadline for collecting every connection: twice the
          # regular checkout timeout.
          timeout_time      = Concurrent.monotonic_time + (@checkout_timeout * 2)

          @available.with_a_bias_for(Thread.current) do
            loop do
              synchronize do
                # Done once we hold every connection and nobody is in the
                # middle of establishing a new one.
                return if collected_conns.size == @connections.size && @now_connecting == 0
                remaining_timeout = timeout_time - Concurrent.monotonic_time
                remaining_timeout = 0 if remaining_timeout < 0
                conn = checkout_for_exclusive_access(remaining_timeout)
                collected_conns   << conn
                newly_checked_out << conn
              end
            end
          end
        rescue ExclusiveConnectionTimeoutError
          # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any
          # timeouts and are expected to just give up: we've obtained as many connections
          # as possible, note that in a case like that we don't return any of the
          # +newly_checked_out+ connections.

          if raise_on_acquisition_timeout
            release_newly_checked_out = true
            raise
          end
        rescue Exception # if something else went wrong
          # this can't be a "naked" rescue, because we should return conns
          # even for non-StandardErrors
          release_newly_checked_out = true
          raise
        ensure
          if release_newly_checked_out && newly_checked_out
            # releasing only those conns that were checked out in this method, conns
            # checked outside this method (before it was called) are not for us to release
            newly_checked_out.each { |conn| checkin(conn) }
          end
        end

726 727
        #--
        # Must be called in a synchronize block.
728 729 730 731 732 733
        def checkout_for_exclusive_access(checkout_timeout)
          checkout(checkout_timeout)
        rescue ConnectionTimeoutError
          # this block can't be easily moved into attempt_to_checkout_all_existing_connections's
          # rescue block, because doing so would put it outside of synchronize section, without
          # being in a critical section thread_report might become inaccurate
          msg = +"could not obtain ownership of all database connections in #{checkout_timeout} seconds"

          # List every connection held by some other owner to aid debugging.
          thread_report = []
          @connections.each do |conn|
            unless conn.owner == Thread.current
              thread_report << "#{conn} is owned by #{conn.owner}"
            end
          end

          msg << " (#{thread_report.join(', ')})" if thread_report.any?

          raise ExclusiveConnectionTimeoutError, msg
        end
747

748 749
        # Yields while <tt>@threads_blocking_new_connections</tt> is incremented
        # (+try_to_checkout_new_connection+ only creates connections when that
        # counter is zero). Returns the block's value.
        #
        # When the last blocking caller leaves, <tt>@available</tt> is rebuilt from
        # the connections that are not in use, and +bulk_make_new_connections+ is
        # asked to cover whatever part of +num_waiting_in_queue+ those idle
        # connections do not cover.
        def with_new_connections_blocked
          synchronize { @threads_blocking_new_connections += 1 }

          yield
        ensure
          shortfall = 0

          synchronize do
            @threads_blocking_new_connections -= 1

            # only the outermost blocker restores the queue
            if @threads_blocking_new_connections.zero?
              @available.clear

              shortfall = num_waiting_in_queue

              @connections.each do |conn|
                next if conn.in_use?

                # every idle connection re-queued is one fewer to create
                @available.add conn
                shortfall -= 1
              end
            end
          end

          # done after leaving the synchronize section above
          bulk_make_new_connections(shortfall) if shortfall > 0
        end
776

777 778 779 780 781 782 783 784 785 786 787
        # Acquire a connection by one of 1) immediately removing one
        # from the queue of available connections, 2) creating a new
        # connection if the pool is not at capacity, 3) waiting on the
        # queue for a connection to become available.
        #
        # Raises:
        # - ActiveRecord::ConnectionTimeoutError if a connection could not be acquired
        #
        #--
        # Implementation detail: the connection returned by +acquire_connection+
        # will already be "+connection.lease+ -ed" to the current thread.
788
        def acquire_connection(checkout_timeout)
          # NOTE: we rely on <tt>@available.poll</tt> and +try_to_checkout_new_connection+ to
          # +conn.lease+ the returned connection (and to do this in a +synchronized+
          # section). This is not the cleanest implementation, as ideally we would
          # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to <tt>@available.poll</tt>
          # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections
          # of the said methods and avoid an additional +synchronize+ overhead.
          conn = @available.poll || try_to_checkout_new_connection
          return conn if conn

          # nothing idle and no new connection could be made: reap, then wait on
          # the queue for at most +checkout_timeout+
          reap
          @available.poll(checkout_timeout)
        end

803 804
        #--
        # if owner_thread param is omitted, this must be called in synchronize block
805 806 807 808
        def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
          # delete_pair is given both key and value, so the entry for this thread's
          # cache key is targeted together with +conn+ itself.
          cache_key = connection_cache_key(owner_thread)
          @thread_cached_conns.delete_pair(cache_key, conn)
        end
        # +release+ is a second name for the same method.
        alias_method :release, :remove_connection_from_thread_cache
809

810 811 812 813
        # Builds a connection by sending +spec.adapter_method+ to Base with
        # +spec.config+. If the pool has a +schema_cache+, the new connection is
        # given a +dup+ of it. Returns the connection.
        def new_connection
          conn = Base.send(spec.adapter_method, spec.config)
          conn.schema_cache = schema_cache.dup if schema_cache
          conn
        end
815

816
        # If the pool is not at a <tt>@size</tt> limit, establish new connection. Connecting
817 818 819 820
        # to the DB is done outside main synchronized section.
        #--
        # Implementation constraint: a newly established connection returned by this
        # method must be in the +.leased+ state.
821 822 823 824 825
        def try_to_checkout_new_connection
          # first in synchronized section check if establishing new conns is allowed
          # and increment @now_connecting, to prevent overstepping this pool's @size
          # constraint
          slot_reserved = synchronize do
            if @threads_blocking_new_connections.zero? && (@connections.size + @now_connecting) < @size
              @now_connecting += 1
            end
          end

          # returns nil when new connections are blocked or the pool is full
          return unless slot_reserved

          begin
            # if successfully incremented @now_connecting establish new connection
            # outside of synchronized section
            conn = checkout_new_connection
          ensure
            synchronize do
              # conn is nil here when checkout_new_connection raised
              if conn
                adopt_connection(conn)
                # returned conn needs to be already leased
                conn.lease
              end
              # give the reserved slot back whether or not connecting succeeded
              @now_connecting -= 1
            end
          end
        end

848 849 850 851
        # Points +conn+ at this pool and appends it to <tt>@connections</tt>.
        def adopt_connection(conn)
          @connections << conn.tap { |adopted| adopted.pool = self }
        end
852

853 854 855 856
        # Returns +new_connection+ when <tt>@automatic_reconnect</tt> is set.
        #
        # Raises:
        # - ConnectionNotEstablished when <tt>@automatic_reconnect</tt> is falsy
        def checkout_new_connection
          return new_connection if @automatic_reconnect

          raise ConnectionNotEstablished
        end
857

858 859 860 861 862 863 864 865 866
        # Runs +c+'s checkout callbacks around <tt>c.verify!</tt> and returns +c+.
        # If a StandardError is raised, +c+ is removed from the pool and
        # disconnected before the error is re-raised.
        def checkout_and_verify(c)
          c._run_checkout_callbacks { c.verify! }
          c
        rescue StandardError
          remove(c)
          c.disconnect!
          raise
        end
868 869
    end

    # ConnectionHandler is a collection of ConnectionPool objects. It is used
    # for keeping separate connection pools that connect to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
    #   class Author < ActiveRecord::Base
    #   end
    #
    #   class BankAccount < ActiveRecord::Base
    #   end
    #
    #   class Book < ActiveRecord::Base
    #     establish_connection :library_db
    #   end
    #
    #   class ScaryBook < Book
    #   end
    #
    #   class GoodBook < Book
    #   end
    #
    # And a database.yml that looked like this:
    #
    #   development:
    #     database: my_application
    #     host: localhost
    #
    #   library_db:
    #     database: library
    #     host: some.library.org
    #
    # Your primary database in the development environment is "my_application"
    # but the Book model connects to a separate database called "library_db"
    # (this can even be a database on a different machine).
    #
    # Book, ScaryBook and GoodBook will all use the same connection pool to
    # "library_db" while Author, BankAccount, and any other models you create
    # will use the default connection pool to "my_application".
    #
    # The various connection pools are managed by a single instance of
    # ConnectionHandler accessible via ActiveRecord::Base.connection_handler.
    # All Active Record models use this handler to determine the connection pool that they
    # should use.
    #
    # The ConnectionHandler class is not coupled with the Active Record models, as it
    # has no knowledge about the model. The model needs to pass a specification name
    # to the handler, in order to look up the correct connection pool.
    class ConnectionHandler
      # Builds the two-level lookup used by a handler: an outer map keyed by
      # process id whose values are maps keyed by spec name. The block is the
      # outer map's default for a process id that has no entry yet.
      def self.create_owner_to_pool # :nodoc:
        Concurrent::Map.new(initial_capacity: 2) do |pid_map, pid|
          # Discard the parent's connection pools immediately; we have no need
          # of them
          discard_unowned_pools(pid_map)

          pid_map[pid] = Concurrent::Map.new(initial_capacity: 2)
        end
      end

      # Returns a lambda suitable for ObjectSpace.define_finalizer. It ignores
      # its argument and discards the pools in +pid_map+ that belong to other
      # processes.
      def self.unowned_pool_finalizer(pid_map) # :nodoc:
        ->(_object_id) { discard_unowned_pools(pid_map) }
      end

      # Calls +discard!+ on every non-nil pool registered under a process id
      # other than the current one.
      def self.discard_unowned_pools(pid_map) # :nodoc:
        pid_map.each do |pid, pools|
          next if pid == Process.pid

          pools.values.compact.each(&:discard!)
        end
      end

      def initialize
        # These caches are keyed by spec.name (ConnectionSpecification#name).
        @owner_to_pool = ConnectionHandler.create_owner_to_pool

        # Backup finalizer: if the forked child never needed a pool, the above
        # early discard has not occurred
        finalizer = ConnectionHandler.unowned_pool_finalizer(@owner_to_pool)
        ObjectSpace.define_finalizer(self, finalizer)
      end

      # Returns the pools registered for the current process; nil placeholders
      # stored by #retrieve_connection_pool are left out.
      def connection_pool_list
        owner_to_pool.values.compact
      end
      alias_method :connection_pools, :connection_pool_list

      # Resolves +config+ into a spec, removes any pool already registered under
      # the spec's name, and registers a new ConnectionPool for it inside a
      # "!connection.active_record" instrumentation event. Returns the new pool.
      def establish_connection(config)
        spec = ConnectionSpecification::Resolver.new(Base.configurations).spec(config)

        remove_connection(spec.name)

        # spec has already been dereferenced above, so it cannot be nil here
        payload = {
          connection_id: object_id,
          spec_name: spec.name,
          config: spec.config
        }

        ActiveSupport::Notifications.instrumenter.instrument("!connection.active_record", payload) do
          owner_to_pool[spec.name] = ConnectionAdapters::ConnectionPool.new(spec)
        end

        owner_to_pool[spec.name]
      end

      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
        connection_pool_list.any? { |pool| pool.active_connection? }
      end

      # Returns any connections in use by the current thread back to the pool,
      # and also returns connections to the pool cached by threads that are no
      # longer alive.
      def clear_active_connections!
        connection_pool_list.each { |pool| pool.release_connection }
      end

      # Clears the cache which maps classes.
      #
      # See ConnectionPool#clear_reloadable_connections! for details.
      def clear_reloadable_connections!
        connection_pool_list.each { |pool| pool.clear_reloadable_connections! }
      end

      # Calls +disconnect!+ on every pool of the current process.
      def clear_all_connections!
        connection_pool_list.each { |pool| pool.disconnect! }
      end

      # Disconnects all currently idle connections.
      #
      # See ConnectionPool#flush! for details.
      def flush_idle_connections!
        connection_pool_list.each { |pool| pool.flush! }
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      #
      # Raises ConnectionNotEstablished when no pool is registered for
      # +spec_name+.
      def retrieve_connection(spec_name) #:nodoc:
        pool = retrieve_connection_pool(spec_name)
        return pool.connection if pool

        # multiple database application: mention the current role in the message
        message =
          if ActiveRecord::Base.connection_handler != ActiveRecord::Base.default_connection_handler
            "No connection pool with '#{spec_name}' found for the '#{ActiveRecord::Base.current_role}' role."
          else
            "No connection pool with '#{spec_name}' found."
          end

        raise ConnectionNotEstablished, message
      end

      # Returns true if a connection that's accessible to this class has
      # already been opened.
      def connected?(spec_name)
        pool = retrieve_connection_pool(spec_name)
        pool ? pool.connected? : pool
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for #establish_connection, for easily
      # re-establishing the connection.
      #
      # Returns nil when no pool was registered under +spec_name+.
      def remove_connection(spec_name)
        pool = owner_to_pool.delete(spec_name)
        return unless pool

        pool.automatic_reconnect = false
        pool.disconnect!
        pool.spec.config
      end

      # Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool.
      # This makes retrieving the connection pool O(1) once the process is warm.
      # When a connection is established or removed, we invalidate the cache.
      def retrieve_connection_pool(spec_name)
        owner_to_pool.fetch(spec_name) do
          # Check if a connection was previously established in an ancestor process,
          # which may have been forked.
          ancestor_pool = pool_from_any_process_for(spec_name)

          if ancestor_pool
            # A connection was established in an ancestor process that must have
            # subsequently forked. We can't reuse the connection, but we can copy
            # the specification and establish a new connection with it.
            pool = establish_connection(ancestor_pool.spec.to_hash)
            pool.schema_cache = ancestor_pool.schema_cache if ancestor_pool.schema_cache
            pool
          else
            # remember the miss so the next lookup does not search again
            owner_to_pool[spec_name] = nil
          end
        end
      end

      private

        # The spec-name => pool map belonging to the current process.
        def owner_to_pool
          @owner_to_pool[Process.pid]
        end

        # Searches the per-process maps, last one first, and returns the first
        # pool found under +spec_name+, or nil when there is none.
        def pool_from_any_process_for(spec_name)
          pools_by_spec = @owner_to_pool.values.reverse_each.find { |pools| pools[spec_name] }
          pools_by_spec && pools_by_spec[spec_name]
        end
    end
N
Nick 已提交
1076
  end
1077
end