connection_pool.rb 45.5 KB
Newer Older
1 2
# frozen_string_literal: true

3 4 5
require "thread"
require "concurrent/map"
require "monitor"
6
require "weakref"
7

N
Nick 已提交
8
module ActiveRecord
9
  # Raised when a connection could not be obtained within the connection
10
  # acquisition timeout period: because max connections in pool
11
  # are in use.
12 13 14
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

15
  # Raised when a pool was unable to get ahold of all its connections
16 17 18
  # to perform a "group" action such as
  # {ActiveRecord::Base.connection_pool.disconnect!}[rdoc-ref:ConnectionAdapters::ConnectionPool#disconnect!]
  # or {ActiveRecord::Base.clear_reloadable_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_reloadable_connections!].
19 20 21
  class ExclusiveConnectionTimeoutError < ConnectionTimeoutError
  end

N
Nick 已提交
22
  module ConnectionAdapters
23 24
    module AbstractPool # :nodoc:
      def get_schema_cache(connection)
J
Jean Boussier 已提交
25 26 27
        self.schema_cache ||= SchemaCache.new(connection)
        schema_cache.connection = connection
        schema_cache
28 29 30
      end

      def set_schema_cache(cache)
J
Jean Boussier 已提交
31
        self.schema_cache = cache
32 33 34 35 36 37
      end
    end

    class NullPool # :nodoc:
      include ConnectionAdapters::AbstractPool

J
Jean Boussier 已提交
38
      attr_accessor :schema_cache
39 40
    end

41
    # Connection pool base class for managing Active Record database
42 43
    # connections.
    #
P
Pratik Naik 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
59 60 61
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
62 63
    # 1. Simply use {ActiveRecord::Base.connection}[rdoc-ref:ConnectionHandling.connection]
    #    as with Active Record 2.1 and
64 65
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
66 67
    #    {ActiveRecord::Base.clear_active_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_active_connections!].
    #    This will be the default behavior for Active Record when used in conjunction with
68
    #    Action Pack's request handling cycle.
69
    # 2. Manually check out a connection from the pool with
70
    #    {ActiveRecord::Base.connection_pool.checkout}[rdoc-ref:#checkout]. You are responsible for
71
    #    returning this connection to the pool when finished by calling
72 73
    #    {ActiveRecord::Base.connection_pool.checkin(connection)}[rdoc-ref:#checkin].
    # 3. Use {ActiveRecord::Base.connection_pool.with_connection(&block)}[rdoc-ref:#with_connection], which
74 75
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
76
    #
P
Pratik Naik 已提交
77 78 79 80 81
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
82
    # There are several connection-pooling-related options that you can add to
83 84
    # your database connection configuration:
    #
M
Matthew Draper 已提交
85 86 87 88 89 90 91
    # * +pool+: maximum number of connections the pool may manage (default 5).
    # * +idle_timeout+: number of seconds that a connection will be kept
    #   unused in the pool before it is automatically disconnected (default
    #   300 seconds). Set this to zero to keep connections forever.
    # * +checkout_timeout+: number of seconds to wait for a connection to
    #   become available before giving up and raising a timeout error (default
    #   5 seconds).
92 93 94 95
    #
    #--
    # Synchronization policy:
    # * all public methods can be called outside +synchronize+
96
    # * access to these instance variables needs to be in +synchronize+:
97 98 99 100
    #   * @connections
    #   * @now_connecting
    # * private methods that require being called in a +synchronize+ blocks
    #   are now explicitly documented
101
    class ConnectionPool
D
Devin Christensen 已提交
102
      # Threadsafe, fair, LIFO queue.  Meant to be used by ConnectionPool
103
      # with which it shares a Monitor.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
      class Queue
        def initialize(lock = Monitor.new)
          @lock = lock
          @cond = @lock.new_cond
          @num_waiting = 0
          @queue = []
        end

        # Test if any threads are currently waiting on the queue.
        def any_waiting?
          synchronize do
            @num_waiting > 0
          end
        end

119 120 121 122 123
        # Returns the number of threads currently waiting on this
        # queue.
        def num_waiting
          synchronize do
            @num_waiting
124
          end
125 126 127 128 129 130 131 132 133 134
        end

        # Add +element+ to the queue.  Never blocks.
        def add(element)
          synchronize do
            @queue.push element
            @cond.signal
          end
        end

135
        # If +element+ is in the queue, remove and return it, or +nil+.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
        def delete(element)
          synchronize do
            @queue.delete(element)
          end
        end

        # Remove all elements from the queue.
        def clear
          synchronize do
            @queue.clear
          end
        end

        # Remove the head of the queue.
        #
A
Wording  
Akira Matsuda 已提交
151
        # If +timeout+ is not given, remove and return the head of the
152 153
        # queue if the number of available elements is strictly
        # greater than the number of threads currently waiting (that
154
        # is, don't jump ahead in line).  Otherwise, return +nil+.
155
        #
N
Neeraj Singh 已提交
156
        # If +timeout+ is given, block if there is no element
157 158 159 160
        # available, waiting up to +timeout+ seconds for an element to
        # become available.
        #
        # Raises:
161
        # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no element
N
Neeraj Singh 已提交
162
        # becomes available within +timeout+ seconds,
163
        def poll(timeout = nil)
164
          synchronize { internal_poll(timeout) }
