#include "queue.h"
#include "config.h"
#include "json.h"
#include "discord.h"
#include "notify.h"
#include "jellyfin.h"

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <curl/curl.h>

/* Table of known downloads; every access is guarded by g_dl_mutex. */
Download        g_downloads[MAX_DL];
int             g_dl_count = 0;
pthread_mutex_t g_dl_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Singly linked FIFO of pending tasks; guarded by g_q_mutex / g_q_cond. */
QNode          *g_q_head  = NULL;
QNode          *g_q_tail  = NULL;
pthread_mutex_t g_q_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  g_q_cond  = PTHREAD_COND_INITIALIZER;

/* ── Internal helpers ─────────────────────────────────────────── */

/* Bounded string copy that always NUL-terminates; a NULL src copies "". */
static void copy_field(char *dst, size_t cap, const char *src)
{
    snprintf(dst, cap, "%s", src ? src : "");
}

/*
 * Return the first table entry whose id equals `id`, or NULL.
 * The caller must hold g_dl_mutex.
 */
static Download *find_download_locked(const char *id)
{
    for (int i = 0; i < g_dl_count; i++) {
        if (strcmp(g_downloads[i].id, id) == 0)
            return &g_downloads[i];
    }
    return NULL;
}

/* Replace the status text of the entry with the given id, if it exists. */
static void set_status(const char *id, const char *status)
{
    pthread_mutex_lock(&g_dl_mutex);
    Download *d = find_download_locked(id);
    if (d)
        copy_field(d->status, sizeof(d->status), status);
    pthread_mutex_unlock(&g_dl_mutex);
}

/*
 * Create `dir` and any missing parents (like `mkdir -p`) without going
 * through a shell, so directory names containing quotes are handled.
 * Returns 0 on success, -1 on failure.
 */
static int make_dirs(const char *dir)
{
    char path[1024];
    int n = snprintf(path, sizeof(path), "%s", dir);
    if (n <= 0 || (size_t)n >= sizeof(path))
        return -1;

    /* Start after the first character so a leading '/' is not cut off. */
    for (char *p = path + 1; *p; p++) {
        if (*p != '/')
            continue;
        *p = '\0';
        if (mkdir(path, 0777) != 0 && errno != EEXIST)
            return -1;
        *p = '/';
    }
    if (mkdir(path, 0777) != 0 && errno != EEXIST)
        return -1;
    return 0;
}

/* ── History ──────────────────────────────────────────────────── */

/*
 * Write the whole download table as a JSON array to the history file.
 *
 * The file is opened, written and closed while g_dl_mutex is held so that
 * concurrent callers cannot interleave a truncate with another caller's
 * write. Callers must therefore NOT hold g_dl_mutex.
 */
void history_save(void)
{
    char path[512];
    config_history_path(path, sizeof(path));

    pthread_mutex_lock(&g_dl_mutex);

    FILE *f = fopen(path, "w");
    if (!f) {
        pthread_mutex_unlock(&g_dl_mutex);
        return;
    }

    fputc('[', f);
    for (int i = 0; i < g_dl_count; i++) {
        const Download *d = &g_downloads[i];

        if (i > 0)
            fputc(',', f);
        fprintf(f, "{\"id\":");          fwrite_json_str(f, d->id);
        fprintf(f, ",\"name\":");        fwrite_json_str(f, d->name);
        fprintf(f, ",\"status\":");      fwrite_json_str(f, d->status);
        fprintf(f, ",\"cover_url\":");   fwrite_json_str(f, d->cover_url);
        fprintf(f, ",\"url\":");         fwrite_json_str(f, d->url);
        fprintf(f, ",\"dest_dir\":");    fwrite_json_str(f, d->dest_dir);
        fprintf(f, ",\"filename\":");    fwrite_json_str(f, d->filename);
        fprintf(f, ",\"downloaded\":%ld,\"total\":%ld}",
                d->downloaded, d->total);
    }
    fprintf(f, "]\n");

    /* fclose flushes buffered output; a failure means the file is incomplete. */
    if (fclose(f) != 0)
        fprintf(stderr, "iptv-dl: failed to write history file %s\n", path);

    pthread_mutex_unlock(&g_dl_mutex);
}

