connection_pool.rb 35.4 KB
Newer Older
1
require 'thread'
2
require 'concurrent'
3
require 'monitor'
4

N
Nick 已提交
5
module ActiveRecord
6
  # Raised when a connection could not be obtained within the connection
7
  # acquisition timeout period: because max connections in pool
8
  # are in use.
9 10 11
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

12 13 14 15 16 17
  # Raised when a pool was unable to get ahold of all its connections
  # to perform a "group" action such as +ConnectionPool#disconnect!+
  # or +ConnectionPool#clear_reloadable_connections!+.
  class ExclusiveConnectionTimeoutError < ConnectionTimeoutError
  end

N
Nick 已提交
18
  module ConnectionAdapters
19
    # Connection pool base class for managing Active Record database
20 21
    # connections.
    #
P
Pratik Naik 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
37 38 39
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
40
    # 1. Simply use ActiveRecord::Base.connection as with Active Record 2.1 and
41 42 43
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
44 45
    #    default behavior for Active Record when used in conjunction with
    #    Action Pack's request handling cycle.
46 47 48 49 50 51 52
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
53
    #
P
Pratik Naik 已提交
54 55 56 57 58
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
59
    # There are several connection-pooling-related options that you can add to
60 61 62
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
63
    # * +checkout_timeout+: number of seconds to block and wait for a connection
64
    #   before giving up and raising a timeout error (default 5 seconds).
65
    # * +reaping_frequency+: frequency in seconds to periodically run the
66 67 68 69 70
    #   Reaper, which attempts to find and recover connections from dead
    #   threads, which can occur if a programmer forgets to close a
    #   connection at the end of a thread or a thread dies unexpectedly.
    #   Regardless of this setting, the Reaper will be invoked before every
    #   blocking wait. (Default nil, which means don't schedule the Reaper).
71 72 73 74 75 76 77 78 79
    #
    #--
    # Synchronization policy:
    # * all public methods can be called outside +synchronize+
    # * access to these i-vars needs to be in +synchronize+:
    #   * @connections
    #   * @now_connecting
    # * private methods that require being called in a +synchronize+ blocks
    #   are now explicitly documented
80
    class ConnectionPool
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
      # Threadsafe, fair, FIFO queue.  Meant to be used by ConnectionPool
      # with which it shares a Monitor.  But could be a generic Queue.
      #
      # The Queue in stdlib's 'thread' could replace this class except
      # stdlib's doesn't support waiting with a timeout.
      class Queue
        def initialize(lock = Monitor.new)
          @lock = lock
          @cond = @lock.new_cond
          @num_waiting = 0
          @queue = []
        end

        # Test if any threads are currently waiting on the queue.
        def any_waiting?
          synchronize do
            @num_waiting > 0
          end
        end

101
        # Returns the number of threads currently waiting on this
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
        # queue.
        def num_waiting
          synchronize do
            @num_waiting
          end
        end

        # Add +element+ to the queue.  Never blocks.
        def add(element)
          synchronize do
            @queue.push element
            @cond.signal
          end
        end

        # If +element+ is in the queue, remove and return it, or nil.
        def delete(element)
          synchronize do
            @queue.delete(element)
          end
        end

        # Remove all elements from the queue.
        def clear
          synchronize do
            @queue.clear
          end
        end

        # Remove the head of the queue.
        #
        # If +timeout+ is not given, remove and return the head the
        # queue if the number of available elements is strictly
        # greater than the number of threads currently waiting (that
        # is, don't jump ahead in line).  Otherwise, return nil.
        #
N
Neeraj Singh 已提交
138
        # If +timeout+ is given, block if there is no element
139 140 141 142 143
        # available, waiting up to +timeout+ seconds for an element to
        # become available.
        #
        # Raises:
        # - ConnectionTimeoutError if +timeout+ is given and no element
N
Neeraj Singh 已提交
144
        # becomes available within +timeout+ seconds,
145
        def poll(timeout = nil)
146
          synchronize { internal_poll(timeout) }
147 148 149 150
        end

        private

151 152 153 154
        def internal_poll(timeout)
          no_wait_poll || (timeout && wait_poll(timeout))
        end

155 156 157 158 159 160 161 162 163 164
        def synchronize(&block)
          @lock.synchronize(&block)
        end

        # Test if the queue currently contains any elements.
        def any?
          !@queue.empty?
        end

        # A thread can remove an element from the queue without
