connection_pool.rb 36.4 KB
Newer Older
1 2 3
require "thread"
require "concurrent/map"
require "monitor"
4

N
Nick 已提交
5
module ActiveRecord
6
  # Raised when a connection could not be obtained within the connection
7
  # acquisition timeout period: because max connections in pool
8
  # are in use.
9 10 11
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

12
  # Raised when a pool was unable to get ahold of all its connections
13 14 15
  # to perform a "group" action such as
  # {ActiveRecord::Base.connection_pool.disconnect!}[rdoc-ref:ConnectionAdapters::ConnectionPool#disconnect!]
  # or {ActiveRecord::Base.clear_reloadable_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_reloadable_connections!].
16 17 18
  class ExclusiveConnectionTimeoutError < ConnectionTimeoutError
  end

N
Nick 已提交
19
  module ConnectionAdapters
20
    # Connection pool base class for managing Active Record database
21 22
    # connections.
    #
P
Pratik Naik 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
38 39 40
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
41 42
    # 1. Simply use {ActiveRecord::Base.connection}[rdoc-ref:ConnectionHandling.connection]
    #    as with Active Record 2.1 and
43 44
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
45 46
    #    {ActiveRecord::Base.clear_active_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_active_connections!].
    #    This will be the default behavior for Active Record when used in conjunction with
47
    #    Action Pack's request handling cycle.
48
    # 2. Manually check out a connection from the pool with
49
    #    {ActiveRecord::Base.connection_pool.checkout}[rdoc-ref:#checkout]. You are responsible for
50
    #    returning this connection to the pool when finished by calling
51 52
    #    {ActiveRecord::Base.connection_pool.checkin(connection)}[rdoc-ref:#checkin].
    # 3. Use {ActiveRecord::Base.connection_pool.with_connection(&block)}[rdoc-ref:#with_connection], which
53 54
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
55
    #
P
Pratik Naik 已提交
56 57 58 59 60
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
61
    # There are several connection-pooling-related options that you can add to
62 63 64
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
65
    # * +checkout_timeout+: number of seconds to block and wait for a connection
66
    #   before giving up and raising a timeout error (default 5 seconds).
67
    # * +reaping_frequency+: frequency in seconds to periodically run the
68 69 70 71
    #   Reaper, which attempts to find and recover connections from dead
    #   threads, which can occur if a programmer forgets to close a
    #   connection at the end of a thread or a thread dies unexpectedly.
    #   Regardless of this setting, the Reaper will be invoked before every
72
    #   blocking wait. (Default +nil+, which means don't schedule the Reaper).
73 74 75 76
    #
    #--
    # Synchronization policy:
    # * all public methods can be called outside +synchronize+
77
    # * access to these instance variables needs to be in +synchronize+:
78 79 80 81
    #   * @connections
    #   * @now_connecting
    # * private methods that require being called in a +synchronize+ blocks
    #   are now explicitly documented
82
    class ConnectionPool
83 84
      # Threadsafe, fair, FIFO queue.  Meant to be used by ConnectionPool
      # with which it shares a Monitor.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
      class Queue
        def initialize(lock = Monitor.new)
          @lock = lock
          @cond = @lock.new_cond
          @num_waiting = 0
          @queue = []
        end

        # Test if any threads are currently waiting on the queue.
        def any_waiting?
          synchronize do
            @num_waiting > 0
          end
        end

100
        # Returns the number of threads currently waiting on this
101 102 103 104 105 106 107 108 109 110
        # queue.
        def num_waiting
          synchronize do
            @num_waiting
          end
        end

        # Add +element+ to the queue.  Never blocks.
        def add(element)
          synchronize do
111
            @queue.push element
112 113 114 115
            @cond.signal
          end
        end

