#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
synthetic_lst.py
----------------
Generate a synthetic LST (Land Surface Temperature) space-time field from a YAML config.
Key features:
- Multiple terrain classes (ocean, plain, mountain, hill, river, desert, forest,
  wetland, urban, basin, plateau, lake, grassland, mesa)
- Elevation field (Perlin-like multi-scale noise / Gaussian blur)
- Spatially correlated random field (FFT-based Gaussian RF)
- Temporal components: annual cycle, weekly cycle (7-day period, configurable),
  diurnal variation (double-peaked Gaussian), temporal autocorrelation
- Writes chunk by chunk along time to Zarr (default), or optionally netCDF (mind the I/O cost)
- Strongly recommended: run test mode (low resolution) first
"""
import argparse
import os
import math
import yaml
import datetime
import numpy as np
import xarray as xr
import zarr
from scipy import ndimage, fftpack
from tqdm import tqdm
# -------------------------------
# Utilities: unit conversions
# -------------------------------
def c_to_k(arr):
return arr + 273.15
def c_to_f(arr):
return arr * 9.0 / 5.0 + 32.0
def convert_from_c(arr, unit:str, custom_scale=1.0, custom_offset=0.0):
if unit == "C": return arr
if unit == "K": return c_to_k(arr)
if unit == "F": return c_to_f(arr)
if unit == "custom": return arr * custom_scale + custom_offset
raise ValueError(f"Unsupported unit: {unit}")
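# Quick sanity check (illustrative): 0 degC and 20 degC convert to 32 degF and 68 degF,
# e.g. convert_from_c(np.array([0.0, 20.0]), "F") -> array([32., 68.]).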
# -------------------------------
# Config loader and test-mode override
# -------------------------------
def load_config(path:str) -> dict:
with open(path, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f)
return cfg
def apply_test_mode_overrides(cfg:dict, test:bool):
# If test flag or meta.test_mode present, override to small sizes
test_mode = cfg.get("meta", {}).get("test_mode", {})
if test or test_mode.get("enabled", False):
print("[INFO] Test mode enabled -> applying safe overrides")
# spatial resolution override
spatial_res = test_mode.get("spatial_res", 0.5)
cfg["grid"]["resolution"] = spatial_res
# days override
n_days = test_mode.get("n_days", 30)
cfg["time"]["days"] = n_days
# reduce chunk size for safe IO if present
if "output" in cfg and "chunk" in cfg["output"]:
cfg["output"]["chunk"] = {"time": 72, "lat": 100, "lon": 100}
return cfg
# -------------------------------
# Grid builder
# -------------------------------
def build_grid(cfg:dict):
lat0, lat1 = cfg["grid"]["lat_range"]
lon0, lon1 = cfg["grid"]["lon_range"]
res = cfg["grid"]["resolution"]
lats = np.arange(lat0, lat1, res)
lons = np.arange(lon0, lon1, res)
return lats, lons
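# Note: np.arange excludes the upper bound, so the grid spans [lat0, lat1) and [lon0, lon1).
# Hypothetical call (values illustrative):
#   lats, lons = build_grid({"grid": {"lat_range": [20.0, 25.0],
#                                     "lon_range": [120.0, 124.0],
#                                     "resolution": 0.1}})
#   # -> 50 latitudes and 40 longitudes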
# -------------------------------
# Elevation generator (multiscale noise)
# -------------------------------
# def generate_elevation(nlat:int, nlon:int, cfg:dict, seed:int=0) -> np.ndarray:
# """
# Generate smoother elevation field using layered Gaussian filtered noise
# to simulate fractal terrain with larger continuous mountain and basin regions.
# Adds a large-scale slope to create a more natural gradual terrain change.
# Output in meters within [min_altitude, max_altitude].
# """
# rng = np.random.RandomState(seed)
# base = np.zeros((nlat, nlon))
# # Multi-frequency Gaussian-filtered noise, weighted accumulation (fBm-like)
# freqs = [64, 32, 16, 8, 4]
# weights = [0.5, 0.25, 0.15, 0.07, 0.03]
# for f, w in zip(freqs, weights):
# noise = rng.normal(size=(nlat, nlon))
# smooth_noise = ndimage.gaussian_filter(noise, sigma=f)
# base += w * smooth_noise
# # normalize to 0..1
# bmin, bmax = base.min(), base.max()
# norm = (base - bmin) / (bmax - bmin + 1e-12)
# # Add a large-scale slope so the terrain has more regional structure
# y_indices = np.linspace(0, 1, nlat)[:, None]
# slope = 0.3 * y_indices # 0-0.3 north-south slope adjustment; strength is tunable
# norm = norm * 0.7 + slope * 0.3 # blend slope and noise
# # Final normalization
# norm = (norm - norm.min()) / (norm.max() - norm.min() + 1e-12)
# # Convert to elevation values
# alt_min = cfg["elevation"].get("min_altitude", 0)
# alt_max = cfg["elevation"].get("max_altitude", 5000)
# elev = alt_min + norm * (alt_max - alt_min)
# return elev
# # -------------------------------
# # Simple terrain classification
# # -------------------------------
# def classify_terrain(elev:np.ndarray, cfg:dict, seed:int=0) -> np.ndarray:
# """
# Produce an integer-coded terrain map corresponding to enabled terrain types
# Order of assignment: ocean margin, lakes, rivers, mountains, plateau, basin, mesa,
# deserts (patchy), forest/grassland/wetland/urban/plain fallback.
# Returns terrain_code array and a dict mapping code->name.
# """
# rng = np.random.RandomState(seed+1)
# elev = np.asarray(elev)
# nlat, nlon = elev.shape
# terrain_codes = np.full((nlat, nlon), fill_value=-1, dtype=np.int16)
# types = cfg["terrain"]["types"]
# name2code = {}
# code = 0
# # assign codes for enabled types in deterministic order
# for t in types:
# if t.get("enabled", False):
# name2code[t["name"]] = code
# code += 1
# # helper
# def set_mask(name, mask):
# cid = name2code[name]
# terrain_codes[mask] = cid
# # ocean margin: outer 5% -> ocean if ocean enabled
# if "ocean" in name2code:
# rows, cols = elev.shape
# mlat = max(1, int(0.05 * rows))
# mlon = max(1, int(0.05 * cols))
# ocean_mask = np.zeros_like(elev, dtype=bool)
# ocean_mask[:mlat, :] = True
# ocean_mask[-mlat:, :] = True
# ocean_mask[:, :mlon] = True
# ocean_mask[:, -mlon:] = True
# set_mask("ocean", ocean_mask)
# # lakes: randomly place some low-elevation basins
# if "lake" in name2code and cfg["water_features"].get("lake_count", 0) > 0:
# lc = cfg["water_features"]["lake_count"]
# size_min, size_max = cfg["water_features"].get("lake_size_range", [2, 8])
# for _ in range(lc):
# cy = rng.randint(0, nlat)
# cx = rng.randint(0, nlon)
# r = rng.randint(size_min, size_max)
# y0, y1 = max(0, cy - r), min(nlat, cy + r)
# x0, x1 = max(0, cx - r), min(nlon, cx + r)
# lake_mask = np.zeros_like(elev, dtype=bool)
# lake_mask[y0:y1, x0:x1] = True
# set_mask("lake", lake_mask)
# # rivers: simple meandering lines from high to ocean edge
# if "river" in name2code and cfg["water_features"].get("river_count", 0) > 0:
# rc = cfg["water_features"]["river_count"]
# rw = cfg["water_features"].get("river_width", 2)
# for _ in range(rc):
# y = rng.randint(int(0.2*nlat), int(0.8*nlat))
# x = int(0.9*nlon)
# for _step in range(nlon):
# if 0 <= y < nlat and 0 <= x < nlon:
# y0 = max(0, y-rw); y1 = min(nlat, y+rw+1)
# x0 = max(0, x-rw); x1 = min(nlon, x+rw+1)
# terrain_codes[y0:y1, x0:x1] = name2code["river"]
# x -= rng.randint(1,4)
# y += rng.randint(-2,3)
# if x <= 0:
# break
# # mountains: elevation threshold
# if "mountain" in name2code:
# thr_val = cfg["terrain"]["types"][ [t["name"] for t in cfg["terrain"]["types"]].index("mountain") ] .get("min_elev_m", cfg["elevation"].get("mountain_altitude_bias", 1500))
# # safe fallback: treat top 10% elev as mountain if threshold missing
# if thr_val is None or not isinstance(thr_val, (int, float)):
# thr_val = np.percentile(elev, 90)
# mask_m = elev >= thr_val
# terrain_codes[mask_m] = name2code["mountain"]
# # plateau and mesa: patches using elevation bands
# if "plateau" in name2code:
# mask_plateau = (elev > np.percentile(elev, 60)) & (elev < np.percentile(elev, 85))
# terrain_codes[mask_plateau] = name2code["plateau"]
# if "mesa" in name2code:
# mask_mesa = (elev > np.percentile(elev, 45)) & (elev < np.percentile(elev, 60))
# terrain_codes[mask_mesa] = name2code["mesa"]
# # basin: low elevation enclosed areas -> approximate by low-elev patches
# if "basin" in name2code:
# lowmask = elev < np.percentile(elev, 20)
# # apply morphological closing to make basin areas larger
# closed = ndimage.grey_closing(lowmask.astype(int), size=(5,5)).astype(bool)
# terrain_codes[closed] = name2code["basin"]
# # deserts (patchy): random low vegetation areas in low to mid elevations
# if "desert" in name2code:
# randmask = rng.rand(*elev.shape) < 0.02
# elev_mask = elev < np.percentile(elev, 60)
# terrain_codes[np.logical_and(randmask, elev_mask)] = name2code["desert"]
# # forests / grasslands / wetland / urban / plain fallback
# # priority: lake/river/mountain/plateau/basin/mesa/desert already set
# for name in ["forest", "grassland", "wetland", "urban", "plain"]:
# if name in name2code:
# # assign to remaining undefined (-1)
# terrain_codes[terrain_codes==-1] = name2code[name]
# # final fill: any still -1 -> plain code or 0
# if (terrain_codes==-1).any():
# fallback = name2code.get("plain", list(name2code.values())[0])
# terrain_codes[terrain_codes==-1] = fallback
# return terrain_codes, name2code
def generate_base_terrain(nlat, nlon, seed):
rng = np.random.RandomState(seed)
base = np.zeros((nlat, nlon))
freqs = [64, 32, 16, 8, 4]
weights = [0.5, 0.25, 0.15, 0.07, 0.03]
for f, w in zip(freqs, weights):
noise = rng.normal(size=(nlat, nlon))
smooth = ndimage.gaussian_filter(noise, sigma=f)
base += w * smooth
base = (base - base.min()) / (base.max() - base.min() + 1e-12)
    # Multi-directional slope: north-south + east-west + diagonal terms
y = np.linspace(0, 1, nlat)[:, None]
x = np.linspace(0, 1, nlon)[None, :]
    slope = 0.2 * y + 0.1 * x + 0.1 * (y * x) + 0.1 * (1 - y) * x  # tunable coefficients
base = base * 0.6 + slope * 0.4
base = (base - base.min()) / (base.max() - base.min() + 1e-12)
return base
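# Usage sketch (illustrative): the returned field is normalized to [0, 1] and is the
# common input for the mask generators below, e.g.
#   base = generate_base_terrain(200, 300, seed=0)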
def generate_ocean_mask(nlat, nlon, seed):
rng = np.random.RandomState(seed + 10)
ocean_mask = np.zeros((nlat, nlon), dtype=bool)
    # Ocean centre position and approximate extent (randomly perturbed)
center_y = int(nlat * 0.7 + rng.uniform(-0.1, 0.1) * nlat)
center_x = int(nlon * 0.3 + rng.uniform(-0.1, 0.1) * nlon)
max_radius_y = int(nlat * rng.uniform(0.1, 0.6))
max_radius_x = int(nlon * rng.uniform(0.1, 0.8))
    # Basic ocean shape
    y, x = np.ogrid[:nlat, :nlon]
    # Normalized distance field from the centre (Manhattan-style distance plus random shaping, not a pure ellipse)
    dist_y = np.abs(y - center_y) / max_radius_y
    dist_x = np.abs(x - center_x) / max_radius_x
    dist_norm = (dist_y + dist_x) / 2  # mean Manhattan distance, avoids a perfect ellipse
    # Smooth noise to make the shape irregular
noise = rng.normal(size=(nlat, nlon))
smooth_noise = ndimage.gaussian_filter(noise, sigma=20)
smooth_noise = (smooth_noise - smooth_noise.min()) / (smooth_noise.max() - smooth_noise.min() + 1e-12)
    # Combine distance and noise: the closer to the centre, the more likely the cell is ocean
    mask_score = 1 - dist_norm + 0.5 * smooth_noise
    # A threshold decides the ocean region (tunable)
    threshold = 0.6 + rng.uniform(-0.05, 0.05)
    ocean_mask = mask_score > threshold
    # Add random noise to deform the coastline (irregular ocean boundary)
    noise = rng.normal(size=(nlat, nlon))
    smooth_noise = ndimage.gaussian_filter(noise, sigma=15)
    threshold = 0.15  # controls the deformation magnitude
    irregular_mask = smooth_noise > threshold
    ocean_mask &= irregular_mask | ocean_mask  # keep the overall ocean area while adding an irregular edge
    # Smooth the shape again: remove small isolated patches, keep the large ocean body
ocean_mask = ndimage.binary_closing(ocean_mask, structure=np.ones((15, 15)))
ocean_mask = ndimage.binary_opening(ocean_mask, structure=np.ones((7, 7)))
    # Distance-to-coast map: water is shallower near the boundary
    dist_to_land = ndimage.distance_transform_edt(ocean_mask)
    dist_to_edge = 1 - dist_to_land / (dist_to_land.max() + 1e-12)  # normalized 0-1: ~1 near the coast, ~0 in the ocean interior
    # Add trench noise so the sea-floor topography varies
    trench_noise = rng.normal(size=(nlat, nlon))
    trench_smooth = ndimage.gaussian_filter(trench_noise, sigma=10)
    trench_smooth = (trench_smooth - trench_smooth.min()) / (trench_smooth.max() - trench_smooth.min() + 1e-12)
    trench_effect = trench_smooth * dist_to_edge  # trench noise weighted by the depth factor
    # Combine into a sea-floor depth factor
    ocean_depth_factor = dist_to_edge + 0.3 * trench_effect
    ocean_depth_factor = np.clip(ocean_depth_factor, 0, 1)
    ocean_depth_factor[~ocean_mask] = 0  # depth factor is 0 outside the ocean
return ocean_mask, ocean_depth_factor
def generate_mountain_mask(nlat, nlon, base_terrain, ocean_mask, seed):
rng = np.random.RandomState(seed + 20)
mountain_noise = rng.normal(size=(nlat, nlon))
mountain_smooth = ndimage.gaussian_filter(mountain_noise, sigma=40)
mountain_smooth = (mountain_smooth - mountain_smooth.min()) / (mountain_smooth.max() - mountain_smooth.min() + 1e-12)
mountain_candidate = (mountain_smooth > 0.8) & (~ocean_mask)
dist = ndimage.distance_transform_edt(~mountain_candidate)
mountain_mask = dist < 8
return mountain_mask
def generate_hill_mask(nlat, nlon, base_terrain, mountain_mask, ocean_mask, seed):
rng = np.random.RandomState(seed + 25)
hill_noise = rng.normal(size=(nlat, nlon))
hill_smooth = ndimage.gaussian_filter(hill_noise, sigma=25)
hill_smooth = (hill_smooth - hill_smooth.min()) / (hill_smooth.max() - hill_smooth.min() + 1e-12)
hill_candidate = (hill_smooth > 0.65) & (~mountain_mask) & (~ocean_mask)
dist = ndimage.distance_transform_edt(~hill_candidate)
hill_mask = dist < 6
return hill_mask
def generate_basin_mask(nlat, nlon, base_terrain, mountain_mask, ocean_mask, hill_mask, seed):
rng = np.random.RandomState(seed + 30)
lowland = base_terrain < 0.3
lowland &= ~ocean_mask
lowland &= ~mountain_mask
lowland &= ~hill_mask
basin_mask = ndimage.binary_opening(lowland, structure=np.ones((5, 5)))
basin_mask = ndimage.binary_closing(basin_mask, structure=np.ones((9, 9)))
mountain_expanded = ndimage.binary_dilation(mountain_mask, iterations=10)
hill_expanded = ndimage.binary_dilation(hill_mask, iterations=5)
basin_mask &= (mountain_expanded | hill_expanded)
return basin_mask
def generate_lake_mask(nlat, nlon, basin_mask, seed, lake_count=3, lake_size_range=(4, 10)):
rng = np.random.RandomState(seed + 40)
lake_mask = np.zeros((nlat, nlon), dtype=bool)
basin_coords = np.array(np.where(basin_mask)).T
if len(basin_coords) == 0:
return lake_mask
for _ in range(lake_count):
idx = rng.randint(len(basin_coords))
cy, cx = basin_coords[idx]
r = rng.randint(lake_size_range[0], lake_size_range[1])
y0, y1 = max(0, cy - r), min(nlat, cy + r)
x0, x1 = max(0, cx - r), min(nlon, cx + r)
y_idx, x_idx = np.ogrid[y0:y1, x0:x1]
dist = (y_idx - cy) ** 2 + (x_idx - cx) ** 2
circle_mask = dist <= r ** 2
sub_basin = basin_mask[y0:y1, x0:x1]
lake_mask[y0:y1, x0:x1][circle_mask & sub_basin] = True
return lake_mask
def generate_river_mask(nlat, nlon, base_terrain, ocean_mask, mountain_mask, lake_mask, seed, river_count=3, river_width=2):
rng = np.random.RandomState(seed + 50)
river_mask = np.zeros((nlat, nlon), dtype=bool)
mountain_coords = np.array(np.where(mountain_mask)).T
if len(mountain_coords) == 0:
return river_mask
for _ in range(river_count):
        # Start from a random point in the mountains
idx = rng.randint(len(mountain_coords))
y, x = mountain_coords[idx]
path = []
for _ in range(nlon * 2):
path.append((y, x))
            # Meander the river, flowing toward lower elevation
neighbors = []
for dy in [-1, 0, 1]:
for dx in [-1, 0, 1]:
ny, nx = y + dy, x + dx
if 0 <= ny < nlat and 0 <= nx < nlon:
neighbors.append((ny, nx))
            # Pick the lowest-elevation neighbouring cell
next_cell = min(neighbors, key=lambda pos: base_terrain[pos[0], pos[1]])
if ocean_mask[next_cell]:
                # Stop when the river reaches the ocean
break
if lake_mask[next_cell]:
                # Stop when the river reaches a lake
break
y, x = next_cell
        # Rasterize the river path
for (ry, rx) in path:
y0, y1 = max(0, ry - river_width), min(nlat, ry + river_width + 1)
x0, x1 = max(0, rx - river_width), min(nlon, rx + river_width + 1)
river_mask[y0:y1, x0:x1] = True
return river_mask
def classify_terrain(elev, ocean_mask, mountain_mask, hill_mask, basin_mask, lake_mask, river_mask, cfg):
nlat, nlon = elev.shape
terrain_codes = np.full((nlat, nlon), fill_value=-1, dtype=np.int16)
types = cfg["terrain"]["types"]
name2code = {}
code = 0
for t in types:
if t.get("enabled", False):
name2code[t["name"]] = code
code += 1
def set_mask(name, mask):
cid = name2code[name]
terrain_codes[mask] = cid
    # Assign terrain classes in priority order
if "ocean" in name2code:
set_mask("ocean", ocean_mask)
if "mountain" in name2code:
set_mask("mountain", mountain_mask)
if "hill" in name2code:
set_mask("hill", hill_mask)
if "basin" in name2code:
set_mask("basin", basin_mask)
if "lake" in name2code:
set_mask("lake", lake_mask)
if "river" in name2code:
set_mask("river", river_mask)
    # Remaining terrain classes: plain, forest, grassland, urban, desert, wetland, plateau, mesa
    remaining = terrain_codes == -1
    # Plain (low-to-mid elevation, excluding cells already labelled)
if "plain" in name2code:
plain_mask = remaining & (elev < 0.5)
set_mask("plain", plain_mask)
remaining = terrain_codes == -1
    # Forest (mid-to-high elevation)
if "forest" in name2code:
forest_mask = remaining & (elev >= 0.5) & (elev < 0.7)
set_mask("forest", forest_mask)
remaining = terrain_codes == -1
    # Grassland (low-to-mid elevation)
if "grassland" in name2code:
grass_mask = remaining & (elev >= 0.3) & (elev < 0.5)
set_mask("grassland", grass_mask)
remaining = terrain_codes == -1
    # Desert: typically low elevation and low vegetation, with a few randomly scattered dry patches
if "desert" in name2code:
rng = np.random.RandomState(123)
desert_base = remaining & (elev < 0.4)
desert_noise = rng.rand(nlat, nlon)
        desert_mask = desert_base & (desert_noise < 0.05)  # ~5% chance of spawning a desert patch
set_mask("desert", desert_mask)
remaining = terrain_codes == -1
    # Wetland: generally near water (lakes, rivers) and at low elevation
if "wetland" in name2code:
wetland_base = remaining & (elev < 0.4)
near_water = ndimage.binary_dilation(lake_mask | river_mask, iterations=5)
wetland_mask = wetland_base & near_water
set_mask("wetland", wetland_mask)
remaining = terrain_codes == -1
    # Urban (randomly scattered)
if "urban" in name2code:
rng = np.random.RandomState(42)
urban_mask = remaining & (rng.rand(nlat, nlon) < 0.01)
set_mask("urban", urban_mask)
remaining = terrain_codes == -1
    # Mesa and plateau: assigned by elevation band
if "mesa" in name2code:
mesa_mask = remaining & (elev >= 0.6) & (elev < 0.75)
set_mask("mesa", mesa_mask)
remaining = terrain_codes == -1
if "plateau" in name2code:
plateau_mask = remaining & (elev >= 0.75)
set_mask("plateau", plateau_mask)
remaining = terrain_codes == -1
    # Fill any remaining cells with plain
if (terrain_codes == -1).any() and "plain" in name2code:
terrain_codes[terrain_codes == -1] = name2code["plain"]
return terrain_codes, name2code
def smooth_terrain_codes(terrain_codes, iterations=3):
structure = np.ones((3, 3), dtype=bool)
smoothed = terrain_codes.copy()
for _ in range(iterations):
smoothed = ndimage.grey_dilation(smoothed, footprint=structure)
smoothed = ndimage.grey_erosion(smoothed, footprint=structure)
return smoothed
def generate_terrain(nlat, nlon, cfg, seed=0):
base_terrain = generate_base_terrain(nlat, nlon, seed)
ocean_mask, ocean_depth_factor = generate_ocean_mask(nlat, nlon, seed)
mountain_mask = generate_mountain_mask(nlat, nlon, base_terrain, ocean_mask, seed)
hill_mask = generate_hill_mask(nlat, nlon, base_terrain, mountain_mask, ocean_mask, seed)
basin_mask = generate_basin_mask(nlat, nlon, base_terrain, mountain_mask, ocean_mask, hill_mask, seed)
lake_mask = generate_lake_mask(nlat, nlon, basin_mask, seed,
lake_count=cfg["water_features"].get("lake_count", 3),
lake_size_range=cfg["water_features"].get("lake_size_range", (4, 10)))
river_mask = generate_river_mask(nlat, nlon, base_terrain, ocean_mask, mountain_mask, lake_mask, seed,
river_count=cfg["water_features"].get("river_count", 3),
river_width=cfg["water_features"].get("river_width", 2))
terrain_codes, name2code = classify_terrain(
base_terrain, ocean_mask, mountain_mask, hill_mask, basin_mask, lake_mask, river_mask, cfg)
terrain_codes = smooth_terrain_codes(terrain_codes, iterations=4)
elev_cfg = cfg.get("elevation", {})
max_alt = elev_cfg.get("max_altitude", 5000)
min_alt = elev_cfg.get("min_altitude", -500)
river_bias = elev_cfg.get("river_altitude_bias", -10)
mountain_bias = elev_cfg.get("mountain_altitude_bias", 2000)
basin_bias = elev_cfg.get("basin_altitude_bias", -200)
plateau_bias = elev_cfg.get("plateau_altitude_bias", 1500)
mesa_bias = elev_cfg.get("mesa_altitude_bias", 800)
elev = np.zeros((nlat, nlon))
for name, code in name2code.items():
mask = terrain_codes == code
if name == "ocean":
min_alt = -1 if min_alt >= 0 else min_alt
elev[mask] = min_alt * (1 - ocean_depth_factor[mask])
elif name == "mountain":
            # Mountain elevation: scale base_terrain toward max_altitude, offset by mountain_bias
elev[mask] = base_terrain[mask] * (max_alt - mountain_bias) + mountain_bias
elif name == "hill":
elev[mask] = base_terrain[mask] * 1500 + 500
elif name == "basin":
elev[mask] = base_terrain[mask] * 500 + basin_bias
elif name == "lake":
elev[mask] = 0
elif name == "river":
elev[mask] = base_terrain[mask] * 300 + river_bias
elif name == "plateau":
elev[mask] = base_terrain[mask] * 2000 + plateau_bias
elif name == "mesa":
elev[mask] = base_terrain[mask] * 1200 + mesa_bias
else:
elev[mask] = base_terrain[mask] * 1000
elev = ndimage.gaussian_filter(elev, sigma=3)
    # Ocean elevation sanity check
    if "ocean" in name2code and "plain" in name2code:  # guard against a missing "plain" class
        ocean_code = name2code["ocean"]
        plain_code = name2code["plain"]
        # Find cells classified as ocean but with elevation > 0
        mask_fix = (terrain_codes == ocean_code) & (elev > 0)
        # Reclassify those cells as plain
        terrain_codes[mask_fix] = plain_code
return elev, terrain_codes, name2code
# -------------------------------
# Spatial Gaussian Random Field via FFT
# -------------------------------
def make_gaussian_random_field(nlat:int, nlon:int, res_deg:float, corr_length_km:float, sigma:float, seed:int=0) -> np.ndarray:
"""
Generate a stationary Gaussian random field with approximate exponential cov/distance.
Implementation: generate white noise in Fourier domain, apply radial filter exp(-k^2 * alpha)
corr_length_km -> convert to pixel sigma (approx)
Returns array of shape (nlat, nlon) with zero mean and std = sigma.
"""
# convert corr_length in km to pixel units:
km_per_deg = 111.0 # approx
pixel_km = res_deg * km_per_deg
corr_pixels = max(1.0, corr_length_km / (pixel_km + 1e-12))
# construct frequency grids
rng = np.random.RandomState(seed+2)
wn = rng.normal(size=(nlat, nlon))
# FFT filter kernel: gaussian low-pass with scale ~ corr_pixels
fx = fftpack.fftfreq(nlon)
fy = fftpack.fftfreq(nlat)
kx = fx[np.newaxis,:]
ky = fy[:,np.newaxis]
k2 = kx**2 + ky**2
# choose alpha such that filter width corresponds to corr_pixels: alpha ~ (1/(2*pi*scale))^2
scale = corr_pixels
if scale <= 0:
alpha = 1.0
else:
alpha = 1.0 / (2.0 * (np.pi * scale)**2 + 1e-12)
W = np.exp(-k2 / (alpha + 1e-12))
F = fftpack.fft2(wn) * W
field = np.real(fftpack.ifft2(F))
# normalize to sigma
field = (field - field.mean()) / (field.std() + 1e-12) * sigma
return field
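# Standalone example (illustrative values): a field with ~0.8 degC standard deviation on a
# 0.05-degree grid and a 25 km correlation length could be drawn as
#   S = make_gaussian_random_field(200, 300, res_deg=0.05, corr_length_km=25.0,
#                                  sigma=0.8, seed=1)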
# -------------------------------
# Temporal components
# -------------------------------
def seasonal_component(day_of_year:np.ndarray, A_season:float, phase_shift_days:float=0.0):
# day_of_year can be scalar or array
phi = -2.0 * np.pi * (phase_shift_days / 365.0)
A_season = np.random.uniform(A_season * 0.3, A_season)
return A_season * np.sin(2.0 * np.pi * (day_of_year / 365.0) + phi)
def weekly_component(day_index:np.ndarray, A_week:float, cycle_days:int=7):
A_week = np.random.uniform(A_week * 0.5, A_week)
return A_week * np.sin(2.0 * np.pi * (day_index / float(cycle_days)))
def diurnal_double_gaussian(hour:float, cfg_diurnal:dict, A_day: float = 1.0):
A_day = np.random.uniform(A_day * 0.8, A_day)
# two gaussian peaks: morning and afternoon/evening
    # Base daily amplitude
    base_amp = cfg_diurnal.get("day_amp_C", 5.0) * A_day
    # Morning and afternoon peak amplitudes (relative to the base amplitude)
    Bm = base_amp * 0.6
    Ba = base_amp * 0.9
    # Peak positions and width
    hm = cfg_diurnal.get("morning_peak_hour", 10.0)
    ha = cfg_diurnal.get("afternoon_peak_hour", 15.0)
    sigma_h = cfg_diurnal.get("peak_width_hours", 3.0)
    # Superpose the two Gaussian curves
val = (
Bm * np.exp(-0.5 * ((hour - hm) / sigma_h) ** 2)
+ Ba * np.exp(-0.5 * ((hour - ha) / sigma_h) ** 2)
)
return val
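# Behaviour note (illustrative, with the defaults above): the curve peaks near
# afternoon_peak_hour and is close to zero overnight. Because A_day is re-drawn from
# U(0.8 * A_day, A_day) on every call, consecutive hours get slightly different amplitudes.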
# -------------------------------
# Main generation loop (chunked)
# -------------------------------
def generate(cfg:dict, out_path:str, test_mode:bool=False):
# set random seed
seed = cfg.get("random", {}).get("seed", 42)
np.random.seed(seed)
# build grid
lats, lons = build_grid(cfg)
nlat, nlon = lats.size, lons.size
print(f"[INFO] Grid: {nlat} lat x {nlon} lon (resolution {cfg['grid']['resolution']} deg)")
total_points = nlat * nlon
print(f"[INFO] Total spatial points: {total_points}")
# time coords
days = cfg["time"]["days"]
hours_per_day = cfg["time"]["hours_per_day"]
nt = int(days * hours_per_day)
print(f"[INFO] Total time points: {nt}")
start_date = datetime.datetime.fromisoformat(cfg["time"].get("start_date", "2025-01-01"))
# resource safety check warning
approx_size = nt * nlat * nlon * 4.0 / (1024**3) # GB if float32
print(f"[INFO] Estimated raw data (float32) size: {approx_size:.2f} GB")
if approx_size > 200 and not test_mode:
print("WARNING: Estimated size > 200 GB. Ensure you have sufficient disk and plan chunking accordingly.")
# proceed but warn user
# generate elevation and terrain
# elev = generate_elevation(nlat, nlon, cfg, seed=seed)
# terrain_map, name2code = classify_terrain(elev, cfg, seed=seed)
elev, terrain_map, name2code = generate_terrain(nlat, nlon, cfg, seed=seed)
# produce base temperature field (°C) per pixel using terrain base and lapse rate if applicable
T0 = np.zeros_like(elev, dtype=float)
# map terrain code back to name
inv_map = {v:k for k, v in name2code.items()}
# find terrain settings dict for quick lookup
t_settings = {t["name"]: t for t in cfg["terrain"]["types"]}
for code, name in inv_map.items():
mask = (terrain_map == code)
tcfg = t_settings.get(name, {})
base = tcfg.get("base_temp_C", 20.0)
if name == "mountain" or tcfg.get("lapse_rate", 0.0) != 0.0:
            lapse = tcfg.get("lapse_rate", -0.0065)  # default: standard environmental lapse rate in degC per metre
# apply lapse: T = base_at_sea_level + lapse * elev
T0[mask] = base + lapse * elev[mask]
else:
T0[mask] = base
# prepare spatial field S(x,y)
S = np.zeros_like(T0)
sp_corr = cfg.get("temperature_model", {}).get("spatial_noise_correlation", 0.0)
if sp_corr > 0.0:
corr_km = cfg.get("modulation", {}).get("spatial_field", {}).get("corr_length_km", 10.0) if cfg.get("modulation") else 10.0
sigma = cfg.get("modulation", {}).get("spatial_field", {}).get("sigma_C", 0.8) if cfg.get("modulation") else 0.8
S = make_gaussian_random_field(nlat, nlon, cfg["grid"]["resolution"], corr_km, sigma, seed=seed)
# output directory
    output_dir = cfg["output"].get("directory", "")
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
# prepare store
out_cfg = cfg.get("output", {})
filename = out_cfg.get("filename", None)
out_format = out_cfg.get("format", "zarr").lower()
if out_path is not None:
filename = out_path
if filename is None:
filename = "synthetic_LST"
if out_format not in ("zarr","netcdf"):
out_format = "zarr"
if out_format == "zarr":
filename = f'{filename}.zarr'
elif out_format == "netcdf":
filename = f'{filename}.nc'
filename = os.path.join(output_dir, filename)
chunk_cfg = cfg["output"].get("chunk", {"time": 168, "lat": 200, "lon":200})
ctime = int(chunk_cfg.get("time", min(168, nt)))
clat = int(chunk_cfg.get("lat", min(200, nlat)))
clon = int(chunk_cfg.get("lon", min(200, nlon)))
times = np.array([(start_date + datetime.timedelta(hours=i)).isoformat() for i in range(nt)], dtype=object)
# create zarr arrays (time, lat, lon)
    store_path = filename  # use the fully resolved path (output directory + extension)
if out_format == "zarr":
print(f"[INFO] Creating Zarr store at {store_path} ...")
zroot = zarr.open_group(store_path, mode="w")
# coords
zroot.create_group("coords")
zroot["coords/time"] = times
zroot["coords/lat"] = lats
zroot["coords/lon"] = lons
# terrain code
zroot.attrs['terrain_code'] = inv_map
# create dataset array
arr = zroot.create_array("temperature", shape=(nt, nlat, nlon), dtype="f4", chunks=(ctime, clat, clon), compressor=zarr.Blosc(cname='zstd', clevel=3))
# store terrain and elevation (2D)
zroot.create_array("elevation", data=elev.astype("f4"), chunks=(clat, clon), compressor=zarr.Blosc(cname='zstd', clevel=3))
zroot.create_array("terrain", data=terrain_map.astype("i2"), chunks=(clat, clon))
elif out_format == "netcdf":
print(f"[INFO] Preparing NetCDF store at {store_path} ...")
data = np.zeros((nt, nlat, nlon), dtype=np.float32)
else:
raise NotImplementedError(f"Unsupported output format: {out_format}")
# temporal loop by chunk
print("[INFO] Start temporal loop and write chunks ...")
A_season = cfg["temperature_model"].get("seasonal_amplitude", 10.0)
A_week = cfg["temperature_model"].get("weekly_amplitude", 2.0)
A_day = cfg["temperature_model"].get("daily_amplitude", 5.0)
noise_sigma = cfg["temperature_model"].get("noise_sigma", 1.0)
rho_t = cfg["temperature_model"].get("temporal_autocorrelation", 0.0)
# get diurnal cfg
di_cfg = cfg.get("temperature_model", {})
# We'll keep last time-step field for temporal autocorrelation
last_field = None
# convert units
    unit = cfg["output"].get("temperature_unit", "C")
    custom_unit_cfg = cfg["output"].get("custom_unit", {}) or {}
    custom_scale = custom_unit_cfg.get("scale", 1.0)
    custom_offset = custom_unit_cfg.get("offset", 0.0)
for tstart in tqdm(range(0, nt, ctime), desc="Generating time chunks"):
tend = min(nt, tstart + ctime)
nchunk = tend - tstart
chunk_data = np.zeros((nchunk, nlat, nlon), dtype=np.float32)
for i in range(nchunk):
idx = tstart + i
dt = start_date + datetime.timedelta(hours=idx)
day_index = (dt - start_date).days
day_of_year = dt.timetuple().tm_yday
hour = dt.hour + dt.minute/60.0
# components
Ts = seasonal_component(day_of_year, A_season, cfg.get("time", {}).get("phase_shift_days",0.0))
Tw = weekly_component(day_index, A_week, cfg.get("time", {}).get("cycle_days", 7))
Td = diurnal_double_gaussian(hour, di_cfg, A_day)
# spatial modulation factor (optional)
temporal_mod = 1.0
if cfg.get("modulation", {}) and cfg["modulation"].get("spatial_field", {}):
alpha = cfg["modulation"]["spatial_field"].get("temporal_modulation", 0.0)
temporal_mod = 1.0 + alpha * math.sin(2.0 * math.pi * (day_of_year / 365.0))
# assemble
field = T0 + Ts + Tw + Td + S * temporal_mod
# add random Gaussian noise (sensor)
field = field + np.random.normal(0.0, noise_sigma, size=field.shape)
# temporal autocorrelation: simple AR(1) with rho_t
if (rho_t is not None) and (rho_t > 0.0) and (last_field is not None):
field = rho_t * last_field + (1.0 - rho_t) * field
last_field = field
chunk_data[i,:,:] = convert_from_c(field, unit=unit, custom_scale=custom_scale, custom_offset=custom_offset).astype(np.float32)
if out_format == "zarr":
tqdm.write(f"[INFO] Writing time slice {tstart} .. {tend - 1} to zarr ...")
zroot["temperature"][tstart:tend, :, :] = chunk_data
elif out_format == "netcdf":
tqdm.write(f"[INFO] Writing time slice {tstart} .. {tend - 1} to netcdf ...")
data[tstart:tend, :, :] = chunk_data
if out_format == "netcdf":
print(f"[INFO] Saving...")
codes = np.array(list(inv_map.keys()))
names = np.array(list(inv_map.values()))
ds = xr.Dataset(
{
"temperature": (["time", "lat", "lon"], data),
"elevation": (["lat", "lon"], elev.astype(np.float32)),
"terrain": (["lat", "lon"], terrain_map.astype(np.int16)),
"terrain_code_keys": (["code"], codes),
"terrain_code_names": (["code"], names),
},
coords={
"time": times,
"lat": lats,
"lon": lons,
},
)
temp_unit = cfg["output"].get("temperature_unit", "C")
ds.temperature.attrs["units"] = f"{temp_unit}"
ds.elevation.attrs["units"] = "meters"
ds.terrain.attrs["description"] = "Terrain class"
ds.terrain_code_keys.attrs["description"] = "Terrain class codes (integers)"
ds.terrain_code_names.attrs["description"] = "Terrain class names (strings)"
ds.to_netcdf(path=filename, mode="w")
print("[INFO] NetCDF generation complete.")
elif out_format == "zarr":
print("[INFO] Zarr generation complete.")
print("[INFO] ALL complete. Dataset store located at:", store_path)
# -------------------------------
# CLI
# -------------------------------
def main():
parser = argparse.ArgumentParser(description="Synthetic LST dataset generator (reads YAML config)")
parser.add_argument("--config", "-c", type=str, default="config.yaml", help="Path to YAML config file")
parser.add_argument("--out", "-o", type=str, default=None, help="Output path (file or folder, depending on format)")
parser.add_argument("--test", action="store_true", help="Enable quick test mode (lower resolution, fewer days)")
args = parser.parse_args()
cfg = load_config(args.config)
cfg = apply_test_mode_overrides(cfg, args.test)
# ensure modulation config present to avoid key errors
if "modulation" not in cfg:
cfg["modulation"] = {
"spatial_field": {
"sigma_C": 0.8,
"corr_length_km": 10.0,
"temporal_modulation": 0.0
}
}
# create output dir if needed
out_path = args.out if args.out else cfg.get("output", {}).get("filename", None)
if out_path is None:
raise ValueError("No output path specified either by CLI or config file.")
    # If the output path already exists and we are not in test mode, warn the user
if os.path.exists(out_path) and not args.test:
print(f"[WARNING] Output path '{out_path}' already exists. Existing data may be overwritten.")
# main function
generate(cfg, out_path, test_mode=args.test)
if __name__ == "__main__":
main()
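# Example invocation (paths are illustrative):
#   python synthetic_lst.py --config config.yaml --out ./out/synthetic_LST --test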