More and more random-image APIs are appearing, and their quality varies widely.
The images may not be the kind you like, or the API you do like may be too slow. This article walks you through the whole process, from building the API itself to collecting the images.

Prerequisites

  • A Linux server
  • 宝塔 (BT) panel
  • PHP installed
  • A custom domain for the site
  • A Python environment (only needed if you use the image scrapers)

Walkthrough

Step 1: Create the project

  1. Log in to the 宝塔 (BT) panel.
  2. Go to the "Website" section and click "Add site".
  3. Follow the prompts to add your site.

Step 2: Create the image folders

  1. In your site's root directory, create two folders: pc and pe. The script in Step 3 also reads from an avatar folder for the type=avatar option, so create that one as well; a small sketch for creating all three from a shell follows this list.
  2. The pc folder holds images for desktop visitors, and the pe folder holds images for mobile visitors.
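
If you prefer a shell over the panel's file manager, a minimal Python sketch that creates all three folders (the web-root path below is a placeholder; substitute your site's actual root):

import os

site_root = "/www/wwwroot/example.com"  # placeholder path; use your site's real web root
for name in ("pc", "pe", "avatar"):
    os.makedirs(os.path.join(site_root, name), exist_ok=True)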

Step 3: Create the PHP script

  1. In the site's root directory, create a file named index.php.
  2. Add the following PHP code to index.php.
  3. The script picks the folder automatically from the visitor's device; you can override this with the type parameter, and use the info parameter to change the response format.
<?php
function getRandomImageFromFolder($folderPath, $allowedExtensions) {
    $imageArray = glob($folderPath . '/*.{' . implode(',', $allowedExtensions) . '}', GLOB_BRACE);

    if (count($imageArray) == 0) {
        header('Content-Type: application/json; charset=utf-8');
        die(json_encode(['error' => 'No images found. Upload some images to the folder first.'], JSON_UNESCAPED_UNICODE));
    }

    return $imageArray[array_rand($imageArray)];
}

$agent = $_SERVER['HTTP_USER_AGENT'];
$scriptDirectory = __DIR__;  // absolute path of the directory this script lives in

if (isset($_GET['type']) && $_GET['type'] === 'avatar') {
    $selectedImage = getRandomImageFromFolder($scriptDirectory . '/avatar', ['gif', 'jpg', 'png', 'jpeg', 'webp', 'bmp', 'JPG']);
} else if (stripos($agent, 'android') !== false || stripos($agent, 'iphone') !== false || (isset($_GET['type']) && $_GET['type'] === 'pe')) {
    $selectedImage = getRandomImageFromFolder($scriptDirectory . '/pe', ['gif', 'jpg', 'png', 'jpeg', 'webp', 'bmp', 'JPG']);
} else {
    $selectedImage = getRandomImageFromFolder($scriptDirectory . '/pc', ['gif', 'jpg', 'png', 'jpeg', 'webp', 'bmp']);
}

$imageInfo = getimagesize($selectedImage);
$imageWidth = $imageInfo[0];
$imageHeight = $imageInfo[1];
$imageContent = file_get_contents($selectedImage);

if (isset($_GET['info']) && $_GET['info'] == 'json') {
    header('Content-Type: application/json');
    $imageRelativePath = str_replace($scriptDirectory, '', $selectedImage);  // path relative to the script directory
    $scheme = (!empty($_SERVER['HTTPS']) && $_SERVER['HTTPS'] !== 'off') ? 'https' : 'http';  // honor HTTPS when building the URL
    $baseURL = $scheme . '://' . $_SERVER['HTTP_HOST'];
    $imageURL = $baseURL . $imageRelativePath;  // full URL of the image
    echo json_encode([
        'image_url' => $imageURL,
        'width' => $imageWidth,
        'height' => $imageHeight
    ]);
} else {
    header('Content-Type: ' . $imageInfo['mime']);  // use the actual MIME type rather than a fixed image/png
    echo $imageContent;
}
?>
If you have collected enough images, you can swap the getRandomImageFromFolder function for the version below, which avoids serving the same image repeatedly within a short window. Note that PHP sessions are per visitor, so the no-repeat window applies to each client separately.
function getRandomImageFromFolder($folderPath, $allowedExtensions) {
    // Start the session if it has not been started yet
    if (session_status() == PHP_SESSION_NONE) {
        session_start();
    }

    // Initialize the array that tracks served images, plus a timestamp
    if (!isset($_SESSION['selectedImages'])) {
        $_SESSION['selectedImages'] = [];
        $_SESSION['lastSelectionTime'] = time();
    }

    // Reset the list if more than 5 minutes have passed since the last selection
    if (time() - $_SESSION['lastSelectionTime'] >= 300) {
        $_SESSION['selectedImages'] = [];
        $_SESSION['lastSelectionTime'] = time();
    }

    $imageArray = glob($folderPath . '/*.{' . implode(',', $allowedExtensions) . '}', GLOB_BRACE);

    if (count($imageArray) == 0) {
        header('Content-Type: application/json; charset=utf-8');
        die(json_encode(['error' => 'No images found. Upload some images to the folder first.'], JSON_UNESCAPED_UNICODE));
    }

    // Remove images that have already been served
    $availableImages = array_diff($imageArray, $_SESSION['selectedImages']);

    // If every image has been served within the window, start over instead of failing
    if (count($availableImages) == 0) {
        $_SESSION['selectedImages'] = [];
        $availableImages = $imageArray;
    }

    // Pick a random image
    $selectedImage = $availableImages[array_rand($availableImages)];

    // Record the selection in the session and refresh the timestamp
    $_SESSION['selectedImages'][] = $selectedImage;
    $_SESSION['lastSelectionTime'] = time();

    return $selectedImage;
}

Step 4: Notes and considerations

  1. The PHP code includes logic to determine whether the visitor is on a mobile device or a desktop computer.
  2. Based on the device type, the script automatically loads a random image from the matching folder.
  3. Adjust the image file extensions and paths to fit your own preferences and folder structure.
  4. By default the script returns the image itself; pass info=json to get the image URL and dimensions as JSON instead (see the example below).
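
To confirm the endpoint behaves as described, you can call it from Python; a minimal sketch, assuming your site is reachable at the hypothetical domain example.com:

import requests

BASE = "https://example.com"  # hypothetical domain; replace with your own site

# Default behavior: the response body is the image itself
r = requests.get(BASE, params={"type": "pe"})
print(r.headers["Content-Type"])  # e.g. image/jpeg

# With info=json: the response is JSON carrying the image URL and dimensions
r = requests.get(BASE, params={"type": "avatar", "info": "json"})
print(r.json())  # {'image_url': ..., 'width': ..., 'height': ...}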

Collecting images

Scrapers

WeChat Official Account article images

import re
import requests
import os
import random
import string
from bs4 import BeautifulSoup
import urllib.request
from PIL import Image

def generate_random_string(length):
    letters_and_digits = string.ascii_letters + string.digits
    return ''.join(random.choice(letters_and_digits) for _ in range(length))

def wechat_picture_download(url, path):
    # Request headers
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36',
        'Connection': 'close'
    }
    # Fetch the article
    response = requests.get(url, headers=headers)
    print(response.status_code)  # 200 means a normal response
    response.raise_for_status()  # raise if the request failed
    # Parse with BeautifulSoup
    bs = BeautifulSoup(response.text, 'html.parser')
    title = bs.select('h1')[0].text.strip()
    # title = bs.find_all('h1')[0].text.strip()
    # Keep only CJK characters, letters and digits from the title
    title = re.findall('[\u4e00-\u9fa5a-zA-Z0-9]+', title, re.S)
    title = '-'.join(title)
    # Collect the image URLs
    imglist = []
    for img in bs.select('img'):
        if 'data-src' in img.attrs:
            imglist.append(img['data-src'])
    # Create the output directories if needed
    if not os.path.exists(path):
        os.mkdir(path)
    if not os.path.exists(os.path.join(path, title)):
        os.mkdir(os.path.join(path, title))
    num = 0
    for jpg_url in imglist:
        try:
            random_filename = generate_random_string(10)  # random file name
            print('Downloading image', num + 1)
            urllib.request.urlretrieve(jpg_url, os.path.join(path, title, random_filename + '.png'))
            # time.sleep(1)
            num += 1
        except Exception as reason:
            print(str(reason))
    print('-' * 10, 'All downloads finished!', '-' * 10)
    # Filter out tiny images (icons, separators, etc.)
    file_list = os.listdir(os.path.join(path, title))
    for file in file_list:
        if file.split('.')[-1] == 'png':
            filename = os.path.join(path, title, file)
            img = Image.open(filename)
            imgSize = img.size
            img.close()
            if imgSize[0] > 100 and imgSize[1] > 100:
                pass
                # print(imgSize)
            else:
                os.remove(filename)  # delete the file
                print('Deleting:', file)
    print('-' * 10, 'Filtering finished!', '-' * 10)
    return True


# Target article
url = r'https://mp.weixin.qq.com/s/MbLpuD0P9xFeGAInJ9DoVg'  # article link
path = r'img'  # folder where images are saved

wechat_picture_download(url, path)
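
To collect from several articles in one run, you could loop over a list of links (the list here is just a sketch; fill in your own article URLs):

article_urls = [
    r'https://mp.weixin.qq.com/s/MbLpuD0P9xFeGAInJ9DoVg',
    # add more article links here
]
for article_url in article_urls:
    wechat_picture_download(article_url, r'img')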

Bilibili uploader dynamics albums

Go to the uploader's profile page; the string of digits at the end of the URL is the UID.
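
For instance, a small snippet that extracts the UID from a profile URL (the URL below is illustrative, using the same UID as the script that follows):

import re

profile_url = "https://space.bilibili.com/168687092"  # example profile URL
uid = re.search(r"space\.bilibili\.com/(\d+)", profile_url).group(1)
print(uid)  # 168687092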

import json
import os
import time

import requests

def random_ua():
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36',
        'Connection': 'close'
    }
    return headers

def get_content(next_offset, host_uid):
    urlone = 'https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/space_history?visitor_uid=283764718&host_uid=' + host_uid + '&offset_dynamic_id=' + next_offset + '&need_top=1&platform=web'
    print("Requesting " + urlone)
    try:
        r1 = requests.get(urlone, headers=random_ua(), timeout=10)
        print("Connected!")
        url_list = []
        data = json.loads(r1.content)
        next_offset = data['data']['next_offset']
        has_more = data['data']['has_more']
        for dict_data in data['data']['cards']:
            data2 = json.loads(dict_data['card'])
            # 'i' and 'c' as the third character of the card JSON mark picture posts
            # (my own heuristic, based on the payloads I observed)
            if dict_data['card'][2] == 'i' or dict_data['card'][2] == 'c':
                for item in data2['item']['pictures']:
                    url_list.append(item['img_src'])
            elif dict_data['card'][2] == 'a':
                continue  # this card is a video post with no pictures, so skip it
                # print(data2['first_frame'])
        return url_list, next_offset, has_more
    except Exception as e:
        print(e)
        print("Network error while fetching the dynamics list!")
        return None, next_offset, 0
    finally:
        print("Returning from the network call.")

# Create the output folder
def path_creat():
    _path = "b_img"
    if not os.path.exists(_path):
        os.mkdir(_path)
    return _path

# Download one image from its URL
def download(url, file_name):
    try:
        image = requests.get(url=url, headers=random_ua()).content  # fetch the image bytes
    except Exception as e:
        print(e)
        print("Network request failed!")
        return
    try:
        with open(file_name, 'wb') as f:
            f.write(image)
    except Exception as e:
        print(e)
        print("Failed to save the file!")

# Main entry point
if __name__ == '__main__':
    path = path_creat()  # create the folder that holds the downloaded images
    has_more = 1
    count = 0
    next_offset = "0"  # initial offset
    host_uid = "168687092"  # the uploader's UID
    # The request URL is assembled inside get_content() and can be tweaked there.
    # On the very first page, next_offset really is 0.
    total_start = time.time()  # start time of the whole run
    while has_more == 1:
        imageurl_lists, next_offset, has_more = get_content(str(next_offset), host_uid)
        print("next=" + str(next_offset))
        print("has=" + str(has_more))
        # print(type(has_more))
        time_start = time.time()  # start time of this round
        if imageurl_lists is None:
            continue
        numb = len(imageurl_lists)
        for it in imageurl_lists:
            # Use the last URL segment as the file name; this also keeps the right extension
            strl = it.split("/")
            download(it, os.path.join(path, strl[-1]))
        count += 1
        time_end = time.time()  # end time of this round
        runtime = time_end - time_start
        runtime = time.strftime("%H:%M:%S", time.gmtime(runtime))  # format as HH:MM:SS
        print('Round ' + str(count) + ' finished: ' + str(numb) + ' images, elapsed time', runtime)

    time_end = time.time()
    runtime = time.strftime("%H:%M:%S", time.gmtime(time_end - total_start))  # total time, not just the last round
    print('Done: ' + str(count) + ' rounds in total, total elapsed time', runtime)
    print('finished')


Bilibili uploader article columns

import os
import random
import string
import time
import requests
from bs4 import BeautifulSoup


def generate_random_string(length):
    letters_and_digits = string.ascii_letters + string.digits
    return ''.join(random.choice(letters_and_digits) for _ in range(length))


def get_image_urls_from_article(url):
    html = requests.get(url).text
    soup = BeautifulSoup(html, 'html.parser')
    img_attr = soup.find_all("img")
    url_list = []
    for i in img_attr:
        if i.get("data-src"):  # skip img tags that have no data-src attribute
            url_list.append("https:" + i["data-src"])
    return url_list


def download_image(url, output_directory):
    random_filename = generate_random_string(10)
    r = requests.get(url)
    if r.status_code == 200:
        image_path = os.path.join(output_directory, random_filename + '.png')
        with open(image_path, 'wb') as f:
            f.write(r.content)
        print("Image downloaded:", url)
    else:
        print("Failed to download:", url)


def main(mid):
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36',
        'Connection': 'close'
    }

    upurl = f"https://api.bilibili.com/x/space/wbi/article?mid={mid}&pn=1&ps=30&sort=publish_time&web_location=1550101&platform=web&w_rid=167ebb32914ec5324a8f92d9f06ad149&wts=1693338648"
    output_directory = 'b_img_ss'
    if not os.path.exists(output_directory):
        os.mkdir(output_directory)

    while True:
        r = requests.get(upurl, headers=headers, timeout=10)
        data = r.json()
        articles = data['data']['articles']


        for article in articles:
            article_url = 'https://www.bilibili.com/read/cv' + str(article['id'])
            image_urls = get_image_urls_from_article(article_url)

            for image_url in image_urls:
                download_image(image_url, output_directory)
                # time.sleep(1)

            # exit(0)
        if data['data']['pn'] * data['data']['ps'] >= data['data']['count']:
            break

        upurl = f"https://api.bilibili.com/x/space/wbi/article?mid={mid}&pn={data['data']['pn'] + 1}&ps=30&sort=publish_time&web_location=1550101&platform=web&w_rid=167ebb32914ec5324a8f92d9f06ad149&wts=1693338648"
        # time.sleep(1)


if __name__ == "__main__":
    mid = '106450408'  # the uploader's UID
    main(mid)

Image compression (not recommended)

This gives the best compression ratio, but complex images are easily crushed into a blurry mess; test and tune the quality parameters yourself to find the best trade-off.

import os
import concurrent.futures
from PIL import Image


def compress_and_convert_image(input_path, output_path_jpeg, output_path_webp, quality_jpeg, quality_webp):
    try:
        img = Image.open(input_path)

        img_jpeg = img.convert("RGB")
        img_jpeg.save(output_path_jpeg, format="JPEG", quality=quality_jpeg, optimize=True)
        print(f"Compressed and saved as JPEG: {output_path_jpeg}")

        img.save(output_path_webp, format="WebP", quality=quality_webp)
        print(f"Saved as WebP: {output_path_webp}")

    except (IOError, OSError, Image.UnidentifiedImageError) as e:
        print(f"Cannot process image '{input_path}': {e}")


def process_images_multithread(input_folder, output_folder_jpeg, output_folder_webp, quality_jpeg, quality_webp,
                               num_threads=4):
    try:
        if not os.path.exists(output_folder_jpeg):
            os.makedirs(output_folder_jpeg)
        if not os.path.exists(output_folder_webp):
            os.makedirs(output_folder_webp)

        image_files = [f for f in os.listdir(input_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif'))]

        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            for image_file in image_files:
                input_path = os.path.join(input_folder, image_file)
                output_jpeg_path = os.path.join(output_folder_jpeg, os.path.splitext(image_file)[0] + ".jpg")
                # Derive the WebP name from the file stem so names containing dots stay intact
                output_webp_path = os.path.join(output_folder_webp, os.path.splitext(image_file)[0] + ".webp")

                executor.submit(compress_and_convert_image, input_path, output_jpeg_path, output_webp_path,
                                quality_jpeg, quality_webp)
    except Exception as e:
        print(f"Error: {e}")


# Input and output folder paths
input_folder_path = 'b_img_s'  # input image folder
output_folder_path_jpeg = 'b_img_s_low'  # output folder for compressed JPEGs
output_folder_path_webp = 'b_img_s_low_webp'  # output folder for WebP images
compression_quality_jpeg = 15  # JPEG quality
compression_quality_webp = 15  # WebP quality
num_threads = 8  # number of worker threads

process_images_multithread(input_folder_path, output_folder_path_jpeg, output_folder_path_webp,
                           compression_quality_jpeg, compression_quality_webp, num_threads)
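
To judge whether the quality settings pay off, it helps to compare total folder sizes before and after; a small sketch using the folder names configured above:

import os

def folder_size_mb(path):
    # total size of the regular files directly inside `path`, in MiB
    return sum(os.path.getsize(os.path.join(path, f))
               for f in os.listdir(path)
               if os.path.isfile(os.path.join(path, f))) / (1024 * 1024)

for folder in ('b_img_s', 'b_img_s_low', 'b_img_s_low_webp'):
    print(f"{folder}: {folder_size_mb(folder):.1f} MiB")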

Image categorization

import os
from PIL import Image

def identify_and_categorize_images(folder_path):
    avatar_folder = "img/avatar"  # roughly square images
    pc_folder = "img/pc"  # landscape images
    pe_folder = "img/pe"  # portrait images

    os.makedirs(avatar_folder, exist_ok=True)
    os.makedirs(pc_folder, exist_ok=True)
    os.makedirs(pe_folder, exist_ok=True)

    image_files = os.listdir(folder_path)
    for image_file in image_files:
        image_path = os.path.join(folder_path, image_file)
        # Read the dimensions, letting the with-block close the file handle
        # before the file is moved (required on Windows)
        with Image.open(image_path) as img:
            width, height = img.size

        avg_dimension = (width + height) / 2
        tolerance = avg_dimension * 0.22  # 22% tolerance

        if abs(width - height) <= tolerance:
            destination_folder = avatar_folder
        elif width > height:
            destination_folder = pc_folder
        else:
            destination_folder = pe_folder

        destination_path = os.path.join(destination_folder, image_file)
        os.rename(image_path, destination_path)

if __name__ == "__main__":
    folder_path = "b_img_low"  # folder of images to categorize
    identify_and_categorize_images(folder_path)
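
As a worked example of the 22% rule: a 1920x1080 image has an average dimension of 1500 and a tolerance of 330, and since |1920 - 1080| = 840 exceeds it, the image goes to pc; a 1000x900 image (average 950, tolerance 209, difference 100) counts as square and goes to avatar. The decision rule in isolation:

def categorize(width, height, ratio=0.22):
    # same rule as the script above: near-square -> avatar, wider -> pc, taller -> pe
    tolerance = (width + height) / 2 * ratio
    if abs(width - height) <= tolerance:
        return "avatar"
    return "pc" if width > height else "pe"

print(categorize(1920, 1080))  # pc
print(categorize(1000, 900))   # avatar
print(categorize(1080, 1920))  # pe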

Image conversion (recommended)

This also compresses the images, with the least impact on image quality.

import os
from PIL import Image
import concurrent.futures

input_folder = 'img/pc_yt'  # source image folder
output_folder_jpeg = 'img/pc_jpeg'  # output folder for converted JPEGs
output_folder_webp = 'img/pc_webp'  # output folder for converted WebPs

def convert_image(filename):
    if filename.lower().endswith(('.png', '.jpeg', '.jpg')):
        img_path = os.path.join(input_folder, filename)
        img = Image.open(img_path)

        # Downscale anything larger than 1920x1080, preserving aspect ratio
        width, height = img.size
        if width > 1920 or height > 1080:
            img.thumbnail((1920, 1080))
            print("Resized: {}".format(filename))
        else:
            print("Keeping original size: {}".format(filename))

        img_jpeg = img.convert('RGB')
        jpeg_output_filename = os.path.splitext(filename)[0] + '.jpg'
        jpeg_output_path = os.path.join(output_folder_jpeg, jpeg_output_filename)
        img_jpeg.save(jpeg_output_path, format='JPEG', quality=80)

        img_webp = img.convert('RGB')
        webp_output_filename = os.path.splitext(filename)[0] + '.webp'
        webp_output_path = os.path.join(output_folder_webp, webp_output_filename)
        img_webp.save(webp_output_path, format='WebP', quality=80)

if __name__ == '__main__':
    # Create the output folders
    if not os.path.exists(output_folder_jpeg):
        os.makedirs(output_folder_jpeg)
    if not os.path.exists(output_folder_webp):
        os.makedirs(output_folder_webp)

    process_count = 4  # number of worker processes
    with concurrent.futures.ProcessPoolExecutor(max_workers=process_count) as executor:
        for filename in os.listdir(input_folder):
            executor.submit(convert_image, filename)

    print("Conversion finished!")

Deduplication

The script below hashes every image with MD5 and moves byte-identical duplicates into a separate folder for review.

import os
import hashlib
from shutil import move

def calculate_md5(file_path):
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def find_duplicate_images(folder_path):
    image_hashes = {}
    duplicate_images = []

    for root, dirs, files in os.walk(folder_path):
        for filename in files:
            file_path = os.path.join(root, filename)
            if file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif','.webp')):
                image_hash = calculate_md5(file_path)
                if image_hash in image_hashes:
                    duplicate_images.append(file_path)
                else:
                    image_hashes[image_hash] = file_path

    return duplicate_images

def main():
    source_folder = "temp"  # folder to deduplicate
    move_folder = "temp_move"  # folder where duplicates are moved

    if not os.path.exists(move_folder):
        os.makedirs(move_folder)

    duplicate_images = find_duplicate_images(source_folder)

    for duplicate_image in duplicate_images:
        new_path = os.path.join(move_folder, os.path.basename(duplicate_image))
        move(duplicate_image, new_path)
        print(f"Moved: {duplicate_image} -> {new_path}")

if __name__ == "__main__":
    main()
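
As a quick sanity check after a run, every file remaining in the source folder should now hash uniquely; a short snippet reusing calculate_md5 from above:

hashes = []
for root, dirs, files in os.walk("temp"):
    for name in files:
        hashes.append(calculate_md5(os.path.join(root, name)))
assert len(hashes) == len(set(hashes)), "duplicates still present"
print("OK:", len(hashes), "unique files")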