TSDBJNIConnector.java 9.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/***************************************************************************
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *****************************************************************************/
package com.taosdata.jdbc;

import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.List;

public class TSDBJNIConnector {
S
Shuaiqiang Chang 已提交
22
    private static volatile Boolean isInitialized = false;
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68

    static {
        System.loadLibrary("taos");
        System.out.println("java.library.path:" + System.getProperty("java.library.path"));
    }

    /**
     * Connection pointer used in C
     */
    private long taos = TSDBConstants.JNI_NULL_POINTER;

    /**
     * Result set pointer for the current connection
     */
    private long taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;

    /**
     * result set status in current connection
     */
    private boolean isResultsetClosed = true;
    private int affectedRows = -1;

    /**
     * Whether the connection is closed
     */
    public boolean isClosed() {
        return this.taos == TSDBConstants.JNI_NULL_POINTER;
    }

    /**
     * Returns the status of last result set in current connection
     */
    public boolean isResultsetClosed() {
        return this.isResultsetClosed;
    }

    /**
     * Initialize static variables in JNI to optimize performance
     */
    public static void init(String configDir, String locale, String charset, String timezone) throws SQLWarning {
        synchronized (isInitialized) {
            if (!isInitialized) {
                initImp(configDir);
                if (setOptions(0, locale) < 0) {
                    throw new SQLWarning(TSDBConstants.WrapErrMsg("Failed to set locale: " + locale + ". System default will be used."));
                }
H
hzcheng 已提交
69 70 71
                if (setOptions(1, charset) < 0) {
                    throw new SQLWarning(TSDBConstants.WrapErrMsg("Failed to set charset: " + charset + ". System default will be used."));
                }
72 73 74 75 76 77 78 79
                if (setOptions(2, timezone) < 0) {
                    throw new SQLWarning(TSDBConstants.WrapErrMsg("Failed to set timezone: " + timezone + ". System default will be used."));
                }
                isInitialized = true;
                TaosGlobalConfig.setCharset(getTsCharset());
            }
        }
    }
H
hzcheng 已提交
80

81
    public static native void initImp(String configDir);
H
hzcheng 已提交
82

83
    public static native int setOptions(int optionIndex, String optionValue);
H
hzcheng 已提交
84 85 86

    public static native String getTsCharset();

87 88 89 90 91 92 93 94 95 96 97 98 99
    /**
     * Get connection pointer
     *
     * @throws SQLException
     */
    public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
        if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
            this.closeConnectionImp(this.taos);
            this.taos = TSDBConstants.JNI_NULL_POINTER;
        }

        this.taos = this.connectImp(host, port, dbName, user, password);
        if (this.taos == TSDBConstants.JNI_NULL_POINTER) {
dengyihao's avatar
dengyihao 已提交
100
            throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l));
101 102 103 104 105 106 107 108 109 110 111 112
        }

        return true;
    }

    private native long connectImp(String host, int port, String dbName, String user, String password);

    /**
     * Execute DML/DDL operation
     *
     * @throws SQLException
     */
S
Shuaiqiang Chang 已提交
113
    public long executeQuery(String sql) throws SQLException {
H
hzcheng 已提交
114
        if (!this.isResultsetClosed) {
115
            freeResultSet(taosResultSetPointer);
H
hzcheng 已提交
116
        }
117

dengyihao's avatar
dengyihao 已提交
118
        Long pSql = 0l;
H
hzcheng 已提交
119
        try {
S
Shuaiqiang Chang 已提交
120
            pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos);
H
hzcheng 已提交
121 122
        } catch (Exception e) {
            e.printStackTrace();
S
Shuaiqiang Chang 已提交
123
            this.freeResultSet(pSql);
H
hzcheng 已提交
124 125
            throw new SQLException(TSDBConstants.WrapErrMsg("Unsupported encoding"));
        }
S
Shuaiqiang Chang 已提交
126
        int code = this.getErrCode(pSql);
dengyihao's avatar
dengyihao 已提交
127
        if (code != 0) {
H
hzcheng 已提交
128
            affectedRows = -1;
dengyihao's avatar
dengyihao 已提交
129 130 131
            String err_msg = this.getErrMsg(pSql);
            this.freeResultSet(pSql);
            throw new SQLException(TSDBConstants.WrapErrMsg(err_msg), "", code);
H
hzcheng 已提交
132
        }
J
jyhou 已提交
133

134
        // Try retrieving result set for the executed SQL using the current connection pointer. If the executed
J
jyhou 已提交
135 136
        // SQL is a DML/DDL which doesn't return a result set, then taosResultSetPointer should be 0L. Otherwise,
        // taosResultSetPointer should be a non-zero value.
S
Shuaiqiang Chang 已提交
137
        taosResultSetPointer = this.getResultSetImp(this.taos, pSql);
J
jyhou 已提交
138 139 140
        if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
            isResultsetClosed = false;
        }
S
Shuaiqiang Chang 已提交
141
        return pSql;
142
    }
H
hzcheng 已提交
143

S
Shuaiqiang Chang 已提交
144
    private native long executeQueryImp(byte[] sqlBytes, long connection);
H
hzcheng 已提交
145

146 147 148
    /**
     * Get recent error code by connection
     */
dengyihao's avatar
dengyihao 已提交
149 150
    public int getErrCode(long pSql) {
        return this.getErrCodeImp(this.taos, pSql);
151
    }
H
hzcheng 已提交
152

dengyihao's avatar
dengyihao 已提交
153
    private native int getErrCodeImp(long connection, long pSql);
H
hzcheng 已提交
154

