"""Implements classes for generating data by schema."""
import csv
import inspect
import json
import pickle
import re
from typing import Any, Callable, Sequence
from mimesis.exceptions import (
AliasesTypeError,
FieldArityError,
FieldError,
FieldNameError,
FieldsetError,
SchemaError,
)
from mimesis.locales import Locale
from mimesis.providers.base import BaseProvider
from mimesis.providers.generic import Generic
from mimesis.random import Random
from mimesis.types import JSON, CallableSchema, Key, MissingSeed, Seed
# Names exported by ``from <this module> import *``.
__all__ = [
"BaseField",
"Field",
"Fieldset",
"Schema",
"SchemaContext",
"SchemaBuilder",
"FieldHandler",
"RegisterableFieldHandler",
"RegisterableFieldHandlers",
]
# Type of ``BaseField._cache``: maps a normalized field name to the
# callable that was resolved for it by ``BaseField._lookup_method``.
FieldCache = dict[str, Callable[[Any], Any]]
# Type of a custom field handler accepted by ``BaseField.register_handler``.
# ``register_handler`` rejects callables that declare fewer than two parameters.
FieldHandler = Callable[[Random, Any], Any]
# A ``(field_name, handler)`` pair, as consumed by ``BaseField.register_handlers``.
RegisterableFieldHandler = tuple[str, FieldHandler]
# A sequence of ``(field_name, handler)`` pairs.
RegisterableFieldHandlers = Sequence[RegisterableFieldHandler]
class BaseField:
    """Base class for :class:`Field` and :class:`Fieldset`.

    An instance owns a :class:`Generic` provider, a cache of already resolved
    provider methods, a registry of custom field handlers and an ``aliases``
    mapping that renames fields before they are looked up.
    """

    # NOTE(review): ``Field`` and ``Fieldset`` call ``self.perform(...)``, but no
    # ``perform`` method is defined in this class as shown here. Confirm that it
    # is provided elsewhere before relying on those subclasses.

    def __init__(
        self,
        locale: Locale = Locale.DEFAULT,
        seed: Seed = MissingSeed,
    ) -> None:
        """Base class for fields.

        This class is used as a base class for :class:`Field` and :class:`Fieldset`.

        :attr: aliases: A dictionary of aliases for standard fields.

        :param locale: Locale.
        :param seed: Seed for random.
        """
        self.seed = seed
        self._generic = Generic(locale, seed)
        self._cache: FieldCache = {}
        self._handlers: dict[str, FieldHandler] = {}
        self.aliases: dict[str, str] = {}

    def reseed(self, seed: Seed = MissingSeed) -> None:
        """Reseed the random generator.

        Only the underlying :class:`Generic` is reseeded; the ``seed``
        attribute stored by ``__init__`` is left untouched.

        :param seed: Seed for random.
        """
        self._generic.reseed(seed)

    def get_random_instance(self) -> Random:
        """Get a random object from Generic.

        :return: Random object.
        """
        return self._generic.random

    def _explicit_lookup(self, name: str) -> Any:
        """An explicit method lookup.

        This method is called when the field
        defined explicitly, like this: ``provider.method``

        :param name: The field name (must contain a dot).
        :return: Callable object.
        :raise FieldError: When field is invalid.
        """
        provider_name, method_name = name.split(".", 1)
        try:
            provider = getattr(self._generic, provider_name)
            return getattr(provider, method_name)
        except AttributeError as err:
            # Chain explicitly so the traceback shows which attribute was missing.
            raise FieldError(name) from err

    def _fuzzy_lookup(self, name: str) -> Any:
        """A fuzzy method lookup.

        This method is called when the field definition
        is fuzzy, like this: ``method``

        The first provider (in ``dir()`` order of the generic object) that
        exposes an attribute called ``name`` wins.

        :param name: The field name.
        :return: Callable object.
        :raise FieldError: When field is invalid.
        """
        for attr_name in dir(self._generic):
            provider = getattr(self._generic, attr_name)
            if isinstance(provider, BaseProvider) and name in dir(provider):
                return getattr(provider, name)
        raise FieldError(name)

    def _lookup_method(self, name: str) -> Any:
        """Lookup method by the field name.

        :param name: The field name.
        :return: Callable object.
        :raise FieldError: When field is invalid.
        """
        # Check if the field is defined in aliases
        name = self.aliases.get(name, name)

        # Support additional delimiters: "/", ":" and whitespace all act as "."
        name = re.sub(r"[/:\s]", ".", name)

        # Only ``method`` and ``provider.method`` forms are supported.
        if name.count(".") > 1:
            raise FieldError(name)

        if name not in self._cache:
            lookup = self._explicit_lookup if "." in name else self._fuzzy_lookup
            self._cache[name] = lookup(name)

        return self._cache[name]

    def _validate_aliases(self) -> bool:
        """Validate aliases.

        ``aliases`` must be a dict whose keys and values are all strings.

        :return: ``True`` when the aliases are valid.
        :raise AliasesTypeError: When the aliases are invalid. In that case
            ``aliases`` is reset to an empty dict before raising.
        """
        if not isinstance(self.aliases, dict) or any(
            not isinstance(key, str) or not isinstance(value, str)
            for key, value in self.aliases.items()
        ):
            # Reset to valid state
            self.aliases = {}
            raise AliasesTypeError()
        return True

    def register_handler(self, field_name: str, field_handler: FieldHandler) -> None:
        """Register a new field handler.

        If a handler is already registered under ``field_name`` the call is a
        no-op: the existing handler is kept and no error is raised.

        :param field_name: Name of the field.
        :param field_handler: Callable object.
        :raise TypeError: When ``field_name`` is not a string or
            ``field_handler`` is not callable.
        :raise FieldNameError: When ``field_name`` is not a valid identifier.
        :raise FieldArityError: When the handler declares fewer than
            two parameters.
        """
        if not isinstance(field_name, str):
            raise TypeError("Field name must be a string.")

        if not field_name.isidentifier():
            raise FieldNameError(field_name)

        if not callable(field_handler):
            raise TypeError("Handler must be a callable object.")

        callable_signature = inspect.signature(field_handler)

        if len(callable_signature.parameters) <= 1:
            raise FieldArityError()

        if field_name not in self._handlers:
            self._handlers[field_name] = field_handler

    def handle(
        self, field_name: str | None = None
    ) -> Callable[[FieldHandler], FieldHandler]:
        """Decorator for registering a custom field handler.

        You can use this decorator only for functions,
        not for any other callables.

        .. versionadded:: 12.0.0

        :param field_name: Name of the field.
            If not specified, the name of the function is used.
        :return: Decorator.
        """

        def decorator(field_handler: FieldHandler) -> FieldHandler:
            # Falls back to ``__name__``, which is why only functions are supported.
            _field_name = field_name or field_handler.__name__
            self.register_handler(_field_name, field_handler)
            return field_handler

        return decorator

    def register_handlers(self, fields: RegisterableFieldHandlers) -> None:
        """Register the new field handlers.

        :param fields: A sequence of sequences with field name and handler.
        :return: None.
        """
        for name, handler in fields:
            self.register_handler(name, handler)

    def unregister_handler(self, field_name: str) -> None:
        """Unregister a field handler.

        Unknown names are ignored.

        :param field_name: Name of the field.
        """
        self._handlers.pop(field_name, None)

    def unregister_handlers(self, field_names: Sequence[str] = ()) -> None:
        """Unregister a field handlers with given names.

        :param field_names: Names of the fields.
        :return: None.
        """
        for name in field_names:
            self.unregister_handler(name)

    def unregister_all_handlers(self) -> None:
        """Unregister all custom field handlers.

        :return: None.
        """
        self._handlers.clear()

    def __str__(self) -> str:
        """Return the class name followed by the locale of the generic provider."""
        return f"{self.__class__.__name__} <{self._generic.locale}>"
