connection_pool.rb 42.7 KB
Newer Older
1 2
# frozen_string_literal: true

3 4 5
require "thread"
require "concurrent/map"
require "monitor"
6
require "weakref"
7

N
Nick 已提交
8
module ActiveRecord
9
  # Raised when a connection could not be obtained within the connection
10
  # acquisition timeout period: because max connections in pool
11
  # are in use.
12 13 14
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

15
  # Raised when a pool was unable to get ahold of all its connections
16 17 18
  # to perform a "group" action such as
  # {ActiveRecord::Base.connection_pool.disconnect!}[rdoc-ref:ConnectionAdapters::ConnectionPool#disconnect!]
  # or {ActiveRecord::Base.clear_reloadable_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_reloadable_connections!].
19 20 21
  class ExclusiveConnectionTimeoutError < ConnectionTimeoutError
  end

N
Nick 已提交
22
  module ConnectionAdapters
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
    module AbstractPool # :nodoc:
      def get_schema_cache(connection)
        @schema_cache ||= SchemaCache.new(connection)
        @schema_cache.connection = connection
        @schema_cache
      end

      def set_schema_cache(cache)
        @schema_cache = cache
      end
    end

    class NullPool # :nodoc:
      include ConnectionAdapters::AbstractPool

      def initialize
        @schema_cache = nil
      end
    end

43
    # Connection pool base class for managing Active Record database
44 45
    # connections.
    #
P
Pratik Naik 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
61 62 63
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
64 65
    # 1. Simply use {ActiveRecord::Base.connection}[rdoc-ref:ConnectionHandling.connection]
    #    as with Active Record 2.1 and
66 67
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
68 69
    #    {ActiveRecord::Base.clear_active_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_active_connections!].
    #    This will be the default behavior for Active Record when used in conjunction with
70
    #    Action Pack's request handling cycle.
71
    # 2. Manually check out a connection from the pool with
72
    #    {ActiveRecord::Base.connection_pool.checkout}[rdoc-ref:#checkout]. You are responsible for
73
    #    returning this connection to the pool when finished by calling
74 75
    #    {ActiveRecord::Base.connection_pool.checkin(connection)}[rdoc-ref:#checkin].
    # 3. Use {ActiveRecord::Base.connection_pool.with_connection(&block)}[rdoc-ref:#with_connection], which
76 77
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
78
    #
P
Pratik Naik 已提交
79 80 81 82 83
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
84
    # There are several connection-pooling-related options that you can add to
85 86
    # your database connection configuration:
    #
M
Matthew Draper 已提交
87 88 89 90 91 92 93
    # * +pool+: maximum number of connections the pool may manage (default 5).
    # * +idle_timeout+: number of seconds that a connection will be kept
    #   unused in the pool before it is automatically disconnected (default
    #   300 seconds). Set this to zero to keep connections forever.
    # * +checkout_timeout+: number of seconds to wait for a connection to
    #   become available before giving up and raising a timeout error (default
    #   5 seconds).
94 95 96 97
    #
    #--
    # Synchronization policy:
    # * all public methods can be called outside +synchronize+
98
    # * access to these instance variables needs to be in +synchronize+:
99 100 101 102
    #   * @connections
    #   * @now_connecting
    # * private methods that require being called in a +synchronize+ blocks
    #   are now explicitly documented
103
    class ConnectionPool
D
Devin Christensen 已提交
104
      # Threadsafe, fair, LIFO queue.  Meant to be used by ConnectionPool
105
      # with which it shares a Monitor.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
      class Queue
        def initialize(lock = Monitor.new)
          @lock = lock
          @cond = @lock.new_cond
          @num_waiting = 0
          @queue = []
        end

        # Test if any threads are currently waiting on the queue.
        def any_waiting?
          synchronize do
            @num_waiting > 0
          end
        end

121
        # Returns the number of threads currently waiting on this
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
        # queue.
        def num_waiting
          synchronize do
            @num_waiting
          end
        end

        # Add +element+ to the queue.  Never blocks.
        def add(element)
          synchronize do
            @queue.push element
            @cond.signal
          end
        end

137
        # If +element+ is in the queue, remove and return it, or +nil+.
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
        def delete(element)
          synchronize do
            @queue.delete(element)
          end
        end

        # Remove all elements from the queue.
        def clear
          synchronize do
            @queue.clear
          end
        end

        # Remove the head of the queue.
        #
        # If +timeout+ is not given, remove and return the head the
        # queue if the number of available elements is strictly
        # greater than the number of threads currently waiting (that
156
        # is, don't jump ahead in line).  Otherwise, return +nil+.
157
        #
N
Neeraj Singh 已提交
158
        # If +timeout+ is given, block if there is no element
159 160 161 162
        # available, waiting up to +timeout+ seconds for an element to
        # become available.
        #
        # Raises:
163
        # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no element
N
Neeraj Singh 已提交
164
        # becomes available within +timeout+ seconds,
165
        def poll(timeout = nil)
166
          synchronize { internal_poll(timeout) }
167 168 169
        end

        private
170 171 172
          def internal_poll(timeout)
            no_wait_poll || (timeout && wait_poll(timeout))
          end
173

174 175 176
          def synchronize(&block)
            @lock.synchronize(&block)
          end
177

178
          # Test if the queue currently contains any elements.
179 180 181
          def any?
            !@queue.empty?
          end
182

183 184 185 186
          # A thread can remove an element from the queue without
          # waiting if and only if the number of currently available
          # connections is strictly greater than the number of waiting
          # threads.
187 188 189
          def can_remove_no_wait?
            @queue.size > @num_waiting
          end
190

191
          # Removes and returns the head of the queue if possible, or +nil+.
192
          def remove
193
            @queue.pop
194
          end
195

196 197
          # Remove and return the head the queue if the number of
          # available elements is strictly greater than the number of
198
          # threads currently waiting.  Otherwise, return +nil+.
199 200 201
          def no_wait_poll
            remove if can_remove_no_wait?
          end
202

203 204
          # Waits on the queue up to +timeout+ seconds, then removes and
          # returns the head of the queue.
205 206
          def wait_poll(timeout)
            @num_waiting += 1
207

B
bogdanvlviv 已提交
208
            t0 = Concurrent.monotonic_time
209 210
            elapsed = 0
            loop do
211 212 213
              ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
                @cond.wait(timeout - elapsed)
              end
214

215
              return remove if any?
216

B
bogdanvlviv 已提交
217
              elapsed = Concurrent.monotonic_time - t0
218 219 220 221 222
              if elapsed >= timeout
                msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" %
                  [timeout, elapsed]
                raise ConnectionTimeoutError, msg
              end
223
            end
224 225
          ensure
            @num_waiting -= 1
226 227 228
          end
      end

229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
      # Adds the ability to turn a basic fair FIFO queue into one
      # biased to some thread.
      module BiasableQueue # :nodoc:
        class BiasedConditionVariable # :nodoc:
          # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+,
          # +signal+ and +wait+ methods are only called while holding a lock
          def initialize(lock, other_cond, preferred_thread)
            @real_cond = lock.new_cond
            @other_cond = other_cond
            @preferred_thread = preferred_thread
            @num_waiting_on_real_cond = 0
          end

          def broadcast
            broadcast_on_biased
            @other_cond.broadcast
          end

          def broadcast_on_biased
            @num_waiting_on_real_cond = 0
            @real_cond.broadcast
          end

          def signal
            if @num_waiting_on_real_cond > 0
              @num_waiting_on_real_cond -= 1
              @real_cond
            else
              @other_cond
            end.signal
          end

          def wait(timeout)
            if Thread.current == @preferred_thread
              @num_waiting_on_real_cond += 1
              @real_cond
            else
              @other_cond
            end.wait(timeout)
          end
        end

        def with_a_bias_for(thread)
          previous_cond = nil
          new_cond      = nil
          synchronize do
            previous_cond = @cond
            @cond = new_cond = BiasedConditionVariable.new(@lock, @cond, thread)
          end
          yield
        ensure
          synchronize do
            @cond = previous_cond if previous_cond
            new_cond.broadcast_on_biased if new_cond # wake up any remaining sleepers
          end
        end
      end

287 288 289
      # Connections must be leased while holding the main pool mutex. This is
      # an internal subclass that also +.leases+ returned connections while
      # still in queue's critical section (queue synchronizes with the same
290
      # <tt>@lock</tt> as the main pool) so that a returned connection is already
291 292
      # leased and there is no need to re-enter synchronized block.
      class ConnectionLeasingQueue < Queue # :nodoc:
293 294
        include BiasableQueue

295
        private
296 297 298 299 300
          def internal_poll(timeout)
            conn = super
            conn.lease if conn
            conn
          end
301 302
      end

M
Matthew Draper 已提交
303 304 305
      # Every +frequency+ seconds, the reaper will call +reap+ and +flush+ on
      # +pool+. A reaper instantiated with a zero frequency will never reap
      # the connection pool.
306
      #
M
Matthew Draper 已提交
307 308
      # Configure the frequency by setting +reaping_frequency+ in your database
      # yaml file (default 60 seconds).
309 310 311 312 313 314 315 316
      class Reaper
        attr_reader :pool, :frequency

        def initialize(pool, frequency)
          @pool      = pool
          @frequency = frequency
        end

317 318
        @mutex = Mutex.new
        @pools = {}
319
        @threads = {}
320

321 322 323
        class << self
          def register_pool(pool, frequency) # :nodoc:
            @mutex.synchronize do