155 156 157
    /**
     * Get recent error message by connection
     */
dengyihao's avatar
dengyihao 已提交
158 159
    public String getErrMsg(long pSql) {
        return this.getErrMsgImp(pSql);
160
    }
H
hzcheng 已提交
161

dengyihao's avatar
dengyihao 已提交
162
    private native String getErrMsgImp(long pSql);
H
hzcheng 已提交
163

164 165
    /**
     * Get resultset pointer
H
hzcheng 已提交
166
     * Each connection should have a single open result set at a time
167 168
     */
    public long getResultSet() {
J
jyhou 已提交
169
        return taosResultSetPointer;
170
    }
H
hzcheng 已提交
171

S
Shuaiqiang Chang 已提交
172
    private native long getResultSetImp(long connection, long pSql);
H
hzcheng 已提交
173

174 175 176 177
    /**
     * Free resultset operation from C to release resultset pointer by JNI
     */
    public int freeResultSet(long result) {
J
jyhou 已提交
178 179 180
        int res = TSDBConstants.JNI_SUCCESS;
        if (result != taosResultSetPointer && taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
            throw new RuntimeException("Invalid result set pointer");
181
        } else if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
J
jyhou 已提交
182 183 184
            res = this.freeResultSetImp(this.taos, result);
            isResultsetClosed = true; // reset resultSetPointer to 0 after freeResultSetImp() return
            taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
S
slguan 已提交
185
        } else {
S
slguan 已提交
186
            isResultsetClosed = true;
187
        }
J
jyhou 已提交
188
        return res;
189
    }
H
hzcheng 已提交
190

J
jyhou 已提交
191 192 193 194
    /**
     * Close the open result set which is associated to the current connection. If the result set is already
     * closed, return 0 for success.
     */
195 196
    public int freeResultSet() {
        int resCode = TSDBConstants.JNI_SUCCESS;
J
jyhou 已提交
197 198 199
        if (!isResultsetClosed) {
            resCode = this.freeResultSetImp(this.taos, this.taosResultSetPointer);
            taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
200
            isResultsetClosed = true;
J
jyhou 已提交
201 202 203 204
        }
        return resCode;
    }

205
    private native int freeResultSetImp(long connection, long result);
H
hzcheng 已提交
206

207 208 209
    /**
     * Get affected rows count
     */
S
Shuaiqiang Chang 已提交
210
    public int getAffectedRows(long pSql) {
211
        int affectedRows = this.affectedRows;
H
hzcheng 已提交
212
        if (affectedRows < 0) {
S
Shuaiqiang Chang 已提交
213
            affectedRows = this.getAffectedRowsImp(this.taos, pSql);
H
hzcheng 已提交
214 215
        }
        return affectedRows;
216 217
    }

S
Shuaiqiang Chang 已提交
218
    private native int getAffectedRowsImp(long connection, long pSql);
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245

    /**
     * Get schema metadata
     */
    public int getSchemaMetaData(long resultSet, List<ColumnMetaData> columnMetaData) {
        return this.getSchemaMetaDataImp(this.taos, resultSet, columnMetaData);
    }

    private native int getSchemaMetaDataImp(long connection, long resultSet, List<ColumnMetaData> columnMetaData);

    /**
     * Get one row data
     */
    public int fetchRow(long resultSet, TSDBResultSetRowData rowData) {
        return this.fetchRowImp(this.taos, resultSet, rowData);
    }

    private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData);

    /**
     * Execute close operation from C to release connection pointer by JNI
     *
     * @throws SQLException
     */
    public void closeConnection() throws SQLException {
        int code = this.closeConnectionImp(this.taos);
        if (code < 0) {
dengyihao's avatar
dengyihao 已提交
246
            throw new SQLException(TSDBConstants.FixErrMsg(code), "", this.getErrCode(0l));
247 248 249 250
        } else if (code == 0) {
            this.taos = TSDBConstants.JNI_NULL_POINTER;
        } else {
            throw new SQLException("Undefined error code returned by TDengine when closing a connection");
H
hzcheng 已提交
251
        }
252 253 254 255 256 257 258
    }

    private native int closeConnectionImp(long connection);

    /**
     * Subscribe to a table in TSDB
     */
S
Shuaiqiang Chang 已提交
259 260
    public long subscribe(String topic, String sql, boolean restart, int period) {
        return subscribeImp(this.taos, restart, topic, sql, period);
261 262
    }

S
Shuaiqiang Chang 已提交
263
    public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period);
264 265 266 267

    /**
     * Consume a subscribed table
     */
S
Shuaiqiang Chang 已提交
268
    public long consume(long subscription) {
269 270 271
        return this.consumeImp(subscription);
    }

S
Shuaiqiang Chang 已提交
272
    private native long consumeImp(long subscription);
273 274 275 276 277 278

    /**
     * Unsubscribe a table
     *
     * @param subscription
     */
S
Shuaiqiang Chang 已提交
279 280
    public void unsubscribe(long subscription, boolean isKeep) {
        unsubscribeImp(subscription, isKeep);
281 282
    }

S
Shuaiqiang Chang 已提交
283
    private native void unsubscribeImp(long subscription, boolean isKeep);
284 285 286 287 288 289 290

    /**
     * Validate if a <I>create table</I> sql statement is correct without actually creating that table
     */
    public boolean validateCreateTableSql(String sql) {
        long connection = taos;
        int res = validateCreateTableSqlImp(connection, sql.getBytes());
H
hzcheng 已提交
291
        return res != 0 ? false : true;
292
    }
H
hzcheng 已提交
293

294
    private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes);
H
hzcheng 已提交
295
}