未验证 提交 4cdcaba9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3295 from taosdata/feature/crash_gen

Fixed a problem with crash_gen tool, so that it releases DB connections properly
......@@ -161,6 +161,21 @@ class WorkerThread:
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
# Before we fetch the task and run it, let's ensure we properly "use" the database
if (gConfig.per_thread_db_connection): # most likely TRUE
if not self._dbConn.isOpen: # might have been closed during server auto-restart
self.useDb() # might encounter exceptions. TODO: catch
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready
# ignore
dummy = 0
print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
# Fetch a task from the Thread Coordinator
logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask()
......@@ -324,10 +339,12 @@ class ThreadCoordinator:
logger.debug("[STT] transition ended")
# Due to limitation (or maybe not) of the Python library,
# we cannot share connections across threads
if sm.hasDatabase():
for t in self._pool.threadList:
logger.debug("[DB] use db for all worker threads")
# Here we are in main thread, we cannot operate the connections created in workers
# Moving below to task loop
# if sm.hasDatabase():
# for t in self._pool.threadList:
# logger.debug("[DB] use db for all worker threads")
# t.useDb()
# t.execSql("use db") # main thread executing "use
# db" on behalf of every worker thread
except taos.error.ProgrammingError as err:
......@@ -387,7 +404,7 @@ class ThreadCoordinator:
transitionFailed = self._doTransition() # To start, we end step -1 first
except taos.error.ProgrammingError as err:
transitionFailed = True
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
errno2 = Helper.convertErrno(err.errno) # correct error scheme
errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
......@@ -468,6 +485,10 @@ class ThreadCoordinator:
# We define a class to run a number of threads in locking steps.
class Helper:
def convertErrno(cls, errno):
return errno if (errno > 0) else 0x80000000 + errno
class ThreadPool:
def __init__(self, numThreads, maxSteps):
......@@ -613,8 +634,7 @@ class DbConn:
def resetDb(self): # reset the whole database, etc.
if (not self.isOpen):
raise RuntimeError(
"Cannot reset database until connection is open")
raise RuntimeError("Cannot reset database until connection is open")
# self._tdSql.prepare() # Recreate database, etc.
self.execute('drop database if exists db')
......@@ -681,8 +701,7 @@ class DbConnRest(DbConn):
def close(self):
if (not self.isOpen):
raise RuntimeError(
"Cannot clean up database until connection is open")
raise RuntimeError("Cannot clean up database until connection is open")
# Do nothing for REST
logger.debug("[DB] REST Database connection closed")
self.isOpen = False
......@@ -747,27 +766,32 @@ class DbConnRest(DbConn):
class MyTDSql:
def __init__(self):
def __init__(self, hostAddr, cfgPath):
# Make the DB connection
self._conn = taos.connect(host=hostAddr, config=cfgPath)
self._cursor = self._conn.cursor()
self.queryRows = 0
self.queryCols = 0
self.affectedRows = 0
def init(self, cursor, log=True):
self.cursor = cursor
# def init(self, cursor, log=True):
# self.cursor = cursor
# if (log):
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# self.cursor.log(caller.filename + ".sql")
def close(self):
self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
def query(self, sql):
self.sql = sql
self.queryResult = self.cursor.fetchall()
self.queryResult = self._cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
self.queryCols = len(self._cursor.description)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
......@@ -778,7 +802,7 @@ class MyTDSql:
def execute(self, sql):
self.sql = sql
self.affectedRows = self.cursor.execute(sql)
self.affectedRows = self._cursor.execute(sql)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
......@@ -791,13 +815,13 @@ class DbConnNative(DbConn):
# Class variables
_lock = threading.Lock()
_connInfoDisplayed = False
totalConnections = 0 # Not private
def __init__(self):
self._type = self.TYPE_NATIVE
self._conn = None
self._cursor = None
# self._cursor = None
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
......@@ -814,7 +838,8 @@ class DbConnNative(DbConn):
buildPath = root[:len(root) - len("/build/bin")]
if buildPath == None:
raise RuntimeError("Failed to determine buildPath, selfPath={}".format(selfPath))
raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
.format(selfPath, projPath))
return buildPath
......@@ -822,33 +847,40 @@ class DbConnNative(DbConn):
cfgPath = self.getBuildPath() + "/test/cfg"
hostAddr = ""
with self._lock: # force single threading for opening DB connections
if not self._connInfoDisplayed:
self.__class__._connInfoDisplayed = True # updating CLASS variable
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
cls = self.__class__ # Get the class, to access class variables
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
if not cls._connInfoDisplayed:
cls._connInfoDisplayed = True # updating CLASS variable
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
# Make the connection
# self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
# self._cursor = self._conn.cursor()
# Record the count in the class
self._tdSql = MyTDSql(hostAddr, cfgPath) # making DB connection
cls.totalConnections += 1
self._cursor.execute('reset query cache')
self._tdSql.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every
# Open connection
self._tdSql = MyTDSql()
# self._tdSql = MyTDSql()
# self._tdSql.init(self._cursor)
def close(self):
if (not self.isOpen):
raise RuntimeError(
"Cannot clean up database until connection is open")
raise RuntimeError("Cannot clean up database until connection is open")
# Decrement the class wide counter
cls = self.__class__ # Get the class, to access class variables
with cls._lock:
cls.totalConnections -= 1
logger.debug("[DB] Database connection closed")
self.isOpen = False
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError(
"Cannot execute database commands until connection is open")
raise RuntimeError("Cannot execute database commands until connection is open")
logger.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.execute(sql)
......@@ -1528,7 +1560,7 @@ class Task():
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
errno2 = Helper.convertErrno(err.errno)
if (gConfig.continue_on_exception): # user choose to continue
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql()))
......@@ -1678,9 +1710,8 @@ class ExecutionStats:
"| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
"| Top numbers written: {}".format(
logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
logger.info("| Total Number of Active DB Native Connections: {}".format(DbConnNative.totalConnections))
......@@ -1789,7 +1820,7 @@ class TdSuperTable:
dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
errno2 = Helper.convertErrno(err.errno)
logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
......@@ -1891,7 +1922,7 @@ class TaskReadData(StateTransitionTask):
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
dbc.execute("select {} from db.{}".format(aggExpr, sTable.getName()))
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
errno2 = Helper.convertErrno(err.errno)
logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
......@@ -1920,9 +1951,8 @@ class TaskDropSuperTable(StateTransitionTask):
self.execWtSql(wt, "drop table {}".format(
regTableName)) # nRows always 0, like MySQL
except taos.error.ProgrammingError as err:
# correcting for strange error number scheme
errno2 = err.errno if (
err.errno > 0) else 0x80000000 + err.errno
# correcting for strange error number scheme
errno2 = Helper.convertErrno(err.errno)
if (errno2 in [0x362]): # mnode invalid table name
isSuccess = False
......@@ -2429,7 +2459,11 @@ class ServiceManagerThread:
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
# print("Adding item to queue...")
line = line.decode("utf-8").rstrip()
line = line.decode("utf-8").rstrip()
except UnicodeError:
print("\nNon-UTF8 server output: {}\n".format(line))
# This might block, and then causing "out" buffer to block
......@@ -2455,7 +2489,7 @@ class ServiceManagerThread:
def svcErrorReader(self, err: IO, queue):
for line in iter(err.readline, b''):
print("\nTD Svc STDERR: {}".format(line))
print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
class TdeSubProcess:
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册