324 325
              unless @threads[frequency]&.alive?
                @threads[frequency] = spawn_thread(frequency)
326
              end
327
              @pools[frequency] ||= []
328 329 330 331 332 333
              @pools[frequency] << WeakRef.new(pool)
            end
          end

          private
            def spawn_thread(frequency)
334
              Thread.new(frequency) do |t|
335
                running = true
336
                while running
337
                  sleep t
338
                  @mutex.synchronize do
339 340 341 342
                    @pools[frequency].select! do |pool|
                      pool.weakref_alive? && !pool.discarded?
                    end

343
                    @pools[frequency].each do |p|
344 345
                      p.reap
                      p.flush
346
                    rescue WeakRef::RefError
347
                    end
348 349 350

                    if @pools[frequency].empty?
                      @pools.delete(frequency)
351
                      @threads.delete(frequency)
352 353
                      running = false
                    end
354 355 356 357 358 359
                  end
                end
              end
            end
        end

360
        def run
M
Matthew Draper 已提交
361
          return unless frequency && frequency > 0
362
          self.class.register_pool(pool, frequency)
363 364 365
        end
      end

366
      include MonitorMixin
367
      include QueryCache::ConnectionPoolConfiguration
368
      include ConnectionAdapters::AbstractPool
369

370
      attr_accessor :automatic_reconnect, :checkout_timeout, :schema_cache
371
      attr_reader :spec, :size, :reaper
372

P
Pratik Naik 已提交
373 374 375 376 377 378
      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
N
Nick 已提交
379
      def initialize(spec)
380 381
        super()

N
Nick 已提交
382
        @spec = spec
383

384
        @checkout_timeout = (spec.config[:checkout_timeout] && spec.config[:checkout_timeout].to_f) || 5
M
Matthew Draper 已提交
385 386 387 388
        if @idle_timeout = spec.config.fetch(:idle_timeout, 300)
          @idle_timeout = @idle_timeout.to_f
          @idle_timeout = nil if @idle_timeout <= 0
        end
389

390 391
        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
392

393 394 395 396
        # This variable tracks the cache of threads mapped to reserved connections, with the
        # sole purpose of speeding up the +connection+ method. It is not the authoritative
        # registry of which thread owns which connection. Connection ownership is tracked by
        # the +connection.owner+ attr on each +connection+ instance.
397
        # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
398 399
        # then that +thread+ does indeed own that +conn+. However, an absence of a such
        # mapping does not mean that the +thread+ doesn't own the said connection. In
400
        # that case +conn.owner+ attr should be consulted.
401
        # Access and modification of <tt>@thread_cached_conns</tt> does not require
402
        # synchronization.
403
        @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size)
404

405
        @connections         = []
406
        @automatic_reconnect = true
407

408
        # Connection pool allows for concurrent (outside the main +synchronize+ section)
409 410 411 412
        # establishment of new connections. This variable tracks the number of threads
        # currently in the process of independently establishing connections to the DB.
        @now_connecting = 0

413
        @threads_blocking_new_connections = 0
414

415
        @available = ConnectionLeasingQueue.new self
416 417

        @lock_thread = false
418

M
Matthew Draper 已提交
419 420 421 422
        # +reaping_frequency+ is configurable mostly for historical reasons, but it could
        # also be useful if someone wants a very low +idle_timeout+.
        reaping_frequency = spec.config.fetch(:reaping_frequency, 60)
        @reaper = Reaper.new(self, reaping_frequency && reaping_frequency.to_f)
423
        @reaper.run
424 425 426 427 428 429 430 431
      end

      def lock_thread=(lock_thread)
        if lock_thread
          @lock_thread = Thread.current
        else
          @lock_thread = nil
        end
432 433
      end

434 435 436 437
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
438
      # held in a cache keyed by a thread.
439
      def connection
440
        @thread_cached_conns[connection_cache_key(current_thread)] ||= checkout
N
Nick 已提交
441 442
      end

443
      # Returns true if there is an open connection being used for the current thread.
444
      #
445
      # This method only works for connections that have been obtained through
446
      # #connection or #with_connection methods. Connections obtained through
447
      # #checkout will not be detected by #active_connection?
448
      def active_connection?
449
        @thread_cached_conns[connection_cache_key(current_thread)]
450 451
      end

452
      # Signal that the thread is finished with the current connection.
453
      # #release_connection releases the connection-thread association
454
      # and returns the connection to the pool.
455 456 457 458 459 460 461
      #
      # This method only works for connections that have been obtained through
      # #connection or #with_connection methods, connections obtained through
      # #checkout will not be automatically released.
      def release_connection(owner_thread = Thread.current)
        if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
          checkin conn
462
        end
463 464
      end

