欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

BDB/LevelDb/RocksDB - 基于本地磁盘数据库的本地队列实现和性能比较

最编程 2024-07-10 13:32:22
...

这是我参与更文挑战的第7天,活动详情查看: 更文挑战

需求背景:

接上一篇文章,《开箱即用|基于BDB实现本地消息队列》,我们在使用的时候会发现,当大量数据进行入队的时候会出现性能问题,内存飙升极快,而且延迟也比较高,所以我们这期做一个优化,不使用AbstractQueue来通过队头队尾的方式来实现本地消息队列,同时对比一下其他几个相同类型的本地磁盘数据的性能及使用方法;

Berkeley DB:

以下简称为BDB (Berkeley DB是Oracle一个开源的KV单机文件数据库)

Level DB :

以下简称为LDB (LevelDB是Google开源的持久化KV单机数据库,具有很高的随机写,顺序读/写性能,但是随机读的性能很一般,也就是说,LevelDB很适合应用在查询较少,而写很多的场景。)

基于BDB指针方式实现本地队列:

通过指针方式遍历BDB队列 有序 但性能不如队列方式 可实现队列方式 同levenDB(见下文)实现方式一致

代码实现:
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.sleepycat.je.*;
import lombok.extern.slf4j.Slf4j;

/**
 * 封装的bdb操作工具栏 集成了增、删、改、查、关闭、同步操作等方法
 *
 * @author taoze
 * @version 1.0
 * @date 4/9/21 15:31
 */
@Slf4j
public class BdbOperator {
    // 环境变量的声明
    /**
     * Environment这个对象,这个对象是bdb的环境
     * bdb je 只允许有一个写的进程,可有有多个只读的进程,
     * 但是当写的进程更新数据以后,读的进程不能发现数据的改变,
     * 只有close这个environment,在开启,
     * 所以一个Environment尽量使用一个进程(proccess)操作
     * 注意,关闭Environment是很耗时的
     * 不是特别需要,尽量不要关闭Environment
     */
    private Environment myDbEnvironment = null;
    // 数据库操作的对象声明
    private Database myDatabase = null;

    /**
     * bdb操作环境变量和数据库初始化
     * setAllowCreate 是否允许创建这个环境,true为是,false为否
     * setCacheSize 设置缓存单位为字节,比如设置1M缓存setCacheSize(1000000);
     * setTransactional 设置是否启用事务
     * setReadOnly 设置是否为只读模式访问,true为只读
     * setLocking 设置环境是否为锁定
     * 更多的参数设置可以用
     * setConfigParam
     * 这个方法可设置选项非常多
     * envConfig.setConfigParam("je.log.fileMax","20000000");设置日志文件最大为20M,默认是10M
     * je.log.bufferSize 设置日志的缓冲 缺省为1048576 (1M)
     * je.lock.timeout 锁定时间
     *
     * @param dbEnvFilePath
     * @param databaseName
     */

    public BdbOperator(String dbEnvFilePath, String databaseName) {

        /**
         * 初始化数据库参数
         */
        try {
            // 初始化数据存储根目录文件夹
            File f = new File(dbEnvFilePath);
            if (!f.exists()) {
                f.mkdirs();
            }
            // 数据库配置变量初始化
            DatabaseConfig dbConfig = new DatabaseConfig();// 打开数据库
            dbConfig.setAllowCreate(true);
            // 初始化环境配置变量,基于该变量去配置环境变量
            EnvironmentConfig envConfig = new EnvironmentConfig();
            // 当使用的数据库配置变量不存在的时候,就自动创建
            envConfig.setAllowCreate(true);
            // 正式初始化数据库的环境
            myDbEnvironment = new Environment(f, envConfig);
            // 打开一个数据库,如果不存在,则自动创建;第一个参数表示是否是事务
            myDatabase = myDbEnvironment.openDatabase(null, databaseName, dbConfig);
        } catch (Exception e) {
            log.warn("BdbOperator init DBD环境异常", e);
        }
    }

    /**
     * 将指定的kv存放到bdb当中,并可以选择是否实时同步到磁盘中
     *
     * @param key
     * @param value
     * @param isSync
     * @return
     */
    public boolean put(String key, String value, boolean isSync) {
        // 数据的key
        // 数据的value
        try {
            // 将key和value都封装到DatabaseEntry中
            DatabaseEntry theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
            DatabaseEntry theData = new DatabaseEntry(value.getBytes(StandardCharsets.UTF_8));
            // 写入数据库
            myDatabase.put(null, theKey, theData);
            if (isSync) {
                // 数据同步到磁盘
                this.sync();
            }
            return true;
        } catch (Exception e) {
            log.warn("BdbOperator put 失败", e);
        }
        return false;
    }

    // 删除bdb中指定的key值
    public boolean delete(String key) {
        DatabaseEntry theKey;
        try {
            theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
            myDatabase.delete(null, theKey);
            return true;
        } catch (Exception e) {
            log.warn("BdbOperator delete 失败", e);
        }
        return false;
    }

