提交 ecd4bc61 编写于 作者: S Shlomi Noach

Merge branch 'master' into identify-gtid

......@@ -10,3 +10,4 @@ vagrant/db4-post-install.sh
vagrant/vagrant-ssh-key
vagrant/vagrant-ssh-key.pub
Godeps/_workspace
main
......@@ -33,6 +33,7 @@ usage() {
function precheck() {
local target
local ok=0 # return err. so shell exit code
target="$1"
if [[ "$target" == "linux" ]]; then
if [[ ! -x "$( which fpm )" ]]; then
......@@ -135,7 +136,7 @@ function build() {
prefix="$4"
ldflags="-X main.AppVersion=${RELEASE_VERSION} -X main.GitCommit=${GIT_COMMIT}"
echo "Building via $(go version)"
gobuild="go build ${opt_race} -ldflags \"$ldflags\" -o $builddir/orchestrator${prefix}/orchestrator/orchestrator go/cmd/orchestrator/main.go"
gobuild="go build -i ${opt_race} -ldflags \"$ldflags\" -o $builddir/orchestrator${prefix}/orchestrator/orchestrator go/cmd/orchestrator/main.go"
case $os in
'linux')
......
......@@ -20,6 +20,7 @@ At this time recovery requires either GTID, [Pseudo GTID](#pseudo-gtid) or Binlo
* DeadIntermediateMasterWithSingleSlaveFailingToConnect
* DeadIntermediateMasterWithSingleSlave
* DeadIntermediateMasterAndSomeSlaves
* DeadIntermediateMasterAndSlaves
* AllIntermediateMasterSlavesFailingToConnectOrDead
* AllIntermediateMasterSlavesNotReplicating
* UnreachableIntermediateMaster
......
......@@ -51,6 +51,8 @@ type Configuration struct {
MySQLTopologyUseMutualTLS bool // Turn on TLS authentication with the Topology MySQL instances
MySQLTopologyMaxPoolConnections int // Max concurrent connections on any topology instance
DatabaselessMode__experimental bool // !!!EXPERIMENTAL!!! Orchestrator will execute without speaking to a backend database; super-standalone mode
BackendDB string // EXPERIMENTAL: type of backend db; either "mysql" or "sqlite3"
SQLite3DataFile string // when BackendDB == "sqlite3", full path to sqlite3 datafile
SkipOrchestratorDatabaseUpdate bool // When true, do not check backend database schema nor attempt to update it. Useful when you may be running multiple versions of orchestrator, and you only wish certain boxes to dictate the db structure (or else any time a different orchestrator version runs it will rebuild database schema)
PanicIfDifferentDatabaseDeploy bool // When true, and this process finds the orchestrator backend DB was provisioned by a different version, panic
MySQLOrchestratorHost string
......@@ -160,6 +162,7 @@ type Configuration struct {
PseudoGTIDMonotonicHint string // subtring in Pseudo-GTID entry which indicates Pseudo-GTID entries are expected to be monotonically increasing
DetectPseudoGTIDQuery string // Optional query which is used to authoritatively decide whether pseudo gtid is enabled on instance
PseudoGTIDCoordinatesHistoryHeuristicMinutes int // Significantly reducing Pseudo-GTID lookup time, this indicates the most recent N minutes binlog position where search for Pseudo-GTID will heuristically begin (there is a fallback on fullscan if unsuccessful)
PseudoGTIDPreferIndependentMultiMatch bool // if 'false', a multi-replica Pseudo-GTID operation will attempt grouping replicas via Pseudo-GTID, and make less binlog computations. However it may cause servers in same bucket wait for one another, which could delay some servers from being repointed. There is a tradeoff between total operation time for all servers, and per-server time. When 'true', Pseudo-GTID matching will operate per server, independently. This will cause waste of same calculations, but no two servers will wait on one another.
BinlogEventsChunkSize int // Chunk size (X) for SHOW BINLOG|RELAYLOG EVENTS LIMIT ?,X statements. Smaller means less locking and mroe work to be done
BufferBinlogEvents bool // Should we used buffered read on SHOW BINLOG|RELAYLOG EVENTS -- releases the database lock sooner (recommended)
SkipBinlogEventsContaining []string // When scanning/comparing binlogs for Pseudo-GTID, skip entries containing given texts. These are NOT regular expressions (would consume too much CPU while scanning binlogs), just substrings to find.
......@@ -198,6 +201,7 @@ type Configuration struct {
GraphitePollSeconds int // Graphite writes interval. 0 disables.
URLPrefix string // URL prefix to run orchestrator on non-root web path, e.g. /orchestrator to put it behind nginx.
MaxOutdatedKeysToShow int // Maximum number of keys to show in ContinousDiscovery. If the number of polled hosts grows too far then showing the complete list is not ideal.
DiscoveryIgnoreReplicaHostnameFilters []string // Regexp filters to apply to prevent auto-discovering new replicas. Usage: unreachable servers due to firewalls, applications which trigger binlog dumps
}
// ToJSONString will marshal this configuration as JSON
......@@ -228,6 +232,8 @@ func newConfiguration() *Configuration {
StatusEndpoint: "/api/status",
StatusSimpleHealth: true,
StatusOUVerify: false,
BackendDB: "mysql",
SQLite3DataFile: "",
SkipOrchestratorDatabaseUpdate: false,
PanicIfDifferentDatabaseDeploy: false,
MySQLOrchestratorMaxPoolConnections: 128, // limit concurrent conns to backend DB
......@@ -323,6 +329,7 @@ func newConfiguration() *Configuration {
PseudoGTIDMonotonicHint: "",
DetectPseudoGTIDQuery: "",
PseudoGTIDCoordinatesHistoryHeuristicMinutes: 2,
PseudoGTIDPreferIndependentMultiMatch: false,
BinlogEventsChunkSize: 10000,
BufferBinlogEvents: true,
SkipBinlogEventsContaining: []string{},
......@@ -358,6 +365,7 @@ func newConfiguration() *Configuration {
GraphitePollSeconds: 60,
URLPrefix: "",
MaxOutdatedKeysToShow: 64,
DiscoveryIgnoreReplicaHostnameFilters: []string{},
}
}
......@@ -461,12 +469,24 @@ func (this *Configuration) postReadAdjustments() error {
return fmt.Errorf("config's RemoteSSHCommand must either be empty, or contain a '{hostname}' placeholder")
}
}
if this.IsSQLite() && this.SQLite3DataFile == "" {
return fmt.Errorf("SQLite3DataFile must be set when BackendDB is sqlite3")
}
if this.RemoteSSHForMasterFailover && this.RemoteSSHCommand == "" {
return fmt.Errorf("RemoteSSHCommand is required when RemoteSSHForMasterFailover is set")
}
return nil
}
func (this *Configuration) IsSQLite() bool {
return strings.Contains(this.BackendDB, "sqlite")
}
func (this *Configuration) IsMySQL() bool {
return this.BackendDB == "mysql" || this.BackendDB == ""
}
// read reads configuration from given file, or silently skips if the file does not exist.
// If the file does exist, then it is expected to be in valid JSON format or the function bails out.
func read(fileName string) (*Configuration, error) {
......
此差异已折叠。
......@@ -45,6 +45,7 @@ const (
DeadIntermediateMasterWithSingleSlave = "DeadIntermediateMasterWithSingleSlave"
DeadIntermediateMasterWithSingleSlaveFailingToConnect = "DeadIntermediateMasterWithSingleSlaveFailingToConnect"
DeadIntermediateMasterAndSomeSlaves = "DeadIntermediateMasterAndSomeSlaves"
DeadIntermediateMasterAndSlaves = "DeadIntermediateMasterAndSlaves"
UnreachableIntermediateMaster = "UnreachableIntermediateMaster"
AllIntermediateMasterSlavesFailingToConnectOrDead = "AllIntermediateMasterSlavesFailingToConnectOrDead"
AllIntermediateMasterSlavesNotReplicating = "AllIntermediateMasterSlavesNotReplicating"
......@@ -94,7 +95,7 @@ type ReplicationAnalysis struct {
type ReplicationAnalysisChangelog struct {
AnalyzedInstanceKey InstanceKey
Changelog string
Changelog []string
}
// ReadReplicaHostsFromString parses and reads replica keys from comma delimited string
......
......@@ -42,18 +42,18 @@ var recentInstantAnalysis = cache.New(time.Duration(config.Config.RecoveryPollSe
func GetReplicationAnalysis(clusterName string, includeDowntimed bool, auditAnalysis bool) ([]ReplicationAnalysis, error) {
result := []ReplicationAnalysis{}
args := sqlutils.Args(config.Config.InstancePollSeconds, clusterName)
args := sqlutils.Args(2*config.Config.InstancePollSeconds, clusterName)
analysisQueryReductionClause := ``
if config.Config.ReduceReplicationAnalysisCount {
analysisQueryReductionClause = `
HAVING
(MIN(
master_instance.last_checked <= master_instance.last_seen
AND master_instance.last_attempted_check <= master_instance.last_seen + INTERVAL (2 * ?) SECOND
) IS TRUE /* AS is_last_check_valid */) = 0
AND master_instance.last_attempted_check <= master_instance.last_seen + INTERVAL ? SECOND
) = 1 /* AS is_last_check_valid */) = 0
OR (IFNULL(SUM(slave_instance.last_checked <= slave_instance.last_seen
AND slave_instance.slave_io_running = 0
AND slave_instance.last_io_error RLIKE 'error (connecting|reconnecting) to master'
AND slave_instance.last_io_error like '%error %connecting to master%'
AND slave_instance.slave_sql_running = 1),
0) /* AS count_slaves_failing_to_connect_to_master */ > 0)
OR (IFNULL(SUM(slave_instance.last_checked <= slave_instance.last_seen),
......@@ -65,11 +65,11 @@ func GetReplicationAnalysis(clusterName string, includeDowntimed bool, auditAnal
OR (MIN(
master_instance.slave_sql_running = 1
AND master_instance.slave_io_running = 0
AND master_instance.last_io_error RLIKE 'error (connecting|reconnecting) to master'
AND master_instance.last_io_error like '%error %connecting to master%'
) /* AS is_failing_to_connect_to_master */)
OR (COUNT(slave_instance.server_id) /* AS count_slaves */ > 0)
`
args = append(args, config.Config.InstancePollSeconds)
args = append(args, 2*config.Config.InstancePollSeconds)
}
// "OR count_slaves > 0" above is a recent addition, which, granted, makes some previous conditions redundant.
// It gives more output, and more "NoProblem" messages that I am now interested in for purpose of auditing in database_instance_analysis_changelog
......@@ -83,11 +83,11 @@ func GetReplicationAnalysis(clusterName string, includeDowntimed bool, auditAnal
MIN(IFNULL(cluster_alias.alias, master_instance.cluster_name)) AS cluster_alias,
MIN(
master_instance.last_checked <= master_instance.last_seen
AND master_instance.last_attempted_check <= master_instance.last_seen + INTERVAL (2 * ?) SECOND
) IS TRUE AS is_last_check_valid,
AND master_instance.last_attempted_check <= master_instance.last_seen + INTERVAL ? SECOND
) = 1 AS is_last_check_valid,
MIN(master_instance.master_host IN ('' , '_')
OR master_instance.master_port = 0
OR left(master_instance.master_host, 2) = '//') AS is_master,
OR substr(master_instance.master_host, 1, 2) = '//') AS is_master,
MIN(master_instance.is_co_master) AS is_co_master,
MIN(CONCAT(master_instance.hostname,
':',
......@@ -101,30 +101,25 @@ func GetReplicationAnalysis(clusterName string, includeDowntimed bool, auditAnal
0) AS count_valid_replicating_slaves,
IFNULL(SUM(slave_instance.last_checked <= slave_instance.last_seen
AND slave_instance.slave_io_running = 0
AND slave_instance.last_io_error RLIKE 'error (connecting|reconnecting) to master'
AND slave_instance.last_io_error like '%%error %%connecting to master%%'
AND slave_instance.slave_sql_running = 1),
0) AS count_slaves_failing_to_connect_to_master,
IFNULL(SUM(
current_relay_log_file=prev_relay_log_file
and current_relay_log_pos=prev_relay_log_pos
and current_seen != prev_seen),
0) AS count_stale_slaves,
MIN(master_instance.replication_depth) AS replication_depth,
GROUP_CONCAT(slave_instance.Hostname, ':', slave_instance.Port) as slave_hosts,
GROUP_CONCAT(concat(slave_instance.Hostname, ':', slave_instance.Port)) as slave_hosts,
MIN(
master_instance.slave_sql_running = 1
AND master_instance.slave_io_running = 0
AND master_instance.last_io_error RLIKE 'error (connecting|reconnecting) to master'
AND master_instance.last_io_error like '%%error %%connecting to master%%'
) AS is_failing_to_connect_to_master,
MIN(
database_instance_downtime.downtime_active IS NULL
OR database_instance_downtime.end_timestamp < NOW()
) IS FALSE AS is_downtimed,
MIN(
database_instance_downtime.downtime_active is not null
and ifnull(database_instance_downtime.end_timestamp, now()) > now()
) AS is_downtimed,
MIN(
IFNULL(database_instance_downtime.end_timestamp, '')
) AS downtime_end_timestamp,
MIN(
IFNULL(TIMESTAMPDIFF(SECOND, NOW(), database_instance_downtime.end_timestamp), 0)
IFNULL(unix_timestamp() - unix_timestamp(database_instance_downtime.end_timestamp), 0)
) AS downtime_remaining_seconds,
MIN(
master_instance.binlog_server
......@@ -168,10 +163,11 @@ func GetReplicationAnalysis(clusterName string, includeDowntimed bool, auditAnal
AND slave_instance.log_slave_updates
AND slave_instance.binlog_format = 'ROW'),
0) AS count_row_based_loggin_slaves,
COUNT(DISTINCT IF(
slave_instance.log_bin AND slave_instance.log_slave_updates,
substring_index(slave_instance.version, '.', 2),
NULL)
COUNT(DISTINCT case
when slave_instance.log_bin AND slave_instance.log_slave_updates
then slave_instance.major_version
else NULL
end
) AS count_distinct_logging_major_versions
FROM
database_instance master_instance
......@@ -221,7 +217,7 @@ func GetReplicationAnalysis(clusterName string, includeDowntimed bool, auditAnal
a.CountValidReplicas = m.GetUint("count_valid_slaves")
a.CountValidReplicatingReplicas = m.GetUint("count_valid_replicating_slaves")
a.CountReplicasFailingToConnectToMaster = m.GetUint("count_slaves_failing_to_connect_to_master")
a.CountStaleReplicas = m.GetUint("count_stale_slaves")
a.CountStaleReplicas = 0
a.ReplicationDepth = m.GetUint("replication_depth")
a.IsFailingToConnectToMaster = m.GetBool("is_failing_to_connect_to_master")
a.IsDowntimed = m.GetBool("is_downtimed")
......@@ -322,6 +318,10 @@ func GetReplicationAnalysis(clusterName string, includeDowntimed bool, auditAnal
a.Analysis = DeadIntermediateMasterAndSomeSlaves
a.Description = "Intermediate master cannot be reached by orchestrator; some of its replicas are unreachable and none of its reachable replicas is replicating"
//
} else if !a.IsMaster && !a.LastCheckValid && a.CountReplicas > 0 && a.CountValidReplicas == 0 {
a.Analysis = DeadIntermediateMasterAndSlaves
a.Description = "Intermediate master cannot be reached by orchestrator and all of its replicas are unreachable"
//
} else if !a.IsMaster && !a.LastCheckValid && a.CountValidReplicas > 0 && a.CountValidReplicatingReplicas > 0 {
a.Analysis = UnreachableIntermediateMaster
a.Description = "Intermediate master cannot be reached by orchestrator but it has replicating replicas; possibly a network/host issue"
......@@ -418,32 +418,49 @@ func auditInstanceAnalysisInChangelog(instanceKey *InstanceKey, analysisCode Ana
// to verify no two orchestrator services are doing this without coordinating (namely, one dies, the other taking its place
// and has no familiarity of the former's cache)
analysisChangeWriteAttemptCounter.Inc(1)
sqlResult, err := db.ExecOrchestrator(`
lastAnalysisChanged := false
{
sqlResult, err := db.ExecOrchestrator(`
update database_instance_last_analysis set
analysis = ?,
analysis_timestamp = now()
where
hostname = ?
and port = ?
and analysis != ?
`,
string(analysisCode), instanceKey.Hostname, instanceKey.Port, string(analysisCode),
)
if err != nil {
return log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return log.Errore(err)
}
lastAnalysisChanged = (rows > 0)
}
if !lastAnalysisChanged {
_, err := db.ExecOrchestrator(`
insert ignore into database_instance_last_analysis (
hostname, port, analysis_timestamp, analysis
) values (
?, ?, now(), ?
) on duplicate key update
analysis = values(analysis),
analysis_timestamp = if(analysis = values(analysis), analysis_timestamp, values(analysis_timestamp))
)
`,
instanceKey.Hostname, instanceKey.Port, string(analysisCode),
)
if err != nil {
return log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return log.Errore(err)
instanceKey.Hostname, instanceKey.Port, string(analysisCode),
)
if err != nil {
return log.Errore(err)
}
}
recentInstantAnalysis.Set(instanceKey.DisplayString(), analysisCode, cache.DefaultExpiration)
lastAnalysisChanged := (rows > 0)
if !lastAnalysisChanged {
return nil
}
_, err = db.ExecOrchestrator(`
_, err := db.ExecOrchestrator(`
insert into database_instance_analysis_changelog (
hostname, port, analysis_timestamp, analysis
) values (
......@@ -472,26 +489,29 @@ func ExpireInstanceAnalysisChangelog() error {
}
// ReadReplicationAnalysisChangelog
func ReadReplicationAnalysisChangelog() ([]ReplicationAnalysisChangelog, error) {
res := []ReplicationAnalysisChangelog{}
func ReadReplicationAnalysisChangelog() (res [](*ReplicationAnalysisChangelog), err error) {
query := `
select
hostname,
port,
group_concat(analysis_timestamp,';',analysis order by changelog_id) as changelog
hostname,
port,
analysis_timestamp,
analysis
from
database_instance_analysis_changelog
group by
hostname, port
order by
hostname, port, changelog_id
`
err := db.QueryOrchestratorRowsMap(query, func(m sqlutils.RowMap) error {
analysisChangelog := ReplicationAnalysisChangelog{}
analysisChangelog := &ReplicationAnalysisChangelog{}
err = db.QueryOrchestratorRowsMap(query, func(m sqlutils.RowMap) error {
key := InstanceKey{Hostname: m.GetString("hostname"), Port: m.GetInt("port")}
analysisChangelog.AnalyzedInstanceKey.Hostname = m.GetString("hostname")
analysisChangelog.AnalyzedInstanceKey.Port = m.GetInt("port")
analysisChangelog.Changelog = m.GetString("changelog")
if !analysisChangelog.AnalyzedInstanceKey.Equals(&key) {
analysisChangelog = &ReplicationAnalysisChangelog{AnalyzedInstanceKey: key, Changelog: []string{}}
res = append(res, analysisChangelog)
}
analysisEntry := fmt.Sprintf("%s;%s,", m.GetString("analysis_timestamp"), m.GetString("analysis"))
analysisChangelog.Changelog = append(analysisChangelog.Changelog, analysisEntry)
res = append(res, analysisChangelog)
return nil
})
......
......@@ -72,9 +72,9 @@ func WriteClusterAlias(clusterName string, alias string) error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
replace into
cluster_alias (cluster_name, alias)
cluster_alias (cluster_name, alias, last_registered)
values
(?, ?)
(?, ?, now())
`,
clusterName, alias)
return log.Errore(err)
......@@ -106,14 +106,9 @@ func UpdateClusterAliases() error {
cluster_alias (alias, cluster_name, last_registered)
select
suggested_cluster_alias,
substring_index(group_concat(
cluster_name order by
((last_checked <= last_seen) is true) desc,
read_only asc,
num_slave_hosts desc
), ',', 1) as cluster_name,
NOW()
from
cluster_name,
now()
from
database_instance
left join database_instance_downtime using (hostname, port)
where
......@@ -123,9 +118,11 @@ func UpdateClusterAliases() error {
database_instance_downtime.downtime_active = 1
and database_instance_downtime.end_timestamp > now()
and database_instance_downtime.reason = ?
, false) is false
group by
suggested_cluster_alias
, 0) = 0
order by
ifnull(last_checked <= last_seen, 0) asc,
read_only desc,
num_slave_hosts asc
`, DowntimeLostInRecoveryMessage)
return log.Errore(err)
}
......
......@@ -300,7 +300,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
resolvedHostname = instance.Key.Hostname
}
if instance.IsOracleMySQL() && !instance.IsSmallerMajorVersionByString("5.6") {
if (instance.IsOracleMySQL() || instance.IsPercona()) && !instance.IsSmallerMajorVersionByString("5.6") {
var masterInfoRepositoryOnTable bool
// Stuff only supported on Oracle MySQL >= 5.6
// ...
......@@ -466,6 +466,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
// otherwise report the error to the caller
return fmt.Errorf("ReadTopologyInstance(%+v) 'show slave hosts' returned row with <host,port>: <%v,%v>", instanceKey, host, port)
}
// Note: NewInstanceKeyFromStrings calls ResolveHostname() implicitly
replicaKey, err := NewInstanceKeyFromStrings(host, port)
if err == nil && replicaKey.IsValid() {
......@@ -480,7 +481,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
}
if !foundByShowSlaveHosts && !isMaxScale {
// Either not configured to read SHOW SLAVE HOSTS or nothing was there.
// Discover by processlist
// Discover by information_schema.processlist
latency.Start("instance")
err := sqlutils.QueryRowsMap(db, `
select
......@@ -488,8 +489,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
from
information_schema.processlist
where
command='Binlog Dump'
or command='Binlog Dump GTID'
command IN ('Binlog Dump', 'Binlog Dump GTID')
`,
func(m sqlutils.RowMap) error {
cname, resolveErr := ResolveHostname(m.GetString("slave_hostname"))
......@@ -517,7 +517,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
command,
time,
state,
left(processlist.info, 1024) as info,
substr(processlist.info, 1, 1024) as info,
now() - interval time second as started_at
from
information_schema.processlist
......@@ -942,17 +942,14 @@ func readInstancesByCondition(condition string, args []interface{}, sort string)
query := fmt.Sprintf(`
select
*,
timestampdiff(second, last_checked, now()) as seconds_since_last_checked,
(last_checked <= last_seen) is true as is_last_check_valid,
timestampdiff(second, last_seen, now()) as seconds_since_last_seen,
unix_timestamp() - unix_timestamp(last_checked) as seconds_since_last_checked,
ifnull(last_checked <= last_seen, 0) as is_last_check_valid,
unix_timestamp() - unix_timestamp(last_seen) as seconds_since_last_seen,
candidate_database_instance.last_suggested is not null
and candidate_database_instance.promotion_rule in ('must', 'prefer') as is_candidate,
ifnull(nullif(candidate_database_instance.promotion_rule, ''), 'neutral') as promotion_rule,
ifnull(unresolved_hostname, '') as unresolved_hostname,
(
database_instance_downtime.downtime_active IS NULL
or database_instance_downtime.end_timestamp < NOW()
) is false as is_downtimed,
(database_instance_downtime.downtime_active is not null and ifnull(database_instance_downtime.end_timestamp, now()) > now()) as is_downtimed,
ifnull(database_instance_downtime.reason, '') as downtime_reason,
ifnull(database_instance_downtime.owner, '') as downtime_owner,
ifnull(database_instance_downtime.end_timestamp, '') as downtime_end_timestamp
......@@ -1102,10 +1099,10 @@ func ReadUnseenInstances() ([](*Instance), error) {
// ReadProblemInstances reads all instances with problems
func ReadProblemInstances(clusterName string) ([](*Instance), error) {
condition := `
cluster_name LIKE IF(? = '', '%', ?)
cluster_name LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END)
and (
(last_seen < last_checked)
or (not ifnull(timestampdiff(second, last_checked, now()) <= ?, false))
or (unix_timestamp() - unix_timestamp(last_checked) > ?)
or (not slave_sql_running)
or (not slave_io_running)
or (abs(cast(seconds_behind_master as signed) - cast(sql_delay as signed)) > ?)
......@@ -1124,10 +1121,8 @@ func ReadProblemInstances(clusterName string) ([](*Instance), error) {
if instance.IsDowntimed {
skip = true
}
for _, filter := range config.Config.ProblemIgnoreHostnameFilters {
if matched, _ := regexp.MatchString(filter, instance.Key.Hostname); matched {
skip = true
}
if RegexpMatchPatterns(instance.Key.Hostname, config.Config.ProblemIgnoreHostnameFilters) {
skip = true
}
if !skip {
reportedInstances = append(reportedInstances, instance)
......@@ -1140,11 +1135,11 @@ func ReadProblemInstances(clusterName string) ([](*Instance), error) {
func SearchInstances(searchString string) ([](*Instance), error) {
searchString = strings.TrimSpace(searchString)
condition := `
locate(?, hostname) > 0
or locate(?, cluster_name) > 0
or locate(?, version) > 0
or locate(?, version_comment) > 0
or locate(?, concat(hostname, ':', port)) > 0
instr(hostname, ?) > 0
or instr(cluster_name, ?) > 0
or instr(version, ?) > 0
or instr(version_comment, ?) > 0
or instr(concat(hostname, ':', port), ?) > 0
or concat(server_id, '') = ?
or concat(port, '') = ?
`
......@@ -1260,22 +1255,16 @@ func ReadClusterCandidateInstances(clusterName string) ([](*Instance), error) {
func filterOSCInstances(instances [](*Instance)) [](*Instance) {
result := [](*Instance){}
for _, instance := range instances {
skipThisHost := false
for _, filter := range config.Config.OSCIgnoreHostnameFilters {
if matched, _ := regexp.MatchString(filter, instance.Key.Hostname); matched {
skipThisHost = true
}
if RegexpMatchPatterns(instance.Key.Hostname, config.Config.OSCIgnoreHostnameFilters) {
continue
}
if instance.IsBinlogServer() {
skipThisHost = true
continue
}
if !instance.IsLastCheckValid {
skipThisHost = true
}
if !skipThisHost {
result = append(result, instance)
continue
}
result = append(result, instance)
}
return result
}
......@@ -1589,25 +1578,44 @@ func InjectUnseenMasters() error {
// appears on the hostname_resolved table; this means some time in the past their hostname was unresovled, and now
// resovled to a different value; the old hostname is never accessed anymore and the old entry should be removed.
func ForgetUnseenInstancesDifferentlyResolved() error {
sqlResult, err := db.ExecOrchestrator(`
DELETE FROM
database_instance
USING
hostname_resolve
JOIN database_instance ON (hostname_resolve.hostname = database_instance.hostname)
WHERE
hostname_resolve.hostname != hostname_resolve.resolved_hostname
AND (last_checked <= last_seen) IS NOT TRUE
`,
)
if err != nil {
return log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return log.Errore(err)
query := `
select
database_instance.hostname, database_instance.port
from
hostname_resolve
JOIN database_instance ON (hostname_resolve.hostname = database_instance.hostname)
where
hostname_resolve.hostname != hostname_resolve.resolved_hostname
AND ifnull(last_checked <= last_seen, 0) = 0
`
keys := NewInstanceKeyMap()
err := db.QueryOrchestrator(query, nil, func(m sqlutils.RowMap) error {
key := InstanceKey{
Hostname: m.GetString("hostname"),
Port: m.GetInt("port"),
}
keys.AddKey(key)
return nil
})
var rowsAffected int64 = 0
for _, key := range keys.GetInstanceKeys() {
sqlResult, err := db.ExecOrchestrator(`
delete from
database_instance
where
hostname = ? and port = ?
`, sqlutils.Args(key.Hostname, key.Port),
)
if err != nil {
return log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return log.Errore(err)
}
rowsAffected = rowsAffected + rows
}
AuditOperation("forget-unseen-differently-resolved", nil, fmt.Sprintf("Forgotten instances: %d", rows))
AuditOperation("forget-unseen-differently-resolved", nil, fmt.Sprintf("Forgotten instances: %d", rowsAffected))
return err
}
......@@ -1857,13 +1865,13 @@ func ReadOutdatedInstanceKeys() ([]InstanceKey, error) {
from
database_instance
where
if (
last_attempted_check <= last_checked,
last_checked < now() - interval ? second,
last_checked < now() - interval (? * 2) second
)
case
when last_attempted_check <= last_checked
then last_checked < now() - interval ? second
else last_checked < now() - interval ? second
end
`
args := sqlutils.Args(config.Config.InstancePollSeconds, config.Config.InstancePollSeconds)
args := sqlutils.Args(config.Config.InstancePollSeconds, 2*config.Config.InstancePollSeconds)
err := db.QueryOrchestrator(query, args, func(m sqlutils.RowMap) error {
instanceKey, merr := NewInstanceKeyFromStrings(m.GetString("hostname"), m.GetString("port"))
......@@ -1945,6 +1953,7 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo
"server_id",
"server_uuid",
"version",
"major_version",
"version_comment",
"binlog_server",
"read_only",
......@@ -2012,6 +2021,7 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo
args = append(args, instance.ServerID)
args = append(args, instance.ServerUUID)
args = append(args, instance.Version)
args = append(args, instance.MajorVersionString())
args = append(args, instance.VersionComment)
args = append(args, instance.IsBinlogServer())
args = append(args, instance.ReadOnly)
......@@ -2349,8 +2359,8 @@ func RecordInstanceCoordinatesHistory() error {
_, err := db.ExecOrchestrator(`
delete from database_instance_coordinates_history
where
recorded_timestamp < NOW() - INTERVAL (? + 5) MINUTE
`, config.Config.PseudoGTIDCoordinatesHistoryHeuristicMinutes,
recorded_timestamp < NOW() - INTERVAL ? MINUTE
`, (config.Config.PseudoGTIDCoordinatesHistoryHeuristicMinutes + 5),
)
return log.Errore(err)
}
......@@ -2511,37 +2521,3 @@ func RecordInstanceBinlogFileHistory() error {
}
return ExecDBWriteFunc(writeFunc)
}
// UpdateInstanceRecentRelaylogHistory updates the database_instance_recent_relaylog_history
// table listing the current relaylog coordinates and the one-before.
// This information can be used to diagnoze a stale-replication scenario (for example, master is locked down
// and although replicas are connected, they're not making progress)
func UpdateInstanceRecentRelaylogHistory() error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
insert into
database_instance_recent_relaylog_history (
hostname, port,
current_relay_log_file, current_relay_log_pos, current_seen,
prev_relay_log_file, prev_relay_log_pos
)
select
hostname, port,
relay_log_file, relay_log_pos, last_seen,
'', 0
from database_instance
where
relay_log_file != ''
on duplicate key update
prev_relay_log_file = current_relay_log_file,
prev_relay_log_pos = current_relay_log_pos,
prev_seen = current_seen,
current_relay_log_file = values(current_relay_log_file),
current_relay_log_pos = values (current_relay_log_pos),
current_seen = values(current_seen)
`,
)
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}
......@@ -3,6 +3,8 @@ package inst
import (
"bytes"
"fmt"
"regexp"
"strings"
"testing"
test "github.com/openark/golib/tests"
......@@ -14,6 +16,17 @@ var (
i730k = InstanceKey{Hostname: "i730", Port: 3306}
)
var (
spacesRegexp = regexp.MustCompile(`[ \t\n\r]+`)
)
func normalizeQuery(name string) string {
name = strings.Replace(name, "`", "", -1)
name = spacesRegexp.ReplaceAllString(name, " ")
name = strings.TrimSpace(name)
return name
}
func mkTestInstances() []*Instance {
i710 := Instance{Key: i710k, ServerID: 710, ExecBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000007", LogPos: 10}}
i720 := Instance{Key: i720k, ServerID: 720, ExecBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000007", LogPos: 20}}
......@@ -36,34 +49,34 @@ func TestMkInsertOdku(t *testing.T) {
// one instance
s1 := `INSERT ignore INTO database_instance
(hostname, port, last_checked, last_attempted_check, uptime, server_id, server_uuid, version, version_comment, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, supports_oracle_gtid, oracle_gtid, executed_gtid_set, gtid_purged, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, allow_tls, semi_sync_enforced, instance_alias, last_seen)
(hostname, port, last_checked, last_attempted_check, uptime, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, supports_oracle_gtid, oracle_gtid, executed_gtid_set, gtid_purged, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, allow_tls, semi_sync_enforced, instance_alias, last_seen)
VALUES
(?, ?, NOW(), NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())
(?, ?, NOW(), NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())
ON DUPLICATE KEY UPDATE
hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), uptime=VALUES(uptime), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), log_bin=VALUES(log_bin), log_slave_updates=VALUES(log_slave_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), master_host=VALUES(master_host), master_port=VALUES(master_port), slave_sql_running=VALUES(slave_sql_running), slave_io_running=VALUES(slave_io_running), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), executed_gtid_set=VALUES(executed_gtid_set), gtid_purged=VALUES(gtid_purged), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), master_log_file=VALUES(master_log_file), read_master_log_pos=VALUES(read_master_log_pos), relay_master_log_file=VALUES(relay_master_log_file), exec_master_log_pos=VALUES(exec_master_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), seconds_behind_master=VALUES(seconds_behind_master), slave_lag_seconds=VALUES(slave_lag_seconds), sql_delay=VALUES(sql_delay), num_slave_hosts=VALUES(num_slave_hosts), slave_hosts=VALUES(slave_hosts), cluster_name=VALUES(cluster_name), suggested_cluster_alias=VALUES(suggested_cluster_alias), data_center=VALUES(data_center), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_master=VALUES(is_co_master), replication_credentials_available=VALUES(replication_credentials_available), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), semi_sync_enforced=VALUES(semi_sync_enforced), instance_alias=VALUES(instance_alias), last_seen=VALUES(last_seen)
hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), uptime=VALUES(uptime), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), log_bin=VALUES(log_bin), log_slave_updates=VALUES(log_slave_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), master_host=VALUES(master_host), master_port=VALUES(master_port), slave_sql_running=VALUES(slave_sql_running), slave_io_running=VALUES(slave_io_running), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), executed_gtid_set=VALUES(executed_gtid_set), gtid_purged=VALUES(gtid_purged), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), master_log_file=VALUES(master_log_file), read_master_log_pos=VALUES(read_master_log_pos), relay_master_log_file=VALUES(relay_master_log_file), exec_master_log_pos=VALUES(exec_master_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), seconds_behind_master=VALUES(seconds_behind_master), slave_lag_seconds=VALUES(slave_lag_seconds), sql_delay=VALUES(sql_delay), num_slave_hosts=VALUES(num_slave_hosts), slave_hosts=VALUES(slave_hosts), cluster_name=VALUES(cluster_name), suggested_cluster_alias=VALUES(suggested_cluster_alias), data_center=VALUES(data_center), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_master=VALUES(is_co_master), replication_credentials_available=VALUES(replication_credentials_available), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), semi_sync_enforced=VALUES(semi_sync_enforced), instance_alias=VALUES(instance_alias), last_seen=VALUES(last_seen)
`
a1 := `i710, 3306, 0, 710, , 5.6.7, MySQL, false, false, STATEMENT, false, false, , 0, , 0, false, false, false, false, false, , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , 0, false, false, false, false, false, , `
a1 := `i710, 3306, 0, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, false, false, , 0, , 0, false, false, false, false, false, , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , 0, false, false, false, false, false, , `
sql1, args1 := mkInsertOdkuForInstances(instances[:1], false, true)
test.S(t).ExpectEquals(sql1, s1)
test.S(t).ExpectEquals(normalizeQuery(sql1), normalizeQuery(s1))
test.S(t).ExpectEquals(fmtArgs(args1), a1)
// three instances
s3 := `INSERT INTO database_instance
(hostname, port, last_checked, last_attempted_check, uptime, server_id, server_uuid, version, version_comment, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, supports_oracle_gtid, oracle_gtid, executed_gtid_set, gtid_purged, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, allow_tls, semi_sync_enforced, instance_alias, last_seen)
(hostname, port, last_checked, last_attempted_check, uptime, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, supports_oracle_gtid, oracle_gtid, executed_gtid_set, gtid_purged, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, allow_tls, semi_sync_enforced, instance_alias, last_seen)
VALUES
(?, ?, NOW(), NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()),
(?, ?, NOW(), NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()),
(?, ?, NOW(), NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())
(?, ?, NOW(), NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()),
(?, ?, NOW(), NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()),
(?, ?, NOW(), NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())
ON DUPLICATE KEY UPDATE
hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), uptime=VALUES(uptime), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), log_bin=VALUES(log_bin), log_slave_updates=VALUES(log_slave_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), master_host=VALUES(master_host), master_port=VALUES(master_port), slave_sql_running=VALUES(slave_sql_running), slave_io_running=VALUES(slave_io_running), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), executed_gtid_set=VALUES(executed_gtid_set), gtid_purged=VALUES(gtid_purged), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), master_log_file=VALUES(master_log_file), read_master_log_pos=VALUES(read_master_log_pos), relay_master_log_file=VALUES(relay_master_log_file), exec_master_log_pos=VALUES(exec_master_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), seconds_behind_master=VALUES(seconds_behind_master), slave_lag_seconds=VALUES(slave_lag_seconds), sql_delay=VALUES(sql_delay), num_slave_hosts=VALUES(num_slave_hosts), slave_hosts=VALUES(slave_hosts), cluster_name=VALUES(cluster_name), suggested_cluster_alias=VALUES(suggested_cluster_alias), data_center=VALUES(data_center), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_master=VALUES(is_co_master), replication_credentials_available=VALUES(replication_credentials_available), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), semi_sync_enforced=VALUES(semi_sync_enforced), instance_alias=VALUES(instance_alias), last_seen=VALUES(last_seen)
hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), uptime=VALUES(uptime), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), log_bin=VALUES(log_bin), log_slave_updates=VALUES(log_slave_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), master_host=VALUES(master_host), master_port=VALUES(master_port), slave_sql_running=VALUES(slave_sql_running), slave_io_running=VALUES(slave_io_running), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), executed_gtid_set=VALUES(executed_gtid_set), gtid_purged=VALUES(gtid_purged), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), master_log_file=VALUES(master_log_file), read_master_log_pos=VALUES(read_master_log_pos), relay_master_log_file=VALUES(relay_master_log_file), exec_master_log_pos=VALUES(exec_master_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), seconds_behind_master=VALUES(seconds_behind_master), slave_lag_seconds=VALUES(slave_lag_seconds), sql_delay=VALUES(sql_delay), num_slave_hosts=VALUES(num_slave_hosts), slave_hosts=VALUES(slave_hosts), cluster_name=VALUES(cluster_name), suggested_cluster_alias=VALUES(suggested_cluster_alias), data_center=VALUES(data_center), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_master=VALUES(is_co_master), replication_credentials_available=VALUES(replication_credentials_available), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), semi_sync_enforced=VALUES(semi_sync_enforced), instance_alias=VALUES(instance_alias), last_seen=VALUES(last_seen)
`
a3 := `i710, 3306, 0, 710, , 5.6.7, MySQL, false, false, STATEMENT, false, false, , 0, , 0, false, false, false, false, false, , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , 0, false, false, false, false, false, , i720, 3306, 0, 720, , 5.6.7, MySQL, false, false, STATEMENT, false, false, , 0, , 0, false, false, false, false, false, , , false, false, , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , 0, false, false, false, false, false, , i730, 3306, 0, 730, , 5.6.7, MySQL, false, false, STATEMENT, false, false, , 0, , 0, false, false, false, false, false, , , false, false, , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , 0, false, false, false, false, false, , `
a3 := `i710, 3306, 0, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, false, false, , 0, , 0, false, false, false, false, false, , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , 0, false, false, false, false, false, , i720, 3306, 0, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, false, false, , 0, , 0, false, false, false, false, false, , , false, false, , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , 0, false, false, false, false, false, , i730, 3306, 0, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, false, false, , 0, , 0, false, false, false, false, false, , , false, false, , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , 0, false, false, false, false, false, , `
sql3, args3 := mkInsertOdkuForInstances(instances[:3], true, true)
test.S(t).ExpectEquals(sql3, s3)
test.S(t).ExpectEquals(normalizeQuery(sql3), normalizeQuery(s3))
test.S(t).ExpectEquals(fmtArgs(args3), a3)
}
......
......@@ -21,6 +21,7 @@ import (
"regexp"
"sort"
"strings"
"sync"
"time"
"github.com/github/orchestrator/go/config"
......@@ -1695,9 +1696,61 @@ func GetSortedReplicas(masterKey *InstanceKey, stopReplicationMethod StopReplica
return replicas, err
}
func MultiMatchBelowIndependently(replicas [](*Instance), belowKey *InstanceKey, postponedFunctionsContainer *PostponedFunctionsContainer) (matchedReplicas [](*Instance), belowInstance *Instance, err error, errs []error) {
belowInstance, found, err := ReadInstance(belowKey)
if err != nil || !found {
return matchedReplicas, belowInstance, err, errs
}
replicas = RemoveInstance(replicas, belowKey)
if len(replicas) == 0 {
// Nothing to do
return replicas, belowInstance, err, errs
}
log.Infof("Will match %+v replicas below %+v via Pseudo-GTID, independently", len(replicas), belowKey)
barrier := make(chan *InstanceKey)
replicaMutex := &sync.Mutex{}
for _, replica := range replicas {
replica := replica
// Parallelize repoints
go func() {
defer func() { barrier <- &replica.Key }()
ExecuteOnTopology(func() {
replica, _, replicaErr := MatchBelow(&replica.Key, belowKey, true)
replicaMutex.Lock()
defer replicaMutex.Unlock()
if replicaErr == nil {
matchedReplicas = append(matchedReplicas, replica)
} else {
errs = append(errs, replicaErr)
}
})
}()
}
for range replicas {
<-barrier
}
if len(errs) == len(replicas) {
// All returned with error
return matchedReplicas, belowInstance, fmt.Errorf("MultiMatchBelowIndependently: Error on all %+v operations", len(errs)), errs
}
AuditOperation("multi-match-below-independent", belowKey, fmt.Sprintf("matched %d/%d replicas below %+v via Pseudo-GTID", len(matchedReplicas), len(replicas), belowKey))
return matchedReplicas, belowInstance, err, errs
}
// MultiMatchBelow will efficiently match multiple replicas below a given instance.
// It is assumed that all given replicas are siblings
func MultiMatchBelow(replicas [](*Instance), belowKey *InstanceKey, replicasAlreadyStopped bool, postponedFunctionsContainer *PostponedFunctionsContainer) ([](*Instance), *Instance, error, []error) {
if config.Config.PseudoGTIDPreferIndependentMultiMatch {
return MultiMatchBelowIndependently(replicas, belowKey, postponedFunctionsContainer)
}
res := [](*Instance){}
errs := []error{}
replicaMutex := make(chan bool, 1)
......
......@@ -158,3 +158,13 @@ func IsSmallerBinlogFormat(binlogFormat string, otherBinlogFormat string) bool {
}
return false
}
// RegexpMatchPatterns returns true if s matches any of the provided regexpPatterns
func RegexpMatchPatterns(s string, regexpPatterns []string) bool {
for _, filter := range regexpPatterns {
if matched, err := regexp.MatchString(filter, s); err == nil && matched {
return true
}
}
return false
}
package inst
import (
"testing"
)
type testPatterns struct {
s string
patterns []string
expected bool
}
func TestRegexpMatchPatterns(t *testing.T) {
patterns := []testPatterns{
{"hostname", []string{}, false},
{"hostname", []string{"blah"}, false},
{"hostname", []string{"blah", "blah"}, false},
{"hostname", []string{"host", "blah"}, true},
{"hostname", []string{"blah", "host"}, true},
{"hostname", []string{"ho.tname"}, true},
{"hostname", []string{"ho.tname2"}, false},
{"hostname", []string{"ho.*me"}, true},
}
for _, p := range patterns {
if match := RegexpMatchPatterns(p.s, p.patterns); match != p.expected {
t.Errorf("RegexpMatchPatterns failed with: %q, %+v, got: %+v, expected: %+v", p.s, p.patterns, match, p.expected)
}
}
}
......@@ -35,7 +35,7 @@ func ReadActiveMaintenance() ([]Maintenance, error) {
hostname,
port,
begin_timestamp,
timestampdiff(second, begin_timestamp, now()) as seconds_elapsed,
unix_timestamp() - unix_timestamp(begin_timestamp) as seconds_elapsed,
maintenance_active,
owner,
reason
......@@ -236,12 +236,13 @@ func ExpireMaintenance() error {
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
left join node_health on (processing_node_hostname = node_health.hostname AND processing_node_token = node_health.token)
set
database_instance_maintenance.maintenance_active = NULL
maintenance_active = NULL
where
node_health.last_seen_active IS NULL
and explicitly_bounded = 0
explicitly_bounded = 0
and concat(processing_node_hostname, ':', processing_node_token) not in (
select concat(hostname, ':', token) from node_health
)
`,
)
if err != nil {
......
......@@ -40,11 +40,11 @@ func WriteLongRunningProcesses(instanceKey *InstanceKey, processes []Process) er
for _, process := range processes {
_, merr := db.ExecOrchestrator(`
insert ignore into database_instance_long_running_queries (
hostname,
port,
process_id,
process_started_at,
insert ignore into database_instance_long_running_queries (
hostname,
port,
process_id,
process_started_at,
process_user,
process_host,
process_db,
......
......@@ -63,7 +63,7 @@ func WriteResolvedHostname(hostname string, resolvedHostname string) error {
values
(?, ?, NOW())
on duplicate key update
hostname=if(values(hostname) != resolved_hostname, values(hostname), hostname),
hostname=values(hostname),
resolved_timestamp=values(resolved_timestamp)
`,
hostname,
......@@ -215,7 +215,7 @@ func WriteHostnameUnresolve(instanceKey *InstanceKey, unresolvedHostname string)
return log.Errore(err)
}
_, err = db.ExecOrchestrator(`
replace into hostname_unresolve_history (
replace into hostname_unresolve_history (
hostname,
unresolved_hostname,
last_registered)
......@@ -232,7 +232,7 @@ func WriteHostnameUnresolve(instanceKey *InstanceKey, unresolvedHostname string)
func DeregisterHostnameUnresolve(instanceKey *InstanceKey) error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
delete from hostname_unresolve
delete from hostname_unresolve
where hostname=?
`, instanceKey.Hostname,
)
......@@ -245,7 +245,7 @@ func DeregisterHostnameUnresolve(instanceKey *InstanceKey) error {
func ExpireHostnameUnresolve() error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
delete from hostname_unresolve
delete from hostname_unresolve
where last_registered < NOW() - INTERVAL ? MINUTE
`, config.Config.ExpiryHostnameResolvesMinutes,
)
......@@ -260,8 +260,8 @@ func ForgetExpiredHostnameResolves() error {
delete
from hostname_resolve
where
resolved_timestamp < NOW() - interval (? * 2) minute`,
config.Config.ExpiryHostnameResolvesMinutes,
resolved_timestamp < NOW() - interval ? minute`,
2*config.Config.ExpiryHostnameResolvesMinutes,
)
return err
}
......
......@@ -71,6 +71,11 @@ func init() {
ometrics.OnGraphiteTick(func() { isElectedGauge.Update(int64(atomic.LoadInt64(&isElectedNode))) })
}
// used in several places
func instancePollSecondsDuration() time.Duration {
return time.Duration(config.Config.InstancePollSeconds) * time.Second
}
// acceptSignals registers for OS signals
func acceptSignals() {
c := make(chan os.Signal, 1)
......@@ -140,7 +145,7 @@ func discoverInstance(instanceKey inst.InstanceKey) {
defer func() {
latency.Stop("total")
discoveryTime := latency.Elapsed("total")
if discoveryTime > time.Duration(config.Config.InstancePollSeconds)*time.Second {
if discoveryTime > instancePollSecondsDuration() {
log.Warningf("discoverInstance for key %v took %.4fs", instanceKey, discoveryTime.Seconds())
}
}()
......@@ -150,7 +155,10 @@ func discoverInstance(instanceKey inst.InstanceKey) {
return
}
if existsInCacheError := recentDiscoveryOperationKeys.Add(instanceKey.DisplayString(), true, cache.DefaultExpiration); existsInCacheError != nil {
// Calculate the expiry period each time as InstancePollSeconds
// _may_ change during the run of the process (via SIGHUP) and
// it is not possible to change the cache's default expiry..
if existsInCacheError := recentDiscoveryOperationKeys.Add(instanceKey.DisplayString(), true, instancePollSecondsDuration()); existsInCacheError != nil {
// Just recently attempted
return
}
......@@ -216,7 +224,13 @@ func discoverInstance(instanceKey inst.InstanceKey) {
// Investigate replicas:
for _, replicaKey := range instance.SlaveHosts.GetInstanceKeys() {
replicaKey := replicaKey
replicaKey := replicaKey // not needed? no concurrency here?
// Avoid noticing some hosts we would otherwise discover
if inst.RegexpMatchPatterns(replicaKey.Hostname, config.Config.DiscoveryIgnoreReplicaHostnameFilters) {
continue
}
if replicaKey.IsValid() {
discoveryQueue.Push(replicaKey)
}
......@@ -286,13 +300,14 @@ func ContinuousDiscovery() {
}
log.Infof("Starting continuous discovery")
recentDiscoveryOperationKeys = cache.New(time.Duration(config.Config.InstancePollSeconds)*time.Second, time.Second)
recentDiscoveryOperationKeys = cache.New(instancePollSecondsDuration(), time.Second)
inst.LoadHostnameResolveCache()
go handleDiscoveryRequests()
// Careful: config.Config.GetDiscoveryPollSeconds() is CONSTANT. It can never change.
discoveryTick := time.Tick(time.Duration(config.Config.GetDiscoveryPollSeconds()) * time.Second)
instancePollTick := time.Tick(time.Duration(config.Config.InstancePollSeconds) * time.Second)
instancePollTick := time.Tick(instancePollSecondsDuration())
caretakingTick := time.Tick(time.Minute)
recoveryTick := time.Tick(time.Duration(config.Config.RecoveryPollSeconds) * time.Second)
var snapshotTopologiesTick <-chan time.Time
......@@ -318,7 +333,6 @@ func ContinuousDiscovery() {
// But rather should invoke such routinely operations that need to be as (or roughly as) frequent
// as instance poll
if atomic.LoadInt64(&isElectedNode) == 1 {
go inst.UpdateInstanceRecentRelaylogHistory()
go inst.RecordInstanceCoordinatesHistory()
}
}()
......
......@@ -1185,17 +1185,27 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, cand
checkAndRecoverFunction = checkAndRecoverDeadIntermediateMaster
case inst.DeadIntermediateMasterWithSingleSlaveFailingToConnect:
checkAndRecoverFunction = checkAndRecoverDeadIntermediateMaster
case inst.DeadIntermediateMasterAndSlaves:
checkAndRecoverFunction = checkAndRecoverGenericProblem
case inst.AllIntermediateMasterSlavesFailingToConnectOrDead:
checkAndRecoverFunction = checkAndRecoverDeadIntermediateMaster
case inst.AllIntermediateMasterSlavesNotReplicating:
checkAndRecoverFunction = nil
case inst.DeadCoMaster:
checkAndRecoverFunction = checkAndRecoverDeadCoMaster
case inst.DeadCoMasterAndSomeSlaves:
checkAndRecoverFunction = checkAndRecoverDeadCoMaster
case inst.DeadMasterAndSlaves:
checkAndRecoverFunction = checkAndRecoverGenericProblem
go emergentlyReadTopologyInstance(&analysisEntry.AnalyzedInstanceMasterKey, analysisEntry.Analysis)
case inst.UnreachableMaster:
checkAndRecoverFunction = checkAndRecoverGenericProblem
go emergentlyReadTopologyInstanceReplicas(&analysisEntry.AnalyzedInstanceKey, analysisEntry.Analysis)
case inst.AllMasterSlavesNotReplicating:
checkAndRecoverFunction = checkAndRecoverGenericProblem
go emergentlyReadTopologyInstance(&analysisEntry.AnalyzedInstanceKey, analysisEntry.Analysis)
case inst.AllMasterSlavesNotReplicatingOrDead:
checkAndRecoverFunction = checkAndRecoverGenericProblem
go emergentlyReadTopologyInstance(&analysisEntry.AnalyzedInstanceKey, analysisEntry.Analysis)
case inst.FirstTierSlaveFailingToConnectToMaster:
go emergentlyReadTopologyInstance(&analysisEntry.AnalyzedInstanceMasterKey, analysisEntry.Analysis)
......
......@@ -244,16 +244,39 @@ func ExpireBlockedRecoveries() error {
// Older recovery is acknowledged by now, hence blocked recovery should be released.
// Do NOTE that the data in blocked_topology_recovery is only used for auditing: it is NOT the data
// based on which we make automated decisions.
_, err := db.ExecOrchestrator(`
delete
from blocked_topology_recovery
using
blocked_topology_recovery
left join topology_recovery on (blocking_recovery_id = topology_recovery.recovery_id and acknowledged = 0)
query := `
select
blocked_topology_recovery.hostname,
blocked_topology_recovery.port
from
blocked_topology_recovery
left join topology_recovery on (blocking_recovery_id = topology_recovery.recovery_id and acknowledged = 0)
where
acknowledged is null
`
expiredKeys := inst.NewInstanceKeyMap()
err := db.QueryOrchestrator(query, sqlutils.Args(), func(m sqlutils.RowMap) error {
key := inst.InstanceKey{Hostname: m.GetString("hostname"), Port: m.GetInt("port")}
expiredKeys.AddKey(key)
return nil
})
for _, expiredKey := range expiredKeys.GetInstanceKeys() {
_, err := db.ExecOrchestrator(`
delete
from blocked_topology_recovery
where
acknowledged is null
`,
)
hostname = ?
and port = ?
`,
expiredKey.Hostname, expiredKey.Port,
)
if err != nil {
return log.Errore(err)
}
}
if err != nil {
return log.Errore(err)
}
......@@ -284,7 +307,7 @@ func acknowledgeRecoveries(owner string, comment string, markEndRecovery bool, w
query := fmt.Sprintf(`
update topology_recovery set
in_active_period = 0,
end_active_period_unixtime = IF(end_active_period_unixtime = 0, UNIX_TIMESTAMP(), end_active_period_unixtime),
end_active_period_unixtime = case when end_active_period_unixtime = 0 then UNIX_TIMESTAMP() else end_active_period_unixtime end,
%s
acknowledged = 1,
acknowledged_at = NOW(),
......
......@@ -25,24 +25,79 @@ import (
// AttemptElection tries to grab leadership (become active node)
func AttemptElection() (bool, error) {
sqlResult, err := db.ExecOrchestrator(`
insert ignore into active_node (
anchor, hostname, token, first_seen_active, last_seen_active
) values (
1, ?, ?, now(), now()
) on duplicate key update
hostname = if(last_seen_active < now() - interval ? second, values(hostname), hostname),
token = if(last_seen_active < now() - interval ? second, values(token), token),
first_seen_active = if(last_seen_active < now() - interval ? second, values(first_seen_active), first_seen_active),
last_seen_active = if(hostname = values(hostname) and token = values(token), values(last_seen_active), last_seen_active)
`,
ThisHostname, ProcessToken.Hash, config.Config.ActiveNodeExpireSeconds, config.Config.ActiveNodeExpireSeconds, config.Config.ActiveNodeExpireSeconds,
)
if err != nil {
return false, log.Errore(err)
{
sqlResult, err := db.ExecOrchestrator(`
insert ignore into active_node (
anchor, hostname, token, first_seen_active, last_seen_active
) values (
1, ?, ?, now(), now()
)
`,
ThisHostname, ProcessToken.Hash,
)
if err != nil {
return false, log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return false, log.Errore(err)
}
if rows > 0 {
// We managed to insert a row
return true, nil
}
}
{
// takeover from a node that has been inactive
sqlResult, err := db.ExecOrchestrator(`
update active_node set
hostname = ?,
token = ?,
first_seen_active=now(),
last_seen_active=now()
where
anchor = 1
and last_seen_active < (now() - interval ? second)
`,
ThisHostname, ProcessToken.Hash, config.Config.ActiveNodeExpireSeconds,
)
if err != nil {
return false, log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return false, log.Errore(err)
}
if rows > 0 {
// We managed to update a row: overtaking a previous leader
return true, nil
}
}
{
// Update last_seen_active is this very node is already the active node
sqlResult, err := db.ExecOrchestrator(`
update active_node set
last_seen_active=now()
where
anchor = 1
and hostname = ?
and token = ?
`,
ThisHostname, ProcessToken.Hash,
)
if err != nil {
return false, log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return false, log.Errore(err)
}
if rows > 0 {
// Reaffirmed our own leadership
return true, nil
}
}
rows, err := sqlResult.RowsAffected()
return (rows > 0), log.Errore(err)
return false, nil
}
// GrabElection forcibly grabs leadership. Use with care!!
......
......@@ -17,7 +17,6 @@
package process
import (
"database/sql"
"time"
"github.com/github/orchestrator/go/config"
......@@ -56,7 +55,7 @@ const (
var continuousRegistrationInitiated bool = false
// RegisterNode writes down this node in the node_health table
func RegisterNode(extraInfo string, command string, firstTime bool) (sql.Result, error) {
func RegisterNode(extraInfo string, command string, firstTime bool) (healthy bool, err error) {
if firstTime {
db.ExecOrchestrator(`
insert ignore into node_health_history
......@@ -68,37 +67,63 @@ func RegisterNode(extraInfo string, command string, firstTime bool) (sql.Result,
config.RuntimeCLIFlags.ConfiguredVersion,
)
}
return db.ExecOrchestrator(`
insert into node_health
{
sqlResult, err := db.ExecOrchestrator(`
update node_health set
last_seen_active = now(),
extra_info = case when ? != '' then ? else extra_info end,
app_version = ?
where
hostname = ?
and token = ?
`,
extraInfo, extraInfo, config.RuntimeCLIFlags.ConfiguredVersion, ThisHostname, ProcessToken.Hash,
)
if err != nil {
return false, log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return false, log.Errore(err)
}
if rows > 0 {
return true, nil
}
}
{
sqlResult, err := db.ExecOrchestrator(`
insert ignore into node_health
(hostname, token, first_seen_active, last_seen_active, extra_info, command, app_version)
values
(?, ?, now(), now(), ?, ?, ?)
on duplicate key update
token=values(token),
last_seen_active=values(last_seen_active),
extra_info=if(values(extra_info) != '', values(extra_info), extra_info),
app_version=values(app_version)
`,
ThisHostname, ProcessToken.Hash, extraInfo, command,
config.RuntimeCLIFlags.ConfiguredVersion,
)
ThisHostname, ProcessToken.Hash, extraInfo, command,
config.RuntimeCLIFlags.ConfiguredVersion,
)
if err != nil {
return false, log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return false, log.Errore(err)
}
if rows > 0 {
return true, nil
}
}
return false, nil
}
// HealthTest attempts to write to the backend database and get a result
func HealthTest() (*HealthStatus, error) {
health := HealthStatus{Healthy: false, Hostname: ThisHostname, Token: ProcessToken.Hash}
sqlResult, err := RegisterNode("", "", false)
if err != nil {
health.Error = err
return &health, log.Errore(err)
}
rows, err := sqlResult.RowsAffected()
healthy, err := RegisterNode("", "", false)
if err != nil {
health.Error = err
return &health, log.Errore(err)
}
health.Healthy = (rows > 0)
health.Healthy = healthy
health.ActiveNode, health.IsActiveNode, err = ElectedNode()
if err != nil {
health.Error = err
......
......@@ -57,7 +57,7 @@ $(document).ready(function() {
var changelog = changelogMap[getInstanceId(audit.AnalysisEntry.AnalyzedInstanceKey.Hostname, audit.AnalysisEntry.AnalyzedInstanceKey.Port)];
if (changelog) {
moreInfo += '<div>Changelog :<ul>';
changelog.split(",").reverse().forEach(function(changelogEntry) {
changelog.reverse().forEach(function(changelogEntry) {
var changelogEntryTokens = changelogEntry.split(';');
var changelogEntryTimestamp = changelogEntryTokens[0];
var changelogEntryAnalysis = changelogEntryTokens[1];
......
......@@ -14,6 +14,7 @@ var interestingAnalysis = {
"DeadIntermediateMasterWithSingleSlaveFailingToConnect" : true,
"DeadIntermediateMasterWithSingleSlave" : true,
"DeadIntermediateMasterAndSomeSlaves" : true,
"DeadIntermediateMasterAndSlaves" : true,
"AllIntermediateMasterSlavesFailingToConnectOrDead" : true,
"AllIntermediateMasterSlavesNotReplicating" : true,
"UnreachableIntermediateMaster" : true,
......
......@@ -17,6 +17,6 @@ export GOPATH="$PWD/.gopath"
cd .gopath/src/github.com/github/orchestrator
# We put the binaries directly into the bindir, because we have no need for shim wrappers
go build -o "$bindir/orchestrator" -ldflags "-X main.AppVersion=${version} -X main.BuildDescribe=${describe}" ./go/cmd/orchestrator/main.go
go build -i -o "$bindir/orchestrator" -ldflags "-X main.AppVersion=${version} -X main.BuildDescribe=${describe}" ./go/cmd/orchestrator/main.go
rsync -qa ./resources $bindir/
-- 22295 replicates from 22294
UPDATE database_instance SET last_seen=last_checked - interval 1 minute where master_port=22294;
UPDATE database_instance SET last_seen=last_checked - interval 1 minute where port=22294;
testhost:22294 (cluster testhost:22293): DeadIntermediateMasterAndSlaves
UPDATE database_instance SET version='5.7.8' where port=22297;
UPDATE database_instance SET version='5.7.8', major_version='5.7' where port=22297;
DROP TABLE IF EXISTS database_instance;
CREATE TABLE database_instance (
hostname varchar(128) NOT NULL,
port smallint(5) unsigned NOT NULL,
last_checked timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_attempted_check timestamp NOT NULL DEFAULT '1970-12-31 23:00:00',
last_seen timestamp NULL DEFAULT NULL,
uptime int(10) unsigned NOT NULL,
server_id int(10) unsigned NOT NULL,
server_uuid varchar(64) NOT NULL,
version varchar(128) NOT NULL,
binlog_server tinyint(3) unsigned NOT NULL,
read_only tinyint(3) unsigned NOT NULL,
binlog_format varchar(16) NOT NULL,
log_bin tinyint(3) unsigned NOT NULL,
log_slave_updates tinyint(3) unsigned NOT NULL,
binary_log_file varchar(128) NOT NULL,
binary_log_pos bigint(20) unsigned NOT NULL,
master_host varchar(128) NOT NULL,
master_port smallint(5) unsigned NOT NULL,
slave_sql_running tinyint(3) unsigned NOT NULL,
slave_io_running tinyint(3) unsigned NOT NULL,
has_replication_filters tinyint(3) unsigned NOT NULL,
oracle_gtid tinyint(3) unsigned NOT NULL,
executed_gtid_set text NOT NULL,
gtid_purged text NOT NULL,
supports_oracle_gtid tinyint(3) unsigned NOT NULL,
mariadb_gtid tinyint(3) unsigned NOT NULL,
pseudo_gtid tinyint(3) unsigned NOT NULL,
master_log_file varchar(128) NOT NULL,
read_master_log_pos bigint(20) unsigned NOT NULL,
relay_master_log_file varchar(128) NOT NULL,
exec_master_log_pos bigint(20) unsigned NOT NULL,
relay_log_file varchar(128) NOT NULL,
relay_log_pos bigint(20) unsigned NOT NULL,
last_sql_error text NOT NULL,
last_io_error text NOT NULL,
seconds_behind_master bigint(20) unsigned DEFAULT NULL,
slave_lag_seconds bigint(20) unsigned DEFAULT NULL,
sql_delay int(10) unsigned NOT NULL,
allow_tls tinyint(3) unsigned NOT NULL,
num_slave_hosts int(10) unsigned NOT NULL,
slave_hosts text NOT NULL,
cluster_name varchar(128) NOT NULL,
suggested_cluster_alias varchar(128) NOT NULL,
data_center varchar(32) NOT NULL,
physical_environment varchar(32) NOT NULL,
instance_alias varchar(128) NOT NULL,
semi_sync_enforced tinyint(3) unsigned NOT NULL,
replication_depth tinyint(3) unsigned NOT NULL,
is_co_master tinyint(3) unsigned NOT NULL,
replication_credentials_available tinyint(3) unsigned NOT NULL,
has_replication_credentials tinyint(3) unsigned NOT NULL,
PRIMARY KEY (hostname,port),
KEY cluster_name_idx (cluster_name),
KEY last_checked_idx (last_checked),
KEY last_seen_idx (last_seen),
KEY master_host_port_idx (master_host,master_port)
) ENGINE=InnoDB DEFAULT CHARSET=ascii;
-- topology:
--
......@@ -66,10 +7,12 @@ CREATE TABLE database_instance (
-- + 22296
-- + 22297
--
INSERT INTO database_instance VALUES ('testhost',22293,'2016-11-22 09:57:42','2016-11-22 09:57:42','2016-11-22 09:57:42',70189,1,'00022293-1111-1111-1111-111111111111','5.6.28-log',0,0,'STATEMENT',1,1,'mysql-bin.000155',4440872,'',0,0,0,0,0,'','',0,0,1,'',0,'',0,'',0,'','',NULL,NULL,0,0,3,'[{\"Hostname\":\"testhost\",\"Port\":22294},{\"Hostname\":\"testhost\",\"Port\":22297},{\"Hostname\":\"testhost\",\"Port\":22296}]','testhost:22293','testhost','ny','','',0,0,0,0,0);
INSERT INTO database_instance VALUES ('testhost',22294,'2016-11-22 09:57:42','2016-11-22 09:57:42','2016-11-22 09:57:42',79929,101,'9a996060-6b8f-11e6-903f-6191b3cde928','5.6.28-log',0,0,'STATEMENT',1,1,'mysql-bin.000117',9762775,'testhost',22293,1,1,0,0,'','',0,0,1,'mysql-bin.000155',4440872,'mysql-bin.000155',4440872,'mysql-relay.000013',4441035,'\"\"','\"\"',0,0,0,0,1,'[{\"Hostname\":\"testhost\",\"Port\":22295}]','testhost:22293','','seattle','','',0,1,0,0,1);
INSERT INTO database_instance VALUES ('testhost',22295,'2016-11-22 09:57:42','2016-11-22 09:57:42','2016-11-22 09:57:42',483348,102,'9dc85926-6b8f-11e6-903f-85211507e568','5.6.28-log',0,0,'STATEMENT',1,1,'mysql-bin.000117',59871358,'testhost',22294,1,1,0,0,'','',0,0,1,'mysql-bin.000117',9762775,'mysql-bin.000117',9762775,'mysql-relay.000005',4667810,'\"\"','\"\"',0,0,0,0,0,'[]','testhost:22293','','seattle','','',0,2,0,0,1);
INSERT INTO database_instance VALUES ('testhost',22296,'2016-11-22 09:57:42','2016-11-22 09:57:42','2016-11-22 09:57:42',483346,103,'00022296-4444-4444-4444-444444444444','5.6.28',0,0,'STATEMENT',0,1,'',0,'testhost',22293,1,1,0,0,'','',0,0,1,'mysql-bin.000155',4440872,'mysql-bin.000155',4440872,'mysql-relay.000011',4441035,'\"\"','\"\"',0,0,0,0,0,'[]','testhost:22293','','ny','','',0,1,0,0,1);
INSERT INTO database_instance VALUES ('testhost',22297,'2016-11-22 09:57:42','2016-11-22 09:57:42','2016-11-22 09:57:42',483344,104,'00022297-5555-5555-5555-555555555555','5.6.28-log',0,0,'STATEMENT',1,1,'mysql-bin.001001',30564325,'testhost',22293,1,1,0,0,'','',0,0,1,'mysql-bin.000155',4440872,'mysql-bin.000155',4440872,'mysql-relay.000101',4441035,'\"\"','\"\"',0,0,0,0,0,'[]','testhost:22293','','ny','','',0,1,0,0,1);
DELETE FROM database_instance;
INSERT INTO database_instance (hostname, port, last_checked, last_attempted_check, last_seen, uptime, server_id, server_uuid, version, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, oracle_gtid, executed_gtid_set, gtid_purged, supports_oracle_gtid, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, allow_tls, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, instance_alias, semi_sync_enforced, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, version_comment, major_version) VALUES ('testhost',22293,'2017-02-02 08:29:57','2017-02-02 08:29:57','2017-02-02 08:29:57',670447,1,'00022293-1111-1111-1111-111111111111','5.6.28-log',0,0,'STATEMENT',1,1,'mysql-bin.000167',137086726,'',0,0,0,0,0,'','',0,0,1,'',0,'',0,'',0,'','',NULL,NULL,0,0,3,'[{\"Hostname\":\"testhost\",\"Port\":22294},{\"Hostname\":\"testhost\",\"Port\":22297},{\"Hostname\":\"testhost\",\"Port\":22296}]','testhost:22293','testhost','ny','','',0,0,0,0,0,'MySQL Community Server (GPL)','5.6');
INSERT INTO database_instance (hostname, port, last_checked, last_attempted_check, last_seen, uptime, server_id, server_uuid, version, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, oracle_gtid, executed_gtid_set, gtid_purged, supports_oracle_gtid, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, allow_tls, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, instance_alias, semi_sync_enforced, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, version_comment, major_version) VALUES ('testhost',22294,'2017-02-02 08:29:57','2017-02-02 08:29:57','2017-02-02 08:29:57',72466,101,'9a996060-6b8f-11e6-903f-6191b3cde928','5.6.28-log',0,0,'STATEMENT',1,1,'mysql-bin.000132',15931747,'testhost',22293,1,1,0,0,'','',0,0,1,'mysql-bin.000167',137086726,'mysql-bin.000167',137086726,'mysql-relay.000029',15978254,'\"\"','\"\"',0,0,0,0,1,'[{\"Hostname\":\"testhost\",\"Port\":22295}]','testhost:22293','','ny','','',0,1,0,0,1,'MySQL Community Server (GPL)','5.6');
INSERT INTO database_instance (hostname, port, last_checked, last_attempted_check, last_seen, uptime, server_id, server_uuid, version, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, oracle_gtid, executed_gtid_set, gtid_purged, supports_oracle_gtid, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, allow_tls, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, instance_alias, semi_sync_enforced, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, version_comment, major_version) VALUES ('testhost',22295,'2017-02-02 08:29:57','2017-02-02 08:29:57','2017-02-02 08:29:57',670442,102,'9dc85926-6b8f-11e6-903f-85211507e568','5.6.28-log',0,0,'STATEMENT',1,1,'mysql-bin.000129',136688950,'testhost',22294,1,1,0,0,'','',0,0,1,'mysql-bin.000132',15931747,'mysql-bin.000132',15931747,'mysql-relay.000002',15868528,'\"\"','\"\"',0,0,0,0,0,'[]','testhost:22293','','ny','','',0,2,0,0,1,'MySQL Community Server (GPL)','5.6');
INSERT INTO database_instance (hostname, port, last_checked, last_attempted_check, last_seen, uptime, server_id, server_uuid, version, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, oracle_gtid, executed_gtid_set, gtid_purged, supports_oracle_gtid, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, allow_tls, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, instance_alias, semi_sync_enforced, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, version_comment, major_version) VALUES ('testhost',22296,'2017-02-02 08:29:57','2017-02-02 08:29:57','2017-02-02 08:29:57',670438,103,'00022296-4444-4444-4444-444444444444','5.6.28',0,0,'STATEMENT',0,1,'',0,'testhost',22293,1,1,0,0,'','',0,0,1,'mysql-bin.000167',137086726,'mysql-bin.000167',137086726,'mysql-relay.000052',137086889,'\"\"','\"\"',0,0,0,0,0,'[]','testhost:22293','','ny','','',0,1,0,0,1,'MySQL Community Server (GPL)','5.6');
INSERT INTO database_instance (hostname, port, last_checked, last_attempted_check, last_seen, uptime, server_id, server_uuid, version, binlog_server, read_only, binlog_format, log_bin, log_slave_updates, binary_log_file, binary_log_pos, master_host, master_port, slave_sql_running, slave_io_running, has_replication_filters, oracle_gtid, executed_gtid_set, gtid_purged, supports_oracle_gtid, mariadb_gtid, pseudo_gtid, master_log_file, read_master_log_pos, relay_master_log_file, exec_master_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, seconds_behind_master, slave_lag_seconds, sql_delay, allow_tls, num_slave_hosts, slave_hosts, cluster_name, suggested_cluster_alias, data_center, physical_environment, instance_alias, semi_sync_enforced, replication_depth, is_co_master, replication_credentials_available, has_replication_credentials, version_comment, major_version) VALUES ('testhost',22297,'2017-02-02 08:29:57','2017-02-02 08:29:57','2017-02-02 08:29:57',670435,104,'00022297-5555-5555-5555-555555555555','5.6.28-log',0,0,'STATEMENT',1,1,'mysql-bin.001013',106518535,'testhost',22293,1,1,0,0,'','',0,0,1,'mysql-bin.000167',137086726,'mysql-bin.000167',137086726,'mysql-relay.000041',86111934,'\"\"','\"\"',0,0,0,0,0,'[]','testhost:22293','','seattle','','',0,1,0,0,1,'MySQL Community Server (GPL)','5.6');
DELETE FROM candidate_database_instance;
UPDATE database_instance SET exec_master_log_pos=4 where port in (22294, 22296)
UPDATE database_instance SET exec_master_log_pos=4 where port in (22294, 22296);
......@@ -14,6 +14,8 @@
"MySQLTopologyUseMutualTLS": false,
"MySQLTopologyMaxPoolConnections": 3,
"DatabaselessMode__experimental": false,
"BackendDB": "backend-db-placeholder",
"SQLite3DataFile": "sqlite-data-file-placeholder",
"MySQLOrchestratorHost": "localhost",
"MySQLOrchestratorPort": 3306,
"MySQLOrchestratorDatabase": "test",
......
......@@ -4,23 +4,39 @@
# See https://github.com/github/orchestrator/tree/doc/local-tests.md
#
# Usage: localtests/test/sh [filter]
# Usage: localtests/test/sh [mysql|sqlite] [filter]
# By default, runs all tests. Given filter, will only run tests matching given regep
tests_path=$(dirname $0)
test_logfile=/tmp/orchestrator-test.log
test_outfile=/tmp/orchestrator-test.out
test_diff_file=/tmp/orchestrator-test.diff
test_query_file=/tmp/orchestrator-test.sql
test_config_file=/tmp/orchestrator.conf.json
orchestrator_binary=/tmp/orchestrator-test
exec_command_file=/tmp/orchestrator-test.bash
db_type=""
sqlite_file="/tmp/orchestrator.db"
test_pattern="${1:-.}"
function run_queries() {
queries_file="$1"
verify_mysql() {
if [ "$(mysql test -e "select 1" -ss)" != "1" ] ; then
echo "Cannot verify mysql"
if [ "$db_type" == "sqlite" ] ; then
cat $queries_file | sed -e "s/last_checked - interval 1 minute/datetime('last_checked', '-1 minute')/g" | sqlite3 $sqlite_file
else
# Assume mysql
mysql --default-character-set=utf8mb4 test -ss < $queries_file
fi
}
check_db() {
echo "select 1;" > $test_query_file
query_result="$(run_queries $test_query_file)"
if [ "$query_result" != "1" ] ; then
echo "Cannot execute queries"
exit 1
fi
echo "- check_db OK"
}
exec_cmd() {
......@@ -40,11 +56,11 @@ test_single() {
echo -n "Testing: $test_name"
echo_dot
mysql --default-character-set=utf8mb4 test < $tests_path/create-per-test.sql
run_queries $tests_path/create-per-test.sql
echo_dot
if [ -f $tests_path/$test_name/create.sql ] ; then
mysql --default-character-set=utf8mb4 test < $tests_path/$test_name/create.sql
run_queries $tests_path/$test_name/create.sql
fi
extra_args=""
......@@ -53,7 +69,7 @@ test_single() {
fi
#
cmd="$orchestrator_binary \
--config=${tests_path}/orchestrator.conf.json
--config=${test_config_file}
--debug \
--stack \
${extra_args[@]}"
......@@ -65,7 +81,7 @@ test_single() {
execution_result=$?
if [ -f $tests_path/$test_name/destroy.sql ] ; then
mysql --default-character-set=utf8mb4 test < $tests_path/$test_name/destroy.sql
run_queries $tests_path/$test_name/destroy.sql
fi
if [ -f $tests_path/$test_name/expect_failure ] ; then
......@@ -113,12 +129,37 @@ build_binary() {
go build -o $orchestrator_binary go/cmd/orchestrator/main.go
}
deploy_internal_db() {
echo "Deploying db"
cmd="$orchestrator_binary \
--config=${test_config_file}
--debug \
--stack \
-c redeploy-internal-db"
echo_dot
echo $cmd > $exec_command_file
echo_dot
bash $exec_command_file 1> $test_outfile 2> $test_logfile
echo "- deploy_internal_db result: $?"
}
generate_config_file() {
cp ${tests_path}/orchestrator.conf.json ${test_config_file}
sed -i -e "s/backend-db-placeholder/${db_type}/g" ${test_config_file}
sed -i -e "s^sqlite-data-file-placeholder^${sqlite_file}^g" ${test_config_file}
echo "- generate_config_file OK"
}
test_all() {
build_binary
deploy_internal_db
if [ $? -ne 0 ] ; then
echo "ERROR build failed"
echo "ERROR deploy failed"
return 1
fi
echo "- deploy_internal_db OK"
test_pattern="${1:-.}"
find $tests_path ! -path . -type d -mindepth 1 -maxdepth 1 | xargs ls -td1 | cut -d "/" -f 4 | egrep "$test_pattern" | while read test_name ; do
test_single "$test_name"
if [ $? -ne 0 ] ; then
......@@ -131,5 +172,34 @@ test_all() {
done
}
verify_mysql
test_all
test_db() {
db_type="$1"
echo "### testing via $db_type"
generate_config_file
check_db
test_all ${@:2}
echo "- done testing via $db_type"
}
main() {
build_binary
if [ $? -ne 0 ] ; then
echo "ERROR build failed"
return 1
fi
test_dbs=""
if [ "$1" == "mysql" ] ; then
test_dbs="mysql"
shift
elif [ "$1" == "sqlite" ] ; then
test_dbs="sqlite"
shift
fi
test_dbs=${test_dbs:-"mysql"}
for db in $(echo $test_dbs) ; do
test_db $db "$@"
done
}
main "$@"
......@@ -30,14 +30,6 @@ func (this *regexpMap) process(text string) (result string) {
return this.r.ReplaceAllString(text, this.replacement)
}
var (
createTableCharset = rmap(`(?i)varchar[\s]*[(][\s]*([0-9]+)[\s]*[)] (character set|charset) [\S]+`, `varchar(${1})`)
)
var (
identifyCreateStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create table`))
)
func rmap(regexpExpression string, replacement string) regexpMap {
return regexpMap{
r: regexp.MustCompile(regexpSpaces(regexpExpression)),
......@@ -46,18 +38,12 @@ func rmap(regexpExpression string, replacement string) regexpMap {
}
func regexpSpaces(statement string) string {
return strings.Replace(statement, " ", `[ ]+`, -1)
return strings.Replace(statement, " ", `[\s]+`, -1)
}
func isCreateTable(statement string) bool {
return identifyCreateStatement.MatchString(statement)
}
func ToSqlite3CreateTable(statement string) (string, error) {
statement = createTableCharset.process(statement)
return statement, nil
}
func ToSqlite3Dialect(statement string) (translated string, err error) {
return statement, err
func applyConversions(statement string, conversions []regexpMap) string {
for _, rmap := range conversions {
statement = rmap.process(statement)
}
return statement
}
/*
Copyright 2017 GitHub Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sqlutils
import (
"testing"
test "github.com/openark/golib/tests"
)
func init() {
}
func TestIsCreateTable(t *testing.T) {
test.S(t).ExpectTrue(isCreateTable("create table t(id int)"))
test.S(t).ExpectTrue(isCreateTable(" create table t(id int)"))
test.S(t).ExpectTrue(isCreateTable("CREATE TABLE t(id int)"))
test.S(t).ExpectTrue(isCreateTable(`
create table t(id int)
`))
test.S(t).ExpectFalse(isCreateTable("where create table t(id int)"))
test.S(t).ExpectFalse(isCreateTable("insert"))
}
func TestToSqlite3CreateTable(t *testing.T) {
{
statement := "create table t(id int)"
result, err := ToSqlite3CreateTable(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(result, statement)
}
{
statement := "create table t(id int, v varchar(123) CHARACTER SET ascii NOT NULL default '')"
result, err := ToSqlite3CreateTable(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(result, "create table t(id int, v varchar(123) NOT NULL default '')")
}
{
statement := "create table t(id int, v varchar ( 123 ) CHARACTER SET ascii NOT NULL default '')"
result, err := ToSqlite3CreateTable(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(result, "create table t(id int, v varchar(123) NOT NULL default '')")
}
}
/*
Copyright 2017 GitHub Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// What's this about?
// This is a brute-force regular-expression based conversion from MySQL syntax to sqlite3 syntax.
// It is NOT meant to be a general purpose solution and is only expected & confirmed to run on
// queries issued by orchestrator. There are known limitations to this design.
// It's not even pretty.
// In fact...
// Well, it gets the job done at this time. Call it debt.
package sqlutils
import (
"regexp"
)
var sqlite3CreateTableConversions = []regexpMap{
rmap(`(?i) (character set|charset) [\S]+`, ``),
rmap(`(?i)int unsigned`, `int`),
rmap(`(?i)int[\s]*[(][\s]*([0-9]+)[\s]*[)] unsigned`, `int`),
rmap(`(?i)engine[\s]*=[\s]*(innodb|myisam|ndb|memory|tokudb)`, ``),
rmap(`(?i)DEFAULT CHARSET[\s]*=[\s]*[\S]+`, ``),
rmap(`(?i)int( not null|) auto_increment`, `integer`),
rmap(`(?i)comment '[^']*'`, ``),
rmap(`(?i)after [\S]+`, ``),
rmap(`(?i)alter table ([\S]+) add (index|key) ([\S]+) (.+)`, `create index ${3}_${1} on $1 $4`),
rmap(`(?i)alter table ([\S]+) add unique (index|key) ([\S]+) (.+)`, `create unique index ${3}_${1} on $1 $4`),
rmap(`(?i)([\S]+) enum[\s]*([(].*?[)])`, `$1 text check($1 in $2)`),
rmap(`(?i)([\s\S]+[/][*] sqlite3-skip [*][/][\s\S]+)`, ``),
rmap(`(?i)timestamp default current_timestamp`, `timestamp default ('')`),
rmap(`(?i)timestamp not null default current_timestamp`, `timestamp not null default ('')`),
rmap(`(?i)add column (.*int) not null[\s]*$`, `add column $1 not null default 0`),
rmap(`(?i)add column (.* text) not null[\s]*$`, `add column $1 not null default ''`),
rmap(`(?i)add column (.* varchar.*) not null[\s]*$`, `add column $1 not null default ''`),
}
var sqlite3InsertConversions = []regexpMap{
rmap(`(?i)insert ignore`, `insert or ignore`),
rmap(`(?i)now[(][)]`, `datetime('now')`),
rmap(`(?i)insert into ([\s\S]+) on duplicate key update [\s\S]+`, `replace into $1`),
}
var sqlite3GeneralConversions = []regexpMap{
rmap(`(?i)now[(][)][\s]*[-][\s]*interval [?] ([\w]+)`, `datetime('now', printf('-%d $1', ?))`),
rmap(`(?i)now[(][)][\s]*[+][\s]*interval [?] ([\w]+)`, `datetime('now', printf('+%d $1', ?))`),
rmap(`(?i)now[(][)][\s]*[-][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '-${1} $2')`),
rmap(`(?i)now[(][)][\s]*[+][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '+${1} $2')`),
rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[-][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('-%d $2', ?))`),
rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[+][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('+%d $2', ?))`),
rmap(`(?i)unix_timestamp[(][)]`, `strftime('%s', 'now')`),
rmap(`(?i)unix_timestamp[(]([^)]+)[)]`, `strftime('%s', $1)`),
rmap(`(?i)now[(][)]`, `datetime('now')`),
rmap(`(?i)cast[(][\s]*([\S]+) as signed[\s]*[)]`, `cast($1 as integer)`),
rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2)`),
rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2 || $3)`),
rmap(`(?i) rlike `, ` like `),
rmap(`(?i)create index([\s\S]+)[(][\s]*[0-9]+[\s]*[)]([\s\S]+)`, `create index ${1}${2}`),
rmap(`(?i)drop index ([\S]+) on ([\S]+)`, `drop index if exists $1`),
}
var (
sqlite3IdentifyCreateTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create table`))
sqlite3IdentifyCreateIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create( unique|) index`))
sqlite3IdentifyDropIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*drop index`))
sqlite3IdentifyAlterTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*alter table`))
sqlite3IdentifyInsertStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*(insert|replace)`))
)
func IsInsert(statement string) bool {
return sqlite3IdentifyInsertStatement.MatchString(statement)
}
func IsCreateTable(statement string) bool {
return sqlite3IdentifyCreateTableStatement.MatchString(statement)
}
func IsCreateIndex(statement string) bool {
return sqlite3IdentifyCreateIndexStatement.MatchString(statement)
}
func IsDropIndex(statement string) bool {
return sqlite3IdentifyDropIndexStatement.MatchString(statement)
}
func IsAlterTable(statement string) bool {
return sqlite3IdentifyAlterTableStatement.MatchString(statement)
}
func ToSqlite3CreateTable(statement string) string {
return applyConversions(statement, sqlite3CreateTableConversions)
}
func ToSqlite3Insert(statement string) string {
return applyConversions(statement, sqlite3InsertConversions)
}
func ToSqlite3Dialect(statement string) (translated string) {
if IsCreateTable(statement) {
return ToSqlite3CreateTable(statement)
}
if IsAlterTable(statement) {
return ToSqlite3CreateTable(statement)
}
statement = applyConversions(statement, sqlite3GeneralConversions)
if IsInsert(statement) {
return ToSqlite3Insert(statement)
}
return statement
}
/*
Copyright 2017 GitHub Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sqlutils
import (
"regexp"
"strings"
"testing"
test "github.com/openark/golib/tests"
)
var spacesRegexp = regexp.MustCompile(`[\s]+`)
func init() {
}
func stripSpaces(statement string) string {
statement = strings.TrimSpace(statement)
statement = spacesRegexp.ReplaceAllString(statement, " ")
return statement
}
func TestIsCreateTable(t *testing.T) {
test.S(t).ExpectTrue(IsCreateTable("create table t(id int)"))
test.S(t).ExpectTrue(IsCreateTable(" create table t(id int)"))
test.S(t).ExpectTrue(IsCreateTable("CREATE TABLE t(id int)"))
test.S(t).ExpectTrue(IsCreateTable(`
create table t(id int)
`))
test.S(t).ExpectFalse(IsCreateTable("where create table t(id int)"))
test.S(t).ExpectFalse(IsCreateTable("insert"))
}
func TestToSqlite3CreateTable(t *testing.T) {
{
statement := "create table t(id int)"
result := ToSqlite3CreateTable(statement)
test.S(t).ExpectEquals(result, statement)
}
{
statement := "create table t(id int, v varchar(123) CHARACTER SET ascii NOT NULL default '')"
result := ToSqlite3CreateTable(statement)
test.S(t).ExpectEquals(result, "create table t(id int, v varchar(123) NOT NULL default '')")
}
{
statement := "create table t(id int, v varchar ( 123 ) CHARACTER SET ascii NOT NULL default '')"
result := ToSqlite3CreateTable(statement)
test.S(t).ExpectEquals(result, "create table t(id int, v varchar ( 123 ) NOT NULL default '')")
}
{
statement := "create table t(i smallint unsigned)"
result := ToSqlite3CreateTable(statement)
test.S(t).ExpectEquals(result, "create table t(i smallint)")
}
{
statement := "create table t(i smallint(5) unsigned)"
result := ToSqlite3CreateTable(statement)
test.S(t).ExpectEquals(result, "create table t(i smallint)")
}
{
statement := "create table t(i smallint ( 5 ) unsigned)"
result := ToSqlite3CreateTable(statement)
test.S(t).ExpectEquals(result, "create table t(i smallint)")
}
}
func TestToSqlite3AlterTable(t *testing.T) {
{
statement := `
ALTER TABLE
database_instance
ADD COLUMN sql_delay INT UNSIGNED NOT NULL AFTER slave_lag_seconds
`
result := stripSpaces(ToSqlite3Dialect(statement))
test.S(t).ExpectEquals(result, stripSpaces(`
ALTER TABLE
database_instance
add column sql_delay int not null default 0
`))
}
{
statement := `
ALTER TABLE
database_instance
ADD INDEX master_host_port_idx (master_host, master_port)
`
result := stripSpaces(ToSqlite3Dialect(statement))
test.S(t).ExpectEquals(result, stripSpaces(`
create index
master_host_port_idx_database_instance
on database_instance (master_host, master_port)
`))
}
{
statement := `
ALTER TABLE
topology_recovery
ADD KEY last_detection_idx (last_detection_id)
`
result := stripSpaces(ToSqlite3Dialect(statement))
test.S(t).ExpectEquals(result, stripSpaces(`
create index
last_detection_idx_topology_recovery
on topology_recovery (last_detection_id)
`))
}
}
func TestCreateIndex(t *testing.T) {
{
statement := `
create index
master_host_port_idx_database_instance
on database_instance (master_host(128), master_port)
`
result := stripSpaces(ToSqlite3Dialect(statement))
test.S(t).ExpectEquals(result, stripSpaces(`
create index
master_host_port_idx_database_instance
on database_instance (master_host, master_port)
`))
}
}
func TestIsInsert(t *testing.T) {
test.S(t).ExpectTrue(IsInsert("insert into t"))
test.S(t).ExpectTrue(IsInsert("insert ignore into t"))
test.S(t).ExpectTrue(IsInsert(`
insert ignore into t
`))
test.S(t).ExpectFalse(IsInsert("where create table t(id int)"))
test.S(t).ExpectFalse(IsInsert("create table t(id int)"))
test.S(t).ExpectTrue(IsInsert(`
insert into
cluster_domain_name (cluster_name, domain_name, last_registered)
values
(?, ?, datetime('now'))
on duplicate key update
domain_name=values(domain_name),
last_registered=values(last_registered)
`))
}
func TestToSqlite3Insert(t *testing.T) {
{
statement := `
insert into
cluster_domain_name (cluster_name, domain_name, last_registered)
values
(?, ?, datetime('now'))
on duplicate key update
domain_name=values(domain_name),
last_registered=values(last_registered)
`
result := stripSpaces(ToSqlite3Dialect(statement))
test.S(t).ExpectEquals(result, stripSpaces(`
replace into
cluster_domain_name (cluster_name, domain_name, last_registered)
values
(?, ?, datetime('now'))
`))
}
}
func TestToSqlite3GeneralConversions(t *testing.T) {
{
statement := "select now()"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select datetime('now')")
}
{
statement := "select now() - interval ? second"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select datetime('now', printf('-%d second', ?))")
}
{
statement := "select now() + interval ? minute"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select datetime('now', printf('+%d minute', ?))")
}
{
statement := "select now() + interval 5 minute"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select datetime('now', '+5 minute')")
}
{
statement := "select some_table.some_column + interval ? minute"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select datetime(some_table.some_column, printf('+%d minute', ?))")
}
{
statement := "AND master_instance.last_attempted_check <= master_instance.last_seen + interval ? minute"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "AND master_instance.last_attempted_check <= datetime(master_instance.last_seen, printf('+%d minute', ?))")
}
{
statement := "select concat(master_instance.port, '') as port"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select (master_instance.port || '') as port")
}
{
statement := "select concat( 'abc' , 'def') as s"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select ('abc' || 'def') as s")
}
{
statement := "select concat( 'abc' , 'def', last.col) as s"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select ('abc' || 'def' || last.col) as s")
}
{
statement := "select concat(myself.only) as s"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select concat(myself.only) as s")
}
{
statement := "select concat(1, '2', 3, '4') as s"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select concat(1, '2', 3, '4') as s")
}
{
statement := "select group_concat( 'abc' , 'def') as s"
result := ToSqlite3Dialect(statement)
test.S(t).ExpectEquals(result, "select group_concat( 'abc' , 'def') as s")
}
}
......@@ -127,21 +127,33 @@ var knownDBsMutex = &sync.Mutex{}
// GetDB returns a DB instance based on uri.
// bool result indicates whether the DB was returned from cache; err
func GetDB(mysql_uri string) (*sql.DB, bool, error) {
func GetGenericDB(driverName, dataSourceName string) (*sql.DB, bool, error) {
knownDBsMutex.Lock()
defer func() {
knownDBsMutex.Unlock()
}()
var exists bool
if _, exists = knownDBs[mysql_uri]; !exists {
if db, err := sql.Open("mysql", mysql_uri); err == nil {
knownDBs[mysql_uri] = db
if _, exists = knownDBs[dataSourceName]; !exists {
if db, err := sql.Open(driverName, dataSourceName); err == nil {
knownDBs[dataSourceName] = db
} else {
return db, exists, err
}
}
return knownDBs[mysql_uri], exists, nil
return knownDBs[dataSourceName], exists, nil
}
// GetDB returns a MySQL DB instance based on uri.
// bool result indicates whether the DB was returned from cache; err
func GetDB(mysql_uri string) (*sql.DB, bool, error) {
return GetGenericDB("mysql", mysql_uri)
}
// GetDB returns a SQLite DB instance based on DB file name.
// bool result indicates whether the DB was returned from cache; err
func GetSQLiteDB(dbFile string) (*sql.DB, bool, error) {
return GetGenericDB("sqlite3", dbFile)
}
// RowToArray is a convenience function, typically not called directly, which maps a
......
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package context defines the Context type, which carries deadlines,
// cancelation signals, and other request-scoped values across API boundaries
// and between processes.
//
// Incoming requests to a server should create a Context, and outgoing calls to
// servers should accept a Context. The chain of function calls between must
// propagate the Context, optionally replacing it with a modified copy created
// using WithDeadline, WithTimeout, WithCancel, or WithValue.
//
// Programs that use Contexts should follow these rules to keep interfaces
// consistent across packages and enable static analysis tools to check context
// propagation:
//
// Do not store Contexts inside a struct type; instead, pass a Context
// explicitly to each function that needs it. The Context should be the first
// parameter, typically named ctx:
//
// func DoSomething(ctx context.Context, arg Arg) error {
// // ... use ctx ...
// }
//
// Do not pass a nil Context, even if a function permits it. Pass context.TODO
// if you are unsure about which Context to use.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The same Context may be passed to functions running in different goroutines;
// Contexts are safe for simultaneous use by multiple goroutines.
//
// See http://blog.golang.org/context for example code for a server that uses
// Contexts.
package context // import "golang.org/x/net/context"
import "time"
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
//
// WithCancel arranges for Done to be closed when cancel is called;
// WithDeadline arranges for Done to be closed when the deadline
// expires; WithTimeout arranges for Done to be closed when the timeout
// elapses.
//
// Done is provided for use in select statements:
//
// // Stream generates values with DoSomething and sends them to out
// // until DoSomething returns an error or ctx.Done is closed.
// func Stream(ctx context.Context, out chan<- Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
//
// See http://blog.golang.org/pipelines for more examples of how to use
// a Done channel for cancelation.
Done() <-chan struct{}
// Err returns a non-nil error value after Done is closed. Err returns
// Canceled if the context was canceled or DeadlineExceeded if the
// context's deadline passed. No other values for Err are defined.
// After Done is closed, successive calls to Err return the same value.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
//
// Use context values only for request-scoped data that transits
// processes and API boundaries, not for passing optional parameters to
// functions.
//
// A key identifies a specific value in a Context. Functions that wish
// to store values in Context typically allocate a key in a global
// variable then use that key as the argument to context.WithValue and
// Context.Value. A key can be any type that supports equality;
// packages should define keys as an unexported type to avoid
// collisions.
//
// Packages that define a Context key should provide type-safe accessors
// for the values stores using that key:
//
// // Package user defines a User type that's stored in Contexts.
// package user
//
// import "golang.org/x/net/context"
//
// // User is the type of value stored in the Contexts.
// type User struct {...}
//
// // key is an unexported type for keys defined in this package.
// // This prevents collisions with keys defined in other packages.
// type key int
//
// // userKey is the key for user.User values in Contexts. It is
// // unexported; clients use user.NewContext and user.FromContext
// // instead of using this key directly.
// var userKey key = 0
//
// // NewContext returns a new Context that carries value u.
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
//
// // FromContext returns the User value stored in ctx, if any.
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
Value(key interface{}) interface{}
}
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter). TODO is recognized by static analysis tools that determine
// whether Contexts are propagated correctly in a program.
func TODO() Context {
return todo
}
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package context
import (
"fmt"
"math/rand"
"runtime"
"strings"
"sync"
"testing"
"time"
)
// otherContext is a Context that's not one of the types defined in context.go.
// This lets us test code paths that differ based on the underlying type of the
// Context.
type otherContext struct {
Context
}
func TestBackground(t *testing.T) {
c := Background()
if c == nil {
t.Fatalf("Background returned nil")
}
select {
case x := <-c.Done():
t.Errorf("<-c.Done() == %v want nothing (it should block)", x)
default:
}
if got, want := fmt.Sprint(c), "context.Background"; got != want {
t.Errorf("Background().String() = %q want %q", got, want)
}
}
func TestTODO(t *testing.T) {
c := TODO()
if c == nil {
t.Fatalf("TODO returned nil")
}
select {
case x := <-c.Done():
t.Errorf("<-c.Done() == %v want nothing (it should block)", x)
default:
}
if got, want := fmt.Sprint(c), "context.TODO"; got != want {
t.Errorf("TODO().String() = %q want %q", got, want)
}
}
func TestWithCancel(t *testing.T) {
c1, cancel := WithCancel(Background())
if got, want := fmt.Sprint(c1), "context.Background.WithCancel"; got != want {
t.Errorf("c1.String() = %q want %q", got, want)
}
o := otherContext{c1}
c2, _ := WithCancel(o)
contexts := []Context{c1, o, c2}
for i, c := range contexts {
if d := c.Done(); d == nil {
t.Errorf("c[%d].Done() == %v want non-nil", i, d)
}
if e := c.Err(); e != nil {
t.Errorf("c[%d].Err() == %v want nil", i, e)
}
select {
case x := <-c.Done():
t.Errorf("<-c.Done() == %v want nothing (it should block)", x)
default:
}
}
cancel()
time.Sleep(100 * time.Millisecond) // let cancelation propagate
for i, c := range contexts {
select {
case <-c.Done():
default:
t.Errorf("<-c[%d].Done() blocked, but shouldn't have", i)
}
if e := c.Err(); e != Canceled {
t.Errorf("c[%d].Err() == %v want %v", i, e, Canceled)
}
}
}
func TestParentFinishesChild(t *testing.T) {
// Context tree:
// parent -> cancelChild
// parent -> valueChild -> timerChild
parent, cancel := WithCancel(Background())
cancelChild, stop := WithCancel(parent)
defer stop()
valueChild := WithValue(parent, "key", "value")
timerChild, stop := WithTimeout(valueChild, 10000*time.Hour)
defer stop()
select {
case x := <-parent.Done():
t.Errorf("<-parent.Done() == %v want nothing (it should block)", x)
case x := <-cancelChild.Done():
t.Errorf("<-cancelChild.Done() == %v want nothing (it should block)", x)
case x := <-timerChild.Done():
t.Errorf("<-timerChild.Done() == %v want nothing (it should block)", x)
case x := <-valueChild.Done():
t.Errorf("<-valueChild.Done() == %v want nothing (it should block)", x)
default:
}
// The parent's children should contain the two cancelable children.
pc := parent.(*cancelCtx)
cc := cancelChild.(*cancelCtx)
tc := timerChild.(*timerCtx)
pc.mu.Lock()
if len(pc.children) != 2 || !pc.children[cc] || !pc.children[tc] {
t.Errorf("bad linkage: pc.children = %v, want %v and %v",
pc.children, cc, tc)
}
pc.mu.Unlock()
if p, ok := parentCancelCtx(cc.Context); !ok || p != pc {
t.Errorf("bad linkage: parentCancelCtx(cancelChild.Context) = %v, %v want %v, true", p, ok, pc)
}
if p, ok := parentCancelCtx(tc.Context); !ok || p != pc {
t.Errorf("bad linkage: parentCancelCtx(timerChild.Context) = %v, %v want %v, true", p, ok, pc)
}
cancel()
pc.mu.Lock()
if len(pc.children) != 0 {
t.Errorf("pc.cancel didn't clear pc.children = %v", pc.children)
}
pc.mu.Unlock()
// parent and children should all be finished.
check := func(ctx Context, name string) {
select {
case <-ctx.Done():
default:
t.Errorf("<-%s.Done() blocked, but shouldn't have", name)
}
if e := ctx.Err(); e != Canceled {
t.Errorf("%s.Err() == %v want %v", name, e, Canceled)
}
}
check(parent, "parent")
check(cancelChild, "cancelChild")
check(valueChild, "valueChild")
check(timerChild, "timerChild")
// WithCancel should return a canceled context on a canceled parent.
precanceledChild := WithValue(parent, "key", "value")
select {
case <-precanceledChild.Done():
default:
t.Errorf("<-precanceledChild.Done() blocked, but shouldn't have")
}
if e := precanceledChild.Err(); e != Canceled {
t.Errorf("precanceledChild.Err() == %v want %v", e, Canceled)
}
}
func TestChildFinishesFirst(t *testing.T) {
cancelable, stop := WithCancel(Background())
defer stop()
for _, parent := range []Context{Background(), cancelable} {
child, cancel := WithCancel(parent)
select {
case x := <-parent.Done():
t.Errorf("<-parent.Done() == %v want nothing (it should block)", x)
case x := <-child.Done():
t.Errorf("<-child.Done() == %v want nothing (it should block)", x)
default:
}
cc := child.(*cancelCtx)
pc, pcok := parent.(*cancelCtx) // pcok == false when parent == Background()
if p, ok := parentCancelCtx(cc.Context); ok != pcok || (ok && pc != p) {
t.Errorf("bad linkage: parentCancelCtx(cc.Context) = %v, %v want %v, %v", p, ok, pc, pcok)
}
if pcok {
pc.mu.Lock()
if len(pc.children) != 1 || !pc.children[cc] {
t.Errorf("bad linkage: pc.children = %v, cc = %v", pc.children, cc)
}
pc.mu.Unlock()
}
cancel()
if pcok {
pc.mu.Lock()
if len(pc.children) != 0 {
t.Errorf("child's cancel didn't remove self from pc.children = %v", pc.children)
}
pc.mu.Unlock()
}
// child should be finished.
select {
case <-child.Done():
default:
t.Errorf("<-child.Done() blocked, but shouldn't have")
}
if e := child.Err(); e != Canceled {
t.Errorf("child.Err() == %v want %v", e, Canceled)
}
// parent should not be finished.
select {
case x := <-parent.Done():
t.Errorf("<-parent.Done() == %v want nothing (it should block)", x)
default:
}
if e := parent.Err(); e != nil {
t.Errorf("parent.Err() == %v want nil", e)
}
}
}
func testDeadline(c Context, wait time.Duration, t *testing.T) {
select {
case <-time.After(wait):
t.Fatalf("context should have timed out")
case <-c.Done():
}
if e := c.Err(); e != DeadlineExceeded {
t.Errorf("c.Err() == %v want %v", e, DeadlineExceeded)
}
}
func TestDeadline(t *testing.T) {
t.Parallel()
const timeUnit = 500 * time.Millisecond
c, _ := WithDeadline(Background(), time.Now().Add(1*timeUnit))
if got, prefix := fmt.Sprint(c), "context.Background.WithDeadline("; !strings.HasPrefix(got, prefix) {
t.Errorf("c.String() = %q want prefix %q", got, prefix)
}
testDeadline(c, 2*timeUnit, t)
c, _ = WithDeadline(Background(), time.Now().Add(1*timeUnit))
o := otherContext{c}
testDeadline(o, 2*timeUnit, t)
c, _ = WithDeadline(Background(), time.Now().Add(1*timeUnit))
o = otherContext{c}
c, _ = WithDeadline(o, time.Now().Add(3*timeUnit))
testDeadline(c, 2*timeUnit, t)
}
func TestTimeout(t *testing.T) {
t.Parallel()
const timeUnit = 500 * time.Millisecond
c, _ := WithTimeout(Background(), 1*timeUnit)
if got, prefix := fmt.Sprint(c), "context.Background.WithDeadline("; !strings.HasPrefix(got, prefix) {
t.Errorf("c.String() = %q want prefix %q", got, prefix)
}
testDeadline(c, 2*timeUnit, t)
c, _ = WithTimeout(Background(), 1*timeUnit)
o := otherContext{c}
testDeadline(o, 2*timeUnit, t)
c, _ = WithTimeout(Background(), 1*timeUnit)
o = otherContext{c}
c, _ = WithTimeout(o, 3*timeUnit)
testDeadline(c, 2*timeUnit, t)
}
func TestCanceledTimeout(t *testing.T) {
t.Parallel()
const timeUnit = 500 * time.Millisecond
c, _ := WithTimeout(Background(), 2*timeUnit)
o := otherContext{c}
c, cancel := WithTimeout(o, 4*timeUnit)
cancel()
time.Sleep(1 * timeUnit) // let cancelation propagate
select {
case <-c.Done():
default:
t.Errorf("<-c.Done() blocked, but shouldn't have")
}
if e := c.Err(); e != Canceled {
t.Errorf("c.Err() == %v want %v", e, Canceled)
}
}
type key1 int
type key2 int
var k1 = key1(1)
var k2 = key2(1) // same int as k1, different type
var k3 = key2(3) // same type as k2, different int
func TestValues(t *testing.T) {
check := func(c Context, nm, v1, v2, v3 string) {
if v, ok := c.Value(k1).(string); ok == (len(v1) == 0) || v != v1 {
t.Errorf(`%s.Value(k1).(string) = %q, %t want %q, %t`, nm, v, ok, v1, len(v1) != 0)
}
if v, ok := c.Value(k2).(string); ok == (len(v2) == 0) || v != v2 {
t.Errorf(`%s.Value(k2).(string) = %q, %t want %q, %t`, nm, v, ok, v2, len(v2) != 0)
}
if v, ok := c.Value(k3).(string); ok == (len(v3) == 0) || v != v3 {
t.Errorf(`%s.Value(k3).(string) = %q, %t want %q, %t`, nm, v, ok, v3, len(v3) != 0)
}
}
c0 := Background()
check(c0, "c0", "", "", "")
c1 := WithValue(Background(), k1, "c1k1")
check(c1, "c1", "c1k1", "", "")
if got, want := fmt.Sprint(c1), `context.Background.WithValue(1, "c1k1")`; got != want {
t.Errorf("c.String() = %q want %q", got, want)
}
c2 := WithValue(c1, k2, "c2k2")
check(c2, "c2", "c1k1", "c2k2", "")
c3 := WithValue(c2, k3, "c3k3")
check(c3, "c2", "c1k1", "c2k2", "c3k3")
c4 := WithValue(c3, k1, nil)
check(c4, "c4", "", "c2k2", "c3k3")
o0 := otherContext{Background()}
check(o0, "o0", "", "", "")
o1 := otherContext{WithValue(Background(), k1, "c1k1")}
check(o1, "o1", "c1k1", "", "")
o2 := WithValue(o1, k2, "o2k2")
check(o2, "o2", "c1k1", "o2k2", "")
o3 := otherContext{c4}
check(o3, "o3", "", "c2k2", "c3k3")
o4 := WithValue(o3, k3, nil)
check(o4, "o4", "", "c2k2", "")
}
func TestAllocs(t *testing.T) {
bg := Background()
for _, test := range []struct {
desc string
f func()
limit float64
gccgoLimit float64
}{
{
desc: "Background()",
f: func() { Background() },
limit: 0,
gccgoLimit: 0,
},
{
desc: fmt.Sprintf("WithValue(bg, %v, nil)", k1),
f: func() {
c := WithValue(bg, k1, nil)
c.Value(k1)
},
limit: 3,
gccgoLimit: 3,
},
{
desc: "WithTimeout(bg, 15*time.Millisecond)",
f: func() {
c, _ := WithTimeout(bg, 15*time.Millisecond)
<-c.Done()
},
limit: 8,
gccgoLimit: 16,
},
{
desc: "WithCancel(bg)",
f: func() {
c, cancel := WithCancel(bg)
cancel()
<-c.Done()
},
limit: 5,
gccgoLimit: 8,
},
{
desc: "WithTimeout(bg, 100*time.Millisecond)",
f: func() {
c, cancel := WithTimeout(bg, 100*time.Millisecond)
cancel()
<-c.Done()
},
limit: 8,
gccgoLimit: 25,
},
} {
limit := test.limit
if runtime.Compiler == "gccgo" {
// gccgo does not yet do escape analysis.
// TODO(iant): Remove this when gccgo does do escape analysis.
limit = test.gccgoLimit
}
if n := testing.AllocsPerRun(100, test.f); n > limit {
t.Errorf("%s allocs = %f want %d", test.desc, n, int(limit))
}
}
}
func TestSimultaneousCancels(t *testing.T) {
root, cancel := WithCancel(Background())
m := map[Context]CancelFunc{root: cancel}
q := []Context{root}
// Create a tree of contexts.
for len(q) != 0 && len(m) < 100 {
parent := q[0]
q = q[1:]
for i := 0; i < 4; i++ {
ctx, cancel := WithCancel(parent)
m[ctx] = cancel
q = append(q, ctx)
}
}
// Start all the cancels in a random order.
var wg sync.WaitGroup
wg.Add(len(m))
for _, cancel := range m {
go func(cancel CancelFunc) {
cancel()
wg.Done()
}(cancel)
}
// Wait on all the contexts in a random order.
for ctx := range m {
select {
case <-ctx.Done():
case <-time.After(1 * time.Second):
buf := make([]byte, 10<<10)
n := runtime.Stack(buf, true)
t.Fatalf("timed out waiting for <-ctx.Done(); stacks:\n%s", buf[:n])
}
}
// Wait for all the cancel functions to return.
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(1 * time.Second):
buf := make([]byte, 10<<10)
n := runtime.Stack(buf, true)
t.Fatalf("timed out waiting for cancel functions; stacks:\n%s", buf[:n])
}
}
func TestInterlockedCancels(t *testing.T) {
parent, cancelParent := WithCancel(Background())
child, cancelChild := WithCancel(parent)
go func() {
parent.Done()
cancelChild()
}()
cancelParent()
select {
case <-child.Done():
case <-time.After(1 * time.Second):
buf := make([]byte, 10<<10)
n := runtime.Stack(buf, true)
t.Fatalf("timed out waiting for child.Done(); stacks:\n%s", buf[:n])
}
}
func TestLayersCancel(t *testing.T) {
testLayers(t, time.Now().UnixNano(), false)
}
func TestLayersTimeout(t *testing.T) {
testLayers(t, time.Now().UnixNano(), true)
}
func testLayers(t *testing.T, seed int64, testTimeout bool) {
rand.Seed(seed)
errorf := func(format string, a ...interface{}) {
t.Errorf(fmt.Sprintf("seed=%d: %s", seed, format), a...)
}
const (
timeout = 200 * time.Millisecond
minLayers = 30
)
type value int
var (
vals []*value
cancels []CancelFunc
numTimers int
ctx = Background()
)
for i := 0; i < minLayers || numTimers == 0 || len(cancels) == 0 || len(vals) == 0; i++ {
switch rand.Intn(3) {
case 0:
v := new(value)
ctx = WithValue(ctx, v, v)
vals = append(vals, v)
case 1:
var cancel CancelFunc
ctx, cancel = WithCancel(ctx)
cancels = append(cancels, cancel)
case 2:
var cancel CancelFunc
ctx, cancel = WithTimeout(ctx, timeout)
cancels = append(cancels, cancel)
numTimers++
}
}
checkValues := func(when string) {
for _, key := range vals {
if val := ctx.Value(key).(*value); key != val {
errorf("%s: ctx.Value(%p) = %p want %p", when, key, val, key)
}
}
}
select {
case <-ctx.Done():
errorf("ctx should not be canceled yet")
default:
}
if s, prefix := fmt.Sprint(ctx), "context.Background."; !strings.HasPrefix(s, prefix) {
t.Errorf("ctx.String() = %q want prefix %q", s, prefix)
}
t.Log(ctx)
checkValues("before cancel")
if testTimeout {
select {
case <-ctx.Done():
case <-time.After(timeout + 100*time.Millisecond):
errorf("ctx should have timed out")
}
checkValues("after timeout")
} else {
cancel := cancels[rand.Intn(len(cancels))]
cancel()
select {
case <-ctx.Done():
default:
errorf("ctx should be canceled")
}
checkValues("after cancel")
}
}
func TestCancelRemoves(t *testing.T) {
checkChildren := func(when string, ctx Context, want int) {
if got := len(ctx.(*cancelCtx).children); got != want {
t.Errorf("%s: context has %d children, want %d", when, got, want)
}
}
ctx, _ := WithCancel(Background())
checkChildren("after creation", ctx, 0)
_, cancel := WithCancel(ctx)
checkChildren("with WithCancel child ", ctx, 1)
cancel()
checkChildren("after cancelling WithCancel child", ctx, 0)
ctx, _ = WithCancel(Background())
checkChildren("after creation", ctx, 0)
_, cancel = WithTimeout(ctx, 60*time.Minute)
checkChildren("with WithTimeout child ", ctx, 1)
cancel()
checkChildren("after cancelling WithTimeout child", ctx, 0)
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
// Package ctxhttp provides helper functions for performing context-aware HTTP requests.
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
// Do sends an HTTP request with the provided http.Client and returns
// an HTTP response.
//
// If the client is nil, http.DefaultClient is used.
//
// The provided ctx must be non-nil. If it is canceled or times out,
// ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
resp, err := client.Do(req.WithContext(ctx))
// If we got an error, and the context has been canceled,
// the context's error is probably more useful.
if err != nil {
select {
case <-ctx.Done():
err = ctx.Err()
default:
}
}
return resp, err
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !plan9,go1.7
package ctxhttp
import (
"io"
"net/http"
"net/http/httptest"
"testing"
"context"
)
func TestGo17Context(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "ok")
}))
ctx := context.Background()
resp, err := Get(ctx, http.DefaultClient, ts.URL)
if resp == nil || err != nil {
t.Fatalf("error received from client: %v %v", err, resp)
}
resp.Body.Close()
}
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
func nop() {}
var (
testHookContextDoneBeforeHeaders = nop
testHookDoReturned = nop
testHookDidBodyClose = nop
)
// Do sends an HTTP request with the provided http.Client and returns an HTTP response.
// If the client is nil, http.DefaultClient is used.
// If the context is canceled or times out, ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
// TODO(djd): Respect any existing value of req.Cancel.
cancel := make(chan struct{})
req.Cancel = cancel
type responseAndError struct {
resp *http.Response
err error
}
result := make(chan responseAndError, 1)
// Make local copies of test hooks closed over by goroutines below.
// Prevents data races in tests.
testHookDoReturned := testHookDoReturned
testHookDidBodyClose := testHookDidBodyClose
go func() {
resp, err := client.Do(req)
testHookDoReturned()
result <- responseAndError{resp, err}
}()
var resp *http.Response
select {
case <-ctx.Done():
testHookContextDoneBeforeHeaders()
close(cancel)
// Clean up after the goroutine calling client.Do:
go func() {
if r := <-result; r.resp != nil {
testHookDidBodyClose()
r.resp.Body.Close()
}
}()
return nil, ctx.Err()
case r := <-result:
var err error
resp, err = r.resp, r.err
if err != nil {
return resp, err
}
}
c := make(chan struct{})
go func() {
select {
case <-ctx.Done():
close(cancel)
case <-c:
// The response's Body is closed.
}
}()
resp.Body = &notifyingReader{resp.Body, c}
return resp, nil
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// notifyingReader is an io.ReadCloser that closes the notify channel after
// Close is called or a Read fails on the underlying ReadCloser.
type notifyingReader struct {
io.ReadCloser
notify chan<- struct{}
}
func (r *notifyingReader) Read(p []byte) (int, error) {
n, err := r.ReadCloser.Read(p)
if err != nil && r.notify != nil {
close(r.notify)
r.notify = nil
}
return n, err
}
func (r *notifyingReader) Close() error {
err := r.ReadCloser.Close()
if r.notify != nil {
close(r.notify)
r.notify = nil
}
return err
}
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !plan9,!go1.7
package ctxhttp
import (
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"golang.org/x/net/context"
)
// golang.org/issue/14065
func TestClosesResponseBodyOnCancel(t *testing.T) {
defer func() { testHookContextDoneBeforeHeaders = nop }()
defer func() { testHookDoReturned = nop }()
defer func() { testHookDidBodyClose = nop }()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
defer ts.Close()
ctx, cancel := context.WithCancel(context.Background())
// closed when Do enters select case <-ctx.Done()
enteredDonePath := make(chan struct{})
testHookContextDoneBeforeHeaders = func() {
close(enteredDonePath)
}
testHookDoReturned = func() {
// We now have the result (the Flush'd headers) at least,
// so we can cancel the request.
cancel()
// But block the client.Do goroutine from sending
// until Do enters into the <-ctx.Done() path, since
// otherwise if both channels are readable, select
// picks a random one.
<-enteredDonePath
}
sawBodyClose := make(chan struct{})
testHookDidBodyClose = func() { close(sawBodyClose) }
tr := &http.Transport{}
defer tr.CloseIdleConnections()
c := &http.Client{Transport: tr}
req, _ := http.NewRequest("GET", ts.URL, nil)
_, doErr := Do(ctx, c, req)
select {
case <-sawBodyClose:
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for body to close")
}
if doErr != ctx.Err() {
t.Errorf("Do error = %v; want %v", doErr, ctx.Err())
}
}
type noteCloseConn struct {
net.Conn
onceClose sync.Once
closefn func()
}
func (c *noteCloseConn) Close() error {
c.onceClose.Do(c.closefn)
return c.Conn.Close()
}
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !plan9
package ctxhttp
import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"
"golang.org/x/net/context"
)
const (
requestDuration = 100 * time.Millisecond
requestBody = "ok"
)
func okHandler(w http.ResponseWriter, r *http.Request) {
time.Sleep(requestDuration)
io.WriteString(w, requestBody)
}
func TestNoTimeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(okHandler))
defer ts.Close()
ctx := context.Background()
res, err := Get(ctx, nil, ts.URL)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
slurp, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Fatal(err)
}
if string(slurp) != requestBody {
t.Errorf("body = %q; want %q", slurp, requestBody)
}
}
func TestCancelBeforeHeaders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
blockServer := make(chan struct{})
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cancel()
<-blockServer
io.WriteString(w, requestBody)
}))
defer ts.Close()
defer close(blockServer)
res, err := Get(ctx, nil, ts.URL)
if err == nil {
res.Body.Close()
t.Fatal("Get returned unexpected nil error")
}
if err != context.Canceled {
t.Errorf("err = %v; want %v", err, context.Canceled)
}
}
func TestCancelAfterHangingRequest(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
<-w.(http.CloseNotifier).CloseNotify()
}))
defer ts.Close()
ctx, cancel := context.WithCancel(context.Background())
resp, err := Get(ctx, nil, ts.URL)
if err != nil {
t.Fatalf("unexpected error in Get: %v", err)
}
// Cancel befer reading the body.
// Reading Request.Body should fail, since the request was
// canceled before anything was written.
cancel()
done := make(chan struct{})
go func() {
b, err := ioutil.ReadAll(resp.Body)
if len(b) != 0 || err == nil {
t.Errorf(`Read got (%q, %v); want ("", error)`, b, err)
}
close(done)
}()
select {
case <-time.After(1 * time.Second):
t.Errorf("Test timed out")
case <-done:
}
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
package context
import (
"context" // standard library's context, as of Go 1.7
"time"
)
var (
todo = context.TODO()
background = context.Background()
)
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = context.Canceled
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded = context.DeadlineExceeded
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
ctx, f := context.WithCancel(parent)
return ctx, CancelFunc(f)
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
ctx, f := context.WithDeadline(parent, deadline)
return ctx, CancelFunc(f)
}
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return context.WithValue(parent, key, val)
}
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package context
import (
"errors"
"fmt"
"sync"
"time"
)
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded = errors.New("context deadline exceeded")
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, c)
return c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) *cancelCtx {
return &cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]bool)
}
p.children[child] = true
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
done chan struct{} // closed by the first cancel call.
mu sync.Mutex
children map[canceler]bool // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
return c.done
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
close(c.done)
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
propagateCancel(parent, c)
d := deadline.Sub(time.Now())
if d <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
*cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now()))
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package context_test
import (
"fmt"
"time"
"golang.org/x/net/context"
)
func ExampleWithTimeout() {
// Pass a context with a timeout to tell a blocking function that it
// should abandon its work after the timeout elapses.
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
select {
case <-time.After(200 * time.Millisecond):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err()) // prints "context deadline exceeded"
}
// Output:
// context deadline exceeded
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册