165 166 167
        end

        private
168 169 170
          def internal_poll(timeout)
            no_wait_poll || (timeout && wait_poll(timeout))
          end
171

172 173 174
          def synchronize(&block)
            @lock.synchronize(&block)
          end
175

176
          # Test if the queue currently contains any elements.
177 178 179
          def any?
            !@queue.empty?
          end
180

181 182 183 184
          # A thread can remove an element from the queue without
          # waiting if and only if the number of currently available
          # connections is strictly greater than the number of waiting
          # threads.
185 186 187
          def can_remove_no_wait?
            @queue.size > @num_waiting
          end
188

189
          # Removes and returns the head of the queue if possible, or +nil+.
190
          def remove
191
            @queue.pop
192
          end
193

A
Wording  
Akira Matsuda 已提交
194
          # Remove and return the head of the queue if the number of
195
          # available elements is strictly greater than the number of
196
          # threads currently waiting.  Otherwise, return +nil+.
197 198 199
          def no_wait_poll
            remove if can_remove_no_wait?
          end
200

201 202
          # Waits on the queue up to +timeout+ seconds, then removes and
          # returns the head of the queue.
203 204
          def wait_poll(timeout)
            @num_waiting += 1
205

B
bogdanvlviv 已提交
206
            t0 = Concurrent.monotonic_time
207 208
            elapsed = 0
            loop do
209 210 211
              ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
                @cond.wait(timeout - elapsed)
              end
212

213
              return remove if any?
214

B
bogdanvlviv 已提交
215
              elapsed = Concurrent.monotonic_time - t0
216 217 218 219 220
              if elapsed >= timeout
                msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" %
                  [timeout, elapsed]
                raise ConnectionTimeoutError, msg
              end
221
            end
222 223
          ensure
            @num_waiting -= 1
224 225 226
          end
      end

227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
      # Adds the ability to turn a basic fair FIFO queue into one
      # biased to some thread.
      module BiasableQueue # :nodoc:
        class BiasedConditionVariable # :nodoc:
          # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+,
          # +signal+ and +wait+ methods are only called while holding a lock
          def initialize(lock, other_cond, preferred_thread)
            @real_cond = lock.new_cond
            @other_cond = other_cond
            @preferred_thread = preferred_thread
            @num_waiting_on_real_cond = 0
          end

          def broadcast
            broadcast_on_biased
            @other_cond.broadcast
          end

          def broadcast_on_biased
            @num_waiting_on_real_cond = 0
            @real_cond.broadcast
          end

          def signal
            if @num_waiting_on_real_cond > 0
              @num_waiting_on_real_cond -= 1
              @real_cond
            else
              @other_cond
            end.signal
          end

          def wait(timeout)
            if Thread.current == @preferred_thread
              @num_waiting_on_real_cond += 1
              @real_cond
            else
              @other_cond
            end.wait(timeout)
          end
        end

        def with_a_bias_for(thread)
          previous_cond = nil
          new_cond      = nil
          synchronize do
            previous_cond = @cond
            @cond = new_cond = BiasedConditionVariable.new(@lock, @cond, thread)
          end
          yield
        ensure
          synchronize do
            @cond = previous_cond if previous_cond
            new_cond.broadcast_on_biased if new_cond # wake up any remaining sleepers
          end
        end
      end

285 286 287
      # Connections must be leased while holding the main pool mutex. This is
      # an internal subclass that also +.leases+ returned connections while
      # still in queue's critical section (queue synchronizes with the same
288
      # <tt>@lock</tt> as the main pool) so that a returned connection is already
289 290
      # leased and there is no need to re-enter synchronized block.
      class ConnectionLeasingQueue < Queue # :nodoc:
291 292
        include BiasableQueue

293
        private
294 295 296 297 298
          def internal_poll(timeout)
            conn = super
            conn.lease if conn
            conn
          end
299 300
      end

M
Matthew Draper 已提交
301 302 303
      # Every +frequency+ seconds, the reaper will call +reap+ and +flush+ on
      # +pool+. A reaper instantiated with a zero frequency will never reap
      # the connection pool.
304
      #
M
Matthew Draper 已提交
305 306
      # Configure the frequency by setting +reaping_frequency+ in your database
      # yaml file (default 60 seconds).
307 308 309 310 311 312 313 314
      class Reaper
        attr_reader :pool, :frequency

        def initialize(pool, frequency)
          @pool      = pool
          @frequency = frequency
        end

315 316
        @mutex = Mutex.new
        @pools = {}
317
        @threads = {}
318

319 320 321
        class << self
          def register_pool(pool, frequency) # :nodoc:
            @mutex.synchronize do
322 323
              unless @threads[frequency]&.alive?
                @threads[frequency] = spawn_thread(frequency)
324
              end
325
              @pools[frequency] ||= []
326 327 328 329 330 331
              @pools[frequency] << WeakRef.new(pool)
            end
          end

          private
            def spawn_thread(frequency)
332
              Thread.new(frequency) do |t|
333
                running = true
334
                while running
335
                  sleep t
336
                  @mutex.synchronize do
337 338 339 340
                    @pools[frequency].select! do |pool|
                      pool.weakref_alive? && !pool.discarded?
                    end

341
                    @pools[frequency].each do |p|
342 343
                      p.reap
                      p.flush
