Система распределённого управления на Rust

Страницы:  1

Ответить
 

Professor Seleznov


Недавно мне в голову пришла мысль о том, как бы получать список техники, на которой, например, установлен последний патч системы или нужное ПО. Можно написать скрипты, но, как мне кажется, это не совсем удобно и безопасно, особенно когда парк машин неоднообразен. Поэтому я предлагаю реализовать систему, где вся запрашиваемая информация будет собираться с помощью агентов на конечных устройствах в реальном времени с помощью специального языка запросов.
Как я думаю, у этой системы довольно большой потенциал, ведь его можно доработать и превратить в какой-нибудь аналог Ansible. Или в базу для SIEM системы. Или прикрутить к какой-нибудь ITSM-системе, чтобы проводить инвентаризацию по нужным параметрам.
Вся система будет строиться на gRPC, так как он из коробки идёт со всем необходимы, включая простую интеграцию систем аутентификации и TLS, плюс описание интерфейсов в .proto файлах позволит писать клиенты и агентов на различных языках и фреймворках. Единственный минус, так это то, что двунаправленное взаимодействие не укладывается в философию фреймворка, но это решим в процессе написания.
Как это будет работать
Наша система будет представлять из себя агентов, которые устанавливают соединение с севером, а сервер уже будет по этим соединениям отправлять команды и получать результаты выполнения. А отправлять команды на сервер будем посредством другого, простого gRPC клиента, который также будет устанавливать соединение с сервером, но работать с другим сервисом.
pic
Почему бы просто не прикрутить какую-нибудь командую строку серверу, чтобы сразу на нём выполнять команды? Мне показалось такое решение довольно неудобным, особенно в малоуправляемых средах по типу Docker-контейнера. Будет куда практичнее сделать именно так, как я описал выше. Это позволит даже с помощью скриптов запрашивать нужную информацию с сервера и не придётся городить для этого трёхэтажные костыли. Если я в чём-то ошибаюсь, то поправьте меня, пожалуйста, в комментариях.
gRPC
Начнём с того, что опишем весь интерфейс взаимодействия сервера с агентами и клиентами. Для этого создадим в корне проекта папку proto и создадим файл main.proto, в котором опишем сервис Excavator. Да, хотелось какое-то вычурное и необычное имя, которое будет олицетворять какую-то тяжёлую работу.
Как можете наблюдать ниже, агент-серверное общение будет происходить посредством стримов, это позволит достичь двунаправленное общение. Чтобы особо не путаться в типах сообщений, я решил просто объединить их в одно.
И наличие ExcavatorHeartbeat может показаться избыточным и ненужным, но оно на самом деле нужно для того, чтобы установить соединение.
syntax = "proto3";
package main;
service Excavator {
rpc RunExcavator (stream ExcavatorMessage) returns (stream ExcavatorMessage);
}
message ExcavatorHeartbeat {}
message ExcavatorCommandArg {
string key = 1;
string value = 2;
}
message ExcavatorCommand {
string uid = 1;
string name = 2;
repeated ExcavatorCommandArg args = 3;
}
message ExcavatorResponseResult {
string key = 1;
string value = 2;
}
message ExcavatorResponse {
string uid = 1;
int64 code = 2;
repeated ExcavatorResponseResult results = 3;
}
message ExcavatorMessage {
oneof request {
ExcavatorHeartbeat heartbeat = 1;
ExcavatorResponse response = 2;
ExcavatorCommand command = 3;
}
}
Теперь можем описать клиент-серверное общение, для этого создадим ещё один файл client.proto и опишем сервис Query, содержащий два метода: для запросов и логов. Да, будем отправлять логи сервера на клиент.
Как можете наблюдать, стримов тут нет. Это всё из-за того, что не сервер будет обращаться к клиенту, а клиент к серверу.
syntax = "proto3";
package client;
service Query {
rpc Query (QueryRequest) returns (QueryResponse);
rpc Logs (LogsQueryRequest) returns (stream LogsQueryResponse);
}
message LogsQueryRequest {}
message LogsQueryResponse {
repeated string log = 1;
}
message QueryRequest {
string command = 1;
}
message TableCol {
string key = 1;
string data = 2;
}
message TableRow {
repeated TableCol cols = 1;
}
message QueryResponse {
repeated TableRow rows = 1;
}
Сервер
Предлагаю начать от того, куда все будут пытаться подключиться, а именно с сервера. Для этого давайте инициализируем проект. Но для начала создадим Cargo.toml в корне проекта.
[workspace]
resolver = "3"
Теперь можем инициализировать проект сервера следующей командой, она автоматом добавит проект в члены воркспейса. Но также создадим проект библиотеки, которую будут использовать все проекты для генерации кода gRPC.
cargo new server
cargo new --lib build-common
Библиотека будет очень миниатюрная, будет содержать лишь пару функций для генерации кода в build.rs. Но для начала перейдём в этот проект и в зависимости добавим следующее:
[package]
name = "build-common"
version = "0.1.0"
edition = "2024"
[dependencies]
tonic-prost-build = "0.14"
glob = "0.3"
Теперь в lib.rs можем написать следующие функции:
use std::{env, fs, io, path::Path};
pub const PROTO_FOLDER: &str = "proto";
pub fn get_proto_folder() -> std::path::PathBuf {
Path::new(&env::var("CARGO_MANIFEST_DIR").expect("CARGO_WORKSPACE not set"))
.join("..")
.join(PROTO_FOLDER)
}
pub fn compile_protos_folder(folder: impl AsRef<Path>) -> io::Result<()> {
let folder = folder.as_ref();
println!("cargo:rerun-if-changed={}/*", folder.display());
for entry in fs::read_dir(folder)? {
tonic_prost_build::compile_protos(entry.unwrap().path())?;
}
Ok(())
}
Откроем server/Cargo.toml и добавим этот проект в build-зависимости проекта сервера. Также сразу в обычные зависимости добавляем следующие крейты, они потребуются для кодогенерации.
[package]
name = "server"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.41", features = ["full"] }
prost = "0.14"
tonic = { version = "0.14", features = ["tls-ring"] }
tonic-prost = "0.14"
hex = "0.4"
tokio-stream = "0.1"
lazy_static = "1.5"
futures = "0.3"
small_uid = "0.2.4"
nom = "8.0.0"
[build-dependencies]
build-common = { path = "../build-common" }
Создадим файл server\build.rs и добавим туда следующие строки:
fn main() {
let proto_folder = build_common::get_proto_folder();
build_common::compile_protos_folder(&proto_folder).unwrap();
}
Если проект собирается, то код успешно генерируется. Чтобы взаимодействовать с ним, создадим папку server\proto с файлом mod.rs, в нём добавим следующие строчки:
tonic::include_proto!("main");
tonic::include_proto!("client");
Предлагаю продолжить с чего-то простого, как, например, реализация логов. Для этого создадим модуль client.rs в server\src\proto. Все логи будем хранить в глобальном векторе и будем динамически подгружать их клиентам с помощью стрима. Чтобы ручками каждый раз не делать всё это, создадим глобальную структуру LogsManager, которая будет помещать логи в этот вектор и отправлять в поток.
В целом, думаю, тут нечего объяснять, всё довольно просто. Кроме того, что lazy_static нужен из-за того, что Mutex из tokio не const-совместимый, как, например, Mutex из стандартной библиотеки, поэтому просто так в статике его не инициализировать.
Обёртка в виде LogsReceiver нужен чисто для того, чтобы не запутаться в каналах, компилятор будет защищать нас от глупых ошибок. Наверное.
lazy_static! {
pub static ref LOGS_MANAGER: LogsManager = LogsManager(broadcast::channel(128).0);
static ref LOGS: Mutex<Vec<String>> = Mutex::new(Vec::new());
}
pub struct LogsManager(broadcast::Sender<String>);
impl LogsManager {
pub async fn send_log(&self, log: String) {
LOGS.lock().await.push(log.clone());
if let Err(err) = self.0.send(log) {
match err {
SendError(log) => {
LOGS.lock().await.push(format!("dropped log when no subscribers: '{log}'"));
}
}
}
}
pub fn subscribe(&self) -> LogsReceiver {
LogsReceiver(self.0.subscribe())
}
}
pub struct LogsReceiver(broadcast::Receiver<String>);
impl Deref for LogsReceiver {
type Target = broadcast::Receiver<String>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
Давайте реализуем сервис Query. Для этого создадим структуру QueryService, которая будет содержать ссылку на наш канал LogsReceiver.
pub struct QueryService(LogsReceiver);
impl QueryService {
pub fn new(logs: LogsReceiver) -> Self {
Self(logs)
}
}
Имплементируем сгенерированный из proto-файла трейт Query для созданной структуры. Метод query будет пока пустым, чуть позже его заполним. В силу того, что методы асинхронные, а нормальной поддержки асинхронных трейтов в языке ещё нет, используется реэкспорт async_trait, помогающий этого добиться.
#[tonic::async_trait]
impl Query for QueryService {
async fn query(&self, request: Request<QueryRequest>) -> Result<Response<QueryResponse>, Status> {
todo!()
}
type LogsStream = Pin<Box<dyn Stream<Item = Result<LogsQueryResponse, Status>> + Send>>;
async fn logs(&self, _: Request<LogsQueryRequest>) -> Result<Response<Self::LogsStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(128);
let mut logs_rx = self.0.resubscribe();
// Свободно плавающий поток, который будет слушать поток от LogsManager
// и отправлять в стрим новые логи.
tokio::spawn(async move {
let log = LOGS.lock().await.clone();
let entry = LogsQueryResponse { log };
// при первом подключении отправляем все логи сразу
tx.send(Ok(entry)).await.expect("failed to send log");
// динамически отправляем новые логи
while let Ok(log) = logs_rx.recv().await {
let entry = LogsQueryResponse { log: vec![log] };
tx.send(Ok(entry)).await.expect("failed to send log");
}
});
// Создаём поток из канала с помощью обёртки из tokio-stream
let stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream)))
}
}
Перед тем, как продолжить с реализацией метода query, предлагаю сначала реализовать небольшой язык запросов. В силу того, что на момент написания статьи мне было лень его прорабатывать, он будет очень простым и будет состоять всего из нескольких выражений. Для этого создадим модуль query.rs в server\src.
Для реализации парсера воспользуемся комбинаторами из крейта nom. Что такое парсинг-комбинаторы можете ознакомиться в документации, ссылку на которую я привёл.
Первые наши команды будут выглядеть как-то так. То есть первая команда будет составлять список агентов по какому-то критерию, а вторая будет делать запросы к конкретному агенту. Запросы будут выглядеть как функции с массивом аргументов.
list by field_name
from agent select say_hello(name = world), machine(get = name)
Начнём с того, что опишем AST структурами Rust. То есть мы будем перемещаться по тексту запроса с помощью комбинаторов и превращать его в структуры. Как можете наблюдать, AST будет очень простой, так как те же числа мы не будем парсить, поэтому язык будет пока довольно ограничен, но для примера хватит, я думаю.
#[derive(Debug, PartialEq)]
pub struct InvokeFuncArg {
pub name: String,
pub value: String,
}
#[derive(Debug, PartialEq)]
pub struct InvokeFunc {
pub name: String,
pub args: Vec<InvokeFuncArg>,
}
#[derive(Debug, PartialEq)]
pub enum QueryExpr {
ListBy(String),
SelectFrom { from: String, select: Vec<InvokeFunc> },
}
Начнём от частного к общему, то есть реализуем сначала составные части, по типу парсеров значений и будем двигаться в сторону парсера всего выражения. Для начала реализуем парсеры identifier и field. Всё отличие в том, что первый парсит текст с числами, а второе просто текстовые значения. То есть будет запрещено в названиях полей использовать что-то, кроме буков.
fn identifier(input: &str) -> IResult<&str, &str> {
// берёт символы до тех пор, пока функция char::is_alphanumeric не вернёт false,
// а потом возвращает эту подстроку
take_while(char::is_alphanumeric).parse(input)
}
fn field(input: &str) -> IResult<&str, &str> {
take_while(char::is_alphabetic).parse(input)
}
Думаю, этого хватит, чтобы реализовать парсер для выражения list by field_name. Парсер tag берёт какую-то строку и проверяет, соответствует ли входная строка этому значению.
Все парсеры возвращают слайс оставшегося входного текста. То есть если на вход этому парсеру вошла строчка list by field_name, то вернёт от оставшегося текста слайс field_name.
Парсер space1 - встроенный в крейт парсер, который парсит пробелы. Число в конце функции означает то, что должен встретиться минимум один пробел. Есть аналогичный парсер space0, который делает то же самое, но пробелов может вовсе не быть.
fn list_by(input: &str) -> IResult<&str, QueryExpr> {
let (input, _) = tag("list by").parse(input)?;
let (input, _) = space1(input)?;
let (input, field_name) = field(input)?;
Ok((input, QueryExpr::ListBy(field_name.to_string())))
}
Давайте реализуем парсер функций. Для этого имплементируем парсер аргументов. Он тоже довольно простой, ведь сначала мы реализуем парсер одного аргумента invoke_arg, а потом с помощью парсера separated_list0, принимающий на вход два парсера: разделителя и значения.
Если честно, то не понял разницы между separated_list0 и separated_list1, так как у них поведение идентичное: выбрасывают ошибку, если один из парсеров на входе выдал ошибку. Так как у нас тут не может встретиться EOF, то это не проблема, но далее это создаст проблему.
fn invoke_arg(input: &str) -> IResult<&str, InvokeFuncArg> {
// считываем название аргумента
let (input, name) = field(input)?;
// возможные пробелы перед и после знака '='
let (input, _) = space0(input)?;
let (input, _) = char('=').parse(input)?; // то же самое, что парсер `tag`, но на вход принимает исключительно символы
let (input, _) = space0(input)?;
// считываем значение аргумента
let (input, value) = identifier(input)?;
Ok((
input,
InvokeFuncArg {
name: name.to_string(),
value: value.to_string(),
},
))
}
fn invoke_args(input: &str) -> IResult<&str, Vec<InvokeFuncArg>> {
separated_list0(invoke_list_separate, invoke_arg).parse(input)
}
fn invoke_list_separate(input: &str) -> IResult<&str, ()> {
let (input, _) = space0(input)?;
let (input, _) = char(',').parse(input)?;
let (input, _) = space0(input)?;
Ok((input, ()))
}
Реализуем парсер функции и списка функций. Так как парсер функции очень простой, то не буду на нём подробно останавливаться.
У нас обязательно присутствует хоть одна функция, поэтому сначала вызываем парсер функции и добавляем результат его парсинга в вектор. Далее сепаратор и последующие вызовы функций опциональны, но если у нас может быть плавающая запятая, поэтому мы не возвращаем ошибку из парсера invoke_list_separate, а лишь прерываем цикл и возвращаем массив распарсенных функций; а вот вызов последующих функций обязателен, поэтому возвращаем ошибку из invoke_func. Но если удалось распарсить функцию, то сохраняем слайс, чтобы парсер не ушёл в бесконечный цикл парсинга одного и того же, и полученную функцию в вектор.
Как упоминалось ранее, все эти костыли нужны из-за того, что separated_list0 выкидывает ошибку при встрече EOF, который может встретиться в данном случае.
// парсер одной функции
fn invoke_func(input: &str) -> IResult<&str, InvokeFunc> {
let (input, name) = map(field, |s| s.to_string()).parse(input)?;
let (input, _) = space0(input)?;
let (input, _) = char('(').parse(input)?;
let (input, _) = space0(input)?;
let (input, args) = invoke_args(input)?;
let (input, _) = space0(input)?;
let (input, _) = char(')').parse(input)?;
Ok((input, InvokeFunc { name, args }))
}
// парсер списка функций
fn invoke_list(input: &str) -> IResult<&str, Vec<InvokeFunc>> {
let (input, first) = invoke_func(input)?;
let mut acc = vec![first];
let mut input = input;
while let Ok((i, _)) = invoke_list_separate(input) {
let (i, func) = invoke_func(i)?;
acc.push(func);
input = i;
}
Ok((input, acc))
}
Далее можем реализовать парсер более сложного выражения from agent select say_hello(name = world), machine(get = name). Тут тоже ничего особенного, поэтому перейдём сразу к функции parse_query, где есть парсер alt, который запускает последовательно кортеж/вектор переданных в него парсеров пока один из них не вернёт Ok. То есть сначала запускаем парсер list_by и если парсер tag("list by") возвращает Err, то эта ошибка возвращается выше в комбинатор alt, который, в свою очередь, запускает далее парсер select_from.
fn select_from(input: &str) -> IResult<&str, QueryExpr> {
let (input, _) = tag("from").parse(input)?;
let (input, _) = space1(input)?;
let (input, from) = map(identifier, |s| s.to_string()).parse(input)?;
let (input, _) = space1(input)?;
let (input, _) = tag("select").parse(input)?;
let (input, _) = space1(input)?;
let (input, invoke_list) = invoke_list(input)?;
Ok((input, QueryExpr::SelectFrom { from, select: invoke_list }))
}
pub async fn parse_query(query: &str) -> Result<QueryExpr, QueryParseError> {
let query = alt((list_by, select_from)).parse(query);
match query {
Ok(result) => Ok(result.1),
Err(err) => {
LOGS_MANAGER.send_log(format!("parse error: {err}")).await;
Err(QueryParseError)
}
}
}
Но толку от этого всего, если нельзя сделать выборку нужных устройств. Поэтому предлагаю немного усложнить синтаксис нашего языка запросов и добавить стейтмент where, после которого будем вычислять boolean значение, включая запросы к агентам, которые будут выполнены в виде функций.
Первое, что нам потребуется сделать, это дополнить модель AST. Будем использовать метод рекурсивного спуска, поэтому добавляем детерминированные значения Term и Value в перечисления QueryConditionExpr и QueryConditionTerm, соответственно.
#[derive(Debug, PartialEq, Clone)]
pub struct InvokeFuncArg {
pub name: String,
pub value: String,
}
#[derive(Debug, PartialEq, Clone)]
pub struct InvokeFunc {
pub name: String,
pub args: Vec<InvokeFuncArg>,
}
#[derive(Debug, PartialEq, Clone)]
pub enum QueryValue {
FnField { func: InvokeFunc, field: String },
Identifier(String),
String(String),
Number(f64),
Bool(bool),
Null,
}
impl Display for QueryValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let str = match self {
QueryValue::FnField { func, field } => format!(
"{}({}).{field}",
func.name,
func.args
.iter()
.map(|arg| format!("{}={}", arg.name, arg.value))
.collect::<Vec<_>>()
.join(",")
),
QueryValue::Identifier(s) => s.clone(),
QueryValue::String(s) => s.clone(),
QueryValue::Number(n) => n.to_string(),
QueryValue::Bool(b) => b.to_string(),
QueryValue::Null => "null".to_string(),
};
write!(f, "{}", str)
}
}
#[derive(Debug, PartialEq)]
enum QueryOperation {
Eq,
More,
Less,
}
#[derive(Debug, PartialEq)]
pub enum QueryConditionTerm {
Eq {
left: Box<QueryConditionTerm>,
right: Box<QueryConditionTerm>,
},
More {
left: Box<QueryConditionTerm>,
right: Box<QueryConditionTerm>,
},
Less {
left: Box<QueryConditionTerm>,
right: Box<QueryConditionTerm>,
},
Value(QueryValue),
}
#[derive(Debug, PartialEq)]
enum QueryExprOperation {
And,
Or,
}
#[derive(Debug, PartialEq)]
pub enum QueryConditionExpr {
And {
left: Box<QueryConditionExpr>,
right: Box<QueryConditionExpr>,
},
Or {
left: Box<QueryConditionExpr>,
right: Box<QueryConditionExpr>,
},
Term(QueryConditionTerm),
}
#[derive(Debug, PartialEq)]
pub enum QueryExpr {
ListBy { field: String, condition: Option<QueryConditionExpr> },
SelectFrom { from: String, select: Vec<InvokeFunc> },
}
Теперь можем написать новенькие парсеры. Предлагаю начать с парсера value, который как раз будет парсить конечные значения в нашей модели.
Я, если честно, не уверен, как более правильно реализовать парсер с take_while, чтоб он не выдавал пустую строчку за идентификатор, поэтому воспользовался map_opt, чтоб тот завершал работу парсера ошибкой, если строчка пустая.
У нас есть парсер string, он очень простой, поэтому пока не будет обрабатывать строчки с экранированием символов. Если кому нужно экранирование, то в nom есть парсер escaped для таких целей.
fn identifier(input: &str) -> IResult<&str, QueryValue> {
map_opt(take_while(|c: char| c.is_alphanumeric() || c == '_' || c == '-'), |s: &str| {
if s.is_empty() {
None
}
else {
Some(QueryValue::Identifier(s.to_string()))
}
})
.parse(input)
}
fn string(input: &str) -> IResult<&str, QueryValue> {
map_opt(delimited(char('"'), take_till(|c: char| c == '"'), char('"')), |s: &str| {
if s.is_empty() { None } else { Some(QueryValue::String(s.to_string())) }
})
.parse(input)
}
fn number(input: &str) -> IResult<&str, QueryValue> {
map(double, QueryValue::Number).parse(input)
}
fn boolean(input: &str) -> IResult<&str, QueryValue> {
map(alt((tag("true"), tag("false"))), |s: &str| QueryValue::Bool(s == "true")).parse(input)
}
fn null(input: &str) -> IResult<&str, QueryValue> {
map(tag("null"), |_| QueryValue::Null).parse(input)
}
fn func_field(input: &str) -> IResult<&str, QueryValue> {
map(separated_pair(invoke_func, char('.'), identifier), |(f, i)| QueryValue::FnField {
func: f,
field: i.to_string(),
})
.parse(input)
}
fn value(input: &str) -> IResult<&str, QueryValue> {
alt((string, func_field, identifier, number, boolean, null)).parse(input)
}
Теперь можем рекурсивно распарсить выражение по типу info(type = modules).info = version, написав небольшой парсер, где мы сначала парсим гарантированно какое-то значение, а потом проверяем, идёт ли какая-нибудь операции далее, и если есть, то запускаем рекурсию.
fn operation(input: &str) -> IResult<&str, QueryOperation> {
alt((
map(char('='), |_| QueryOperation::Eq),
map(char('<'), |_| QueryOperation::Less),
map(char('>'), |_| QueryOperation::More),
))
.parse(input)
}
fn condition_term(input: &str) -> IResult<&str, QueryConditionTerm> {
let (input, left) = map(value, QueryConditionTerm::Value).parse(input)?;
let (input, _) = space0(input)?;
let (input, op) = opt(operation).parse(input)?;
if let Some(op) = op {
let (input, _) = space0(input)?;
let (input, right) = condition_term(input)?;
Ok((
input,
match op {
QueryOperation::Eq => QueryConditionTerm::Eq {
left: Box::new(left),
right: Box::new(right),
},
QueryOperation::More => QueryConditionTerm::More {
left: Box::new(left),
right: Box::new(right),
},
QueryOperation::Less => QueryConditionTerm::Less {
left: Box::new(left),
right: Box::new(right),
},
},
))
}
else {
Ok((input, left))
}
}
Можем проделать то же самое с выражениями по типу true & false, то есть term & term.
fn condition_expr_op(input: &str) -> IResult<&str, QueryExprOperation> {
alt((map(char('&'), |_| QueryExprOperation::And), map(char('|'), |_| QueryExprOperation::Or))).parse(input)
}
fn condition_expr(input: &str) -> IResult<&str, QueryConditionExpr> {
let (input, left) = map(condition_term, QueryConditionExpr::Term).parse(input)?;
let (input, _) = space0(input)?;
let (input, op) = opt(condition_expr_op).parse(input)?;
if let Some(op) = op {
let (input, _) = space0(input)?;
let (input, right) = condition_expr(input)?;
Ok((
input,
match op {
QueryExprOperation::And => QueryConditionExpr::And {
left: Box::new(left),
right: Box::new(right),
},
QueryExprOperation::Or => QueryConditionExpr::Or {
left: Box::new(left),
right: Box::new(right),
},
},
))
}
else {
Ok((input, left))
}
}
Доработает парсер тем, что будем парсить сначала стейтмент where, а поток рекурсивно условие.
fn condition(input: &str) -> IResult<&str, QueryConditionExpr> {
let (input, _) = space1(input)?;
let (input, _) = tag("where").parse(input)?;
let (input, _) = space1(input)?;
condition_expr(input)
}
fn list_by(input: &str) -> IResult<&str, QueryExpr> {
let (input, _) = tag("list by").parse(input)?;
let (input, _) = space1(input)?;
let (input, field_name) = identifier(input)?;
let (input, condition) = opt(condition).parse(input)?;
Ok((
input,
QueryExpr::ListBy {
field: field_name.to_string(),
condition,
},
))
}

