connection_pool.rb 14.2 KB
Newer Older
1
require 'thread'
2
require 'monitor'
3
require 'set'
J
Jeremy Kemper 已提交
4
require 'active_support/core_ext/module/synchronization'
5

N
Nick 已提交
6
module ActiveRecord
7 8 9 10 11
  # Raised when a connection could not be obtained within the connection
  # acquisition timeout period.
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

N
Nick 已提交
12
  module ConnectionAdapters
13
    # Connection pool base class for managing Active Record database
14 15
    # connections.
    #
P
Pratik Naik 已提交
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
31 32 33
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
34
    # 1. Simply use ActiveRecord::Base.connection as with Active Record 2.1 and
35 36 37
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
38 39
    #    default behavior for Active Record when used in conjunction with
    #    Action Pack's request handling cycle.
40 41 42 43 44 45 46
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
47
    #
P
Pratik Naik 已提交
48 49 50 51 52
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
53 54 55 56 57 58
    # There are two connection-pooling-related options that you can add to
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
    # * +wait_timeout+: number of seconds to block and wait for a connection
    #   before giving up and raising a timeout error (default 5 seconds).
59
    class ConnectionPool
60
      attr_reader :spec, :connections
61
      attr_reader :columns, :columns_hash, :primary_keys
62

P
Pratik Naik 已提交
63 64 65 66 67 68
      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
N
Nick 已提交
69 70
      def initialize(spec)
        @spec = spec
71

72 73
        # The cache of reserved connections mapped to threads
        @reserved_connections = {}
74

75 76
        # The mutex used to synchronize pool access
        @connection_mutex = Monitor.new
77
        @queue = @connection_mutex.new_cond
78
        @timeout = spec.config[:wait_timeout] || 5
79

80 81
        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
82

83 84
        @connections = []
        @checked_out = []
85 86 87

        @columns     = Hash.new do |h, table_name|
          h[table_name] = with_connection do |conn|
88 89 90 91 92 93 94 95 96

            # Fetch a list of columns
            conn.columns(table_name, "#{table_name} Columns").tap do |columns|

              # set primary key information
              columns.each do |column|
                column.primary = column.name == primary_keys[table_name]
              end
            end
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
          end
        end

        @columns_hash = Hash.new do |h, table_name|
          h[table_name] = Hash[columns[table_name].map { |col|
            [col.name, col]
          }]
        end

        @primary_keys = Hash.new do |h, table_name|
          h[table_name] = with_connection do |conn|
            if conn.table_exists?(table_name)
              conn.primary_key(table_name)
            else
              'id'
            end
          end
        end
      end

      # Clears out internal caches:
      #
      #   * columns
      #   * columns_hash
      #   * primary_keys
      def clear_cache!
        @columns.clear
        @columns_hash.clear
        @primary_keys.clear
N
Nick 已提交
126 127
      end

A
Aaron Patterson 已提交
128 129 130 131 132 133 134
      # Clear out internal caches for table with +table_name+
      def clear_table_cache!(table_name)
        @columns.delete table_name
        @columns_hash.delete table_name
        @primary_keys.delete table_name
      end

135 136 137 138 139 140
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the thread id.
      def connection
A
Aaron Patterson 已提交
141
        @reserved_connections[current_connection_id] ||= checkout
N
Nick 已提交
142 143
      end

144
      # Signal that the thread is finished with the current connection.
145
      # #release_connection releases the connection-thread association
146
      # and returns the connection to the pool.
147 148
      def release_connection(with_id = current_connection_id)
        conn = @reserved_connections.delete(with_id)
149 150 151
        checkin conn if conn
      end

152
      # If a connection already exists yield it to the block.  If no connection
153
      # exists checkout a connection, yield it to the block, and checkin the
154
      # connection when finished.
155
      def with_connection
156 157
        connection_id = current_connection_id
        fresh_connection = true unless @reserved_connections[connection_id]
158
        yield connection
159
      ensure
160
        release_connection(connection_id) if fresh_connection
N
Nick 已提交
161 162
      end

163 164
      # Returns true if a connection has already been opened.
      def connected?
165
        !@connections.empty?
N
Nick 已提交
166 167
      end

P
Pratik Naik 已提交
168
      # Disconnects all connections in the pool, and clears the pool.
169 170
      def disconnect!
        @reserved_connections.each do |name,conn|
171
          checkin conn
172
        end
173 174
        @reserved_connections = {}
        @connections.each do |conn|
N
Nick 已提交
175 176
          conn.disconnect!
        end
177
        @connections = []
N
Nick 已提交
178 179
      end

180
      # Clears the cache which maps classes
N
Nick 已提交
181
      def clear_reloadable_connections!
182
        @reserved_connections.each do |name, conn|
183
          checkin conn
184 185
        end
        @reserved_connections = {}
186 187
        @connections.each do |conn|
          conn.disconnect! if conn.requires_reloading?
N
Nick 已提交
188
        end
189 190 191
        @connections.delete_if do |conn|
          conn.requires_reloading?
        end
N
Nick 已提交
192 193
      end

194 195
      # Verify active connections and remove and disconnect connections
      # associated with stale threads.
N
Nick 已提交
196
      def verify_active_connections! #:nodoc:
197
        clear_stale_cached_connections!
198
        @connections.each do |connection|
199
          connection.verify!
N
Nick 已提交
200 201 202
        end
      end

203 204 205
      # Return any checked-out connections back to the pool by threads that
      # are no longer alive.
      def clear_stale_cached_connections!
206 207 208 209 210 211
        keys = @reserved_connections.keys - Thread.list.find_all { |t|
          t.alive?
        }.map { |thread| thread.object_id }
        keys.each do |key|
          checkin @reserved_connections[key]
          @reserved_connections.delete(key)