N
Neeraj Singh 已提交
165
        # waiting if and only if the number of currently available
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
        # connections is strictly greater than the number of waiting
        # threads.
        def can_remove_no_wait?
          @queue.size > @num_waiting
        end

        # Removes and returns the head of the queue if possible, or nil.
        def remove
          @queue.shift
        end

        # Remove and return the head the queue if the number of
        # available elements is strictly greater than the number of
        # threads currently waiting.  Otherwise, return nil.
        def no_wait_poll
          remove if can_remove_no_wait?
        end

        # Waits on the queue up to +timeout+ seconds, then removes and
        # returns the head of the queue.
        def wait_poll(timeout)
          @num_waiting += 1

          t0 = Time.now
          elapsed = 0
          loop do
            @cond.wait(timeout - elapsed)

            return remove if any?

            elapsed = Time.now - t0
197 198 199 200 201
            if elapsed >= timeout
              msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
                [timeout, elapsed]
              raise ConnectionTimeoutError, msg
            end
202 203 204 205 206 207
          end
        ensure
          @num_waiting -= 1
        end
      end

208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
      # Adds the ability to turn a basic fair FIFO queue into one
      # biased to some thread.
      module BiasableQueue # :nodoc:
        class BiasedConditionVariable # :nodoc:
          # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+,
          # +signal+ and +wait+ methods are only called while holding a lock
          def initialize(lock, other_cond, preferred_thread)
            @real_cond = lock.new_cond
            @other_cond = other_cond
            @preferred_thread = preferred_thread
            @num_waiting_on_real_cond = 0
          end

          def broadcast
            broadcast_on_biased
            @other_cond.broadcast
          end

          def broadcast_on_biased
            @num_waiting_on_real_cond = 0
            @real_cond.broadcast
          end

          def signal
            if @num_waiting_on_real_cond > 0
              @num_waiting_on_real_cond -= 1
              @real_cond
            else
              @other_cond
            end.signal
          end

          def wait(timeout)
            if Thread.current == @preferred_thread
              @num_waiting_on_real_cond += 1
              @real_cond
            else
              @other_cond
            end.wait(timeout)
          end
        end

        def with_a_bias_for(thread)
          previous_cond = nil
          new_cond      = nil
          synchronize do
            previous_cond = @cond
            @cond = new_cond = BiasedConditionVariable.new(@lock, @cond, thread)
          end
          yield
        ensure
          synchronize do
            @cond = previous_cond if previous_cond
            new_cond.broadcast_on_biased if new_cond # wake up any remaining sleepers
          end
        end
      end

266 267 268 269 270 271
      # Connections must be leased while holding the main pool mutex. This is
      # an internal subclass that also +.leases+ returned connections while
      # still in queue's critical section (queue synchronizes with the same
      # +@lock+ as the main pool) so that a returned connection is already
      # leased and there is no need to re-enter synchronized block.
      class ConnectionLeasingQueue < Queue # :nodoc:
272 273
        include BiasableQueue

274 275 276 277 278 279 280 281
        private
        def internal_poll(timeout)
          conn = super
          conn.lease if conn
          conn
        end
      end

282 283 284
      # Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
      # A reaper instantiated with a nil frequency will never reap the
      # connection pool.
285 286
      #
      # Configure the frequency by setting "reaping_frequency" in your
287
      # database yaml file.
288 289 290 291 292 293 294 295
      class Reaper
        attr_reader :pool, :frequency

        def initialize(pool, frequency)
          @pool      = pool
          @frequency = frequency
        end

296
        def run
297 298 299 300 301 302 303 304 305 306
          return unless frequency
          Thread.new(frequency, pool) { |t, p|
            while true
              sleep t
              p.reap
            end
          }
        end
      end

307 308
      include MonitorMixin

309
      attr_accessor :automatic_reconnect, :checkout_timeout, :schema_cache
310
      attr_reader :spec, :connections, :size, :reaper
311

P
Pratik Naik 已提交
312 313 314 315 316 317
      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
N
Nick 已提交
318
      def initialize(spec)
319 320
        super()

N
Nick 已提交
321
        @spec = spec
322

323
        @checkout_timeout = (spec.config[:checkout_timeout] && spec.config[:checkout_timeout].to_f) || 5