Чтобы убедиться в том, что всё работает корректно, я написал несколько простых юнит-тестов

#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_parse_field_parser() {
let query = "list by name";
let result = identifier(&query[8..]);
assert_eq!(result, Ok(("", QueryValue::Identifier("name".to_owned()))));
}
#[tokio::test]
async fn test_parse_list_by() {
let query = "list by name";
let result = list_by(query);
assert_eq!(
result,
Ok((
"",
QueryExpr::ListBy {
field: "name".to_owned(),
condition: None
}
))
);
}
#[tokio::test]
async fn test_parse_list_by_where() {
let query = "list by name where addr = \"127.0.0.1:8080\"";
let result = list_by(query);
assert_eq!(
result,
Ok((
"",
QueryExpr::ListBy {
field: "name".to_owned(),
condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq {
left: Box::new(QueryConditionTerm::Value(QueryValue::Identifier("addr".to_string()))),
right: Box::new(QueryConditionTerm::Value(QueryValue::String("127.0.0.1:8080".to_string()))),
}))
}
))
);
}
#[tokio::test]
async fn test_parse_list_by_where_invoke_func_field() {
let query = "list by name where version().version = \"1.0.0\"";
let result = list_by(query);
assert_eq!(
result,
Ok((
"",
QueryExpr::ListBy {
field: "name".to_owned(),
condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq {
left: Box::new(QueryConditionTerm::Value(QueryValue::FnField {
func: InvokeFunc {
name: "version".to_owned(),
args: vec![]
},
field: "version".to_owned()
})),
right: Box::new(QueryConditionTerm::Value(QueryValue::String("1.0.0".to_string()))),
}))
}
))
);
}
#[tokio::test]
async fn test_parse_list_by_where_invoke_func_field_with_args() {
let query = "list by name where info(type = modules).version = \"1.0.0\"";
let result = list_by(query);
assert_eq!(
result,
Ok((
"",
QueryExpr::ListBy {
field: "name".to_owned(),
condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq {
left: Box::new(QueryConditionTerm::Value(QueryValue::FnField {
func: InvokeFunc {
name: "info".to_owned(),
args: vec![InvokeFuncArg {
name: "type".to_owned(),
value: "modules".to_owned(),
}],
},
field: "version".to_owned()
})),
right: Box::new(QueryConditionTerm::Value(QueryValue::String("1.0.0".to_string()))),
}))
}
))
);
}
#[tokio::test]
async fn test_parse_list_by_where_invoke_func_field_with_args_and_other() {
let query = "list by name where info(type = modules).version = \"version\" & version().version = \"0.1.0\"";
let result = list_by(query);
assert_eq!(
result,
Ok((
"",
QueryExpr::ListBy {
field: "name".to_owned(),
condition: Some(QueryConditionExpr::And {
left: Box::new(QueryConditionExpr::Term(QueryConditionTerm::Eq {
left: Box::new(QueryConditionTerm::Value(QueryValue::FnField {
func: InvokeFunc {
name: "info".to_string(),
args: vec![InvokeFuncArg {
name: "type".to_string(),
value: "modules".to_string(),
}],
},
field: "version".to_string()
})),
right: Box::new(QueryConditionTerm::Value(QueryValue::String("version".to_string()))),
})),
right: Box::new(QueryConditionExpr::Term(QueryConditionTerm::Eq {
left: Box::new(QueryConditionTerm::Value(QueryValue::FnField {
func: InvokeFunc {
name: "version".to_string(),
args: vec![],
},
field: "version".to_string()
})),
right: Box::new(QueryConditionTerm::Value(QueryValue::String("0.1.0".to_string()))),
})),
})
}
))
);
}
#[tokio::test]
async fn test_parse_identifier_parser() {
let query = "from name select version()";
let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap();
let (rest, _) = space1::<_, nom::error::Error<&str>>(rest).unwrap();
let result = identifier(rest);
assert_eq!(result, Ok((" select version()", QueryValue::Identifier("name".to_string()))));
}
#[tokio::test]
async fn test_parse_space_parser() {
let query = "from name select version()";
let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap();
let result = space1::<_, nom::error::Error<&str>>(rest);
assert_eq!(result, Ok(("name select version()", " ")));
}
#[tokio::test]
async fn test_parse_more_space_parser() {
let query = "from name select version()";
let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap();
let result = space1::<_, nom::error::Error<&str>>(rest);
assert_eq!(result, Ok(("name select version()", " ")));
}
#[tokio::test]
async fn test_parse_from_parser() {
let query = "from name select version()";
let result = tag::<_, _, nom::error::Error<&str>>("from").parse(query);
assert_eq!(result, Ok((" name select version()", "from")));
}
#[tokio::test]
async fn test_parse_maybe_space_no_spaces() {
let query = "";
let result = space0::<_, nom::error::Error<&str>>(query);
assert_eq!(result, Ok(("", "")))
}
#[tokio::test]
async fn test_parse_maybe_space() {
let query = " ,";
let result = space0::<_, nom::error::Error<&str>>(query);
assert_eq!(result, Ok((",", " ")))
}
#[tokio::test]
async fn test_parse_invoke_list_separate() {
let query = " , hello";
let result = invoke_list_separate(query);
assert_eq!(result, Ok(("hello", ())));
}
#[tokio::test]
async fn test_parse_invoke_list_separate_without_space() {
let query = ",hello";
let result = invoke_list_separate(query);
assert_eq!(result, Ok(("hello", ())));
}
#[tokio::test]
async fn test_parse_no_invoke_args() {
let query = "()";
let (input, _) = char::<_, nom::error::Error<&str>>('(').parse(query).unwrap();
let result = invoke_args(input);
assert_eq!(result, Ok((")", vec![])));
}
#[tokio::test]
async fn test_parse_invoke_args() {
let query = "one = first, two = second)";
let result = invoke_args(query);
assert_eq!(
result,
Ok((
")",
vec![
InvokeFuncArg {
name: "one".to_owned(),
value: "first".to_owned()
},
InvokeFuncArg {
name: "two".to_owned(),
value: "second".to_owned()
}
]
))
)
}
#[tokio::test]
async fn test_parse_invoke_func() {
let query = "version()";
let result = invoke_func(query);
assert_eq!(
result,
Ok((
"",
InvokeFunc {
name: "version".to_string(),
args: vec![]
}
))
);
}
#[tokio::test]
async fn test_parse_invoke_list() {
let query = "version()";
let result = invoke_list(query);
assert_eq!(
result,
Ok((
"",
vec![InvokeFunc {
name: "version".to_string(),
args: vec![]
}]
))
)
}
#[tokio::test]
async fn test_parse_invoke_list_many_invoke() {
let query = "version(), hello()";
let result = invoke_list(query);
assert_eq!(
result,
Ok((
"",
vec![
InvokeFunc {
name: "version".to_string(),
args: vec![]
},
InvokeFunc {
name: "hello".to_string(),
args: vec![]
}
]
))
)
}
#[tokio::test]
async fn test_parse_select_from() {
let query = "from name select version()";
let result = select_from(query);
assert_eq!(
result,
Ok((
"",
QueryExpr::SelectFrom {
from: "name".to_string(),
select: vec![InvokeFunc {
name: "version".to_string(),
args: vec![]
}]
}
))
)
}
#[tokio::test]
async fn test_parse_query_select() {
let query = "from name select version()";
let result = parse_query(query).await;
assert_eq!(
result,
Ok(QueryExpr::SelectFrom {
from: "name".to_string(),
select: vec![InvokeFunc {
name: "version".to_string(),
args: vec![]
}]
})
);
}
}
Отлично, можем продолжить, хотел бы сказать, что реализовывать трейт Query, но у нас пока нет пула соединений с агентами, поэтому предлагаю реализовать трейт сервиса Excavator. Для этого создам новый модуль server\src\excavator.rs.
Для начала давайте имплементируем AgentId, чтобы как-то идентифицировать агента в пуле.
#[derive(Eq, PartialEq, Hash)]
pub struct AgentInner {
pub name: String,
pub addr: SocketAddr,
}
#[derive(Clone)]
pub struct AgentId(Arc<Mutex<AgentInner>>);
impl Hash for AgentId {
fn hash<H: Hasher>(&self, state: &mut H) {
let inner = block_in_place(|| Handle::current().block_on(async { self.0.lock().await }));
inner.name.hash(state);
}
}
impl Eq for AgentId {}
impl PartialEq for AgentId {
fn eq(&self, other: &Self) -> bool {
block_in_place(|| {
Handle::current().block_on(async {
let left = self.0.lock().await;
//
match other.0.try_lock() {
Ok(right) => *left == *right,
Err(_) => true,
}
})
})
}
}
impl Deref for AgentId {
type Target = Arc<Mutex<AgentInner>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl AgentId {
pub fn new(name: &str, addr: SocketAddr) -> Self {
let inner = AgentInner { name: name.to_owned(), addr };
Self(Arc::new(Mutex::new(inner)))
}
pub fn generate(addr: SocketAddr) -> Self {
// получаем слайс байтов из адреса агента
let bytes: &[u8] = unsafe { std::slice::from_raw_parts(&addr as *const _ as *const u8, std::mem::size_of::<SocketAddr>()) };
// кодируем половину этого массива в шестнадцатеричное представление
let hex = hex::encode(&bytes[..bytes.len() / 2]);
let name = format!("client{}", hex);
let inner = AgentInner { name, addr };
Self(Arc::new(Mutex::new(inner)))
}
}
Ключ для идентификации агентов в пуле есть, а значением будем использовать AgentCommandManager, который будет хранить ссылки на каналы отправки-получения сообщений из стримов. И по мелочи будет отправлять команду агенту и дожидаться ответа.
pub struct AgentCommandManager {
command_tx: mpsc::Sender<Result<ExcavatorMessage, Status>>,
response_rx: broadcast::Receiver<ExcavatorResponse>,
}
impl AgentCommandManager {
pub async fn send(&mut self, msg: ExcavatorCommand) -> Option<ExcavatorResponse> {
let uid = msg.uid.clone();
let msg = ExcavatorMessage {
request: Some(excavator_message::Request::Command(msg)),
};
// отправляем команду
self.command_tx.send(Ok(msg)).await.expect("channel closed");
// дожидаемся нужного ответа из потока
while let Ok(msg) = self.response_rx.recv().await {
if msg.uid == uid {
return Some(msg);
}
}
None
}
}
impl Clone for AgentCommandManager {
fn clone(&self) -> Self {
Self {
command_tx: self.command_tx.clone(),
response_rx: self.response_rx.resubscribe(),
}
}
}
Для хранения всего этого будем использовать обычную HashMap, обёрнутую в структуру AgentsMap.
type AgentsMapInner = Arc<RwLock<HashMap<AgentId, AgentCommandManager>>>;
#[derive(Default, Clone)]
pub struct AgentsMap(AgentsMapInner);
impl Deref for AgentsMap {
type Target = AgentsMapInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
Отлично, теперь можем реализовать трейт Excavator. Для этого создадим структуру ExcavatorService, которая будет хранить ссылку на описанную ранее AgentsMap.
pub struct ExcavatorService(AgentsMap);
impl ExcavatorService {
pub fn new(map: AgentsMap) -> Self {
Self(map)
}
}
#[tonic::async_trait]
impl Excavator for ExcavatorService {
type RunExcavatorStream = Pin<Box<dyn Stream<Item = Result<ExcavatorMessage, Status>> + Send + 'static>>;
async fn run_excavator(&self, request: Request<Streaming<ExcavatorMessage>>) -> Result<Response<Self::RunExcavatorStream>, Status> {
todo!()
}
}
Предлагаю для начала получить heartbeat-сообщение из стрима и распаковать его до ExcavatorMessage. Другие сообщения нам неинтересны, поэтому мы один раз вызываем stream.next().
async fn run_excavator(&self, request: Request<Streaming<ExcavatorMessage>>) -> Result<Response<Self::RunExcavatorStream>, Status> {
let addr = request.remote_addr().ok_or(Status::aborted("remote address not found"))?;
let mut stream = request.into_inner();
match stream.next().await {
Some(msg) => match msg {
Ok(msg) => {
if let Some(heartbeat) = msg.request {
match heartbeat {
excavator_message::Request::Heartbeat(_) => {}
// остальные сообщения нам не интересны
_ => {
LOGS_MANAGER
.send_log(format!(
"the agent ({addr}) tried to connect, but the first message should be a heartbeat"
))
.await;
Err(Status::failed_precondition("no heartbeat"))
}
}
}
else {
LOGS_MANAGER
.send_log(format!("the agent {addr} tried to connect, but no heartbeat was detected"))
.await;
Err(Status::failed_precondition("empty heartbeat"))
}
}
Err(err) => {
LOGS_MANAGER
.send_log(format!(
"agent {addr} tried to connect, but the connection could not be established due to: {err}"
))
.await;
Err(Status::unknown("failed to connect"))
}
},
None => {
LOGS_MANAGER
.send_log(format!(
"the agent ({addr}) tried to connect, but it failed because no message was received"
))
.await;
Err(Status::unknown("failed to connect"))
}
}
}
Теперь следите за руками, так как сейчас будет много различных каналов и стримов.
excavator_message::Request::Heartbeat(_) => {
let id = AgentId::generate(addr);
let (command_tx, command_rx) = mpsc::channel(128);
let (response_tx, response_rx) = broadcast::channel(128);
let manager = AgentCommandManager { command_tx, response_rx };
self.0.write().await.insert(id.clone(), manager);
// ...
}
Так как соединение установлено и каналы агента сохранены, теперь можем начать слушать другие сообщения. Делать это будем в отдельном потоке. А в данном потоке возвращаем стрим.
let id = AgentId::generate(addr);
let (command_tx, command_rx) = mpsc::channel(128);
let (response_tx, response_rx) = broadcast::channel(128);
let manager = AgentCommandManager { command_tx, response_rx };
self.0.write().await.insert(id.clone(), manager);
tokio::spawn({
let id = id.clone();
async move {
while let Some(msg) = stream.next().await {
// ...
}
}
});
LOGS_MANAGER
.send_log(format!("agent ({}) connected successfully", id.lock().await.name))
.await;
let stream = ReceiverStream::new(command_rx);
Ok(Response::new(Box::pin(stream) as Self::RunExcavatorStream))
Можем реализовать цикл событий, где нас интересует лишь ответы от агентов. Как только получаем его - отправляем его всем слушателям, то есть запросам от сервиса Query.
while let Some(msg) = stream.next().await {
match msg {
Ok(ExcavatorMessage { request: Some(request) }) => match request {
// отправляем результат команды соответствующему клиенту
excavator_message::Request::Response(response) => {
if let Err(err) = response_tx.send(response) {
LOGS_MANAGER.send_log(format!("occurred error: {err}")).await;
}
}
// остальные сообщения нам неинтересны
excavator_message::Request::Heartbeat(_) => {
LOGS_MANAGER.send_log(format!("agent ({}) sent heartbeat", id.lock().await.name)).await;
}
excavator_message::Request::Command(_) => {
LOGS_MANAGER.send_log(format!("agent ({}) sent command", id.lock().await.name)).await;
}
},
Err(err) => {
LOGS_MANAGER.send_log(format!("occurred error: {err}")).await;
}
_ => {
LOGS_MANAGER
.send_log(format!("received empty message from {}", id.lock().await.name))
.await;
}
}
}
async move {
while let Some(msg) = stream.next().await {
match msg {
Ok(ExcavatorMessage { request: Some(request) }) => match request {
// отправляем результат команды соответствующему клиенту
excavator_message::Request::Response(response) => {
if let Err(err) = response_tx.send(response) {
LOGS_MANAGER.send_log(format!("occurred error: {err}")).await;
break;
}
}
// остальные сообщения нам неинтересны
excavator_message::Request::Heartbeat(_) => {
LOGS_MANAGER.send_log(format!("agent ({}) sent heartbeat", id.lock().await.name)).await;
}
excavator_message::Request::Command(_) => {
LOGS_MANAGER.send_log(format!("agent ({}) sent command", id.lock().await.name)).await;
}
},
Err(err) => {
LOGS_MANAGER.send_log(format!("occurred error: {err}")).await;
break;
}
_ => {
LOGS_MANAGER
.send_log(format!("received empty message from {}", id.lock().await.name))
.await;
}
}
}
// если цикл завершился по какой-либо причине, то, вероятно,
// агент просто напросто отключился, поэтому удаляем его из пула агентов
LOGS_MANAGER.send_log(format!("exit agent ({}) event loop", id.lock().await.name)).await;
if map.write().await.remove_entry(&id).is_some() {
LOGS_MANAGER.send_log(format!("agent ({}) disconnected", id.lock().await.name)).await;
}
}
Всё, это вся реализация сервиса Excavator, довольно просто, не так ли? Теперь, после того, как реализовали AgentsMap, можем приступить к реализации метода query. Думаю, тут нет ничего сложного, поэтому предлагаю перейти к реализации запросов.
async fn query(&self, request: Request<QueryRequest>) -> Result<Response<QueryResponse>, Status> {
let query = request.into_inner().command;
if query.is_empty() {
return Err(Status::invalid_argument("empty query"));
}
LOGS_MANAGER.send_log(format!("got query: '{query}'")).await;
let query = parse_query(query.trim())
.await
.map_err(|_| Status::invalid_argument("failed to parse query"))?;
match query {
QueryExpr::ListBy(field) => todo!(),
QueryExpr::SelectFrom { from, select } => todo!(),
}
}
Но перед тем, как продолжить, давайте реализуем структуру EvalContext, которая будет хранить состояние нашего QueryConditionExpr. Вычислять конечное значение будем тоже через рекурсию.
#[derive(Error, Debug)]
enum EvalError {
#[error("not a number: {0}")]
NotNumber(QueryValue),
#[error("not a boolean: {0}")]
NotBool(QueryValue),
#[error("invoke error {0}")]
InvokeError(String),
}
fn must_number(val: &QueryValue) -> Result<f64, EvalError> {
match val {
QueryValue::Number(n) => Ok(*n),
_ => Err(EvalError::NotNumber(val.clone())),
}
}
struct EvalContext<'a> {
agent: &'a mut AgentCommandManager,
}
impl<'a> EvalContext<'a> {
fn new(agent: &'a mut AgentCommandManager) -> Self {
Self { agent }
}
async fn eval(&mut self, expr: &QueryConditionExpr) -> Result<bool, EvalError> {
match expr {
QueryConditionExpr::And { left, right } => {
let left = Box::pin(self.eval(left)).await?;
let right = Box::pin(self.eval(right)).await?;
Ok(left && right)
}
QueryConditionExpr::Or { left, right } => {
let left = Box::pin(self.eval(left)).await?;
let right = Box::pin(self.eval(right)).await?;
Ok(left || right)
}
QueryConditionExpr::Term(term) => match self.eval_term(term).await? {
QueryValue::Bool(b) => Ok(b),
val => Err(EvalError::NotBool(val.clone())),
},
}
}
async fn eval_term(&mut self, term: &QueryConditionTerm) -> Result<QueryValue, EvalError> {
match term {
QueryConditionTerm::Eq { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_eq(left, right)).await?)),
QueryConditionTerm::More { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_more(left, right)).await?)),
QueryConditionTerm::Less { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_less(left, right)).await?)),
QueryConditionTerm::Value(val) => Ok(val.clone()),
}
}
async fn eval_eq(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> {
let left = self.eval_term(left).await?;
let right = self.eval_term(right).await?;
if let QueryValue::FnField { func, field } = &left {
// ...
}
else {
Ok(left == right)
}
}
async fn eval_less(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> {
let left = must_number(&self.eval_term(left).await?)?;
let right = must_number(&self.eval_term(right).await?)?;
Ok(left < right)
}
async fn eval_more(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> {
let left = must_number(&self.eval_term(left).await?)?;
let right = must_number(&self.eval_term(right).await?)?;
Ok(left > right)
}
}
Всё довольно просто, но давайте реализуем выполнение функций, для чего делали интерфейс структуры асинхронным. Тут мы составляем команду, после чего отправляем её агенту и дожидаемся ответа. Как только ответ пришёл, проверяем соответствует ли хоть одна строчка ответа правому значению.
if let QueryValue::FnField { func, field } = &left {
let cmd = ExcavatorCommand {
uid: SmallUid::new().to_string(),
name: func.name.clone(),
args: func
.args
.iter()
.map(|a| ExcavatorCommandArg {
key: a.name.clone(),
value: a.value.clone(),
})
.collect(),
};
Ok(self
.agent
.send(cmd)
.await
.ok_or(EvalError::InvokeError(format!("{}.{}", func.name, field)))?
.results
.iter()
.any(|r| {
r.key == *field
&& (match &right {
QueryValue::FnField { .. } => false,
QueryValue::Identifier(i) => r.value == *i,
QueryValue::String(s) => r.value == *s,
QueryValue::Number(n) => {
if let Ok(val) = r.value.parse::<f64>() {
val == *n
}
else {
false
}
}
QueryValue::Bool(b) => *b,
QueryValue::Null => r.value == "null",
})
}))
}
Теперь можем реализовать ListBy. Предлагаю начать с ветки без условия. Она довольно простая, ведь мы просто итерируемся по всему пулу агентов и в зависимости от вхожего значения field подставляем нужное значение, после чего мапим результат к нужному виду.
QueryExpr::ListBy { field, condition } => {
let mut map = self.1.write().await;
if let Some(condition) = condition {
todo!()
}
else {
let rows = stream::iter(map.keys())
.filter_map(|k| {
let field = field.clone();
async move {
let id = k.lock().await;
Some(match field.as_str() {
"name" => id.name.clone(),
"addr" => id.addr.to_string(),
_ => return None,
})
}
})
.map(|data| TableRow {
cols: vec![TableCol { key: field.clone(), data }],
})
.collect::<Vec<_>>()
.await;
let response = Response::new(QueryResponse { rows });
Ok(response)
}
}
С условием делаем похожее, но в этот раз инициализируем наш вычислительный контекст, ловим результат и уже в зависимости от поля field подставляем результат.
if let Some(condition) = condition {
let condition = Arc::new(condition);
let rows = stream::iter(map.iter_mut())
.filter_map({
|(id, agent)| {
let condition = condition.clone();
let field = field.clone();
async move {
let result = match EvalContext::new(agent).eval(&condition).await {
Ok(res) => res,
Err(err) => {
LOGS_MANAGER.send_log(format!("failed to evaluate condition: '{err}'")).await;
return None;
}
};
if result {
let id = id.lock().await;
Some(match field.as_str() {
"name" => id.name.clone(),
"addr" => id.addr.to_string(),
_ => return None,
})
}
else {
None
}
}
}
})
.map({
let field = field.clone();
move |r| TableRow {
cols: vec![TableCol { key: field.clone(), data: r }],
}
})
.collect::<Vec<_>>()
.await;
let response = Response::new(QueryResponse { rows });
Ok(response)
}
Реализуем теперь SelectFrom. Для начала получаем агента. Если не получилось его найти по хэшу, то итерируемся по HashMap и ищем значение, которое начинается на полученную строку. Подглядел это у git, где достаточно лишь часть хэша коммита ввести, ведь у нас довольно длинное имя агента получается.
QueryExpr::SelectFrom { from, select } => {
let map = self.1.read().await;
let id = AgentId::new(&from, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)));
let mut agent = if let Some(agent) = map.get(&id).cloned() {
agent
}
else {
stream::iter(map.iter())
.filter_map(|(k, m)| Box::pin(async { if k.lock().await.name.starts_with(&from) { Some(m.clone()) } else { None } }))
.next()
.await
.ok_or(Status::not_found("failed to get agent id"))?
};
// ...
}
Агента мы получили, но давайте теперь реализуем выполнение команд.
QueryExpr::SelectFrom { from, select } => {
// ...
let mut rows = Vec::new();
for crate::query::InvokeFunc { name, args } in select {
let uid = SmallUid::new().to_string();
let args = args.into_iter().map(|a| ExcavatorCommandArg { key: a.name, value: a.value }).collect();
LOGS_MANAGER.send_log(format!("executing command: '{name}' with args: '{args:?}'")).await;
let result = agent
.send(ExcavatorCommand {
uid,
name: name.clone(),
args,
})
.await;
// преобразуем результат в табличный вид
if let Some(response) = result {
LOGS_MANAGER
.send_log(format!("'{name}' command is done with status '{}'", response.code))
.await;
let cols = response
.results
.into_iter()
.fold(HashMap::new(), |mut acc: HashMap<String, Vec<String>>, res| {
if let Some(col) = acc.get_mut(&res.key) {
col.push(res.value);
}
else {
acc.insert(res.key, vec![res.value]);
}
acc
});
if rows.is_empty() {
for (col_name, values) in cols {
for row in values {
rows.push(TableRow {
cols: vec![TableCol {
key: col_name.clone(),
data: row,
}],
});
}
}
}
else {
for (col_name, values) in &cols {
for (i, r) in values.iter().enumerate() {
let row = rows.get_mut(i);
if let Some(row) = row {
row.cols.push(TableCol {
key: col_name.clone(),
data: r.clone(),
});
}
else {
let cols_len = rows.first().map(|r| r.cols.len()).unwrap_or(0);
let cols = (0..cols_len)
.filter_map(|i| {
cols.iter().nth(i).map(|(c, _)| TableCol {
key: c.clone(),
data: Default::default(),
})
})
.chain(vec![TableCol {
key: col_name.clone(),
data: r.clone(),
}])
.collect();
rows.push(TableRow { cols });
}
}
}
}
}
else {
return Err(Status::cancelled("failed to execute command".to_string()));
}
}
let response = Response::new(QueryResponse { rows });
Ok(response)
}
Сервисы готовы, но сам сервер пока ничего не слушает и даже не знает ничего об этих сервисах, предлагаю это исправить. Теперь наш сервер слушает 1299 порт и готов отвечать на запросы.
#[tokio::main]
async fn main() {
// переложим бремя парсинга адреса на `SocketAddr`
let addr = format!("{}:{}", "0.0.0.0", 1299)
.parse()
.expect("could not parse address");
let client_map = AgentsMap::default();
LOGS_MANAGER.send_log(format!("server listening on {}", addr)).await;
Server::builder()
.add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone())))
.add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map)))
.serve(addr)
.await
.expect("failed to serve")
}
Агент
Сервер мы реализовали, но вот того, кто будет выполнять команды, нету. Предлагаю это исправить, создав проект agent.
cargo new agent
Сразу в зависимости добавим необходимые крейты.
[package]
name = "agent"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.52", features = ["rt", "rt-multi-thread", "fs", "macros"] }
tokio-stream = "0.1"
tonic = "0.14"
prost = "0.14"
tonic-prost = "0.14"
async-trait = "0.1"
knus = "3.3"
[build-dependencies]
build-common = { path = "../build-common" }
Создаём agent\build.rs со следующим содержимым. Оно немного отличается от предыдущего скрипта, так как будем ещё копировать файл agent.kdl, конфиг агента, в выходную директорию.
use std::{env, fs, path::PathBuf};
const CONFIG_FILE: &str = "agent.kdl";
fn main() {
let proto_folder = build_common::get_proto_folder();
build_common::compile_protos_folder(&proto_folder).unwrap();
println!("cargo:rerun-if-changed={CONFIG_FILE}");
let target_folder = PathBuf::from_iter([
env::var("CARGO_MANIFEST_DIR").unwrap(),
"..".to_owned(),
"target".to_owned(),
env::var("PROFILE").unwrap(),
]);
let config_path = target_folder.join(CONFIG_FILE);
fs::copy(CONFIG_FILE, &config_path).unwrap();
}
Предлагаю создать этот конфиг-файл agent\agent.kdl со следующим содержимым. Да, небогато, но это пока что.
server "127.0.0.1"
Но, чтобы распарсить этот конфиг, нужна модель для knus. Предлагаю описать её, создав модуль agent\src\config.rs.
#[derive(Decode)]
pub struct Server {
#[knus(argument)]
pub address: String,
#[knus(argument)]
pub port: Option<u16>,
}
#[derive(Decode)]
pub struct Config {
#[knus(child)]
pub server: Server,
}
Создаём модуль agent\src\proto.rs, где подключаем сгенерированный код gRPC.
tonic::include_proto!("main");
Но что это за агент, если он ничего не умеет. Предлагаю это решить тем, что создадим трейт Module в agent\src\modules\mod.rs, чтобы добиться модульности агента. Хотя я бы предпочёл заводить модули на сервере на каком-нибудь скриптовом языке или байт-коде и рассылать их агентам в случае необходимости, но на данный момент это слишком заморочено.
#[async_trait::async_trait]
pub trait Module {
fn name(&self) -> &'static str;
fn description(&self) -> &'static str;
fn args(&self) -> Vec<ModuleArg>;
async fn execute(&self, args: Args) -> ExecuteResult;
}
pub struct ExecuteResult {
pub code: i64,
pub output: Vec<String>,
}
pub struct ModuleArg {
pub name: &'static str,
pub description: &'static str,
pub required: bool,
pub default: Option<String>,
}
Модули нужно где-то хранить, поэтому предлагаю создать структуру ModulesRegistry. Так как у нас HashMap внутри RwLock и чтобы каждый раз не вызывать await после каждого вызова метода, то мы залочим RwLock и будем передавать ссылку на его внутренний HashMap.
type ModulesMap = HashMap<&'static str, Arc<dyn Module + Send + Sync>>;
pub struct ModulesRegistryBuilder<'a>(&'a mut ModulesMap);
impl ModulesRegistryBuilder<'_> {
pub fn register(&mut self, module: impl Module + 'static + Send + Sync) -> &mut Self {
self.0.insert(module.name(), Arc::new(module));
self
}
}
#[derive(Default, Clone)]
pub struct ModulesRegistry(Arc<RwLock<ModulesMap>>);
impl ModulesRegistry {
pub async fn build<F>(&self, builder: F)
where
F: FnOnce(&mut ModulesRegistryBuilder, &Self),
{
let mut s = self.0.write().await;
builder(&mut ModulesRegistryBuilder(&mut s), self);
}
pub async fn get(&self, name: &str) -> Option<Arc<dyn Module + Send + Sync>> {
self.0.read().await.get(name).cloned()
}
pub async fn get_all(&self) -> Vec<&'static str> {
self.0.read().await.values().map(|v| v.name()).collect()
}
}
Давайте создадим пару модулей, они будут очень простые.
// agent\src\modules\info.rs
pub struct GetInfoModule(ModulesRegistry);
unsafe impl Send for GetInfoModule {}
impl GetInfoModule {
pub fn new(registry: ModulesRegistry) -> Self {
Self(registry)
}
}
#[async_trait::async_trait]
impl Module for GetInfoModule {
fn name(&self) -> &'static str {
"info"
}
fn description(&self) -> &'static str {
"get information about the agent modules"
}
fn args(&self) -> Vec<ModuleArg> {
vec![
ModuleArg {
name: "type",
description: "type of information to retrieve (modules, etc.)",
required: true,
default: None,
},
ModuleArg {
name: "name",
description: "name of the module to retrieve information about",
required: false,
default: None,
},
]
}
async fn execute(&self, args: Args) -> ExecuteResult {
let ty = args.get("type");
if let Some(ty) = ty {
match ty.as_str() {
"modules" => {
let modules = self.0.get_all().await.into_iter().map(|s| s.to_owned()).collect();
ExecuteResult { code: 0, output: modules }
}
_ => ExecuteResult {
code: 1,
output: vec!["unknown type".to_string()],
},
}
}
else {
ExecuteResult {
code: 1,
output: vec!["missing type parameter".to_string()],
}
}
}
}
// agent\src\modules\version.rs
pub struct AgentVersionModule;
unsafe impl Send for AgentVersionModule {}
#[async_trait::async_trait]
impl Module for AgentVersionModule {
fn name(&self) -> &'static str {
"version"
}
fn description(&self) -> &'static str {
"get agent version"
}
fn args(&self) -> Vec<ModuleArg> {
vec![]
}
async fn execute(&self, _: Args) -> ExecuteResult {
ExecuteResult {
code: 0,
output: vec![env!("CARGO_PKG_VERSION").to_string()],
}
}
}
Теперь можем инициализировать и зарегистрировать модули, после чего считать конфиг с адресом сервера.
#[tokio::main]
async fn main() {
let registry = ModulesRegistry::default();
registry.build(|builder, registry| {
builder
.register(AgentVersionModule)
.register(GetInfoModule::new(registry.clone()));
}).await;
let config_path = path::use_config_path();
let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file");
let config: Config = knus::parse("config", &config_content).expect("failed to parse config");
// ...
}
К счастью, клиент не нужно реализовывать, кодогенерация всё сделала за нас и наш клиент готов, достаточно указать адрес сервера. И сразу посылаем heartbeat-сообщение и получаем поток.
#[tokio::main]
async fn main() {
// ...
let addr = format!("http://{}:{}", config.server.address, config.server.port.unwrap_or(1299));
println!("listening server at {}", addr);
let mut client = ExcavatorClient::connect(addr).await.expect("failed to connect to excavator");
let (tx, rx) = tokio::sync::mpsc::channel(128);
tx.send(ExcavatorMessage {
request: Some(Request::Heartbeat(ExcavatorHeartbeat::default())),
})
.await
.expect("failed to send heartbeat");
let mut stream = client
.run_excavator(ReceiverStream::new(rx))
.await
.expect("failed to run excavator")
.into_inner();
// ...
}
Теперь можем начать слушать поток и обрабатывать входящие команды. Чтобы не блокировать поток и не пропускать другие команды от сервера, запускаем отдельный поток.
while let Some(msg) = stream.next().await {
let Ok(msg) = msg
else {
continue;
};
let Some(command) = msg.request
else {
continue;
};
match command {
Request::Command(cmd) => {
// парсим аргументы
let args = Args::new(cmd.args);
// получаем модуль
let module = registry.get(&cmd.name).await;
if let Some(module) = module {
let tx = tx.clone();
// спавним поток под задачу
tokio::spawn(async move {
// выполняем команду
let ExecuteResult { code, output } = module.execute(args).await;
let results = output
.into_iter()
.map(|output| MessageResult {
key: cmd.name.clone(),
value: output,
})
.collect();
// отправляем результат обратно серверу
tx.send(Message::new(cmd.uid, code, results).into())
.await
.expect("failed to send response");
});
}
else {
tx.send(
Message::new(
cmd.uid,
1,
vec![MessageResult {
key: "error".to_string(),
value: format!("module '{}' not found", cmd.name),
}],
)
.into(),
)
.await
.expect("failed to send response");
}
}
Request::Response(_) => {}
Request::Heartbeat(_) => {}
}
}
Клиент
Отлично, всё готово, осталось намалевать клиент. Он будет очень простым, поэтому будем использовать TUI для взаимодействия с пользователем.
Инициализируем проект.
cargo new client-tui
Добавляем в зависимости нужные крейты.
[package]
name = "client-tui"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.52", features = ["rt", "rt-multi-thread", "macros"] }
# для кодогенерации
tonic = "0.14"
prost = "0.14"
tonic-prost = "0.14"
# для парсинга аргументов
clap = { version = "4.6.1", features = ["derive"] }
# для TUI
ratatui = "0.30"
tui-input = "0.15"
[build-dependencies]
build-common = { path = "../build-common" }
И также создаём скрипт client-tui\build.rs с таким же содержимым, как у сервера, так как никаких конфигов у нас не будет.
fn main() {
let proto_folder = build_common::get_proto_folder();
build_common::compile_protos_folder(&proto_folder).unwrap();
}
И включаем сгенерированный код в модуле client-tui\src\proto.rs.
tonic::include_proto!("client");
Если речь зашла о том, что конфигов у нас не будет, будем использовать аргументы. Для того, чтобы распарсить их, будем использовать clap. Давайте для него создадим простую модель.
#[derive(Subcommand)]
pub enum Command {
#[command(about = "Connect to a Selecit server")]
Connect { host: String, port: Option<u16> },
}
#[derive(Parser)]
#[command(about = "Selecit client for connecting to Selecit servers")]
pub struct Args {
#[command(subcommand)]
pub command: Command,
}
Наш TUI будет очень простым, поэтому не буду особо подробно останавливаться на этом. Что тут происходит можете посмотреть в документации ratatui и tui-input. К сожалению, библиотека не поддерживает асинхронность, но предлагает для этого акторную модель, но мне было слишком лень это делать, проще воткнуть блокировку посредством block_in_place.
#[derive(Default, Clone, Copy)]
enum Tab {
#[default]
Query = 0,
Logs = 1,
}
impl Tab {
fn next(self) -> Option<Self> {
match self {
Tab::Query => Some(Tab::Logs),
Tab::Logs => None,
}
}
fn back(self) -> Option<Self> {
match self {
Tab::Query => None,
Tab::Logs => Some(Tab::Query),
}
}
}
#[derive(Default, PartialEq)]
enum AppMode {
#[default]
Input,
Normal,
}
#[derive(Default)]
pub struct App {
current_tab: Tab,
app_mode: AppMode,
addr: String,
input: Input,
client: Option<QueryClient<Channel>>,
logs_cache: Arc<RwLock<Vec<String>>>,
query_cache: Option<Vec<TableRow>>,
}
impl App {
pub async fn new(addr: String, mut client: QueryClient<Channel>) -> Self {
let mut stream = client.logs(LogsQueryRequest {}).await.expect("unable to load server logs").into_inner();
let logs = Arc::new(RwLock::new(Vec::new()));
tokio::spawn({
let logs = logs.clone();
async move {
loop {
while let Some(entry) = stream.next().await {
match entry {
Ok(entry) => {
for entry in entry.log {
logs.write().await.push(entry);
}
}
Err(err) => {
logs.write().await.push(format!("unable to read logs because: {}", err));
}
}
}
}
}
});
Self {
addr,
client: Some(client),
logs_cache: logs,
..Default::default()
}
}
pub fn run(mut self, terminal: &mut DefaultTerminal) -> io::Result<()> {
loop {
terminal.draw(|frame| self.render(frame))?;
let event = event::read()?;
if let Event::Key(key) = event {
match key.code {
KeyCode::Char('q') if key.modifiers.contains(KeyModifiers::CONTROL) => return Ok(()),
KeyCode::Enter if self.app_mode == AppMode::Input && !self.input.value().is_empty() => {
if let Some(ref mut client) = self.client {
let command = self.input.value_and_reset();
let result = block_in_place(|| Handle::current().block_on(async move { client.query(QueryRequest { command }).await }));
match result {
Ok(response) => {
let rows = response.into_inner().rows;
self.query_cache = Some(rows);
}
Err(err) => block_in_place(|| {
Handle::current().block_on(async {
self.logs_cache
.write()
.await
.push(format!("an error occurred while trying to execute the command: {err}"))
});
}),
}
}
}
KeyCode::Right if let Some(tab) = self.current_tab.next() => {
self.stop_editing();
self.current_tab = tab;
}
KeyCode::Left if let Some(tab) = self.current_tab.back() => {
self.start_editing();
self.current_tab = tab;
}
_ if self.app_mode == AppMode::Input => {
self.input.handle_event(&event);
}
_ => {}
}
}
}
}
fn start_editing(&mut self) {
self.app_mode = AppMode::Input;
}
fn stop_editing(&mut self) {
self.app_mode = AppMode::Normal;
}
fn render(&mut self, frame: &mut Frame) {
let [title_area, tabs_area, content_area] =
Layout::vertical([Constraint::Length(1), Constraint::Length(1), Constraint::Fill(1)]).areas(frame.area());
let title = Line::from(format!("Selecit Client - connected to {}", self.addr)).centered();
frame.render_widget(title, title_area);
let tabs = Tabs::new(vec!["Query", "Logs"]).select(self.current_tab as usize);
frame.render_widget(tabs, tabs_area);
match self.current_tab {
Tab::Query => self.render_query(frame, content_area),
Tab::Logs => self.render_logs(frame, content_area),
}
}
fn render_query(&self, frame: &mut Frame, content_area: Rect) {
let[table_area, input_area]= Layout::vertical([Constraint::Fill(8), Constraint::Fill(2)]).areas(content_area);
let (rows, widths) = if let Some(rows) = self.query_cache.as_ref() {
// получаем ссылку на первую строчку, чтобы составить из неё заголовок таблицы
if let Some(first) = rows.first() {
let widths = first.cols.iter().map(|_| Constraint::Percentage(30)).collect();
let cols = first.cols.iter().map(|c| c.key.clone()).collect::<Vec<_>>();
let body = rows
.into_iter()
.map(|r| Row::new(r.cols.iter().map(|c| c.data.clone()).collect::<Vec<_>>()))
.collect::<Vec<_>>();
let rows = vec![Row::new(cols)].into_iter().chain(body).collect();
(rows, widths)
}
else {
(vec![], vec![])
}
}
else {
(vec![], vec![])
};
let table = Table::new(rows, widths).block(Block::bordered());
frame.render_widget(table, table_area);
let input = Paragraph::new(self.input.value()).block(Block::bordered().title("Command"));
frame.render_widget(input, input_area);
}
fn render_logs(&self, frame: &mut Frame, content_area: Rect) {
let logs = block_in_place(|| Handle::current().block_on(async { self.logs_cache.read().await }));
let list = List::new(logs.iter().cloned()).block(Block::bordered());
frame.render_widget(list, content_area);
}
}
Отлично, теперь можем запустить проект.
cargo run -p server
cargo run -p agent
cargo run -p client-tui -- connect 127.0.0.1
И пробуем получить список агентов с помощью команды list by name.
pic
Или попробуем отправить команду агенту и получить результат.
pic
Можем также с помощью стрелочек переключиться на другую вкладку и посмотреть логи.
pic
Безопасность
TLS
Всё отлично работает, но есть одно но: весь трафик незашифрован, а это значит, что любой может прочитать наши сообщения, как пример:
pic
Чтобы это исправить, давайте подключим TLS, чтобы гонять трафик по HTTPS. Для этого я сначала сгенерирую сертификаты в директории certs с помощью скрипта certs\generate.ps1.
$SUBJ = "/C=RU/ST=Test/L=Test/O=Selecit/OU=/CN=localhost/emailAddress="
# CA
openssl req -x509 -newkey rsa:4096 -days 36500 -keyout ca-key.pem -out ca-cert.pem -nodes -subj $SUBJ
# CSR
openssl req -newkey rsa:4096 -keyout server-key.pem -out server-req.pem -subj $SUBJ -nodes
# server cert
openssl x509 -req -in server-req.pem -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out server-cert.pem -extfile localhost.ext
Чтобы корректно разрешать адрес сервера, создадим файл localhost.ext со следующим содержимым:
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
subjectAltName = @alt_names
[alt_names]
DNS.1 = localhost
IP.1 = 127.0.0.1
После корректного выполнения скрипта должна появиться пачка сертификатов, как на скриншоте:
pic
Так как сразу во всех проектах будет работа с файлами, то для удобства создам ещё один проект common, в котором будут функции для получения путей до каких-либо директорий.
cargo new --lib common
Создам модуль common\src\path.rs со следующим содержимым:
use std::{env, path::PathBuf};
const CERTS_FOLDER: &str = "certs";
pub fn use_app_folder() -> PathBuf {
env::current_exe()
.expect("failed to get current executable")
.parent()
.expect("failed to get current executable parent")
.to_path_buf()
}
#[cfg(debug_assertions)]
pub fn use_project_folder() -> PathBuf {
PathBuf::from(env::var("CARGO_MANIFEST_DIR").expect("failed to get CARGO_MANIFEST_DIR"))
}
#[cfg(debug_assertions)]
pub fn use_workspace_folder() -> PathBuf {
use_project_folder().join("..")
}
pub fn use_certs_folder() -> PathBuf {
if cfg!(debug_assertions) {
use_workspace_folder().join(CERTS_FOLDER)
}
else {
use_app_folder().join(CERTS_FOLDER)
}
}
А в файле common\src\lib.rs добавлю константу SERVER_PORT. Всё, это всё назначение этого проекта.
pub mod path;
pub const SERVER_PORT: u16 = 1299;
Теперь можем перейти к проекту сервера и создать модуль server\src\config.rs, в котором опишем модель для конфига.
use std::path::PathBuf;
use crate::path;
use knus::Decode;
#[derive(Decode)]
pub struct Server {
pub address: String,
pub port: u16,
}
#[derive(Decode, Clone)]
pub struct Auth {
#[knus(child, unwrap(argument))]
pub token: String,
}
#[derive(Decode, Default)]
pub struct Path {
#[knus(argument)]
pub path: PathBuf
}
#[derive(Decode)]
pub struct Certificates {
#[knus(child)]
pub server_cert: Path,
#[knus(child)]
pub server_key: Path,
}
#[derive(Decode, Default)]
pub struct Config {
#[knus(child)]
pub server: Option<Server>,
#[knus(child)]
pub auth: Option<Auth>,
#[knus(child)]
pub certificates: Option<Certificates>,
}
pub async fn use_config() -> Config {
let config_path = path::use_config_path();
if tokio::fs::metadata(&config_path).await.is_ok() {
let config_content = tokio::fs::read_to_string(config_path).await.expect("could not read config file");
knus::parse("config", &config_content).expect("could not parse config file")
}
else {
Config::default()
}
}
И создадим сам конфиг по пути server\server.kdl и добавим пути до файлов сертификатов. Можно написать относительный путь, так как далее будем разрешать это.
certificates {
server-cert "server-cert.pem"
server-key "server-key.pem"
}
Бежим в server\src\main.rs и немного переделываем функцию main. Тут всё довольно просто, не вижу особого смысла разглагольствовать, поэтому предлагаю перейти далее.
#[tokio::main]
async fn main() {
let config = use_config().await;
let addr = if let Some(server) = config.server {
format!("{}:{}", server.address, server.port).parse().expect("could not parse address")
}
else {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), SERVER_PORT)
};
let client_map = AgentsMap::default();
LOGS_MANAGER.send_log(format!("server listening on {}", addr)).await;
let mut builder = if let Some(certs) = config.certificates {
let cert_path = {
let path = certs.server_cert.path;
if path.is_absolute() { path } else { use_certs_folder().join(path) }
};
let key_path = {
let path = certs.server_key.path;
if path.is_absolute() { path } else { use_certs_folder().join(path) }
};
let cert = tokio::fs::read_to_string(cert_path).await.expect("could not read certificate");
let key = tokio::fs::read_to_string(key_path).await.expect("could not read key");
let identity = Identity::from_pem(cert, key);
let tls_config = ServerTlsConfig::new().identity(identity);
match Server::builder().tls_config(tls_config) {
Ok(b) => {
LOGS_MANAGER.send_log("server using tls".to_owned()).await;
b
}
Err(err) => {
LOGS_MANAGER.send_log(format!("failed to set tls config: {}", err)).await;
Server::builder()
}
}
}
else {
Server::builder()
};
builder
.add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone())))
.add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map)))
.serve(addr)
.await
.expect("failed to serve")
}
Перейдём к агенту и сделаем примерно то же самое. Изменим немного модель конфига у агента в файле agent\src\config.rs.
use knus::Decode;
#[derive(Decode)]
pub struct Server {
#[knus(argument)]
pub address: String,
#[knus(argument)]
pub port: Option<u16>,
}
#[derive(Decode)]
pub struct Certificate {
#[knus(argument)]
pub ca_cert: String,
}
#[derive(Decode, Default)]
pub struct Auth {
#[knus(child, unwrap(argument))]
pub token: String,
}
#[derive(Decode)]
pub struct Config {
#[knus(child)]
pub server: Server,
#[knus(child)]
pub auth: Option<Auth>,
#[knus(child)]
pub certificate: Option<Certificate>,
}
Теперь можем добавить настройки TLS клиенту в main функции.
#[tokio::main]
async fn main() {
// ...
let config_path = path::use_config_path();
let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file");
let config: Config = knus::parse("config", &config_content).expect("failed to parse config");
let mut client = if let Some(cert) = config.certificate {
let cert = {
let path = PathBuf::from(cert.ca_cert);
if path.is_absolute() { path } else { use_certs_folder().join(path) }
};
let cert_content = tokio::fs::read_to_string(cert).await.expect("unable to read certificate file");
let cert = Certificate::from_pem(cert_content);
let addr = format!("https://{}:{}", config.server.address, config.server.port.unwrap_or(SERVER_PORT));
let tls = ClientTlsConfig::new().ca_certificate(cert);
let channel = Channel::from_shared(addr.clone())
.unwrap()
.tls_config(tls)
.unwrap()
.connect()
.await
.expect("TLS connection failed");
println!("listening server at {}", addr);
ExcavatorClient::new(channel)
}
else {
let addr = format!("http://{}:{}", config.server.address, config.server.port.unwrap_or(SERVER_PORT));
println!("listening server at {}", addr);
ExcavatorClient::connect(addr.clone()).await.expect("connection to server failed")
};
// ...
}
Теперь наш трафик успешно шифруется, в чём можно убедиться, посмотрев в WireShark, что у нас теперь гоняются TLS пакеты.
pic
Аутентификация
Трафик шифруется, но подключиться может всё равно каждый. Давайте это исправим, добавив аутентификацию пользователей.
Тут есть два пути: классическая аутентификация по каком-нибудь паролю или mTLS (пример из репозитория tonic). Тут выбирайте сами, что вам удобнее или проще, но я для примера буду использовать простой токен.
Для этого добавим слой в server\src\main.rs, который будет обрабатывать все входящие запросы и проверять, есть ли в заголовках поле authorization с нужным нам токеном.
#[tokio::main]
async fn main() {
let config = use_config().await;
// ...
if config.auth.is_some() {
LOGS_MANAGER.send_log("server using token auth".to_owned()).await;
}
let mut auth = config.auth;
builder
.layer(InterceptorLayer::new(move |req: Request<()>| {
if let Some(auth) = auth.as_mut() {
if req
.metadata()
.get("authorization")
.map(|header| header.to_str())
.is_some_and(|s| s.is_ok_and(|s| s == auth.token))
{
Ok(req)
}
else {
Err(Status::unauthenticated("unauthorized"))
}
}
else {
Ok(req)
}
}))
.add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone())))
.add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map)))
.serve(addr)
.await
.expect("failed to serve")
}
Теперь можем модифицировать модель для аргументов в client-tui\src\args.
use clap::{Parser, Subcommand};
#[derive(Subcommand)]
pub enum Command {
#[command(about = "Connect to a Selecit server")]
Connect {
/// Server host
host: String,
/// Server port
port: Option<u16>,
/// Path to the CA certificate
#[arg(long)]
ca: Option<String>,
/// Authentication token
#[arg(long, short)]
token: Option<String>,
},
}
#[derive(Parser)]
#[command(about = "Selecit client for connecting to Selecit servers")]
pub struct Args {
#[command(subcommand)]
pub command: Command,
}
И в client-tui\srv\main.rs создадим замыкание, которое будет вставлять в заголовки запроса токен из аргументов. После чего закидываем это замыкание в QueryClient::with_interceptor. Мне пришлось упаковать замыкание в Box, так как client дальше передаётся в App, из-за чего компилятор жаловался на несоответствие типов.
pub type InterceptFn = Box<dyn FnMut(Request<()>) -> Result<Request<()>, Status>>;
#[tokio::main]
async fn main() {
let args = Args::parse();
match args.command {
Command::Connect {
host,
port,
ca: ca_cert,
mut token,
} => {
let interceptor: InterceptFn = Box::new(move |mut req: tonic::Request<()>| {
if let Some(auth) = token.as_mut() {
req.metadata_mut().insert(AUTHORIZATION, MetadataValue::from_str(auth.as_str()).unwrap());
}
Ok(req)
});
let (client, server_addr) = if let Some(ca_cert) = ca_cert {
// ...
(QueryClient::with_interceptor(channel, interceptor), server_addr)
}
else {
// ...
(QueryClient::with_interceptor(channel, interceptor), server_addr)
};
let mut terminal = ratatui::init();
App::new(server_addr, client).await.run(&mut terminal).expect("failed to run app");
ratatui::restore();
}
}
}
Можем теперь проделать то же самое и с агентом.
#[tokio::main]
async fn main() {
// ...
let config_path = path::use_config_path();
let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file");
let mut config: Config = knus::parse("config", &config_content).expect("failed to parse config");
let interceptor = move |mut req: tonic::Request<()>| {
if let Some(auth) = config.auth.as_mut() {
req.metadata_mut()
.insert(AUTHORIZATION, MetadataValue::from_str(auth.token.as_str()).unwrap());
}
Ok(req)
};
let mut client = if let Some(cert) = config.certificate {
// ...
ExcavatorClient::with_interceptor(channel, interceptor)
}
else {
// ...
ExcavatorClient::with_interceptor(channel, interceptor)
};
// ...
}
Итого
Мы реализовали систему распределённого управления, которая позволяет нам получать список агентов, подходящие под какие-то нужные нам критерии, и даже выполнять команды посредством этих агентов на удалённых хостах.
Наша система далеко не идеальна, везде используются except да unwrap, нет переподключения агентов, язык запросов пока что в зачаточном состоянии, нет никакого кэширования, нужно прикрутить БД для хранения какого-то состояния системы, и куча других проблем, но, доработав слабые места, думаю, система жизнеспособна.
Если кому нужно посмотреть полный код проекта, то вот его репозиторий.
Из похожего я нашёл bssh, который стремится или даже уже достиг что-то похожее, к чему стремился я в начале статьи, но он выбрал другой путь работы поверх SSH.
-Источник
 
Loading...
Error