connection_pool.rb 14.5 KB
Newer Older
1
require 'thread'
2
require 'monitor'
3
require 'set'
J
Jeremy Kemper 已提交
4
require 'active_support/core_ext/module/synchronization'
5

N
Nick 已提交
6
module ActiveRecord
7 8 9 10 11
  # Raised when a connection could not be obtained within the connection
  # acquisition timeout period. ConnectionPool#checkout raises it when the
  # pool is still fully checked out after waiting.
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

N
Nick 已提交
12
  module ConnectionAdapters
13
    # Connection pool base class for managing Active Record database
14 15
    # connections.
    #
P
Pratik Naik 已提交
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
31 32 33
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
34
    # 1. Simply use ActiveRecord::Base.connection as with Active Record 2.1 and
35 36 37
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
38 39
    #    default behavior for Active Record when used in conjunction with
    #    Action Pack's request handling cycle.
40 41 42 43 44 45 46
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
47
    #
P
Pratik Naik 已提交
48 49 50 51 52
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
53 54 55 56 57 58
    # There are two connection-pooling-related options that you can add to
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
    # * +wait_timeout+: number of seconds to block and wait for a connection
    #   before giving up and raising a timeout error (default 5 seconds).
59
    class ConnectionPool
60
      # When false, #checkout_new_connection refuses to open new connections.
      attr_accessor :automatic_reconnect
      # The connection specification and the list of connections opened so far.
      attr_reader :spec, :connections
      # Schema caches keyed by table name; they are set up in #initialize.
      attr_reader :columns, :columns_hash, :primary_keys, :tables
63

P
Pratik Naik 已提交
64 65 66 67 68 69
      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5, and the default wait
      # timeout is 5 seconds.
      def initialize(spec)
        @spec = spec

        # The cache of reserved connections mapped to threads
        @reserved_connections = {}

        # The monitor used to synchronize pool access, and the condition
        # variable that #checkout waits on and #checkin signals
        @connection_mutex = Monitor.new
        @queue = @connection_mutex.new_cond
        @timeout = spec.config[:wait_timeout] || 5

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

        @connections = []
        @checked_out = []
        @automatic_reconnect = true
        @tables = {}

        # Column list per table, fetched over a pooled connection on first access
        @columns     = Hash.new do |h, table_name|
          h[table_name] = with_connection do |conn|

            # Fetch a list of columns
            conn.columns(table_name, "#{table_name} Columns").tap do |columns|

              # set primary key information
              columns.each do |column|
                column.primary = column.name == primary_keys[table_name]
              end
            end
          end
        end

        # Same columns as above, indexed by column name
        @columns_hash = Hash.new do |h, table_name|
          h[table_name] = Hash[columns[table_name].map { |col|
            [col.name, col]
          }]
        end

        # Primary key per table; falls back to 'id' when the table is unknown
        @primary_keys = Hash.new do |h, table_name|
          h[table_name] = with_connection do |conn|
            table_exists?(table_name) ? conn.primary_key(table_name) : 'id'
          end
        end
      end

116 117 118 119 120 121 122 123 124 125 126
      # Cached check for whether the table +name+ exists. On a cache miss the
      # table list is fetched over a pooled connection and every table found is
      # recorded before the cache is consulted again.
      def table_exists?(name)
        unless @tables.key?(name)
          with_connection do |conn|
            conn.tables.each { |known| @tables[known] = true }
          end
        end

        @tables.key?(name)
      end

127 128 129 130
      # Clears out internal caches:
      #
      #   * columns
      #   * columns_hash
      #   * primary_keys
      #   * tables
      def clear_cache!
        @columns.clear
        @columns_hash.clear
        # Cached columns carry a +primary+ flag computed from the primary key
        # cache, so both are dropped together to keep them consistent.
        @primary_keys.clear
        @tables.clear
      end

A
Aaron Patterson 已提交
138 139 140 141 142 143 144
      # Clear out internal caches for table with +table_name+
      def clear_table_cache!(table_name)
        @columns.delete table_name
        @columns_hash.delete table_name
        @primary_keys.delete table_name
        # Also forget the existence flag so #table_exists? asks the database
        # again instead of reporting a stale +true+.
        @tables.delete table_name
      end

145 146 147 148 149 150
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the thread id.
      def connection
        @reserved_connections[current_connection_id] ||= checkout
      end

154
      # Signal that the thread is finished with the current connection.
      # #release_connection releases the connection-thread association
      # and returns the connection to the pool.
      #
      # +with_id+ defaults to the calling thread's connection id. Nothing
      # happens when no connection is reserved under that id.
      def release_connection(with_id = current_connection_id)
        conn = @reserved_connections.delete(with_id)
        checkin conn if conn
      end