K
korbin 已提交
324
        @reaper = Reaper.new(self, (spec.config[:reaping_frequency] && spec.config[:reaping_frequency].to_f))
325
        @reaper.run
326

327 328
        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
329

330 331 332 333
        # The cache of threads mapped to reserved connections, the sole purpose
        # of the cache is to speed-up +connection+ method, it is not the authoritative
        # registry of which thread owns which connection, that is tracked by
        # +connection.owner+ attr on each +connection+ instance.
334
        # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
335 336 337 338 339
        # then that +thread+ does indeed own that +conn+, however an absence of a such
        # mapping does not mean that the +thread+ doesn't own the said connection, in
        # that case +conn.owner+ attr should be consulted.
        # Access and modification of +@thread_cached_conns+ does not require
        # synchronization.
340
        @thread_cached_conns = Concurrent::Map.new(:initial_capacity => @size)
341

342
        @connections         = []
343
        @automatic_reconnect = true
344

345
        # Connection pool allows for concurrent (outside the main +synchronize+ section)
346 347 348 349
        # establishment of new connections. This variable tracks the number of threads
        # currently in the process of independently establishing connections to the DB.
        @now_connecting = 0

350 351 352
        # A boolean toggle that allows/disallows new connections.
        @new_cons_enabled = true

353
        @available = ConnectionLeasingQueue.new self
354 355
      end

356 357 358 359
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
360
      # held in a cache keyed by a thread.
361
      def connection
362
        @thread_cached_conns[connection_cache_key(Thread.current)] ||= checkout
N
Nick 已提交
363 364
      end

365
      # Is there an open connection that is being used for the current thread?
366
      #
367
      # This method only works for connections that have been obtained through
368 369
      # #connection or #with_connection methods, connections obtained through
      # #checkout will not be detected by #active_connection?
370
      def active_connection?
371
        @thread_cached_conns[connection_cache_key(Thread.current)]
372 373
      end

374
      # Signal that the thread is finished with the current connection.
375
      # #release_connection releases the connection-thread association
376
      # and returns the connection to the pool.
377 378 379 380 381 382 383
      #
      # This method only works for connections that have been obtained through
      # #connection or #with_connection methods, connections obtained through
      # #checkout will not be automatically released.
      def release_connection(owner_thread = Thread.current)
        if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
          checkin conn
384
        end
385 386
      end

387 388
      # If a connection obtained through #connection or #with_connection methods
      # already exists yield it to the block. If no such connection
389
      # exists checkout a connection, yield it to the block, and checkin the
390
      # connection when finished.
391
      def with_connection
392 393 394 395 396
        unless conn = @thread_cached_conns[connection_cache_key(Thread.current)]
          conn = connection
          fresh_connection = true
        end
        yield conn
397
      ensure
398
        release_connection if fresh_connection
N
Nick 已提交
399 400
      end

401 402
      # Returns true if a connection has already been opened.
      def connected?
403
        synchronize { @connections.any? }
N
Nick 已提交
404 405
      end

P
Pratik Naik 已提交
406
      # Disconnects all connections in the pool, and clears the pool.
407 408 409 410
      #
      # Raises:
      # - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all
      #   connections in the pool within a timeout interval (default duration is
411
      #   <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
