Module khepri_machine

Khepri private low-level API.

Behaviours: ra_machine.

Description

This module exposes the private "low-level" API to the Khepri database and state machine. Its main functions correspond to Ra commands implemented by the state machine. All functions in khepri are built on top of this module.

This module is private. The documentation is still visible because it may help readers understand some implementation details. However, this module should never be called directly outside of Khepri.

State machine history

Version  What changed
0 Initial version
1
  • Added deduplication mechanism:
    • new command option protect_against_dups
    • new commands #dedup{} and #dedup_ack{}
    • new state field dedups
  • Added command #unregister_projections{}. The previous #unregister_projection{} is still supported for backward-compatibility but it is no longer created.
2
  • Changed the data structure for the reverse index used to track keep-while conditions to be a prefix tree (see khepri_prefix_tree).
  • Moved the expiration of dedups to the tick aux effect (see handle_aux/5). This also introduces a new command #drop_dedups{}.
  • Added the delete_reason to the list of properties that can be returned. It is returned by default if the effective machine version is 2 or more.

Data Types

api_behaviour()

api_behaviour() = dedup_protection | delete_reason_in_node_props | indirect_deletes_in_ret | uniform_write_ret | atom()

Name of a state machine API behaviour.

async_ret()

async_ret() = ok

aux_query()

aux_query() = {query, query_fun(), khepri:query_options()}

Query executed through the Ra aux mechanism.

command()

command() = #put{path = khepri_path:native_pattern(), payload = khepri_payload:payload(), options = khepri:tree_options() | khepri:put_options()} | #delete{path = khepri_path:native_pattern(), options = khepri:tree_options()} | #tx{'fun' = horus:horus_fun() | khepri_path:pattern(), args = list()} | #register_trigger{id = khepri:trigger_id(), event_filter = khepri_evf:event_filter(), sproc = khepri_path:native_path()} | #ack_triggered{triggered = [khepri_machine:triggered()]} | #register_projection{pattern = khepri_path:native_pattern(), projection = khepri_projection:projection()} | #unregister_projections{names = all | [khepri_projection:name()]} | #dedup{ref = reference(), expiry = integer(), command = khepri_machine:command()} | #dedup_ack{ref = reference()}

Commands specific to this Ra machine.

dedups_map()

dedups_map() = #{reference() => {any(), integer()}}

Map to handle command deduplication.
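
The shape of this map suggests how expired entries could be dropped. The sketch below is only an illustration, not Khepri's implementation: the module name dedups_sketch and the function drop_expired/2 are hypothetical, and it assumes the integer in each value is an expiry timestamp that can be compared with the current time.

```erlang
-module(dedups_sketch).
-export([drop_expired/2]).

%% Hypothetical helper, not part of Khepri.
%% Assumption: each value is {Reply, Expiry} and Expiry is a timestamp
%% in the same unit as Now.
-spec drop_expired(Dedups, Now) -> Dedups when
      Dedups :: #{reference() => {any(), integer()}},
      Now :: integer().
drop_expired(Dedups, Now) ->
    %% Keep only the entries whose expiry is strictly in the future.
    maps:filter(
      fun(_Ref, {_Reply, Expiry}) -> Expiry > Now end,
      Dedups).
```

An entry whose expiry equals Now is treated as expired here; that boundary choice is also an assumption.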

delayed_aux_query()

delayed_aux_query() = {aux_query(), gen_statem:from(), ra:query_condition() | none}

Aux query that is delayed until a condition is met.

machine_config()

machine_config() = #config{store_id = khepri:store_id(), member = ra:server_id(), snapshot_interval = non_neg_integer()}

Configuration record, holding read-only or rarely changing fields.

machine_init_args()

machine_init_args() = #{store_id := khepri:store_id(), member := ra:server_id(), snapshot_interval => non_neg_integer(), commands => [command()], atom() => any()}

Structure passed to init/1.

metrics()

metrics() = #{applied_command_count => non_neg_integer()}

Internal state machine metrics.

old_command()

old_command() = #unregister_projection{name = khepri_projection:name()}

Old commands that are still accepted by the Ra machine but never created.

Even though Khepri no longer creates these commands, they may still be present in existing Ra log files and thus be applied after an upgrade of Khepri.

We keep them supported for backward-compatibility.

projection_map()

projection_map() = #{khepri_projection:name() => khepri_path:pattern()}

A mapping between the names of projections and patterns to which each projection is registered.

projection_tree()

projection_tree() = khepri_pattern_tree:tree([khepri_projection:projection()])

A pattern tree that holds all registered projections in the machine's state.

props()

props() = #{payload_version := khepri:payload_version(), child_list_version := khepri:child_list_version()}

Properties attached to each node in the tree structure.

query_fun()

query_fun() = fun((state()) -> any()) | fun((ra_machine:command_meta_data(), state()) -> any())

Function representing a query, used by process_query/3.
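
Because query_fun() allows two arities, a caller has to pick the right one before applying the function. The following is a minimal sketch of such a dispatch, assuming the choice is made on arity alone; the module query_fun_sketch and the function run/3 are hypothetical and do not reflect how Khepri does it internally.

```erlang
-module(query_fun_sketch).
-export([run/3]).

%% Hypothetical dispatcher, not part of Khepri.
%% Meta stands in for ra_machine:command_meta_data() and State for
%% state(); both are treated as opaque terms here.
-spec run(Fun, Meta, State) -> any() when
      Fun :: fun((State) -> any()) | fun((Meta, State) -> any()),
      Meta :: term(),
      State :: term().
run(Fun, _Meta, State) when is_function(Fun, 1) ->
    %% Arity 1: the query only needs the machine state.
    Fun(State);
run(Fun, Meta, State) when is_function(Fun, 2) ->
    %% Arity 2: the query also receives the command metadata.
    Fun(Meta, State).
```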

state()

state() = state_v1() | khepri_machine_v0:state()

State of this Ra state machine.

state_v1()

abstract datatype: state_v1()

State of this Ra state machine, version 1.

Note that this type is used also for machine version 2. Machine version 2 changes the type of an opaque member of the khepri_tree record and doesn't need any changes to the khepri_machine type. See the moduledoc of this module for more information about version 2.

triggered()

triggered() = #triggered{id = khepri:trigger_id(), event_filter = khepri_evf:event_filter(), sproc = horus:horus_fun(), props = map()}

triggers_map()

triggers_map() = #{khepri:trigger_id() => #{sproc := khepri_path:native_path(), event_filter := khepri_evf:event_filter()}}

Internal triggers map in the machine state.

tx_ret()

tx_ret() = khepri:ok(khepri_tx:tx_fun_result()) | khepri_tx:tx_abort() | no_return()

write_ret()

write_ret() = khepri:ok(khepri:node_props_map()) | khepri:error()

Function Index

fold/5: Returns all tree nodes matching the given path pattern.
fence/2: Blocks until all updates received by the cluster leader are applied locally.
delete/3: Deletes all tree nodes matching the path pattern.
transaction/5: Runs a transaction and returns the result.
handle_tx_exception/1
register_trigger/5: Registers a trigger.
register_projection/4: Registers a projection.
unregister_projections/3: Removes the given projections from the store.
version/0: Returns the state machine version.
which_module/1: Returns the state machine module corresponding to the given version.
does_api_comply_with/2: Indicates if a new behaviour of the transaction API is activated.

Function Details

fold/5

fold(StoreId, PathPattern, Fun, Acc, Options) -> Ret

StoreId: the name of the Ra cluster.
PathPattern: the path (or path pattern) to the nodes to get.
Options: query options such as favor.

returns: an {ok, NodePropsMap} tuple with a map with zero, one or more entries, or an {error, Reason} tuple.

Returns all tree nodes matching the given path pattern.

fence/2

fence(StoreId, Timeout) -> Ret

StoreId: the name of the Ra cluster.
Timeout: the time limit after which the call returns with an error.

returns: ok or an {error, Reason} tuple.

Blocks until all updates received by the cluster leader are applied locally.

delete/3

delete(StoreId, PathPattern, Options) -> Ret

StoreId: the name of the Ra cluster.
PathPattern: the path (or path pattern) to the nodes to delete.
Options: command options such as the command type.

returns: in the case of a synchronous delete, an {ok, NodePropsMap} tuple with a map with zero, one or more entries, or an {error, Reason} tuple; in the case of an asynchronous delete, always ok (the actual return value may be sent by a message if a correlation ID was specified).

Deletes all tree nodes matching the path pattern.

transaction/5

transaction(StoreId, FunOrPath, Args, ReadWrite, Options) -> Ret

StoreId: the name of the Ra cluster.
FunOrPath: an arbitrary anonymous function or a path pattern pointing to a stored procedure.
Args: a list of arguments to pass to FunOrPath.
ReadWrite: the read/write or read-only nature of the transaction.
Options: command options such as the command type.

returns: in the case of a synchronous transaction, {ok, Result} where Result is the return value of FunOrPath, or {error, Reason} if the anonymous function was aborted; in the case of an asynchronous transaction, always ok (the actual return value may be sent by a message if a correlation ID was specified).

Runs a transaction and returns the result.

handle_tx_exception/1

handle_tx_exception(X1) -> any()

register_trigger/5

register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath, Options) -> Ret

StoreId: the name of the Ra cluster.
TriggerId: the name of the trigger.
EventFilter: the event filter used to associate an event with a stored procedure.
StoredProcPath: the path to the stored procedure to execute when the corresponding event occurs.

returns: ok if the trigger was registered, an {error, Reason} tuple otherwise.

Registers a trigger.

register_projection/4

register_projection(StoreId, PathPattern, Projection, Options) -> Ret

StoreId: the name of the Ra cluster.
PathPattern: the pattern of tree nodes which should be projected.
Projection: the projection record created with khepri_projection:new/3.
Options: command options such as the command type.

returns: ok if the projection was registered, an {error, Reason} tuple otherwise.

Registers a projection.

unregister_projections/3

unregister_projections(StoreId, Names, Options) -> Ret

StoreId: the name of the Ra cluster.
Names: the names of projections to unregister or the atom all to remove all projections.
Options: command options such as the command type.

returns: {ok, ProjectionMap} if the command succeeds, {error, Reason} otherwise. The ProjectionMap is a map with projection names (khepri_projection:name() keys) associated with the pattern to which each projection was registered.

Removes the given projections from the store.

Names may either be a list of projection names to remove or the atom all. When all is passed, every projection in the store is removed.
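
The semantics of Names can be illustrated on a plain map. The sketch below is self-contained and does not call Khepri: the module projection_map_sketch and the function remove/2 are hypothetical, and the map stands in for a projection_map(). It returns the removed entries, mirroring the ProjectionMap described above, together with what would remain.

```erlang
-module(projection_map_sketch).
-export([remove/2]).

%% Hypothetical helper, not part of Khepri.
%% Returns {Removed, Remaining}, both maps of Name => Pattern.
-spec remove(Names, ProjectionMap) -> {Removed, Remaining} when
      Names :: all | [atom()],
      ProjectionMap :: #{atom() => term()},
      Removed :: #{atom() => term()},
      Remaining :: #{atom() => term()}.
remove(all, ProjectionMap) ->
    %% `all' removes every registered projection.
    {ProjectionMap, #{}};
remove(Names, ProjectionMap) when is_list(Names) ->
    %% Unknown names are ignored in this sketch (an assumption).
    {maps:with(Names, ProjectionMap),
     maps:without(Names, ProjectionMap)}.
```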

version/0

version() -> MacVer

Returns the state machine version.

which_module/1

which_module(MacVer) -> Module

Returns the state machine module corresponding to the given version.

does_api_comply_with/2

does_api_comply_with(Behaviour, MacVer::MacVer | StoreId) -> DoesUse

returns: true if the given behaviour is activated, false if it is not or if the behaviour is unknown.

Indicates if a new behaviour of the transaction API is activated.

The transaction code is compiled on one Erlang node with a specific version of Khepri. However, it is executed on all members of the Khepri cluster. Some Erlang nodes might use another version of Khepri, newer or older, and the transaction API may differ.

For instance, in Khepri 0.17.x, the return values of the khepri_tx_adv functions changed. Transaction code that may run on members using either version has to handle both versions of the API to work correctly, and it can use this function to decide which one applies.
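
One way to adapt is to normalize return values into a single shape once the behaviour check is known. The sketch below is self-contained and takes the result of the check as a boolean argument rather than calling Khepri. The module write_ret_sketch and the function normalize/3 are hypothetical. It also assumes that the older API returned the properties of a single node as {ok, NodeProps}, while the uniform_write_ret behaviour returns a map keyed by path as in write_ret(); that description of the older shape is an assumption, not something stated above.

```erlang
-module(write_ret_sketch).
-export([normalize/3]).

%% Hypothetical helper, not part of Khepri.
%% Uniform is the boolean obtained from a behaviour check such as
%% does_api_comply_with(uniform_write_ret, ...).
-spec normalize(Path, Ret, Uniform) -> {ok, map()} | {error, term()} when
      Path :: [term()],
      Ret :: {ok, map()} | {error, term()},
      Uniform :: boolean().
normalize(_Path, {ok, NodePropsMap}, true) ->
    %% New behaviour: already a map of Path => NodeProps.
    {ok, NodePropsMap};
normalize(Path, {ok, NodeProps}, false) ->
    %% Assumed old behaviour: wrap the single node's properties.
    {ok, #{Path => NodeProps}};
normalize(_Path, {error, _} = Error, _Uniform) ->
    Error.
```

With this in place, the rest of the transaction code only has to deal with the map form.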

