"""
Created on Wed Jun 18 12:49:42 2025
@author: Nahuel Mendez & Sebastian Jaroszewicz
"""
import numpy as np
from scipy import stats
from concurrent.futures import ThreadPoolExecutor
import os
import warnings,logging
logger = logging.getLogger(__name__)
# MFDFA (Multifractal Detrended Fluctuation Analysis) - Auxiliary function
def _process_scale(s_val, profile_data, q_values_arr, poly_order, N_len, process_from_end=False):
"""
Processes a single scale 's' for MFDFA calculation using vectorized matrix operations.
This function divides the profile data into segments of length 's_val', performs
polynomial detrending on all segments simultaneously using a least-squares
solution, and computes the fluctuation function F_q for all values in q_values_arr.
Parameters:
-----------
s_val : int
The current scale (window size) to process.
profile_data : ndarray
The integrated profile of the time series (1D array).
q_values_arr : ndarray
1D array of q exponents to compute the multifractal spectrum.
poly_order : int
Order of the polynomial for local detrending (e.g., 1 for linear).
N_len : int
Total length of the profile_data.
process_from_end : bool, optional
If True, processes segments from both the beginning and the end of the
series (2Ns segments total), ensuring no data is lost. Default is False.
Returns:
--------
tuple: (int, ndarray)
A tuple containing:
- s_val: The processed scale.
- F_q_for_this_s: 1D array of shape (len(q_values_arr),) containing
the fluctuation values for each q.
"""
if s_val <= poly_order: # Evitamos errores de grado de polinomio
return s_val, np.zeros(len(q_values_arr))
# 1. Preparar segmentos de forma matricial (Hacia adelante)
num_segments = N_len // s_val
# Cortamos la serie para que sea divisible por s_val y hacemos reshape
# shape: (num_segments, s_val)
segments_fwd = profile_data[:num_segments * s_val].reshape(num_segments, s_val)
# 2. Preparar segmentos (Hacia atrás)
if process_from_end:
segments_bwd = profile_data[N_len - num_segments * s_val:].reshape(num_segments, s_val)
all_segments = np.vstack([segments_fwd, segments_bwd])
else:
all_segments = segments_fwd
# 3. DETRENDING VECTORIZADO (El "Modo Rápido")
# Creamos una sola matriz de Vandermonde para todos los segmentos
x = np.arange(s_val)
A = np.vander(x, poly_order + 1) # Matriz base para el ajuste
# Resolvemos los coeficientes para TODOS los segmentos a la vez
# A @ Coeffs = all_segments.T
coeffs, _, _, _ = np.linalg.lstsq(A, all_segments.T, rcond=None)
# Calculamos la tendencia y restamos
trends = (A @ coeffs).T
detrended = all_segments - trends
# 4. Cálculo de Varianzas en bloque
# axis=1 calcula la media de cada segmento (fila)
segment_variances = np.mean(detrended**2, axis=1)
# Filtramos varianzas cero para evitar logs de números negativos o ceros
segment_variances = segment_variances[segment_variances > 0]
if len(segment_variances) == 0:
return s_val, np.zeros(len(q_values_arr))
# 5. Cálculo de F_q (Vectorizado también para q)
F_q_for_this_s = np.zeros(len(q_values_arr))
# Para q != 0 (Usamos broadcasting de numpy)
q_non_zero = q_values_arr != 0
q_vals = q_values_arr[q_non_zero][:, np.newaxis] # Preparamos para operar con matrices
# F_q = [mean(var^(q/2))]^(1/q)
f_q_vals = np.mean(segment_variances**(q_vals / 2.0), axis=1)**(1.0 / q_vals.flatten())
F_q_for_this_s[q_non_zero] = f_q_vals
# Para q == 0 (Caso especial: Exponente del promedio de logs)
if np.any(q_values_arr == 0):
q_zero_idx = np.where(q_values_arr == 0)[0][0]
F_q_for_this_s[q_zero_idx] = np.exp(0.5 * np.mean(np.log(segment_variances)))
return s_val, F_q_for_this_s
#-----------------------------------------------------------------------------------------------------#
#--------MFDFA (Multifractal Detrended Fluctuation Analysis) in parallel using multiprocessing--------#
#-----------------------------------------------------------------------------------------------------#
[docs]
def mfdfa(data, q_values, scales, order=1, num_cores=None,
segments_from_both_ends=False,scale_range_for_hq=None,validate=True):
"""
Performs Multifractal Detrended Fluctuations Analysis (MFDFA) in parallel.
Parameters:
-----------
data : array_like
The time series to analyze (one-dimensional).
q_values : array_like
The range of q moments for the analysis.
scales : array_like
The scales (segment lengths) to consider. Must be integers.
order : int, optional
The order of the polynomial for detrending (default is 1, linear).
num_cores : int, optional
Number of CPU cores to use. If None, use os.cpu_count().
segments_from_both_ends : bool, optional
If True, segments are taken from the start and end of the series.
If False (default) segments are taken only from the start.
scale_range_for_hq : tuple or list, optional
Tuple (min_s, max_s) defines the scale range to be used to calculate
the exponent h(q). If None (default), all valid scales are used.
validate: bool, optional
If True (default), theoretical and concavity masks are applied to validate numerical results
If False, numerical values are returned without a validation step.
Return:
--------
q : ndarray
q-values or moment exponents
h_q : ndarray
The generalized Hurst exponent for each value of q.
tau_q : ndarray
The mass scaling function for each value of q.
alpha : ndarray
The singularity (or Hölder) exponent.
f_alpha : ndarray
The singularity spectrum.
F_q_s : ndarray
The fluctuation function F_q(s) for each q and s..
"""
# =========================================================================
# --- CHECKS AND SETTINGS ---
# =========================================================================
data_arr = np.asarray(data)
q_values_arr = np.asarray(q_values)
scales_arr = np.asarray(scales, dtype=int)
#.Check the s_min
if np.min(scales_arr)<order+1:
scales_arr=scales_arr[scales_arr>=order+1]
warnings.warn("min(scale) is less than m+1. Using a safe min_scale")
# Compute the integrated profile of the time series
profile_data = np.cumsum(data_arr - np.mean(data_arr))
N_len = len(profile_data)
if num_cores is None:
# Attempt to get CPU count, default to 1 if os.cpu_count() returns None
num_cores = os.cpu_count() if os.cpu_count() is not None else 1
num_cores = min(num_cores, len(scales_arr), os.cpu_count() if os.cpu_count() else 1)
# Prepare arguments for each task including 'segments_from_both_ends'
tasks = [(s_val, profile_data, q_values_arr, order, N_len, segments_from_both_ends) for s_val in scales_arr]
# =========================================================================
# --- COMPUTING F_q(s) ---
# =========================================================================
F_q_s_matrix = np.full((len(q_values_arr), len(scales_arr)), np.nan)
if num_cores == 1:
# sequential mode
results_list = [_process_scale(*task) for task in tasks]
else:
# parallel mode
with ThreadPoolExecutor(max_workers=num_cores) as executor:
results_list = list(executor.map(lambda t: _process_scale(*t), tasks))
scale_to_original_idx = {s_val: i for i, s_val in enumerate(scales_arr)}
for s_processed, F_q_for_s_processed_arr in results_list:
if s_processed in scale_to_original_idx:
original_idx = scale_to_original_idx[s_processed]
F_q_s_matrix[:, original_idx] = F_q_for_s_processed_arr
# =========================================================================
# --- COMPUTING h(q) ---
# =========================================================================
valid_scales_mask = np.any(np.isfinite(F_q_s_matrix) & (F_q_s_matrix > 0), axis=0)
if not np.any(valid_scales_mask):
warnings.warn("All fluctuacion functions F_q(s) are zero or Nan. Cannot calculate exponents.")
nan_res = np.full(len(q_values_arr), np.nan)
return q_values_arr,nan_res, nan_res, nan_res, nan_res, F_q_s_matrix
F_q_s_filtered = F_q_s_matrix[:, valid_scales_mask]
scales_filtered = scales_arr[valid_scales_mask]
if len(scales_filtered) < 2:
warnings.warn("There is no enough valid scales for fitting. At least 2 is needed.")
nan_res = np.full(len(q_values_arr), np.nan)
return q_values_arr, nan_res, nan_res, nan_res, nan_res, F_q_s_matrix
#.Initialize array with Nan
h_q_arr = np.full(len(q_values_arr), np.nan)
for i_q_idx in range(len(q_values_arr)):
Fqs_to_fit = F_q_s_filtered[i_q_idx, :]
scales_to_fit = scales_filtered
# Apply the range for h(q) fit if its given
if scale_range_for_hq is not None and isinstance(scale_range_for_hq, (list, tuple)) and len(scale_range_for_hq) == 2:
min_s_fit, max_s_fit = scale_range_for_hq
fit_region_mask = (scales_filtered >= min_s_fit) & (scales_filtered <= max_s_fit)
#.Apply mask to F(q,s)
scales_to_fit = scales_filtered[fit_region_mask]
Fqs_to_fit = F_q_s_filtered[i_q_idx, :][fit_region_mask]
valid_points_mask = np.isfinite(Fqs_to_fit) & (Fqs_to_fit > 0)
scales_final_for_fit = scales_to_fit[valid_points_mask]
Fqs_final_for_fit = Fqs_to_fit[valid_points_mask]
# Check if we have got enough values for a linear fit
if len(scales_final_for_fit) < 2:
continue # If not, continue to the next q
log_scales = np.log(scales_final_for_fit)
log_Fqs = np.log(Fqs_final_for_fit)
try:
fit = stats.linregress(log_scales, log_Fqs, alternative='two-sided')
h_q_arr[i_q_idx] = fit.slope
#.Assess the quality of the linear fit
if fit.rvalue**2<0.85 or fit.pvalue>0.05:
#Analyze residuals
linear_fit =fit.slope*log_scales+fit.intercept
fit_residuals=log_Fqs-linear_fit
shapiro_test=stats.shapiro(fit_residuals)
if shapiro_test.pvalue<0.05: #.Significance of 5%
warnings.warn(f"Not good fit in h(q) for q={q_values_arr[i_q_idx]}",RuntimeWarning)
except ValueError as e:
warnings.warn(f"Could not calculate h(q) for q={q_values_arr[i_q_idx]:.2f} due to a regression error. This point will be skipped.", RuntimeWarning)
logger.error(f"Linear regression failed for q={q_values_arr[i_q_idx]:.2f}.", exc_info=True)
continue
# =========================================================================
# --- COMPUTING MULTIFRACTAL PARAMETERS ---
# =========================================================================
tau_q_arr = q_values_arr * h_q_arr - 1
valid_hq_mask = ~np.isnan(h_q_arr)
if not np.any(valid_hq_mask) or len(q_values_arr[valid_hq_mask]) < 2:
warnings.warn("It was not possible to calculate h(q) for sufficient q values. The multifractal spectrum cannot be determined.")
nan_alpha_res = np.full(len(q_values_arr), np.nan)
return q_values_arr, h_q_arr, tau_q_arr, nan_alpha_res, nan_alpha_res, F_q_s_matrix
q_filt = q_values_arr[valid_hq_mask]
tau_q_filt = tau_q_arr[valid_hq_mask]
if len(q_filt) < 2:
alpha_arr = np.full(len(q_values_arr), np.nan)
f_alpha_arr = np.full(len(q_values_arr), np.nan)
else:
alpha_filt = np.gradient(tau_q_filt, q_filt)
alpha_arr = np.interp(q_values_arr, q_filt, alpha_filt, left=np.nan, right=np.nan)
alpha_arr[np.isnan(h_q_arr)] = np.nan
f_alpha_arr = q_values_arr * alpha_arr - tau_q_arr
# =========================================================================
# --- ASSESMENT OF NUMERICAL LIMITS ---
# =========================================================================
#. Singularity spectrum cant be negative or greater than 1.
#. Alpha must be positive
if validate:
initial_mask = (f_alpha_arr >= 0) & (alpha_arr > 0) & (f_alpha_arr <= 1)
#.Find q=0 or nearest
center_idx = np.argmin(np.abs(q_values))
# Verify if is a valid point
if not initial_mask[center_idx]:
warnings.warn("The central point is not valid. Can't define a robust range.")
return q_values_arr,nan_res, nan_res, nan_res, nan_res, F_q_s_matrix
# Expand from center to the right
valid_end_idx = center_idx
while valid_end_idx + 1 < len(initial_mask) and initial_mask[valid_end_idx + 1]:
valid_end_idx += 1
# Expand from the left to the center
valid_start_idx = center_idx
while valid_start_idx - 1 >= 0 and initial_mask[valid_start_idx - 1]:
valid_start_idx -= 1
# Build theoretical mask
theoretical_mask = np.zeros_like(initial_mask, dtype=bool)
theoretical_mask[valid_start_idx : valid_end_idx + 1] = True
#.Sigularity spectra cannot grow (from max to edges)
if len(f_alpha_arr) < 3:
return np.ones_like(f_alpha_arr, dtype=bool)
# Start from the mid
idx_mid = np.argmin(np.abs(alpha_arr - np.median(alpha_arr)))
#.From max to the right
idx_derecho = idx_mid
while idx_derecho + 1 < len(f_alpha_arr) and f_alpha_arr[idx_derecho + 1] < f_alpha_arr[idx_derecho]:
idx_derecho += 1
#.From max to the left
idx_izquierdo = idx_mid
while idx_izquierdo - 1 >= 0 and f_alpha_arr[idx_izquierdo - 1] < f_alpha_arr[idx_izquierdo]:
idx_izquierdo -= 1
#.Build concavity mask
concavity_mask = np.zeros_like(f_alpha_arr, dtype=bool)
concavity_mask[idx_izquierdo : idx_derecho + 1] = True
#.Build final mask (valid only when both are True)
final_mask = theoretical_mask & concavity_mask
# =========================================================================
# --- APPLY MASKS ---
# =========================================================================
if not np.any(final_mask):
warnings.warn("No q-values valid for this scale range. Probably the series is not multifractal.")
nan_res = np.full(len(q_values_arr), np.nan)
# Return empty arrays
return q_values_arr, nan_res, nan_res, nan_res, nan_res, F_q_s_matrix
#.Trim arrays
q_valid = q_values[final_mask]
h_q_valid = h_q_arr[final_mask]
tau_q_valid = tau_q_arr[final_mask]
alpha_valid = alpha_arr[final_mask]
f_alpha_valid = f_alpha_arr[final_mask]
F_q_s_valid_matrix = F_q_s_matrix[final_mask, :]
logger.info(f"q-values trimmed to: [{np.min(q_valid):.2f},{np.max(q_valid):.2f}]")
return q_valid, h_q_valid, tau_q_valid, alpha_valid, f_alpha_valid, F_q_s_valid_matrix
else:
#.No validation filters applied
return q_values_arr, h_q_arr, tau_q_arr, alpha_arr, f_alpha_arr, F_q_s_matrix
#--Bootstrapping
[docs]
def bootstrap_multifractal_parameters(scales, F_q_s, q_values, n_boot=1000, conf_interval=0.95):
"""
Performs a fully vectorized bootstrap for MFDFA parameters (h, tau, alpha, f_alpha).
Uses tensor linear algebra instead of for-loops for resampling.
Parameters:
-----------
scales : array (N_scales,)
The scales 's' used in the analysis.
F_q_s : array (N_q, N_scales)
Fluctuation function matrix (pre-filtered for NaNs/Zeros).
q_values : array (N_q,)
The q exponents.
n_boot : int
Number of bootstrap iterations.
conf_interval : float
Confidence level (e.g., 0.95 for 95% CI).
Returns:
--------
dict
Dictionary with keys 'h', 'tau', 'alpha', 'f_alpha'.
Each value is a tuple: (mean, ci_lower, ci_upper, err_low, err_high).
"""
# ---------------------------------------------------------
# 1. DATA PREPARATION (Log-Log Domain)
# ---------------------------------------------------------
scales = np.asarray(scales)
log_scales = np.log(scales)
log_Fqs = np.log(F_q_s)
N_q, N_s = log_Fqs.shape
# ---------------------------------------------------------
# 2. BASE OLS FIT & RESIDUALS
# ---------------------------------------------------------
# Design Matrix X: [log_scales, 1] -> Shape (N_s, 2)
X = np.vstack([log_scales, np.ones(N_s)]).T
# Pre-calculate the OLS Projector: (X^T X)^-1 X^T -> Shape (2, N_s)
# This fixed geometry is what makes the bootstrap ultra-fast.
try:
OLS_projector = np.linalg.inv(X.T @ X) @ X.T
except np.linalg.LinAlgError:
warnings.warn("Singular matrix in Bootstrap OLS. Returning NaNs.")
nan_arr = np.full(N_q, np.nan)
return {k: (nan_arr, nan_arr, nan_arr, nan_arr, nan_arr) for k in ['h','tau','alpha','f_alpha']}
# Vectorized original fit for all q values simultaneously
# log_Fqs.T is (N_s, N_q). Resulting Betas are (2, N_q)
Betas_orig = OLS_projector @ log_Fqs.T
# Predictions and Residuals
# Y_pred: (N_s, 2) @ (2, N_q) -> (N_s, N_q). Transposed to (N_q, N_s) to match input
Y_pred = (X @ Betas_orig).T
Residuals = log_Fqs - Y_pred
# ---------------------------------------------------------
# 3. TENSOR BOOTSTRAP (The Vectorized Magic)
# ---------------------------------------------------------
# Generate random indices for residual resampling with replacement
# Shape: (N_s, n_boot)
boot_indices = np.random.randint(0, N_s, size=(N_s, n_boot))
# Construct the Bootstrap Residuals Tensor
# Res_boot shape: (N_q, N_s, n_boot) using fancy indexing
Res_boot = Residuals[:, boot_indices]
# Add to original prediction (Broadcasting)
# Expand Y_pred to (N_q, N_s, 1) for 3D addition
Y_boot = Y_pred[:, :, np.newaxis] + Res_boot
# ---------------------------------------------------------
# 4. MASSIVE OLS SOLUTION
# ---------------------------------------------------------
# Flatten last two dimensions to solve all bootstraps in one matrix mult
Y_boot_flat = Y_boot.transpose(1, 0, 2).reshape(N_s, -1) # Shape (N_s, N_q * n_boot)
# Single matrix multiplication
Betas_boot_flat = OLS_projector @ Y_boot_flat # Shape (2, N_q * n_boot)
# Reshape back to (2, N_q, n_boot). Row 0 contains h(q) slopes.
h_boot_dist = Betas_boot_flat[0, :].reshape(N_q, n_boot)
# ---------------------------------------------------------
# 5. ERROR PROPAGATION (Tau, Alpha, f_Alpha)
# ---------------------------------------------------------
# A. Tau(q) = q * h(q) - 1
q_col = q_values[:, np.newaxis]
tau_boot_dist = q_col * h_boot_dist - 1
# B. Alpha(q) = d(tau)/dq
# Compute gradient along axis 0 (q-axis) for each bootstrap column
alpha_boot_dist = np.gradient(tau_boot_dist, q_values, axis=0)
# C. f(alpha) = q * alpha - tau
f_alpha_boot_dist = q_col * alpha_boot_dist - tau_boot_dist
# ---------------------------------------------------------
# 6. STATISTICS (Percentiles)
# ---------------------------------------------------------
alpha_level = 1.0 - conf_interval
lp, up = (alpha_level / 2.0) * 100, (1.0 - alpha_level / 2.0) * 100
results = {}
def get_stats(dist_matrix):
"""Helper to compute mean and confidence intervals across bootstrap axis."""
mean_val = np.mean(dist_matrix, axis=1)
ci_low = np.percentile(dist_matrix, lp, axis=1)
ci_high = np.percentile(dist_matrix, up, axis=1)
# Absolute Error Bars (distance to mean) for plotting
err_low = mean_val - ci_low
err_high = ci_high - mean_val
return mean_val, ci_low, ci_high, err_low, err_high
results['h'] = get_stats(h_boot_dist)
results['tau'] = get_stats(tau_boot_dist)
results['alpha'] = get_stats(alpha_boot_dist)
results['f_alpha'] = get_stats(f_alpha_boot_dist)
return results