提交 f8e20ec3 编写于 作者: S Steven Li

Adjusted crash_gen tool directory structure, added valgrind suppression file...

Adjusted crash_gen tool directory structure, added valgrind suppression file to expose client side memory leaks
上级 a4852c16
......@@ -43,10 +43,23 @@ TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`
LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6|rev`/lib
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)
# Then let us set up the library path so that our compiled SO file can be loaded by Python
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
# Now we are all let, and let's see if we can find a crash. Note we pass all params
python3.8 ./crash_gen.py $@
if [[ $1 == '--valgrind' ]]; then
shift
export PYTHONMALLOC=malloc
# How to generate valgrind suppression file: https://stackoverflow.com/questions/17159578/generating-suppressions-for-memory-leaks
# valgrind --leak-check=full --gen-suppressions=all --log-fd=9 python3.8 ./crash_gen.py $@ 9>>memcheck.log
valgrind \
--leak-check=yes \
--suppressions=crash_gen/valgrind_taos.supp \
python3.8 \
./crash_gen/crash_gen.py $@
else
python3.8 ./crash_gen/crash_gen.py $@
fi
......@@ -15,7 +15,6 @@
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
import taos
import crash_gen
from util.sql import *
from util.cases import *
from util.dnodes import *
......@@ -42,6 +41,9 @@ import os
import io
import signal
import traceback
import resource
from guppy import hpy
import gc
try:
import psutil
......@@ -382,6 +384,14 @@ class ThreadCoordinator:
while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
if not gConfig.debug: # print this only if we are not in debug mode
print(".", end="", flush=True)
# if (self._curStep % 2) == 0: # print memory usage once every 10 steps
# memUsage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print("[m:{}]".format(memUsage), end="", flush=True) # print memory usage
# if (self._curStep % 10) == 3:
# h = hpy()
# print("\n")
# print(h.heap())
try:
self._syncAtBarrier() # For now just cross the barrier
......@@ -434,6 +444,19 @@ class ThreadCoordinator:
logger.info("\nAll worker threads finished")
self._execStats.endExec()
def cleanup(self): # free resources
self._pool.cleanup()
self._pool = None
self._te = None
self._dbManager = None
self._executedTasks = None
self._lock = None
self._stepBarrier = None
self._execStats = None
self._runStatus = None
def printStats(self):
self._execStats.printStats()
......@@ -525,6 +548,9 @@ class ThreadPool:
logger.debug("Joining thread...")
workerThread._thread.join()
def cleanup(self):
self.threadList = None # maybe clean up each?
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
......@@ -812,6 +838,7 @@ class MyTDSql:
# self.cursor.log(caller.filename + ".sql")
def close(self):
self._cursor.close() # can we double close?
self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
self._cursor.close()
......@@ -1819,7 +1846,7 @@ class ExecutionStats:
"| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
self._elapsedTime))
logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
logger.info("| Total Number of Active DB Native Connections: {}".format(DbConnNative.totalConnections))
logger.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
logger.info("| Longest native query time: {:.3f} seconds, started: {}".
format(MyTDSql.longestQueryTime,
time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) )
......@@ -2322,7 +2349,7 @@ class SvcManager:
self.inSigHandler = False
# self._status = MainExec.STATUS_RUNNING # set inside
# _startTaosService()
self.svcMgrThread = None
self.svcMgrThread = None # type: ServiceManagerThread
self._lock = threading.Lock()
self._isRestarting = False
......@@ -2409,13 +2436,12 @@ class SvcManager:
time.sleep(2.0)
proc.kill()
# print("Process: {}".format(proc.name()))
self.svcMgrThread = ServiceManagerThread() # create the object
print("Attempting to start TAOS service started, printing out output...")
self.svcMgrThread.start()
self.svcMgrThread.procIpcBatch(
trimToTarget=10,
forceOutput=True) # for printing 10 lines
self.svcMgrThread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
print("TAOS service started")
def stopTaosService(self, outputLines=20):
......@@ -2464,7 +2490,7 @@ class ServiceManagerThread:
MAX_QUEUE_SIZE = 10000
def __init__(self):
self._tdeSubProcess = None
self._tdeSubProcess = None # type: TdeSubProcess
self._thread = None
self._status = None
......@@ -2495,13 +2521,13 @@ class ServiceManagerThread:
self._tdeSubProcess.start()
self._ipcQueue = Queue()
self._thread = threading.Thread(
self._thread = threading.Thread( # First thread captures server OUTPUT
target=self.svcOutputReader,
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
self._thread.daemon = True # thread dies with the program
self._thread.start()
self._thread2 = threading.Thread(
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
target=self.svcErrorReader,
args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
self._thread2.daemon = True # thread dies with the program
......@@ -2866,6 +2892,7 @@ class ClientManager:
def run(self, svcMgr):
# self._printLastNumbers()
global gConfig
dbManager = DbManager() # Regular function
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
......@@ -2876,15 +2903,37 @@ class ClientManager:
# print("TC failed = {}".format(self.tc.isFailed()))
if svcMgr: # gConfig.auto_start_service:
svcMgr.stopTaosService()
svcMgr = None
# Print exec status, etc., AFTER showing messages from the server
self.conclude()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
# Linux return code: ref https://shapeshed.com/unix-exit-codes/
return 1 if self.tc.isFailed() else 0
ret = 1 if self.tc.isFailed() else 0
self.tc.cleanup()
# Release global variables
gConfig = None
gSvcMgr = None
logger = None
# Release variables here
self.tc = None
thPool = None
dbManager = None
gc.collect() # force garbage collection
# h = hpy()
# print("\n----- Final Python Heap -----\n")
# print(h.heap())
return ret
def conclude(self):
# self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
self.tc.printStats()
self.tc.getDbManager().cleanUp()
class MainExec:
STATUS_STARTING = 1
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册