116
        # If +element+ is in the queue, remove and return it, or +nil+.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
        def delete(element)
          synchronize do
            @queue.delete(element)
          end
        end

        # Remove all elements from the queue.
        def clear
          synchronize do
            @queue.clear
          end
        end

        # Remove the head of the queue.
        #
        # If +timeout+ is not given, remove and return the head the
        # queue if the number of available elements is strictly
        # greater than the number of threads currently waiting (that
135
        # is, don't jump ahead in line).  Otherwise, return +nil+.
136
        #
N
Neeraj Singh 已提交
137
        # If +timeout+ is given, block if there is no element
138 139 140 141
        # available, waiting up to +timeout+ seconds for an element to
        # become available.
        #
        # Raises:
142
        # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no element
N
Neeraj Singh 已提交
143
        # becomes available within +timeout+ seconds,
144
        def poll(timeout = nil)
145
          synchronize { internal_poll(timeout) }
146 147 148 149
        end

        private

150 151 152
          def internal_poll(timeout)
            no_wait_poll || (timeout && wait_poll(timeout))
          end
153

154 155 156
          def synchronize(&block)
            @lock.synchronize(&block)
          end
157

158
          # Test if the queue currently contains any elements.
159 160 161
          def any?
            !@queue.empty?
          end
162

163 164 165 166
          # A thread can remove an element from the queue without
          # waiting if and only if the number of currently available
          # connections is strictly greater than the number of waiting
          # threads.
167 168 169
          def can_remove_no_wait?
            @queue.size > @num_waiting
          end
170

171
          # Removes and returns the head of the queue if possible, or +nil+.
172
          def remove
173
            @queue.pop
174
          end
175

176 177
          # Remove and return the head the queue if the number of
          # available elements is strictly greater than the number of
178
          # threads currently waiting.  Otherwise, return +nil+.
179 180 181
          def no_wait_poll
            remove if can_remove_no_wait?
          end
182

183 184
          # Waits on the queue up to +timeout+ seconds, then removes and
          # returns the head of the queue.
185 186
          def wait_poll(timeout)
            @num_waiting += 1
187

188 189 190 191
            t0 = Time.now
            elapsed = 0
            loop do
              @cond.wait(timeout - elapsed)
192

193
              return remove if any?
194

195 196 197 198 199 200
              elapsed = Time.now - t0
              if elapsed >= timeout
                msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" %
                  [timeout, elapsed]
                raise ConnectionTimeoutError, msg
              end
201
            end
202 203
          ensure
            @num_waiting -= 1
204 205 206
          end
      end

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
      # Adds the ability to turn a basic fair FIFO queue into one
      # biased to some thread.
      module BiasableQueue # :nodoc:
        class BiasedConditionVariable # :nodoc:
          # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+,
          # +signal+ and +wait+ methods are only called while holding a lock
          def initialize(lock, other_cond, preferred_thread)
            @real_cond = lock.new_cond
            @other_cond = other_cond
            @preferred_thread = preferred_thread
            @num_waiting_on_real_cond = 0
          end

          def broadcast
            broadcast_on_biased
            @other_cond.broadcast
          end

          def broadcast_on_biased
            @num_waiting_on_real_cond = 0
            @real_cond.broadcast
          end

          def signal
            if @num_waiting_on_real_cond > 0
              @num_waiting_on_real_cond -= 1
              @real_cond
            else
              @other_cond
            end.signal
          end

          def wait(timeout)
            if Thread.current == @preferred_thread
              @num_waiting_on_real_cond += 1
              @real_cond
            else
              @other_cond
            end.wait(timeout)
          end
        end

        def with_a_bias_for(thread)
          previous_cond = nil
          new_cond      = nil
          synchronize do
            previous_cond = @cond
            @cond = new_cond = BiasedConditionVariable.new(@lock, @cond, thread)
          end
          yield
        ensure
          synchronize do
            @cond = previous_cond if previous_cond
            new_cond.broadcast_on_biased if new_cond # wake up any remaining sleepers
          end
        end
      end