/*
 * Append one history object to the table. Entries lacking id, name or
 * status are skipped. The caller guarantees g_dl_count < MAX_DL.
 */
static void history_load_entry(char *obj)
{
    char *id      = json_str(obj, "id");
    char *name    = json_str(obj, "name");
    char *status  = json_str(obj, "status");
    char *cover   = json_str(obj, "cover_url");
    char *url     = json_str(obj, "url");
    char *destdir = json_str(obj, "dest_dir");
    char *fname   = json_str(obj, "filename");
    char *dl_s    = json_str(obj, "downloaded");
    char *tot_s   = json_str(obj, "total");

    if (id && name && status) {
        Download *d = &g_downloads[g_dl_count++];

        d->active    = 1;
        d->cancelled = 0;
        copy_field(d->id,        sizeof(d->id),        id);
        copy_field(d->name,      sizeof(d->name),      name);
        copy_field(d->status,    sizeof(d->status),    status);
        copy_field(d->cover_url, sizeof(d->cover_url), cover);
        copy_field(d->url,       sizeof(d->url),       url);
        copy_field(d->dest_dir,  sizeof(d->dest_dir),  destdir);
        copy_field(d->filename,  sizeof(d->filename),  fname);
        d->downloaded  = dl_s  ? strtol(dl_s,  NULL, 10) : 0;
        d->total       = tot_s ? strtol(tot_s, NULL, 10) : 0;
        d->speed_bps   = 0;
        d->speed_ts    = 0;
        d->speed_bytes = 0;

        /* Anything that was in flight when the history was written did not finish. */
        if (strcmp(d->status, "downloading") == 0 ||
            strcmp(d->status, "queued") == 0)
            copy_field(d->status, sizeof(d->status), "interrupted");
    }

    free(id);
    free(name);
    free(status);
    free(cover);
    free(url);
    free(destdir);
    free(fname);
    free(dl_s);
    free(tot_s);
}

/*
 * Read the history file and append its entries to the download table,
 * stopping at MAX_DL entries.
 *
 * NOTE(review): this does not take g_dl_mutex; presumably it runs once at
 * startup before any worker thread exists -- confirm against the caller.
 */
void history_load(void)
{
    char path[512];
    config_history_path(path, sizeof(path));

    FILE *f = fopen(path, "r");
    if (!f)
        return;

    long sz = -1;
    if (fseek(f, 0, SEEK_END) == 0)
        sz = ftell(f);
    if (sz < 0 || fseek(f, 0, SEEK_SET) != 0) {
        fclose(f);
        return;
    }

    char *buf = malloc((size_t)sz + 1);
    if (!buf) {
        fclose(f);
        return;
    }
    /* Terminate after what was actually read, which may be less than sz. */
    size_t got = fread(buf, 1, (size_t)sz, f);
    buf[got] = '\0';
    fclose(f);

    int n = 0;
    char **arr = json_array(buf, &n);
    free(buf);
    if (!arr)
        return;

    /* Visit every element so each arr[i] is freed even once the table is full. */
    for (int i = 0; i < n; i++) {
        if (g_dl_count < MAX_DL)
            history_load_entry(arr[i]);
        free(arr[i]);
    }
    free(arr);

    fprintf(stderr, "iptv-dl: loaded %d history entries\n", g_dl_count);
}

/*
 * Delete leftover "*.part" files below both configured download directories.
 *
 * NOTE(review): the directories are interpolated into a shell command inside
 * single quotes; a quote character in either configured path would break the
 * command. Consider replacing this with a directory walk.
 */