344
                    rescue WeakRef::RefError
345
                    end
346 347 348

                    if @pools[frequency].empty?
                      @pools.delete(frequency)
349
                      @threads.delete(frequency)
350 351
                      running = false
                    end
352 353 354 355 356 357
                  end
                end
              end
            end
        end

358
        def run
M
Matthew Draper 已提交
359
          return unless frequency && frequency > 0
360
          self.class.register_pool(pool, frequency)
361 362 363
        end
      end

364
      include MonitorMixin
365
      include QueryCache::ConnectionPoolConfiguration
366
      include ConnectionAdapters::AbstractPool
367

J
Jean Boussier 已提交
368
      attr_accessor :automatic_reconnect, :checkout_timeout
E
eileencodes 已提交
369
      attr_reader :db_config, :size, :reaper, :pool_config
370

E
eileencodes 已提交
371
      delegate :schema_cache, :schema_cache=, to: :pool_config
J
Jean Boussier 已提交
372

E
eileencodes 已提交
373
      # Creates a new ConnectionPool object. +pool_config+ is a PoolConfig
P
Pratik Naik 已提交
374 375 376 377 378
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
E
eileencodes 已提交
379
      def initialize(pool_config)
380 381
        super()

E
eileencodes 已提交
382 383
        @pool_config = pool_config
        @db_config = pool_config.db_config
384

385 386 387
        @checkout_timeout = db_config.checkout_timeout
        @idle_timeout = db_config.idle_timeout
        @size = db_config.pool
388

389 390 391 392
        # This variable tracks the cache of threads mapped to reserved connections, with the
        # sole purpose of speeding up the +connection+ method. It is not the authoritative
        # registry of which thread owns which connection. Connection ownership is tracked by
        # the +connection.owner+ attr on each +connection+ instance.
393
        # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
394 395
        # then that +thread+ does indeed own that +conn+. However, an absence of a such
        # mapping does not mean that the +thread+ doesn't own the said connection. In
396
        # that case +conn.owner+ attr should be consulted.
397
        # Access and modification of <tt>@thread_cached_conns</tt> does not require
398
        # synchronization.
399
        @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size)
400

401
        @connections         = []
402
        @automatic_reconnect = true
403

404
        # Connection pool allows for concurrent (outside the main +synchronize+ section)
405 406 407 408
        # establishment of new connections. This variable tracks the number of threads
        # currently in the process of independently establishing connections to the DB.
        @now_connecting = 0

409
        @threads_blocking_new_connections = 0
410

411
        @available = ConnectionLeasingQueue.new self
412 413

        @lock_thread = false
414

415
        @reaper = Reaper.new(self, db_config.reaping_frequency)
416
        @reaper.run
417 418 419 420 421 422 423 424
      end

      def lock_thread=(lock_thread)
        if lock_thread
          @lock_thread = Thread.current
        else
          @lock_thread = nil
        end
425 426
      end

427 428 429 430
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
431
      # held in a cache keyed by a thread.
432
      def connection
433
        @thread_cached_conns[connection_cache_key(current_thread)] ||= checkout
N
Nick 已提交
434 435
      end

436
      # Returns true if there is an open connection being used for the current thread.
437
      #
438
      # This method only works for connections that have been obtained through
439
      # #connection or #with_connection methods. Connections obtained through
440
      # #checkout will not be detected by #active_connection?
441
      def active_connection?
442
        @thread_cached_conns[connection_cache_key(current_thread)]
443 444
      end

445
      # Signal that the thread is finished with the current connection.
446
      # #release_connection releases the connection-thread association
447
      # and returns the connection to the pool.
448 449 450 451 452 453 454
      #
      # This method only works for connections that have been obtained through
      # #connection or #with_connection methods, connections obtained through
      # #checkout will not be automatically released.
      def release_connection(owner_thread = Thread.current)
        if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
          checkin conn
455
        end
456 457
      end

458 459
      # If a connection obtained through #connection or #with_connection methods
      # already exists yield it to the block. If no such connection
460
      # exists checkout a connection, yield it to the block, and checkin the
461
      # connection when finished.
462
      def with_connection
463 464 465 466 467
        unless conn = @thread_cached_conns[connection_cache_key(Thread.current)]
          conn = connection
          fresh_connection = true
        end
        yield conn
468
      ensure
469
        release_connection if fresh_connection
N
Nick 已提交
470 471
      end

472 473
      # Returns true if a connection has already been opened.
      def connected?
474
        synchronize { @connections.any? }
N
Nick 已提交
475 476
      end

477 478 479 480 481 482 483 484 485 486 487 488 489 490 491
      # Returns an array containing the connections currently in the pool.
      # Access to the array does not require synchronization on the pool because
      # the array is newly created and not retained by the pool.
      #
      # However; this method bypasses the ConnectionPool's thread-safe connection
      # access pattern. A returned connection may be owned by another thread,
      # unowned, or by happen-stance owned by the calling thread.
      #
      # Calling methods on a connection without ownership is subject to the
      # thread-safety guarantees of the underlying method. Many of the methods
      # on connection adapter classes are inherently multi-thread unsafe.
      def connections
        synchronize { @connections.dup }
      end

P
Pratik Naik 已提交
492
      # Disconnects all connections in the pool, and clears the pool.
493 494
      #
      # Raises:
