connection_pool.rb 14.5 KB
Newer Older
1
require 'thread'
2
require 'monitor'
3
require 'set'
4
require 'active_support/core_ext/module/deprecation'
5

N
Nick 已提交
6
module ActiveRecord
7 8 9 10 11
  # Error class for a connection that could not be obtained within the
  # connection acquisition timeout period. It adds nothing to
  # ConnectionNotEstablished beyond its own name.
  class ConnectionTimeoutError < ConnectionNotEstablished; end

12 13 14 15 16 17 18
  # Raised when a connection pool is full and another connection is requested.
  #
  # +size+ and +timeout+ are only interpolated into the exception message.
  class PoolFullError < ConnectionNotEstablished
    def initialize(size, timeout)
      message = "Connection pool of size #{size} and timeout #{timeout}s is full"
      super(message)
    end
  end

N
Nick 已提交
19
  module ConnectionAdapters
20
    # Connection pool base class for managing Active Record database
21 22
    # connections.
    #
P
Pratik Naik 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
38 39 40
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
41
    # 1. Simply use ActiveRecord::Base.connection as with Active Record 2.1 and
42 43 44
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
45 46
    #    default behavior for Active Record when used in conjunction with
    #    Action Pack's request handling cycle.
47 48 49 50 51 52 53
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
54
    #
P
Pratik Naik 已提交
55 56 57 58 59
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
60 61 62 63 64 65
    # There are two connection-pooling-related options that you can add to
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
    # * +wait_timeout+: number of seconds to block and wait for a connection
    #   before giving up and raising a timeout error (default 5 seconds).
66
    class ConnectionPool
67 68 69
      # Background helper that periodically asks a pool to reap itself.
      #
      # Once started with #run, the reaper sleeps for +frequency+ seconds and
      # then calls +reap+ on +pool+, over and over. A reaper instantiated with
      # a nil frequency never starts a thread, so it never reaps the pool.
      #
      # Configure the frequency by setting "reaping_frequency" in your
      # database yaml file.
      class Reaper
        attr_reader :pool, :frequency

        def initialize(pool, frequency)
          @frequency = frequency
          @pool      = pool
        end

        # Starts the reaping thread and returns it. Returns nil without
        # starting anything when no frequency was given.
        def run
          return unless frequency

          Thread.new(frequency, pool) do |interval, target|
            loop do
              sleep interval
              target.reap
            end
          end
        end
      end

92 93
      include MonitorMixin

94
      attr_accessor :automatic_reconnect, :timeout
95
      attr_reader :spec, :connections, :size, :reaper
96

P
Pratik Naik 已提交
97 98 99 100 101 102
      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # Values read from +spec.config+:
      #
      # * +:wait_timeout+ - stored as #timeout (default 5)
      # * +:pool+ - maximum pool size, converted with +to_i+ (default 5)
      # * +:reaping_frequency+ - handed to the Reaper; nil means no reaping
      def initialize(spec)
        super()

        @spec = spec

        # The cache of reserved connections mapped to threads
        @reserved_connections = {}

        @timeout = spec.config[:wait_timeout] || 5

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

        @connections         = []
        @automatic_reconnect = true

        # Start the reaper only after every instance variable is in place: its
        # thread calls #reap on this pool, which reads @connections and
        # @timeout, so it must never be able to observe a half-built pool.
        @reaper = Reaper.new self, spec.config[:reaping_frequency]
        @reaper.run
      end

122 123 124 125 126 127
      # Returns the connection reserved under the current connection id,
      # calling #checkout to obtain one first if none is reserved yet.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the value of current_connection_id.
      def connection
        id = current_connection_id
        @reserved_connections[id] || (@reserved_connections[id] = checkout)
      end

131 132 133
      # Returns true when at least one connection in this pool is in use
      # (see active_connections), false otherwise.
      def active_connection?
        !active_connections.empty?
      end

137
      # Signals that the owner of +with_id+ (by default the current
      # connection id) is finished with its connection: the reservation is
      # dropped and the connection is checked back in to the pool.
      #
      # Does nothing and returns nil when nothing is reserved under +with_id+.
      def release_connection(with_id = current_connection_id)
        reserved = @reserved_connections.delete(with_id)
        return unless reserved

        checkin reserved
      end

145
      # Yields the calling thread's connection to the block and returns the
      # block's value.
      #
      # If a connection is already reserved under the current connection id,
      # it is yielded and stays reserved afterwards. Otherwise a connection is
      # checked out, yielded, and released again once the block finishes
      # (also when the block raises).
      def with_connection
        connection_id = current_connection_id
        # Decide per connection id: #active_connection? reports whether *any*
        # connection in the pool is in use, so relying on it would leave the
        # connection checked out here reserved forever whenever some other
        # thread happened to hold one.
        fresh_connection = !@reserved_connections[connection_id]
        yield connection
      ensure
        release_connection(connection_id) if fresh_connection
      end

