Utils¶
Vampire Wrapper¶
- class gym_saturation.vampire_wrapper.VampireWrapper(binary_path: str, command_line_arguments: str | None = None)¶
A wrapper around Vampire binary running in a manual clause selection mode.
- Parameters:
binary_path – an absolute path to Vampire binary
command_line_arguments – command line arguments in one string
>>> from gym_saturation.constants import (MOCK_TPTP_PROBLEM, ... MOCK_TPTP_FOLDER) >>> vampire = VampireWrapper("vampire") >>> vampire.pick_a_clause("2") Traceback (most recent call last): ... ValueError: start solving a problem first! >>> result = vampire.start(MOCK_TPTP_PROBLEM, MOCK_TPTP_FOLDER)
- pick_a_clause(clause_label: str) tuple[tuple[str, str, str], ...]¶
Select a clause and get response from Vampire.
- Parameters:
clause_label – a given clause order number
- Returns:
a sequence of action type, clause number and clause
- property proc: spawn¶
Vampire process.
- Raises:
ValueError – when called before
reset
- start(problem_filename: str, tptp_folder: str) tuple[tuple[str, str, str], ...]¶
Start Vampire in a manual mode on a given problem.
Time limit is one day, Vampire prints everything
- Parameters:
problem_filename – full path of a TPTP problem file
tptp_folder – the root folder for TPTP library
- Returns:
a sequence of action type, clause number and clause
- terminate() None¶
Terminate Vampire process if any.
Relay Server between Two Sockets¶
- class gym_saturation.relay_server.RelayServer(server_address: tuple[str, int], request_handler_class: type[BaseRequestHandler], bind_and_activate: bool = True)¶
Server relaying i/o of a long-running connection to a TCP socket to queues.
>>> import socket >>> from threading import Thread >>> with RelayServer(("localhost", 0), RelayTCPHandler ... ) as relay_server: ... thread = Thread(target=relay_server.serve_forever) ... thread.daemon = True ... thread.start() ... with socket.create_connection(relay_server.server_address ... ) as socket_connection: ... # data sent to the socket are stored in one queue ... socket_connection.sendall(bytes( ... f'{{"tag": "{QUERY_END_MESSAGE}"}}\n\x00\n', "utf8") ... ) ... print(relay_server.input_queue.get()) ... # data from another queue are sent to the client ... relay_server.output_queue.put(b"test") ... print(str(socket_connection.recv(4096), "utf8")) ... # message format for closing connection ... socket_connection.sendall(bytes( ... f'{{"tag": "{SESSION_END_MESSAGE}"}}\n\x00\n', "utf8" ... )) ... relay_server.shutdown() ... thread.join() {'tag': 'server_queries_start'} test
- class gym_saturation.relay_server.RelayTCPHandler(request, client_address, server)¶
The request handler class for relay server.
- handle() None¶
Read data from another TCP socket or send data to it.
- read_messages() list[dict[str, Any]]¶
Read messages from TCP request.
- Returns:
parsed messages