495
      # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
496
      #   connections in the pool within a timeout interval (default duration is
497
      #   <tt>spec.db_config.checkout_timeout * 2</tt> seconds).
498 499 500 501
      def disconnect(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
502 503 504 505
              if conn.in_use?
                conn.steal!
                checkin conn
              end
506 507 508
              conn.disconnect!
            end
            @connections = []
509
            @available.clear
510
          end
N
Nick 已提交
511 512 513
        end
      end

514 515
      # Disconnects all connections in the pool, and clears the pool.
      #
516
      # The pool first tries to gain ownership of all connections. If unable to
517
      # do so within a timeout interval (default duration is
518
      # <tt>spec.db_config.checkout_timeout * 2</tt> seconds), then the pool is forcefully
519
      # disconnected without any regard for other connection owning threads.
520 521 522 523
      def disconnect!
        disconnect(false)
      end

M
Matthew Draper 已提交
524 525 526 527 528 529 530
      # Discards all connections in the pool (even if they're currently
      # leased!), along with the pool itself. Any further interaction with the
      # pool (except #spec and #schema_cache) is undefined.
      #
      # See AbstractAdapter#discard!
      def discard! # :nodoc:
        synchronize do
531
          return if self.discarded?
M
Matthew Draper 已提交
532 533 534 535 536 537 538
          @connections.each do |conn|
            conn.discard!
          end
          @connections = @available = @thread_cached_conns = nil
        end
      end

539 540 541 542
      def discarded? # :nodoc:
        @connections.nil?
      end

543 544 545 546
      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
      # Raises:
547
      # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
548
      #   connections in the pool within a timeout interval (default duration is
549
      #   <tt>spec.db_config.checkout_timeout * 2</tt> seconds).
550 551 552 553
      def clear_reloadable_connections(raise_on_acquisition_timeout = true)
        with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
          synchronize do
            @connections.each do |conn|
554 555 556 557
              if conn.in_use?
                conn.steal!
                checkin conn
              end
558 559 560
              conn.disconnect! if conn.requires_reloading?
            end
            @connections.delete_if(&:requires_reloading?)
561
            @available.clear
562
          end
563
        end
564 565 566 567 568
      end

      # Clears the cache which maps classes and re-connects connections that
      # require reloading.
      #
569
      # The pool first tries to gain ownership of all connections. If unable to
570
      # do so within a timeout interval (default duration is
571
      # <tt>spec.db_config.checkout_timeout * 2</tt> seconds), then the pool forcefully
572 573 574 575
      # clears the cache and reloads connections without any regard for other
      # connection owning threads.
      def clear_reloadable_connections!
        clear_reloadable_connections(false)
N
Nick 已提交
576 577
      end

P
Pratik Naik 已提交
578 579 580
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
581 582 583 584 585
      # This is done by either returning and leasing existing connection, or by
      # creating a new connection and leasing it.
      #
      # If all connections are leased and the pool is at capacity (meaning the
      # number of currently leased connections is greater than or equal to the
586
      # size limit set), an ActiveRecord::ConnectionTimeoutError exception will be raised.
P
Pratik Naik 已提交
587 588 589 590
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
591
      # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
592 593
      def checkout(checkout_timeout = @checkout_timeout)
        checkout_and_verify(acquire_connection(checkout_timeout))
N
Nick 已提交
594 595
      end

P
Pratik Naik 已提交
596 597 598 599
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained by earlier by
600
      # calling #checkout on this pool.
601
      def checkin(conn)
602 603 604
        conn.lock.synchronize do
          synchronize do
            remove_connection_from_thread_cache conn
605

606 607 608
            conn._run_checkin_callbacks do
              conn.expire
            end
609

610 611
            @available.add conn
          end
612
        end
N
Nick 已提交
613
      end
614

615
      # Remove a connection from the connection pool. The connection will
616 617
      # remain open and active but will no longer be managed by this pool.
      def remove(conn)
618 619
        needs_new_connection = false

620
        synchronize do
621 622
          remove_connection_from_thread_cache conn

623
          @connections.delete conn
624
          @available.delete conn
625

626
          # @available.any_waiting? => true means that prior to removing this
627 628
          # conn, the pool was at its max size (@connections.size == @size).
          # This would mean that any threads stuck waiting in the queue wouldn't
629 630 631
          # know they could checkout_new_connection, so let's do it for them.
          # Because condition-wait loop is encapsulated in the Queue class
          # (that in turn is oblivious to ConnectionPool implementation), threads
632
          # that are "stuck" there are helpless. They have no way of creating
633 634
          # new connections and are completely reliant on us feeding available
          # connections into the Queue.
635 636 637 638
          needs_new_connection = @available.any_waiting?
        end

        # This is intentionally done outside of the synchronized section as we
639 640
        # would like not to hold the main mutex while checking out new connections.
        # Thus there is some chance that needs_new_connection information is now
641
        # stale, we can live with that (bulk_make_new_connections will make
642
        # sure not to exceed the pool's @size limit).
643
        bulk_make_new_connections(1) if needs_new_connection
644 645
      end

646
      # Recover lost connections for the pool. A lost connection can occur if
647
      # a programmer forgets to checkin a connection at the end of a thread
648 649
      # or a thread dies unexpectedly.
      def reap
650
        stale_connections = synchronize do
651
          return if self.discarded?
652 653
          @connections.select do |conn|
            conn.in_use? && !conn.owner.alive?
654
          end.each do |conn|
655
            conn.steal!
656 657 658 659
          end
        end

        stale_connections.each do |conn|
660 661 662 663 664
          if conn.active?
            conn.reset!
            checkin conn
          else
            remove conn
665 666 667 668
          end
        end
      end

M
Matthew Draper 已提交
669 670 671 672 673 674 675
      # Disconnect all connections that have been idle for at least
      # +minimum_idle+ seconds. Connections currently checked out, or that were
      # checked in less than +minimum_idle+ seconds ago, are unaffected.
      def flush(minimum_idle = @idle_timeout)
        return if minimum_idle.nil?

        idle_connections = synchronize do
676
          return if self.discarded?
M
Matthew Draper 已提交
677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698
          @connections.select do |conn|
            !conn.in_use? && conn.seconds_idle >= minimum_idle
          end.each do |conn|
            conn.lease

            @available.delete conn
            @connections.delete conn
          end
        end

        idle_connections.each do |conn|
          conn.disconnect!
        end
      end

      # Disconnect all currently idle connections. Connections currently checked
      # out are unaffected.
      def flush!
        reap
        flush(-1)
      end

699 700 701 702
      def num_waiting_in_queue # :nodoc:
        @available.num_waiting
      end

703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720
      # Return connection pool's usage statistic
      # Example:
      #
      #    ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
      def stat
        synchronize do
          {
            size: size,
            connections: @connections.size,
            busy: @connections.count { |c| c.in_use? && c.owner.alive? },
            dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
            idle: @connections.count { |c| !c.in_use? },
            waiting: num_waiting_in_queue,
            checkout_timeout: checkout_timeout
          }
        end
      end

721
      private
722 723
        #--
        # this is unfortunately not concurrent
724 725 726 727 728 729 730
        def bulk_make_new_connections(num_new_conns_needed)
          num_new_conns_needed.times do
            # try_to_checkout_new_connection will not exceed pool's @size limit
            if new_conn = try_to_checkout_new_connection
              # make the new_conn available to the starving threads stuck @available Queue
              checkin(new_conn)
            end
731 732 733
          end
        end

734 735 736 737 738
        #--
        # From the discussion on GitHub:
        #  https://github.com/rails/rails/pull/14938#commitcomment-6601951
        # This hook-in method allows for easier monkey-patching fixes needed by
        # JRuby users that use Fibers.
739 740 741
        def connection_cache_key(thread)
          thread
        end
742

743 744 745 746
        def current_thread
          @lock_thread || Thread.current
        end

747 748 749 750
        # Take control of all existing connections so a "group" action such as
        # reload/disconnect can be performed safely. It is no longer enough to
        # wrap it in +synchronize+ because some pool's actions are allowed
        # to be performed outside of the main +synchronize+ block.
751 752 753 754 755
        def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
          with_new_connections_blocked do
            attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)
            yield
          end