162
      # If a connection already exists yield it to the block.  If no connection
      # exists checkout a connection, yield it to the block, and checkin the
      # connection when finished.
      #
      # Returns the value of the block.
      def with_connection
        connection_id = current_connection_id
        # Remember whether this call is the one that reserves the connection;
        # only then is it released again in the ensure clause.
        fresh_connection = true unless @reserved_connections[connection_id]
        yield connection
      ensure
        release_connection(connection_id) if fresh_connection
      end

173 174
      # Returns true if a connection has already been opened.
      def connected?
        !@connections.empty?
      end

P
Pratik Naik 已提交
178
      # Disconnects all connections in the pool, and clears the pool.
      def disconnect!
        # Hand every thread-reserved connection back first
        @reserved_connections.each_value do |conn|
          checkin conn
        end
        @reserved_connections = {}

        @connections.each do |conn|
          conn.disconnect!
        end
        @connections = []

        # Connections obtained through #checkout directly are not in the
        # reserved hash; drop their bookkeeping too, otherwise the checked-out
        # count no longer matches the (now empty) connection list.
        @checked_out = []
      end

190
      # Returns every reserved connection to the pool, then disconnects and
      # removes the connections that report +requires_reloading?+.
      #
      # Returns the remaining connections.
      def clear_reloadable_connections!
        @reserved_connections.each_value do |conn|
          checkin conn
        end
        @reserved_connections = {}

        stale, current = @connections.partition { |conn| conn.requires_reloading? }
        stale.each { |conn| conn.disconnect! }
        @connections = current
        # A removed connection must not linger in the checked-out list, or the
        # pool's accounting drifts from the connections it actually holds.
        @checked_out -= stale
        @connections
      end

204 205
      # Returns connections held by threads that are no longer alive to the
      # pool, then calls +verify!+ on every connection in the pool.
      def verify_active_connections! #:nodoc:
        clear_stale_cached_connections!
        @connections.each do |connection|
          connection.verify!
        end
      end

213 214 215
      # Return any checked-out connections back to the pool by threads that
      # are no longer alive.
      def clear_stale_cached_connections!
        # Reserved connections are keyed by thread object_id, so any key that
        # does not belong to a live thread is stale.
        live_ids = Thread.list.select { |t| t.alive? }.map { |t| t.object_id }
        (@reserved_connections.keys - live_ids).each do |key|
          checkin @reserved_connections[key]
          @reserved_connections.delete(key)
        end
      end

P
Pratik Naik 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
      # This is done by either returning an existing connection, or by creating
      # a new connection. If the maximum number of connections for this pool has
      # already been reached, but the pool is empty (i.e. they're all being used),
      # then this method will wait until a thread has checked in a connection.
      # The wait time is bounded however: if no connection can be checked out
      # within the timeout specified for this pool, then a ConnectionTimeoutError
      # exception will be raised.
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
      # - ConnectionTimeoutError: no connection can be obtained from the pool
      #   within the timeout period.
      def checkout
        # Checkout an available connection
        @connection_mutex.synchronize do
          loop do
            conn = if @checked_out.size < @connections.size
                     checkout_existing_connection
                   elsif @connections.size < @size
                     checkout_new_connection
                   end
            return conn if conn

            # Pool exhausted: wait for #checkin to signal, or for the timeout
            @queue.wait(@timeout)

            # A connection is idle again; loop around and claim it
            next if @checked_out.size < @connections.size

            # Still nothing idle: reclaim connections held by dead threads and
            # give up only if the pool is still completely checked out
            clear_stale_cached_connections!
            if @size == @checked_out.size
              raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout}.  The max pool size is currently #{@size}; consider increasing it."
            end
          end
        end
      end

P
Pratik Naik 已提交
267 268 269 270 271
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained earlier by
      # calling +checkout+ on this pool.
      def checkin(conn)
        @connection_mutex.synchronize do
          conn.run_callbacks :checkin do
            @checked_out.delete conn
            # Wake one thread blocked in #checkout
            @queue.signal
          end
        end
      end
280

281
      # Wrap the pool-wide maintenance methods with ActiveSupport's
      # Module#synchronize, using @connection_mutex as the lock.
      synchronize :clear_reloadable_connections!, :verify_active_connections!,
        :connected?, :disconnect!, :with => :@connection_mutex

      private
285
      # Opens a new connection by calling the adapter method named in the spec
      # on ActiveRecord::Base, passing it the spec's config.
      def new_connection
        ActiveRecord::Base.send(spec.adapter_method, spec.config)
      end

289
      # Key under which the calling thread's connection is reserved: the
      # object_id of the current thread.
      def current_connection_id #:nodoc:
        Thread.current.object_id
      end
Nick 已提交
292

