Utils

Vampire Wrapper

class gym_saturation.vampire_wrapper.VampireWrapper(binary_path: str, command_line_arguments: str | None = None)

A wrapper around the Vampire binary running in manual clause selection mode.

Parameters:
  • binary_path – an absolute path to the Vampire binary

  • command_line_arguments – command-line arguments as a single string

>>> from gym_saturation.vampire_wrapper import VampireWrapper
>>> from gym_saturation.constants import (MOCK_TPTP_PROBLEM,
...     MOCK_TPTP_FOLDER)
>>> vampire = VampireWrapper("vampire")
>>> vampire.pick_a_clause("2")
Traceback (most recent call last):
 ...
ValueError: start solving a problem first!
>>> result = vampire.start(MOCK_TPTP_PROBLEM, MOCK_TPTP_FOLDER)

pick_a_clause(clause_label: str) → tuple[tuple[str, str, str], ...]

Select a clause and get response from Vampire.

Parameters:

clause_label – the order number of the clause to select

Returns:

a sequence of (action type, clause number, clause) triples
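
The return type above is a tuple of three-string tuples, so a caller will usually want to sort the updates by action type. The following is a minimal sketch of that step, not part of the package: the helper name `group_by_action`, the action names, and the clauses in `sample_response` are all made up for illustration, and the real strings Vampire reports may differ.

```python
from collections import defaultdict

# Hypothetical sample shaped like the documented return value:
# a tuple of (action type, clause number, clause) string triples.
# The action names and clauses are invented for illustration only.
sample_response = (
    ("new", "3", "p(X) | ~q(X)"),
    ("passive", "3", "p(X) | ~q(X)"),
    ("new", "4", "q(a)"),
)


def group_by_action(response):
    """Map each action type to its list of (clause number, clause) pairs."""
    grouped = defaultdict(list)
    for action_type, clause_number, clause in response:
        grouped[action_type].append((clause_number, clause))
    return dict(grouped)


grouped = group_by_action(sample_response)
print(grouped["new"])
```

Grouping preserves the order in which the updates arrived, which matters if later updates for a clause number are meant to supersede earlier ones.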

property proc: spawn

Vampire process.

Raises:

ValueError – when accessed before a problem has been started

start(problem_filename: str, tptp_folder: str) → tuple[tuple[str, str, str], ...]

Start Vampire in manual mode on a given problem.

The time limit is one day, and Vampire prints everything.

Parameters:
  • problem_filename – full path of a TPTP problem file

  • tptp_folder – the root folder for TPTP library

Returns:

a sequence of (action type, clause number, clause) triples

terminate() → None

Terminate the Vampire process, if any.
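
The three methods above suggest a simple session lifecycle: start, pick clauses, terminate. The sketch below shows one way to make sure `terminate` runs even if a step fails. So that it runs without a Vampire binary, it uses a hypothetical stand-in class, `FakeWrapper`, that only mimics the documented method names; `run_session` and the file names are likewise illustrative assumptions, not part of the package.

```python
class FakeWrapper:
    """Hypothetical stand-in mimicking the documented method names."""

    def __init__(self):
        self.started = False
        self.terminated = False

    def start(self, problem_filename, tptp_folder):
        self.started = True
        return (("new", "1", "p(a)"),)  # made-up response

    def pick_a_clause(self, clause_label):
        if not self.started:
            raise ValueError("start solving a problem first!")
        return (("active", clause_label, "p(a)"),)  # made-up response

    def terminate(self):
        self.terminated = True


def run_session(wrapper, problem_filename, tptp_folder, labels):
    """Start a problem, select the given clauses, and always terminate."""
    updates = []
    try:
        updates.extend(wrapper.start(problem_filename, tptp_folder))
        for label in labels:
            updates.extend(wrapper.pick_a_clause(label))
    finally:
        # Runs on success and on error, so no process is left behind.
        wrapper.terminate()
    return updates


fake = FakeWrapper()
session_updates = run_session(fake, "problem.p", "tptp", ["1"])
print(session_updates)
```

With a real `VampireWrapper` in place of the stand-in, the same `try`/`finally` shape would keep a failed clause selection from leaving a Vampire process running.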

Relay Server between Two Sockets

class gym_saturation.relay_server.RelayServer(server_address: tuple[str, int], request_handler_class: type[BaseRequestHandler], bind_and_activate: bool = True)

A server that relays the I/O of a long-running TCP socket connection to queues.

>>> import socket
>>> from threading import Thread
>>> from gym_saturation.relay_server import RelayServer, RelayTCPHandler
>>> with RelayServer(("localhost", 0), RelayTCPHandler
... ) as relay_server:
...     thread = Thread(target=relay_server.serve_forever)
...     thread.daemon = True
...     thread.start()
...     with socket.create_connection(relay_server.server_address
...     ) as socket_connection:
...         # data sent to the socket are stored in one queue
...         socket_connection.sendall(bytes(
...             f'{{"tag": "{QUERY_END_MESSAGE}"}}\n\x00\n', "utf8")
...         )
...         print(relay_server.input_queue.get())
...         # data from another queue are sent to the client
...         relay_server.output_queue.put(b"test")
...         print(str(socket_connection.recv(4096), "utf8"))
...         # message format for closing connection
...         socket_connection.sendall(bytes(
...             f'{{"tag": "{SESSION_END_MESSAGE}"}}\n\x00\n', "utf8"
...         ))
...     relay_server.shutdown()
...     thread.join()
{'tag': 'server_queries_start'}
test

class gym_saturation.relay_server.RelayTCPHandler(request, client_address, server)

The request handler class for relay server.

handle() → None

Read data from another TCP socket or send data to it.

read_messages() → list[dict[str, Any]]

Read messages from TCP request.

Returns:

parsed messages
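
In the RelayServer doctest, each message sent to the socket is a JSON object followed by the byte sequence `\n\x00\n`. Assuming that framing holds in general, splitting a received buffer into parsed messages could look like the sketch below. The names `DELIMITER` and `split_messages` are hypothetical, and this is not the handler's actual implementation.

```python
import json

# Assumption: messages are JSON objects separated by "\n\x00\n",
# as in the RelayServer doctest; the real handler may differ.
DELIMITER = b"\n\x00\n"


def split_messages(buffer: bytes):
    """Return (parsed complete messages, leftover incomplete bytes)."""
    # Everything before the last delimiter is complete; the tail is not.
    *complete, rest = buffer.split(DELIMITER)
    messages = [
        json.loads(chunk.decode("utf8")) for chunk in complete if chunk.strip()
    ]
    return messages, rest


raw = b'{"tag": "a"}\n\x00\n{"tag": "b"}\n\x00\n{"tag": '
parsed, leftover = split_messages(raw)
print(parsed)
```

Returning the leftover bytes lets a caller prepend them to the next chunk read from the socket, so a message split across two reads is still parsed once it is complete.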

Constants used throughout the package