提交 9eb22a37 编写于 作者: S slguan

Increase the comparison test between InfluxDB and TDengine

上级 e5edd507
package com.taosdata.generator;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Random;
public class DataGenerator {
/*
* to simulate the change action of humidity The valid range of humidity is
* [0, 100]
*/
public static class ValueGen {
int center;
int range;
Random rand;
public ValueGen(int center, int range) {
this.center = center;
this.range = range;
this.rand = new Random();
}
double next() {
double v = this.rand.nextGaussian();
if (v < -3) {
v = -3;
}
if (v > 3) {
v = 3;
}
return (this.range / 3.00) * v + center;
}
}
// data scale
private static int timestep = 1000; // sample time interval in milliseconds
private static long dataStartTime = 1563249700000L;
private static int deviceId = 0;
private static String tagPrefix = "dev_";
// MachineNum RowsPerMachine MachinesInOneFile
public static void main(String args[]) {
int numOfDevice = 10000;
int numOfFiles = 100;
int rowsPerDevice = 10000;
String directory = "~/";
for (int i = 0; i < args.length; i++) {
if (args[i].equalsIgnoreCase("-numOfDevices")) {
if (i < args.length - 1) {
numOfDevice = Integer.parseInt(args[++i]);
} else {
System.out.println("'-numOfDevices' requires a parameter, default is 10000");
}
} else if (args[i].equalsIgnoreCase("-numOfFiles")) {
if (i < args.length - 1) {
numOfFiles = Integer.parseInt(args[++i]);
} else {
System.out.println("'-numOfFiles' requires a parameter, default is 100");
}
} else if (args[i].equalsIgnoreCase("-rowsPerDevice")) {
if (i < args.length - 1) {
rowsPerDevice = Integer.parseInt(args[++i]);
} else {
System.out.println("'-rowsPerDevice' requires a parameter, default is 10000");
}
} else if (args[i].equalsIgnoreCase("-dataDir")) {
if (i < args.length - 1) {
directory = args[++i];
} else {
System.out.println("'-dataDir' requires a parameter, default is ~/testdata");
}
}
}
System.out.println("parameters");
System.out.printf("----dataDir:%s\n", directory);
System.out.printf("----numOfFiles:%d\n", numOfFiles);
System.out.printf("----numOfDevice:%d\n", numOfDevice);
System.out.printf("----rowsPerDevice:%d\n", rowsPerDevice);
int numOfDevPerFile = numOfDevice / numOfFiles;
long ts = dataStartTime;
// deviceId, time stamp, humid(int), temp(double), tagString(dev_deviceid)
int humidityDistRadius = 35;
int tempDistRadius = 17;
for (int i = 0; i < numOfFiles; ++i) { // prepare the data file
dataStartTime = ts;
// generate file name
String path = directory;
try {
path += "/testdata" + String.valueOf(i) + ".csv";
getDataInOneFile(path, rowsPerDevice, numOfDevPerFile, humidityDistRadius, tempDistRadius);
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static void getDataInOneFile(String path, int rowsPerDevice, int num, int humidityDistRadius, int tempDistRadius) throws IOException {
DecimalFormat df = new DecimalFormat("0.0000");
long startTime = dataStartTime;
FileWriter fw = new FileWriter(new File(path));
BufferedWriter bw = new BufferedWriter(fw);
for (int i = 0; i < num; ++i) {
deviceId += 1;
Random rand = new Random();
double centralVal = Math.abs(rand.nextInt(100));
if (centralVal < humidityDistRadius) {
centralVal = humidityDistRadius;
}
if (centralVal + humidityDistRadius > 100) {
centralVal = 100 - humidityDistRadius;
}
DataGenerator.ValueGen humidityDataGen = new DataGenerator.ValueGen((int) centralVal, humidityDistRadius);
dataStartTime = startTime;
centralVal = Math.abs(rand.nextInt(22));
DataGenerator.ValueGen tempDataGen = new DataGenerator.ValueGen((int) centralVal, tempDistRadius);
for (int j = 0; j < rowsPerDevice; ++j) {
int humidity = (int) humidityDataGen.next();
double temp = tempDataGen.next();
int deviceGroup = deviceId % 100;
StringBuffer sb = new StringBuffer();
sb.append(deviceId).append(" ").append(tagPrefix).append(deviceId).append(" ").append(deviceGroup)
.append(" ").append(dataStartTime).append(" ").append(humidity).append(" ")
.append(df.format(temp));
bw.write(sb.toString());
bw.write("\n");
dataStartTime += timestep;
}
}
bw.close();
fw.close();
System.out.printf("file:%s generated\n", path);
}
}
package main
import (
"bufio"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb1-client/v2"
)
type ProArgs struct {
host string
username string
password string
db string
sql string
dataDir string
filesNum int
writeClients int
rowsPerRequest int
}
type WriteInfo struct {
threadId int
sID int
eID int
}
type StatisInfo struct {
totalRows int64
}
var statis StatisInfo
func main() {
// Configuration
var arguments ProArgs
// Parse options
flag.StringVar(&(arguments.host), "host", "http://localhost:8086", "Server host to connect")
flag.StringVar(&(arguments.db), "db", "db", "DB to insert data")
flag.StringVar(&(arguments.username), "user", "", "Username used to connect to server")
flag.StringVar(&(arguments.password), "pass", "", "Password used to connect to server")
flag.StringVar(&(arguments.sql), "sql", "./sqlCmd.txt", "File name of SQL commands")
flag.StringVar(&(arguments.dataDir), "dataDir", "./testdata", "Raw csv data")
flag.IntVar(&(arguments.filesNum), "numOfFiles", 10, "Number of files int dataDir ")
flag.IntVar(&(arguments.writeClients), "writeClients", 0, "Number of write clients")
flag.IntVar(&(arguments.rowsPerRequest), "rowsPerRequest", 100, "Number of rows per request")
flag.Parse()
statis.totalRows = 0
if arguments.writeClients > 0 {
writeData(&arguments)
} else {
readData(&arguments)
}
}
func writeData(arguments *ProArgs) {
log.Println("write data")
log.Println("---- writeClients:", arguments.writeClients)
log.Println("---- dataDir:", arguments.dataDir)
log.Println("---- numOfFiles:", arguments.filesNum)
log.Println("---- rowsPerRequest:", arguments.rowsPerRequest)
var wg sync.WaitGroup
wg.Add(arguments.writeClients)
st := time.Now()
a := arguments.filesNum / arguments.writeClients
b := arguments.filesNum % arguments.writeClients
last := 0
for i := 0; i < arguments.writeClients; i++ {
var wInfo WriteInfo
wInfo.threadId = i + 1
wInfo.sID = last
if i < b {
wInfo.eID = last + a
} else {
wInfo.eID = last + a - 1
}
last = wInfo.eID + 1
go writeDataImp(&wInfo, &wg, arguments)
}
wg.Wait()
elapsed := time.Since(st)
seconds := float64(elapsed) / float64(time.Second)
log.Println("---- Spent", seconds, "seconds to insert", statis.totalRows, "records, speed:", float64(statis.totalRows)/seconds, "Rows/Second")
}
func writeDataImp(wInfo *WriteInfo, wg *sync.WaitGroup, arguments *ProArgs) {
defer wg.Done()
log.Println("Thread", wInfo.threadId, "writing sID", wInfo.sID, "eID", wInfo.eID)
// Connect to the server
conn, err := client.NewHTTPClient(client.HTTPConfig{
Addr: arguments.host,
Username: arguments.username,
Password: arguments.password,
})
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Create database
_, err = queryDB(conn, fmt.Sprintf("create database %s", arguments.db), arguments.db)
if err != nil {
log.Fatal(err)
}
// Write data
counter := 0
totalRecords := 0
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: arguments.db,
Precision: "ms",
})
if err != nil {
log.Fatal(err)
}
for j := wInfo.sID; j <= wInfo.eID; j++ {
fileName := fmt.Sprintf("%s/testdata%d.csv", arguments.dataDir, j)
fs, err := os.Open(fileName)
if err != nil {
log.Printf("failed to open file %s", fileName)
log.Fatal(err)
}
log.Printf("open file %s success", fileName)
bfRd := bufio.NewReader(fs)
for {
sline, err := bfRd.ReadString('\n')
if err != nil {
break
}
sline = strings.TrimSuffix(sline, "\n")
s := strings.Split(sline, " ")
if len(s) != 6 {
continue
}
// Create a point and add to batch
tags := map[string]string{
"devid": s[0],
"devname": s[1],
"devgroup": s[2],
}
timestamp, _ := strconv.ParseInt(s[3], 10, 64)
temperature, _ := strconv.ParseInt(s[4], 10, 32)
humidity, _ := strconv.ParseFloat(s[5], 64)
fields := map[string]interface{}{
"temperature": temperature,
"humidity": humidity,
}
pt, err := client.NewPoint("devices", tags, fields, time.Unix(0, timestamp * int64(time.Millisecond)))
if err != nil {
log.Fatalln("Error: ", err)
}
bp.AddPoint(pt)
counter++
if counter >= arguments.rowsPerRequest {
if err := conn.Write(bp); err != nil {
log.Fatal(err)
}
totalRecords += counter
counter = 0
bp, err = client.NewBatchPoints(client.BatchPointsConfig{
Database: arguments.db,
Precision: "ms",
})
if err != nil {
log.Fatal(err)
}
}
}
fs.Close()
}
totalRecords += counter
if counter > 0 {
if err := conn.Write(bp); err != nil {
log.Fatal(err)
}
}
atomic.AddInt64(&statis.totalRows, int64(totalRecords))
}
func readData(arguments *ProArgs) {
log.Println("read data")
log.Println("---- sql:", arguments.sql)
conn, err := client.NewHTTPClient(client.HTTPConfig{
Addr: arguments.host,
Username: arguments.username,
Password: arguments.password,
})
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fs, err := os.Open(arguments.sql)
if err != nil {
log.Printf("failed to open file %s", arguments.sql)
log.Fatal(err)
}
log.Printf("open file %s success", arguments.sql)
bfRd := bufio.NewReader(fs)
for {
sline, err := bfRd.ReadString('\n')
if err != nil {
break
}
sline = strings.TrimSuffix(sline, "\n")
st := time.Now()
_, err = queryDB(conn, sline, arguments.db)
if err != nil {
log.Fatal(err)
}
elapsed := time.Since(st)
seconds := float64(elapsed) / float64(time.Second)
log.Println("---- Spent", seconds, "seconds to query ", sline)
}
}
func queryDB(conn client.Client, cmd string, db string) (res []client.Result, err error) {
query := client.Query{
Command: cmd,
Database: db,
}
response, err := conn.Query(query)
if err == nil {
if response.Error() != nil {
return res, response.Error()
}
res = response.Results
} else {
return res, err
}
return res, nil
}
select * from devices where devgroup='0';
select * from devices where devgroup='10';
select * from devices where devgroup='20';
select * from devices where devgroup='30';
select * from devices where devgroup='40';
select * from devices where devgroup='50';
select * from devices where devgroup='60';
select * from devices where devgroup='70';
select * from devices where devgroup='80';
select * from devices where devgroup='90';
select count(temperature) from devices where devgroup=~/[1-1][0-9]/;
select count(temperature) from devices where devgroup=~/[1-2][0-9]/;
select count(temperature) from devices where devgroup=~/[1-3][0-9]/;
select count(temperature) from devices where devgroup=~/[1-4][0-9]/;
select count(temperature) from devices where devgroup=~/[1-5][0-9]/;
select count(temperature) from devices where devgroup=~/[1-6][0-9]/;
select count(temperature) from devices where devgroup=~/[1-7][0-9]/;
select count(temperature) from devices where devgroup=~/[1-8][0-9]/;
select count(temperature) from devices where devgroup=~/[1-9][0-9]/;
select count(temperature) from devices;
select mean(temperature) from devices where devgroup=~/[1-1][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-2][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-3][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-4][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-5][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-6][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-7][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-8][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-9][0-9]/;
select mean(temperature) from devices;
select sum(temperature) from devices where devgroup=~/[1-1][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-2][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-3][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-4][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-5][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-6][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-7][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-8][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-9][0-9]/;
select sum(temperature) from devices;
select max(temperature) from devices where devgroup=~/[1-1][0-9]/;
select max(temperature) from devices where devgroup=~/[1-2][0-9]/;
select max(temperature) from devices where devgroup=~/[1-3][0-9]/;
select max(temperature) from devices where devgroup=~/[1-4][0-9]/;
select max(temperature) from devices where devgroup=~/[1-5][0-9]/;
select max(temperature) from devices where devgroup=~/[1-6][0-9]/;
select max(temperature) from devices where devgroup=~/[1-7][0-9]/;
select max(temperature) from devices where devgroup=~/[1-8][0-9]/;
select max(temperature) from devices where devgroup=~/[1-9][0-9]/;
select max(temperature) from devices;
select min(temperature) from devices where devgroup=~/[1-1][0-9]/;
select min(temperature) from devices where devgroup=~/[1-2][0-9]/;
select min(temperature) from devices where devgroup=~/[1-3][0-9]/;
select min(temperature) from devices where devgroup=~/[1-4][0-9]/;
select min(temperature) from devices where devgroup=~/[1-5][0-9]/;
select min(temperature) from devices where devgroup=~/[1-6][0-9]/;
select min(temperature) from devices where devgroup=~/[1-7][0-9]/;
select min(temperature) from devices where devgroup=~/[1-8][0-9]/;
select min(temperature) from devices where devgroup=~/[1-9][0-9]/;
select min(temperature) from devices;
select spread(temperature) from devices where devgroup=~/[1-1][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-2][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-3][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-4][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-5][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-6][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-7][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-8][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-9][0-9]/;
select spread(temperature) from devices;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-1][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-2][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-3][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-4][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-5][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-6][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-7][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-8][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-9][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-1][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-2][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-3][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-4][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-5][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-6][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-7][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-8][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-9][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices group by time(1m);
ROOT=./
TARGET=exe
LFLAGS = '-Wl,-rpath,/usr/lib' -ltaos -lpthread -lm -lrt
CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion -Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX -msse4.2 -Wno-unused-function -D_M_X64 -std=gnu99
all: $(TARGET)
exe:
gcc $(CFLAGS) ./tdengineTest.c -o $(ROOT)/tdengineTest $(LFLAGS)
clean:
rm $(ROOT)tdengineTest
\ No newline at end of file
select * from db.devices where devgroup=0;
select * from db.devices where devgroup=10;
select * from db.devices where devgroup=20;
select * from db.devices where devgroup=30;
select * from db.devices where devgroup=40;
select * from db.devices where devgroup=50;
select * from db.devices where devgroup=60;
select * from db.devices where devgroup=70;
select * from db.devices where devgroup=80;
select * from db.devices where devgroup=90;
select count(*) from db.devices where devgroup<10;
select count(*) from db.devices where devgroup<20;
select count(*) from db.devices where devgroup<30;
select count(*) from db.devices where devgroup<40;
select count(*) from db.devices where devgroup<50;
select count(*) from db.devices where devgroup<60;
select count(*) from db.devices where devgroup<70;
select count(*) from db.devices where devgroup<80;
select count(*) from db.devices where devgroup<90;
select count(*) from db.devices;
select avg(temperature) from db.devices where devgroup<10;
select avg(temperature) from db.devices where devgroup<20;
select avg(temperature) from db.devices where devgroup<30;
select avg(temperature) from db.devices where devgroup<40;
select avg(temperature) from db.devices where devgroup<50;
select avg(temperature) from db.devices where devgroup<60;
select avg(temperature) from db.devices where devgroup<70;
select avg(temperature) from db.devices where devgroup<80;
select avg(temperature) from db.devices where devgroup<90;
select avg(temperature) from db.devices;
select sum(temperature) from db.devices where devgroup<10;
select sum(temperature) from db.devices where devgroup<20;
select sum(temperature) from db.devices where devgroup<30;
select sum(temperature) from db.devices where devgroup<40;
select sum(temperature) from db.devices where devgroup<50;
select sum(temperature) from db.devices where devgroup<60;
select sum(temperature) from db.devices where devgroup<70;
select sum(temperature) from db.devices where devgroup<80;
select sum(temperature) from db.devices where devgroup<90;
select sum(temperature) from db.devices;
select max(temperature) from db.devices where devgroup<10;
select max(temperature) from db.devices where devgroup<20;
select max(temperature) from db.devices where devgroup<30;
select max(temperature) from db.devices where devgroup<40;
select max(temperature) from db.devices where devgroup<50;
select max(temperature) from db.devices where devgroup<60;
select max(temperature) from db.devices where devgroup<70;
select max(temperature) from db.devices where devgroup<80;
select max(temperature) from db.devices where devgroup<90;
select max(temperature) from db.devices;
select min(temperature) from db.devices where devgroup<10;
select min(temperature) from db.devices where devgroup<20;
select min(temperature) from db.devices where devgroup<30;
select min(temperature) from db.devices where devgroup<40;
select min(temperature) from db.devices where devgroup<50;
select min(temperature) from db.devices where devgroup<60;
select min(temperature) from db.devices where devgroup<70;
select min(temperature) from db.devices where devgroup<80;
select min(temperature) from db.devices where devgroup<90;
select min(temperature) from db.devices;
select spread(temperature) from db.devices where devgroup<10;
select spread(temperature) from db.devices where devgroup<20;
select spread(temperature) from db.devices where devgroup<30;
select spread(temperature) from db.devices where devgroup<40;
select spread(temperature) from db.devices where devgroup<50;
select spread(temperature) from db.devices where devgroup<60;
select spread(temperature) from db.devices where devgroup<70;
select spread(temperature) from db.devices where devgroup<80;
select spread(temperature) from db.devices where devgroup<90;
select spread(temperature) from db.devices;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<10 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<20 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<30 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<40 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<50 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<60 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<70 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<80 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<90 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<10 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<20 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<30 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<40 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<50 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<60 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<70 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<80 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<90 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices interval(1m);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <taos.h>
#include <time.h>
#include <pthread.h>
#include <sys/time.h>
typedef struct {
char sql[256];
char dataDir[256];
int filesNum;
int writeClients;
int rowsPerRequest;
} ProArgs;
typedef struct {
int64_t totalRows;
} StatisInfo;
typedef struct {
pthread_t pid;
int threadId;
int sID;
int eID;
} ThreadObj;
static StatisInfo statis;
static ProArgs arguments;
void parseArg(int argc, char *argv[]);
void writeData();
void readData();
int main(int argc, char *argv[]) {
statis.totalRows = 0;
parseArg(argc, argv);
if (arguments.writeClients > 0) {
writeData();
} else {
readData();
}
}
void parseArg(int argc, char *argv[]) {
strcpy(arguments.sql, "./sqlCmd.txt");
strcpy(arguments.dataDir, "./testdata");
arguments.filesNum = 2;
arguments.writeClients = 0;
arguments.rowsPerRequest = 100;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-sql") == 0) {
if (i < argc - 1) {
strcpy(arguments.sql, argv[++i]);
}
else {
fprintf(stderr, "'-sql' requires a parameter, default:%s\n", arguments.sql);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-dataDir") == 0) {
if (i < argc - 1) {
strcpy(arguments.dataDir, argv[++i]);
}
else {
fprintf(stderr, "'-dataDir' requires a parameter, default:%s\n", arguments.dataDir);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-numOfFiles") == 0) {
if (i < argc - 1) {
arguments.filesNum = atoi(argv[++i]);
}
else {
fprintf(stderr, "'-numOfFiles' requires a parameter, default:%d\n", arguments.filesNum);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-writeClients") == 0) {
if (i < argc - 1) {
arguments.writeClients = atoi(argv[++i]);
}
else {
fprintf(stderr, "'-writeClients' requires a parameter, default:%d\n", arguments.writeClients);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-rowsPerRequest") == 0) {
if (i < argc - 1) {
arguments.rowsPerRequest = atoi(argv[++i]);
}
else {
fprintf(stderr, "'-rowsPerRequest' requires a parameter, default:%d\n", arguments.rowsPerRequest);
exit(EXIT_FAILURE);
}
}
}
}
void taos_error(TAOS *con) {
printf("TDengine error: %s\n", taos_errstr(con));
taos_close(con);
exit(1);
}
int64_t getTimeStampMs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000L + (int64_t)systemTime.tv_usec / 1000;
}
void writeDataImp(void *param) {
ThreadObj *pThread = (ThreadObj *)param;
printf("Thread %d, writing sID %d, eID %d\n", pThread->threadId, pThread->sID, pThread->eID);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
int code = taos_query(taos, "use db");
if (code != 0) {
taos_error(taos);
}
char sql[65000];
int sqlLen = 0;
int lastMachineid = 0;
int counter = 0;
int totalRecords = 0;
for (int j = pThread->sID; j <= pThread->eID; j++) {
char fileName[256];
sprintf(fileName, "%s/testdata%d.csv", arguments.dataDir, j);
FILE *fp = fopen(fileName, "r");
if (fp == NULL) {
printf("failed to open file %s\n", fileName);
exit(1);
}
printf("open file %s success\n", fileName);
char *line = NULL;
size_t len = 0;
while (!feof(fp)) {
free(line);
line = NULL;
len = 0;
getline(&line, &len, fp);
if (line == NULL) break;
if (strlen(line) < 10) continue;
int machineid;
char machinename[16];
int machinegroup;
int64_t timestamp;
int temperature;
float humidity;
sscanf(line, "%d%s%d%lld%d%f", &machineid, machinename, &machinegroup, &timestamp, &temperature, &humidity);
if (counter == 0) {
sqlLen = sprintf(sql, "insert into");
}
if (lastMachineid != machineid) {
lastMachineid = machineid;
sqlLen += sprintf(sql + sqlLen, " dev%d using devices tags(%d,'%s',%d) values",
machineid, machineid, machinename, machinegroup);
}
sqlLen += sprintf(sql + sqlLen, "(%lld,%d,%f)", timestamp, temperature, humidity);
counter++;
if (counter >= arguments.rowsPerRequest) {
int code = taos_query(taos, sql);
if (code != 0) {
printf("thread:%d error:%d reason:%s\n", pThread->pid, code, taos_errstr(taos));
}
totalRecords += counter;
counter = 0;
lastMachineid = -1;
sqlLen = 0;
}
}
fclose(fp);
}
if (counter > 0) {
int code = taos_query(taos, sql);
if (code != 0) {
printf("thread:%d error:%d reason:%s\n", pThread->pid, code, taos_errstr(taos));
}
totalRecords += counter;
}
__sync_fetch_and_add(&statis.totalRows, totalRecords);
}
void writeData() {
printf("write data\n");
printf("---- writeClients: %d\n", arguments.writeClients);
printf("---- dataDir: %s\n", arguments.dataDir);
printf("---- numOfFiles: %d\n", arguments.filesNum);
printf("---- rowsPerRequest: %d\n", arguments.rowsPerRequest);
taos_init();
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
int code = taos_query(taos, "create database if not exists db");
if (code != 0) {
taos_error(taos);
}
code = taos_query(taos, "create table if not exists db.devices(ts timestamp, temperature int, humidity float) "
"tags(devid int, devname binary(16), devgroup int)");
if (code != 0) {
taos_error(taos);
}
int64_t st = getTimeStampMs();
int a = arguments.filesNum / arguments.writeClients;
int b = arguments.filesNum % arguments.writeClients;
int last = 0;
ThreadObj *threads = calloc((size_t)arguments.writeClients, sizeof(ThreadObj));
for (int i = 0; i < arguments.writeClients; ++i) {
ThreadObj *pthread = threads + i;
pthread_attr_t thattr;
pthread->threadId = i + 1;
pthread->sID = last;
if (i < b) {
pthread->eID = last + a;
} else {
pthread->eID = last + a - 1;
}
last = pthread->eID + 1;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pthread->pid, &thattr, (void *(*)(void *))writeDataImp, pthread);
}
for (int i = 0; i < arguments.writeClients; i++) {
pthread_join(threads[i].pid, NULL);
}
int64_t elapsed = getTimeStampMs() - st;
float seconds = (float)elapsed / 1000;
float rs = (float)statis.totalRows / seconds;
printf("---- Spent %f seconds to insert %ld records, speed: %f Rows/Second\n", seconds, statis.totalRows, rs);
}
void readData() {
printf("read data\n");
printf("---- sql: %s\n", arguments.sql);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
FILE *fp = fopen(arguments.sql, "r");
if (fp == NULL) {
printf("failed to open file %s\n", arguments.sql);
exit(1);
}
printf("open file %s success\n", arguments.sql);
char *line = NULL;
size_t len = 0;
while (!feof(fp)) {
free(line);
line = NULL;
len = 0;
getline(&line, &len, fp);
if (line == NULL) break;
if (strlen(line) < 10) continue;
int64_t st = getTimeStampMs();
int code = taos_query(taos, line);
if (code != 0) {
taos_error(taos);
}
void *result = taos_use_result(taos);
if (result == NULL) {
printf("failed to get result, reason:%s\n", taos_errstr(taos));
exit(1);
}
TAOS_ROW row;
int rows = 0;
//int num_fields = taos_field_count(taos);
//TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
rows++;
//char temp[256];
//taos_print_row(temp, row, fields, num_fields);
//printf("%s\n", temp);
}
taos_free_result(result);
int64_t elapsed = getTimeStampMs() - st;
float seconds = (float)elapsed / 1000;
printf("---- Spent %f seconds to query: %s", seconds, line);
}
fclose(fp);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册