756 757
        end

758 759 760
        def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
          collected_conns = synchronize do
            # account for our own connections
761
            @connections.select { |conn| conn.owner == Thread.current }
762
          end
763

764
          newly_checked_out = []
B
bogdanvlviv 已提交
765
          timeout_time      = Concurrent.monotonic_time + (@checkout_timeout * 2)
766

767 768 769 770
          @available.with_a_bias_for(Thread.current) do
            loop do
              synchronize do
                return if collected_conns.size == @connections.size && @now_connecting == 0
B
bogdanvlviv 已提交
771
                remaining_timeout = timeout_time - Concurrent.monotonic_time
772 773 774 775 776
                remaining_timeout = 0 if remaining_timeout < 0
                conn = checkout_for_exclusive_access(remaining_timeout)
                collected_conns   << conn
                newly_checked_out << conn
              end
777 778
            end
          end
779 780 781 782 783 784 785 786 787 788 789 790 791
        rescue ExclusiveConnectionTimeoutError
          # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any
          # timeouts and are expected to just give up: we've obtained as many connections
          # as possible, note that in a case like that we don't return any of the
          # +newly_checked_out+ connections.

          if raise_on_acquisition_timeout
            release_newly_checked_out = true
            raise
          end
        rescue Exception # if something else went wrong
          # this can't be a "naked" rescue, because we have should return conns
          # even for non-StandardErrors
792 793
          release_newly_checked_out = true
          raise
794 795 796 797
        ensure
          if release_newly_checked_out && newly_checked_out
            # releasing only those conns that were checked out in this method, conns
            # checked outside this method (before it was called) are not for us to release
798
            newly_checked_out.each { |conn| checkin(conn) }
799
          end
800 801
        end

802 803
        #--
        # Must be called in a synchronize block.
804 805 806 807 808 809
        def checkout_for_exclusive_access(checkout_timeout)
          checkout(checkout_timeout)
        rescue ConnectionTimeoutError
          # this block can't be easily moved into attempt_to_checkout_all_existing_connections's
          # rescue block, because doing so would put it outside of synchronize section, without
          # being in a critical section thread_report might become inaccurate
810
          msg = +"could not obtain ownership of all database connections in #{checkout_timeout} seconds"