465 466
      # If a connection obtained through #connection or #with_connection methods
      # already exists yield it to the block. If no such connection
467
      # exists checkout a connection, yield it to the block, and checkin the
468
      # connection when finished.
469
      def with_connection
470 471 472 473 474
        unless conn = @thread_cached_conns[connection_cache_key(Thread.current)]
          conn = connection
          fresh_connection = true
        end
        yield conn
475
      ensure
476
        release_connection if fresh_connection
N
Nick 已提交
477 478
      end

479 480
      # Returns true if a connection has already been opened.
      def connected?
481
        synchronize { @connections.any? }
N
Nick 已提交
482 483
      end

484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
      # Returns an array containing the connections currently in the pool.
      # Access to the array does not require synchronization on the pool because
      # the array is newly created and not retained by the pool.
      #
      # However; this method bypasses the ConnectionPool's thread-safe connection
      # access pattern. A returned connection may be owned by another thread,
      # unowned, or by happen-stance owned by the calling thread.
      #
      # Calling methods on a connection without ownership is subject to the
      # thread-safety guarantees of the underlying method. Many of the methods
      # on connection adapter classes are inherently multi-thread unsafe.
      def connections
        synchronize { @connections.dup }
      end

P
Pratik Naik 已提交
499
      # Disconnects all connections in the pool, and clears the pool.
500 501
      #
      # Raises:
502
      # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
503
      #   connections in the pool within a timeout interval (default duration is
504
      #   <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