412 413 414 415 416 417 418 419 420
      def disconnect(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
              checkin conn
              conn.disconnect!
            end
            @connections = []
            @available.clear
421
          end
N
Nick 已提交
422 423 424
        end
      end

425 426 427 428
      # Disconnects all connections in the pool, and clears the pool.
      #
      # The pool first tries to gain ownership of all connections, if unable to
      # do so within a timeout interval (default duration is
429
      # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), the pool is forcefully
430
      # disconnected without any regard for other connection owning threads.
431 432 433 434 435 436 437 438 439 440
      def disconnect!
        disconnect(false)
      end

      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
      # Raises:
      # - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all
      #   connections in the pool within a timeout interval (default duration is
441
      #   <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
      def clear_reloadable_connections(raise_on_acquisition_timeout = true)
        num_new_conns_required = 0

        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
              checkin conn
              conn.disconnect! if conn.requires_reloading?
            end
            @connections.delete_if(&:requires_reloading?)

            @available.clear

            if @connections.size < @size
              # because of the pruning done by this method, we might be running
              # low on connections, while threads stuck in queue are helpless
              # (not being able to establish new connections for themselves),
              # see also more detailed explanation in +remove+
              num_new_conns_required = num_waiting_in_queue - @connections.size
            end

            @connections.each do |conn|
              @available.add conn
            end
466
          end
467
        end
468 469 470 471 472 473 474 475 476

        bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0
      end

      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
      # The pool first tries to gain ownership of all connections, if unable to
      # do so within a timeout interval (default duration is
477
      # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), the pool forcefully
478 479 480 481
      # clears the cache and reloads connections without any regard for other
      # connection owning threads.
      def clear_reloadable_connections!
        clear_reloadable_connections(false)
N
Nick 已提交
482 483
      end

P
Pratik Naik 已提交
484 485 486
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
487 488 489 490 491
      # This is done by either returning and leasing existing connection, or by
      # creating a new connection and leasing it.
      #
      # If all connections are leased and the pool is at capacity (meaning the
      # number of currently leased connections is greater than or equal to the
492
      # size limit set), an ActiveRecord::ConnectionTimeoutError exception will be raised.
P
Pratik Naik 已提交
493 494 495 496
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
497
      # - ConnectionTimeoutError: no connection can be obtained from the pool.
498 499
      def checkout(checkout_timeout = @checkout_timeout)
        checkout_and_verify(acquire_connection(checkout_timeout))
N
Nick 已提交
500 501
      end

P
Pratik Naik 已提交
502 503 504 505 506
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained by earlier by
      # calling +checkout+ on this pool.
507
      def checkin(conn)
508
        synchronize do
509
          remove_connection_from_thread_cache conn
510

511
          conn._run_checkin_callbacks do
512
            conn.expire
513
          end
514

515
          @available.add conn
516
        end
N
Nick 已提交
517
      end
518

519 520 521
      # Remove a connection from the connection pool.  The connection will
      # remain open and active but will no longer be managed by this pool.
      def remove(conn)
522 523
        needs_new_connection = false

524
        synchronize do
525 526
          remove_connection_from_thread_cache conn

527
          @connections.delete conn
528
          @available.delete conn
529

530 531 532 533 534 535 536 537 538
          # @available.any_waiting? => true means that prior to removing this
          # conn, the pool was at its max size (@connections.size == @size)
          # this would mean that any threads stuck waiting in the queue wouldn't
          # know they could checkout_new_connection, so let's do it for them.
          # Because condition-wait loop is encapsulated in the Queue class
          # (that in turn is oblivious to ConnectionPool implementation), threads
          # that are "stuck" there are helpless, they have no way of creating
          # new connections and are completely reliant on us feeding available
          # connections into the Queue.
539 540 541 542 543 544
          needs_new_connection = @available.any_waiting?
        end

        # This is intentionally done outside of the synchronized section as we
        # would like not to hold the main mutex while checking out new connections,
        # thus there is some chance that needs_new_connection information is now
545
        # stale, we can live with that (bulk_make_new_connections will make
546
        # sure not to exceed the pool's @size limit).
547
        bulk_make_new_connections(1) if needs_new_connection
548 549
      end

550 551
      # Recover lost connections for the pool.  A lost connection can occur if
      # a programmer forgets to checkin a connection at the end of a thread
552 553
      # or a thread dies unexpectedly.
      def reap
554 555 556 557 558 559 560 561 562 563 564 565
        stale_connections = synchronize do
          @connections.select do |conn|
            conn.in_use? && !conn.owner.alive?
          end
        end

        stale_connections.each do |conn|
          synchronize do
            if conn.active?
              conn.reset!
              checkin conn
            else
566 567
              remove conn
            end
568 569 570 571
          end
        end
      end

572 573 574 575
      def num_waiting_in_queue # :nodoc:
        @available.num_waiting
      end

576
      private
577 578 579 580 581 582 583 584 585 586 587 588 589
      #--
      # this is unfortunately not concurrent
      def bulk_make_new_connections(num_new_conns_needed)
        num_new_conns_needed.times do
          # try_to_checkout_new_connection will not exceed pool's @size limit
          if new_conn = try_to_checkout_new_connection
            # make the new_conn available to the starving threads stuck @available Queue
            checkin(new_conn)
          end
        end
      end

      #--
A
Akira Matsuda 已提交
590
      # From the discussion on GitHub:
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
      #  https://github.com/rails/rails/pull/14938#commitcomment-6601951
      # This hook-in method allows for easier monkey-patching fixes needed by
      # JRuby users that use Fibers.
      def connection_cache_key(thread)
        thread
      end

      # Take control of all existing connections so a "group" action such as
      # reload/disconnect can be performed safely. It is no longer enough to
      # wrap it in +synchronize+ because some pool's actions are allowed
      # to be performed outside of the main +synchronize+ block.
      def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
        with_new_connections_blocked do
          attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)
          yield
        end
      end

      def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
        collected_conns = synchronize do
          # account for our own connections
          @connections.select {|conn| conn.owner == Thread.current}
        end

        newly_checked_out = []
        timeout_time      = Time.now + (@checkout_timeout * 2)

        @available.with_a_bias_for(Thread.current) do
          while true
            synchronize do
              return if collected_conns.size == @connections.size && @now_connecting == 0
              remaining_timeout = timeout_time - Time.now
              remaining_timeout = 0 if remaining_timeout < 0
              conn = checkout_for_exclusive_access(remaining_timeout)
              collected_conns   << conn
              newly_checked_out << conn
            end
          end
        end
      rescue ExclusiveConnectionTimeoutError
