大数据

Java 操作 InfluxDB 的一个 Demo

宋兴柱 · 11月27日 · 2019年 624次已读

一、基础连接类

package com.test.repository.utils;

import com.test.domain.entry.bo.common.InfluxDbRow;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import java.util.List;
import java.util.concurrent.TimeUnit;


/**
 * Thin wrapper around an influxdb-java {@link InfluxDB} client bound to one database.
 *
 * <p>The underlying client is created lazily: every operation that needs it calls
 * {@link #connection()} first, so callers outside this package (which cannot invoke the
 * package-private {@code connection()}) can use the public methods directly.
 *
 * <p>NOTE(review): no synchronization is performed here; confirm whether instances are
 * shared between threads before relying on the lazy connect.
 */
public class InfluxDBConnect {

    /** Name of the retention policy that this class creates and writes to. */
    private static final String RETENTION_POLICY = "default";

    private final String username;
    private final String password;
    private final String url;
    private final String database;
    private final int retentionDay;
    private final int replicationCount;

    /** Lazily created client; {@code null} until first use and again after {@link #close()}. */
    private InfluxDB influxDB;


    /**
     * Stores the connection settings; no network connection is opened here.
     *
     * @param username         user name passed to the InfluxDB client factory
     * @param password         password passed to the InfluxDB client factory
     * @param url              URL passed to the InfluxDB client factory
     * @param database         database used for queries and writes
     * @param retentionDay     retention duration, in days, used by {@link #createRetentionPolicy()}
     * @param replicationCount replication factor used by {@link #createRetentionPolicy()}
     */
    public InfluxDBConnect(String username, String password, String url, String database, int retentionDay, int replicationCount) {
        this.username = username;
        this.password = password;
        this.url = url;
        this.database = database;
        this.retentionDay = retentionDay;
        this.replicationCount = replicationCount;
    }