811 812 813 814 815 816

          thread_report = []
          @connections.each do |conn|
            unless conn.owner == Thread.current
              thread_report << "#{conn} is owned by #{conn.owner}"
            end
817 818
          end

819
          msg << " (#{thread_report.join(', ')})" if thread_report.any?
820

821 822
          raise ExclusiveConnectionTimeoutError, msg
        end
823

824
        def with_new_connections_blocked
825
          synchronize do
826
            @threads_blocking_new_connections += 1
827
          end
828

829 830
          yield
        ensure
831 832
          num_new_conns_required = 0

833 834
          synchronize do
            @threads_blocking_new_connections -= 1
835 836 837 838 839 840 841 842 843 844 845 846 847

            if @threads_blocking_new_connections.zero?
              @available.clear

              num_new_conns_required = num_waiting_in_queue

              @connections.each do |conn|
                next if conn.in_use?

                @available.add conn
                num_new_conns_required -= 1
              end
            end
848
          end
849 850

          bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0
851
        end
852

853 854 855 856 857 858 859 860 861 862 863
        # Acquire a connection by one of 1) immediately removing one
        # from the queue of available connections, 2) creating a new
        # connection if the pool is not at capacity, 3) waiting on the
        # queue for a connection to become available.
        #
        # Raises:
        # - ActiveRecord::ConnectionTimeoutError if a connection could not be acquired
        #
        #--
        # Implementation detail: the connection returned by +acquire_connection+
        # will already be "+connection.lease+ -ed" to the current thread.
864
        def acquire_connection(checkout_timeout)
865
          # NOTE: we rely on <tt>@available.poll</tt> and +try_to_checkout_new_connection+ to
866 867
          # +conn.lease+ the returned connection (and to do this in a +synchronized+
          # section). This is not the cleanest implementation, as ideally we would
868
          # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to <tt>@available.poll</tt>
869 870 871 872 873 874 875 876
          # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections
          # of the said methods and avoid an additional +synchronize+ overhead.
          if conn = @available.poll || try_to_checkout_new_connection
            conn
          else
            reap
            @available.poll(checkout_timeout)
          end
877 878
        end

879 880
        #--
        # if owner_thread param is omitted, this must be called in synchronize block
881 882 883 884
        def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
          @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
        end
        alias_method :release, :remove_connection_from_thread_cache
885

886
        def new_connection
887
          Base.send(db_config.adapter_method, db_config.configuration_hash).tap do |conn|
888
            conn.check_version
889
          end
890
        end
891

892
        # If the pool is not at a <tt>@size</tt> limit, establish new connection. Connecting
893 894 895 896
        # to the DB is done outside main synchronized section.
        #--
        # Implementation constraint: a newly established connection returned by this
        # method must be in the +.leased+ state.
897 898 899 900 901
        def try_to_checkout_new_connection
          # first in synchronized section check if establishing new conns is allowed
          # and increment @now_connecting, to prevent overstepping this pool's @size
          # constraint
          do_checkout = synchronize do
902
            if @threads_blocking_new_connections.zero? && (@connections.size + @now_connecting) < @size
903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918
              @now_connecting += 1
            end
          end
          if do_checkout
            begin
              # if successfully incremented @now_connecting establish new connection
              # outside of synchronized section
              conn = checkout_new_connection
            ensure
              synchronize do
                if conn
                  adopt_connection(conn)
                  # returned conn needs to be already leased
                  conn.lease
                end
                @now_connecting -= 1
919 920 921 922 923
              end
            end
          end
        end

924 925 926 927
        def adopt_connection(conn)
          conn.pool = self
          @connections << conn
        end
928

929 930 931 932
        def checkout_new_connection
          raise ConnectionNotEstablished unless @automatic_reconnect
          new_connection
        end
933

934 935 936 937 938 939 940 941 942
        def checkout_and_verify(c)
          c._run_checkout_callbacks do
            c.verify!
          end
          c
        rescue
          remove c
          c.disconnect!
          raise
943
        end
944 945
    end

P
Pratik Naik 已提交
946
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
947
    # for keeping separate connection pools that connect to different databases.
P
Pratik Naik 已提交
948 949 950
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
951 952
    #   class Author < ActiveRecord::Base
    #   end
P
Pratik Naik 已提交
953
    #
954 955
    #   class BankAccount < ActiveRecord::Base
    #   end
P
Pratik Naik 已提交
956
    #
957
    #   class Book < ActiveRecord::Base
958
    #     establish_connection :library_db
959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988
    #   end
    #
    #   class ScaryBook < Book
    #   end
    #
    #   class GoodBook < Book
    #   end
    #
    # And a database.yml that looked like this:
    #
    #   development:
    #     database: my_application
    #     host: localhost
    #
    #   library_db:
    #     database: library
    #     host: some.library.org
    #
    # Your primary database in the development environment is "my_application"
    # but the Book model connects to a separate database called "library_db"
    # (this can even be a database on a different machine).
    #
    # Book, ScaryBook and GoodBook will all use the same connection pool to
    # "library_db" while Author, BankAccount, and any other models you create
    # will use the default connection pool to "my_application".
    #
    # The various connection pools are managed by a single instance of
    # ConnectionHandler accessible via ActiveRecord::Base.connection_handler.
    # All Active Record models use this handler to determine the connection pool that they
    # should use.
989
    #
