在快速入门文章中我们介绍了 OpenTenBase 的架构、源码编译安装、集群运行状态、启动停止等内容。
本篇将介绍应用程序如何连接 OpenTenBase 数据库进行建库、建表、数据导入、查询等操作。
OpenTenBase 兼容所有支持 Postgres 协议的客户端连接,这里将详细介绍 JAVA、C 语音、shell 语言、Python、PHP、Golang 这 6 种最常用的开发语言连接 OpenTenBase 的操作方法。
1、JAVA 开发
1.1、创建数据表
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
public class createtable {
public static void main( String args[] )
{
Connection c = null;
Statement stmt = null;
try {
Class.forName("org.postgresql.Driver");
c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
System.out.println("Opened database successfully");
stmt = c.createStatement();
String sql = "create table opentenbase(id int,nickname text) distribute by shard(id) to group default_group" ;
stmt.executeUpdate(sql);
stmt.close();
c.close();
} catch ( Exception e ) {
System.err.println( e.getClass().getName()+": "+ e.getMessage() );
System.exit(0);
}
System.out.println("Table created successfully");
}
}
说明:
- 这里连接的节点为任意 CN 主节点,后面所有操作,没特别说明,都是连接到 CN 主节点进行操作。
1.2、使用普通协议插入数据
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
public class insert {
public static void main(String args[]) {
Connection c = null;
Statement stmt = null;
try {
Class.forName("org.postgresql.Driver");
c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
c.setAutoCommit(false);
System.out.println("Opened database successfully");
stmt = c.createStatement();
String sql = "INSERT INTO opentenbase (id,nickname) "
+ "VALUES (1,'opentenbase');";
stmt.executeUpdate(sql);
sql = "INSERT INTO opentenbase (id,nickname) "
+ "VALUES (2, 'pgxz' ),(3,'pgxc');";
stmt.executeUpdate(sql);
stmt.close();
c.commit();
c.close();
} catch (Exception e) {
System.err.println( e.getClass().getName()+": "+ e.getMessage() );
System.exit(0);
}
System.out.println("Records created successfully");
}
}
1.3、使用扩展协议插入数据
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.*;
import java.util.Random;
public class insert_prepared {
public static void main(String args[]) {
Connection c = null;
PreparedStatement stmt;
try {
Class.forName("org.postgresql.Driver");
c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
c.setAutoCommit(false);
System.out.println("Opened database successfully");
//插入数据
String sql = "INSERT INTO opentenbase (id,nickname) VALUES (?,?)";
stmt = c.prepareStatement(sql);
stmt.setInt(1, 9999);
stmt.setString(2, "opentenbase_prepared");
stmt.executeUpdate();
//插入更新
sql = "INSERT INTO opentenbase (id,nickname) VALUES (?,?) ON CONFLICT(id) DO UPDATE SET nickname=?";
stmt = c.prepareStatement(sql);
stmt.setInt(1, 9999);
stmt.setString(2, "opentenbase_prepared");
stmt.setString(3, "opentenbase_prepared_update");
stmt.executeUpdate();
stmt.close();
c.commit();
c.close();
} catch (Exception e) {
System.err.println( e.getClass().getName()+": "+ e.getMessage() );
System.exit(0);
}
System.out.println("Records created successfully");
}
}
1.4、copy from 加载文件到表
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import java.io.*;
public class copyfrom {
public static void main( String args[] )
{
Connection c = null;
Statement stmt = null;
FileInputStream fs = null;
try {
Class.forName("org.postgresql.Driver");
c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
System.out.println("Opened database successfully");
CopyManager cm = new CopyManager((BaseConnection) c);
fs = new FileInputStream("/data/opentenbase/opentenbase.csv");
String sql = "COPY opentenbase FROM STDIN DELIMITER AS ','";
cm.copyIn(sql, fs);
c.close();
fs.close();
} catch ( Exception e ) {
System.err.println( e.getClass().getName()+": "+ e.getMessage() );
System.exit(0);
}
System.out.println("Copy data successfully");
}
}
1.5、copy to 导出数据到文件
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import java.io.*;
public class copyto {
public static void main( String args[] )
{
Connection c = null;
Statement stmt = null;
FileOutputStream fs = null;
try {
Class.forName("org.postgresql.Driver");
c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","opentenbase", "opentenbase");
System.out.println("Opened database successfully");
CopyManager cm = new CopyManager((BaseConnection) c);
fs = new FileOutputStream("/data/opentenbase/opentenbase.csv");
String sql = "COPY opentenbase TO STDOUT DELIMITER AS ','";
cm.copyOut(sql, fs);
c.close();
fs.close();
} catch ( Exception e ) {
System.err.println( e.getClass().getName()+": "+ e.getMessage() );
System.exit(0);
}
System.out.println("Copy data successfully");
}
}
1.6、jdbc 包下载地址
https://jdbc.postgresql.org/download.html
2、C 程序开发
2.1、连接数据库
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"
int
main(int argc, char **argv){
const char *conninfo;
PGconn *conn;
if (argc > 1){
conninfo = argv[1];
}else{
conninfo = "dbname = postgres";
}
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK){
fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn));
}else{
printf("连接数据库成功!\n");
}
PQfinish(conn);
return 0;
}
编译
gcc -c -I /usr/local/install/opentenbase_pgxz/include/ conn.c
gcc -o conn conn.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq
运行
./conn "host=172.16.0.3 dbname=postgres port=11000"
连接数据库成功!
./conn "host=172.16.0.3 dbname=postgres port=15432 user=opentenbase"
连接数据库成功!
2.2、建立数据表
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"
int
main(int argc, char **argv){
const char *conninfo;
PGconn *conn;
PGresult *res;
const char *sql = "create table opentenbase(id int,nickname text) distribute by shard(id) to group default_group";
if (argc > 1){
conninfo = argv[1];
}else{
conninfo = "dbname = postgres";
}
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK){
fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn));
}else{
printf("连接数据库成功!\n");
}
res = PQexec(conn,sql);
if(PQresultStatus(res) != PGRES_COMMAND_OK){
fprintf(stderr, "建立数据表失败: %s",PQresultErrorMessage(res));
}else{
printf("建立数据表成功!\n");
}
PQclear(res);
PQfinish(conn);
return 0;
}
编译
gcc -c -I /usr/local/install/opentenbase_pgxz/include/ createtable.c
gcc -o createtable createtable.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq
运行
./createtable "port=11000 dbname=postgres"
连接数据库成功!
建立数据表成功!
2.3、插入数据
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"
int
main(int argc, char **argv){
const char *conninfo;
PGconn *conn;
PGresult *res;
const char *sql = "INSERT INTO opentenbase (id,nickname) values(1,'opentenbase'),(2,'pgxz')";
if (argc > 1){
conninfo = argv[1];
}else{
conninfo = "dbname = postgres";
}
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK){
fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn));
}else{
printf("连接数据库成功!\n");
}
res = PQexec(conn,sql);
if(PQresultStatus(res) != PGRES_COMMAND_OK){
fprintf(stderr, "插入数据失败: %s",PQresultErrorMessage(res));
}else{
printf("插入数据成功!\n");
}
PQclear(res);
PQfinish(conn);
return 0;
}
编译
gcc -c -I /usr/local/install/opentenbase_pgxz/include/ insert.c
gcc -o insert insert.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq
运行
./insert "dbname=postgres port=15432"
2.4、查询数据
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"
int
main(int argc, char **argv){
const char *conninfo;
PGconn *conn;
PGresult *res;
const char *sql = "select * from opentenbase";
if (argc > 1){
conninfo = argv[1];
}else{
conninfo = "dbname = postgres";
}
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK){
fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn));
}else{
printf("连接数据库成功!\n");
}
res = PQexec(conn,sql);
if(PQresultStatus(res) != PGRES_TUPLES_OK){
fprintf(stderr, "插入数据失败: %s",PQresultErrorMessage(res));
}else{
printf("查询数据成功!\n");
int rownum = PQntuples(res) ;
int colnum = PQnfields(res);
for(int j = 0;j< colnum; ++j){
printf("%s\t",PQfname(res,j));
}
printf("\n");
for(int i = 0;i< rownum; ++i){
for(int j = 0;j< colnum; ++j){
printf("%s\t",PQgetvalue(res,i,j));
}
printf("\n");
}
}
PQclear(res);
PQfinish(conn);
return 0;
}
编译
gcc -std=c99 -c -I /usr/local/install/opentenbase_pgxz/include/ select.c
gcc -o select select.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq
运行
./select "dbname=postgres port=15432"
连接数据库成功!
查询数据成功!
id nickname
1 opentenbase
2 pgxz
2.5、流数据 COPY 入表
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"
int
main(int argc, char **argv){
const char *conninfo;
PGconn *conn;
PGresult *res;
const char *buffer = "1,opentenbase\n2,pgxz\n3,opentenbase牛";
if (argc > 1){
conninfo = argv[1];
}else{
conninfo = "dbname = postgres";
}
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK){
fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn));
}else{
printf("连接数据库成功!\n");
}
res=PQexec(conn,"COPY opentenbase FROM STDIN DELIMITER ',';");
if(PQresultStatus(res) != PGRES_COPY_IN){
fprintf(stderr, "copy数据出错1: %s",PQresultErrorMessage(res));
}else{
int len = strlen(buffer);
if(PQputCopyData(conn,buffer,len) == 1){
if(PQputCopyEnd(conn,NULL) == 1){
res = PQgetResult(conn);
if(PQresultStatus(res) == PGRES_COMMAND_OK){
printf("copy数据成功!\n");
}else{
fprintf(stderr, "copy数据出错2: %s",PQerrorMessage(conn));
}
}else{
fprintf(stderr, "copy数据出错3: %s",PQerrorMessage(conn));
}
}else{
fprintf(stderr, "copy数据出错4: %s",PQerrorMessage(conn));
}
}
PQclear(res);
PQfinish(conn);
return 0;
}
编译
gcc -c -I /usr/local/install/opentenbase_pgxz/include/ copy.c
gcc -o copy copy.o -L /usr/local/install/opentenbase_pgxz/lib/ -lpq
执行
./copy "dbname=postgres port=15432"
连接数据库成功!
copy数据成功!
3、shell 脚本开发
#!/bin/sh
if [ $# -ne 0 ]
then
echo "usage: $0 exec_sql"
exit 1
fi
exec_sql=$1
masters=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select string_agg(node_host, ' ') from (select * from pgxc_node where node_type = 'D' order by node_name) t"`
port_list=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select string_agg(node_port::text, ' ') from (select * from pgxc_node where node_type = 'D' order by node_name) t"`
node_cnt=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select count(*) from pgxc_node where node_type = 'D'"`
masters=($masters)
ports=($port_list)
echo $node_cnt
flag=0
for((i=0;i<$node_cnt;i++));
do
seq=$(($i+1))
master=${masters[$i]}
port=${ports[$i]}
echo $master
echo $port
psql -h $master -p $port postgres -c "$exec_sql"
done
4、python 程序开发
4.1、安装 psycopg2 模块
[root@VM_0_29_centos ~]# yum install python-psycopg2
4.2、连接数据库
#coding=utf-8
#!/usr/bin/python
import psycopg2
try:
conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
print "连接数据库成功"
conn.close()
except psycopg2.Error,msg:
print "连接数据库出错,错误详细信息: %s" %(msg.args[0])
运行
[opentenbase@VM_0_29_centos python]$ python conn.py
连接数据库成功
4.3、创建数据表
#coding=utf-8
#!/usr/bin/python
import psycopg2
try:
conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
print "连接数据库成功"
cur = conn.cursor()
sql = """
create table opentenbase
(
id int,
nickname varchar(100)
)distribute by shard(id) to group default_group
"""
cur.execute(sql)
conn.commit()
print "建立数据表成功"
conn.close()
except psycopg2.Error,msg:
print "OpenTenBase Error %s" %(msg.args[0])
运行
[opentenbase@VM_0_29_centos python]$ python createtable.py
连接数据库成功
建立数据表成功
4.4、插入数据
#coding=utf-8
#!/usr/bin/python
import psycopg2
try:
conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
print "连接数据库成功"
cur = conn.cursor()
sql = "insert into opentenbase values(1,'opentenbase'),(2,'opentenbase');"
cur.execute(sql)
sql = "insert into opentenbase values(%s,%s)"
cur.execute(sql,(3,'pg'))
conn.commit()
print "插入数据成功"
conn.close()
except psycopg2.Error,msg:
print "操作数据库出库 %s" %(msg.args[0])
运行
[opentenbase@VM_0_29_centos python]$ python insert.py
连接数据库成功
插入数据成功
4.5、查询数据
#coding=utf-8
#!/usr/bin/python
import psycopg2
try:
conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
print "连接数据库成功"
cur = conn.cursor()
sql = "select * from opentenbase"
cur.execute(sql)
rows = cur.fetchall()
for row in rows:
print "ID = ", row[0]
print "NICKNAME = ", row[1],"\n"
conn.close()
except psycopg2.Error,msg:
print "操作数据库出库 %s" %(msg.args[0])
运行
[opentenbase@VM_0_29_centos python]$ python select.py
连接数据库成功
ID = 1
NICKNAME = opentenbase
ID = 2
NICKNAME = pgxz
ID = 3
NICKNAME = pg
4.6、copy from 加载文件到表
#coding=utf-8
#!/usr/bin/python
import psycopg2
try:
conn = psycopg2.connect(database="postgres", user="opentenbase", password="", host="172.16.0.29", port="15432")
print "连接数据库成功"
cur = conn.cursor()
filename = "/data/opentenbase/opentenbase.txt"
cols = ('id','nickname')
tablename="public.opentenbase"
cur.copy_from(file=open(filename),table=tablename,columns=cols,sep=',')
conn.commit()
print "导入数据成功"
conn.close()
except psycopg2.Error,msg:
print "操作数据库出库 %s" %(msg.args[0])
执行
[opentenbase@VM_0_29_centos python]$ python copy_from.py
连接数据库成功
导入数据成功
5、PHP 程序开发
5.1、连接数据库
<?php
$host="172.16.0.29";
$port="15432";
$dbname="postgres";
$user="opentenbase" ;
$password="";
//连接数据库
$conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");
if (!$conn){
$error_msg=@pg_errormessage($conn);
echo "连接数据库出错,详情:".$error_msg."\n<BR>"; ;
exit;
}else{
echo "连接数据库成功"."\n<BR>";
}
//关闭连接
pg_close($conn);
?>
执行
[root@VM_0_47_centos test]# curl http://127.0.0.1:8080/dbsta/test/conn.php
连接数据库成功
5.2、创建数据表
<?php
$host="172.16.0.29";
$port="15432";
$dbname="postgres";
$user="opentenbase" ;
$password="";
//连接数据库
$conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");
if (!$conn){
$error_msg=@pg_errormessage($conn);
echo "连接数据库出错,详情:".$error_msg."\n"; ;
exit;
}else{
echo "连接数据库成功"."\n";
}
//建立数据表
$sql="create table public.opentenbase(id integer,nickname varchar(100)) distribute by shard(id) to group default_group;";
$result = @pg_exec($conn,$sql) ;
if (!$result){
$error_msg=@pg_errormessage($conn);
echo "创建数据表出错,详情:".$error_msg."\n"; ;
exit;
}else{
echo "创建数据表成功"."\n";
}
//关闭连接
pg_close($conn);
?>
执行
[root@VM_0_47_centos test]# curl http://127.0.0.1:8080/dbsta/test/createtable.php
连接数据库成功
创建数据表成功
5.3、插入数据
<?php
$host="172.16.0.29";
$port="15432";
$dbname="postgres";
$user="opentenbase" ;
$password="";
//连接数据库
$conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");
if (!$conn){
$error_msg=@pg_errormessage($conn);
echo "连接数据库出错,详情:".$error_msg."\n"; ;
exit;
}else{
echo "连接数据库成功"."\n";
}
//插入数据
$sql="insert into public.opentenbase values(1,'opentenbase'),(2,'pgxz');";
$result = @pg_exec($conn,$sql) ;
if (!$result){
$error_msg=@pg_errormessage($conn);
echo "插入数据出错,详情:".$error_msg."\n";
exit;
}else{
echo "插入数据成功"."\n";
}
//关闭连接
pg_close($conn);
?>
执行
[opentenbase@VM_0_47_centos test]$ curl http://127.0.0.1:8080/dbsta/test/insert.php
连接数据库成功
插入数据成功
5.4、查询数据
<?php
$host="172.16.0.29";
$port="15432";
$dbname="postgres";
$user="opentenbase" ;
$password="";
//连接数据库
$conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");
if (!$conn){
$error_msg=@pg_errormessage($conn);
echo "连接数据库出错,详情:".$error_msg."\n"; ;
exit;
}else{
echo "连接数据库成功"."\n";
}
//查询数据
$sql="select id,nickname from public.opentenbase";
$result = @pg_exec($conn,$sql) ;
if (!$result){
$error_msg=@pg_errormessage($conn);
echo "查询数据出错,详情:".$error_msg."\n";
exit;
}else{
echo "插入数据成功"."\n";
}
$record_num = pg_numrows($result);
echo "返回记录数".$record_num."\n";
$rec=pg_fetch_all($result);
for($i=0;$i<$record_num;$i++){
echo "记录数#".strval($i+1)."\n";
echo "id:".$rec[$i]["id"]."\n";
echo "nickname:".$rec[$i]["nickname"]."\n\n";
}
//关闭连接
pg_close($conn);
?>
调用方法
[root@VM_0_47_centos ~]# curl http://127.0.0.1:8080/dbsta/test/select.php
连接数据库成功
插入数据成功
返回记录数2
记录数#1
id:1
nickname:opentenbase
记录数#2
id:2
nickname:pgxz
5.5、流数据 copy 入表
<?php
$host="172.16.0.29";
$port="15432";
$dbname="postgres";
$user="opentenbase" ;
$password="";
//连接数据库
$conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");
if (!$conn){
$error_msg=@pg_errormessage($conn);
echo "连接数据库出错,详情:".$error_msg."\n"; ;
exit;
}else{
echo "连接数据库成功"."\n";
}
$row=ARRAY("1,opentenbase","2,pgxz");
$flag=pg_copy_from($conn,"public.opentenbase",$row,",");
if (!$flag){
$error_msg=@pg_errormessage($conn);
echo "copy出错,详情:".$error_msg."\n";
}else{
echo "copy成功"."\n";
}
//关闭连接
pg_close($conn);
?>
调用方法
curl http://127.0.0.1/dbsta/cron/php_copy_from.php
连接数据库成功
copy成功
5.6、copy to 导出数据到一个数组中
<?php
$host="172.16.0.29";
$port="15432";
$dbname="postgres";
$user="opentenbase" ;
$password="";
//连接数据库
$conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");
if (!$conn){
$error_msg=@pg_errormessage($conn);
echo "连接数据库出错,详情:".$error_msg."\n"; ;
exit;
}else{
echo "连接数据库成功"."\n";
}
$row=pg_copy_to($conn,"public.opentenbase",",");
if (!$row){
$error_msg=@pg_errormessage($conn);
echo "copy出错,详情:".$error_msg."\n";
}else{
print_r($row);
}
//关闭连接
pg_close($conn);
?>
调用方法
curl http://127.0.0.1/dbsta/cron/php_copy_to.php
连接数据库成功
Array
(
[0] => 1,opentenbase
[1] => 2,pgxz
)
6、golang 程序开发
6.1、连接数据库
package main
import (
"fmt"
"time"
"github.com/jackc/pgx"
)
func main() {
var error_msg string
//连接数据库
conn, err := db_connect()
if err != nil {
error_msg = "连接数据库失败,详情:" + err.Error()
write_log("Error", error_msg)
return
}
//程序运行结束时关闭连接
defer conn.Close()
write_log("Log", "连接数据库成功")
}
/*
功能描述:写入日志处理
参数说明:
log_level -- 日志级别,只能是是Error或Log
error_msg -- 日志内容
返回值说明:无
*/
func write_log(log_level string, error_msg string) {
//打印错误信息
fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("日志级别:", log_level)
fmt.Println("详细信息:", error_msg)
}
/*
功能描述:连接数据库
参数说明:无
返回值说明:
conn *pgx.Conn -- 连接信息
err error --错误信息
*/
func db_connect() (conn *pgx.Conn, err error) {
var config pgx.ConnConfig
config.Host = "127.0.0.1" //数据库主机host或ip
config.User = "opentenbase" //连接用户
config.Password = "pgsql" //用户密码
config.Database = "postgres" //连接数据库名
config.Port = 15432 //端口号
conn, err = pgx.Connect(config)
return conn, err
}
[root@VM_0_29_centos opentenbase]# go run conn.go
访问时间: 2018-04-03 20:40:28
日志级别: Log
详细信息: 连接数据库成功
编译后运行
[root@VM_0_29_centos opentenbase]# go build conn.go
[root@VM_0_29_centos opentenbase]# ./conn
访问时间: 2018-04-03 20:40:48
日志级别: Log
详细信息: 连接数据库成功
6.2、创建数据表
package main
import (
"fmt"
"time"
"github.com/jackc/pgx"
)
func main() {
var error_msg string
var sql string
//连接数据库
conn, err := db_connect()
if err != nil {
error_msg = "连接数据库失败,详情:" + err.Error()
write_log("Error", error_msg)
return
}
//程序运行结束时关闭连接
defer conn.Close()
write_log("Log", "连接数据库成功")
//建立数据表
sql = "create table public.opentenbase(id varchar(20),nickname varchar(100)) distribute by shard(id) to group default_group;"
_, err = conn.Exec(sql)
if err != nil {
error_msg = "创建数据表失败,详情:" + err.Error()
write_log("Error", error_msg)
return
} else {
write_log("Log", "创建数据表成功")
}
}
/*
功能描述:写入日志处理
参数说明:
log_level -- 日志级别,只能是是Error或Log
error_msg -- 日志内容
返回值说明:无
*/
func write_log(log_level string, error_msg string) {
//打印错误信息
fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("日志级别:", log_level)
fmt.Println("详细信息:", error_msg)
}
/*
功能描述:连接数据库
参数说明:无
返回值说明:
conn *pgx.Conn -- 连接信息
err error --错误信息
*/
func db_connect() (conn *pgx.Conn, err error) {
var config pgx.ConnConfig
config.Host = "127.0.0.1" //数据库主机host或ip
config.User = "opentenbase" //连接用户
config.Password = "pgsql" //用户密码
config.Database = "postgres" //连接数据库名
config.Port = 15432 //端口号
conn, err = pgx.Connect(config)
return conn, err
}
[root@VM_0_29_centos opentenbase]# go run createtable.go
访问时间: 2018-04-03 20:50:24
日志级别: Log
详细信息: 连接数据库成功
访问时间: 2018-04-03 20:50:24
日志级别: Log
详细信息: 创建数据表成功
6.3、插入数据
package main
import (
"fmt"
"strings"
"time"
"github.com/jackc/pgx"
)
func main() {
var error_msg string
var sql string
var nickname string
//连接数据库
conn, err := db_connect()
if err != nil {
error_msg = "连接数据库失败,详情:" + err.Error()
write_log("Error", error_msg)
return
}
//程序运行结束时关闭连接
defer conn.Close()
write_log("Log", "连接数据库成功")
//插入数据
sql = "insert into public.opentenbase values('1','opentenbase'),('2','pgxz');"
_, err = conn.Exec(sql)
if err != nil {
error_msg = "插入数据失败,详情:" + err.Error()
write_log("Error", error_msg)
return
} else {
write_log("Log", "插入数据成功")
}
//绑定变量插入数据,不需要做防注入处理
sql = "insert into public.opentenbase values($1,$2),($1,$3);"
_, err = conn.Exec(sql, "3", "postgresql", "postgres")
if err != nil {
error_msg = "插入数据失败,详情:" + err.Error()
write_log("Error", error_msg)
return
} else {
write_log("Log", "插入数据成功")
}
//拼接sql语句插入数据,需要做防注入处理
nickname = "OpenTenBase is ' good!"
sql = "insert into public.opentenbase values('1','" + sql_data_encode(nickname) + "')"
_, err = conn.Exec(sql)
if err != nil {
error_msg = "插入数据失败,详情:" + err.Error()
write_log("Error", error_msg)
return
} else {
write_log("Log", "插入数据成功")
}
}
/*
功能描述:sql查询拼接字符串编码
参数说明:
str -- 要编码的字符串
返回值说明:
返回编码过的字符串
*/
func sql_data_encode(str string) string {
return strings.Replace(str, "'", "''", -1)
}
/*
功能描述:写入日志处理
参数说明:
log_level -- 日志级别,只能是是Error或Log
error_msg -- 日志内容
返回值说明:无
*/
func write_log(log_level string, error_msg string) {
//打印错误信息
fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("日志级别:", log_level)
fmt.Println("详细信息:", error_msg)
}
/*
功能描述:连接数据库
参数说明:无
返回值说明:
conn *pgx.Conn -- 连接信息
err error --错误信息
*/
func db_connect() (conn *pgx.Conn, err error) {
var config pgx.ConnConfig
config.Host = "127.0.0.1" //数据库主机host或ip
config.User = "opentenbase" //连接用户
config.Password = "pgsql" //用户密码
config.Database = "postgres" //连接数据库名
config.Port = 15432 //端口号
conn, err = pgx.Connect(config)
return conn, err
}
[root@VM_0_29_centos opentenbase]# go run insert.go
访问时间: 2018-04-03 21:05:51
日志级别: Log
详细信息: 连接数据库成功
访问时间: 2018-04-03 21:05:51
日志级别: Log
详细信息: 插入数据成功
访问时间: 2018-04-03 21:05:51
日志级别: Log
详细信息: 插入数据成功
访问时间: 2018-04-03 21:05:51
日志级别: Log
详细信息: 插入数据成功
6.4、查询数据
package main
import (
"fmt"
"strings"
"time"
"github.com/jackc/pgx"
)
func main() {
var error_msg string
var sql string
//连接数据库
conn, err := db_connect()
if err != nil {
error_msg = "连接数据库失败,详情:" + err.Error()
write_log("Error", error_msg)
return
}
//程序运行结束时关闭连接
defer conn.Close()
write_log("Log", "连接数据库成功")
sql = "SELECT id,nickname FROM public.opentenbase LIMIT 2"
rows, err := conn.Query(sql)
if err != nil {
error_msg = "查询数据失败,详情:" + err.Error()
write_log("Error", error_msg)
return
} else {
write_log("Log", "查询数据成功")
}
var nickname string
var id string
for rows.Next() {
err = rows.Scan(&id, &nickname)
if err != nil {
error_msg = "执行查询失败,详情:" + err.Error()
write_log("Error", error_msg)
return
}
error_msg = fmt.Sprintf("id:%s nickname:%s", id, nickname)
write_log("Log", error_msg)
}
rows.Close()
nickname = "opentenbase"
sql = "SELECT id,nickname FROM public.opentenbase WHERE nickname ='" + sql_data_encode(nickname) + "' "
rows, err = conn.Query(sql)
if err != nil {
error_msg = "查询数据失败,详情:" + err.Error()
write_log("Error", error_msg)
return
} else {
write_log("Log", "查询数据成功")
}
defer rows.Close()
for rows.Next() {
err = rows.Scan(&id, &nickname)
if err != nil {
error_msg = "执行查询失败,详情:" + err.Error()
write_log("Error", error_msg)
return
}
error_msg = fmt.Sprintf("id:%s nickname:%s", id, nickname)
write_log("Log", error_msg)
}
}
/*
功能描述:sql查询拼接字符串编码
参数说明:
str -- 要编码的字符串
返回值说明:
返回编码过的字符串
*/
func sql_data_encode(str string) string {
return strings.Replace(str, "'", "''", -1)
}
/*
功能描述:写入日志处理
参数说明:
log_level -- 日志级别,只能是是Error或Log
error_msg -- 日志内容
返回值说明:无
*/
func write_log(log_level string, error_msg string) {
//打印错误信息
fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("日志级别:", log_level)
fmt.Println("详细信息:", error_msg)
}
/*
功能描述:连接数据库
参数说明:无
返回值说明:
conn *pgx.Conn -- 连接信息
err error --错误信息
*/
func db_connect() (conn *pgx.Conn, err error) {
var config pgx.ConnConfig
config.Host = "127.0.0.1" //数据库主机host或ip
config.User = "opentenbase" //连接用户
config.Password = "pgsql" //用户密码
config.Database = "postgres" //连接数据库名
config.Port = 15432 //端口号
conn, err = pgx.Connect(config)
return conn, err
}
[root@VM_0_29_centos opentenbase]# go run select.go
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: 连接数据库成功
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: 查询数据成功
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: id:2 nickname:opentenbase
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: id:3 nickname:postgresql
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: 查询数据成功
访问时间: 2018-04-09 10:35:50
日志级别: Log
详细信息: id:1 nickname:opentenbase
6.5、流数据 copy from 入表
package main
import (
"fmt"
"math/rand"
"time"
"github.com/jackc/pgx"
)
func main() {
var error_msg string
//连接数据库
conn, err := db_connect()
if err != nil {
error_msg = "连接数据库失败,详情:" + err.Error()
write_log("Error", error_msg)
return
}
//程序运行结束时关闭连接
defer conn.Close()
write_log("Log", "连接数据库成功")
//构造5000行数据
inputRows := [][]interface{}{}
var id string
var nickname string
for i := 0; i < 5000; i++ {
id = fmt.Sprintf("%d", rand.Intn(10000))
nickname = fmt.Sprintf("%d", rand.Intn(10000))
inputRows = append(inputRows, []interface{}{id, nickname})
}
copyCount, err := conn.CopyFrom(pgx.Identifier{"opentenbase"}, []string{"id", "nickname"}, pgx.CopyFromRows(inputRows))
if err != nil {
error_msg = "执行copyFrom失败,详情:" + err.Error()
write_log("Error", error_msg)
return
}
if copyCount != len(inputRows) {
error_msg = fmt.Sprintf("执行copyFrom失败,copy行数:%d 返回行数为:%d", len(inputRows), copyCount)
write_log("Error", error_msg)
return
} else {
error_msg = "Copy 记录成功"
write_log("Log", error_msg)
}
}
/*
功能描述:写入日志处理
参数说明:
log_level -- 日志级别,只能是是Error或Log
error_msg -- 日志内容
返回值说明:无
*/
func write_log(log_level string, error_msg string) {
//打印错误信息
fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("日志级别:", log_level)
fmt.Println("详细信息:", error_msg)
}
/*
功能描述:连接数据库
参数说明:无
返回值说明:
conn *pgx.Conn -- 连接信息
err error --错误信息
*/
func db_connect() (conn *pgx.Conn, err error) {
var config pgx.ConnConfig
config.Host = "127.0.0.1" //数据库主机host或ip
config.User = "opentenbase" //连接用户
config.Password = "pgsql" //用户密码
config.Database = "postgres" //连接数据库名
config.Port = 15432 //端口号
conn, err = pgx.Connect(config)
return conn, err
}
[root@VM_0_29_centos opentenbase]# go run copy_from.go
访问时间: 2018-04-09 10:36:40
日志级别: Log
详细信息: 连接数据库成功
访问时间: 2018-04-09 10:36:40
日志级别: Log
详细信息: Copy 记录成功
6.6、golang 相关资源包
需要 git 的资源包:
https://github.com/jackc/pgx
https://github.com/pkg/errors