leap_ec.distrib package

Submodules

leap_ec.distrib.asynchronous module

This provides an asynchronous steady-state fitness evaluation pipeline operator.

A common feature here is a population of evaluated individuals that is asynchronously updated via dask.

leap_ec.distrib.asynchronous.eval_population(population, client, context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})

Concurrently evaluate all the individuals in the given population

Parameters
  • population – to be evaluated

  • client – dask client

  • context – for storing count of non-viable individuals

Returns

an iterator over dask futures for the evaluated individuals
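
A minimal sketch of consuming the returned iterator; here population is assumed to be a list of not-yet-evaluated DistributedIndividual instances and client an already-running dask Client, both created elsewhere:

from leap_ec.distrib.asynchronous import eval_population

for future in eval_population(population, client=client):
    individual = future.result()        # an evaluated individual
    print(individual.fitness, individual.hostname)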

leap_ec.distrib.asynchronous.greedy_insert_into_pop(individual, pop, max_size)

Insert the given individual into the pop of evaluated individuals.

This is greedy because we always compare the new individual with the current weakest in the pop. This is similar to tournament selection.

If the pop isn’t at capacity yet, the individual is simply inserted.

Parameters
  • individual – that was just evaluated

  • pop – of already evaluated individuals

  • max_size – of the pop

Returns

None
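
A small sketch of the intended usage; futures here is assumed to be an iterator of completed dask futures (e.g., the one returned by eval_population() above), and the capacity of 10 is illustrative:

pop = []          # already-evaluated individuals
MAX_SIZE = 10     # illustrative capacity

# insert results one at a time; once pop reaches MAX_SIZE, a new individual
# only displaces the current weakest if it has better fitness
for future in futures:
    greedy_insert_into_pop(future.result(), pop, MAX_SIZE)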

leap_ec.distrib.asynchronous.replace_if(new_individual, pop, index)

Convenience function that replaces the individual at pop[index] with new_individual if the latter has higher fitness.

Parameters
  • new_individual – is a newly evaluated individual

  • pop – of already evaluated individuals

  • index – of individual in pop to be compared against

Returns

None

leap_ec.distrib.asynchronous.steady_state(client, max_births, init_pop_size, pop_size, representation, problem, offspring_pipeline, inserter=<function greedy_insert_into_pop>, count_nonviable=False, evaluated_probe=None, pop_probe=None, context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})

Implements an asynchronous steady-state EA

Parameters
  • client – Dask client that should already be set up

  • max_births – how many births are we allowing?

  • init_pop_size – size of initial population sent directly to workers at start

  • pop_size – how large should the population be?

  • representation – of the individuals

  • problem – to be solved

  • offspring_pipeline – for creating new offspring from the pop

  • inserter – function with signature (individual, pop, max_size) used to insert newly evaluated individuals into the population; defaults to greedy_insert_into_pop()

  • count_nonviable – True if we want to count non-viable individuals towards the birth budget

  • evaluated_probe – is a function taking an individual that is given the next evaluated individual; can be used to print newly evaluated individuals

  • pop_probe – is an optional function that writes a snapshot of the population to a CSV-formatted stream every N births

Returns

the population containing the final individuals
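
A minimal usage sketch, assuming a local dask cluster and a simple binary MaxOnes setup; the operator choices, population sizes, and keyword names for Representation and mutate_bitflip are illustrative and may differ slightly between LEAP versions:

from dask.distributed import Client

from leap_ec import ops
from leap_ec.representation import Representation
from leap_ec.binary_rep.problems import MaxOnes
from leap_ec.binary_rep.initializers import create_binary_sequence
from leap_ec.binary_rep.ops import mutate_bitflip
from leap_ec.distrib.asynchronous import steady_state
from leap_ec.distrib.individual import DistributedIndividual

with Client() as client:                       # local cluster for illustration
    final_pop = steady_state(
        client,
        max_births=100,
        init_pop_size=5,
        pop_size=5,
        representation=Representation(
            initialize=create_binary_sequence(10),
            individual_cls=DistributedIndividual),
        problem=MaxOnes(),
        offspring_pipeline=[
            ops.random_selection,
            ops.clone,
            mutate_bitflip(expected_num_mutations=1),
            ops.pool(size=1)])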

leap_ec.distrib.asynchronous.tournament_insert_into_pop(individual, pop, max_size)

Insert the given individual into the pop of evaluated individuals.

Randomly select an individual already in the pop; the new individual replaces the selected one iff it has better fitness. This is essentially binary tournament selection.

If the pop isn’t at capacity yet, the individual is simply inserted.

TODO: as with tournament selection, we should have an optional k to specify the tournament size. However, we have to be mindful that this is already k=2, so we would have to draw k-1 individuals from the population for comparison.

Parameters
  • individual – that was just evaluated

  • pop – of already evaluated individuals

  • max_size – of the pop

Returns

None

leap_ec.distrib.evaluate module

Contains the common evaluate() used in sync.eval_pool and async.eval_pool.

leap_ec.distrib.evaluate.evaluate(individual='__no__default__', context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})

concurrently evaluate the given individual

This is what’s invoked on each dask worker to evaluate each individual.

We log the start and end times for evaluation.

An individual is viable if no exception is thrown during its evaluation; otherwise it is non-viable. If not viable, we increment the context['leap']['distrib']['non_viable'] count to track such instances.

This function sets:

  • individual.start_eval_time – the time() when evaluation started

  • individual.stop_eval_time – the time() when evaluation finished

  • individual.is_viable – True if viable, else False

  • individual.exception – any exception raised during evaluation

  • individual.fitness – NaN if not viable, else the calculated fitness

  • individual.hostname – the name of the host on which the individual was evaluated

  • individual.pid – the process ID associated with evaluating the individual

Parameters

individual – to be evaluated

Returns

evaluated individual
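
For example, the bookkeeping attributes listed above can be inspected after a run; individual here stands for any already-evaluated individual:

elapsed = individual.stop_eval_time - individual.start_eval_time
print(f'{individual.hostname} (pid {individual.pid}): '
      f'fitness={individual.fitness}, viable={individual.is_viable}, '
      f'took {elapsed:.2f}s')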

leap_ec.distrib.evaluate.is_viable(individual)

evaluate.evaluate() will set an individual’s fitness to NaN, set the attribute is_viable to False, and assign any exception raised during the individual’s evaluation to exception. This function just checks the individual’s is_viable attribute; if it doesn’t have one, the individual is assumed to be viable.

Parameters

individual – to be checked if viable

Returns

True if individual is viable
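
For instance, to keep only the individuals whose evaluation succeeded; population is assumed to hold already-evaluated individuals:

from leap_ec.distrib.evaluate import is_viable

viable = [ind for ind in population if is_viable(ind)]
num_failed = len(population) - len(viable)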

leap_ec.distrib.individual module

Subclass of core.Individual that adds some state relevant for distrib runs.

Adds:

  • uuid for each individual

  • birth ID, a unique birth number; the first individual has ID 0 and the Nth has ID N-1.

class leap_ec.distrib.individual.DistributedIndividual(genome, decoder=None, problem=None)

Bases: RobustIndividual

Core individual that has unique UUID and birth ID.

birth_id = count(202)

clone()

Create a ‘clone’ of this Individual, copying the genome, but not fitness.

The fitness of the clone is set to None. A new UUID is generated and assigned to self.uuid. The parents set is updated to include the UUID of the parent. A shallow copy of the parent is made, too, so that ancillary state is also copied.

A deep copy of the genome will be created, so if your Individual has a custom genome type, it’s important that it implements the __deepcopy__() method.

>>> from leap_ec.binary_rep.problems import MaxOnes
>>> from leap_ec.decoder import IdentityDecoder
>>> from leap_ec.individual import Individual
>>> import numpy as np
>>> genome = np.array([0, 1, 1, 0])
>>> ind = Individual(genome, IdentityDecoder(), MaxOnes())
>>> ind_copy = ind.clone()
>>> ind_copy.genome == ind.genome
array([ True,  True,  True,  True])
>>> ind_copy.problem == ind.problem
True
>>> ind_copy.decoder == ind.decoder
True

leap_ec.distrib.logger module

This provides a dask logging plugin that reports the hostname, worker ID, and process ID for each worker. This is useful for checking that all workers have been sanely assigned to targeted resources.

Note that once this plugin is installed, dask will ensure that each worker restarted after a failure gets the plugin re-installed, too.

class leap_ec.distrib.logger.EvaluatorLogFilter

Bases: Filter

Convenience for adding hostname and worker ID to log messages

Cribbed from https://stackoverflow.com/questions/55584115/python-logging-how-to-track-hostname-in-logs

filter(record)

Determine if the specified record is to be logged.

Is the specified record to be logged? Returns 0 for no, nonzero for yes. If deemed appropriate, the record may be modified in-place.

class leap_ec.distrib.logger.WorkerLoggerPlugin(verbose=False, *args, **kwargs)

Bases: WorkerPlugin

This dask worker plugin adds a logger for each worker that reports the hostname, worker ID, and process ID.

Usage:

client.register_worker_plugin(WorkerLoggerPlugin())

after the dask client is set up.

Then, in code sent to a worker:

worker = get_worker()
worker.logger.info('This is a log message')

setup(worker: Worker)

This is invoked once for each worker at startup. The scheduler will also ensure that all workers invoke this.

setup_logger(worker)
teardown(worker: Worker)

Run when the worker to which the plugin is attached is closed
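
A minimal end-to-end sketch, assuming a local dask cluster; the squaring task is purely illustrative:

from dask.distributed import Client, get_worker
from leap_ec.distrib.logger import WorkerLoggerPlugin

def do_work(x):
    worker = get_worker()
    worker.logger.info('evaluating %s', x)   # logger added by the plugin
    return x * x

client = Client()                            # local cluster for illustration
client.register_worker_plugin(WorkerLoggerPlugin())
results = client.gather(client.map(do_work, range(4)))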

leap_ec.distrib.probe module

A collection of probe functions tailored for distrib evaluation

leap_ec.distrib.probe.log_pop(update_interval, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, header=True)

Regularly update a CSV formatted stream with snapshots of the given population.

This is useful for asynchronous.steady_state() to take regular snapshots of the continually updated population.

Parameters
  • update_interval – how often should we write a row?

  • stream – open stream to which to write rows

  • header – True if we want a header for the CSV file

Returns

a function for saving regular population snapshots
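
For example, the returned function is typically passed as the pop_probe argument to asynchronous.steady_state(); the interval and stream here are illustrative:

import sys
from leap_ec.distrib.probe import log_pop

pop_probe = log_pop(update_interval=10, stream=sys.stdout)
# then pass pop_probe=pop_probe to asynchronous.steady_state()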

leap_ec.distrib.probe.log_worker_location(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, header=True)

When debugging dask distribution configurations, this function can be used to track what machine and process was used to evaluate a given individual. Accumulates this information to the given stream as a CSV.

Suitable for being passed as the evaluated_probe argument to leap_ec.distrib.asynchronous.steady_state().

Parameters
  • stream – to which we want to write the machine details

  • header – True if we want a header for the CSV file

Returns

a function for recording where individuals are evaluated
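
Similarly, a sketch of wiring this into steady_state(); the output file name is illustrative:

from leap_ec.distrib.probe import log_worker_location

with open('worker_locations.csv', 'w') as stream:
    track_workers = log_worker_location(stream)
    # then pass evaluated_probe=track_workers to asynchronous.steady_state()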

leap_ec.distrib.synchronous module

This provides a synchronous fitness evaluation pipeline operator.

leap_ec.distrib.synchronous.eval_pool(next_individual='__no__default__', client='__no__default__', size='__no__default__', context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})

concurrently evaluate size individuals

This is similar to ops.pool() in that it’s a “sink” that accumulates individuals by “pulling” them from upstream in the pipeline via next_individual. However, it’s also like ops.evaluate() in that these individuals are concurrently evaluated via a map/reduce approach. We use dask to implement this evaluation mechanism.

If an exception is thrown while evaluating an individual, NaN is assigned as its fitness, individual.is_viable is set to False, and the associated exception is assigned to individual.exception as a post-mortem aid. The core.context['leap']['distrib']['non_viable'] count is also incremented if you want to track the number of non-viable individuals (i.e., those for which an exception was thrown during evaluation); just remember to reset that count between runs if it has been updated.

Parameters
  • next_individual – iterator/generator for individual provider

  • client – dask client through which we submit individuals to be evaluated

  • size – how many individuals to evaluate simultaneously.

  • context – for storing count of non-viable individuals

Returns

the pool of evaluated individuals
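
A sketch of eval_pool() at the tail of a generational pipeline, assuming toolz.pipe and standard LEAP operators; the create_population keyword names and the mutation parameter are assumptions that may vary between LEAP versions:

from toolz import pipe
from dask.distributed import Client

from leap_ec import ops
from leap_ec.binary_rep.problems import MaxOnes
from leap_ec.binary_rep.initializers import create_binary_sequence
from leap_ec.binary_rep.ops import mutate_bitflip
from leap_ec.decoder import IdentityDecoder
from leap_ec.distrib.individual import DistributedIndividual
from leap_ec.distrib.synchronous import eval_pool

client = Client()                              # local cluster for illustration

# an initial parent population of 5 ten-bit individuals, evaluated via dask
parents = DistributedIndividual.create_population(
    5, initialize=create_binary_sequence(10),
    decoder=IdentityDecoder(), problem=MaxOnes())
parents = eval_pool(iter(parents), client=client, size=len(parents))

# one generation of offspring, evaluated concurrently
offspring = pipe(parents,
                 ops.tournament_selection,
                 ops.clone,
                 mutate_bitflip(expected_num_mutations=1),
                 eval_pool(client=client, size=len(parents)))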

leap_ec.distrib.synchronous.eval_population(population='__no__default__', client='__no__default__', context={'leap': {'distrib': {'non_viable': 0}, 'generation': 100}})

Concurrently evaluate all the individuals in the given population

Parameters
  • population – to be evaluated

  • client – dask client

  • context – for storing count of non-viable individuals

Returns

evaluated population
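
In contrast to the asynchronous version above, this returns the evaluated population directly; a short sketch, where population and client are assumed to have been created as in the eval_pool() sketch:

from leap_ec.distrib.synchronous import eval_population

population = eval_population(population, client=client)
best = max(population)        # individuals compare by fitness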

Module contents