293
      # Opens a new connection, adds it to the pool and checks it out.
      # Raises ConnectionNotEstablished when automatic reconnection is off.
      def checkout_new_connection
        raise ConnectionNotEstablished unless @automatic_reconnect

        c = new_connection
        @connections << c
        checkout_and_verify(c)
      end

      # Checks out the first pooled connection that is not currently
      # checked out.
      def checkout_existing_connection
        c = (@connections - @checked_out).first
        checkout_and_verify(c)
      end

      # Runs the connection's :checkout callbacks around verifying it and
      # recording it as checked out. Returns the connection.
      def checkout_and_verify(c)
        c.run_callbacks :checkout do
          c.verify!
          @checked_out << c
        end
        c
      end
    end

P
Pratik Naik 已提交
315
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
316
    # for keeping separate connection pools for Active Record models that connect
P
Pratik Naik 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
    # to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
    #  |
    #  +-- Book
    #  |    |
    #  |    +-- ScaryBook
    #  |    +-- GoodBook
    #  +-- Author
    #  +-- BankAccount
    #
    # Suppose that Book is to connect to a separate database (i.e. one other
    # than the default database). Then Book, ScaryBook and GoodBook will all use
    # the same connection pool. Likewise, Author and BankAccount will use the
    # same connection pool. However, the connection pool used by Author/BankAccount
    # is not the same as the one used by Book/ScaryBook/GoodBook.
    #
    # Normally there is only a single ConnectionHandler instance, accessible via
336
    # ActiveRecord::Base.connection_handler. Active Record models use this to
P
Pratik Naik 已提交
337
    # determine which connection pool they should use.
338
    class ConnectionHandler
339 340
      # Hash of ConnectionPool objects, keyed by the name each was
      # established under.
      attr_reader :connection_pools

341 342 343
      # +pools+ is the initial name => pool hash (empty by default).
      def initialize(pools = {})
        @connection_pools = pools
      end
344 345

      # Creates a ConnectionPool for +spec+ and registers it under +name+,
      # replacing any pool previously stored under that name. Returns the pool.
      def establish_connection(name, spec)
        @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
      end

349 350 351
      # Returns any connections in use by the current thread back to their
      # pools, by calling +release_connection+ on every pool.
      def clear_active_connections!
        @connection_pools.each_value {|pool| pool.release_connection }
      end

      # Calls +clear_reloadable_connections!+ on every registered pool.
      def clear_reloadable_connections!
        @connection_pools.each_value(&:clear_reloadable_connections!)
      end

      # Disconnects every connection of every registered pool.
      def clear_all_connections!
        @connection_pools.each_value {|pool| pool.disconnect! }
      end

      # Verify active connections by calling +verify_active_connections!+ on
      # every registered pool.
      def verify_active_connections! #:nodoc:
        @connection_pools.each_value {|pool| pool.verify_active_connections! }
      end

      # Finds the pool responsible for +klass+ (its own, or that of the nearest
      # superclass that has one) and returns that pool's connection for the
      # current thread. Raises ConnectionNotEstablished when there is no pool
      # or the pool yields no connection.
      def retrieve_connection(klass) #:nodoc:
        owner = retrieve_connection_pool(klass)
        found = owner && owner.connection
        raise ConnectionNotEstablished unless found
        found
      end

379 380
      # Returns true if a connection that's accessible to this class has
      # already been opened. Returns nil when no pool is found for +klass+.
      def connected?(klass)
        pool = retrieve_connection_pool(klass)
        pool && pool.connected?
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      #
      # Only the pool registered under +klass+'s own name is considered;
      # returns nil when there is none. The pool stays registered but is
      # switched to refuse automatic reconnection before it is disconnected.
      def remove_connection(klass)
        pool = @connection_pools[klass.name]
        return nil unless pool

        pool.automatic_reconnect = false
        pool.disconnect!
        pool.spec.config
      end

399
      # Returns the pool registered for +klass+ or, failing that, for its
      # nearest superclass that has one. The search stops at
      # ActiveRecord::Base; nil is returned when no pool is found, including
      # for classes that do not descend from ActiveRecord::Base.
      def retrieve_connection_pool(klass)
        # Walk up the hierarchy; the loop ends on nil so a class outside the
        # ActiveRecord::Base tree yields nil instead of calling nil.name.
        while klass
          pool = @connection_pools[klass.name]
          return pool if pool
          return nil if ActiveRecord::Base == klass
          klass = klass.superclass
        end
        nil
      end
405
    end
406 407 408 409 410 411 412 413 414

    # Rack-style middleware: calls the wrapped app and afterwards returns the
    # current thread's connections to their pools.
    class ConnectionManagement
      def initialize(app)
        @app = app
      end

      # Passes +env+ to the wrapped app and returns its response. The cleanup
      # runs in an ensure clause, so it also happens when the app raises.
      def call(env)
        @app.call(env)
      ensure
        # Don't return connection (and perform implicit rollback) if
        # this request is a part of integration test
        unless env.key?("rack.test")
          ActiveRecord::Base.clear_active_connections!
        end
      end
    end
N
Nick 已提交
422
  end
423
end