leap_ec.distrib package
Submodules
leap_ec.distrib.asynchronous module
This provides an asynchronous steady-state fitness evaluation pipeline operator.
A common feature here is a population of evaluated individuals that is asynchronously updated via dask.
- leap_ec.distrib.asynchronous.eval_population(population, client, context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})
Concurrently evaluate all the individuals in the given population
- Parameters
population – to be evaluated
client – dask client
context – for storing count of non-viable individuals
- Returns
a dask distributed iterator over futures
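The submit-then-iterate-futures pattern used here can be sketched with the standard library's concurrent.futures standing in for dask (all names below are illustrative, not LEAP's actual implementation; dask's distributed.as_completed behaves analogously for dask futures):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fitness(genome):
    # Toy MaxOnes-style fitness: count the ones in the genome
    return sum(genome)

population = [[0, 1, 1], [1, 1, 1], [0, 0, 0]]

with ThreadPoolExecutor(max_workers=3) as client:
    # Submit one evaluation per individual; collect the futures
    futures = [client.submit(fitness, ind) for ind in population]
    # as_completed yields futures in completion order, which is why the
    # results are sorted here before printing
    results = sorted(f.result() for f in as_completed(futures))

print(results)  # [0, 2, 3]
```

The caller can then consume evaluated individuals as soon as each one finishes, rather than waiting for the whole batch.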
- leap_ec.distrib.asynchronous.greedy_insert_into_pop(individual, pop, max_size)
Insert the given individual into the pop of evaluated individuals.
This is greedy because the new individual is always compared with the current weakest in the pop, which makes it similar to tournament selection.
If the pop is not yet at capacity, the individual is simply inserted.
- Parameters
individual – that was just evaluated
pop – of already evaluated individuals
max_size – of the pop
- Returns
None
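The greedy insertion logic can be sketched as follows (an illustrative re-implementation, not the library source; fitness is assumed to be maximized and directly comparable):

```python
def greedy_insert_into_pop(individual, pop, max_size):
    """Insert individual, evicting the current weakest if pop is full."""
    if len(pop) < max_size:
        # Pop not yet at capacity: just append
        pop.append(individual)
    else:
        # Compare the newcomer against the current weakest member
        weakest = min(range(len(pop)), key=lambda i: pop[i].fitness)
        if individual.fitness > pop[weakest].fitness:
            pop[weakest] = individual

# Toy stand-in for an evaluated individual
class Ind:
    def __init__(self, fitness):
        self.fitness = fitness

pop = []
for f in [3, 1, 5, 2]:
    greedy_insert_into_pop(Ind(f), pop, max_size=3)

print(sorted(i.fitness for i in pop))  # [2, 3, 5]
```

The fourth individual (fitness 2) displaces the weakest resident (fitness 1), so the pop never shrinks and only improves.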
- leap_ec.distrib.asynchronous.replace_if(new_individual, pop, index)
Convenience function for possibly replacing pop[index] individual with new_individual depending on which has higher fitness.
- Parameters
new_individual – is a newly evaluated individual
pop – of already evaluated individuals
index – of individual in pop to be compared against
- Returns
None
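The comparison this convenience function performs can be sketched like so (illustrative only; fitness is assumed to be maximized):

```python
def replace_if(new_individual, pop, index):
    """Replace pop[index] with new_individual if the newcomer is fitter."""
    if new_individual.fitness > pop[index].fitness:
        pop[index] = new_individual

class Ind:
    def __init__(self, fitness):
        self.fitness = fitness

pop = [Ind(1), Ind(4)]
replace_if(Ind(3), pop, 0)  # 3 > 1, so pop[0] is replaced
replace_if(Ind(2), pop, 1)  # 2 < 4, so pop[1] is kept
print([i.fitness for i in pop])  # [3, 4]
```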
- leap_ec.distrib.asynchronous.steady_state(client, max_births, init_pop_size, pop_size, representation, problem, offspring_pipeline, inserter=<function greedy_insert_into_pop>, count_nonviable=False, evaluated_probe=None, pop_probe=None, context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})
Implements an asynchronous steady-state EA
- Parameters
client – Dask client that should already be set-up
max_births – how many births are we allowing?
init_pop_size – size of initial population sent directly to workers at start
pop_size – how large should the population be?
representation – of the individuals
problem – to be solved
offspring_pipeline – for creating new offspring from the pop
inserter – function with signature (new_individual, pop, popsize) used to insert newly evaluated individuals into the population; defaults to greedy_insert_into_pop()
count_nonviable – True if we want to count non-viable individuals towards the birth budget
evaluated_probe – optional function that is called with each newly evaluated individual; can be used, e.g., to print newly evaluated individuals
pop_probe – optional function that writes a snapshot of the population to a CSV-formatted stream every N births
- Returns
the population containing the final individuals
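The birth-budgeted control flow of a steady-state EA can be sketched in plain Python (dask submission is elided and evaluation is done inline; all names here are illustrative stand-ins, not LEAP's implementation):

```python
import random

def evaluate(genome):
    # Toy MaxOnes-style fitness
    return sum(genome)

def random_genome(length=8):
    return [random.randint(0, 1) for _ in range(length)]

def greedy_insert(individual, pop, max_size):
    # An individual is a (genome, fitness) pair in this sketch
    if len(pop) < max_size:
        pop.append(individual)
    else:
        weakest = min(range(len(pop)), key=lambda i: pop[i][1])
        if individual[1] > pop[weakest][1]:
            pop[weakest] = individual

def steady_state(max_births, init_pop_size, pop_size, inserter):
    pop = []
    births = 0
    # Seed phase: init_pop_size random individuals evaluated up front
    while births < init_pop_size:
        genome = random_genome()
        inserter((genome, evaluate(genome)), pop, pop_size)
        births += 1
    # Steady-state loop: each evaluated offspring counts as one birth
    while births < max_births:
        genome = random_genome()  # stands in for the offspring pipeline
        inserter((genome, evaluate(genome)), pop, pop_size)
        births += 1
    return pop

random.seed(1)
final = steady_state(max_births=50, init_pop_size=5, pop_size=5,
                     inserter=greedy_insert)
print(len(final))  # 5
```

The key property is that the population is updated one evaluated individual at a time, so the loop never blocks on a whole generation finishing.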
- leap_ec.distrib.asynchronous.tournament_insert_into_pop(individual, pop, max_size)
Insert the given individual into the pop of evaluated individuals.
Randomly select an individual already in the pop; the new individual will replace the selected individual iff it has better fitness. This is essentially binary tournament selection.
If the pop is not yet at capacity, the individual is simply inserted.
TODO as with tournament selection, we should have an optional k to specify the tournament size. However, we have to be mindful that this is already k=2, so we would have to draw k-1 individuals from the population for comparison.
- Parameters
individual – that was just evaluated
pop – of already evaluated individuals
max_size – of the pop
- Returns
None
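The binary-tournament insertion can be sketched as follows (illustrative re-implementation; fitness is assumed to be maximized):

```python
import random

def tournament_insert_into_pop(individual, pop, max_size):
    """Binary tournament: the newcomer replaces a randomly chosen
    member iff the newcomer has strictly better fitness."""
    if len(pop) < max_size:
        pop.append(individual)
        return
    index = random.randrange(len(pop))
    if individual.fitness > pop[index].fitness:
        pop[index] = individual

class Ind:
    def __init__(self, fitness):
        self.fitness = fitness

pop = [Ind(1), Ind(2), Ind(3)]
# A newcomer fitter than every resident wins whatever opponent is drawn
tournament_insert_into_pop(Ind(10), pop, max_size=3)
print(max(i.fitness for i in pop))  # 10
```

Unlike the greedy variant, a strong newcomer may evict a strong resident here, since the opponent is drawn at random rather than being the weakest.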
leap_ec.distrib.evaluate module
Contains the common evaluate() used in sync.eval_pool and async.eval_pool.
- leap_ec.distrib.evaluate.evaluate(individual='__no__default__', context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})
Concurrently evaluate the given individual.
This is what’s invoked on each dask worker to evaluate each individual.
We log the start and end times for evaluation.
An individual is viable if no exception is thrown during its evaluation; otherwise it is non-viable. If not viable, we increment the context['leap']['distrib']['non_viable'] count to track such instances.
This function sets:
individual.start_eval_time – the time() of when evaluation started
individual.stop_eval_time – the time() of when evaluation finished
individual.is_viable – True if viable, else False
individual.exception – any exception raised during evaluation
individual.fitness – NaN if not viable, else the calculated fitness
individual.hostname – the name of the host on which this individual was evaluated
individual.pid – the process ID associated with evaluating the individual
- Parameters
individual – to be evaluated
- Returns
evaluated individual
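The viability bookkeeping described above can be sketched as a wrapper (an illustrative stand-in, not the library's actual implementation; the fitness_function parameter is an assumption for the sketch):

```python
import math
import os
import socket
import time

def evaluate(individual, fitness_function, context):
    """Evaluate an individual, recording timing and viability state."""
    individual.start_eval_time = time.time()
    try:
        individual.fitness = fitness_function(individual.genome)
        individual.is_viable = True
    except Exception as exc:
        # Any exception during evaluation marks the individual non-viable
        individual.fitness = math.nan
        individual.is_viable = False
        individual.exception = exc
        context['leap']['distrib']['non_viable'] += 1
    individual.stop_eval_time = time.time()
    individual.hostname = socket.gethostname()
    individual.pid = os.getpid()
    return individual

class Ind:
    def __init__(self, genome):
        self.genome = genome

context = {'leap': {'distrib': {'non_viable': 0}}}
good = evaluate(Ind([1, 0, 1]), sum, context)
bad = evaluate(Ind(None), sum, context)  # sum(None) raises TypeError

print(good.fitness, bad.is_viable, context['leap']['distrib']['non_viable'])
# 2 False 1
```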
- leap_ec.distrib.evaluate.is_viable(individual)
evaluate.evaluate() will set an individual’s fitness to NaN, set the attribute is_viable to False, and assign any exception triggered during the individual’s evaluation to exception. This function just checks the individual’s is_viable attribute; if the individual doesn’t have one, it is assumed to be viable.
- Parameters
individual – to be checked if viable
- Returns
True if individual is viable
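The assume-viable-by-default check can be sketched with getattr (illustrative only):

```python
def is_viable(individual):
    """An individual lacking an is_viable attribute is assumed viable."""
    return getattr(individual, 'is_viable', True)

class Ind:
    pass

fresh = Ind()               # never evaluated: assumed viable
failed = Ind()
failed.is_viable = False    # as set by evaluate() after an exception

print(is_viable(fresh), is_viable(failed))  # True False
```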
leap_ec.distrib.individual module
Subclass of core.Individual that adds some state relevant for distrib runs.
Adds:
uuid for each individual
birth ID, a unique birth number; the first individual has ID 0, and the last has ID N-1.
- class leap_ec.distrib.individual.DistributedIndividual(genome, decoder=None, problem=None)
Bases:
RobustIndividual
- birth_id = count(202)
Core individual that has unique UUID and birth ID.
- clone()
Create a ‘clone’ of this Individual, copying the genome, but not fitness.
The fitness of the clone is set to None. A new UUID is generated and assigned to self.uuid. The parents set is updated to include the UUID of the parent. A shallow copy of the parent is made, too, so that ancillary state is also copied.
A deep copy of the genome will be created, so if your Individual has a custom genome type, it’s important that it implements the __deepcopy__() method.
>>> from leap_ec.binary_rep.problems import MaxOnes
>>> from leap_ec.decoder import IdentityDecoder
>>> import numpy as np
>>> genome = np.array([0, 1, 1, 0])
>>> ind = Individual(genome, IdentityDecoder(), MaxOnes())
>>> ind_copy = ind.clone()
>>> ind_copy.genome == ind.genome
array([ True,  True,  True,  True])
>>> ind_copy.problem == ind.problem
True
>>> ind_copy.decoder == ind.decoder
True
leap_ec.distrib.logger module
This provides a dask logging plugin that reports the hostname, worker ID, and process ID for each worker. This is useful for checking that all workers have been sanely assigned to targeted resources.
Note that once this plugin is installed, dask will ensure that any worker restarted after a failure gets the plugin re-installed, too.
- class leap_ec.distrib.logger.EvaluatorLogFilter
Bases:
Filter
Convenience for adding hostname and worker ID to log messages
Cribbed from https://stackoverflow.com/questions/55584115/python-logging-how-to-track-hostname-in-logs
- filter(record)
Determine if the specified record is to be logged.
Is the specified record to be logged? Returns 0 for no, nonzero for yes. If deemed appropriate, the record may be modified in-place.
- class leap_ec.distrib.logger.WorkerLoggerPlugin(verbose=False, *args, **kwargs)
Bases:
WorkerPlugin
This dask worker plugin adds a logger for each worker that reports the hostname, worker ID, and process ID.
Usage:
client.register_worker_plugin(WorkerLoggerPlugin()) after the dask client is set up.
Then in code sent to a worker:
worker = get_worker()
worker.logger.info('This is a log message')
- setup(worker: Worker)
This is invoked once for each worker on their startup. The scheduler will also ensure that all workers invoke this.
- setup_logger(worker)
- teardown(worker: Worker)
Run when the worker to which the plugin is attached is closed
leap_ec.distrib.probe module
A collection of probe functions tailored for distrib evaluation
- leap_ec.distrib.probe.log_pop(update_interval, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, header=True)
Regularly update a CSV formatted stream with snapshots of the given population.
This is useful for asynchronous.steady_state() to regularly probe the asynchronously updated population.
- Parameters
update_interval – how often should we write a row?
stream – open stream to which to write rows
header – True if we want a header for the CSV file
- Returns
a function for saving regular population snapshots
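A probe of this shape can be sketched as a closure over a birth counter (an illustrative stand-in, not the library source; here each row records a birth number and a fitness, and the population is a plain list of fitnesses):

```python
import csv
import io

def log_pop(update_interval, stream, header=True):
    """Return a probe that writes a population snapshot every
    update_interval invocations (births)."""
    writer = csv.writer(stream)
    if header:
        writer.writerow(['birth', 'fitness'])
    births = 0

    def probe(population):
        nonlocal births
        births += 1
        if births % update_interval == 0:
            for fitness in population:
                writer.writerow([births, fitness])

    return probe

stream = io.StringIO()
probe = log_pop(update_interval=2, stream=stream)
for pop in ([1, 2], [1, 3], [2, 3]):
    probe(pop)

print(stream.getvalue().splitlines())
# ['birth,fitness', '2,1', '2,3']
```

Only the snapshot at birth 2 is written, since births 1 and 3 fall between update intervals.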
- leap_ec.distrib.probe.log_worker_location(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, header=True)
When debugging dask distribution configurations, this function can be used to track what machine and process was used to evaluate a given individual. Accumulates this information to the given stream as a CSV.
Suitable for being passed as the evaluated_probe argument for leap_ec.distrib.asynchronous.steady_state().
- Parameters
stream – to which we want to write the machine details
header – True if we want a header for the CSV file
- Returns
a function for recording where individuals are evaluated
leap_ec.distrib.synchronous module
This provides a synchronous fitness evaluation pipeline operator.
- leap_ec.distrib.synchronous.eval_pool(next_individual='__no__default__', client='__no__default__', size='__no__default__', context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})
Concurrently evaluate size individuals.
This is similar to ops.pool() in that it’s a “sink” for accumulating individuals by “pulling” individuals from upstream in the pipeline via next_individual. However, it’s also like ops.evaluate() in that these individuals are concurrently evaluated via a map/reduce approach. We use dask to implement this evaluation mechanism.
If an exception is thrown while evaluating an individual, NaN is assigned as its fitness, individual.is_viable is set to False, and the associated exception is assigned to individual.exception as a post-mortem aid. The core.context['leap']['distrib']['non_viable'] count is also incremented, which can be used to track the number of non-viable individuals (i.e., those for which an exception was thrown during evaluation); just remember to reset that count between runs if it has been updated.
- Parameters
next_individual – iterator/generator for individual provider
client – dask client through which we submit individuals to be evaluated
size – how many individuals to evaluate simultaneously.
context – for storing count of non-viable individuals
- Returns
the pool of evaluated individuals
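The sink behavior can be sketched with concurrent.futures standing in for dask (illustrative names; LEAP's actual operator takes a pipeline iterator and a dask client):

```python
from concurrent.futures import ThreadPoolExecutor

def fitness(genome):
    # Toy MaxOnes-style fitness
    return sum(genome)

def eval_pool(next_individual, client, size):
    """Pull `size` individuals from upstream, evaluate them
    concurrently, and return the evaluated pool."""
    batch = [next(next_individual) for _ in range(size)]
    # Executor.map returns results in submission order
    return list(client.map(fitness, batch))

def upstream():
    # Stands in for the preceding pipeline stages (selection, variation, ...)
    yield from ([0, 0, 1], [1, 1, 1], [1, 0, 1], [0, 0, 0])

with ThreadPoolExecutor(max_workers=2) as client:
    pool = eval_pool(upstream(), client, size=3)

print(pool)  # [1, 3, 2]
```

Note that only the first `size` individuals are pulled from upstream; the fourth genome above is never evaluated.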
- leap_ec.distrib.synchronous.eval_population(population='__no__default__', client='__no__default__', context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})
Concurrently evaluate all the individuals in the given population
- Parameters
population – to be evaluated
client – dask client
context – for storing count of non-viable individuals
- Returns
evaluated population