提交 402bf83e 编写于 作者: S slguan

Some python scripts are not uploaded

上级 7c927553
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
apilevel = '2.0'
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
\ No newline at end of file
import ctypes
from .constants import FieldType
from .error import *
import math
import datetime
def _convert_millisecond_to_datetime(milli):
return datetime.datetime.fromtimestamp(milli/1000.0)
def _convert_microsecond_to_datetime(micro):
return datetime.datetime.fromtimestamp(micro/1000000.0)
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
_timstamp_converter = _convert_millisecond_to_datetime
if micro:
_timstamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timstamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1]))
else:
return list(map(_timstamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
else:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
else:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]]
else:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ]
else:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1] ]
else:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)] ]
def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
if num_of_rows > 0:
return [ None if ele.value == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]]
else:
return [ None if ele.value == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C nchar row to python row
"""
assert(nbytes is not None)
res = []
for i in range(abs(num_of_rows)):
try:
if num_of_rows >= 0:
res.append( (ctypes.cast(data+nbytes*(abs(num_of_rows - i -1)), ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
else:
res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
except ValueError:
res.append(None)
return res
# if num_of_rows > 0:
# for i in range(abs(num_of_rows)):
# try:
# res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
# except ValueError:
# res.append(None)
# return res
# # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)][::-1]]
# else:
# return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)]]
_CONVERT_FUNC = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT : _crow_tinyint_to_python,
FieldType.C_SMALLINT : _crow_smallint_to_python,
FieldType.C_INT : _crow_int_to_python,
FieldType.C_BIGINT : _crow_bigint_to_python,
FieldType.C_FLOAT : _crow_float_to_python,
FieldType.C_DOUBLE : _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python,
FieldType.C_TIMESTAMP : _crow_timestamp_to_python,
FieldType.C_NCHAR : _crow_nchar_to_python
}
# Corresponding TAOS_FIELD structure in C
class TaosField(ctypes.Structure):
_fields_ = [('name', ctypes.c_char * 64),
('bytes', ctypes.c_short),
('type', ctypes.c_char)]
# C interface class
class CTaosInterface(object):
libtaos = ctypes.CDLL('libtaos.so')
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
def __init__(self, config=None):
'''
Function to initialize the class
@host : str, hostname to connect
@user : str, username to connect to server
@password : str, password to connect to server
@db : str, default db to use when log in
@config : str, config directory
@rtype : None
'''
if config is None:
self._config = ctypes.c_char_p(None)
else:
try:
self._config = ctypes.c_char_p(config.encode('utf-8'))
except AttributeError:
raise AttributeError("config is expected as a str")
if config != None:
CTaosInterface.libtaos.taos_options(3, self._config)
CTaosInterface.libtaos.taos_init()
@property
def config(self):
""" Get current config
"""
return self._config
def connect(self, host=None, user="root", password="taosdata", db=None, port=0):
'''
Function to connect to server
@rtype: c_void_p, TDengine handle
'''
# host
try:
_host = ctypes.c_char_p(host.encode(
"utf-8")) if host != None else ctypes.c_char_p(None)
except AttributeError:
raise AttributeError("host is expected as a str")
# user
try:
_user = ctypes.c_char_p(user.encode("utf-8"))
except AttributeError:
raise AttributeError("user is expected as a str")
# password
try:
_password = ctypes.c_char_p(password.encode("utf-8"))
except AttributeError:
raise AttributeError("password is expected as a str")
# db
try:
_db = ctypes.c_char_p(
db.encode("utf-8")) if db != None else ctypes.c_char_p(None)
except AttributeError:
raise AttributeError("db is expected as a str")
# port
try:
_port = ctypes.c_int(port)
except TypeError:
raise TypeError("port is expected as an int")
connection = ctypes.c_void_p(CTaosInterface.libtaos.taos_connect(
_host, _user, _password, _db, _port))
if connection.value == None:
print('connect to TDengine failed')
# sys.exit(1)
else:
print('connect to TDengine success')
return connection
@staticmethod
def close(connection):
'''Close the TDengine handle
'''
CTaosInterface.libtaos.taos_close(connection)
print('connection is closed')
@staticmethod
def query(connection, sql):
'''Run SQL
@sql: str, sql string to run
@rtype: 0 on success and -1 on failure
'''
try:
return CTaosInterface.libtaos.taos_query(connection, ctypes.c_char_p(sql.encode('utf-8')))
except AttributeError:
raise AttributeError("sql is expected as a string")
finally:
CTaosInterface.libtaos.close(connection)
@staticmethod
def affectedRows(connection):
"""The affected rows after runing query
"""
return CTaosInterface.libtaos.taos_affected_rows(connection)
@staticmethod
def useResult(connection):
'''Use result after calling self.query
'''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = []
pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)):
fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)})
return result, fields
@staticmethod
def fetchBlock(result, fields):
pblock = ctypes.c_void_p(0)
num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
result, ctypes.byref(pblock))
if num_of_rows == 0:
return None, 0
blocks = [None] * len(fields)
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fields[i]['bytes'], isMicro)
return blocks, abs(num_of_rows)
@staticmethod
def freeResult(result):
CTaosInterface.libtaos.taos_free_result(result)
result.value = None
@staticmethod
def fieldsCount(connection):
return CTaosInterface.libtaos.taos_field_count(connection)
@staticmethod
def fetchFields(result):
return CTaosInterface.libtaos.taos_fetch_fields(result)
# @staticmethod
# def fetchRow(result, fields):
# l = []
# row = CTaosInterface.libtaos.taos_fetch_row(result)
# if not row:
# return None
# for i in range(len(fields)):
# l.append(CTaosInterface.getDataValue(
# row[i], fields[i]['type'], fields[i]['bytes']))
# return tuple(l)
# @staticmethod
# def getDataValue(data, dtype, byte):
# '''
# '''
# if not data:
# return None
# if (dtype == CTaosInterface.TSDB_DATA_TYPE_BOOL):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TINYINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_SMALLINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod
def errno(connection):
"""Return the error number.
"""
return CTaosInterface.libtaos.taos_errno(connection)
@staticmethod
def errStr(connection):
"""Return the error styring
"""
return CTaosInterface.libtaos.taos_errstr(connection)
\ No newline at end of file
# from .cursor import TDengineCursor
from .cursor import TDengineCursor
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
result = self._chandle.useResult(self._conn)[0]
if result:
self._chandle.freeResult(result)
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
\ No newline at end of file
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_SMALLINT_NULL = -32768
C_INT_NULL = -2147483648
C_BIGINT_NULL = -9223372036854775808
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Time precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
from .cinterface import CTaosInterface
from .error import *
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = None
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def next(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchBlock(self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._connection.clear_result_set()
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._connection.clear_result_set()
self._reset_result()
stmt = operation
if params is not None:
pass
res = CTaosInterface.query(self._connection._conn, stmt)
if res == 0:
if CTaosInterface.fieldsCount(self._connection._conn) == 0:
return CTaosInterface.affectedRows(self._connection._conn)
else:
self._result, self._fields = CTaosInterface.useResult(self._connection._conn)
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._connection._conn))
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
if num_of_fields == 0: break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
self._connection.clear_result_set()
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = None
self._rowcount = -1
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append((ele['name'], ele['type'], None, None, None, None, False))
return self._result
\ No newline at end of file
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
\ No newline at end of file
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
\ No newline at end of file
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
apilevel = '2.0'
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
\ No newline at end of file
import ctypes
from .constants import FieldType
from .error import *
import math
import datetime
def _convert_millisecond_to_datetime(milli):
return datetime.datetime.fromtimestamp(milli/1000.0)
def _convert_microsecond_to_datetime(micro):
return datetime.datetime.fromtimestamp(micro/1000000.0)
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
_timestamp_converter = _convert_millisecond_to_datetime
if micro:
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1]))
else:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
else:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
else:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]]
else:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ]
else:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1] ]
else:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)] ]
def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
if num_of_rows > 0:
return [ None if ele.value == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]]
else:
return [ None if ele.value == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C nchar row to python row
"""
assert(nbytes is not None)
res = []
for i in range(abs(num_of_rows)):
try:
if num_of_rows >= 0:
res.append( (ctypes.cast(data+nbytes*(abs(num_of_rows - i -1)), ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
else:
res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
except ValueError:
res.append(None)
return res
# if num_of_rows > 0:
# for i in range(abs(num_of_rows)):
# try:
# res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
# except ValueError:
# res.append(None)
# return res
# # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)][::-1]]
# else:
# return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)]]
_CONVERT_FUNC = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT : _crow_tinyint_to_python,
FieldType.C_SMALLINT : _crow_smallint_to_python,
FieldType.C_INT : _crow_int_to_python,
FieldType.C_BIGINT : _crow_bigint_to_python,
FieldType.C_FLOAT : _crow_float_to_python,
FieldType.C_DOUBLE : _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python,
FieldType.C_TIMESTAMP : _crow_timestamp_to_python,
FieldType.C_NCHAR : _crow_nchar_to_python
}
# Corresponding TAOS_FIELD structure in C
class TaosField(ctypes.Structure):
_fields_ = [('name', ctypes.c_char * 64),
('bytes', ctypes.c_short),
('type', ctypes.c_char)]
# C interface class
class CTaosInterface(object):
libtaos = ctypes.CDLL('libtaos.so')
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
def __init__(self, config=None):
'''
Function to initialize the class
@host : str, hostname to connect
@user : str, username to connect to server
@password : str, password to connect to server
@db : str, default db to use when log in
@config : str, config directory
@rtype : None
'''
if config is None:
self._config = ctypes.c_char_p(None)
else:
try:
self._config = ctypes.c_char_p(config.encode('utf-8'))
except AttributeError:
raise AttributeError("config is expected as a str")
if config != None:
CTaosInterface.libtaos.taos_options(3, self._config)
CTaosInterface.libtaos.taos_init()
@property
def config(self):
""" Get current config
"""
return self._config
def connect(self, host=None, user="root", password="taosdata", db=None, port=0):
'''
Function to connect to server
@rtype: c_void_p, TDengine handle
'''
# host
try:
_host = ctypes.c_char_p(host.encode(
"utf-8")) if host != None else ctypes.c_char_p(None)
except AttributeError:
raise AttributeError("host is expected as a str")
# user
try:
_user = ctypes.c_char_p(user.encode("utf-8"))
except AttributeError:
raise AttributeError("user is expected as a str")
# password
try:
_password = ctypes.c_char_p(password.encode("utf-8"))
except AttributeError:
raise AttributeError("password is expected as a str")
# db
try:
_db = ctypes.c_char_p(
db.encode("utf-8")) if db != None else ctypes.c_char_p(None)
except AttributeError:
raise AttributeError("db is expected as a str")
# port
try:
_port = ctypes.c_int(port)
except TypeError:
raise TypeError("port is expected as an int")
connection = ctypes.c_void_p(CTaosInterface.libtaos.taos_connect(
_host, _user, _password, _db, _port))
if connection.value == None:
print('connect to TDengine failed')
# sys.exit(1)
else:
print('connect to TDengine success')
return connection
@staticmethod
def close(connection):
'''Close the TDengine handle
'''
CTaosInterface.libtaos.taos_close(connection)
print('connection is closed')
@staticmethod
def query(connection, sql):
'''Run SQL
@sql: str, sql string to run
@rtype: 0 on success and -1 on failure
'''
try:
return CTaosInterface.libtaos.taos_query(connection, ctypes.c_char_p(sql.encode('utf-8')))
except AttributeError:
raise AttributeError("sql is expected as a string")
finally:
CTaosInterface.libtaos.close(connection)
@staticmethod
def affectedRows(connection):
"""The affected rows after runing query
"""
return CTaosInterface.libtaos.taos_affected_rows(connection)
@staticmethod
def useResult(connection):
'''Use result after calling self.query
'''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = []
pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)):
fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)})
return result, fields
@staticmethod
def fetchBlock(result, fields):
pblock = ctypes.c_void_p(0)
num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
result, ctypes.byref(pblock))
if num_of_rows == 0:
return None, 0
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fields[i]['bytes'], isMicro)
return blocks, abs(num_of_rows)
@staticmethod
def freeResult(result):
CTaosInterface.libtaos.taos_free_result(result)
result.value = None
@staticmethod
def fieldsCount(connection):
return CTaosInterface.libtaos.taos_field_count(connection)
@staticmethod
def fetchFields(result):
return CTaosInterface.libtaos.taos_fetch_fields(result)
# @staticmethod
# def fetchRow(result, fields):
# l = []
# row = CTaosInterface.libtaos.taos_fetch_row(result)
# if not row:
# return None
# for i in range(len(fields)):
# l.append(CTaosInterface.getDataValue(
# row[i], fields[i]['type'], fields[i]['bytes']))
# return tuple(l)
# @staticmethod
# def getDataValue(data, dtype, byte):
# '''
# '''
# if not data:
# return None
# if (dtype == CTaosInterface.TSDB_DATA_TYPE_BOOL):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TINYINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_SMALLINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod
def errno(connection):
"""Return the error number.
"""
return CTaosInterface.libtaos.taos_errno(connection)
@staticmethod
def errStr(connection):
"""Return the error styring
"""
return CTaosInterface.libtaos.taos_errstr(connection).decode('utf-8')
if __name__ == '__main__':
cinter = CTaosInterface()
conn = cinter.connect()
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
result, des = CTaosInterface.useResult(conn)
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
print(data)
cinter.close(conn)
\ No newline at end of file
# from .cursor import TDengineCursor
from .cursor import TDengineCursor
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
result = self._chandle.useResult(self._conn)[0]
if result:
self._chandle.freeResult(result)
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
\ No newline at end of file
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_SMALLINT_NULL = -32768
C_INT_NULL = -2147483648
C_BIGINT_NULL = -9223372036854775808
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
from .cinterface import CTaosInterface
from .error import *
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = None
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchBlock(self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._connection.clear_result_set()
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._connection.clear_result_set()
self._reset_result()
stmt = operation
if params is not None:
pass
res = CTaosInterface.query(self._connection._conn, stmt)
if res == 0:
if CTaosInterface.fieldsCount(self._connection._conn) == 0:
return CTaosInterface.affectedRows(self._connection._conn)
else:
self._result, self._fields = CTaosInterface.useResult(self._connection._conn)
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._connection._conn))
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
if num_of_fields == 0: break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
self._connection.clear_result_set()
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = None
self._rowcount = -1
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append((ele['name'], ele['type'], None, None, None, None, False))
return self._result
\ No newline at end of file
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
\ No newline at end of file
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册