class Field(BaseField):
    """Greedy field: the value is produced as soon as the instance is called.

    .. warning::

        Never create a **field** inside a loop. Code such as:

        >>> for i in range(1000):
        ...     field = Field()

        is **wrong**: it is a terrible idea that will lead to a memory leak.
        Create the field once and reuse it.

    Typical usage:

    >>> _ = Field()
    >>> _('username')
    Dogtag_1836
    """

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Evaluate the field immediately.

        All positional and keyword arguments are forwarded unchanged
        to ``self.perform``.

        :param args: Positional arguments for ``perform``.
        :param kwargs: Keyword arguments for ``perform``.
        :return: Whatever ``perform`` returns.
        """
        value = self.perform(*args, **kwargs)
        return value
class Fieldset(BaseField):
    """Greedy fieldset (evaluates immediately).

    Behaves like a field, except that calling it yields a list of values
    instead of a single value.

    The number of values can be fixed when the fieldset is created:

    >>> fieldset = Fieldset(i=100)
    >>> fieldset('username')
    ['pot_1821', 'vhs_1915', ..., 'reviewed_1849']

    or supplied per call through the **i** keyword argument:

    >>> fieldset = Fieldset()
    >>> fieldset('username', i=2)
    ['pot_1821', 'vhs_1915']

    When **i** is given in neither place, the default of **10** is used.

    See "Field vs Fieldset" section of documentation for more details.

    :cvar fieldset_default_iterations: Default iterations. Default is **10**.
    :cvar fieldset_iterations_kwarg: Keyword argument for iterations. Default is **i**.
    """

    fieldset_default_iterations: int = 10
    fieldset_iterations_kwarg: str = "i"

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize fieldset.

        The iterations keyword argument (**i** unless the class attribute
        **fieldset_iterations_kwarg** is overridden) is removed from
        ``kwargs`` and remembered; everything else is passed on to
        the base class initializer.
        """
        kwarg_name = self.fieldset_iterations_kwarg
        fallback = self.fieldset_default_iterations
        self._iterations = kwargs.pop(kwarg_name, fallback)
        super().__init__(*args, **kwargs)

    def __call__(self, *args: Any, **kwargs: Any) -> list[Any]:
        """Perform fieldset.

        :param args: Arguments for field.
        :param kwargs: Keyword arguments for field. The iterations keyword,
            when present, overrides the count given at construction time.
        :raises FieldsetError: If parameter **i** is less than 1.
        :return: List of values.
        """
        count = kwargs.pop(self.fieldset_iterations_kwarg, self._iterations)

        if count < 1:
            raise FieldsetError()

        values: list[Any] = []
        for _ in range(count):
            values.append(self.perform(*args, **kwargs))
        return values