265 266 267 268 269 270
      # Connections must be leased while holding the main pool mutex. This is
      # an internal subclass that also +.leases+ returned connections while
      # still in queue's critical section (queue synchronizes with the same
      # +@lock+ as the main pool) so that a returned connection is already
      # leased and there is no need to re-enter synchronized block.
      class ConnectionLeasingQueue < Queue # :nodoc:
271 272
        include BiasableQueue

273
        private
274 275 276 277 278
          def internal_poll(timeout)
            conn = super
            conn.lease if conn
            conn
          end
279 280
      end

281
      # Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
282
      # A reaper instantiated with a +nil+ frequency will never reap the
283
      # connection pool.
284 285
      #
      # Configure the frequency by setting "reaping_frequency" in your
286
      # database yaml file.
287 288 289 290 291 292 293 294
      class Reaper
        attr_reader :pool, :frequency

        def initialize(pool, frequency)
          @pool      = pool
          @frequency = frequency
        end

295
        def run
296 297
          return unless frequency
          Thread.new(frequency, pool) { |t, p|
298
            loop do
299 300 301 302 303 304 305
              sleep t
              p.reap
            end
          }
        end
      end

306
      include MonitorMixin
307
      include QueryCache::ConnectionPoolConfiguration
308

309
      attr_accessor :automatic_reconnect, :checkout_timeout, :schema_cache
310
      attr_reader :spec, :connections, :size, :reaper
311

P
Pratik Naik 已提交
312 313 314 315 316 317
      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
N
Nick 已提交
318
      def initialize(spec)
319 320
        super()

N
Nick 已提交
321
        @spec = spec
322

323
        @checkout_timeout = (spec.config[:checkout_timeout] && spec.config[:checkout_timeout].to_f) || 5
K
korbin 已提交
324
        @reaper = Reaper.new(self, (spec.config[:reaping_frequency] && spec.config[:reaping_frequency].to_f))
325
        @reaper.run
326

327 328
        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
329

330 331 332 333
        # This variable tracks the cache of threads mapped to reserved connections, with the
        # sole purpose of speeding up the +connection+ method. It is not the authoritative
        # registry of which thread owns which connection. Connection ownership is tracked by
        # the +connection.owner+ attr on each +connection+ instance.
334
        # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
335 336
        # then that +thread+ does indeed own that +conn+. However, an absence of a such
        # mapping does not mean that the +thread+ doesn't own the said connection. In
337 338 339
        # that case +conn.owner+ attr should be consulted.
        # Access and modification of +@thread_cached_conns+ does not require
        # synchronization.
340
        @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size)
341

342
        @connections         = []
343
        @automatic_reconnect = true
344

345
        # Connection pool allows for concurrent (outside the main +synchronize+ section)
346 347 348 349
        # establishment of new connections. This variable tracks the number of threads
        # currently in the process of independently establishing connections to the DB.
        @now_connecting = 0

350
        @threads_blocking_new_connections = 0
351

352
        @available = ConnectionLeasingQueue.new self
353 354 355 356 357 358 359 360 361 362

        @lock_thread = false
      end

      def lock_thread=(lock_thread)
        if lock_thread
          @lock_thread = Thread.current
        else
          @lock_thread = nil
        end
363 364
      end

365 366 367 368
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
369
      # held in a cache keyed by a thread.
370
      def connection
371
        @thread_cached_conns[connection_cache_key(@lock_thread || Thread.current)] ||= checkout
N
Nick 已提交
372 373
      end

374
      # Returns true if there is an open connection being used for the current thread.
375
      #
376
      # This method only works for connections that have been obtained through
377
      # #connection or #with_connection methods. Connections obtained through
378
      # #checkout will not be detected by #active_connection?
379
      def active_connection?
380
        @thread_cached_conns[connection_cache_key(Thread.current)]
381 382
      end

383
      # Signal that the thread is finished with the current connection.