void clean_partials(void)
{
    char cmd[512];
    int n = snprintf(cmd, sizeof(cmd),
                     "find '%s' '%s' -name '*.part' -delete 2>/dev/null",
                     g_cfg.dl_dir_tv, g_cfg.dl_dir_mov);

    /* Never hand a truncated command line to the shell. */
    if (n < 0 || (size_t)n >= sizeof(cmd)) {
        fprintf(stderr, "iptv-dl: download paths too long, skipping partial cleanup\n");
        return;
    }
    if (system(cmd) == -1)
        fprintf(stderr, "iptv-dl: could not run partial cleanup\n");
}

/* ── Download worker ──────────────────────────────────────────── */

/*
 * libcurl transfer-progress callback. `ud` is the download id string.
 * Records byte counts, refreshes the speed estimate when the wall-clock
 * second changes, and returns the entry's `cancelled` flag (a non-zero
 * return makes libcurl abort the transfer).
 */
static int xprogress(void *ud, curl_off_t total, curl_off_t now,
                     curl_off_t ul_total, curl_off_t ul_now)
{
    (void)ul_total;
    (void)ul_now;

    const char *id = ud;
    int cancel = 0;
    time_t cur = time(NULL);

    pthread_mutex_lock(&g_dl_mutex);
    Download *d = find_download_locked(id);
    if (d) {
        d->downloaded = (long)now;
        d->total      = (long)total;
        cancel        = d->cancelled;

        if (cur != d->speed_ts) {
            /* speed_ts == 0 means no sample yet, so there is nothing to diff. */
            if (d->speed_ts) {
                long dt = (long)(cur - d->speed_ts);
                long db = (long)now - d->speed_bytes;
                if (dt > 0 && db >= 0)
                    d->speed_bps = (float)db / dt;
            }
            d->speed_ts    = cur;
            d->speed_bytes = (long)now;
        }
    }
    pthread_mutex_unlock(&g_dl_mutex);

    return cancel;
}

/*
 * Run one download task to completion and free it.
 *
 * Data is written to "<dest_dir>/<filename>.part" and renamed to its final
 * name only after a successful, non-empty transfer. The table entry's status
 * is updated and the history is saved at each state change. On success or
 * failure (but not on cancel) a notification is sent; on success a Jellyfin
 * scan is triggered and the show manifest is updated.
 *
 * Takes ownership of `t`.
 */
