Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Atelier 1 - Introduction à la variance de blocs

🎯 But pédagogique

Montrer comment la dépendance spatiale s’atténue avec la taille du support (effet de support).


🎓 Objectif

Montrer l’effet de la taille de support (bloc) sur la variance, comparée à :

  • la variance ponctuelle ;

  • la variance de bloc théorique (calculée à partir de l’intégrale de la fonction de covariance) ;

  • la variance de bloc expérimentale, obtenue :

    • par échantillonnage aléatoire ;

    • ou par moyenne sur sous-blocs.


🔍 Concepts clés

  • Effet de support : réduction de la variance lorsque la taille du bloc augmente.

  • Covariance spatiale : mesure de la dépendance entre valeurs en fonction de la distance.

  • Variance de bloc : mesure de la variabilité moyenne sur une surface ou un volume donné.


Source
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from ipywidgets import VBox, Layout, IntSlider, FloatSlider, Dropdown, Output
from numpy.fft import fft2, ifft2, fftshift
from scipy.stats import norm
from IPython.display import display
from numpy.lib.stride_tricks import sliding_window_view

# --- DÉCLARATION DU CACHE ---
# Ce dictionnaire stockera les résultats des calculs lourds
simulation_cache = {}

# --- 1. FONCTIONS GÉOSTATISTIQUES (inchangées) ---

def theoretical_block_variance_fast(
    range_x: float, range_y: float, sill: float, nugget: float, 
    block_size: int, pixel_size: float, angle_deg: float, 
    model_name: str, n_points: int = 40 
) -> float:
    if block_size <= 1:
        return sill + nugget
    theta = np.deg2rad(angle_deg)
    coords_1d = np.linspace(-block_size * pixel_size / 2, block_size * pixel_size / 2, n_points)
    x_pts, y_pts = np.meshgrid(coords_1d, coords_1d)
    x_pts_flat, y_pts_flat = x_pts.ravel(), y_pts.ravel()
    dx = np.subtract.outer(x_pts_flat, x_pts_flat)
    dy = np.subtract.outer(y_pts_flat, y_pts_flat)
    cos_t, sin_t = np.cos(theta), np.sin(theta)
    h_rot_x = dx * cos_t + dy * sin_t
    h_rot_y = -dx * sin_t + dy * cos_t
    range_x, range_y = max(range_x, 1e-9), max(range_y, 1e-9)
    h_aniso = np.sqrt((h_rot_x / range_x)**2 + (h_rot_y / range_y)**2)
    if model_name == 'Sphérique':
        cov_mat_structured = np.where(h_aniso < 1, sill * (1 - 1.5 * h_aniso + 0.5 * h_aniso**3), 0.0)
    elif model_name == 'Exponentiel':
        cov_mat_structured = sill * np.exp(-3 * h_aniso)
    elif model_name == 'Gaussien':
        cov_mat_structured = sill * np.exp(-3 * h_aniso**2)
    else:
        cov_mat_structured = np.zeros_like(h_aniso)
    mean_structured_covariance = np.mean(cov_mat_structured)
    block_area = (block_size * pixel_size)**2
    regularized_nugget_variance = nugget / block_area
    return mean_structured_covariance + regularized_nugget_variance

def variogram_model(hx, hy, range_x, range_y, sill, nugget, angle_deg, model_name):
    theta = np.deg2rad(angle_deg)
    hx_rot = hx * np.cos(theta) + hy * np.sin(theta)
    hy_rot = -hx * np.sin(theta) + hy * np.cos(theta)
    range_x, range_y = max(range_x, 1e-9), max(range_y, 1e-9)
    h = np.sqrt((hx_rot / range_x)**2 + (hy_rot / range_y)**2)
    if model_name == 'Sphérique':
        gamma = np.where(h < 1, sill * (1.5 * h - 0.5 * h**3), sill)
    elif model_name == 'Exponentiel':
        gamma = sill * (1 - np.exp(-3 * h))
    elif model_name == 'Gaussien':
        gamma = sill * (1 - np.exp(-3 * h**2))
    else:
        raise ValueError("Modèle de variogramme non reconnu")
    return gamma