212 213 214
        end
      end

P
Pratik Naik 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
      # This is done by either returning an existing connection, or by creating
      # a new connection. If the maximum number of connections for this pool has
      # already been reached, but the pool is empty (i.e. they're all being used),
      # then this method will wait until a thread has checked in a connection.
      # The wait time is bounded however: if no connection can be checked out
      # within the timeout specified for this pool, then a ConnectionTimeoutError
      # exception will be raised.
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
      # - ConnectionTimeoutError: no connection can be obtained from the pool
      #   within the timeout period.
231
      def checkout
232 233
        # Checkout an available connection
        @connection_mutex.synchronize do
234 235 236 237 238 239 240
          loop do
            conn = if @checked_out.size < @connections.size
                     checkout_existing_connection
                   elsif @connections.size < @size
                     checkout_new_connection
                   end
            return conn if conn
241 242 243 244

            @queue.wait(@timeout)

            if(@checked_out.size < @connections.size)
245 246
              next
            else
247 248
              clear_stale_cached_connections!
              if @size == @checked_out.size
249
                raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout}.  The max pool size is currently #{@size}; consider increasing it."
250
              end
251
            end
252

253 254
          end
        end
N
Nick 已提交
255 256
      end

P
Pratik Naik 已提交
257 258 259 260 261
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained by earlier by
      # calling +checkout+ on this pool.
262 263
      def checkin(conn)
        @connection_mutex.synchronize do
264
          conn.run_callbacks :checkin do
265 266 267
            @checked_out.delete conn
            @queue.signal
          end
268
        end
N
Nick 已提交
269
      end
270

271
      synchronize :clear_reloadable_connections!, :verify_active_connections!,
272 273 274
        :connected?, :disconnect!, :with => :@connection_mutex

      private
275
      def new_connection
276
        ActiveRecord::Base.send(spec.adapter_method, spec.config)
277 278
      end

279
      def current_connection_id #:nodoc:
280 281
        Thread.current.object_id
      end
N
Nick 已提交
282

283 284 285
      def checkout_new_connection
        c = new_connection
        @connections << c
286
        checkout_and_verify(c)
287 288 289
      end

      def checkout_existing_connection
290
        c = (@connections - @checked_out).first
291 292 293 294
        checkout_and_verify(c)
      end

      def checkout_and_verify(c)
295 296 297 298
        c.run_callbacks :checkout do
          c.verify!
          @checked_out << c
        end
299 300 301 302
        c
      end
    end

P
Pratik Naik 已提交
303
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
304
    # for keeping separate connection pools for Active Record models that connect
P
Pratik Naik 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
    # to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
    #  |
    #  +-- Book
    #  |    |
    #  |    +-- ScaryBook
    #  |    +-- GoodBook
    #  +-- Author
    #  +-- BankAccount
    #
    # Suppose that Book is to connect to a separate database (i.e. one other
    # than the default database). Then Book, ScaryBook and GoodBook will all use
    # the same connection pool. Likewise, Author and BankAccount will use the
    # same connection pool. However, the connection pool used by Author/BankAccount
    # is not the same as the one used by Book/ScaryBook/GoodBook.
    #
    # Normally there is only a single ConnectionHandler instance, accessible via
324
    # ActiveRecord::Base.connection_handler. Active Record models use this to
P
Pratik Naik 已提交
325
    # determine that connection pool that they should use.
326
    class ConnectionHandler
327 328
      attr_reader :connection_pools

329 330 331
      def initialize(pools = {})
        @connection_pools = pools
      end
332 333

      def establish_connection(name, spec)
334
        @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
335 336
      end

337 338 339
      # Returns any connections in use by the current thread back to the pool,
      # and also returns connections to the pool cached by threads that are no
      # longer alive.
340
      def clear_active_connections!
341
        @connection_pools.each_value {|pool| pool.release_connection }
342 343 344 345 346 347 348 349
      end

      # Clears the cache which maps classes
      def clear_reloadable_connections!
        @connection_pools.each_value {|pool| pool.clear_reloadable_connections! }
      end

      def clear_all_connections!
350
        @connection_pools.each_value {|pool| pool.disconnect! }
351 352 353 354
      end

      # Verify active connections.
      def verify_active_connections! #:nodoc:
355
        @connection_pools.each_value {|pool| pool.verify_active_connections! }
356 357 358 359 360 361 362 363 364 365 366
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        (pool && pool.connection) or raise ConnectionNotEstablished
      end

367 368
      # Returns true if a connection that's accessible to this class has
      # already been opened.
369
      def connected?(klass)
370
        conn = retrieve_connection_pool(klass)
A
Aaron Patterson 已提交
371
        conn && conn.connected?
372 373 374 375 376 377 378 379
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      def remove_connection(klass)
        pool = @connection_pools[klass.name]
380 381
        return nil unless pool

382
        @connection_pools.delete_if { |key, value| value == pool }
383 384
        pool.disconnect!
        pool.spec.config
385 386
      end

387
      def retrieve_connection_pool(klass)
388 389 390 391
        pool = @connection_pools[klass.name]
        return pool if pool
        return nil if ActiveRecord::Base == klass
        retrieve_connection_pool klass.superclass
392
      end
393
    end
394 395 396 397 398 399 400 401 402

    class ConnectionManagement
      def initialize(app)
        @app = app
      end

      def call(env)
        @app.call(env)
      ensure
P
Pratik Naik 已提交
403
        # Don't return connection (and perform implicit rollback) if
404 405 406 407 408 409
        # this request is a part of integration test
        unless env.key?("rack.test")
          ActiveRecord::Base.clear_active_connections!
        end
      end
    end
N
Nick 已提交
410
  end
411
end