class SchemaContext:
    """Per-item context handed to schema transformation functions."""

    # NOTE(review): "timestamp" is reserved here but never assigned in
    # ``__init__``; reading ``ctx.timestamp`` before something sets it raises
    # AttributeError. Confirm whether it is meant to be populated.
    __slots__ = ("index", "iteration", "timestamp", "seed", "custom", "schema_builder")

    def __init__(
        self,
        index: int,
        seed: Seed = MissingSeed,
        custom: dict[str, Any] | None = None,
        builder: "SchemaBuilder | None" = None,
    ) -> None:
        """Initialize context.

        :param index: Current iteration index (0-based).
        :param seed: Current seed state.
        :param custom: Custom context data. ``None`` or an empty mapping
            results in a fresh empty dict.
        :param builder: Reference to SchemaBuilder for relational data.
        """
        self.schema_builder = builder
        self.custom = custom if custom else {}
        self.seed = seed
        self.index = index
        # 1-based counterpart of ``index``.
        self.iteration = index + 1

    def pick_from(self, schema_name: str, field: str | None = None) -> Any:
        """Pick a random item from a registered schema.

        :param schema_name: Name of the schema in builder registry.
        :param field: Optional field to extract from item.
        :return: Random item or field value.
        :raises ValueError: If builder is not available or schema is not found.
        """
        builder = self.schema_builder
        if builder:
            return builder._pick_from(schema_name, field)
        raise ValueError("pick_from() requires SchemaBuilder")

    def ref(self, schema_name: str) -> list[JSON]:
        """Get all generated items from a schema.

        :param schema_name: Name of the schema in builder registry.
        :return: List of all items from that schema.
        :raises ValueError: If builder is not available or schema is not found.
        """
        builder = self.schema_builder
        if builder:
            return builder._get_data(schema_name)
        raise ValueError("ref() requires SchemaBuilder")