156 157
      # Returns true if the pool currently holds at least one connection,
      # i.e. a connection has already been opened. The check runs under the
      # pool's monitor.
      def connected?
        synchronize do
          @connections.any?
        end
      end

P
Pratik Naik 已提交
161
      # Disconnects all connections in the pool, and clears the pool.
      #
      # Every connection is checked in and then disconnected; afterwards both
      # the reservation cache and the connection list are reset to empty.
      def disconnect!
        synchronize do
          @reserved_connections = {}

          @connections.each do |pooled|
            checkin(pooled)
            pooled.disconnect!
          end

          @connections = []
        end
      end

S
Sebastian Martinez 已提交
173
      # Resets the reservation cache, checks every connection back in, and
      # disconnects and drops from the pool each connection that reports
      # +requires_reloading?+.
      def clear_reloadable_connections!
        synchronize do
          @reserved_connections = {}

          @connections.each do |pooled|
            checkin(pooled)
            pooled.disconnect! if pooled.requires_reloading?
          end

          @connections.delete_if { |pooled| pooled.requires_reloading? }
        end
      end

187 188
      # Calls +verify!+ on every connection currently held by the pool, under
      # the pool's monitor.
      def verify_active_connections! #:nodoc:
        synchronize { @connections.each(&:verify!) }
      end

197
      # No-op; flagged as deprecated by the +deprecate+ call that follows.
      def clear_stale_cached_connections! # :nodoc:
      end

      deprecate :clear_stale_cached_connections!
200

P
Pratik Naik 已提交
201 202 203
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
      # The first pooled connection whose +lease+ succeeds is used. If none
      # can be leased and the pool holds fewer than +size+ connections, a new
      # connection is created and leased instead. The chosen connection is
      # passed through checkout_and_verify before being returned.
      #
      # If every connection is leased and the pool is at capacity, this method
      # does not wait: it raises immediately.
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
      # - PoolFullError: no connection can be obtained from the pool.
      def checkout
        synchronize do
          # Try to find a connection that hasn't been leased, and lease it
          leased = connections.find { |candidate| candidate.lease }

          # Nothing leasable, but there is room to grow: open a new one.
          if !leased && connections.size < size
            leased = checkout_new_connection
            leased.lease
          end

          raise PoolFullError.new(size, timeout) unless leased

          checkout_and_verify leased
        end
      end

P
Pratik Naik 已提交
236 237 238 239 240
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection. The connection is expired inside its
      # +:checkin+ callbacks, under the pool's monitor.
      #
      # +conn+: an AbstractAdapter object, which was obtained earlier by
      # calling +checkout+ on this pool.
      def checkin(conn)
        synchronize do
          conn.run_callbacks(:checkin) { conn.expire }
        end
      end
248

249 250 251 252 253
      # Remove a connection from the connection pool.  The connection will
      # remain open and active but will no longer be managed by this pool.
      #
      # Besides the connection list, the reservation cache entry pointing at
      # +conn+ (if there is one) is dropped as well.
      def remove(conn)
        synchronize do
          @connections.delete conn

          # FIXME: we might want to store the key on the connection so that removing
          # from the reserved hash will be a little easier.
          owner_id = @reserved_connections.key(conn)
          @reserved_connections.delete owner_id if owner_id
        end
      end

264 265 266 267 268 269 270
      # Removes dead connections from the pool.  A dead connection can occur
      # if a programmer forgets to close a connection at the end of a thread
      # or a thread dies unexpectedly.
      #
      # A connection is removed when it is in use, its +last_use+ is more than
      # +@timeout+ seconds in the past, and it no longer reports +active?+.
      def reap
        synchronize do
          cutoff = Time.now - @timeout

          # Iterate over a copy because #remove mutates the connection list.
          connections.dup.each do |candidate|
            next unless candidate.in_use?
            next unless cutoff > candidate.last_use
            next if candidate.active?

            remove candidate
          end
        end
      end

276
      private
277

278
      # Builds a new connection by sending the spec's adapter method, with the
      # spec's config as its only argument, to ActiveRecord::Base.
      def new_connection
        adapter_method = spec.adapter_method
        ActiveRecord::Base.send(adapter_method, spec.config)
      end

282
      # Returns ActiveRecord::Base.connection_id, first setting it to the
      # current thread's object_id when it has no value yet.
      def current_connection_id #:nodoc:
        base = ActiveRecord::Base
        base.connection_id || (base.connection_id = Thread.current.object_id)
      end
N
Nick 已提交
285

286
      # Creates a connection via new_connection, points its +pool+ at this
      # pool, appends it to the connection list and returns it.
      #
      # Raises ConnectionNotEstablished when automatic reconnect is disabled.
      def checkout_new_connection
        raise ConnectionNotEstablished unless @automatic_reconnect

        new_connection.tap do |fresh|
          fresh.pool = self
          @connections << fresh
        end
      end

      # Runs the connection's +:checkout+ callbacks around a +verify!+ call
      # and returns the connection itself.
      def checkout_and_verify(c)
        c.run_callbacks(:checkout) { c.verify! }
        c
      end