T
Tobias Fankhänel 已提交
990
    # The ConnectionHandler class is not coupled with the Active models, as it has no knowledge
991
    # about the model. The model needs to pass a connection specification name to the handler,
T
Tobias Fankhänel 已提交
992
    # in order to look up the correct connection pool.
993
    class ConnectionHandler
J
Jean Boussier 已提交
994 995
      FINALIZER = lambda { |_| ActiveSupport::ForkTracker.check! }
      private_constant :FINALIZER
M
Matthew Draper 已提交
996

J
Jon Leighton 已提交
997
      def initialize
E
eileencodes 已提交
998 999
        # These caches are keyed by pool_config.connection_specification_name (PoolConfig#connection_specification_name).
        @owner_to_pool_manager = Concurrent::Map.new(initial_capacity: 2)
M
Matthew Draper 已提交
1000

J
Jean Boussier 已提交
1001 1002
        # Backup finalizer: if the forked child skipped Kernel#fork the early discard has not occurred
        ObjectSpace.define_finalizer self, FINALIZER
1003 1004
      end

1005 1006 1007 1008 1009 1010 1011 1012
      def prevent_writes # :nodoc:
        Thread.current[:prevent_writes]
      end

      def prevent_writes=(prevent_writes) # :nodoc:
        Thread.current[:prevent_writes] = prevent_writes
      end

1013 1014 1015 1016 1017
      # Prevent writing to the database regardless of role.
      #
      # In some cases you may want to prevent writes to the database
      # even if you are on a database that can write. `while_preventing_writes`
      # will prevent writes to the database for the duration of the block.
1018 1019 1020 1021 1022 1023
      #
      # This method does not provide the same protection as a readonly
      # user and is meant to be a safeguard against accidental writes.
      #
      # See `READ_QUERY` for the queries that are blocked by this
      # method.
1024
      def while_preventing_writes(enabled = true)
1025
        original, self.prevent_writes = self.prevent_writes, enabled
1026 1027
        yield
      ensure
1028
        self.prevent_writes = original
1029 1030
      end

1031
      def connection_pool_names # :nodoc:
E
eileencodes 已提交
1032
        owner_to_pool_manager.keys
1033 1034
      end

1035
      def connection_pool_list
E
eileencodes 已提交
1036
        owner_to_pool_manager.values.flat_map { |m| m.pool_configs.map(&:pool) }
1037
      end
1038
      alias :connection_pools :connection_pool_list
1039

1040
      def establish_connection(config, owner_name: Base.name, shard: Base.default_shard)
E
eileencodes 已提交
1041 1042 1043
        owner_name = config.to_s if config.is_a?(Symbol)

        pool_config = resolve_pool_config(config, owner_name)
E
eileencodes 已提交
1044
        db_config = pool_config.db_config
1045

1046 1047 1048
        # Protects the connection named `ActiveRecord::Base` from being removed
        # if the user calls `establish_connection :primary`.
        if owner_to_pool_manager.key?(pool_config.connection_specification_name)
1049
          remove_connection_pool(pool_config.connection_specification_name, shard: shard)
1050
        end
1051 1052

        message_bus = ActiveSupport::Notifications.instrumenter
1053
        payload = {}
E
eileencodes 已提交
1054 1055
        if pool_config
          payload[:spec_name] = pool_config.connection_specification_name
1056
          payload[:config] = db_config.configuration_hash
1057 1058
        end

E
eileencodes 已提交
1059
        owner_to_pool_manager[pool_config.connection_specification_name] ||= PoolManager.new
1060
        pool_manager = get_pool_manager(pool_config.connection_specification_name)
E
eileencodes 已提交
1061
        pool_manager.set_pool_config(shard, pool_config)
J
Jean Boussier 已提交
1062

1063
        message_bus.instrument("!connection.active_record", payload) do
E
eileencodes 已提交
1064
          pool_config.pool
1065
        end
1066 1067
      end

1068 1069 1070
      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
1071
        connection_pool_list.any?(&:active_connection?)
1072 1073
      end

1074 1075 1076
      # Returns any connections in use by the current thread back to the pool,
      # and also returns connections to the pool cached by threads that are no
      # longer alive.
1077
      def clear_active_connections!
1078
        connection_pool_list.each(&:release_connection)
1079 1080
      end

S
Sebastian Martinez 已提交
1081
      # Clears the cache which maps classes.
1082 1083
      #
      # See ConnectionPool#clear_reloadable_connections! for details.
1084
      def clear_reloadable_connections!
1085
        connection_pool_list.each(&:clear_reloadable_connections!)
1086 1087 1088
      end

      def clear_all_connections!
1089
        connection_pool_list.each(&:disconnect!)
1090 1091
      end

M
Matthew Draper 已提交
1092 1093 1094 1095 1096 1097 1098
      # Disconnects all currently idle connections.
      #
      # See ConnectionPool#flush! for details.
      def flush_idle_connections!
        connection_pool_list.each(&:flush!)
      end

1099 1100 1101 1102
      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
1103 1104
      def retrieve_connection(spec_name, shard: ActiveRecord::Base.default_shard) # :nodoc:
        pool = retrieve_connection_pool(spec_name, shard: shard)
1105 1106

        unless pool
E
eileencodes 已提交
1107 1108
          if shard != ActiveRecord::Base.default_shard
            message = "No connection pool for '#{spec_name}' found for the '#{shard}' shard."