static void do_download(DlTask *t)
{
    /* Best effort: if this fails, opening the .part file below reports it. */
    (void)make_dirs(t->dest_dir);

    char dest[768];
    char dest_part[780];
    snprintf(dest, sizeof(dest), "%s/%s", t->dest_dir, t->filename);
    snprintf(dest_part, sizeof(dest_part), "%s.part", dest);

    /* Honour a cancel that arrived while the task was still queued. */
    int already_cancelled = 0;
    pthread_mutex_lock(&g_dl_mutex);
    Download *d = find_download_locked(t->id);
    if (d) {
        if (d->cancelled) {
            copy_field(d->status, sizeof(d->status), "cancelled");
            already_cancelled = 1;
        } else {
            copy_field(d->status, sizeof(d->status), "downloading");
            d->speed_bps   = 0;
            d->speed_ts    = 0;
            d->speed_bytes = 0;
        }
    }
    pthread_mutex_unlock(&g_dl_mutex);

    history_save();
    if (already_cancelled) {
        free(t);
        return;
    }

    CURL *c = curl_easy_init();
    FILE *f = fopen(dest_part, "wb");
    if (!f) {
        set_status(t->id, "error: can't open file");
        history_save();
        curl_easy_cleanup(c);
        free(t);
        return;
    }

    curl_easy_setopt(c, CURLOPT_URL, t->url);
    curl_easy_setopt(c, CURLOPT_WRITEDATA, f);
    curl_easy_setopt(c, CURLOPT_FOLLOWLOCATION, 1L);
    curl_easy_setopt(c, CURLOPT_TIMEOUT, CURL_DL_TIMEOUT);
    curl_easy_setopt(c, CURLOPT_XFERINFOFUNCTION, xprogress);
    curl_easy_setopt(c, CURLOPT_XFERINFODATA, t->id);
    curl_easy_setopt(c, CURLOPT_NOPROGRESS, 0L);
    if (g_cfg.bind_iface[0])
        curl_easy_setopt(c, CURLOPT_INTERFACE, g_cfg.bind_iface);
    if (g_cfg.max_recv_speed > 0)
        curl_easy_setopt(c, CURLOPT_MAX_RECV_SPEED_LARGE,
                         (curl_off_t)g_cfg.max_recv_speed);

    CURLcode res = curl_easy_perform(c);
    /* A failed fclose means buffered data never reached the disk. */
    int close_ok = (fclose(f) == 0);
    curl_easy_cleanup(c);

    struct stat st;
    int nonempty = (stat(dest_part, &st) == 0 && st.st_size > 0);

    char notify_msg[512] = "";
    int scan_jf = 0;

    pthread_mutex_lock(&g_dl_mutex);
    d = find_download_locked(t->id);
    if (d) {
        d->speed_bps = 0;

        if (d->cancelled) {
            copy_field(d->status, sizeof(d->status), "cancelled");
            remove(dest_part);
        } else if (res != CURLE_OK) {
            remove(dest_part);
            snprintf(d->status, sizeof(d->status), "error: %s",
                     curl_easy_strerror(res));
            snprintf(notify_msg, sizeof(notify_msg),
                     ":x: Download failed: **%s** — %s",
                     t->name, curl_easy_strerror(res));
        } else if (!close_ok) {
            remove(dest_part);
            copy_field(d->status, sizeof(d->status), "error: write failed");
            snprintf(notify_msg, sizeof(notify_msg),
                     ":x: Download failed: **%s** — write failed", t->name);
        } else if (!nonempty) {
            copy_field(d->status, sizeof(d->status),
                       "error: empty file (bad url?)");
            remove(dest_part);
            snprintf(notify_msg, sizeof(notify_msg),
                     ":x: Download failed: **%s** — empty file", t->name);
        } else if (rename(dest_part, dest) != 0) {
            /* Do not report success when the final file was not produced. */
            remove(dest_part);
            copy_field(d->status, sizeof(d->status),
                       "error: can't rename file");
            snprintf(notify_msg, sizeof(notify_msg),
                     ":x: Download failed: **%s** — can't rename file", t->name);
        } else {
            copy_field(d->status, sizeof(d->status), "done");
            snprintf(notify_msg, sizeof(notify_msg),
                     ":white_check_mark: Downloaded: **%s**", t->name);
            scan_jf = 1;
        }
    }
    pthread_mutex_unlock(&g_dl_mutex);

    history_save();

    if (notify_msg[0]) {
        discord_notify(notify_msg);
        notify_add(notify_msg, scan_jf ? "done" : "error");
    }
    if (scan_jf) {
        trigger_jellyfin_scan();
        update_show_manifest(t->dest_dir, t->name);
    }

    free(t);
}

/*
 * Worker thread entry point: block until a task is queued, pop it from the
 * head of the FIFO, run it, repeat. Never returns in practice.
 */
void *dl_worker(void *arg)
{
    (void)arg;

    for (;;) {
        pthread_mutex_lock(&g_q_mutex);
        while (!g_q_head)
            pthread_cond_wait(&g_q_cond, &g_q_mutex);

        QNode *node = g_q_head;
        g_q_head = node->next;
        if (!g_q_head)
            g_q_tail = NULL;
        pthread_mutex_unlock(&g_q_mutex);

        /* do_download frees the task; the node itself is freed here. */
        do_download(node->task);
        free(node);
    }

    return NULL;
}

/* ── Queue entry point ────────────────────────────────────────── */