631
        # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any
632 633
        # timeouts and are expected to just give up: we've obtained as many connections
        # as possible, note that in a case like that we don't return any of the
634
        # +newly_checked_out+ connections.
635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675

        if raise_on_acquisition_timeout
          release_newly_checked_out = true
          raise
        end
      rescue Exception # if something else went wrong
        # this can't be a "naked" rescue, because we have should return conns
        # even for non-StandardErrors
        release_newly_checked_out = true
        raise
      ensure
        if release_newly_checked_out && newly_checked_out
          # releasing only those conns that were checked out in this method, conns
          # checked outside this method (before it was called) are not for us to release
          newly_checked_out.each {|conn| checkin(conn)}
        end
      end

      #--
      # Must be called in a synchronize block.
      def checkout_for_exclusive_access(checkout_timeout)
        checkout(checkout_timeout)
      rescue ConnectionTimeoutError
        # this block can't be easily moved into attempt_to_checkout_all_existing_connections's
        # rescue block, because doing so would put it outside of synchronize section, without
        # being in a critical section thread_report might become inaccurate
        msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds"

        thread_report = []
        @connections.each do |conn|
          unless conn.owner == Thread.current
            thread_report << "#{conn} is owned by #{conn.owner}"
          end
        end

        msg << " (#{thread_report.join(', ')})" if thread_report.any?

        raise ExclusiveConnectionTimeoutError, msg
      end

      def with_new_connections_blocked
676
        previous_value = nil
677 678
        synchronize do
          previous_value, @new_cons_enabled = @new_cons_enabled, false
679
        end
680 681 682
        yield
      ensure
        synchronize { @new_cons_enabled = previous_value }
683
      end
684

685 686 687 688 689 690
      # Acquire a connection by one of 1) immediately removing one
      # from the queue of available connections, 2) creating a new
      # connection if the pool is not at capacity, 3) waiting on the
      # queue for a connection to become available.
      #
      # Raises:
691
      # - ConnectionTimeoutError if a connection could not be acquired
692 693 694 695
      #
      #--
      # Implementation detail: the connection returned by +acquire_connection+
      # will already be "+connection.lease+ -ed" to the current thread.
696
      def acquire_connection(checkout_timeout)
697 698
        # NOTE: we rely on +@available.poll+ and +try_to_checkout_new_connection+ to
        # +conn.lease+ the returned connection (and to do this in a +synchronized+
699
        # section), this is not the cleanest implementation, as ideally we would
700 701 702
        # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to +@available.poll+
        # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections
        # of the said methods and avoid an additional +synchronize+ overhead.
703
        if conn = @available.poll || try_to_checkout_new_connection
704 705
          conn
        else
706
          reap
707
          @available.poll(checkout_timeout)
708 709 710
        end
      end

711 712 713 714
      #--
      # if owner_thread param is omitted, this must be called in synchronize block
      def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
        @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
715
      end
716
      alias_method :release, :remove_connection_from_thread_cache