1109 1110
          elsif ActiveRecord::Base.connection_handler != ActiveRecord::Base.default_connection_handler
            message = "No connection pool for '#{spec_name}' found for the '#{ActiveRecord::Base.current_role}' role."
1111
          else
1112
            message = "No connection pool for '#{spec_name}' found."
1113
          end
1114 1115

          raise ConnectionNotEstablished, message
1116 1117
        end

O
ojab 已提交
1118
        pool.connection
1119 1120
      end

1121 1122
      # Returns true if a connection that's accessible to this class has
      # already been opened.
1123 1124
      def connected?(spec_name, shard: ActiveRecord::Base.default_shard)
        pool = retrieve_connection_pool(spec_name, shard: shard)
1125
        pool && pool.connected?
1126 1127 1128 1129
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
1130
      # can be used as an argument for #establish_connection, for easily
1131
      # re-establishing the connection.
1132 1133
      def remove_connection(owner, shard: ActiveRecord::Base.default_shard)
        remove_connection_pool(owner, shard: shard)&.configuration_hash
1134 1135 1136
      end
      deprecate remove_connection: "Use #remove_connection_pool, which now returns a DatabaseConfig object instead of a Hash"

1137
      def remove_connection_pool(owner, shard: ActiveRecord::Base.default_shard)
1138
        if pool_manager = get_pool_manager(owner)
E
eileencodes 已提交
1139
          pool_config = pool_manager.remove_pool_config(shard)
1140

E
eileencodes 已提交
1141 1142
          if pool_config
            pool_config.disconnect!
1143
            pool_config.db_config
1144
          end
J
Jon Leighton 已提交
1145
        end
1146 1147
      end

E
eileencodes 已提交
1148
      # Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool_manager.
1149 1150
      # This makes retrieving the connection pool O(1) once the process is warm.
      # When a connection is established or removed, we invalidate the cache.
1151
      def retrieve_connection_pool(owner, shard: ActiveRecord::Base.default_shard)
E
eileencodes 已提交
1152
        pool_config = get_pool_manager(owner)&.get_pool_config(shard)
E
eileencodes 已提交
1153
        pool_config&.pool
A
Arthur Neves 已提交
1154 1155 1156
      end

      private
E
eileencodes 已提交
1157
        attr_reader :owner_to_pool_manager
1158

1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174
        # Returns the pool manager for an owner.
        #
        # Using `"primary"` to look up the pool manager for `ActiveRecord::Base` is
        # deprecated in favor of looking it up by `"ActiveRecord::Base"`.
        #
        # During the deprecation period, if `"primary"` is passed, the pool manager
        # for `ActiveRecord::Base` will still be returned.
        def get_pool_manager(owner)
          return owner_to_pool_manager[owner] if owner_to_pool_manager.key?(owner)

          if owner == "primary"
            ActiveSupport::Deprecation.warn("Using `\"primary\"` as a `connection_specification_name` is deprecated and will be removed in Rails 6.2.0. Please use `ActiveRecord::Base`.")
            owner_to_pool_manager[Base.name]
          end
        end

1175 1176 1177 1178 1179 1180 1181 1182 1183 1184
        # Returns an instance of PoolConfig for a given adapter.
        # Accepts a hash one layer deep that contains all connection information.
        #
        # == Example
        #
        #   config = { "production" => { "host" => "localhost", "database" => "foo", "adapter" => "sqlite3" } }
        #   pool_config = Base.configurations.resolve_pool_config(:production)
        #   pool_config.db_config.configuration_hash
        #   # => { host: "localhost", database: "foo", adapter: "sqlite3" }
        #
E
eileencodes 已提交
1185 1186
        def resolve_pool_config(config, owner_name)
          db_config = Base.configurations.resolve(config)
1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214

          raise(AdapterNotSpecified, "database configuration does not specify adapter") unless db_config.adapter

          # Require the adapter itself and give useful feedback about
          #   1. Missing adapter gems and
          #   2. Adapter gems' missing dependencies.
          path_to_adapter = "active_record/connection_adapters/#{db_config.adapter}_adapter"
          begin
            require path_to_adapter
          rescue LoadError => e
            # We couldn't require the adapter itself. Raise an exception that
            # points out config typos and missing gems.
            if e.path == path_to_adapter
              # We can assume that a non-builtin adapter was specified, so it's
              # either misspelled or missing from Gemfile.
              raise LoadError, "Could not load the '#{db_config.adapter}' Active Record adapter. Ensure that the adapter is spelled correctly in config/database.yml and that you've added the necessary adapter gem to your Gemfile.", e.backtrace

              # Bubbled up from the adapter require. Prefix the exception message
              # with some guidance about how to address it and reraise.
            else
              raise LoadError, "Error loading the '#{db_config.adapter}' Active Record adapter. Missing a gem it depends on? #{e.message}", e.backtrace
            end
          end

          unless ActiveRecord::Base.respond_to?(db_config.adapter_method)
            raise AdapterNotFound, "database configuration specifies nonexistent #{db_config.adapter} adapter"
          end

E
eileencodes 已提交
1215
          ConnectionAdapters::PoolConfig.new(owner_name, db_config)
1216
        end
1217
    end
N
Nick 已提交
1218
  end
1219
end