Merge pull request #71 from agnosticeng/feat/auto-refresh
feat: Auto-refresh datasets on CREATE TABLE
This commit is contained in:
26
src/lib/olap-engine/EventListener.ts
Normal file
26
src/lib/olap-engine/EventListener.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
type Callback = (...param: any[]) => void;
|
||||
|
||||
type OptionalRecord<K extends keyof any, T> = { [P in K]?: T };
|
||||
|
||||
export interface IListener<Events extends string> {
|
||||
on(event: Events, fn: Callback): () => any;
|
||||
}
|
||||
|
||||
export abstract class InternalEventEmitter<Events extends string> implements IListener<Events> {
|
||||
#listeners: OptionalRecord<Events, Set<Callback>> = {};
|
||||
|
||||
on(event: Events, fn: Callback) {
|
||||
this.#listeners[event] ??= new Set<Callback>();
|
||||
|
||||
this.#listeners[event].add(fn);
|
||||
return () => this.#listeners[event]?.delete(fn);
|
||||
}
|
||||
|
||||
protected emit(event: Events, ...param: any[]) {
|
||||
if (!this.#listeners[event]?.size) return;
|
||||
|
||||
queueMicrotask(() => {
|
||||
if (this.#listeners[event]) for (const fn of this.#listeners[event]) fn(...param);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
type Callback = (param: any) => void;
|
||||
|
||||
export interface ILogger {
|
||||
on(level: 'error', fn: Callback): () => void;
|
||||
log(level: 'error', param: any): void;
|
||||
}
|
||||
|
||||
export abstract class Logger implements ILogger {
|
||||
#listeners: { [key: string]: Set<Callback> } = {};
|
||||
|
||||
on(logEvent: 'error', fn: Callback) {
|
||||
this.#listeners[logEvent] ??= new Set<Callback>();
|
||||
|
||||
this.#listeners[logEvent].add(fn);
|
||||
return () => this.#listeners[logEvent].delete(fn);
|
||||
}
|
||||
|
||||
log(level: 'error', param: any) {
|
||||
if (!this.#listeners[level]?.size) return;
|
||||
|
||||
queueMicrotask(() => {
|
||||
for (const fn of this.#listeners[level]) fn(param);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,37 +1,41 @@
|
||||
import { invoke } from '@tauri-apps/api/core';
|
||||
import type { OLAPEngine, OLAPResponse, Table } from './index';
|
||||
import { Logger } from './Logger';
|
||||
import { InternalEventEmitter } from './EventListener';
|
||||
import type { Events, OLAPEngine, OLAPResponse, Table } from './index';
|
||||
|
||||
import CLICKHOUSE_GET_SCHEMA from './queries/clickhouse_get_schema.sql?raw';
|
||||
import CLICKHOUSE_GET_UDFS from './queries/clickhouse_get_udfs.sql?raw';
|
||||
import CLICKHOUSE_INIT_DB from './queries/clickhouse_init_db.sql?raw';
|
||||
|
||||
export class CHDBEngine extends Logger implements OLAPEngine {
|
||||
export class CHDBEngine extends InternalEventEmitter<Events> implements OLAPEngine {
|
||||
async init() {
|
||||
await this.exec(CLICKHOUSE_INIT_DB);
|
||||
}
|
||||
|
||||
async exec(query: string) {
|
||||
async exec(query: string, _emit = true) {
|
||||
try {
|
||||
const r: string = await invoke('query', { query });
|
||||
if (!r) return;
|
||||
|
||||
return JSON.parse(r) as OLAPResponse;
|
||||
let data: OLAPResponse | undefined;
|
||||
if (r) data = JSON.parse(r) as OLAPResponse;
|
||||
|
||||
if (_emit) this.emit('success', query, data);
|
||||
|
||||
return data;
|
||||
} catch (e) {
|
||||
if (typeof e === 'string') e = new Error(e);
|
||||
console.error(e);
|
||||
this.log('error', e);
|
||||
if (_emit) this.emit('error', e);
|
||||
}
|
||||
}
|
||||
|
||||
async getSchema() {
|
||||
const response = await this.exec(CLICKHOUSE_GET_SCHEMA);
|
||||
const response = await this.exec(CLICKHOUSE_GET_SCHEMA, false);
|
||||
if (!response) return [];
|
||||
return response.data as Table[];
|
||||
}
|
||||
|
||||
async getUDFs() {
|
||||
const response = await this.exec(CLICKHOUSE_GET_UDFS);
|
||||
const response = await this.exec(CLICKHOUSE_GET_UDFS, false);
|
||||
if (!response) return [];
|
||||
|
||||
return response.data.map((row) => row.name as string);
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
import type { OLAPEngine, OLAPResponse, Table } from './index';
|
||||
import { Logger } from './Logger';
|
||||
import { InternalEventEmitter } from './EventListener';
|
||||
import type { Events, OLAPEngine, OLAPResponse, Table } from './index';
|
||||
|
||||
import CLICKHOUSE_GET_SCHEMA from './queries/clickhouse_get_schema.sql?raw';
|
||||
import CLICKHOUSE_GET_UDFS from './queries/clickhouse_get_udfs.sql?raw';
|
||||
|
||||
export class RemoteEngine extends Logger implements OLAPEngine {
|
||||
export class RemoteEngine extends InternalEventEmitter<Events> implements OLAPEngine {
|
||||
async init() {}
|
||||
|
||||
async exec(query: string) {
|
||||
async exec(query: string, _emit = true) {
|
||||
try {
|
||||
const proxy =
|
||||
new URLSearchParams(window.location.search).get('proxy') ?? 'https://proxy.agx.app/query';
|
||||
@@ -22,21 +22,23 @@ export class RemoteEngine extends Logger implements OLAPEngine {
|
||||
|
||||
if ('exception' in data) throw new Error(data.exception);
|
||||
|
||||
if (_emit) this.emit('success', query, data);
|
||||
|
||||
return data;
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
this.log('error', e);
|
||||
if (_emit) this.emit('error', e);
|
||||
}
|
||||
}
|
||||
|
||||
async getSchema() {
|
||||
const response = await this.exec(CLICKHOUSE_GET_SCHEMA);
|
||||
const response = await this.exec(CLICKHOUSE_GET_SCHEMA, false);
|
||||
if (!response) return [];
|
||||
return response.data as Table[];
|
||||
}
|
||||
|
||||
async getUDFs() {
|
||||
const response = await this.exec(CLICKHOUSE_GET_UDFS);
|
||||
const response = await this.exec(CLICKHOUSE_GET_UDFS, false);
|
||||
if (!response) return [];
|
||||
|
||||
return response.data.map((row) => row.name as string);
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { CHDBEngine } from './engine-chdb';
|
||||
import { RemoteEngine } from './engine-remote';
|
||||
import type { ILogger } from './Logger';
|
||||
import type { IListener } from './EventListener';
|
||||
|
||||
export type OLAPResponse = {
|
||||
meta: Array<ColumnDescriptor>;
|
||||
@@ -24,7 +24,9 @@ export interface Table {
|
||||
columns: ColumnDescriptor[];
|
||||
}
|
||||
|
||||
export interface OLAPEngine extends ILogger {
|
||||
export type Events = 'error' | 'success';
|
||||
|
||||
export interface OLAPEngine extends IListener<Events> {
|
||||
init(): Promise<void>;
|
||||
exec(query: string): Promise<OLAPResponse | undefined>;
|
||||
getSchema(): Promise<Table[]>;
|
||||
|
||||
@@ -3,6 +3,9 @@
|
||||
import { ContextMenuState } from '$lib/components/ContextMenu';
|
||||
import ContextMenu from '$lib/components/ContextMenu/ContextMenu.svelte';
|
||||
import Drawer from '$lib/components/Drawer.svelte';
|
||||
import { functions, keywords, operators, types } from '$lib/components/Editor/clickhouse';
|
||||
import Editor from '$lib/components/Editor/Editor.svelte';
|
||||
import { setupLanguage } from '$lib/components/Editor/language';
|
||||
import { SaveQueryModal } from '$lib/components/Queries';
|
||||
import Result from '$lib/components/Result.svelte';
|
||||
import SideBar from '$lib/components/SideBar.svelte';
|
||||
@@ -23,13 +26,10 @@
|
||||
import { historyRepository, type HistoryEntry } from '$lib/repositories/history';
|
||||
import { queryRepository, type Query } from '$lib/repositories/queries';
|
||||
import { tabRepository, type Tab } from '$lib/repositories/tabs';
|
||||
import Editor from '$lib/components/Editor/Editor.svelte';
|
||||
import { SplitPane } from '@rich_harris/svelte-split-pane';
|
||||
import debounce from 'p-debounce';
|
||||
import { format } from 'sql-formatter';
|
||||
import { tick, type ComponentProps } from 'svelte';
|
||||
import { setupLanguage } from '$lib/components/Editor/language';
|
||||
import { keywords, functions, operators, types } from '$lib/components/Editor/clickhouse';
|
||||
|
||||
let response = $state.raw<OLAPResponse>();
|
||||
let loading = $state(false);
|
||||
@@ -37,24 +37,15 @@
|
||||
|
||||
async function handleExec() {
|
||||
const query = currentTab.content;
|
||||
if (loading || !query) {
|
||||
return;
|
||||
}
|
||||
if (loading || !query) return;
|
||||
|
||||
loading = true;
|
||||
counter?.start();
|
||||
response = await engine.exec(query).finally(() => {
|
||||
try {
|
||||
response = await engine.exec(query);
|
||||
} finally {
|
||||
loading = false;
|
||||
counter?.stop();
|
||||
});
|
||||
|
||||
const last = await historyRepository.getLast();
|
||||
|
||||
if (response && last?.content !== query) await addHistoryEntry(query);
|
||||
|
||||
if (response) {
|
||||
bottomPanel.open = true;
|
||||
if (bottomPanelTab === 'logs') bottomPanelTab = 'data';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,6 +73,23 @@
|
||||
$effect(() => void historyRepository.getAll().then((entries) => (history = entries)));
|
||||
$effect(() => void queryRepository.getAll().then((q) => (queries = q)));
|
||||
|
||||
engine.on('success', async (query: string) => {
|
||||
if (typeof query !== 'string') return;
|
||||
if (/(CREATE|DROP)/gi.test(query)) tables = await engine.getSchema();
|
||||
});
|
||||
|
||||
engine.on('success', async (query: string, response?: OLAPResponse) => {
|
||||
const last = await historyRepository.getLast();
|
||||
if (response && last?.content !== query) await addHistoryEntry(query);
|
||||
});
|
||||
|
||||
engine.on('success', (query: string, response?: OLAPResponse) => {
|
||||
if (response) {
|
||||
bottomPanel.open = true;
|
||||
if (bottomPanelTab === 'logs') bottomPanelTab = 'data';
|
||||
}
|
||||
});
|
||||
|
||||
async function addHistoryEntry(query: string) {
|
||||
try {
|
||||
const entry = await historyRepository.add(query);
|
||||
|
||||
Reference in New Issue
Block a user