TdOrm/serve/Serve.go
2024-10-30 15:29:19 +08:00

342 lines
7.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package serve
import (
"encoding/json"
"fmt"
"io"
"orm/DB"
"os"
"strings"
"sync"
"time"
)
func getParams(fieldName []string, queryParam map[string]string) {
// 获取参数
}
func Serve() {
table := "tongbai_wt16"
selectToCsvFile(table)
// 创建数据库
//createTable()
// 通过csv文件插入数据
//allStart := time.Now()
//for i := 0; i < 100; i++ {
//insertByCsv("./test.csv")
//}
//allSince := time.Since(allStart)
//fmt.Println("总时间", allSince)
}
// 读取数据库数据并写入文件
func selectToCsvFile(table string) {
// 获取sql语句
sql := fmt.Sprintf("select * from `lxc_db`.`%s`", table)
//sql := "select * from `lxc_db`.`tongbai_wt11` limit 100"
//sql = "select * from `test`.`meters` limit 10000"
// 执行select语句
mData := selectExec(sql)
// 写入文件
writeToFile(mData, "./test.csv")
}
// 执行select语句
func selectExec(sql string) []map[string]interface{} {
//now := time.Now()
// 执行sql语句
query, err := DB.Db.Query(sql)
if err != nil {
panic(err)
}
//since := time.Since(now)
// 数据列
columns, err := query.Columns()
if err != nil {
panic(err)
}
// 数据个数
count := len(columns)
// 返回值 Map切片
mData := make([]map[string]interface{}, 0)
// 一条数据的各列的值(需要指定长度为列的个数,以便获取地址)
values := make([]interface{}, count)
// 一条数据的各列的值的地址
valPointers := make([]interface{}, count)
// 处理查询结果
for query.Next() {
// 获取各列的值的地址
for i := 0; i < count; i++ {
valPointers[i] = &values[i]
}
// 获取各列的值,放到对应的地址中
query.Scan(valPointers...)
// 一条数据的Map (列名和值的键值对)
entry := make(map[string]interface{})
// Map 赋值
for i, col := range columns {
var v interface{}
// 值复制给val(所以Scan时指定的地址可重复使用)
val := values[i]
b, ok := val.([]byte)
if ok {
// 字符切片转为字符串
v = string(b)
} else {
v = val
}
entry[col] = v
}
mData = append(mData, entry)
}
return mData
// 循环打印mData
//for _, v := range mData {
// fmt.Println(v)
//}
}
// 判断文件是否存在
func fileExist(path string) bool {
// 判断文件是否存在
_, err := os.Stat(path)
if err != nil {
return false
}
return true
}
// 将数据写入文件
func writeToFile(mData []map[string]interface{}, path string) {
// 写入文件的字符串
var fileStr string
// 创建表头
var fileHead []string
// 创建文件
var csvFile *os.File
var err error
defer csvFile.Close()
// 判断文件是否存在
if fileExist(path) {
file := readCsvFile(path)
if file == "" || file == "\n" {
} else {
csvFile, err = os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
panic(err)
}
fileLine := strings.Split(file, "\n")
fileHead = strings.Split(strings.ReplaceAll(fileLine[0], "\"", ""), ",")
fmt.Println("fileHead", fileHead)
}
} else {
csvFile, err = os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
panic(err)
}
for k := range mData[0] {
// 将key转为字符串
key, err := json.Marshal(k)
if err != nil {
panic(err)
}
// 将key转为字符串后拼接到fileStr
fileStr += string(key)
fileStr += ","
// 向fileHead添加key
fileHead = append(fileHead, k)
fmt.Println("fileHead", fileHead)
}
// 去掉最后一个逗号此时fileStr第一行为表头
fileStr = strings.TrimRight(fileStr, ",")
fileStr += "\n"
}
// 根据表头读数据并写入文件
// 将mData写入文件 循环[]map[string]interface{}
dataPoint := 0
go func() {
// 写入文件
if _, err := io.WriteString(csvFile, fileStr); err != nil {
fmt.Println("写入文件失败: ", err)
os.Exit(1)
}
}()
for {
if len(mData)-dataPoint < 500 {
fmt.Println("数据量小于500")
fileStr += getStrHandle(mData[dataPoint:], fileHead)
break
}
go func() {
fileStr += getStrHandle(mData[dataPoint:dataPoint+500], fileHead)
dataPoint += 500
}()
}
// 循环表头根据表头读每一行的map的数据
// 删除最后一个换行符
//fileStr = strings.TrimRight(fileStr, "\n")
//fmt.Println(fileStr)
//getSqlByContent(fileStr)
// 写入文件
if _, err := io.WriteString(csvFile, fileStr); err != nil {
fmt.Println("写入文件失败: ", err)
os.Exit(1)
}
}
func getStrHandle(mData []map[string]interface{}, fileHead []string) (fileStr string) {
now := time.Now()
for _, mapEntry := range mData {
// 循环表头根据表头读每一行的map的数据
for _, key := range fileHead {
if key == "ts" {
now = now.Add(time.Hour * 1)
fileStr += "'" + now.Format("2006-01-02 15:04:05") + "'"
//value, err := json.Marshal(mapEntry[key])
//if err != nil {
// panic(err)
//}
//fileStr += string(value)
} else {
value, err := json.Marshal(mapEntry[key])
if err != nil {
panic(err)
}
fileStr += string(value)
}
fileStr += ","
}
fileStr = strings.TrimRight(fileStr, ",")
fileStr += "\n"
fmt.Println("写完一行数据")
}
// 删除最后一个换行符
//fileStr = strings.TrimRight(fileStr, "\n")
//fmt.Println(fileStr)
//getSqlByContent(fileStr)
return
}
// 通过csv文件插入数据
func insertByCsv(filePath string) {
// 将文件写入数据库, 试用版不支持
//sql := fmt.Sprintf("INSERT INTO `lxc_db`.`tongbai_wt12` FILE %s;", filePath)
//_, err := DB.Db.Exec(sql)
//if err != nil {
// panic(err)
//}
// 读取文件冰生成sql语句
// 读取文件
readFront := time.Now()
fileContent := readCsvFile(filePath)
readSince := time.Since(readFront)
fmt.Println("读文件所用时间", readSince)
// 生成写入数据库的sql语句
sqlFront := time.Now()
sqlList := getSqlByContent(fileContent)
sqlSince := time.Since(sqlFront)
fmt.Println("生成sql语句所用时间", sqlSince)
// 执行sql语句
//fmt.Println("sql", sql)
execFront := time.Now()
wg := sync.WaitGroup{}
for _, sql := range sqlList {
wg.Add(1)
go func() {
_, err := DB.Db.Exec(sql)
if err != nil {
}
wg.Done()
}()
//_, err := DB.Db.Exec(sql)
//if err != nil {
// panic(err)
//} else {
// //fmt.Println("插入成功")
//}
}
wg.Wait()
execSince := time.Since(execFront)
fmt.Println("执行sql语句所用时间", execSince)
}
// 读取csv文件
func readCsvFile(filePath string) string {
// 打开文件
csvFile, err := os.OpenFile(filePath, os.O_RDONLY, 0666)
if err != nil {
panic(err)
}
defer csvFile.Close()
// 读取CSV文件
fileContent, err := io.ReadAll(csvFile)
if err != nil {
panic(err)
}
return string(fileContent)
}
// 根据文件内容生成sql语句
func getSqlByContent(fileContent string) []string {
// 将文件内容按行分割
dataLine := strings.Split(fileContent, "\n")
// 第一行为表头
head := dataLine[0]
// 剩余行为数据
content := dataLine[1:]
var data string
var sqlList []string
var l int
for i, v := range content {
content[i] = "(" + v + ")"
data += content[i]
l++
// 每500条数据生成一条sql语句
if l > 500 {
l = 0
sql := fmt.Sprintf("INSERT INTO `lxc_db`.`tongbai_wt16` (%s) VALUES %s;", head, data)
sqlList = append(sqlList, sql)
data = ""
}
}
// 生成sql语句
sql := fmt.Sprintf("INSERT INTO `lxc_db`.`tongbai_wt16` (%s) VALUES %s;", head, data)
sqlList = append(sqlList, sql)
return sqlList
}
// 创建表
func createTable() {
// 创建表
sql := "CREATE TABLE `lxc_db`.`tongbai_wt16` USING `lxc_db`.`meters` (`groupid`, `ip_addr`) TAGS (10011, NULL)"
exec, err := DB.Db.Exec(sql)
if err != nil {
panic(err)
}
id, err := exec.RowsAffected()
if err != nil {
panic(err)
}
fmt.Println("exec.RowsAffected", id)
}