class Schema:
    """Class which return list of filled schemas.

    A schema is a zero-argument callable that produces one item. Items can be
    post-processed by transformers registered with :meth:`map`; a transformer
    that returns ``None`` causes the item to be dropped.
    """

    __slots__ = (
        "iterations",
        "_transformers",
        "__counter",
        "__schema",
        "__seed",
        "_custom_context",
    )

    def __init__(
        self,
        schema: CallableSchema,
        iterations: int = 10,
        seed: Seed = MissingSeed,
    ) -> None:
        """Initialize schema.

        :param schema: A schema (must be a callable object).
        :param iterations: Number of iterations (at least 1).
        :param seed: Seed for random generator.
        :raises ValueError: If ``iterations`` is less than 1.
        :raises SchemaError: If ``schema`` is not callable.
        """
        if iterations < 1:
            # The check accepts 1, so the message must say so as well.
            raise ValueError(
                "Number of iterations should be greater than or equal to 1."
            )

        if not callable(schema):
            raise SchemaError()

        self.__schema = schema
        self.__seed = seed
        self.__counter = 0
        self.iterations = iterations
        self._transformers: list[Callable[..., Any]] = []
        self._custom_context: dict[str, Any] = {}

    def _apply_transformers(self, item: JSON, ctx: SchemaContext) -> JSON:
        """Apply all transformers to an item.

        Transformers run in registration order, each receiving the result of
        the previous one. A transformer declaring two or more parameters is
        called as ``fn(item, ctx)``; any other is called as ``fn(item)``.

        :param item: The item to transform.
        :param ctx: The context object.
        :return: Transformed item.
        """
        for transformer in self._transformers:
            accepts_ctx = len(inspect.signature(transformer).parameters) >= 2
            item = transformer(item, ctx) if accepts_ctx else transformer(item)
        return item

    def map(self, fn: Callable[..., Any]) -> "Schema":
        """Transform each generated item.

        :param fn: Function to transform items.
            Can accept (item) or (item, context).
        :return: Self for chaining.
        """
        self._transformers.append(fn)
        return self

    def with_context(self, **kwargs: Any) -> "Schema":
        """Add custom context data.

        :param kwargs: Custom context values.
        :return: Self for chaining.
        """
        self._custom_context.update(kwargs)
        return self

    def to_csv(self, file_path: str, **kwargs: Any) -> None:
        """Export a schema as a CSV file.

        Column names are taken from the first generated item. If no items
        were generated, an empty file is written.

        :param file_path: The file path.
        :param kwargs: The keyword arguments for :py:class:`csv.DictWriter` class.
        """
        data = self.create()
        with open(file_path, "w", encoding="utf-8", newline="") as fp:
            if not data:
                # No first row to derive the header from; leave the file empty
                # instead of failing with IndexError on ``data[0]``.
                return
            fieldnames = list(data[0])
            dict_writer = csv.DictWriter(fp, fieldnames, **kwargs)
            dict_writer.writeheader()
            dict_writer.writerows(data)

    def to_json(self, file_path: str, **kwargs: Any) -> None:
        """Export a schema as a JSON file.

        :param file_path: The file path.
        :param kwargs: Extra keyword arguments for :py:func:`json.dump` class.
        """
        # Generate first so a failing schema does not truncate an existing file.
        data = self.create()
        with open(file_path, "w", encoding="utf-8") as fp:
            json.dump(data, fp, **kwargs)

    def to_pickle(self, file_path: str, **kwargs: Any) -> None:
        """Export a schema as the pickled representation of the object to the file.

        :param file_path: The file path.
        :param kwargs: Extra keyword arguments for :py:func:`pickle.dump` class.
        """
        # Generate first so a failing schema does not truncate an existing file.
        data = self.create()
        with open(file_path, "wb") as fp:
            pickle.dump(data, fp, **kwargs)

    def _create_item(self, index: int) -> JSON:
        """Create a single item with given index.

        :param index: The index for the context.
        :return: Generated and transformed item.
        """
        ctx = SchemaContext(
            index=index,
            seed=self.__seed,
            custom=self._custom_context,
        )
        result = self.__schema()
        return self._apply_transformers(result, ctx)

    def create(self) -> list[JSON]:
        """Creates a list of a fulfilled schemas.

        Items for which the transformers return ``None`` are skipped and
        generation continues until ``iterations`` items have been collected.
        Consequently a transformer that returns ``None`` for every item makes
        this method loop forever.

        .. note::
            This method evaluates immediately, so be careful when creating
            large datasets otherwise you're risking running out of memory.

            If you need a lazy version of this method, just use :meth:`iterator` or
            the iterator protocol of :class:`Schema`

        :return: List of fulfilled schemas.
        """
        index = 0
        results: list[JSON] = []

        while len(results) < self.iterations:
            result = self._create_item(index)
            if result is not None:
                results.append(result)
            index += 1

        return results

    def iterator(self) -> "Schema":
        """Return an iterator for the schema.

        :return: Iterator object.
        """
        return iter(self)

    def __next__(self) -> JSON:
        """Return the next item from the iterator.

        Unlike :meth:`create`, at most ``iterations`` items are *attempted*;
        attempts whose result is ``None`` are skipped, so fewer than
        ``iterations`` items may be yielded.

        :raises StopIteration: When all attempts have been used.
        """
        while self.__counter < self.iterations:
            result = self._create_item(self.__counter)
            self.__counter += 1
            if result is not None:
                return result
        raise StopIteration

    def __iter__(self) -> "Schema":
        """Return the iterator object itself, restarting from the first item."""
        self.__counter = 0
        return self