717

718
      def new_connection
719 720 721
        Base.send(spec.adapter_method, spec.config).tap do |conn|
          conn.schema_cache = schema_cache.dup if schema_cache
        end
722 723
      end

724 725 726 727 728 729 730 731 732 733
      # If the pool is not at a +@size+ limit, establish new connection. Connecting
      # to the DB is done outside main synchronized section.
      #--
      # Implementation constraint: a newly established connection returned by this
      # method must be in the +.leased+ state.
      def try_to_checkout_new_connection
        # first in synchronized section check if establishing new conns is allowed
        # and increment @now_connecting, to prevent overstepping this pool's @size
        # constraint
        do_checkout = synchronize do
734
          if @new_cons_enabled && (@connections.size + @now_connecting) < @size
735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760
            @now_connecting += 1
          end
        end
        if do_checkout
          begin
            # if successfully incremented @now_connecting establish new connection
            # outside of synchronized section
            conn = checkout_new_connection
          ensure
            synchronize do
              if conn
                adopt_connection(conn)
                # returned conn needs to be already leased
                conn.lease
              end
              @now_connecting -= 1
            end
          end
        end
      end

      def adopt_connection(conn)
        conn.pool = self
        @connections << conn
      end

761
      def checkout_new_connection
762
        raise ConnectionNotEstablished unless @automatic_reconnect
763
        new_connection
764 765 766
      end

      def checkout_and_verify(c)
767
        c._run_checkout_callbacks do
768 769
          c.verify!
        end
770
        c
771
      rescue
772 773
        remove c
        c.disconnect!
774
        raise
775 776 777
      end
    end

P
Pratik Naik 已提交
778
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
779
    # for keeping separate connection pools for Active Record models that connect
P
Pratik Naik 已提交
780 781 782 783
    # to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
784 785
    #   class Author < ActiveRecord::Base
    #   end
P
Pratik Naik 已提交
786
    #
787 788
    #   class BankAccount < ActiveRecord::Base
    #   end
P
Pratik Naik 已提交
789
    #
790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821
    #   class Book < ActiveRecord::Base
    #     establish_connection "library_db"
    #   end
    #
    #   class ScaryBook < Book
    #   end
    #
    #   class GoodBook < Book
    #   end
    #
    # And a database.yml that looked like this:
    #
    #   development:
    #     database: my_application
    #     host: localhost
    #
    #   library_db:
    #     database: library
    #     host: some.library.org
    #
    # Your primary database in the development environment is "my_application"
    # but the Book model connects to a separate database called "library_db"
    # (this can even be a database on a different machine).
    #
    # Book, ScaryBook and GoodBook will all use the same connection pool to
    # "library_db" while Author, BankAccount, and any other models you create
    # will use the default connection pool to "my_application".
    #
    # The various connection pools are managed by a single instance of
    # ConnectionHandler accessible via ActiveRecord::Base.connection_handler.
    # All Active Record models use this handler to determine the connection pool that they
    # should use.
822
    class ConnectionHandler
J
Jon Leighton 已提交
823
      def initialize
824
        # These caches are keyed by klass.name, NOT klass. Keying them by klass
J
Jon Leighton 已提交
825 826
        # alone would lead to memory leaks in development mode as all previous
        # instances of the class would stay in memory.
827 828
        @owner_to_pool = Concurrent::Map.new(:initial_capacity => 2) do |h,k|
          h[k] = Concurrent::Map.new(:initial_capacity => 2)
829
        end
830 831
        @class_to_pool = Concurrent::Map.new(:initial_capacity => 2) do |h,k|
          h[k] = Concurrent::Map.new
832
        end
833 834
      end

835
      def connection_pool_list
836
        owner_to_pool.values.compact
837
      end
838
      alias :connection_pools :connection_pool_list
839

840 841
      def establish_connection(owner, spec)
        @class_to_pool.clear
D
David 已提交
842
        raise RuntimeError, "Anonymous class is not allowed." unless owner.name
J
Jon Leighton 已提交
843
        owner_to_pool[owner.name] = ConnectionAdapters::ConnectionPool.new(spec)
844 845
      end

846 847 848
      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
849
        connection_pool_list.any?(&:active_connection?)
850 851
      end