384
      # #release_connection releases the connection-thread association
385
      # and returns the connection to the pool.
386 387 388 389 390 391 392
      #
      # This method only works for connections that have been obtained through
      # #connection or #with_connection methods, connections obtained through
      # #checkout will not be automatically released.
      def release_connection(owner_thread = Thread.current)
        if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
          checkin conn
393
        end
394 395
      end

396 397
      # If a connection obtained through #connection or #with_connection methods
      # already exists yield it to the block. If no such connection
398
      # exists checkout a connection, yield it to the block, and checkin the
399
      # connection when finished.
400
      def with_connection
401 402 403 404 405
        unless conn = @thread_cached_conns[connection_cache_key(Thread.current)]
          conn = connection
          fresh_connection = true
        end
        yield conn
406
      ensure
407
        release_connection if fresh_connection
N
Nick 已提交
408 409
      end

410 411
      # Returns true if a connection has already been opened.
      def connected?
412
        synchronize { @connections.any? }
N
Nick 已提交
413 414
      end

P
Pratik Naik 已提交
415
      # Disconnects all connections in the pool, and clears the pool.
416 417
      #
      # Raises:
418
      # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
419
      #   connections in the pool within a timeout interval (default duration is
420
      #   <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
421 422 423 424
      def disconnect(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
425 426 427 428
              if conn.in_use?
                conn.steal!
                checkin conn
              end
429 430 431
              conn.disconnect!
            end
            @connections = []
432
            @available.clear
433
          end
N
Nick 已提交
434 435 436
        end
      end

437 438
      # Disconnects all connections in the pool, and clears the pool.
      #
439
      # The pool first tries to gain ownership of all connections. If unable to
440
      # do so within a timeout interval (default duration is
441
      # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool is forcefully
442
      # disconnected without any regard for other connection owning threads.
443 444 445 446 447 448 449 450
      def disconnect!
        disconnect(false)
      end

      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
      # Raises:
451
      # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
452
      #   connections in the pool within a timeout interval (default duration is
453
      #   <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
454 455 456 457
      def clear_reloadable_connections(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
458 459 460 461
              if conn.in_use?
                conn.steal!
                checkin conn
              end
462 463 464
              conn.disconnect! if conn.requires_reloading?
            end
            @connections.delete_if(&:requires_reloading?)
465
            @available.clear
466
          end
467
        end
468 469 470 471 472
      end

      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
473
      # The pool first tries to gain ownership of all connections. If unable to
474
      # do so within a timeout interval (default duration is
475
      # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool forcefully
476 477 478 479
      # clears the cache and reloads connections without any regard for other
      # connection owning threads.
      def clear_reloadable_connections!
        clear_reloadable_connections(false)
N
Nick 已提交
480 481
      end

P
Pratik Naik 已提交
482 483 484
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
485 486 487 488 489
      # This is done by either returning and leasing existing connection, or by
      # creating a new connection and leasing it.
      #
      # If all connections are leased and the pool is at capacity (meaning the
      # number of currently leased connections is greater than or equal to the
490
      # size limit set), an ActiveRecord::ConnectionTimeoutError exception will be raised.
P
Pratik Naik 已提交
491 492 493 494
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
495
      # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
496 497
      def checkout(checkout_timeout = @checkout_timeout)
        checkout_and_verify(acquire_connection(checkout_timeout))
N
Nick 已提交
498 499
      end

P
Pratik Naik 已提交
500 501 502 503
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained by earlier by
504
      # calling #checkout on this pool.
505
      def checkin(conn)
506
        synchronize do
507
          remove_connection_from_thread_cache conn
508

509
          conn._run_checkin_callbacks do
510
            conn.expire
511
          end
512

513
          @available.add conn
514
        end
N
Nick 已提交
515
      end
516

517
      # Remove a connection from the connection pool. The connection will
518 519
      # remain open and active but will no longer be managed by this pool.
      def remove(conn)
520 521
        needs_new_connection = false

522
        synchronize do
523 524
          remove_connection_from_thread_cache conn

525
          @connections.delete conn
526
          @available.delete conn
527

528
          # @available.any_waiting? => true means that prior to removing this
529 530
          # conn, the pool was at its max size (@connections.size == @size).
          # This would mean that any threads stuck waiting in the queue wouldn't
531 532 533
          # know they could checkout_new_connection, so let's do it for them.
          # Because condition-wait loop is encapsulated in the Queue class
          # (that in turn is oblivious to ConnectionPool implementation), threads
534
          # that are "stuck" there are helpless. They have no way of creating
535 536
          # new connections and are completely reliant on us feeding available
          # connections into the Queue.
537 538 539 540
          needs_new_connection = @available.any_waiting?
        end

        # This is intentionally done outside of the synchronized section as we
541 542
        # would like not to hold the main mutex while checking out new connections.
        # Thus there is some chance that needs_new_connection information is now
543
        # stale, we can live with that (bulk_make_new_connections will make
544
        # sure not to exceed the pool's @size limit).
545
        bulk_make_new_connections(1) if needs_new_connection
546 547
      end

548
      # Recover lost connections for the pool. A lost connection can occur if
549
      # a programmer forgets to checkin a connection at the end of a thread
550 551
      # or a thread dies unexpectedly.
      def reap
552 553 554
        stale_connections = synchronize do
          @connections.select do |conn|
            conn.in_use? && !conn.owner.alive?
555
          end.each do |conn|
556
            conn.steal!
557 558 559 560
          end
        end

        stale_connections.each do |conn|
561 562 563 564 565
          if conn.active?
            conn.reset!
            checkin conn
          else
            remove conn
566 567 568 569
          end
        end
      end

570 571 572 573
      def num_waiting_in_queue # :nodoc:
        @available.num_waiting
      end

574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591
      # Return connection pool's usage statistic
      # Example:
      #
      #    ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
      def stat
        synchronize do
          {
            size: size,
            connections: @connections.size,
            busy: @connections.count { |c| c.in_use? && c.owner.alive? },
            dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
            idle: @connections.count { |c| !c.in_use? },
            waiting: num_waiting_in_queue,
            checkout_timeout: checkout_timeout
          }
        end
      end

592
      private
593 594
        #--
        # this is unfortunately not concurrent
595 596 597 598 599 600 601
        def bulk_make_new_connections(num_new_conns_needed)
          num_new_conns_needed.times do
            # try_to_checkout_new_connection will not exceed pool's @size limit
            if new_conn = try_to_checkout_new_connection
              # make the new_conn available to the starving threads stuck @available Queue
              checkin(new_conn)
            end
602 603 604
          end
        end

605 606 607 608 609
        #--
        # From the discussion on GitHub:
        #  https://github.com/rails/rails/pull/14938#commitcomment-6601951
        # This hook-in method allows for easier monkey-patching fixes needed by
        # JRuby users that use Fibers.
610 611 612
        def connection_cache_key(thread)
          thread
        end
613

614 615 616 617
        # Take control of all existing connections so a "group" action such as
        # reload/disconnect can be performed safely. It is no longer enough to
        # wrap it in +synchronize+ because some pool's actions are allowed
        # to be performed outside of the main +synchronize+ block.
618 619 620 621 622
        def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
          with_new_connections_blocked do
            attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)
            yield
          end
623 624
        end

625 626 627
        def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
          collected_conns = synchronize do
            # account for our own connections
628
            @connections.select { |conn| conn.owner == Thread.current }
629
          end
630

631 632
          newly_checked_out = []
          timeout_time      = Time.now + (@checkout_timeout * 2)
633

634 635 636 637 638 639 640 641 642 643
          @available.with_a_bias_for(Thread.current) do
            loop do
              synchronize do
                return if collected_conns.size == @connections.size && @now_connecting == 0
                remaining_timeout = timeout_time - Time.now
                remaining_timeout = 0 if remaining_timeout < 0
                conn = checkout_for_exclusive_access(remaining_timeout)
                collected_conns   << conn
                newly_checked_out << conn
              end
644 645
            end
          end
646 647 648 649 650 651 652 653 654 655 656 657 658
        rescue ExclusiveConnectionTimeoutError
          # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any
          # timeouts and are expected to just give up: we've obtained as many connections
          # as possible, note that in a case like that we don't return any of the
          # +newly_checked_out+ connections.

          if raise_on_acquisition_timeout
            release_newly_checked_out = true
            raise
          end
        rescue Exception # if something else went wrong
          # this can't be a "naked" rescue, because we have should return conns
          # even for non-StandardErrors
659 660
          release_newly_checked_out = true
          raise
661 662 663 664
        ensure
          if release_newly_checked_out && newly_checked_out
            # releasing only those conns that were checked out in this method, conns
            # checked outside this method (before it was called) are not for us to release
665
            newly_checked_out.each { |conn| checkin(conn) }
666
          end
667 668
        end

669 670
        #--
        # Must be called in a synchronize block.
671 672 673 674 675 676 677 678 679 680 681 682 683
        def checkout_for_exclusive_access(checkout_timeout)
          checkout(checkout_timeout)
        rescue ConnectionTimeoutError
          # this block can't be easily moved into attempt_to_checkout_all_existing_connections's
          # rescue block, because doing so would put it outside of synchronize section, without
          # being in a critical section thread_report might become inaccurate
          msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds"

          thread_report = []
          @connections.each do |conn|
            unless conn.owner == Thread.current
              thread_report << "#{conn} is owned by #{conn.owner}"
            end
684 685
          end

686
          msg << " (#{thread_report.join(', ')})" if thread_report.any?
687

688 689
          raise ExclusiveConnectionTimeoutError, msg
        end
690

691 692
        def with_new_connections_blocked
          synchronize do
693
            @threads_blocking_new_connections += 1
694
          end
695

696 697
          yield
        ensure
698 699
          num_new_conns_required = 0

700 701
          synchronize do
            @threads_blocking_new_connections -= 1
702 703 704 705 706 707 708 709 710 711 712 713 714

            if @threads_blocking_new_connections.zero?
              @available.clear

              num_new_conns_required = num_waiting_in_queue

              @connections.each do |conn|
                next if conn.in_use?

                @available.add conn
                num_new_conns_required -= 1
              end
            end
715
          end
716 717

          bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0
718
        end
719

720 721 722 723 724 725 726 727 728 729 730
        # Acquire a connection by one of 1) immediately removing one
        # from the queue of available connections, 2) creating a new
        # connection if the pool is not at capacity, 3) waiting on the
        # queue for a connection to become available.
        #
        # Raises:
        # - ActiveRecord::ConnectionTimeoutError if a connection could not be acquired
        #
        #--
        # Implementation detail: the connection returned by +acquire_connection+
        # will already be "+connection.lease+ -ed" to the current thread.
731 732 733 734 735 736 737 738 739 740 741 742 743
        def acquire_connection(checkout_timeout)
          # NOTE: we rely on +@available.poll+ and +try_to_checkout_new_connection+ to
          # +conn.lease+ the returned connection (and to do this in a +synchronized+
          # section). This is not the cleanest implementation, as ideally we would
          # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to +@available.poll+
          # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections
          # of the said methods and avoid an additional +synchronize+ overhead.
          if conn = @available.poll || try_to_checkout_new_connection
            conn
          else
            reap
            @available.poll(checkout_timeout)
          end
744 745
        end

746 747
        #--
        # if owner_thread param is omitted, this must be called in synchronize block
748 749 750 751
        def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
          @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
        end
        alias_method :release, :remove_connection_from_thread_cache
752

753 754 755 756
        def new_connection
          Base.send(spec.adapter_method, spec.config).tap do |conn|
            conn.schema_cache = schema_cache.dup if schema_cache
          end
757
        end
758

759 760 761 762 763
        # If the pool is not at a +@size+ limit, establish new connection. Connecting
        # to the DB is done outside main synchronized section.
        #--
        # Implementation constraint: a newly established connection returned by this
        # method must be in the +.leased+ state.
764 765 766 767 768
        def try_to_checkout_new_connection
          # first in synchronized section check if establishing new conns is allowed
          # and increment @now_connecting, to prevent overstepping this pool's @size
          # constraint
          do_checkout = synchronize do
769
            if @threads_blocking_new_connections.zero? && (@connections.size + @now_connecting) < @size
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785
              @now_connecting += 1
            end
          end
          if do_checkout
            begin
              # if successfully incremented @now_connecting establish new connection
              # outside of synchronized section
              conn = checkout_new_connection
            ensure
              synchronize do
                if conn
                  adopt_connection(conn)
                  # returned conn needs to be already leased
                  conn.lease
                end
                @now_connecting -= 1
786 787 788 789 790
              end
            end
          end
        end

791 792 793 794
        def adopt_connection(conn)
          conn.pool = self
          @connections << conn
        end
795

796 797 798 799
        def checkout_new_connection
          raise ConnectionNotEstablished unless @automatic_reconnect
          new_connection
        end
800

801 802 803 804 805 806 807 808 809
        def checkout_and_verify(c)
          c._run_checkout_callbacks do
            c.verify!
          end
          c
        rescue
          remove c
          c.disconnect!
          raise
810
        end
811 812
    end

P
Pratik Naik 已提交
813
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
814
    # for keeping separate connection pools that connect to different databases.
P
Pratik Naik 已提交
815 816 817
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
818 819
    #   class Author < ActiveRecord::Base
    #   end
P
Pratik Naik 已提交
820
    #
821 822
    #   class BankAccount < ActiveRecord::Base
    #   end
P
Pratik Naik 已提交
823
    #
824
    #   class Book < ActiveRecord::Base
825
    #     establish_connection :library_db
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855
    #   end
    #
    #   class ScaryBook < Book
    #   end
    #
    #   class GoodBook < Book
    #   end
    #
    # And a database.yml that looked like this:
    #
    #   development:
    #     database: my_application
    #     host: localhost
    #
    #   library_db:
    #     database: library
    #     host: some.library.org
    #
    # Your primary database in the development environment is "my_application"
    # but the Book model connects to a separate database called "library_db"
    # (this can even be a database on a different machine).
    #
    # Book, ScaryBook and GoodBook will all use the same connection pool to
    # "library_db" while Author, BankAccount, and any other models you create
    # will use the default connection pool to "my_application".
    #
    # The various connection pools are managed by a single instance of
    # ConnectionHandler accessible via ActiveRecord::Base.connection_handler.
    # All Active Record models use this handler to determine the connection pool that they
    # should use.
856 857
    #
    # The ConnectionHandler class is not coupled with the Active models, as it has no knowlodge
858
    # about the model. The model needs to pass a specification name to the handler,
859
    # in order to lookup the correct connection pool.
860
    class ConnectionHandler
J
Jon Leighton 已提交
861
      def initialize
862
        # These caches are keyed by spec.name (ConnectionSpecification#name).
863
        @owner_to_pool = Concurrent::Map.new(initial_capacity: 2) do |h, k|
864
          h[k] = Concurrent::Map.new(initial_capacity: 2)
865
        end
866 867
      end

868
      def connection_pool_list
869
        owner_to_pool.values.compact
870
      end
871
      alias :connection_pools :connection_pool_list
872

873 874 875
      def establish_connection(config)
        resolver = ConnectionSpecification::Resolver.new(Base.configurations)
        spec = resolver.spec(config)
876 877

        remove_connection(spec.name)
878 879 880 881 882 883

        message_bus = ActiveSupport::Notifications.instrumenter
        payload = {
          connection_id: object_id
        }
        if spec
884
          payload[:spec_name] = spec.name
885 886 887
          payload[:config] = spec.config
        end

888
        message_bus.instrument("!connection.active_record", payload) do
889 890 891 892
          owner_to_pool[spec.name] = ConnectionAdapters::ConnectionPool.new(spec)
        end

        owner_to_pool[spec.name]
893 894
      end

895 896 897
      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
898
        connection_pool_list.any?(&:active_connection?)
899 900
      end

901 902 903
      # Returns any connections in use by the current thread back to the pool,
      # and also returns connections to the pool cached by threads that are no
      # longer alive.
904
      def clear_active_connections!
905
        connection_pool_list.each(&:release_connection)
906 907
      end

S
Sebastian Martinez 已提交
908
      # Clears the cache which maps classes.
909 910
      #
      # See ConnectionPool#clear_reloadable_connections! for details.
911
      def clear_reloadable_connections!
912
        connection_pool_list.each(&:clear_reloadable_connections!)
913 914 915
      end

      def clear_all_connections!
916
        connection_pool_list.each(&:disconnect!)
917 918 919 920 921 922
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
923 924
      def retrieve_connection(spec_name) #:nodoc:
        pool = retrieve_connection_pool(spec_name)
925
        raise ConnectionNotEstablished, "No connection pool with '#{spec_name}' found." unless pool
926
        conn = pool.connection
927
        raise ConnectionNotEstablished, "No connection for '#{spec_name}' in connection pool" unless conn
928
        conn
929 930
      end

931 932
      # Returns true if a connection that's accessible to this class has
      # already been opened.
933 934
      def connected?(spec_name)
        conn = retrieve_connection_pool(spec_name)
A
Aaron Patterson 已提交
935
        conn && conn.connected?
936 937 938 939
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
940
      # can be used as an argument for #establish_connection, for easily
941
      # re-establishing the connection.
942 943
      def remove_connection(spec_name)
        if pool = owner_to_pool.delete(spec_name)
J
Jon Leighton 已提交
944 945 946 947
          pool.automatic_reconnect = false
          pool.disconnect!
          pool.spec.config
        end
948 949
      end

950
      # Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool.
951 952
      # This makes retrieving the connection pool O(1) once the process is warm.
      # When a connection is established or removed, we invalidate the cache.
953 954
      def retrieve_connection_pool(spec_name)
        owner_to_pool.fetch(spec_name) do
J
Jon Moss 已提交
955 956
          # Check if a connection was previously established in an ancestor process,
          # which may have been forked.
957
          if ancestor_pool = pool_from_any_process_for(spec_name)
J
Jon Leighton 已提交
958 959 960
            # A connection was established in an ancestor process that must have
            # subsequently forked. We can't reuse the connection, but we can copy
            # the specification and establish a new connection with it.
A
Arthur Neves 已提交
961
            establish_connection(ancestor_pool.spec.to_hash).tap do |pool|
962 963
              pool.schema_cache = ancestor_pool.schema_cache if ancestor_pool.schema_cache
            end
964
          else
965
            owner_to_pool[spec_name] = nil
966
          end
A
Arthur Neves 已提交
967 968 969 970 971
        end
      end

      private

972 973 974
        def owner_to_pool
          @owner_to_pool[Process.pid]
        end
J
Jon Leighton 已提交
975

976
        def pool_from_any_process_for(spec_name)
977
          owner_to_pool = @owner_to_pool.values.reverse.find { |v| v[spec_name] }
978 979
          owner_to_pool && owner_to_pool[spec_name]
        end
980
    end
N
Nick 已提交
981
  end
982
end