class SchemaBuilder:
    """Builder for creating related schemas with references.

    Schemas are registered by name with :meth:`define` and generated with
    :meth:`create`. While a schema is being generated, its transformers
    receive a :class:`SchemaContext` bound to this builder, so they can read
    data of schemas generated earlier via ``ctx.pick_from`` / ``ctx.ref``.
    """

    __slots__ = ("_schemas", "_data", "_seed", "_random")

    def __init__(self, seed: Seed = MissingSeed) -> None:
        """Initialize relation schema.

        :param seed: Seed for random generator.
        """
        self._schemas: dict[str, Schema] = {}
        self._data: dict[str, list[JSON]] = {}
        self._seed = seed

        if seed is MissingSeed:
            self._random = Random()
        else:
            # Type narrowing: seed is not MissingSeed here
            self._random = Random(seed)  # type: ignore[arg-type]

    def define(self, name: str, schema: Schema) -> Schema:
        """Register a schema with a name.

        An existing registration under the same name is replaced.

        :param name: Name to register schema under.
        :param schema: Schema instance.
        :return: The schema for chaining.
        """
        self._schemas[name] = schema
        return schema

    def _pick_from(self, schema_name: str, field: str | None = None) -> Any:
        """Pick random item from generated data.

        :param schema_name: Name of schema.
        :param field: Optional field to extract.
        :return: Random item or field value.
        :raises ValueError: If the schema has not been generated yet
            or produced no items.
        """
        if schema_name not in self._data:
            raise ValueError(f"Schema '{schema_name}' not yet generated")

        items = self._data[schema_name]
        if not items:
            raise ValueError(f"Schema '{schema_name}' has no items")

        item = self._random.choice(items)
        return item[field] if field else item

    def _get_data(self, schema_name: str) -> list[JSON]:
        """Get all data for a schema.

        :param schema_name: Name of schema.
        :return: List of items.
        :raises ValueError: If the schema has not been generated yet.
        """
        if schema_name not in self._data:
            raise ValueError(f"Schema '{schema_name}' not yet generated")
        return self._data[schema_name]

    def _wrap_transformer(self, orig_fn: Callable[..., JSON]) -> Callable[..., Any]:
        """Wrap a transformer to inject SchemaBuilder context.

        :param orig_fn: Original transformer function.
        :return: Wrapped transformer function taking ``(item, ctx)``.
        """
        # The arity of ``orig_fn`` does not change between items, so inspect it
        # once here instead of on every call.
        accepts_ctx = len(inspect.signature(orig_fn).parameters) >= 2

        # Do NOT decorate with ``functools.wraps``: it sets ``__wrapped__``,
        # which ``inspect.signature`` follows, so ``Schema`` would see the
        # original (possibly one-argument) signature and call this wrapper
        # without ``ctx``.
        def wrapped_transformer(item: JSON, ctx: SchemaContext) -> JSON:
            if not accepts_ctx:
                return orig_fn(item)
            # Same context as the schema produced, plus a reference to this builder.
            new_ctx = SchemaContext(
                index=ctx.index,
                seed=ctx.seed,
                custom=ctx.custom,
                builder=self,
            )
            return orig_fn(item, new_ctx)

        return wrapped_transformer

    def create(self, **counts: int) -> dict[str, list[JSON]]:
        """Create all schemas with specified counts.

        Schemas are generated in the order the keyword arguments are given,
        so a schema may reference any schema listed before it.

        :param counts: Schema names and their counts.
        :return: Dictionary of schema names to generated data.
        :raises ValueError: If a name was not registered with :meth:`define`.
        """
        result: dict[str, list[JSON]] = {}

        for name, count in counts.items():
            if name not in self._schemas:
                raise ValueError(f"Schema '{name}' is not defined")

            schema = self._schemas[name]
            old_transformers = schema._transformers
            old_iterations = schema.iterations

            # Wrap transformers to inject builder context
            schema._transformers = [
                self._wrap_transformer(transformer)
                for transformer in old_transformers
            ]
            schema.iterations = count
            try:
                data = schema.create()
            finally:
                # Always hand the schema back unmodified, even when generation
                # fails (e.g. a transformer references a schema that has not
                # been generated yet).
                schema.iterations = old_iterations
                schema._transformers = old_transformers

            self._data[name] = data
            result[name] = data

        return result