Refactor structure
This commit is contained in:
184
db/src/lib.rs
Executable file
184
db/src/lib.rs
Executable file
@@ -0,0 +1,184 @@
|
||||
pub mod types;
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use rocksdb::{ColumnFamilyDescriptor, IteratorMode, OptimisticTransactionDB, Options};
|
||||
use serde::{Serialize};
|
||||
use serde::de::DeserializeOwned;
|
||||
use crate::types::{RocksColumn, RocksReference, RocksReferences};
|
||||
use color_eyre::Result;
|
||||
use directories::BaseDirs;
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
const APP_DIR_NAME: &str = "sus_manager";
|
||||
lazy_static! {
|
||||
static ref BASE_DIRS: BaseDirs = BaseDirs::new().unwrap();
|
||||
static ref APP_DB_DATA_DIR: PathBuf = BASE_DIRS.data_dir().to_path_buf().join(APP_DIR_NAME).join("db");
|
||||
}
|
||||
|
||||
fn get_db_options() -> Options {
|
||||
let mut opts = Options::default();
|
||||
|
||||
opts.create_missing_column_families(true);
|
||||
opts.create_if_missing(true);
|
||||
opts.increase_parallelism(num_cpus::get() as i32);
|
||||
|
||||
opts
|
||||
}
|
||||
|
||||
fn get_db_read_options() -> rocksdb::ReadOptions {
|
||||
let mut opts = rocksdb::ReadOptions::default();
|
||||
opts.set_async_io(true);
|
||||
opts
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RocksDBFactory {
|
||||
cfs: Vec<String>,
|
||||
path: PathBuf,
|
||||
db_opts: Options,
|
||||
cf_opts: Options,
|
||||
context: Option<RocksDB>
|
||||
}
|
||||
|
||||
impl RocksDBFactory {
|
||||
pub fn new(path: PathBuf, db_opts: Options, cf_opts: Options) -> Result<Self> {
|
||||
let instance = Self {
|
||||
cfs: vec![],
|
||||
path,
|
||||
db_opts,
|
||||
cf_opts,
|
||||
context: None
|
||||
};
|
||||
if !instance.path.exists() {
|
||||
std::fs::create_dir_all(instance.path.as_path())?;
|
||||
}
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
pub fn register<T>(&mut self) where T: RocksColumn {
|
||||
self.cfs.push(T::get_column_name());
|
||||
}
|
||||
|
||||
pub fn get_current_context(&mut self) -> Result<RocksDB> {
|
||||
if let Some(context) = &self.context {
|
||||
return Ok(context.clone());
|
||||
}
|
||||
let cfs = self.cfs
|
||||
.iter()
|
||||
.map(|cf| ColumnFamilyDescriptor::new(cf, self.cf_opts.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
let context = RocksDB::new(cfs, self.path.clone(), self.db_opts.clone())?;
|
||||
self.context = Some(context.clone());
|
||||
Ok(context)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RocksDBFactory {
|
||||
fn default() -> Self {
|
||||
Self::new(APP_DB_DATA_DIR.to_path_buf(), get_db_options(), Options::default()).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RocksDB {
|
||||
db: Arc<OptimisticTransactionDB>,
|
||||
}
|
||||
|
||||
impl RocksDB {
|
||||
pub fn new(cfs: Vec<ColumnFamilyDescriptor>, path: PathBuf, db_opts: Options) -> Result<Self> {
|
||||
let db = OptimisticTransactionDB::open_cf_descriptors(
|
||||
&db_opts,
|
||||
path.as_path(),
|
||||
cfs
|
||||
)?;
|
||||
let rocks = Self {
|
||||
db: Arc::new(db)
|
||||
};
|
||||
Ok(rocks)
|
||||
}
|
||||
|
||||
pub fn get_value<TColumn>(&self, id: &TColumn::Id) -> Result<Option<TColumn>>
|
||||
where TColumn: RocksColumn + DeserializeOwned
|
||||
{
|
||||
let cf = self.db.cf_handle(TColumn::get_column_name().as_str()).unwrap();
|
||||
let query_res = self.db.get_cf(&cf, serde_json::to_string(id)?)?;
|
||||
if query_res.is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
let mut value: TColumn = serde_json::from_slice(&query_res.unwrap())?;
|
||||
value.set_id(id.clone());
|
||||
Ok(Some(value))
|
||||
}
|
||||
|
||||
pub fn set_value<TColumn>(&self, value: &TColumn) -> Result<()>
|
||||
where TColumn: RocksColumn + Serialize
|
||||
{
|
||||
let cf = self.db.cf_handle(TColumn::get_column_name().as_str()).unwrap();
|
||||
self.db.put_cf(&cf, serde_json::to_string(&value.get_id())?, serde_json::to_string(value)?)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_values<TColumn>(&self, ids: &[TColumn::Id]) -> Result<Vec<TColumn>>
|
||||
where TColumn: RocksColumn + DeserializeOwned
|
||||
{
|
||||
let transaction = self.db.transaction();
|
||||
let cf = self.db.cf_handle(TColumn::get_column_name().as_str()).unwrap();
|
||||
let mut values = Vec::new();
|
||||
for id in ids {
|
||||
let query_res = transaction.get_cf(&cf, serde_json::to_string(id)?)?;
|
||||
if let Some(res) = query_res {
|
||||
let mut value: TColumn = serde_json::from_slice(&res)?;
|
||||
value.set_id(id.clone());
|
||||
values.push(value);
|
||||
}
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
pub fn get_reference_value<TReference, TColumn>(&self, id: &TReference::Id) -> Result<Option<TReference>>
|
||||
where TReference: RocksColumn + DeserializeOwned,
|
||||
TColumn: RocksColumn + RocksReference<TReference>
|
||||
{
|
||||
let reference = self.get_value::<TReference>(id)?;
|
||||
if reference.is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(reference.unwrap()))
|
||||
}
|
||||
|
||||
pub fn get_reference_values<TReference, TColumn>(&self, ids: &[TReference::Id]) -> Result<Vec<TReference>>
|
||||
where TReference: RocksColumn + DeserializeOwned,
|
||||
TColumn: RocksColumn + RocksReferences<TReference>
|
||||
{
|
||||
self.get_values::<TReference>(ids)
|
||||
}
|
||||
|
||||
pub fn get_all_values<TColumn>(&self) -> Result<Vec<TColumn>>
|
||||
where TColumn: RocksColumn + DeserializeOwned
|
||||
{
|
||||
let cf = self.db.cf_handle(TColumn::get_column_name().as_str()).unwrap();
|
||||
let values = self.db.iterator_cf_opt(&cf, get_db_read_options(), IteratorMode::Start)
|
||||
.filter_map(Result::ok)
|
||||
.map(|(k, v)| {
|
||||
let id = serde_json::from_slice::<TColumn::Id>(&k).unwrap();
|
||||
let mut value = serde_json::from_slice::<TColumn>(&v).unwrap();
|
||||
value.set_id(id);
|
||||
value
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
pub fn set_values<TColumn>(&mut self, values: &[TColumn]) -> Result<()>
|
||||
where TColumn: RocksColumn + Serialize
|
||||
{
|
||||
let transaction = self.db.transaction();
|
||||
let cf = self.db.cf_handle(TColumn::get_column_name().as_str()).unwrap();
|
||||
for value in values {
|
||||
transaction.put_cf(&cf, serde_json::to_string(&value.get_id())?, serde_json::to_string(value)?)?;
|
||||
}
|
||||
transaction.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
17
db/src/types.rs
Executable file
17
db/src/types.rs
Executable file
@@ -0,0 +1,17 @@
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
|
||||
pub trait RocksColumn {
|
||||
type Id: Serialize + DeserializeOwned + Clone;
|
||||
fn get_id(&self) -> Self::Id;
|
||||
fn set_id(&mut self, id: Self::Id);
|
||||
fn get_column_name() -> String;
|
||||
}
|
||||
|
||||
pub trait RocksReference<T> where T: RocksColumn {
|
||||
fn get_reference_id(&self) -> T::Id;
|
||||
}
|
||||
|
||||
pub trait RocksReferences<T> where T: RocksColumn {
|
||||
fn get_reference_ids(&self) -> Vec<T::Id>;
|
||||
}
|
||||
Reference in New Issue
Block a user