提交 ce63a5a8 编写于 作者: F Fabian Reinartz 提交者: GitHub

Merge pull request #3352 from prometheus/rc2

Cut v2.0.0-rc.2
## 2.0.0-rc.1 / 2017-10-17
* [FEATURE] Added a warning for time-drift between the browser and the prometheus-server.
* [ENHANCEMENT] Much faster WAL read-back on restart.
* [BUGFIX] Fixed Remote-read to not drop the first series.
* [BUGFIX] Validate recording-rule names.
* [BUGFIX] Fix several races.
* [BUGFIX] Only close blocks if there are no iterators accessing it.
## 2.0.0-rc.2 / 2017-10-25
* [ENHANCEMENT] handle WAL segments with corrupted header gracefully
* [ENHANCEMENT] stabilize memory usage during WAL replay
* [CHANGE] Prefix all storage metrics with `prometheus_`
* [BUGFIX] Correctly handle label removal in remote read
* [BUGFIX] Fix chunk misalignment causing out-of-order samples
* [BUGFIX] Fix connection leak in Consul SD
* [BUGFIX] Handle invalid chunk derefernces gracefully
* [BUGFIX] Prevent potential deadlock during failing querier construction
Data written in previous pre-release versions may have been affected by the out-of-order
bug. Reading this data may reveal artefacts and incorrect data.
Starting with a clean storage directory is advised. The WAL directory may safely be kept.
## 1.8.0 / 2017-10-06
......
2.0.0-rc.1
2.0.0-rc.2
......@@ -167,6 +167,7 @@ func (a adapter) StartTime() (int64, error) {
return startTime, err
}
}
defer indexr.Close()
joblabel := "job"
tpls, err := indexr.LabelValues(joblabel)
......
......@@ -69,6 +69,17 @@ type Iterator interface {
Next() bool
}
// NewNopIterator returns a new chunk iterator that does not hold any data.
func NewNopIterator() Iterator {
return nopIterator{}
}
type nopIterator struct{}
func (nopIterator) At() (int64, float64) { return 0, 0 }
func (nopIterator) Next() bool { return false }
func (nopIterator) Err() error { return nil }
type Pool interface {
Put(Chunk) error
Get(e Encoding, b []byte) (Chunk, error)
......
......@@ -81,30 +81,30 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
m := &compactorMetrics{}
m.ran = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_total",
Name: "prometheus_tsdb_compactions_total",
Help: "Total number of compactions that were executed for the partition.",
})
m.failed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_failed_total",
Name: "prometheus_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
})
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_duration",
Name: "prometheus_tsdb_compaction_duration",
Help: "Duration of compaction runs.",
Buckets: prometheus.ExponentialBuckets(1, 2, 10),
})
m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_size",
Name: "prometheus_tsdb_compaction_chunk_size",
Help: "Final size of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
})
m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_samples",
Name: "prometheus_tsdb_compaction_chunk_samples",
Help: "Final number of samples on their first compaction",
Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
})
m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_range",
Name: "prometheus_tsdb_compaction_chunk_range",
Help: "Final time range of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(100, 4, 10),
})
......
......@@ -129,7 +129,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{}
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_blocks_loaded",
Name: "prometheus_tsdb_blocks_loaded",
Help: "Number of currently loaded data blocks",
}, func() float64 {
db.mtx.RLock()
......@@ -137,15 +137,15 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
return float64(len(db.blocks))
})
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_reloads_total",
Name: "prometheus_tsdb_reloads_total",
Help: "Number of times the database reloaded block data from disk.",
})
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_reloads_failures_total",
Name: "prometheus_tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload black data from disk.",
})
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_triggered_total",
Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
......@@ -448,9 +448,6 @@ func (db *DB) reload() (err error) {
db.metrics.reloads.Inc()
}()
var cs []io.Closer
defer func() { closeAll(cs...) }()
dirs, err := blockDirs(db.dir)
if err != nil {
return errors.Wrap(err, "find blocks")
......@@ -482,25 +479,25 @@ func (db *DB) reload() (err error) {
return errors.Wrap(err, "invalid block sequence")
}
// Close all opened blocks that no longer exist after we returned all locks.
// TODO(fabxc: probably races with querier still reading from them. Can
// we just abandon them and have the open FDs be GC'd automatically eventually?
for _, b := range db.blocks {
if _, ok := exist[b.Meta().ULID]; !ok {
cs = append(cs, b)
}
}
// Swap in new blocks first for subsequently created readers to be seen.
// Then close previous blocks, which may block for pending readers to complete.
db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = blocks
db.mtx.Unlock()
for _, b := range oldBlocks {
if _, ok := exist[b.Meta().ULID]; !ok {
b.Close()
}
}
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(blocks) == 0 {
return nil
}
maxt := blocks[len(db.blocks)-1].Meta().MaxTime
maxt := blocks[len(blocks)-1].Meta().MaxTime
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
......@@ -593,7 +590,10 @@ func (db *DB) Snapshot(dir string) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
for _, b := range db.Blocks() {
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
......@@ -608,7 +608,10 @@ func (db *DB) Snapshot(dir string) error {
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
var blocks []BlockReader
for _, b := range db.Blocks() {
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
blocks = append(blocks, b)
......@@ -623,10 +626,15 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
}
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for block %s", b)
if err == nil {
sq.blocks = append(sq.blocks, q)
continue
}
// If we fail, all previously opened queriers must be closed.
for _, q := range sq.blocks {
q.Close()
}
sq.blocks = append(sq.blocks, q)
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
return sq, nil
}
......@@ -643,7 +651,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
var g errgroup.Group
for _, b := range db.Blocks() {
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
g.Go(func(b *Block) func() error {
......
......@@ -89,59 +89,59 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m := &headMetrics{}
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_active_appenders",
Name: "prometheus_tsdb_head_active_appenders",
Help: "Number of currently active appender transactions",
})
m.series = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series",
Name: "prometheus_tsdb_head_series",
Help: "Total number of series in the head block.",
})
m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series_created_total",
Name: "prometheus_tsdb_head_series_created_total",
Help: "Total number of series created in the head",
})
m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_series_removed_total",
Name: "prometheus_tsdb_head_series_removed_total",
Help: "Total number of series removed in the head",
})
m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_series_not_found",
Name: "prometheus_tsdb_head_series_not_found",
Help: "Total number of requests for series that were not found.",
})
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks",
Name: "prometheus_tsdb_head_chunks",
Help: "Total number of chunks in the head block.",
})
m.chunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks_created_total",
Name: "prometheus_tsdb_head_chunks_created_total",
Help: "Total number of chunks created in the head",
})
m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks_removed_total",
Name: "prometheus_tsdb_head_chunks_removed_total",
Help: "Total number of chunks removed in the head",
})
m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_head_gc_duration_seconds",
Name: "prometheus_tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
})
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_max_time",
Name: "prometheus_tsdb_head_max_time",
Help: "Maximum timestamp of the head block.",
}, func() float64 {
return float64(h.MaxTime())
})
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_min_time",
Name: "prometheus_tsdb_head_min_time",
Help: "Minimum time bound of the head block.",
}, func() float64 {
return float64(h.MinTime())
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_wal_truncate_duration_seconds",
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_samples_appended_total",
Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended sampledb.",
})
......@@ -273,13 +273,23 @@ func (h *Head) ReadWAL() error {
}
}
samplesFunc := func(samples []RefSample) {
var buf []RefSample
select {
case buf = <-input:
default:
buf = make([]RefSample, 0, len(samples)*11/10)
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
n := 5000
if len(samples) < n {
n = len(samples)
}
var buf []RefSample
select {
case buf = <-input:
default:
}
firstInput <- append(buf[:0], samples[:n]...)
samples = samples[n:]
}
firstInput <- append(buf[:0], samples...)
}
deletesFunc := func(stones []Stone) {
for _, s := range stones {
......@@ -665,7 +675,7 @@ func (h *Head) gc() {
// Rebuild symbols and label value indices from what is left in the postings terms.
h.postings.mtx.RLock()
symbols := make(map[string]struct{}, len(h.symbols))
symbols := make(map[string]struct{})
values := make(map[string]stringset, len(h.values))
for t := range h.postings.m {
......@@ -1152,6 +1162,10 @@ func (s *memSeries) cut(mint int64) *memChunk {
}
s.chunks = append(s.chunks, c)
// Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point.
_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
app, err := c.chunk.Appender()
if err != nil {
panic(err)
......@@ -1231,10 +1245,17 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
}
numSamples := c.chunk.NumSamples()
// Out of order sample.
if c.maxTime >= t {
return false, chunkCreated
}
if numSamples > samplesPerChunk/4 && t >= s.nextAt {
// If we reach 25% of a chunk's desired sample count, set a definitive time
// at which to start the next chunk.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
}
if t >= s.nextAt {
c = s.cut(t)
chunkCreated = true
}
......@@ -1242,11 +1263,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
c.maxTime = t
if numSamples == samplesPerChunk/4 {
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
}
s.lastValue = v
s.sampleBuf[0] = s.sampleBuf[1]
......@@ -1270,6 +1286,12 @@ func computeChunkEndTime(start, cur, max int64) int64 {
func (s *memSeries) iterator(id int) chunks.Iterator {
c := s.chunk(id)
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
// which got then garbage collected before it got accessed.
// We must ensure to not garbage collect as long as any readers still hold a reference.
if c == nil {
return chunks.NewNopIterator()
}
if id-s.firstChunkID < len(s.chunks)-1 {
return c.chunk.Iterator()
......
......@@ -114,10 +114,13 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
}
chunkr, err := b.Chunks()
if err != nil {
indexr.Close()
return nil, errors.Wrapf(err, "open chunk reader")
}
tombsr, err := b.Tombstones()
if err != nil {
indexr.Close()
chunkr.Close()
return nil, errors.Wrapf(err, "open tombstone reader")
}
return &blockQuerier{
......@@ -532,7 +535,6 @@ func (s *populatedChunkSeries) Next() bool {
return false
}
}
if len(chks) == 0 {
continue
}
......
package tsdb
import (
"io"
"text/tabwriter"
)
const (
minwidth = 0
tabwidth = 0
padding = 2
padchar = ' '
flags = 0
)
func GetNewTabWriter(output io.Writer) *tabwriter.Writer {
return tabwriter.NewWriter(output, minwidth, tabwidth, padding, padchar, flags)
}
......@@ -222,12 +222,21 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
if err != nil {
return nil, err
}
for _, fn := range fns {
for i, fn := range fns {
f, err := w.openSegmentFile(fn)
if err != nil {
return nil, err
if err == nil {
w.files = append(w.files, newSegmentFile(f))
continue
}
level.Warn(logger).Log("msg", "invalid segment file detected, truncating WAL", "err", err, "file", fn)
for _, fn := range fns[i:] {
if err := os.Remove(fn); err != nil {
return w, errors.Wrap(err, "removing segment failed")
}
}
w.files = append(w.files, newSegmentFile(f))
break
}
go w.run(flushInterval)
......
......@@ -843,28 +843,28 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "h3i8+wLSIqLvWBWjNPcARM0IQik=",
"checksumSHA1": "qbhdcw451oyIWXj+0zlkR+rDi9Y=",
"path": "github.com/prometheus/tsdb",
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
"revisionTime": "2017-10-12T13:27:08Z"
"revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce",
"revisionTime": "2017-10-25T14:52:11Z"
},
{
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
"checksumSHA1": "uy6ySJ6EZqof+yMD2wTkYob8BeU=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
"revisionTime": "2017-10-12T13:27:08Z"
"revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce",
"revisionTime": "2017-10-25T14:52:11Z"
},
{
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
"path": "github.com/prometheus/tsdb/fileutil",
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
"revisionTime": "2017-10-12T13:27:08Z"
"revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce",
"revisionTime": "2017-10-25T14:52:11Z"
},
{
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
"revisionTime": "2017-10-12T13:27:08Z"
"revision": "5d28c849c7ff3b43e2829a44a9aac16468e076ce",
"revisionTime": "2017-10-25T14:52:11Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册