852 853 854
      # Returns any connections in use by the current thread back to the pool,
      # and also returns connections to the pool cached by threads that are no
      # longer alive.
855
      def clear_active_connections!
856
        connection_pool_list.each(&:release_connection)
857 858
      end

S
Sebastian Martinez 已提交
859
      # Clears the cache which maps classes.
860
      def clear_reloadable_connections!
861
        connection_pool_list.each(&:clear_reloadable_connections!)
862 863 864
      end

      def clear_all_connections!
865
        connection_pool_list.each(&:disconnect!)
866 867 868 869 870 871 872 873
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
874 875 876 877
        raise ConnectionNotEstablished, "No connection pool for #{klass}" unless pool
        conn = pool.connection
        raise ConnectionNotEstablished, "No connection for #{klass} in connection pool" unless conn
        conn
878 879
      end

880 881
      # Returns true if a connection that's accessible to this class has
      # already been opened.
882
      def connected?(klass)
883
        conn = retrieve_connection_pool(klass)
A
Aaron Patterson 已提交
884
        conn && conn.connected?
885 886 887 888 889 890
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
891
      def remove_connection(owner)
J
Jon Leighton 已提交
892
        if pool = owner_to_pool.delete(owner.name)
893
          @class_to_pool.clear
J
Jon Leighton 已提交
894 895 896 897
          pool.automatic_reconnect = false
          pool.disconnect!
          pool.spec.config
        end
898 899
      end

900 901 902 903 904
      # Retrieving the connection pool happens a lot so we cache it in @class_to_pool.
      # This makes retrieving the connection pool O(1) once the process is warm.
      # When a connection is established or removed, we invalidate the cache.
      #
      # Ideally we would use #fetch here, as class_to_pool[klass] may sometimes be nil.
A
Akira Matsuda 已提交
905 906 907 908
      # However, benchmarking (https://gist.github.com/jonleighton/3552829) showed that
      # #fetch is significantly slower than #[]. So in the nil case, no caching will
      # take place, but that's ok since the nil case is not the common one that we wish
      # to optimise for.
909
      def retrieve_connection_pool(klass)
J
Jon Leighton 已提交
910
        class_to_pool[klass.name] ||= begin
911 912
          until pool = pool_for(klass)
            klass = klass.superclass
J
Jon Leighton 已提交
913
            break unless klass <= Base
914
          end
915

J
Jon Leighton 已提交
916
          class_to_pool[klass.name] = pool
J
Jon Leighton 已提交
917
        end
918
      end
919 920 921

      private

922 923 924 925
      def owner_to_pool
        @owner_to_pool[Process.pid]
      end

926
      def class_to_pool
A
Aaron Patterson 已提交
927
        @class_to_pool[Process.pid]
928 929
      end

930
      def pool_for(owner)
J
Jon Leighton 已提交
931
        owner_to_pool.fetch(owner.name) {
932
          if ancestor_pool = pool_from_any_process_for(owner)
J
Jon Leighton 已提交
933 934 935
            # A connection was established in an ancestor process that must have
            # subsequently forked. We can't reuse the connection, but we can copy
            # the specification and establish a new connection with it.
936 937 938
            establish_connection(owner, ancestor_pool.spec).tap do |pool|
              pool.schema_cache = ancestor_pool.schema_cache if ancestor_pool.schema_cache
            end
939
          else
J
Jon Leighton 已提交
940
            owner_to_pool[owner.name] = nil
941 942 943
          end
        }
      end
J
Jon Leighton 已提交
944

945
      def pool_from_any_process_for(owner)
J
Jon Leighton 已提交
946 947
        owner_to_pool = @owner_to_pool.values.find { |v| v[owner.name] }
        owner_to_pool && owner_to_pool[owner.name]
J
Jon Leighton 已提交
948
      end
949
    end
950 951 952 953 954 955 956

    class ConnectionManagement
      def initialize(app)
        @app = app
      end

      def call(env)
957
        testing = env['rack.test']
958

959 960 961 962
        response = @app.call(env)
        response[2] = ::Rack::BodyProxy.new(response[2]) do
          ActiveRecord::Base.clear_active_connections! unless testing
        end
963

964
        response
965
      rescue Exception
966
        ActiveRecord::Base.clear_active_connections! unless testing
967
        raise
968 969
      end
    end
N
Nick 已提交
970
  end
971
end