/*
 * Register a download and hand it to the worker queue.
 *
 * stream_id  id used both in the stream URL and as the table key
 * filename   file name inside dest_dir
 * dest_dir   destination directory
 * name       display name used in the table and in notifications
 * is_movie   non-zero selects the "/movie/" URL, zero the "/series/" URL
 * cover_url  may be NULL
 *
 * The request is silently ignored when the destination file already exists
 * and is non-empty, or when the same id is already queued or downloading.
 * An existing finished/failed entry with the same id is reused rather than
 * duplicated, because all lookups by id stop at the first match. The request
 * is rejected when the table is full, since an untracked task could never be
 * finalised by the worker.
 */
void queue_download(const char *stream_id, const char *filename,
                    const char *dest_dir, const char *name,
                    int is_movie, const char *cover_url)
{
    char full_path[800];
    snprintf(full_path, sizeof(full_path), "%s/%s", dest_dir, filename);

    struct stat st;
    if (stat(full_path, &st) == 0 && st.st_size > 0)
        return;

    /* Allocate everything up front so later failure paths stay simple. */
    DlTask *t = calloc(1, sizeof *t);
    QNode *node = malloc(sizeof *node);
    if (!t || !node) {
        fprintf(stderr, "iptv-dl: out of memory queueing %s\n", name);
        free(t);
        free(node);
        return;
    }

    snprintf(t->url, sizeof(t->url), "%s/%s/%s/%s/%s",
             g_cfg.stream_base, is_movie ? "movie" : "series",
             g_cfg.iptv_user, g_cfg.iptv_pass, stream_id);
    copy_field(t->dest_dir,  sizeof(t->dest_dir),  dest_dir);
    copy_field(t->filename,  sizeof(t->filename),  filename);
    copy_field(t->name,      sizeof(t->name),      name);
    copy_field(t->id,        sizeof(t->id),        stream_id);
    copy_field(t->cover_url, sizeof(t->cover_url), cover_url);

    pthread_mutex_lock(&g_dl_mutex);

    Download *slot = NULL;
    int busy = 0;
    for (int i = 0; i < g_dl_count; i++) {
        Download *d = &g_downloads[i];
        if (strcmp(d->id, stream_id) != 0)
            continue;
        if (!slot)
            slot = d;   /* first match is the one every lookup will find */
        if (strcmp(d->status, "queued") == 0 ||
            strcmp(d->status, "downloading") == 0) {
            busy = 1;
            break;
        }
    }

    if (!busy && !slot && g_dl_count < MAX_DL)
        slot = &g_downloads[g_dl_count++];

    if (busy || !slot) {
        pthread_mutex_unlock(&g_dl_mutex);
        if (!busy)
            fprintf(stderr, "iptv-dl: download table full, dropping %s\n", name);
        free(t);
        free(node);
        return;
    }

    slot->active    = 1;
    slot->cancelled = 0;
    copy_field(slot->id,        sizeof(slot->id),        stream_id);
    copy_field(slot->name,      sizeof(slot->name),      name);
    copy_field(slot->status,    sizeof(slot->status),    "queued");
    copy_field(slot->cover_url, sizeof(slot->cover_url), cover_url);
    copy_field(slot->url,       sizeof(slot->url),       t->url);
    copy_field(slot->dest_dir,  sizeof(slot->dest_dir),  dest_dir);
    copy_field(slot->filename,  sizeof(slot->filename),  filename);
    slot->is_movie    = is_movie;
    slot->downloaded  = 0;
    slot->total       = 0;
    slot->speed_bps   = 0;
    slot->speed_ts    = 0;
    slot->speed_bytes = 0;

    pthread_mutex_unlock(&g_dl_mutex);

    history_save();

    node->task = t;
    node->next = NULL;

    pthread_mutex_lock(&g_q_mutex);
    if (g_q_tail)
        g_q_tail->next = node;
    else
        g_q_head = node;
    g_q_tail = node;
    pthread_cond_signal(&g_q_cond);
    pthread_mutex_unlock(&g_q_mutex);
}