505 506 507 508
      def disconnect(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
509 510 511 512
              if conn.in_use?
                conn.steal!
                checkin conn
              end
513 514 515
              conn.disconnect!
            end
            @connections = []
516
            @available.clear
517
          end
N
Nick 已提交
518 519 520
        end
      end

521 522
      # Disconnects all connections in the pool, and clears the pool.
      #
523
      # The pool first tries to gain ownership of all connections. If unable to
524
      # do so within a timeout interval (default duration is
525
      # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool is forcefully
526
      # disconnected without any regard for other connection owning threads.
527 528 529 530
      def disconnect!
        disconnect(false)
      end

M
Matthew Draper 已提交
531 532 533 534 535 536 537
      # Discards all connections in the pool (even if they're currently
      # leased!), along with the pool itself. Any further interaction with the
      # pool (except #spec and #schema_cache) is undefined.
      #
      # See AbstractAdapter#discard!
      def discard! # :nodoc:
        synchronize do
538
          return if self.discarded?
M
Matthew Draper 已提交
539 540 541 542 543 544 545
          @connections.each do |conn|
            conn.discard!
          end
          @connections = @available = @thread_cached_conns = nil
        end
      end

546 547 548 549
      def discarded? # :nodoc:
        @connections.nil?
      end

550 551 552 553
      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
      # Raises:
554
      # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
555
      #   connections in the pool within a timeout interval (default duration is
556
      #   <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
557 558 559 560
      def clear_reloadable_connections(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
561 562 563 564
              if conn.in_use?
                conn.steal!
                checkin conn
              end
565 566 567
              conn.disconnect! if conn.requires_reloading?
            end
            @connections.delete_if(&:requires_reloading?)
568
            @available.clear
569
          end
570
        end
571 572 573 574 575
      end

      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
576
      # The pool first tries to gain ownership of all connections. If unable to
577
      # do so within a timeout interval (default duration is
578
      # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool forcefully
579 580 581 582
      # clears the cache and reloads connections without any regard for other
      # connection owning threads.
      def clear_reloadable_connections!
        clear_reloadable_connections(false)
N
Nick 已提交
583 584
      end

P
Pratik Naik 已提交
585 586 587
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
588 589 590 591 592
      # This is done by either returning and leasing existing connection, or by
      # creating a new connection and leasing it.
      #
      # If all connections are leased and the pool is at capacity (meaning the
      # number of currently leased connections is greater than or equal to the
593
      # size limit set), an ActiveRecord::ConnectionTimeoutError exception will be raised.
P
Pratik Naik 已提交
594 595 596 597
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
598
      # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
599 600
      def checkout(checkout_timeout = @checkout_timeout)
        checkout_and_verify(acquire_connection(checkout_timeout))
N
Nick 已提交
601 602
      end

P
Pratik Naik 已提交
603 604 605 606
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained by earlier by
607
      # calling #checkout on this pool.
608
      def checkin(conn)
609 610 611
        conn.lock.synchronize do
          synchronize do
            remove_connection_from_thread_cache conn
612

613 614 615
            conn._run_checkin_callbacks do
              conn.expire
            end
616

617 618
            @available.add conn
          end
619
        end
N
Nick 已提交
620
      end
621

622
      # Remove a connection from the connection pool. The connection will
623 624
      # remain open and active but will no longer be managed by this pool.
      def remove(conn)
625 626
        needs_new_connection = false

627
        synchronize do
628 629
          remove_connection_from_thread_cache conn

630
          @connections.delete conn
631
          @available.delete conn
632

633
          # @available.any_waiting? => true means that prior to removing this
634 635
          # conn, the pool was at its max size (@connections.size == @size).
          # This would mean that any threads stuck waiting in the queue wouldn't
636 637 638
          # know they could checkout_new_connection, so let's do it for them.
          # Because condition-wait loop is encapsulated in the Queue class
          # (that in turn is oblivious to ConnectionPool implementation), threads
639
          # that are "stuck" there are helpless. They have no way of creating
640 641
          # new connections and are completely reliant on us feeding available
          # connections into the Queue.
642 643 644 645
          needs_new_connection = @available.any_waiting?
        end

        # This is intentionally done outside of the synchronized section as we
646 647
        # would like not to hold the main mutex while checking out new connections.
        # Thus there is some chance that needs_new_connection information is now
648
        # stale, we can live with that (bulk_make_new_connections will make
649
        # sure not to exceed the pool's @size limit).
650
        bulk_make_new_connections(1) if needs_new_connection
651 652
      end

653
      # Recover lost connections for the pool. A lost connection can occur if
654
      # a programmer forgets to checkin a connection at the end of a thread
655 656
      # or a thread dies unexpectedly.
      def reap
657
        stale_connections = synchronize do
658
          return if self.discarded?
659 660
          @connections.select do |conn|
            conn.in_use? && !conn.owner.alive?
661
          end.each do |conn|
662
            conn.steal!
663 664 665 666
          end
        end

        stale_connections.each do |conn|
667 668 669 670 671
          if conn.active?
            conn.reset!
            checkin conn
          else
            remove conn
672 673 674 675
          end
        end
      end

M
Matthew Draper 已提交
676 677 678 679 680 681 682
      # Disconnect all connections that have been idle for at least
      # +minimum_idle+ seconds. Connections currently checked out, or that were
      # checked in less than +minimum_idle+ seconds ago, are unaffected.
      def flush(minimum_idle = @idle_timeout)
        return if minimum_idle.nil?

        idle_connections = synchronize do
683
          return if self.discarded?
M
Matthew Draper 已提交
684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705
          @connections.select do |conn|
            !conn.in_use? && conn.seconds_idle >= minimum_idle
          end.each do |conn|
            conn.lease

            @available.delete conn
            @connections.delete conn
          end
        end

        idle_connections.each do |conn|
          conn.disconnect!
        end
      end

      # Disconnect all currently idle connections. Connections currently checked
      # out are unaffected.
      def flush!
        reap
        flush(-1)
      end

706 707 708 709
      def num_waiting_in_queue # :nodoc:
        @available.num_waiting
      end

710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727
      # Return connection pool's usage statistic
      # Example:
      #
      #    ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
      def stat
        synchronize do
          {
            size: size,
            connections: @connections.size,
            busy: @connections.count { |c| c.in_use? && c.owner.alive? },
            dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
            idle: @connections.count { |c| !c.in_use? },
            waiting: num_waiting_in_queue,
            checkout_timeout: checkout_timeout
          }
        end
      end

728
      private
729 730
        #--
        # this is unfortunately not concurrent
731 732 733 734 735 736 737
        def bulk_make_new_connections(num_new_conns_needed)
          num_new_conns_needed.times do
            # try_to_checkout_new_connection will not exceed pool's @size limit
            if new_conn = try_to_checkout_new_connection
              # make the new_conn available to the starving threads stuck @available Queue
              checkin(new_conn)
            end
738 739 740
          end
        end

741 742 743 744 745
        #--
        # From the discussion on GitHub:
        #  https://github.com/rails/rails/pull/14938#commitcomment-6601951
        # This hook-in method allows for easier monkey-patching fixes needed by
        # JRuby users that use Fibers.
746 747 748
        def connection_cache_key(thread)
          thread
        end
749

750 751 752 753
        def current_thread
          @lock_thread || Thread.current
        end

754 755 756 757
        # Take control of all existing connections so a "group" action such as
        # reload/disconnect can be performed safely. It is no longer enough to
        # wrap it in +synchronize+ because some pool's actions are allowed
        # to be performed outside of the main +synchronize+ block.
758 759 760 761 762
        def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
          with_new_connections_blocked do
            attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)
            yield
          end
763 764
        end

765 766 767
        def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
          collected_conns = synchronize do
            # account for our own connections
768
            @connections.select { |conn| conn.owner == Thread.current }
769
          end
770

771
          newly_checked_out = []
B
bogdanvlviv 已提交
772
          timeout_time      = Concurrent.monotonic_time + (@checkout_timeout * 2)
773

774 775 776 777
          @available.with_a_bias_for(Thread.current) do
            loop do
              synchronize do
                return if collected_conns.size == @connections.size && @now_connecting == 0
B
bogdanvlviv 已提交
778
                remaining_timeout = timeout_time - Concurrent.monotonic_time
779 780 781 782 783
                remaining_timeout = 0 if remaining_timeout < 0
                conn = checkout_for_exclusive_access(remaining_timeout)
                collected_conns   << conn
                newly_checked_out << conn
              end
784 785
            end
          end
786 787 788 789 790 791 792 793 794 795 796 797 798
        rescue ExclusiveConnectionTimeoutError
          # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any
          # timeouts and are expected to just give up: we've obtained as many connections
          # as possible, note that in a case like that we don't return any of the
          # +newly_checked_out+ connections.

          if raise_on_acquisition_timeout
            release_newly_checked_out = true
            raise
          end
        rescue Exception # if something else went wrong
          # this can't be a "naked" rescue, because we have should return conns
          # even for non-StandardErrors
799 800
          release_newly_checked_out = true
          raise
801 802 803 804
        ensure
          if release_newly_checked_out && newly_checked_out
            # releasing only those conns that were checked out in this method, conns
            # checked outside this method (before it was called) are not for us to release
805
            newly_checked_out.each { |conn| checkin(conn) }
806
          end
807 808
        end

809 810
        #--
        # Must be called in a synchronize block.
811 812 813 814 815 816
        def checkout_for_exclusive_access(checkout_timeout)
          checkout(checkout_timeout)
        rescue ConnectionTimeoutError
          # this block can't be easily moved into attempt_to_checkout_all_existing_connections's
          # rescue block, because doing so would put it outside of synchronize section, without
          # being in a critical section thread_report might become inaccurate
817
          msg = +"could not obtain ownership of all database connections in #{checkout_timeout} seconds"
818 819 820 821 822 823

          thread_report = []
          @connections.each do |conn|
            unless conn.owner == Thread.current
              thread_report << "#{conn} is owned by #{conn.owner}"
            end
824 825
          end

826
          msg << " (#{thread_report.join(', ')})" if thread_report.any?
827

828 829
          raise ExclusiveConnectionTimeoutError, msg
        end
830

831 832
        def with_new_connections_blocked
          synchronize do
833
            @threads_blocking_new_connections += 1
834
          end
835

836 837
          yield
        ensure
838 839
          num_new_conns_required = 0

840 841
          synchronize do
            @threads_blocking_new_connections -= 1
842 843 844 845 846 847 848 849 850 851 852 853 854

            if @threads_blocking_new_connections.zero?
              @available.clear

              num_new_conns_required = num_waiting_in_queue

              @connections.each do |conn|
                next if conn.in_use?

                @available.add conn
                num_new_conns_required -= 1
              end
            end
855
          end
856 857

          bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0
858
        end
859

860 861 862 863 864 865 866 867 868 869 870
        # Acquire a connection by one of 1) immediately removing one
        # from the queue of available connections, 2) creating a new
        # connection if the pool is not at capacity, 3) waiting on the
        # queue for a connection to become available.
        #
        # Raises:
        # - ActiveRecord::ConnectionTimeoutError if a connection could not be acquired
        #
        #--
        # Implementation detail: the connection returned by +acquire_connection+
        # will already be "+connection.lease+ -ed" to the current thread.
871
        def acquire_connection(checkout_timeout)
872
          # NOTE: we rely on <tt>@available.poll</tt> and +try_to_checkout_new_connection+ to
873 874
          # +conn.lease+ the returned connection (and to do this in a +synchronized+
          # section). This is not the cleanest implementation, as ideally we would
875
          # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to <tt>@available.poll</tt>
876 877 878 879 880 881 882 883
          # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections
          # of the said methods and avoid an additional +synchronize+ overhead.
          if conn = @available.poll || try_to_checkout_new_connection
            conn
          else
            reap
            @available.poll(checkout_timeout)
          end
884 885
        end

886 887
        #--
        # if owner_thread param is omitted, this must be called in synchronize block
888 889 890 891
        def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
          @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
        end
        alias_method :release, :remove_connection_from_thread_cache
892

893 894
        def new_connection
          Base.send(spec.adapter_method, spec.config).tap do |conn|
895
            conn.check_version
896
          end
897
        end
898

899
        # If the pool is not at a <tt>@size</tt> limit, establish new connection. Connecting
900 901 902 903
        # to the DB is done outside main synchronized section.
        #--
        # Implementation constraint: a newly established connection returned by this
        # method must be in the +.leased+ state.
904 905 906 907 908
        def try_to_checkout_new_connection
          # first in synchronized section check if establishing new conns is allowed
          # and increment @now_connecting, to prevent overstepping this pool's @size
          # constraint
          do_checkout = synchronize do
909
            if @threads_blocking_new_connections.zero? && (@connections.size + @now_connecting) < @size
910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
              @now_connecting += 1
            end
          end
          if do_checkout
            begin
              # if successfully incremented @now_connecting establish new connection
              # outside of synchronized section
              conn = checkout_new_connection
            ensure
              synchronize do
                if conn
                  adopt_connection(conn)
                  # returned conn needs to be already leased
                  conn.lease
                end
                @now_connecting -= 1
926 927 928 929 930
              end
            end
          end
        end

931 932 933 934
        def adopt_connection(conn)
          conn.pool = self
          @connections << conn
        end
935

936 937 938 939
        def checkout_new_connection
          raise ConnectionNotEstablished unless @automatic_reconnect
          new_connection
        end
940

941 942 943 944 945 946 947 948 949
        def checkout_and_verify(c)
          c._run_checkout_callbacks do
            c.verify!
          end
          c
        rescue
          remove c
          c.disconnect!
          raise
950
        end
951 952
    end

P
Pratik Naik 已提交
953
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
954
    # for keeping separate connection pools that connect to different databases.
P
Pratik Naik 已提交
955 956 957
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
958 959
    #   class Author < ActiveRecord::Base
    #   end
P
Pratik Naik 已提交
960
    #
961 962
    #   class BankAccount < ActiveRecord::Base
    #   end
P
Pratik Naik 已提交
963
    #
964
    #   class Book < ActiveRecord::Base
965
    #     establish_connection :library_db
966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995
    #   end
    #
    #   class ScaryBook < Book
    #   end
    #
    #   class GoodBook < Book
    #   end
    #
    # And a database.yml that looked like this:
    #
    #   development:
    #     database: my_application
    #     host: localhost
    #
    #   library_db:
    #     database: library
    #     host: some.library.org
    #
    # Your primary database in the development environment is "my_application"
    # but the Book model connects to a separate database called "library_db"
    # (this can even be a database on a different machine).
    #
    # Book, ScaryBook and GoodBook will all use the same connection pool to
    # "library_db" while Author, BankAccount, and any other models you create
    # will use the default connection pool to "my_application".
    #
    # The various connection pools are managed by a single instance of
    # ConnectionHandler accessible via ActiveRecord::Base.connection_handler.
    # All Active Record models use this handler to determine the connection pool that they
    # should use.
996
    #
T
Tobias Fankhänel 已提交
997
    # The ConnectionHandler class is not coupled with the Active models, as it has no knowledge
998
    # about the model. The model needs to pass a specification name to the handler,
T
Tobias Fankhänel 已提交
999
    # in order to look up the correct connection pool.
1000
    class ConnectionHandler
1001
      def self.create_owner_to_pool # :nodoc:
1002 1003 1004
        Concurrent::Map.new(initial_capacity: 2) do |h, k|
          # Discard the parent's connection pools immediately; we have no need
          # of them
1005
          discard_unowned_pools(h)
1006 1007 1008 1009 1010

          h[k] = Concurrent::Map.new(initial_capacity: 2)
        end
      end

M
Matthew Draper 已提交
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022
      def self.unowned_pool_finalizer(pid_map) # :nodoc:
        lambda do |_|
          discard_unowned_pools(pid_map)
        end
      end

      def self.discard_unowned_pools(pid_map) # :nodoc:
        pid_map.each do |pid, pools|
          pools.values.compact.each(&:discard!) unless pid == Process.pid
        end
      end

J
Jon Leighton 已提交
1023
      def initialize
1024
        # These caches are keyed by spec.name (ConnectionSpecification#name).
1025
        @owner_to_pool = ConnectionHandler.create_owner_to_pool
M
Matthew Draper 已提交
1026 1027 1028 1029

        # Backup finalizer: if the forked child never needed a pool, the above
        # early discard has not occurred
        ObjectSpace.define_finalizer self, ConnectionHandler.unowned_pool_finalizer(@owner_to_pool)
1030 1031
      end

1032 1033 1034 1035 1036 1037 1038 1039
      def prevent_writes # :nodoc:
        Thread.current[:prevent_writes]
      end

      def prevent_writes=(prevent_writes) # :nodoc:
        Thread.current[:prevent_writes] = prevent_writes
      end

1040 1041 1042 1043 1044
      # Prevent writing to the database regardless of role.
      #
      # In some cases you may want to prevent writes to the database
      # even if you are on a database that can write. `while_preventing_writes`
      # will prevent writes to the database for the duration of the block.
1045
      def while_preventing_writes(enabled = true)
1046
        original, self.prevent_writes = self.prevent_writes, enabled
1047 1048
        yield
      ensure
1049
        self.prevent_writes = original
1050 1051
      end

1052
      def connection_pool_list
1053
        owner_to_pool.values.compact
1054
      end
1055
      alias :connection_pools :connection_pool_list
1056

1057 1058 1059
      def establish_connection(config)
        resolver = ConnectionSpecification::Resolver.new(Base.configurations)
        spec = resolver.spec(config)
1060 1061

        remove_connection(spec.name)
1062 1063 1064 1065 1066 1067

        message_bus = ActiveSupport::Notifications.instrumenter
        payload = {
          connection_id: object_id
        }
        if spec
1068
          payload[:spec_name] = spec.name
1069 1070 1071
          payload[:config] = spec.config
        end

1072
        message_bus.instrument("!connection.active_record", payload) do
1073 1074 1075 1076
          owner_to_pool[spec.name] = ConnectionAdapters::ConnectionPool.new(spec)
        end

        owner_to_pool[spec.name]
1077 1078
      end

1079 1080 1081
      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
1082
        connection_pool_list.any?(&:active_connection?)
1083 1084
      end

1085 1086 1087
      # Returns any connections in use by the current thread back to the pool,
      # and also returns connections to the pool cached by threads that are no
      # longer alive.
1088
      def clear_active_connections!
1089
        connection_pool_list.each(&:release_connection)
1090 1091
      end

S
Sebastian Martinez 已提交
1092
      # Clears the cache which maps classes.
1093 1094
      #
      # See ConnectionPool#clear_reloadable_connections! for details.
1095
      def clear_reloadable_connections!
1096
        connection_pool_list.each(&:clear_reloadable_connections!)
1097 1098 1099
      end

      def clear_all_connections!
1100
        connection_pool_list.each(&:disconnect!)
1101 1102
      end

M
Matthew Draper 已提交
1103 1104 1105 1106 1107 1108 1109
      # Disconnects all currently idle connections.
      #
      # See ConnectionPool#flush! for details.
      def flush_idle_connections!
        connection_pool_list.each(&:flush!)
      end

1110 1111 1112 1113
      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
1114 1115
      def retrieve_connection(spec_name) #:nodoc:
        pool = retrieve_connection_pool(spec_name)
1116 1117 1118 1119 1120 1121 1122 1123 1124 1125

        unless pool
          # multiple database application
          if ActiveRecord::Base.connection_handler != ActiveRecord::Base.default_connection_handler
            raise ConnectionNotEstablished, "No connection pool with '#{spec_name}' found for the '#{ActiveRecord::Base.current_role}' role."
          else
            raise ConnectionNotEstablished, "No connection pool with '#{spec_name}' found."
          end
        end

O
ojab 已提交
1126
        pool.connection
1127 1128
      end

1129 1130
      # Returns true if a connection that's accessible to this class has
      # already been opened.
1131
      def connected?(spec_name)
1132 1133
        pool = retrieve_connection_pool(spec_name)
        pool && pool.connected?
1134 1135 1136 1137
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
1138
      # can be used as an argument for #establish_connection, for easily
1139
      # re-establishing the connection.
1140 1141
      def remove_connection(spec_name)
        if pool = owner_to_pool.delete(spec_name)
J
Jon Leighton 已提交
1142 1143 1144 1145
          pool.automatic_reconnect = false
          pool.disconnect!
          pool.spec.config
        end
1146 1147
      end

1148
      # Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool.
1149 1150
      # This makes retrieving the connection pool O(1) once the process is warm.
      # When a connection is established or removed, we invalidate the cache.
1151 1152
      def retrieve_connection_pool(spec_name)
        owner_to_pool.fetch(spec_name) do
J
Jon Moss 已提交
1153 1154
          # Check if a connection was previously established in an ancestor process,
          # which may have been forked.
1155
          if ancestor_pool = pool_from_any_process_for(spec_name)
J
Jon Leighton 已提交
1156 1157 1158
            # A connection was established in an ancestor process that must have
            # subsequently forked. We can't reuse the connection, but we can copy
            # the specification and establish a new connection with it.
A
Arthur Neves 已提交
1159
            establish_connection(ancestor_pool.spec.to_hash).tap do |pool|
1160 1161
              pool.schema_cache = ancestor_pool.schema_cache if ancestor_pool.schema_cache
            end
1162
          else
1163
            owner_to_pool[spec_name] = nil
1164
          end
A
Arthur Neves 已提交
1165 1166 1167 1168
        end
      end

      private
1169 1170 1171
        def owner_to_pool
          @owner_to_pool[Process.pid]
        end
J
Jon Leighton 已提交
1172

1173
        def pool_from_any_process_for(spec_name)
1174
          owner_to_pool = @owner_to_pool.values.reverse.find { |v| v[spec_name] }
1175 1176
          owner_to_pool && owner_to_pool[spec_name]
        end
1177
    end
N
Nick 已提交
1178
  end
1179
end