    /** Creates the InfluxDB client if it does not exist yet; does nothing when already connected. */
    void connection() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(url, username, password);
        }
    }

    /**
     * Creates the retention policy named {@code "default"} on the configured database and marks
     * it as the database's default policy. The duration is {@code retentionDay} days and the
     * replication factor is {@code replicationCount}.
     */
    void createRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON %s DURATION %s REPLICATION %s DEFAULT",
                RETENTION_POLICY, quoteIdentifier(database), retentionDay + "d", replicationCount);
        this.query(command);
    }

    /**
     * Runs a statement against the configured database, connecting first if necessary.
     *
     * @param command the statement to execute
     * @return the result returned by the client
     */
    QueryResult query(String command) {
        connection();
        return influxDB.query(new Query(command, database));
    }

    /**
     * Writes a single row to the {@code "default"} retention policy.
     *
     * <p>A {@code null} row is ignored. When the row carries no timestamp, none is set on the
     * point. A row without fields is expected to be rejected by the influxdb-java point builder
     * (TODO confirm against the library version in use).
     *
     * @param influxDbRow the row to write; may be {@code null}
     */
    public void insert(InfluxDbRow influxDbRow) {
        if (influxDbRow == null) {
            return;
        }
        Point.Builder builder = newPointBuilder(influxDbRow);
        if (influxDbRow.getTimeSecond() != null) {
            builder.time(influxDbRow.getTimeSecond(), TimeUnit.SECONDS);
        }
        connection();
        influxDB.write(database, RETENTION_POLICY, builder.build());
    }

    /**
     * Executes a delete statement against the configured database.
     *
     * @param command the delete statement
     * @return the error message reported in the query result ({@code QueryResult#getError()})
     */
    public String deleteMeasurementData(String command) {
        return this.query(command).getError();
    }

    /**
     * Creates a database.
     *
     * @param dbName name of the database; quoted as an identifier unless already double-quoted
     * @throws IllegalArgumentException if {@code dbName} is {@code null} or blank
     */
    public void createDB(String dbName) {
        this.query("create database " + quoteIdentifier(dbName));
    }

    /**
     * Drops a database.
     *
     * @param dbName name of the database; quoted as an identifier unless already double-quoted
     * @throws IllegalArgumentException if {@code dbName} is {@code null} or blank
     */
    public void deleteDB(String dbName) {
        this.query("drop database " + quoteIdentifier(dbName));
    }

    /** Closes the client if one was created, and forgets it so a later call can reconnect. */
    public void close() {
        if (this.influxDB != null) {
            this.influxDB.close();
            this.influxDB = null;
        }
    }

    /**
     * Writes many rows in one batch to the {@code "default"} retention policy.
     *
     * <p>{@code null} rows and rows without any field are skipped, so that one unusable row
     * does not abort the whole batch. Rows without a timestamp are stamped with the current
     * time in seconds. Nothing is sent when no row remains.
     *
     * @param influxDbRows rows to write; {@code null} or empty is a no-op
     */
    public void batchPointsImport(List<InfluxDbRow> influxDbRows) {
        if (influxDbRows == null || influxDbRows.isEmpty()) {
            return;
        }
        BatchPoints batchPoints = BatchPoints.database(this.database).retentionPolicy(RETENTION_POLICY).build();
        for (InfluxDbRow influxDbRow : influxDbRows) {
            if (influxDbRow == null || influxDbRow.getFields() == null || influxDbRow.getFields().isEmpty()) {
                continue;
            }
            Point.Builder builder = newPointBuilder(influxDbRow);
            if (influxDbRow.getTimeSecond() != null) {
                builder.time(influxDbRow.getTimeSecond(), TimeUnit.SECONDS);
            } else {
                builder.time(System.currentTimeMillis() / 1000, TimeUnit.SECONDS);
            }
            batchPoints.point(builder.build());
        }
        if (batchPoints.getPoints().isEmpty()) {
            return;
        }
        connection();
        influxDB.write(batchPoints);
    }

    /** Starts a point for the row's measurement and copies its tags (when present) and fields. */
    private static Point.Builder newPointBuilder(InfluxDbRow influxDbRow) {
        Point.Builder builder = Point.measurement(influxDbRow.getMeasurement());
        if (influxDbRow.getTags() != null) {
            builder.tag(influxDbRow.getTags());
        }
        builder.fields(influxDbRow.getFields());
        return builder;
    }

    /**
     * Wraps a name in double quotes, backslash-escaping embedded backslashes and double quotes.
     * A name that is already enclosed in double quotes is returned unchanged so that callers
     * who quoted it themselves keep working.
     */
    private static String quoteIdentifier(String identifier) {
        if (identifier == null || identifier.trim().isEmpty()) {
            throw new IllegalArgumentException("InfluxDB identifier must not be null or blank");
        }
        if (identifier.length() >= 2 && identifier.startsWith("\"") && identifier.endsWith("\"")) {
            return identifier;
        }
        return "\"" + identifier.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
package com.test.repository.config;

import com.test.repository.utils.InfluxDBConnect;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


/**
 * Spring configuration that builds {@link InfluxDBConnect} instances from the
 * {@code spring.influx.*} properties.
 *
 * <p>{@code spring.influx.retentionDay} and {@code spring.influx.replicationCount} are optional:
 * the defaults (30 days, 1 replica) are declared in the placeholders themselves, because a
 * placeholder without a default fails at injection time when the property is missing and a
 * later {@code null} check would never run.
 */
@Configuration
@Slf4j
public class InfluxDBConnectConfig {

    /** Retention, in days, used when the property is absent. */
    private static final int DEFAULT_RETENTION_DAY = 30;
    /** Replication factor used when the property is absent. */
    private static final int DEFAULT_REPLICATION_COUNT = 1;

    @Value("${spring.influx.url}")
    private String url;
    @Value("${spring.influx.user}")
    private String username;
    @Value("${spring.influx.password}")
    private String password;
    @Value("${spring.influx.database}")
    private String database;
    @Value("${spring.influx.retentionDay:30}")
    private Integer retentionDay;
    @Value("${spring.influx.replicationCount:1}")
    private Integer replicationCount;

    /**
     * Creates a new, not yet connected {@link InfluxDBConnect}; prototype scope means each
     * injection point or lookup gets its own instance.
     *
     * @return a connection wrapper configured from the injected properties
     */
    @Bean
    @Scope("prototype")
    public InfluxDBConnect influxDBConnectFactory() {
        // Resolve fallbacks into locals so that building a bean never mutates this configuration;
        // the null checks only matter when the fields were not populated by Spring.
        int days = (this.retentionDay != null) ? this.retentionDay : DEFAULT_RETENTION_DAY;
        int replicas = (this.replicationCount != null) ? this.replicationCount : DEFAULT_REPLICATION_COUNT;
        return new InfluxDBConnect(username, password, url, database, days, replicas);
    }

}

 

 
0 条回应