301

302
      # Returns the pooled connections that report +in_use?+.
      def active_connections
        @connections.select { |pooled| pooled.in_use? }
      end
305 306
    end

P
Pratik Naik 已提交
307
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
308
    # for keeping separate connection pools for Active Record models that connect
P
Pratik Naik 已提交
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
    # to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
    #  |
    #  +-- Book
    #  |    |
    #  |    +-- ScaryBook
    #  |    +-- GoodBook
    #  +-- Author
    #  +-- BankAccount
    #
    # Suppose that Book is to connect to a separate database (i.e. one other
    # than the default database). Then Book, ScaryBook and GoodBook will all use
    # the same connection pool. Likewise, Author and BankAccount will use the
    # same connection pool. However, the connection pool used by Author/BankAccount
    # is not the same as the one used by Book/ScaryBook/GoodBook.
    #
    # Normally there is only a single ConnectionHandler instance, accessible via
328
    # ActiveRecord::Base.connection_handler. Active Record models use this to
P
Pratik Naik 已提交
329
    # determine the connection pool that they should use.
330
    class ConnectionHandler
331 332
      attr_reader :connection_pools

333 334
      # +pools+ becomes the handler's connection_pools hash (keyed by spec in
      # establish_connection); the class-name-to-pool map starts out empty.
      def initialize(pools = {})
        @class_to_pool    = {}
        @connection_pools = pools
      end
337 338

      # Associates +name+ with the pool for +spec+, creating that pool on
      # first use, and returns the pool. Names registered with the same spec
      # share one pool.
      def establish_connection(name, spec)
        pool = (@connection_pools[spec] ||= ConnectionAdapters::ConnectionPool.new(spec))
        @class_to_pool[name] = pool
      end

343 344 345 346 347 348
      # Returns true if at least one of the connection pools managed by this
      # ConnectionHandler reports an active connection.
      def active_connections?
        connection_pools.each_value.any? { |pool| pool.active_connection? }
      end

349 350 351
      # Calls +release_connection+ (without arguments) on every managed pool,
      # so each pool returns the connection reserved under the current
      # connection id, if any.
      def clear_active_connections!
        @connection_pools.each_value(&:release_connection)
      end

S
Sebastian Martinez 已提交
356
      # Calls +clear_reloadable_connections!+ on every managed pool.
      def clear_reloadable_connections!
        @connection_pools.each_value(&:clear_reloadable_connections!)
      end

      # Calls +disconnect!+ on every managed pool.
      def clear_all_connections!
        @connection_pools.each_value(&:disconnect!)
      end

      # Calls +verify_active_connections!+ on every managed pool.
      def verify_active_connections! #:nodoc:
        @connection_pools.each_value(&:verify_active_connections!)
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      #
      # Raises ConnectionNotEstablished when no pool is found for +klass+ or
      # the pool yields no connection.
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        conn = pool && pool.connection
        raise ConnectionNotEstablished unless conn

        conn
      end

379 380
      # Returns true if a connection that's accessible to this class has
      # already been opened. When no pool can be found for +klass+ the falsy
      # lookup result is returned as is.
      def connected?(klass)
        pool = retrieve_connection_pool(klass)
        pool ? pool.connected? : pool
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      #
      # Returns the removed pool's spec config, or nil when no pool was
      # registered under +klass.name+.
      def remove_connection(klass)
        pool = @class_to_pool.delete(klass.name)
        return unless pool

        @connection_pools.delete(pool.spec)
        pool.automatic_reconnect = false
        pool.disconnect!
        pool.spec.config
      end

400
      # Finds the pool registered for +klass+ or for its nearest ancestor,
      # following +active_record_super+ upwards. Returns nil once
      # ActiveRecord::Model is reached without finding a pool.
      def retrieve_connection_pool(klass)
        current = klass

        loop do
          pool = @class_to_pool[current.name]
          return pool if pool
          return nil if ActiveRecord::Model == current

          current = current.active_record_super
        end
      end
406
    end
407 408 409 410 411 412 413

    # Rack-style middleware: wraps +app+ and clears Active Record's active
    # connections once a request is over.
    #
    # The response body is wrapped in a Rack::BodyProxy whose block performs
    # the clearing (presumably Rack runs that block when the body is closed;
    # confirm against the Rack docs). If the app raises, connections are
    # cleared right away and the error is re-raised. Nothing is cleared when
    # the env contains the 'rack.test' key.
    class ConnectionManagement
      def initialize(app)
        @app = app
      end

      def call(env)
        testing = env.key?('rack.test')

        @app.call(env).tap do |response|
          response[2] = ::Rack::BodyProxy.new(response[2]) do
            ActiveRecord::Base.clear_active_connections! unless testing
          end
        end
      rescue
        ActiveRecord::Base.clear_active_connections! unless testing
        raise
      end
    end
N
Nick 已提交
427
  end
428
end