    /**
     * 读取bdb的key对应的数据
     *
     * @param key
     * @return
     */
    public String getValue(String key) {
        // 要读取数据的key
        try {
            // 将读取数据的key封装到DatabaseEntry中
            DatabaseEntry theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
            // 将读取出来的值以二进制形式放到DatabaseEntry中
            DatabaseEntry theData = new DatabaseEntry();
            // 执行读取操作
            myDatabase.get(null, theKey, theData, LockMode.DEFAULT);
            if (theData.getData() == null) {
                return null;
            }
            // 将二进制数据转化成字符串值
            return new String(theData.getData(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            log.warn("BdbOperator getValue 失败", e);
        }
        return null;
    }

    /**
     * 查询所有,可遍历数据
     * selectAll(Here describes this method function with a few words)*
     * <p>
     * void
     */
    public List<String> selectAll() {
        Cursor cursor = null;
        cursor = myDatabase.openCursor(null, null);
        DatabaseEntry theKey = null;
        DatabaseEntry theData = null;
        theKey = new DatabaseEntry();
        theData = new DatabaseEntry();

        ArrayList<String> list = new ArrayList<>();
        while (cursor.getNext(theKey, theData, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
            list.add(new String(theData.getData()));
        }
        cursor.close();
        return list;

    }

    public Database getMyDatabase() {
        return myDatabase;
    }

    /**
     * 同步数据到磁盘当中,相当于让数据实时持久化
     *
     * @return
     */
    public boolean sync() {
        if (myDbEnvironment != null) {
            try {
                myDbEnvironment.sync();
            } catch (Exception e) {
                log.warn("BdbOperator sync 失败", e);
            }
            return true;
        }
        return false;
    }

    /**
     * 关闭环境变量数据库
     *
     * @return
     */
    public boolean close() {
        try {
            if (myDatabase != null) {
                myDatabase.close();
            }
            if (myDbEnvironment != null) {
                myDbEnvironment.sync();
                myDbEnvironment.cleanLog();
                myDbEnvironment.close();
            }
            return true;
        } catch (DatabaseException e) {
            log.warn("BdbOperator close 失败", e);
        }
        return false;
    }
}

基于LevelDB实现本地队列:

原理:

内存中维护俩个值:head:队头 tail:队尾 同时将这俩个值 插入到levelDB里

InitLevelDB:容器加载时初始化,获取队头队尾的值

    @PostConstruct
    public void initLevelDB() throws IOException {
        if (StringUtils.isNotBlank(dbPath)) {
            DBFactory factory = new Iq80DBFactory();
            Options options = new Options();
            options.createIfMissing(true);
            db = factory.open(new File(dbPath), options);
            String headString = get(HEAD_KEY);
            if (headString != null) {
                head = Long.parseLong(headString);
            }
            String tailString = get(TAIL_KEY);
            if (tailString != null) {
                tail = Long.parseLong(tailString);
            }
            //启动时读到末尾,重置队列游标。队头越过队尾重置队头
            if (head == tail && head != 0) {
                head = 0;
                tail = 0;
                put(HEAD_KEY, String.valueOf(head));
                put(TAIL_KEY, String.valueOf(tail));
            } else if (head > tail) {
                head = tail;
            }
        }
    }

Push:每次push插入到队尾 tail+=1,存储K:V为:(tail+=1,value),更新tail_key队尾的值

/**
     * 插入队尾
     * @param value
     */
    public synchronized void push(String value) {
        if (db == null) {
            return;
        }
        if (tail == Long.MAX_VALUE) {
            log.error("本地缓存队列已超过系统瓶颈,请重启服务;msg={}", value);
            return;
        }
        tail += 1;
        put(TAIL_KEY, String.valueOf(tail));
        put(String.valueOf(tail), value);
    }

Pop:获取当前初始化的held队头+1(初始化后对头=队尾,每次push队尾+1),为空二分法获取最小有值队头,删除当前弹出队头, 更新head_key为head+1

/**
     * 弹出队头
     * @return
     */
    public synchronized String pop() {
        if (db == null) {
            return null;
        }
        String find = String.valueOf(head + 1);
        String value = get(find);
        if (value == null) {
            Long hasValueHead = findHasValueHead(head + 1, tail);
            if (hasValueHead != null) {
                delete(String.valueOf(hasValueHead.longValue()));
                head = hasValueHead;
                put(HEAD_KEY, String.valueOf(head));
                return get(String.valueOf(head));
            }
            return null;
        }
        delete(find);
        head += 1;
        put(HEAD_KEY, String.valueOf(head));
        return value;
    }

二分法查找最小有值对头:

/**
     * 二分查找最小有值的队头
     * @param head
     * @param tail
     * @return
     */
    private Long findHasValueHead(long head, long tail) {
        if (head > tail) {
            return null;
        }
        long mid = (head + tail) / 2;
        if (get(String.valueOf(mid)) == null) {
            return findHasValueHead(mid + 1, tail);
        } else {
            if (head == mid) {
                return head;
            }
            return findHasValueHead(head, mid);
        }
    }

具体代码实现:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBFactory;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.impl.Iq80DBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

@Service
@Slf4j
public class LevelDb {
    @Value("${levelDB.folder:}")
    private String dbPath;
    private DB db = null;
    private static final String HEAD_KEY = "head_key";
    private long head = 0;
    private static final String TAIL_KEY = "tail_key";
    private long tail = 0;
    @PostConstruct
    public void initLevelDB() throws IOException {
        if (StringUtils.isNotBlank(dbPath)) {
            DBFactory factory = new Iq80DBFactory();
            Options options = new Options();
            options.createIfMissing(true);
            db = factory.open(new File(dbPath), options);
            String headString = get(HEAD_KEY);
            if (headString != null) {
                head = Long.parseLong(headString);
            }
            String tailString = get(TAIL_KEY);
            if (tailString != null) {
                tail = Long.parseLong(tailString);
            }
            //启动时读到末尾,重置队列游标。队头越过队尾重置队头
            if (head == tail && head != 0) {
                head = 0;
                tail = 0;
                put(HEAD_KEY, String.valueOf(head));
                put(TAIL_KEY, String.valueOf(tail));
            } else if (head > tail) {
                head = tail;
            }
        }
    }
    /**
     * 插入队尾
     * @param value
     */
    public synchronized void push(String value) {
        if (db == null) {
            return;
        }
        if (tail == Long.MAX_VALUE) {
            log.error("本地缓存队列已超过系统瓶颈,请重启服务;msg={}", value);
            return;
        }
        tail += 1;
        put(TAIL_KEY, String.valueOf(tail));
        put(String.valueOf(tail), value);
    }
    /**
     * 弹出队头
     * @return
     */
    public synchronized String pop() {
        if (db == null) {
            return null;
        }
        String find = String.valueOf(head + 1);
        String value = get(find);
        if (value == null) {
            Long hasValueHead = findHasValueHead(head + 1, tail);
            if (hasValueHead != null) {
                delete(String.valueOf(hasValueHead.longValue()));
                head = hasValueHead;
                put(HEAD_KEY, String.valueOf(head));
                return get(String.valueOf(head));
            }
            return null;
        }
        delete(find);
        head += 1;
        put(HEAD_KEY, String.valueOf(head));
        return value;
    }
    /**
     * 获取缓存的值
     * @param key
     * @return
     */
    public String get(String key) {
        if (db == null) {
            return null;
        }
        byte[] bytes = db.get(Iq80DBFactory.bytes(key));
        return Iq80DBFactory.asString(bytes);
    }
    /**
     * 游标方式获取列表
     * @param max
     * @return
     */
    public LinkedHashMap<String, String> iteratorDb(int max) {
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>();
        if (db == null) {
            return linkedHashMap;
        }
        DBIterator iterator = db.iterator();
        int num = 0;
        while (iterator.hasNext()) {
            if (num ++ > max) {
                break;
            }
            Map.Entry<byte[], byte[]> next = iterator.next();
            String key = Iq80DBFactory.asString(next.getKey());
            String value = Iq80DBFactory.asString(next.getValue());
            linkedHashMap.put(key, value);
        }
        return linkedHashMap;
    }
    /**
     * 获取队头游标
     * @return
     */
    public long getHead() {
        return head;
    }
    /**
     * 获取队尾游标
     * @return
     */
    public long getTail() {
        return tail;
    }
    /**
     * 获取队列长度
     * @return
     */
    public long getLength() {
        return tail - head;
    }
    /**
     * 写入k-v db
     * @param key
     * @param value
     */
    private void put(String key, String value)  {
        db.put(Iq80DBFactory.bytes(key), Iq80DBFactory.bytes(value));;
    }
    /**
     * 删除key
     * @param key
     */
    public void delete(String key)  {
        db.delete(Iq80DBFactory.bytes(key));;
    }
    /**
     * 二分查找最小有值的队头
     * @param head
     * @param tail
     * @return
     */
    private Long findHasValueHead(long head, long tail) {
        if (head > tail) {
            return null;
        }
        long mid = (head + tail) / 2;
        if (get(String.valueOf(mid)) == null) {
            return findHasValueHead(mid + 1, tail);
        } else {
            if (head == mid) {
                return head;
            }
            return findHasValueHead(head, mid);
        }
    }
 }

性能对比结果:

push性能比较:

DBD 填充ms 单条 耗时: 3 500条:548 5000条:2820

LDB 填充ms 单条 耗时: 3 500条:36 5000条:129

LDB吞吐量测试:

吞吐量:
1条:push2ms  pop2ms
100条:push10ms  pop:11ms
1000条:push33ms  pop:138ms
10000条:push375ms  pop:1213ms
100000条:push2707ms  pop:7293ms
1000000条:push33601ms  pop:159013ms
10000000条:push494457ms 
最大容量 10000000+
并发:200线程/每个线程push1000条消息 
java.net.SocketTimeoutException: Read timed out

推荐阅读