def aggregate(field: np.ndarray, block_size: int) -> np.ndarray:
    """
    Moyenne mobile glissante sur un champ 2D.
    Si field est (n,n) et block_size=b, le résultat est (n-b+1, n-b+1).
    """
    if block_size <= 1:
        return field

    # Vue glissante de taille (block_size, block_size)
    windows = sliding_window_view(field, (block_size, block_size))
    # windows.shape = (n-b+1, n-b+1, b, b)

    # Moyenne sur les axes du bloc
    return windows.mean(axis=(-1, -2))

def variance_blocks(field, max_support):
    sizes = list(range(1, max_support + 1))
    variances = []
    for s in sizes:
        if field.shape[0] < s: break
        aggregated_field = aggregate(field, s)
        block_variance = np.var(aggregated_field, ddof=1) if aggregated_field.size > 1 else np.nan
        variances.append(block_variance)
    return sizes[:len(variances)], variances

def fftma_simulation(size, range_x, range_y, sill, nugget, angle_deg, model_name, seed=None):
    if seed: np.random.seed(seed)
    size_big = 2 * size
    x, y = np.arange(-size_big // 2, size_big // 2), np.arange(-size_big // 2, size_big // 2)
    hx, hy = np.meshgrid(x, y, indexing='ij')
    gamma_structured = variogram_model(hx, hy, range_x, range_y, sill, nugget, angle_deg, model_name)
    cov = sill - gamma_structured
    cov[size_big // 2, size_big // 2] += nugget
    cov_fft = fft2(fftshift(cov))
    spectrum = np.maximum(0, np.real(cov_fft))
    white_noise = np.random.normal(0, 1, (size_big, size_big))
    wn_fft = fft2(white_noise)
    field_fft = wn_fft * np.sqrt(spectrum)
    field_big = np.real(ifft2(field_fft))
    center = size_big // 2
    field = field_big[center - size // 2:center + size // 2, center - size // 2:center + size // 2]
    return field


# --- 2. NOUVELLE STRUCTURE D'INTERFACE INTERACTIVE ---

# Définition des widgets
# ... (inchangés, mais je les remets pour que le code soit complet)
slider_style = {'description_width': '160px'}
slider_layout = Layout(width='450px')
support_slider = IntSlider(min=1, max=40, step=1, value=1, description='Support (pixels)', style=slider_style, layout=slider_layout, continuous_update=False)
range_y_slider = FloatSlider(min=1, max=50, step=1, value=15, description='Portée petite ($a_p$)', style=slider_style, layout=slider_layout)
range_x_slider = FloatSlider(min=1, max=50, step=1, value=30, description='Portée grande ($a_g$)', style=slider_style, layout=slider_layout)
sill_slider = FloatSlider(min=0.1, max=10, step=0.1, value=1.0, description='Palier structurel ($c_1$)', style=slider_style, layout=slider_layout)
nugget_slider = FloatSlider(min=0, max=1, step=0.01, value=0.1, description='Effet de pépite ($c_0$)', style=slider_style, layout=slider_layout)
angle_slider = FloatSlider(min=0, max=180, step=1, value=45, description='Angle ($\\theta$)', style=slider_style, layout=slider_layout)
model_dropdown = Dropdown(options=['Sphérique', 'Exponentiel', 'Gaussien'], value='Sphérique', description='Modèle', style=slider_style, layout=slider_layout)
button = widgets.Button(description="Lancer/Maj Simulation", button_style='success', layout=Layout(width='200px', margin='10px 0 0 165px'))
out = Output()


def update_display(support_value):
    """
    FONCTION RAPIDE : Met à jour uniquement l'affichage en utilisant les données du cache.
    """
    if not simulation_cache: # Si le cache est vide, ne rien faire
        return

    # Récupérer les données pré-calculées
    field = simulation_cache['field']
    bsizes = simulation_cache['bsizes']
    var_exp = simulation_cache['var_exp']
    var_theo = simulation_cache['var_theo']
    sill = sill_slider.value
    nugget = nugget_slider.value

    # Recréer les graphiques
    fig, axs = plt.subplots(1, 2, figsize=(14, 6))

    # 1. Image agrégée (seul calcul rapide ici)
    agg = aggregate(field, support_value)
    v_range = 2.5 * np.sqrt(sill + nugget)
    im = axs[0].imshow(agg, cmap='jet', origin='lower', vmin=-v_range, vmax=v_range)
    axs[0].set_title(f'Champ agrégé (support = {support_value}x{support_value})')
    axs[0].axis('off')
    fig.colorbar(im, ax=axs[0], label='Valeur')

    # 2. Graphique des variances (simple affichage)
    axs[1].plot(bsizes, var_exp, 'o-', label='Variance expérimentale', zorder=2)
    axs[1].plot(bsizes, var_theo, 's--', label='Variance théorique', zorder=1, color='orange')
    axs[1].axvline(support_value, color='red', linestyle=':', label=f'Support sélectionné = {support_value}')
    axs[1].set_xlabel('Taille du support (pixels)')
    axs[1].set_ylabel('Variance de bloc')
    axs[1].set_title('Variance des blocs vs. Taille de support')
    axs[1].legend(loc='upper right')
    axs[1].grid(True, linestyle='--', alpha=0.6)
    axs[1].set_xlim(0, max(bsizes) + 1)
    axs[1].set_ylim(bottom=0, top=(sill + nugget) * 1.05)
    
    plt.tight_layout()
    
    # Mettre à jour la sortie
    out.clear_output(wait=True)
    with out:
        display(fig)
    plt.close(fig)

def run_full_simulation(b):
    """
    FONCTION LENTE : Exécute la simulation complète et stocke les résultats dans le cache.
    Déclenchée uniquement par le bouton.
    """
    global simulation_cache # On modifie le cache global
    
    # Lire tous les paramètres des widgets
    params = {
        'range_x': max(range_x_slider.value, range_y_slider.value),
        'range_y': min(range_x_slider.value, range_y_slider.value),
        'sill': sill_slider.value,
        'nugget': nugget_slider.value,
        'angle_deg': angle_slider.value,
        'model_name': model_dropdown.value
    }
    max_support = support_slider.max

    # 1. Lancer la simulation (partie la plus lente)
    field_sim = fftma_simulation(size=256, seed=4263, **params)
    field = (field_sim - np.mean(field_sim)) / np.std(field_sim) * np.sqrt(params['sill'] + params['nugget'])

    # 2. Calculer les courbes de variance (également lent)
    bsizes, var_exp = variance_blocks(field, max_support)
    var_theo = [
        theoretical_block_variance_fast(block_size=b, pixel_size=1.0, **params)
        for b in bsizes
    ]
    
    # 3. Stocker tous les résultats dans le cache
    simulation_cache = {
        'field': field,
        'bsizes': bsizes,
        'var_exp': var_exp,
        'var_theo': var_theo
    }
    
    # 4. Appeler la fonction d'affichage rapide
    update_display(support_slider.value)

def on_support_slider_change(change):
    """
    Appelée à chaque fois que le curseur 'Support' change.
    """
    update_display(change.new)

# --- 3. CONNEXION DES WIDGETS AUX FONCTIONS ---
button.on_click(run_full_simulation)
support_slider.observe(on_support_slider_change, names='value')

# Affichage de l'interface
ui = VBox([
    support_slider, range_x_slider, range_y_slider, sill_slider,
    nugget_slider, angle_slider, model_dropdown, button
])

print("1. Configurez les paramètres, puis cliquez sur 'Lancer/Maj Simulation'.")
print("2. Déplacez ensuite le curseur 'Support' pour une mise à jour instantanée.")
display(ui, out)
1. Configurez les paramètres, puis cliquez sur 'Lancer/Maj Simulation'.
2. Déplacez ensuite le curseur 'Support' pour une mise à jour instantanée.
Loading...
Loading...